From bb75a9d2288fca81100ffb328c9d707b82390041 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Tue, 3 Feb 2026 13:06:35 -0500 Subject: [PATCH] fix(events): Resource events now run on incremental syncs, fix output and progress bar Three bugs fixed: 1. Early return in orchestrator when no discussions needed sync also skipped resource event enqueue+drain. On incremental syncs (the most common case), resource events were never fetched. Restructured to use if/else instead of early return so Step 4 always executes. 2. Ingest command JSON and human-readable output silently dropped resource_events_fetched/failed counts. Added to IngestJsonData and print_ingest_summary. 3. Progress bar reuse after finish_and_clear caused indicatif to silently ignore subsequent set_position/set_length calls. Added reset() call before reconfiguring the bar for resource events. Also removed stale comment referencing "unsafe" that didn't reflect the actual unchecked_transaction approach. Co-Authored-By: Claude Opus 4.5 --- src/cli/commands/ingest.rs | 18 ++++- src/ingestion/orchestrator.rs | 122 ++++++++++++++++------------------ 2 files changed, 76 insertions(+), 64 deletions(-) diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index 02a0296..b03942a 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -253,8 +253,8 @@ pub async fn run_ingest( disc_bar_clone.finish_and_clear(); } ProgressEvent::ResourceEventsFetchStarted { total } => { + disc_bar_clone.reset(); disc_bar_clone.set_length(total as u64); - disc_bar_clone.set_position(0); disc_bar_clone.set_style( ProgressStyle::default_bar() .template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") @@ -495,6 +495,8 @@ struct IngestJsonData { labels_created: usize, discussions_fetched: usize, notes_upserted: usize, + resource_events_fetched: usize, + resource_events_failed: usize, } #[derive(Serialize)] @@ -553,6 +555,8 @@ pub fn print_ingest_summary_json(result: &IngestResult) { labels_created: result.labels_created, discussions_fetched: result.discussions_fetched, notes_upserted: result.notes_upserted, + resource_events_fetched: result.resource_events_fetched, + resource_events_failed: result.resource_events_failed, }, }; @@ -613,4 +617,16 @@ pub fn print_ingest_summary(result: &IngestResult) { ); } } + + if result.resource_events_fetched > 0 || result.resource_events_failed > 0 { + println!( + " Resource events: {} fetched{}", + result.resource_events_fetched, + if result.resource_events_failed > 0 { + format!(", {} failed", result.resource_events_failed) + } else { + String::new() + } + ); + } } diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 74186c7..859b729 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -144,40 +144,40 @@ pub async fn ingest_project_issues_with_progress( let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); + // Step 3: Sync discussions for issues that need it if issues_needing_sync.is_empty() { info!("No issues need discussion sync"); - return Ok(result); - } + } else { + info!( + count = issues_needing_sync.len(), + "Starting discussion sync for issues" + ); - info!( - count = issues_needing_sync.len(), - "Starting discussion sync for issues" - ); + emit(ProgressEvent::DiscussionSyncStarted { + total: issues_needing_sync.len(), + }); - emit(ProgressEvent::DiscussionSyncStarted { - total: issues_needing_sync.len(), - }); + // Execute sequential discussion sync (see function doc for why not concurrent) + let discussion_results = sync_discussions_sequential( + conn, + client, + config, + gitlab_project_id, + project_id, + &issues_needing_sync, + &progress, + ) + .await?; - // Step 3: Execute sequential discussion sync (see function doc for why not concurrent) - let discussion_results = sync_discussions_sequential( - conn, - client, - config, - gitlab_project_id, - project_id, - &issues_needing_sync, - &progress, - ) - .await?; + emit(ProgressEvent::DiscussionSyncComplete); - emit(ProgressEvent::DiscussionSyncComplete); - - // Aggregate discussion results - for disc_result in discussion_results { - result.discussions_fetched += disc_result.discussions_fetched; - result.discussions_upserted += disc_result.discussions_upserted; - result.notes_upserted += disc_result.notes_upserted; - result.issues_synced_discussions += 1; + // Aggregate discussion results + for disc_result in discussion_results { + result.discussions_fetched += disc_result.discussions_fetched; + result.discussions_upserted += disc_result.discussions_upserted; + result.notes_upserted += disc_result.notes_upserted; + result.issues_synced_discussions += 1; + } } // Step 4: Enqueue and drain resource events (if enabled) @@ -333,43 +333,43 @@ pub async fn ingest_project_merge_requests_with_progress( let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); + // Step 3: Sync discussions for MRs that need it if mrs_needing_sync.is_empty() { info!("No MRs need discussion sync"); - return Ok(result); - } + } else { + info!( + count = mrs_needing_sync.len(), + "Starting discussion sync for MRs" + ); - info!( - count = mrs_needing_sync.len(), - "Starting discussion sync for MRs" - ); + emit(ProgressEvent::MrDiscussionSyncStarted { + total: mrs_needing_sync.len(), + }); - emit(ProgressEvent::MrDiscussionSyncStarted { - total: mrs_needing_sync.len(), - }); + // Execute sequential MR discussion sync + let discussion_results = sync_mr_discussions_sequential( + conn, + client, + config, + gitlab_project_id, + project_id, + &mrs_needing_sync, + &progress, + ) + .await?; - // Step 3: Execute sequential MR discussion sync - let discussion_results = sync_mr_discussions_sequential( - conn, - client, - config, - gitlab_project_id, - project_id, - &mrs_needing_sync, - &progress, - ) - .await?; + emit(ProgressEvent::MrDiscussionSyncComplete); - emit(ProgressEvent::MrDiscussionSyncComplete); - - // Aggregate discussion results - for disc_result in discussion_results { - result.discussions_fetched += disc_result.discussions_fetched; - result.discussions_upserted += disc_result.discussions_upserted; - result.notes_upserted += disc_result.notes_upserted; - result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp; - result.diffnotes_count += disc_result.diffnotes_count; - if disc_result.pagination_succeeded { - result.mrs_synced_discussions += 1; + // Aggregate discussion results + for disc_result in discussion_results { + result.discussions_fetched += disc_result.discussions_fetched; + result.discussions_upserted += disc_result.discussions_upserted; + result.notes_upserted += disc_result.notes_upserted; + result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp; + result.diffnotes_count += disc_result.diffnotes_count; + if disc_result.pagination_succeeded { + result.mrs_synced_discussions += 1; + } } } @@ -563,10 +563,6 @@ async fn drain_resource_events( for job in &jobs { iterations += 1; - // conn is &Connection but upsert functions need &mut Connection. - // We need to use unsafe to get a mutable reference since rusqlite - // operations are internally safe with WAL mode and we're single-threaded. - // Instead, we'll use a savepoint approach via the Connection directly. match client .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) .await