From 7d40a81512b36d8841433c72c8d8fd522b32e392 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Mon, 9 Feb 2026 11:56:15 -0500 Subject: [PATCH] fix(ingestion): remove nested transaction in upsert_mr_file_changes drain_mr_diffs in orchestrator.rs already wraps each MR diff store in an unchecked_transaction (alongside job completion and watermark update). upsert_mr_file_changes was also starting its own inner transaction via conn.unchecked_transaction(), causing every call to fail with "cannot start a transaction within a transaction". Remove the inner transaction management from upsert_mr_file_changes so it operates on whatever Connection (or Transaction deref'd to Connection) the caller provides. The caller in drain_mr_diffs owns the transaction boundary. Standalone callers (tests, future direct use) auto-commit each statement, which is correct for their use case. Co-Authored-By: Claude Opus 4.6 --- src/ingestion/mr_diffs.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/ingestion/mr_diffs.rs b/src/ingestion/mr_diffs.rs index 63d25ab..7643b42 100644 --- a/src/ingestion/mr_diffs.rs +++ b/src/ingestion/mr_diffs.rs @@ -19,20 +19,22 @@ fn derive_change_type(diff: &GitLabMrDiff) -> &'static str { /// Replace all file change records for a given MR with the provided diffs. /// Uses DELETE+INSERT (simpler than UPSERT for array replacement). +/// +/// Does NOT manage its own transaction — the caller is responsible for +/// wrapping this in a transaction when atomicity with other operations +/// (job completion, watermark update) is needed. pub fn upsert_mr_file_changes( conn: &Connection, mr_local_id: i64, project_id: i64, diffs: &[GitLabMrDiff], ) -> Result { - let tx = conn.unchecked_transaction()?; - - tx.execute( + conn.execute( "DELETE FROM mr_file_changes WHERE merge_request_id = ?1", [mr_local_id], )?; - let mut stmt = tx.prepare_cached( + let mut stmt = conn.prepare_cached( "INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \ VALUES (?1, ?2, ?3, ?4, ?5)", )?; @@ -56,10 +58,6 @@ pub fn upsert_mr_file_changes( inserted += 1; } - // Drop the prepared statement before committing the transaction. - drop(stmt); - tx.commit()?; - if inserted > 0 { debug!(inserted, mr_local_id, "Stored MR file changes"); }