fix(ingestion): proper error propagation and transaction safety

Three hardening improvements to the ingestion orchestrator:

- Replace .unwrap_or(0) with ? on COUNT(*) queries for total_issues
  and total_mrs. These are simple aggregate queries that should never
  fail, but if they do (e.g. table missing after failed migration),
  propagating the error gives an actionable message instead of silently
  reporting 0 items.

- Wrap store_closes_issues_refs in a SAVEPOINT with proper
  ROLLBACK/RELEASE. Previously, a failure mid-loop (e.g. on the 5th of
  10 close-issue references) would leave partial refs committed. Now
  the entire batch is atomic.

- Replace silent catch-all (_ => {}) arms in enqueue_resource_events
  and update_resource_event_watermark with explicit warnings for
  unknown entity_type values. Makes debugging easier when new entity
  types are added but the match arms aren't updated.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-06 22:42:40 -05:00
parent 2bfa4f1f8c
commit f3f3560e0d

View File

@@ -131,13 +131,11 @@ pub async fn ingest_project_issues_with_progress(
let issues_needing_sync = issue_result.issues_needing_discussion_sync; let issues_needing_sync = issue_result.issues_needing_discussion_sync;
let total_issues: i64 = conn let total_issues: i64 = conn.query_row(
.query_row( "SELECT COUNT(*) FROM issues WHERE project_id = ?",
"SELECT COUNT(*) FROM issues WHERE project_id = ?", [project_id],
[project_id], |row| row.get(0),
|row| row.get(0), )?;
)
.unwrap_or(0);
let total_issues = total_issues as usize; let total_issues = total_issues as usize;
result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
@@ -346,13 +344,11 @@ pub async fn ingest_project_merge_requests_with_progress(
let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?; let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
let total_mrs: i64 = conn let total_mrs: i64 = conn.query_row(
.query_row( "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
"SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", [project_id],
[project_id], |row| row.get(0),
|row| row.get(0), )?;
)
.unwrap_or(0);
let total_mrs = total_mrs as usize; let total_mrs = total_mrs as usize;
result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
@@ -573,7 +569,12 @@ fn enqueue_resource_events_for_entity_type(
[project_id], [project_id],
)?; )?;
} }
_ => {} other => {
warn!(
entity_type = other,
"Unknown entity_type in enqueue_resource_events, skipping stale job cleanup"
);
}
} }
let entities: Vec<(i64, i64)> = match entity_type { let entities: Vec<(i64, i64)> = match entity_type {
@@ -900,7 +901,12 @@ fn update_resource_event_watermark_tx(
[entity_local_id], [entity_local_id],
)?; )?;
} }
_ => {} other => {
warn!(
entity_type = other,
"Unknown entity_type in watermark update, skipping"
);
}
} }
Ok(()) Ok(())
} }
@@ -1138,34 +1144,46 @@ fn store_closes_issues_refs(
mr_local_id: i64, mr_local_id: i64,
closes_issues: &[crate::gitlab::types::GitLabIssueRef], closes_issues: &[crate::gitlab::types::GitLabIssueRef],
) -> Result<()> { ) -> Result<()> {
for issue_ref in closes_issues { conn.execute_batch("SAVEPOINT store_closes_refs")?;
let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; let inner = || -> Result<()> {
for issue_ref in closes_issues {
let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?;
let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id {
(Some(local_id), None, None) (Some(local_id), None, None)
} else { } else {
let path = resolve_project_path(conn, issue_ref.project_id)?; let path = resolve_project_path(conn, issue_ref.project_id)?;
let fallback = let fallback =
path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id));
(None, Some(fallback), Some(issue_ref.iid)) (None, Some(fallback), Some(issue_ref.iid))
}; };
let ref_ = EntityReference { let ref_ = EntityReference {
project_id, project_id,
source_entity_type: "merge_request", source_entity_type: "merge_request",
source_entity_id: mr_local_id, source_entity_id: mr_local_id,
target_entity_type: "issue", target_entity_type: "issue",
target_entity_id: target_id, target_entity_id: target_id,
target_project_path: target_path.as_deref(), target_project_path: target_path.as_deref(),
target_entity_iid: target_iid, target_entity_iid: target_iid,
reference_type: "closes", reference_type: "closes",
source_method: "api", source_method: "api",
}; };
insert_entity_reference(conn, &ref_)?; insert_entity_reference(conn, &ref_)?;
}
Ok(())
};
match inner() {
Ok(()) => {
conn.execute_batch("RELEASE store_closes_refs")?;
Ok(())
}
Err(e) => {
let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs");
Err(e)
}
} }
Ok(())
} }
#[cfg(test)] #[cfg(test)]