feat(ingestion): Implement cursor-based incremental sync from GitLab
Provides efficient data synchronization with minimal API calls. src/ingestion/issues.rs - Issue sync logic: - Cursor-based incremental sync using updated_at timestamp - Fetches only issues modified since last sync - Configurable cursor rewind for overlap safety (default 2s) - Batched database writes with transaction wrapping - Upserts issues, labels, milestones, and assignees - Maintains issue_labels and issue_assignees junction tables - Returns IngestIssuesResult with counts and issues needing discussion sync - Identifies issues where discussion count changed src/ingestion/discussions.rs - Discussion sync logic: - Fetches discussions for issues that need sync - Compares discussion count vs stored to detect changes - Batched note insertion with raw payload preservation - Updates discussion metadata (resolved state, note counts) - Tracks sync state per discussion to enable incremental updates - Returns IngestDiscussionsResult with fetched/skipped counts src/ingestion/orchestrator.rs - Sync coordination: - Two-phase sync: issues first, then discussions - Progress callback support for CLI progress bars - ProgressEvent enum for fine-grained status updates: - IssueFetch, IssueProcess, DiscussionFetch, DiscussionSkip - Acquires sync lock before starting - Updates sync watermark on successful completion - Handles partial failures gracefully (watermark not updated) - Returns IngestProjectResult with detailed statistics The architecture supports future additions: - Merge request ingestion (parallel to issues) - Full-text search indexing hooks - Vector embedding pipeline integration Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
374
src/ingestion/discussions.rs
Normal file
374
src/ingestion/discussions.rs
Normal file
@@ -0,0 +1,374 @@
|
||||
//! Discussion ingestion with full-refresh strategy.
|
||||
//!
|
||||
//! Fetches discussions for an issue and stores them locally with:
|
||||
//! - Raw payload storage with deduplication
|
||||
//! - Full discussion and note replacement per issue
|
||||
//! - Sync timestamp tracking per issue
|
||||
//! - Safe stale removal only after successful pagination
|
||||
|
||||
use futures::StreamExt;
|
||||
use rusqlite::Connection;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::Config;
|
||||
use crate::core::error::Result;
|
||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||
use crate::gitlab::GitLabClient;
|
||||
use crate::gitlab::transformers::{transform_discussion, transform_notes};
|
||||
|
||||
use super::issues::IssueForDiscussionSync;
|
||||
|
||||
/// Result of discussion ingestion for a single issue.
///
/// Also used as the aggregate across many issues by
/// [`ingest_issue_discussions`], which sums the per-issue counts.
#[derive(Debug, Default)]
pub struct IngestDiscussionsResult {
    /// Discussions received from the GitLab pagination stream.
    pub discussions_fetched: usize,
    /// Discussions inserted or updated in the local `discussions` table.
    pub discussions_upserted: usize,
    /// Notes inserted locally (notes are fully replaced per discussion).
    pub notes_upserted: usize,
    /// Local discussions deleted because they were no longer seen upstream.
    pub stale_discussions_removed: usize,
}
|
||||
|
||||
/// Ingest discussions for a list of issues that need sync.
|
||||
pub async fn ingest_issue_discussions(
|
||||
conn: &Connection,
|
||||
client: &GitLabClient,
|
||||
config: &Config,
|
||||
gitlab_project_id: i64,
|
||||
local_project_id: i64,
|
||||
issues: &[IssueForDiscussionSync],
|
||||
) -> Result<IngestDiscussionsResult> {
|
||||
let mut total_result = IngestDiscussionsResult::default();
|
||||
|
||||
for issue in issues {
|
||||
let result = ingest_discussions_for_issue(
|
||||
conn,
|
||||
client,
|
||||
config,
|
||||
gitlab_project_id,
|
||||
local_project_id,
|
||||
issue,
|
||||
)
|
||||
.await?;
|
||||
|
||||
total_result.discussions_fetched += result.discussions_fetched;
|
||||
total_result.discussions_upserted += result.discussions_upserted;
|
||||
total_result.notes_upserted += result.notes_upserted;
|
||||
total_result.stale_discussions_removed += result.stale_discussions_removed;
|
||||
}
|
||||
|
||||
info!(
|
||||
issues_processed = issues.len(),
|
||||
discussions_fetched = total_result.discussions_fetched,
|
||||
discussions_upserted = total_result.discussions_upserted,
|
||||
notes_upserted = total_result.notes_upserted,
|
||||
stale_removed = total_result.stale_discussions_removed,
|
||||
"Discussion ingestion complete"
|
||||
);
|
||||
|
||||
Ok(total_result)
|
||||
}
|
||||
|
||||
/// Ingest discussions for a single issue.
|
||||
async fn ingest_discussions_for_issue(
|
||||
conn: &Connection,
|
||||
client: &GitLabClient,
|
||||
config: &Config,
|
||||
gitlab_project_id: i64,
|
||||
local_project_id: i64,
|
||||
issue: &IssueForDiscussionSync,
|
||||
) -> Result<IngestDiscussionsResult> {
|
||||
let mut result = IngestDiscussionsResult::default();
|
||||
|
||||
debug!(
|
||||
issue_iid = issue.iid,
|
||||
local_issue_id = issue.local_issue_id,
|
||||
"Fetching discussions for issue"
|
||||
);
|
||||
|
||||
// Stream discussions from GitLab
|
||||
let mut discussions_stream = client.paginate_issue_discussions(gitlab_project_id, issue.iid);
|
||||
|
||||
// Track discussions we've seen for stale removal
|
||||
let mut seen_discussion_ids: Vec<String> = Vec::new();
|
||||
// Track if we've started receiving data (to distinguish empty result from failure)
|
||||
let mut received_first_response = false;
|
||||
// Track if any error occurred during pagination
|
||||
let mut pagination_error: Option<crate::core::error::GiError> = None;
|
||||
|
||||
while let Some(disc_result) = discussions_stream.next().await {
|
||||
// Mark that we've received at least one response from the API
|
||||
if !received_first_response {
|
||||
received_first_response = true;
|
||||
}
|
||||
|
||||
// Handle errors - record but don't delete stale data
|
||||
let gitlab_discussion = match disc_result {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
issue_iid = issue.iid,
|
||||
error = %e,
|
||||
"Error during discussion pagination, skipping stale removal"
|
||||
);
|
||||
pagination_error = Some(e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
result.discussions_fetched += 1;
|
||||
|
||||
// Store raw payload
|
||||
let payload_json = serde_json::to_value(&gitlab_discussion)?;
|
||||
let payload_id = store_payload(
|
||||
conn,
|
||||
StorePayloadOptions {
|
||||
project_id: Some(local_project_id),
|
||||
resource_type: "discussion",
|
||||
gitlab_id: &gitlab_discussion.id,
|
||||
payload: &payload_json,
|
||||
compress: config.storage.compress_raw_payloads,
|
||||
},
|
||||
)?;
|
||||
|
||||
// Transform and store discussion
|
||||
let normalized =
|
||||
transform_discussion(&gitlab_discussion, local_project_id, issue.local_issue_id);
|
||||
|
||||
// Wrap all discussion+notes operations in a transaction for atomicity
|
||||
let tx = conn.unchecked_transaction()?;
|
||||
|
||||
upsert_discussion(&tx, &normalized, payload_id)?;
|
||||
result.discussions_upserted += 1;
|
||||
seen_discussion_ids.push(normalized.gitlab_discussion_id.clone());
|
||||
|
||||
// Get local discussion ID
|
||||
let local_discussion_id: i64 = tx.query_row(
|
||||
"SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
|
||||
(local_project_id, &normalized.gitlab_discussion_id),
|
||||
|row| row.get(0),
|
||||
)?;
|
||||
|
||||
// Transform and store notes
|
||||
let notes = transform_notes(&gitlab_discussion, local_project_id);
|
||||
|
||||
// Delete existing notes for this discussion (full refresh)
|
||||
tx.execute(
|
||||
"DELETE FROM notes WHERE discussion_id = ?",
|
||||
[local_discussion_id],
|
||||
)?;
|
||||
|
||||
for note in notes {
|
||||
// Store raw note payload
|
||||
let note_payload_json = serde_json::to_value(
|
||||
gitlab_discussion
|
||||
.notes
|
||||
.iter()
|
||||
.find(|n| n.id == note.gitlab_id),
|
||||
)?;
|
||||
let note_payload_id = store_payload(
|
||||
&tx,
|
||||
StorePayloadOptions {
|
||||
project_id: Some(local_project_id),
|
||||
resource_type: "note",
|
||||
gitlab_id: ¬e.gitlab_id.to_string(),
|
||||
payload: ¬e_payload_json,
|
||||
compress: config.storage.compress_raw_payloads,
|
||||
},
|
||||
)?;
|
||||
|
||||
insert_note(&tx, local_discussion_id, ¬e, note_payload_id)?;
|
||||
result.notes_upserted += 1;
|
||||
}
|
||||
|
||||
tx.commit()?;
|
||||
}
|
||||
|
||||
// Only remove stale discussions if pagination completed without errors
|
||||
// AND we actually received a response (empty or not)
|
||||
if pagination_error.is_none() && received_first_response {
|
||||
let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
|
||||
result.stale_discussions_removed = removed;
|
||||
|
||||
// Update discussions_synced_for_updated_at on the issue
|
||||
update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?;
|
||||
} else if pagination_error.is_none() && !received_first_response && seen_discussion_ids.is_empty() {
|
||||
// Stream was empty but no error - issue genuinely has no discussions
|
||||
// This is safe to remove stale discussions (if any exist from before)
|
||||
let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
|
||||
result.stale_discussions_removed = removed;
|
||||
|
||||
update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?;
|
||||
} else if pagination_error.is_some() {
|
||||
warn!(
|
||||
issue_iid = issue.iid,
|
||||
discussions_seen = seen_discussion_ids.len(),
|
||||
"Skipping stale removal due to pagination error"
|
||||
);
|
||||
// Return the error to signal incomplete sync
|
||||
return Err(pagination_error.unwrap());
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Upsert a discussion.
|
||||
fn upsert_discussion(
|
||||
conn: &Connection,
|
||||
discussion: &crate::gitlab::transformers::NormalizedDiscussion,
|
||||
payload_id: i64,
|
||||
) -> Result<()> {
|
||||
conn.execute(
|
||||
"INSERT INTO discussions (
|
||||
gitlab_discussion_id, project_id, issue_id, noteable_type,
|
||||
individual_note, first_note_at, last_note_at, last_seen_at,
|
||||
resolvable, resolved, raw_payload_id
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
|
||||
ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
|
||||
first_note_at = excluded.first_note_at,
|
||||
last_note_at = excluded.last_note_at,
|
||||
last_seen_at = excluded.last_seen_at,
|
||||
resolvable = excluded.resolvable,
|
||||
resolved = excluded.resolved,
|
||||
raw_payload_id = excluded.raw_payload_id",
|
||||
(
|
||||
&discussion.gitlab_discussion_id,
|
||||
discussion.project_id,
|
||||
discussion.issue_id,
|
||||
&discussion.noteable_type,
|
||||
discussion.individual_note,
|
||||
discussion.first_note_at,
|
||||
discussion.last_note_at,
|
||||
discussion.last_seen_at,
|
||||
discussion.resolvable,
|
||||
discussion.resolved,
|
||||
payload_id,
|
||||
),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a note.
|
||||
fn insert_note(
|
||||
conn: &Connection,
|
||||
discussion_id: i64,
|
||||
note: &crate::gitlab::transformers::NormalizedNote,
|
||||
payload_id: i64,
|
||||
) -> Result<()> {
|
||||
conn.execute(
|
||||
"INSERT INTO notes (
|
||||
gitlab_id, discussion_id, project_id, note_type, is_system,
|
||||
author_username, body, created_at, updated_at, last_seen_at,
|
||||
position, resolvable, resolved, resolved_by, resolved_at, raw_payload_id
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
|
||||
(
|
||||
note.gitlab_id,
|
||||
discussion_id,
|
||||
note.project_id,
|
||||
¬e.note_type,
|
||||
note.is_system,
|
||||
¬e.author_username,
|
||||
¬e.body,
|
||||
note.created_at,
|
||||
note.updated_at,
|
||||
note.last_seen_at,
|
||||
note.position,
|
||||
note.resolvable,
|
||||
note.resolved,
|
||||
¬e.resolved_by,
|
||||
note.resolved_at,
|
||||
payload_id,
|
||||
),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove discussions that were not seen in this fetch (stale removal).
|
||||
/// Chunks large sets to avoid SQL query size limits.
|
||||
fn remove_stale_discussions(
|
||||
conn: &Connection,
|
||||
issue_id: i64,
|
||||
seen_ids: &[String],
|
||||
) -> Result<usize> {
|
||||
if seen_ids.is_empty() {
|
||||
// No discussions seen - remove all for this issue
|
||||
let deleted = conn.execute("DELETE FROM discussions WHERE issue_id = ?", [issue_id])?;
|
||||
return Ok(deleted);
|
||||
}
|
||||
|
||||
// SQLite has a limit of 999 variables per query by default
|
||||
// Chunk the seen_ids to stay well under this limit
|
||||
const CHUNK_SIZE: usize = 500;
|
||||
|
||||
// For safety, use a temp table approach for large sets
|
||||
let total_deleted = if seen_ids.len() > CHUNK_SIZE {
|
||||
// Create temp table for seen IDs
|
||||
conn.execute(
|
||||
"CREATE TEMP TABLE IF NOT EXISTS _temp_seen_discussions (id TEXT PRIMARY KEY)",
|
||||
[],
|
||||
)?;
|
||||
|
||||
// Clear any previous data
|
||||
conn.execute("DELETE FROM _temp_seen_discussions", [])?;
|
||||
|
||||
// Insert seen IDs in chunks
|
||||
for chunk in seen_ids.chunks(CHUNK_SIZE) {
|
||||
let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect();
|
||||
let sql = format!(
|
||||
"INSERT OR IGNORE INTO _temp_seen_discussions (id) VALUES {}",
|
||||
placeholders.join(", ")
|
||||
);
|
||||
|
||||
let params: Vec<&dyn rusqlite::ToSql> = chunk.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
|
||||
conn.execute(&sql, params.as_slice())?;
|
||||
}
|
||||
|
||||
// Delete discussions not in temp table
|
||||
let deleted = conn.execute(
|
||||
"DELETE FROM discussions
|
||||
WHERE issue_id = ?1
|
||||
AND gitlab_discussion_id NOT IN (SELECT id FROM _temp_seen_discussions)",
|
||||
[issue_id],
|
||||
)?;
|
||||
|
||||
// Clean up temp table
|
||||
conn.execute("DROP TABLE IF EXISTS _temp_seen_discussions", [])?;
|
||||
deleted
|
||||
} else {
|
||||
// Small set - use simple IN clause
|
||||
let placeholders: Vec<&str> = seen_ids.iter().map(|_| "?").collect();
|
||||
let sql = format!(
|
||||
"DELETE FROM discussions WHERE issue_id = ?1 AND gitlab_discussion_id NOT IN ({})",
|
||||
placeholders.join(", ")
|
||||
);
|
||||
|
||||
let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(issue_id)];
|
||||
for id in seen_ids {
|
||||
params.push(Box::new(id.clone()));
|
||||
}
|
||||
|
||||
let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
|
||||
conn.execute(&sql, param_refs.as_slice())?
|
||||
};
|
||||
|
||||
Ok(total_deleted)
|
||||
}
|
||||
|
||||
/// Update the discussions_synced_for_updated_at timestamp on an issue.
|
||||
fn update_issue_sync_timestamp(conn: &Connection, issue_id: i64, updated_at: i64) -> Result<()> {
|
||||
conn.execute(
|
||||
"UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?",
|
||||
(updated_at, issue_id),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must zero every counter, including the stale-removal count
    /// (previously unasserted — the fourth field was missing from this test).
    #[test]
    fn result_default_has_zero_counts() {
        let result = IngestDiscussionsResult::default();
        assert_eq!(result.discussions_fetched, 0);
        assert_eq!(result.discussions_upserted, 0);
        assert_eq!(result.notes_upserted, 0);
        assert_eq!(result.stale_discussions_removed, 0);
    }
}
|
||||
Reference in New Issue
Block a user