diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 113c5d7..c04b770 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -130,6 +130,12 @@ pub async fn ingest_project_issues_with_progress( progress: Option, signal: &ShutdownSignal, ) -> Result { + // Reclaim stale locks once at entry, not per-drain-function + let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; + if reclaimed > 0 { + debug!(reclaimed, "Reclaimed stale locks at issue sync start"); + } + let mut result = IngestProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { @@ -176,7 +182,7 @@ pub async fn ingest_project_issues_with_progress( None => { warn!("Cannot enrich statuses: project path not found for project_id={project_id}"); result.status_enrichment_error = Some("project_path_missing".into()); - result.status_enrichment_mode = "fetched".into(); + result.status_enrichment_mode = "error".into(); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, @@ -260,7 +266,7 @@ pub async fn ingest_project_issues_with_progress( Err(e) => { warn!("Status enrichment fetch failed: {e}"); result.status_enrichment_error = Some(e.to_string()); - result.status_enrichment_mode = "fetched".into(); + result.status_enrichment_mode = "fetch_error".into(); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, @@ -460,7 +466,8 @@ async fn sync_discussions_sequential( progress: &Option, signal: &ShutdownSignal, ) -> Result> { - let batch_size = config.sync.dependent_concurrency as usize; + // Guard against batch_size == 0 which would panic in .chunks() + let batch_size = (config.sync.dependent_concurrency as usize).max(1); let total = issues.len(); let mut results = Vec::with_capacity(issues.len()); @@ -531,6 +538,12 @@ pub async fn ingest_project_merge_requests_with_progress( progress: Option, signal: &ShutdownSignal, ) -> Result { + // Reclaim stale locks once at entry, not per-drain-function + let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; + if reclaimed > 0 { + debug!(reclaimed, "Reclaimed stale locks at MR sync start"); + } + let mut result = IngestMrProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { @@ -766,7 +779,8 @@ async fn sync_mr_discussions_sequential( progress: &Option, signal: &ShutdownSignal, ) -> Result> { - let batch_size = config.sync.dependent_concurrency as usize; + // Guard against batch_size == 0 which would panic in .chunks() + let batch_size = (config.sync.dependent_concurrency as usize).max(1); let total = mrs.len(); let mut results = Vec::with_capacity(mrs.len()); @@ -941,10 +955,7 @@ async fn drain_resource_events( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; - if reclaimed > 0 { - debug!(reclaimed, "Reclaimed stale resource event locks"); - } + // Note: stale locks are reclaimed once at sync entry point, not here let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts @@ -1263,10 +1274,7 @@ async fn drain_mr_closes_issues( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; - if reclaimed > 0 { - debug!(reclaimed, "Reclaimed stale mr_closes_issues locks"); - } + // Note: stale locks are reclaimed once at sync entry point, not here let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts @@ -1523,10 +1531,7 @@ async fn drain_mr_diffs( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; - if reclaimed > 0 { - debug!(reclaimed, "Reclaimed stale mr_diffs locks"); - } + // Note: stale locks are reclaimed once at sync entry point, not here let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts.get("mr_diffs").copied().unwrap_or(0);