diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 33a0b9e..58c7a6a 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -131,13 +131,11 @@ pub async fn ingest_project_issues_with_progress( let issues_needing_sync = issue_result.issues_needing_discussion_sync; - let total_issues: i64 = conn - .query_row( - "SELECT COUNT(*) FROM issues WHERE project_id = ?", - [project_id], - |row| row.get(0), - ) - .unwrap_or(0); + let total_issues: i64 = conn.query_row( + "SELECT COUNT(*) FROM issues WHERE project_id = ?", + [project_id], + |row| row.get(0), + )?; let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); @@ -346,13 +344,11 @@ pub async fn ingest_project_merge_requests_with_progress( let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?; - let total_mrs: i64 = conn - .query_row( - "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", - [project_id], - |row| row.get(0), - ) - .unwrap_or(0); + let total_mrs: i64 = conn.query_row( + "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", + [project_id], + |row| row.get(0), + )?; let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); @@ -573,7 +569,12 @@ fn enqueue_resource_events_for_entity_type( [project_id], )?; } - _ => {} + other => { + warn!( + entity_type = other, + "Unknown entity_type in enqueue_resource_events, skipping stale job cleanup" + ); + } } let entities: Vec<(i64, i64)> = match entity_type { @@ -900,7 +901,12 @@ fn update_resource_event_watermark_tx( [entity_local_id], )?; } - _ => {} + other => { + warn!( + entity_type = other, + "Unknown entity_type in watermark update, skipping" + ); + } } Ok(()) } @@ -1138,34 +1144,46 @@ fn store_closes_issues_refs( mr_local_id: i64, closes_issues: &[crate::gitlab::types::GitLabIssueRef], ) -> Result<()> { - for issue_ref in closes_issues { - let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; + conn.execute_batch("SAVEPOINT store_closes_refs")?; + let inner = || -> Result<()> { + for issue_ref in closes_issues { + let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; - let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { - (Some(local_id), None, None) - } else { - let path = resolve_project_path(conn, issue_ref.project_id)?; - let fallback = - path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); - (None, Some(fallback), Some(issue_ref.iid)) - }; + let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { + (Some(local_id), None, None) + } else { + let path = resolve_project_path(conn, issue_ref.project_id)?; + let fallback = + path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); + (None, Some(fallback), Some(issue_ref.iid)) + }; - let ref_ = EntityReference { - project_id, - source_entity_type: "merge_request", - source_entity_id: mr_local_id, - target_entity_type: "issue", - target_entity_id: target_id, - target_project_path: target_path.as_deref(), - target_entity_iid: target_iid, - reference_type: "closes", - source_method: "api", - }; + let ref_ = EntityReference { + project_id, + source_entity_type: "merge_request", + source_entity_id: mr_local_id, + target_entity_type: "issue", + target_entity_id: target_id, + target_project_path: target_path.as_deref(), + target_entity_iid: target_iid, + reference_type: "closes", + source_method: "api", + }; - insert_entity_reference(conn, &ref_)?; + insert_entity_reference(conn, &ref_)?; + } + Ok(()) + }; + match inner() { + Ok(()) => { + conn.execute_batch("RELEASE store_closes_refs")?; + Ok(()) + } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs"); + Err(e) + } } - - Ok(()) } #[cfg(test)]