From f3f3560e0d3c89e8d3deb43fa5fbc0b05073deae Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Fri, 6 Feb 2026 22:42:40 -0500 Subject: [PATCH] fix(ingestion): proper error propagation and transaction safety Three hardening improvements to the ingestion orchestrator: - Replace .unwrap_or(0) with ? on COUNT(*) queries for total_issues and total_mrs. These are simple aggregate queries that should never fail, but if they do (e.g. table missing after failed migration), propagating the error gives an actionable message instead of silently reporting 0 items. - Wrap store_closes_issues_refs in a SAVEPOINT with proper ROLLBACK/RELEASE. Previously, a failure mid-loop (e.g. on the 5th of 10 close-issue references) would leave partial refs committed. Now the entire batch is atomic. - Replace silent catch-all (_ => {}) arms in enqueue_resource_events and update_resource_event_watermark with explicit warnings for unknown entity_type values. Makes debugging easier when new entity types are added but the match arms aren't updated. Co-Authored-By: Claude Opus 4.6 --- src/ingestion/orchestrator.rs | 98 +++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 40 deletions(-) diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 33a0b9e..58c7a6a 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -131,13 +131,11 @@ pub async fn ingest_project_issues_with_progress( let issues_needing_sync = issue_result.issues_needing_discussion_sync; - let total_issues: i64 = conn - .query_row( - "SELECT COUNT(*) FROM issues WHERE project_id = ?", - [project_id], - |row| row.get(0), - ) - .unwrap_or(0); + let total_issues: i64 = conn.query_row( + "SELECT COUNT(*) FROM issues WHERE project_id = ?", + [project_id], + |row| row.get(0), + )?; let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); @@ -346,13 +344,11 @@ pub async fn ingest_project_merge_requests_with_progress( let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?; - let total_mrs: i64 = conn - .query_row( - "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", - [project_id], - |row| row.get(0), - ) - .unwrap_or(0); + let total_mrs: i64 = conn.query_row( + "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", + [project_id], + |row| row.get(0), + )?; let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); @@ -573,7 +569,12 @@ fn enqueue_resource_events_for_entity_type( [project_id], )?; } - _ => {} + other => { + warn!( + entity_type = other, + "Unknown entity_type in enqueue_resource_events, skipping stale job cleanup" + ); + } } let entities: Vec<(i64, i64)> = match entity_type { @@ -900,7 +901,12 @@ fn update_resource_event_watermark_tx( [entity_local_id], )?; } - _ => {} + other => { + warn!( + entity_type = other, + "Unknown entity_type in watermark update, skipping" + ); + } } Ok(()) } @@ -1138,34 +1144,46 @@ fn store_closes_issues_refs( mr_local_id: i64, closes_issues: &[crate::gitlab::types::GitLabIssueRef], ) -> Result<()> { - for issue_ref in closes_issues { - let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; + conn.execute_batch("SAVEPOINT store_closes_refs")?; + let inner = || -> Result<()> { + for issue_ref in closes_issues { + let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; - let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { - (Some(local_id), None, None) - } else { - let path = resolve_project_path(conn, issue_ref.project_id)?; - let fallback = - path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); - (None, Some(fallback), Some(issue_ref.iid)) - }; + let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { + (Some(local_id), None, None) + } else { + let path = resolve_project_path(conn, issue_ref.project_id)?; + let fallback = + path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); + (None, Some(fallback), Some(issue_ref.iid)) + }; - let ref_ = EntityReference { - project_id, - source_entity_type: "merge_request", - source_entity_id: mr_local_id, - target_entity_type: "issue", - target_entity_id: target_id, - target_project_path: target_path.as_deref(), - target_entity_iid: target_iid, - reference_type: "closes", - source_method: "api", - }; + let ref_ = EntityReference { + project_id, + source_entity_type: "merge_request", + source_entity_id: mr_local_id, + target_entity_type: "issue", + target_entity_id: target_id, + target_project_path: target_path.as_deref(), + target_entity_iid: target_iid, + reference_type: "closes", + source_method: "api", + }; - insert_entity_reference(conn, &ref_)?; + insert_entity_reference(conn, &ref_)?; + } + Ok(()) + }; + match inner() { + Ok(()) => { + conn.execute_batch("RELEASE store_closes_refs")?; + Ok(()) + } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs"); + Err(e) + } } - - Ok(()) } #[cfg(test)]