From 2d2e470621907a0a356361315f83878b32ac0926 Mon Sep 17 00:00:00 2001 From: teernisse Date: Thu, 26 Feb 2026 11:06:32 -0500 Subject: [PATCH] refactor(orchestrator): consolidate stale lock reclamation and fix edge cases Several improvements to the ingestion orchestrator: 1. Stale lock reclamation consolidation: Previously, reclaim_stale_locks() was called redundantly in multiple drain functions (drain_resource_events, drain_closes_issues, etc.). Now it's called once at sync entry points (ingest_project_issues, ingest_project_mrs) to reduce overhead and DB contention. 2. Fix status_enrichment_mode error values: - "fetched" -> "error" when project path is missing - "fetched" -> "fetch_error" when GraphQL fetch fails These values are used in robot mode JSON output and should accurately reflect the error condition. 3. Add batch_size zero guard: Added .max(1) to batch_size calculation to prevent panic in .chunks() when config.sync.dependent_concurrency is 0. This makes the code defensive against misconfiguration. These changes improve correctness and reduce unnecessary DB operations during sync, particularly beneficial for large projects with many entities. Co-Authored-By: Claude Opus 4.5 --- src/ingestion/orchestrator.rs | 37 ++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 113c5d7..c04b770 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -130,6 +130,12 @@ pub async fn ingest_project_issues_with_progress( progress: Option, signal: &ShutdownSignal, ) -> Result { + // Reclaim stale locks once at entry, not per-drain-function + let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; + if reclaimed > 0 { + debug!(reclaimed, "Reclaimed stale locks at issue sync start"); + } + let mut result = IngestProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { @@ -176,7 +182,7 @@ pub async fn ingest_project_issues_with_progress( None => { warn!("Cannot enrich statuses: project path not found for project_id={project_id}"); result.status_enrichment_error = Some("project_path_missing".into()); - result.status_enrichment_mode = "fetched".into(); + result.status_enrichment_mode = "error".into(); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, @@ -260,7 +266,7 @@ pub async fn ingest_project_issues_with_progress( Err(e) => { warn!("Status enrichment fetch failed: {e}"); result.status_enrichment_error = Some(e.to_string()); - result.status_enrichment_mode = "fetched".into(); + result.status_enrichment_mode = "fetch_error".into(); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, @@ -460,7 +466,8 @@ async fn sync_discussions_sequential( progress: &Option, signal: &ShutdownSignal, ) -> Result> { - let batch_size = config.sync.dependent_concurrency as usize; + // Guard against batch_size == 0 which would panic in .chunks() + let batch_size = (config.sync.dependent_concurrency as usize).max(1); let total = issues.len(); let mut results = Vec::with_capacity(issues.len()); @@ -531,6 +538,12 @@ pub async fn ingest_project_merge_requests_with_progress( progress: Option, signal: &ShutdownSignal, ) -> Result { + // Reclaim stale locks once at entry, not per-drain-function + let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; + if reclaimed > 0 { + debug!(reclaimed, "Reclaimed stale locks at MR sync start"); + } + let mut result = IngestMrProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { @@ -766,7 +779,8 @@ async fn sync_mr_discussions_sequential( progress: &Option, signal: &ShutdownSignal, ) -> Result> { - let batch_size = config.sync.dependent_concurrency as usize; + // Guard against batch_size == 0 which would panic in .chunks() + let batch_size = (config.sync.dependent_concurrency as usize).max(1); let total = mrs.len(); let mut results = Vec::with_capacity(mrs.len()); @@ -941,10 +955,7 @@ async fn drain_resource_events( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; - if reclaimed > 0 { - debug!(reclaimed, "Reclaimed stale resource event locks"); - } + // Note: stale locks are reclaimed once at sync entry point, not here let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts @@ -1263,10 +1274,7 @@ async fn drain_mr_closes_issues( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; - if reclaimed > 0 { - debug!(reclaimed, "Reclaimed stale mr_closes_issues locks"); - } + // Note: stale locks are reclaimed once at sync entry point, not here let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts @@ -1523,10 +1531,7 @@ async fn drain_mr_diffs( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; - if reclaimed > 0 { - debug!(reclaimed, "Reclaimed stale mr_diffs locks"); - } + // Note: stale locks are reclaimed once at sync entry point, not here let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts.get("mr_diffs").copied().unwrap_or(0);