diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index 6b08ee8..849b8d1 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -17,6 +17,7 @@ use crate::ingestion::{ }; /// Result of ingest command for display. +#[derive(Default)] pub struct IngestResult { pub resource_type: String, pub projects_synced: usize, @@ -130,24 +131,7 @@ pub async fn run_ingest( let mut total = IngestResult { resource_type: resource_type.to_string(), - projects_synced: 0, - // Issue fields - issues_fetched: 0, - issues_upserted: 0, - issues_synced_discussions: 0, - issues_skipped_discussion_sync: 0, - // MR fields - mrs_fetched: 0, - mrs_upserted: 0, - mrs_synced_discussions: 0, - mrs_skipped_discussion_sync: 0, - assignees_linked: 0, - reviewers_linked: 0, - diffnotes_count: 0, - // Shared fields - labels_created: 0, - discussions_fetched: 0, - notes_upserted: 0, + ..Default::default() }; let type_label = if resource_type == "issues" { diff --git a/src/ingestion/discussions.rs b/src/ingestion/discussions.rs index 3738c0f..12c6090 100644 --- a/src/ingestion/discussions.rs +++ b/src/ingestion/discussions.rs @@ -89,16 +89,10 @@ async fn ingest_discussions_for_issue( // Track discussions we've seen for stale removal let mut seen_discussion_ids: Vec = Vec::new(); - // Track if we've started receiving data (to distinguish empty result from failure) - let mut received_first_response = false; // Track if any error occurred during pagination let mut pagination_error: Option = None; while let Some(disc_result) = discussions_stream.next().await { - // Mark that we've received at least one response from the API - if !received_first_response { - received_first_response = true; - } // Handle errors - record but don't delete stale data let gitlab_discussion = match disc_result { @@ -139,8 +133,6 @@ async fn ingest_discussions_for_issue( let tx = conn.unchecked_transaction()?; upsert_discussion(&tx, &normalized, payload_id)?; - result.discussions_upserted += 1; - seen_discussion_ids.push(normalized.gitlab_discussion_id.clone()); // Get local discussion ID let local_discussion_id: i64 = tx.query_row( @@ -151,6 +143,7 @@ async fn ingest_discussions_for_issue( // Transform and store notes let notes = transform_notes(&gitlab_discussion, local_project_id); + let notes_count = notes.len(); // Delete existing notes for this discussion (full refresh) tx.execute( @@ -178,26 +171,19 @@ async fn ingest_discussions_for_issue( )?; insert_note(&tx, local_discussion_id, ¬e, note_payload_id)?; - result.notes_upserted += 1; } tx.commit()?; + + // Increment counters AFTER successful commit to keep metrics honest + result.discussions_upserted += 1; + result.notes_upserted += notes_count; + seen_discussion_ids.push(normalized.gitlab_discussion_id.clone()); } - // Only remove stale discussions if pagination completed without errors - // AND we actually received a response (empty or not) - if pagination_error.is_none() && received_first_response { - let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; - result.stale_discussions_removed = removed; - - // Update discussions_synced_for_updated_at on the issue - update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; - } else if pagination_error.is_none() - && !received_first_response - && seen_discussion_ids.is_empty() - { - // Stream was empty but no error - issue genuinely has no discussions - // This is safe to remove stale discussions (if any exist from before) + // Only remove stale discussions and advance watermark if pagination completed + // without errors. Safe for both empty results and populated results. + if pagination_error.is_none() { let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; result.stale_discussions_removed = removed; @@ -208,7 +194,6 @@ async fn ingest_discussions_for_issue( discussions_seen = seen_discussion_ids.len(), "Skipping stale removal due to pagination error" ); - // Return the error to signal incomplete sync return Err(err); } diff --git a/src/ingestion/mr_discussions.rs b/src/ingestion/mr_discussions.rs index 35e9d36..9dc34f1 100644 --- a/src/ingestion/mr_discussions.rs +++ b/src/ingestion/mr_discussions.rs @@ -155,14 +155,13 @@ pub fn write_prefetched_mr_discussions( // Write each discussion for disc in &prefetched.discussions { - result.discussions_fetched += 1; - - // Count DiffNotes - result.diffnotes_count += disc + // Count DiffNotes upfront (independent of transaction) + let diffnotes_in_disc = disc .notes .iter() .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) .count(); + let notes_in_disc = disc.notes.len(); // Start transaction let tx = conn.unchecked_transaction()?; @@ -182,7 +181,6 @@ pub fn write_prefetched_mr_discussions( // Upsert discussion upsert_discussion(&tx, &disc.normalized, run_seen_at, payload_id)?; - result.discussions_upserted += 1; // Get local discussion ID let local_discussion_id: i64 = tx.query_row( @@ -219,10 +217,15 @@ pub fn write_prefetched_mr_discussions( }; upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; - result.notes_upserted += 1; } tx.commit()?; + + // Increment counters AFTER successful commit to keep metrics honest + result.discussions_fetched += 1; + result.discussions_upserted += 1; + result.notes_upserted += notes_in_disc; + result.diffnotes_count += diffnotes_in_disc; } // Only sweep stale data and advance watermark on full success @@ -343,8 +346,6 @@ async fn ingest_discussions_for_mr( break; } }; - result.discussions_fetched += 1; - // CRITICAL: Parse notes BEFORE any destructive DB operations let notes = match transform_notes_with_diff_position(&gitlab_discussion, local_project_id) { Ok(notes) => notes, @@ -361,11 +362,12 @@ async fn ingest_discussions_for_mr( } }; - // Count DiffNotes - result.diffnotes_count += notes + // Count DiffNotes upfront (independent of transaction) + let diffnotes_in_disc = notes .iter() .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) .count(); + let notes_count = notes.len(); // Transform discussion let normalized_discussion = @@ -389,7 +391,6 @@ async fn ingest_discussions_for_mr( // Upsert discussion with run_seen_at upsert_discussion(&tx, &normalized_discussion, run_seen_at, payload_id)?; - result.discussions_upserted += 1; // Get local discussion ID let local_discussion_id: i64 = tx.query_row( @@ -433,10 +434,15 @@ async fn ingest_discussions_for_mr( }; upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; - result.notes_upserted += 1; } tx.commit()?; + + // Increment counters AFTER successful commit to keep metrics honest + result.discussions_fetched += 1; + result.discussions_upserted += 1; + result.notes_upserted += notes_count; + result.diffnotes_count += diffnotes_in_disc; } // Only sweep stale data and advance watermark on full success