fix(ingestion): remove nested transaction in upsert_mr_file_changes

drain_mr_diffs in orchestrator.rs already wraps each MR diff store
in an unchecked_transaction (alongside job completion and watermark
update). upsert_mr_file_changes was also starting its own inner
transaction via conn.unchecked_transaction(), causing every call to
fail with "cannot start a transaction within a transaction".

Remove the inner transaction management from upsert_mr_file_changes
so it operates on whatever Connection (or Transaction deref'd to
Connection) the caller provides. The caller in drain_mr_diffs owns
the transaction boundary. Standalone callers (tests, future direct
use) auto-commit each statement, which is correct for their use case.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-09 11:56:15 -05:00
parent 4185abe05d
commit 7d40a81512

View File

@@ -19,20 +19,22 @@ fn derive_change_type(diff: &GitLabMrDiff) -> &'static str {
/// Replace all file change records for a given MR with the provided diffs. /// Replace all file change records for a given MR with the provided diffs.
/// Uses DELETE+INSERT (simpler than UPSERT for array replacement). /// Uses DELETE+INSERT (simpler than UPSERT for array replacement).
///
/// Does NOT manage its own transaction — the caller is responsible for
/// wrapping this in a transaction when atomicity with other operations
/// (job completion, watermark update) is needed.
pub fn upsert_mr_file_changes( pub fn upsert_mr_file_changes(
conn: &Connection, conn: &Connection,
mr_local_id: i64, mr_local_id: i64,
project_id: i64, project_id: i64,
diffs: &[GitLabMrDiff], diffs: &[GitLabMrDiff],
) -> Result<usize> { ) -> Result<usize> {
let tx = conn.unchecked_transaction()?; conn.execute(
tx.execute(
"DELETE FROM mr_file_changes WHERE merge_request_id = ?1", "DELETE FROM mr_file_changes WHERE merge_request_id = ?1",
[mr_local_id], [mr_local_id],
)?; )?;
let mut stmt = tx.prepare_cached( let mut stmt = conn.prepare_cached(
"INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \ "INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \
VALUES (?1, ?2, ?3, ?4, ?5)", VALUES (?1, ?2, ?3, ?4, ?5)",
)?; )?;
@@ -56,10 +58,6 @@ pub fn upsert_mr_file_changes(
inserted += 1; inserted += 1;
} }
// Drop the prepared statement before committing the transaction.
drop(stmt);
tx.commit()?;
if inserted > 0 { if inserted > 0 {
debug!(inserted, mr_local_id, "Stored MR file changes"); debug!(inserted, mr_local_id, "Stored MR file changes");
} }