fix(ingestion): unify store + watermark + job-complete in single transaction

Previously, drain_resource_events, drain_mr_closes_issues, and
drain_mr_diffs each opened a transaction only for the job-complete +
watermark update, but the store operation ran outside that transaction.
If the process crashed between the store and the watermark update, data
would be persisted without the watermark advancing, causing silent
duplicates on the next sync.

Now each drain function opens the transaction before the store call and
commits it only after both the store and the watermark update succeed.
On error, the transaction is explicitly dropped so the connection is
not left in a half-committed state.

Also:
- store_resource_events no longer manages its own transaction; the caller
  passes in a connection (which is actually the transaction)
- upsert_mr_file_changes wraps DELETE + INSERT in a transaction internally
- reset_discussion_watermarks now also clears diffs_synced_for_updated_at
- Orchestrator error span now includes closes_issues_failed + mr_diffs_failed

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-08 14:33:47 -05:00
parent 940a96375a
commit 6e82f723c3
3 changed files with 27 additions and 16 deletions

View File

@@ -380,7 +380,8 @@ fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()>
discussions_sync_attempts = 0,
discussions_sync_last_error = NULL,
resource_events_synced_for_updated_at = NULL,
closes_issues_synced_for_updated_at = NULL
closes_issues_synced_for_updated_at = NULL,
diffs_synced_for_updated_at = NULL
WHERE project_id = ?",
[project_id],
)?;

View File

@@ -25,12 +25,14 @@ pub fn upsert_mr_file_changes(
project_id: i64,
diffs: &[GitLabMrDiff],
) -> Result<usize> {
conn.execute(
let tx = conn.unchecked_transaction()?;
tx.execute(
"DELETE FROM mr_file_changes WHERE merge_request_id = ?1",
[mr_local_id],
)?;
let mut stmt = conn.prepare_cached(
let mut stmt = tx.prepare_cached(
"INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \
VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
@@ -54,6 +56,10 @@ pub fn upsert_mr_file_changes(
inserted += 1;
}
// Drop the prepared statement before committing the transaction.
drop(stmt);
tx.commit()?;
if inserted > 0 {
debug!(inserted, mr_local_id, "Stored MR file changes");
}

View File

@@ -516,7 +516,10 @@ pub async fn ingest_project_merge_requests_with_progress(
tracing::Span::current().record("items_processed", result.mrs_upserted);
tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync);
tracing::Span::current().record("errors", result.resource_events_failed);
tracing::Span::current().record(
"errors",
result.resource_events_failed + result.closes_issues_failed + result.mr_diffs_failed,
);
Ok(result)
}
@@ -774,8 +777,9 @@ async fn drain_resource_events(
for p in prefetched {
match p.result {
Ok((state_events, label_events, milestone_events)) => {
let tx = conn.unchecked_transaction()?;
let store_result = store_resource_events(
conn,
&tx,
p.project_id,
&p.entity_type,
p.entity_local_id,
@@ -786,7 +790,6 @@ async fn drain_resource_events(
match store_result {
Ok(()) => {
let tx = conn.unchecked_transaction()?;
complete_job_tx(&tx, p.job_id)?;
update_resource_event_watermark_tx(
&tx,
@@ -797,6 +800,7 @@ async fn drain_resource_events(
result.fetched += 1;
}
Err(e) => {
drop(tx);
warn!(
entity_type = %p.entity_type,
entity_iid = p.entity_iid,
@@ -861,6 +865,7 @@ async fn drain_resource_events(
Ok(result)
}
/// Store resource events using the provided connection (caller manages the transaction).
fn store_resource_events(
conn: &Connection,
project_id: i64,
@@ -870,11 +875,9 @@ fn store_resource_events(
label_events: &[crate::gitlab::types::GitLabLabelEvent],
milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
let tx = conn.unchecked_transaction()?;
if !state_events.is_empty() {
crate::core::events_db::upsert_state_events(
&tx,
conn,
project_id,
entity_type,
entity_local_id,
@@ -884,7 +887,7 @@ fn store_resource_events(
if !label_events.is_empty() {
crate::core::events_db::upsert_label_events(
&tx,
conn,
project_id,
entity_type,
entity_local_id,
@@ -894,7 +897,7 @@ fn store_resource_events(
if !milestone_events.is_empty() {
crate::core::events_db::upsert_milestone_events(
&tx,
conn,
project_id,
entity_type,
entity_local_id,
@@ -902,7 +905,6 @@ fn store_resource_events(
)?;
}
tx.commit()?;
Ok(())
}
@@ -1095,8 +1097,9 @@ async fn drain_mr_closes_issues(
for p in prefetched {
match p.result {
Ok(closes_issues) => {
let tx = conn.unchecked_transaction()?;
let store_result = store_closes_issues_refs(
conn,
&tx,
project_id,
p.entity_local_id,
&closes_issues,
@@ -1104,13 +1107,13 @@ async fn drain_mr_closes_issues(
match store_result {
Ok(()) => {
let tx = conn.unchecked_transaction()?;
complete_job_tx(&tx, p.job_id)?;
update_closes_issues_watermark_tx(&tx, p.entity_local_id)?;
tx.commit()?;
result.fetched += 1;
}
Err(e) => {
drop(tx);
warn!(
entity_iid = p.entity_iid,
error = %e,
@@ -1364,8 +1367,9 @@ async fn drain_mr_diffs(
for p in prefetched {
match p.result {
Ok(diffs) => {
let tx = conn.unchecked_transaction()?;
let store_result = super::mr_diffs::upsert_mr_file_changes(
conn,
&tx,
p.entity_local_id,
project_id,
&diffs,
@@ -1373,13 +1377,13 @@ async fn drain_mr_diffs(
match store_result {
Ok(_) => {
let tx = conn.unchecked_transaction()?;
complete_job_tx(&tx, p.job_id)?;
update_diffs_watermark_tx(&tx, p.entity_local_id)?;
tx.commit()?;
result.fetched += 1;
}
Err(e) => {
drop(tx);
warn!(
entity_iid = p.entity_iid,
error = %e,