fix(ingestion): Move counter increments after transaction commit

Ingestion counters (discussions_upserted, notes_upserted,
discussions_fetched, diffnotes_count) were incremented before
tx.commit(), meaning a failed commit would report inflated
metrics. Counters now increment only after successful commit
so reported numbers accurately reflect persisted state.

Also simplifies the stale-removal guard in issue discussions:
the received_first_response flag was unnecessary since an empty
seen_discussion_ids list is safe to pass to remove_stale -- if
there were no discussions, stale removal correctly sweeps all
previously-stored discussions. The two separate code paths
(empty vs populated) are collapsed into a single branch.

Derives Default on IngestResult to eliminate verbose zero-init.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-01-29 08:42:11 -05:00
parent 753ff46bb4
commit 8fe5feda7e
3 changed files with 29 additions and 54 deletions

View File

@@ -17,6 +17,7 @@ use crate::ingestion::{
}; };
/// Result of ingest command for display. /// Result of ingest command for display.
#[derive(Default)]
pub struct IngestResult { pub struct IngestResult {
pub resource_type: String, pub resource_type: String,
pub projects_synced: usize, pub projects_synced: usize,
@@ -130,24 +131,7 @@ pub async fn run_ingest(
let mut total = IngestResult { let mut total = IngestResult {
resource_type: resource_type.to_string(), resource_type: resource_type.to_string(),
projects_synced: 0, ..Default::default()
// Issue fields
issues_fetched: 0,
issues_upserted: 0,
issues_synced_discussions: 0,
issues_skipped_discussion_sync: 0,
// MR fields
mrs_fetched: 0,
mrs_upserted: 0,
mrs_synced_discussions: 0,
mrs_skipped_discussion_sync: 0,
assignees_linked: 0,
reviewers_linked: 0,
diffnotes_count: 0,
// Shared fields
labels_created: 0,
discussions_fetched: 0,
notes_upserted: 0,
}; };
let type_label = if resource_type == "issues" { let type_label = if resource_type == "issues" {

View File

@@ -89,16 +89,10 @@ async fn ingest_discussions_for_issue(
// Track discussions we've seen for stale removal // Track discussions we've seen for stale removal
let mut seen_discussion_ids: Vec<String> = Vec::new(); let mut seen_discussion_ids: Vec<String> = Vec::new();
// Track if we've started receiving data (to distinguish empty result from failure)
let mut received_first_response = false;
// Track if any error occurred during pagination // Track if any error occurred during pagination
let mut pagination_error: Option<crate::core::error::GiError> = None; let mut pagination_error: Option<crate::core::error::GiError> = None;
while let Some(disc_result) = discussions_stream.next().await { while let Some(disc_result) = discussions_stream.next().await {
// Mark that we've received at least one response from the API
if !received_first_response {
received_first_response = true;
}
// Handle errors - record but don't delete stale data // Handle errors - record but don't delete stale data
let gitlab_discussion = match disc_result { let gitlab_discussion = match disc_result {
@@ -139,8 +133,6 @@ async fn ingest_discussions_for_issue(
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
upsert_discussion(&tx, &normalized, payload_id)?; upsert_discussion(&tx, &normalized, payload_id)?;
result.discussions_upserted += 1;
seen_discussion_ids.push(normalized.gitlab_discussion_id.clone());
// Get local discussion ID // Get local discussion ID
let local_discussion_id: i64 = tx.query_row( let local_discussion_id: i64 = tx.query_row(
@@ -151,6 +143,7 @@ async fn ingest_discussions_for_issue(
// Transform and store notes // Transform and store notes
let notes = transform_notes(&gitlab_discussion, local_project_id); let notes = transform_notes(&gitlab_discussion, local_project_id);
let notes_count = notes.len();
// Delete existing notes for this discussion (full refresh) // Delete existing notes for this discussion (full refresh)
tx.execute( tx.execute(
@@ -178,26 +171,19 @@ async fn ingest_discussions_for_issue(
)?; )?;
insert_note(&tx, local_discussion_id, &note, note_payload_id)?; insert_note(&tx, local_discussion_id, &note, note_payload_id)?;
result.notes_upserted += 1;
} }
tx.commit()?; tx.commit()?;
// Increment counters AFTER successful commit to keep metrics honest
result.discussions_upserted += 1;
result.notes_upserted += notes_count;
seen_discussion_ids.push(normalized.gitlab_discussion_id.clone());
} }
// Only remove stale discussions if pagination completed without errors // Only remove stale discussions and advance watermark if pagination completed
// AND we actually received a response (empty or not) // without errors. Safe for both empty results and populated results.
if pagination_error.is_none() && received_first_response { if pagination_error.is_none() {
let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
result.stale_discussions_removed = removed;
// Update discussions_synced_for_updated_at on the issue
update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?;
} else if pagination_error.is_none()
&& !received_first_response
&& seen_discussion_ids.is_empty()
{
// Stream was empty but no error - issue genuinely has no discussions
// This is safe to remove stale discussions (if any exist from before)
let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
result.stale_discussions_removed = removed; result.stale_discussions_removed = removed;
@@ -208,7 +194,6 @@ async fn ingest_discussions_for_issue(
discussions_seen = seen_discussion_ids.len(), discussions_seen = seen_discussion_ids.len(),
"Skipping stale removal due to pagination error" "Skipping stale removal due to pagination error"
); );
// Return the error to signal incomplete sync
return Err(err); return Err(err);
} }

View File

@@ -155,14 +155,13 @@ pub fn write_prefetched_mr_discussions(
// Write each discussion // Write each discussion
for disc in &prefetched.discussions { for disc in &prefetched.discussions {
result.discussions_fetched += 1; // Count DiffNotes upfront (independent of transaction)
let diffnotes_in_disc = disc
// Count DiffNotes
result.diffnotes_count += disc
.notes .notes
.iter() .iter()
.filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some())
.count(); .count();
let notes_in_disc = disc.notes.len();
// Start transaction // Start transaction
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
@@ -182,7 +181,6 @@ pub fn write_prefetched_mr_discussions(
// Upsert discussion // Upsert discussion
upsert_discussion(&tx, &disc.normalized, run_seen_at, payload_id)?; upsert_discussion(&tx, &disc.normalized, run_seen_at, payload_id)?;
result.discussions_upserted += 1;
// Get local discussion ID // Get local discussion ID
let local_discussion_id: i64 = tx.query_row( let local_discussion_id: i64 = tx.query_row(
@@ -219,10 +217,15 @@ pub fn write_prefetched_mr_discussions(
}; };
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
result.notes_upserted += 1;
} }
tx.commit()?; tx.commit()?;
// Increment counters AFTER successful commit to keep metrics honest
result.discussions_fetched += 1;
result.discussions_upserted += 1;
result.notes_upserted += notes_in_disc;
result.diffnotes_count += diffnotes_in_disc;
} }
// Only sweep stale data and advance watermark on full success // Only sweep stale data and advance watermark on full success
@@ -343,8 +346,6 @@ async fn ingest_discussions_for_mr(
break; break;
} }
}; };
result.discussions_fetched += 1;
// CRITICAL: Parse notes BEFORE any destructive DB operations // CRITICAL: Parse notes BEFORE any destructive DB operations
let notes = match transform_notes_with_diff_position(&gitlab_discussion, local_project_id) { let notes = match transform_notes_with_diff_position(&gitlab_discussion, local_project_id) {
Ok(notes) => notes, Ok(notes) => notes,
@@ -361,11 +362,12 @@ async fn ingest_discussions_for_mr(
} }
}; };
// Count DiffNotes // Count DiffNotes upfront (independent of transaction)
result.diffnotes_count += notes let diffnotes_in_disc = notes
.iter() .iter()
.filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some())
.count(); .count();
let notes_count = notes.len();
// Transform discussion // Transform discussion
let normalized_discussion = let normalized_discussion =
@@ -389,7 +391,6 @@ async fn ingest_discussions_for_mr(
// Upsert discussion with run_seen_at // Upsert discussion with run_seen_at
upsert_discussion(&tx, &normalized_discussion, run_seen_at, payload_id)?; upsert_discussion(&tx, &normalized_discussion, run_seen_at, payload_id)?;
result.discussions_upserted += 1;
// Get local discussion ID // Get local discussion ID
let local_discussion_id: i64 = tx.query_row( let local_discussion_id: i64 = tx.query_row(
@@ -433,10 +434,15 @@ async fn ingest_discussions_for_mr(
}; };
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
result.notes_upserted += 1;
} }
tx.commit()?; tx.commit()?;
// Increment counters AFTER successful commit to keep metrics honest
result.discussions_fetched += 1;
result.discussions_upserted += 1;
result.notes_upserted += notes_count;
result.diffnotes_count += diffnotes_in_disc;
} }
// Only sweep stale data and advance watermark on full success // Only sweep stale data and advance watermark on full success