From 6e82f723c352dc075d77c7a1527eaf1d6673ea11 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Sun, 8 Feb 2026 14:33:47 -0500 Subject: [PATCH] fix(ingestion): unify store + watermark + job-complete in single transaction Previously, drain_resource_events, drain_mr_closes_issues, and drain_mr_diffs each opened a transaction only for the job-complete + watermark update, but the store operation ran outside that transaction. If the process crashed between the store and the watermark update, data would be persisted without the watermark advancing, causing silent duplicates on the next sync. Now each drain function opens the transaction before the store call and commits it only after the store, the job-complete update, and the watermark update have all succeeded. On a store error, the transaction is explicitly dropped, which rolls it back, so no partially stored data is left behind on the connection. Also: - store_resource_events no longer manages its own transaction; the caller passes in a connection (which is actually the transaction) - upsert_mr_file_changes wraps DELETE + INSERT in a transaction internally - reset_discussion_watermarks now also clears diffs_synced_for_updated_at - Orchestrator error span now includes closes_issues_failed + mr_diffs_failed Co-Authored-By: Claude Opus 4.6 --- src/ingestion/merge_requests.rs | 3 ++- src/ingestion/mr_diffs.rs | 10 ++++++++-- src/ingestion/orchestrator.rs | 30 +++++++++++++++++------------- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs index 136096d..88c7ee5 100644 --- a/src/ingestion/merge_requests.rs +++ b/src/ingestion/merge_requests.rs @@ -380,7 +380,8 @@ fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> discussions_sync_attempts = 0, discussions_sync_last_error = NULL, resource_events_synced_for_updated_at = NULL, - closes_issues_synced_for_updated_at = NULL + closes_issues_synced_for_updated_at = NULL, + diffs_synced_for_updated_at = NULL WHERE project_id = ?", 
[project_id], )?; diff --git a/src/ingestion/mr_diffs.rs b/src/ingestion/mr_diffs.rs index 90698a7..63d25ab 100644 --- a/src/ingestion/mr_diffs.rs +++ b/src/ingestion/mr_diffs.rs @@ -25,12 +25,14 @@ pub fn upsert_mr_file_changes( project_id: i64, diffs: &[GitLabMrDiff], ) -> Result { - conn.execute( + let tx = conn.unchecked_transaction()?; + + tx.execute( "DELETE FROM mr_file_changes WHERE merge_request_id = ?1", [mr_local_id], )?; - let mut stmt = conn.prepare_cached( + let mut stmt = tx.prepare_cached( "INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \ VALUES (?1, ?2, ?3, ?4, ?5)", )?; @@ -54,6 +56,10 @@ pub fn upsert_mr_file_changes( inserted += 1; } + // Drop the prepared statement before committing the transaction. + drop(stmt); + tx.commit()?; + if inserted > 0 { debug!(inserted, mr_local_id, "Stored MR file changes"); } diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 13bccbc..d177f92 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -516,7 +516,10 @@ pub async fn ingest_project_merge_requests_with_progress( tracing::Span::current().record("items_processed", result.mrs_upserted); tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync); - tracing::Span::current().record("errors", result.resource_events_failed); + tracing::Span::current().record( + "errors", + result.resource_events_failed + result.closes_issues_failed + result.mr_diffs_failed, + ); Ok(result) } @@ -774,8 +777,9 @@ async fn drain_resource_events( for p in prefetched { match p.result { Ok((state_events, label_events, milestone_events)) => { + let tx = conn.unchecked_transaction()?; let store_result = store_resource_events( - conn, + &tx, p.project_id, &p.entity_type, p.entity_local_id, @@ -786,7 +790,6 @@ async fn drain_resource_events( match store_result { Ok(()) => { - let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; 
update_resource_event_watermark_tx( &tx, @@ -797,6 +800,7 @@ async fn drain_resource_events( result.fetched += 1; } Err(e) => { + drop(tx); warn!( entity_type = %p.entity_type, entity_iid = p.entity_iid, @@ -861,6 +865,7 @@ async fn drain_resource_events( Ok(result) } +/// Store resource events using the provided connection (caller manages the transaction). fn store_resource_events( conn: &Connection, project_id: i64, @@ -870,11 +875,9 @@ fn store_resource_events( label_events: &[crate::gitlab::types::GitLabLabelEvent], milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], ) -> Result<()> { - let tx = conn.unchecked_transaction()?; - if !state_events.is_empty() { crate::core::events_db::upsert_state_events( - &tx, + conn, project_id, entity_type, entity_local_id, @@ -884,7 +887,7 @@ fn store_resource_events( if !label_events.is_empty() { crate::core::events_db::upsert_label_events( - &tx, + conn, project_id, entity_type, entity_local_id, @@ -894,7 +897,7 @@ fn store_resource_events( if !milestone_events.is_empty() { crate::core::events_db::upsert_milestone_events( - &tx, + conn, project_id, entity_type, entity_local_id, @@ -902,7 +905,6 @@ fn store_resource_events( )?; } - tx.commit()?; Ok(()) } @@ -1095,8 +1097,9 @@ async fn drain_mr_closes_issues( for p in prefetched { match p.result { Ok(closes_issues) => { + let tx = conn.unchecked_transaction()?; let store_result = store_closes_issues_refs( - conn, + &tx, project_id, p.entity_local_id, &closes_issues, @@ -1104,13 +1107,13 @@ async fn drain_mr_closes_issues( match store_result { Ok(()) => { - let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { + drop(tx); warn!( entity_iid = p.entity_iid, error = %e, @@ -1364,8 +1367,9 @@ async fn drain_mr_diffs( for p in prefetched { match p.result { Ok(diffs) => { + let tx = conn.unchecked_transaction()?; let store_result = 
super::mr_diffs::upsert_mr_file_changes( - conn, + &tx, p.entity_local_id, project_id, &diffs, @@ -1373,13 +1377,13 @@ async fn drain_mr_diffs( match store_result { Ok(_) => { - let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_diffs_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { + drop(tx); warn!( entity_iid = p.entity_iid, error = %e,