refactor(orchestrator): consolidate stale lock reclamation and fix edge cases

Several improvements to the ingestion orchestrator:

1. Stale lock reclamation consolidation:
   Previously, reclaim_stale_locks() was called redundantly in multiple
   drain functions (drain_resource_events, drain_mr_closes_issues, drain_mr_diffs, etc.).
   Now it's called once at sync entry points (ingest_project_issues,
   ingest_project_mrs) to reduce overhead and DB contention.

2. Fix status_enrichment_mode error values:
   - "fetched" -> "error" when project path is missing
   - "fetched" -> "fetch_error" when GraphQL fetch fails
   These values are used in robot mode JSON output and should accurately
   reflect the error condition.

3. Add batch_size zero guard:
   Added .max(1) to batch_size calculation to prevent panic in .chunks()
   when config.sync.dependent_concurrency is 0. This makes the code
   defensive against misconfiguration.

These changes improve correctness and reduce unnecessary DB operations
during sync, particularly beneficial for large projects with many entities.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
teernisse
2026-02-26 11:06:32 -05:00
parent 23efb15599
commit 2d2e470621

View File

@@ -130,6 +130,12 @@ pub async fn ingest_project_issues_with_progress(
progress: Option<ProgressCallback>,
signal: &ShutdownSignal,
) -> Result<IngestProjectResult> {
// Reclaim stale locks once at entry, not per-drain-function
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
debug!(reclaimed, "Reclaimed stale locks at issue sync start");
}
let mut result = IngestProjectResult::default();
let emit = |event: ProgressEvent| {
if let Some(ref cb) = progress {
@@ -176,7 +182,7 @@ pub async fn ingest_project_issues_with_progress(
None => {
warn!("Cannot enrich statuses: project path not found for project_id={project_id}");
result.status_enrichment_error = Some("project_path_missing".into());
result.status_enrichment_mode = "fetched".into();
result.status_enrichment_mode = "error".into();
emit(ProgressEvent::StatusEnrichmentComplete {
enriched: 0,
cleared: 0,
@@ -260,7 +266,7 @@ pub async fn ingest_project_issues_with_progress(
Err(e) => {
warn!("Status enrichment fetch failed: {e}");
result.status_enrichment_error = Some(e.to_string());
result.status_enrichment_mode = "fetched".into();
result.status_enrichment_mode = "fetch_error".into();
emit(ProgressEvent::StatusEnrichmentComplete {
enriched: 0,
cleared: 0,
@@ -460,7 +466,8 @@ async fn sync_discussions_sequential(
progress: &Option<ProgressCallback>,
signal: &ShutdownSignal,
) -> Result<Vec<super::discussions::IngestDiscussionsResult>> {
let batch_size = config.sync.dependent_concurrency as usize;
// Guard against batch_size == 0 which would panic in .chunks()
let batch_size = (config.sync.dependent_concurrency as usize).max(1);
let total = issues.len();
let mut results = Vec::with_capacity(issues.len());
@@ -531,6 +538,12 @@ pub async fn ingest_project_merge_requests_with_progress(
progress: Option<ProgressCallback>,
signal: &ShutdownSignal,
) -> Result<IngestMrProjectResult> {
// Reclaim stale locks once at entry, not per-drain-function
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
debug!(reclaimed, "Reclaimed stale locks at MR sync start");
}
let mut result = IngestMrProjectResult::default();
let emit = |event: ProgressEvent| {
if let Some(ref cb) = progress {
@@ -766,7 +779,8 @@ async fn sync_mr_discussions_sequential(
progress: &Option<ProgressCallback>,
signal: &ShutdownSignal,
) -> Result<Vec<super::mr_discussions::IngestMrDiscussionsResult>> {
let batch_size = config.sync.dependent_concurrency as usize;
// Guard against batch_size == 0 which would panic in .chunks()
let batch_size = (config.sync.dependent_concurrency as usize).max(1);
let total = mrs.len();
let mut results = Vec::with_capacity(mrs.len());
@@ -941,10 +955,7 @@ async fn drain_resource_events(
let mut result = DrainResult::default();
let batch_size = config.sync.dependent_concurrency as usize;
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
debug!(reclaimed, "Reclaimed stale resource event locks");
}
// Note: stale locks are reclaimed once at sync entry point, not here
let claimable_counts = count_claimable_jobs(conn, project_id)?;
let total_pending = claimable_counts
@@ -1263,10 +1274,7 @@ async fn drain_mr_closes_issues(
let mut result = DrainResult::default();
let batch_size = config.sync.dependent_concurrency as usize;
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
debug!(reclaimed, "Reclaimed stale mr_closes_issues locks");
}
// Note: stale locks are reclaimed once at sync entry point, not here
let claimable_counts = count_claimable_jobs(conn, project_id)?;
let total_pending = claimable_counts
@@ -1523,10 +1531,7 @@ async fn drain_mr_diffs(
let mut result = DrainResult::default();
let batch_size = config.sync.dependent_concurrency as usize;
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
debug!(reclaimed, "Reclaimed stale mr_diffs locks");
}
// Note: stale locks are reclaimed once at sync entry point, not here
let claimable_counts = count_claimable_jobs(conn, project_id)?;
let total_pending = claimable_counts.get("mr_diffs").copied().unwrap_or(0);