diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs index 136096d..88c7ee5 100644 --- a/src/ingestion/merge_requests.rs +++ b/src/ingestion/merge_requests.rs @@ -380,7 +380,8 @@ fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> discussions_sync_attempts = 0, discussions_sync_last_error = NULL, resource_events_synced_for_updated_at = NULL, - closes_issues_synced_for_updated_at = NULL + closes_issues_synced_for_updated_at = NULL, + diffs_synced_for_updated_at = NULL WHERE project_id = ?", [project_id], )?; diff --git a/src/ingestion/mr_diffs.rs b/src/ingestion/mr_diffs.rs index 90698a7..63d25ab 100644 --- a/src/ingestion/mr_diffs.rs +++ b/src/ingestion/mr_diffs.rs @@ -25,12 +25,14 @@ pub fn upsert_mr_file_changes( project_id: i64, diffs: &[GitLabMrDiff], ) -> Result { - conn.execute( + let tx = conn.unchecked_transaction()?; + + tx.execute( "DELETE FROM mr_file_changes WHERE merge_request_id = ?1", [mr_local_id], )?; - let mut stmt = conn.prepare_cached( + let mut stmt = tx.prepare_cached( "INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \ VALUES (?1, ?2, ?3, ?4, ?5)", )?; @@ -54,6 +56,10 @@ pub fn upsert_mr_file_changes( inserted += 1; } + // Drop the prepared statement before committing the transaction. + drop(stmt); + tx.commit()?; + if inserted > 0 { debug!(inserted, mr_local_id, "Stored MR file changes"); } diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 13bccbc..d177f92 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -516,7 +516,10 @@ pub async fn ingest_project_merge_requests_with_progress( tracing::Span::current().record("items_processed", result.mrs_upserted); tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync); - tracing::Span::current().record("errors", result.resource_events_failed); + tracing::Span::current().record( + "errors", + result.resource_events_failed + result.closes_issues_failed + result.mr_diffs_failed, + ); Ok(result) } @@ -774,8 +777,9 @@ async fn drain_resource_events( for p in prefetched { match p.result { Ok((state_events, label_events, milestone_events)) => { + let tx = conn.unchecked_transaction()?; let store_result = store_resource_events( - conn, + &tx, p.project_id, &p.entity_type, p.entity_local_id, @@ -786,7 +790,6 @@ async fn drain_resource_events( match store_result { Ok(()) => { - let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_resource_event_watermark_tx( &tx, @@ -797,6 +800,7 @@ async fn drain_resource_events( result.fetched += 1; } Err(e) => { + drop(tx); warn!( entity_type = %p.entity_type, entity_iid = p.entity_iid, @@ -861,6 +865,7 @@ async fn drain_resource_events( Ok(result) } +/// Store resource events using the provided connection (caller manages the transaction). fn store_resource_events( conn: &Connection, project_id: i64, @@ -870,11 +875,9 @@ fn store_resource_events( label_events: &[crate::gitlab::types::GitLabLabelEvent], milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], ) -> Result<()> { - let tx = conn.unchecked_transaction()?; - if !state_events.is_empty() { crate::core::events_db::upsert_state_events( - &tx, + conn, project_id, entity_type, entity_local_id, @@ -884,7 +887,7 @@ fn store_resource_events( if !label_events.is_empty() { crate::core::events_db::upsert_label_events( - &tx, + conn, project_id, entity_type, entity_local_id, @@ -894,7 +897,7 @@ fn store_resource_events( if !milestone_events.is_empty() { crate::core::events_db::upsert_milestone_events( - &tx, + conn, project_id, entity_type, entity_local_id, @@ -902,7 +905,6 @@ fn store_resource_events( )?; } - tx.commit()?; Ok(()) } @@ -1095,8 +1097,9 @@ async fn drain_mr_closes_issues( for p in prefetched { match p.result { Ok(closes_issues) => { + let tx = conn.unchecked_transaction()?; let store_result = store_closes_issues_refs( - conn, + &tx, project_id, p.entity_local_id, &closes_issues, @@ -1104,13 +1107,13 @@ async fn drain_mr_closes_issues( match store_result { Ok(()) => { - let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { + drop(tx); warn!( entity_iid = p.entity_iid, error = %e, @@ -1364,8 +1367,9 @@ async fn drain_mr_diffs( for p in prefetched { match p.result { Ok(diffs) => { + let tx = conn.unchecked_transaction()?; let store_result = super::mr_diffs::upsert_mr_file_changes( - conn, + &tx, p.entity_local_id, project_id, &diffs, @@ -1373,13 +1377,13 @@ async fn drain_mr_diffs( match store_result { Ok(_) => { - let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_diffs_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { + drop(tx); warn!( entity_iid = p.entity_iid, error = %e,