fix(events): Resource events now run on incremental syncs, fix output and progress bar

Three bugs fixed:

1. The early return in the orchestrator when no discussions needed
   syncing also skipped the resource event enqueue+drain. On incremental
   syncs (the most common case), resource events were therefore never
   fetched. Restructured to use if/else instead of an early return so
   Step 4 always executes.

2. The ingest command's JSON and human-readable output silently dropped
   the resource_events_fetched/failed counts. Added them to
   IngestJsonData and print_ingest_summary.

3. Reusing the progress bar after finish_and_clear caused indicatif to
   silently ignore subsequent set_position/set_length calls. Added a
   reset() call before reconfiguring the bar for resource events.

Also removed a stale comment referencing "unsafe" that no longer
reflected the actual unchecked_transaction approach.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-03 13:06:35 -05:00
parent 2bcd8db0e9
commit bb75a9d228
2 changed files with 76 additions and 64 deletions

View File

@@ -253,8 +253,8 @@ pub async fn run_ingest(
disc_bar_clone.finish_and_clear(); disc_bar_clone.finish_and_clear();
} }
ProgressEvent::ResourceEventsFetchStarted { total } => { ProgressEvent::ResourceEventsFetchStarted { total } => {
disc_bar_clone.reset();
disc_bar_clone.set_length(total as u64); disc_bar_clone.set_length(total as u64);
disc_bar_clone.set_position(0);
disc_bar_clone.set_style( disc_bar_clone.set_style(
ProgressStyle::default_bar() ProgressStyle::default_bar()
.template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") .template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}")
@@ -495,6 +495,8 @@ struct IngestJsonData {
labels_created: usize, labels_created: usize,
discussions_fetched: usize, discussions_fetched: usize,
notes_upserted: usize, notes_upserted: usize,
resource_events_fetched: usize,
resource_events_failed: usize,
} }
#[derive(Serialize)] #[derive(Serialize)]
@@ -553,6 +555,8 @@ pub fn print_ingest_summary_json(result: &IngestResult) {
labels_created: result.labels_created, labels_created: result.labels_created,
discussions_fetched: result.discussions_fetched, discussions_fetched: result.discussions_fetched,
notes_upserted: result.notes_upserted, notes_upserted: result.notes_upserted,
resource_events_fetched: result.resource_events_fetched,
resource_events_failed: result.resource_events_failed,
}, },
}; };
@@ -613,4 +617,16 @@ pub fn print_ingest_summary(result: &IngestResult) {
); );
} }
} }
if result.resource_events_fetched > 0 || result.resource_events_failed > 0 {
println!(
" Resource events: {} fetched{}",
result.resource_events_fetched,
if result.resource_events_failed > 0 {
format!(", {} failed", result.resource_events_failed)
} else {
String::new()
}
);
}
} }

View File

@@ -144,40 +144,40 @@ pub async fn ingest_project_issues_with_progress(
let total_issues = total_issues as usize; let total_issues = total_issues as usize;
result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
// Step 3: Sync discussions for issues that need it
if issues_needing_sync.is_empty() { if issues_needing_sync.is_empty() {
info!("No issues need discussion sync"); info!("No issues need discussion sync");
return Ok(result); } else {
} info!(
count = issues_needing_sync.len(),
"Starting discussion sync for issues"
);
info!( emit(ProgressEvent::DiscussionSyncStarted {
count = issues_needing_sync.len(), total: issues_needing_sync.len(),
"Starting discussion sync for issues" });
);
emit(ProgressEvent::DiscussionSyncStarted { // Execute sequential discussion sync (see function doc for why not concurrent)
total: issues_needing_sync.len(), let discussion_results = sync_discussions_sequential(
}); conn,
client,
config,
gitlab_project_id,
project_id,
&issues_needing_sync,
&progress,
)
.await?;
// Step 3: Execute sequential discussion sync (see function doc for why not concurrent) emit(ProgressEvent::DiscussionSyncComplete);
let discussion_results = sync_discussions_sequential(
conn,
client,
config,
gitlab_project_id,
project_id,
&issues_needing_sync,
&progress,
)
.await?;
emit(ProgressEvent::DiscussionSyncComplete); // Aggregate discussion results
for disc_result in discussion_results {
// Aggregate discussion results result.discussions_fetched += disc_result.discussions_fetched;
for disc_result in discussion_results { result.discussions_upserted += disc_result.discussions_upserted;
result.discussions_fetched += disc_result.discussions_fetched; result.notes_upserted += disc_result.notes_upserted;
result.discussions_upserted += disc_result.discussions_upserted; result.issues_synced_discussions += 1;
result.notes_upserted += disc_result.notes_upserted; }
result.issues_synced_discussions += 1;
} }
// Step 4: Enqueue and drain resource events (if enabled) // Step 4: Enqueue and drain resource events (if enabled)
@@ -333,43 +333,43 @@ pub async fn ingest_project_merge_requests_with_progress(
let total_mrs = total_mrs as usize; let total_mrs = total_mrs as usize;
result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
// Step 3: Sync discussions for MRs that need it
if mrs_needing_sync.is_empty() { if mrs_needing_sync.is_empty() {
info!("No MRs need discussion sync"); info!("No MRs need discussion sync");
return Ok(result); } else {
} info!(
count = mrs_needing_sync.len(),
"Starting discussion sync for MRs"
);
info!( emit(ProgressEvent::MrDiscussionSyncStarted {
count = mrs_needing_sync.len(), total: mrs_needing_sync.len(),
"Starting discussion sync for MRs" });
);
emit(ProgressEvent::MrDiscussionSyncStarted { // Execute sequential MR discussion sync
total: mrs_needing_sync.len(), let discussion_results = sync_mr_discussions_sequential(
}); conn,
client,
config,
gitlab_project_id,
project_id,
&mrs_needing_sync,
&progress,
)
.await?;
// Step 3: Execute sequential MR discussion sync emit(ProgressEvent::MrDiscussionSyncComplete);
let discussion_results = sync_mr_discussions_sequential(
conn,
client,
config,
gitlab_project_id,
project_id,
&mrs_needing_sync,
&progress,
)
.await?;
emit(ProgressEvent::MrDiscussionSyncComplete); // Aggregate discussion results
for disc_result in discussion_results {
// Aggregate discussion results result.discussions_fetched += disc_result.discussions_fetched;
for disc_result in discussion_results { result.discussions_upserted += disc_result.discussions_upserted;
result.discussions_fetched += disc_result.discussions_fetched; result.notes_upserted += disc_result.notes_upserted;
result.discussions_upserted += disc_result.discussions_upserted; result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp;
result.notes_upserted += disc_result.notes_upserted; result.diffnotes_count += disc_result.diffnotes_count;
result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp; if disc_result.pagination_succeeded {
result.diffnotes_count += disc_result.diffnotes_count; result.mrs_synced_discussions += 1;
if disc_result.pagination_succeeded { }
result.mrs_synced_discussions += 1;
} }
} }
@@ -563,10 +563,6 @@ async fn drain_resource_events(
for job in &jobs { for job in &jobs {
iterations += 1; iterations += 1;
// conn is &Connection but upsert functions need &mut Connection.
// We need to use unsafe to get a mutable reference since rusqlite
// operations are internally safe with WAL mode and we're single-threaded.
// Instead, we'll use a savepoint approach via the Connection directly.
match client match client
.fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid)
.await .await