feat(sync): Wire progress callbacks through sync pipeline stages

The sync command's stage spinners now show real-time aggregate progress
for each pipeline phase instead of static "syncing..." messages.

- Add `progress_callback` parameter to `run_embed` and
  `run_generate_docs` so callers can receive `(processed, total)` updates
- Add `stage_bar` parameter to `run_ingest` for aggregate progress
  across concurrently-ingested projects using shared AtomicUsize counters
- Update `stage_spinner` to use `{prefix}` for the `[N/M]` label,
  allowing `{msg}` to be updated independently with progress details
- Thread `ProgressBar` clones into each concurrent project task so
  per-entity progress (fetch, discussions, events) is reflected on the
  aggregate spinner
- Pass `None` for progress callbacks at standalone CLI entry points
  (handle_ingest, handle_generate_docs, handle_embed) to preserve
  existing behavior when commands are run outside of sync

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-04 14:16:21 -05:00
parent a65ea2f56f
commit 266ed78e73
5 changed files with 122 additions and 12 deletions

View File

@@ -1,5 +1,8 @@
//! Ingest command - fetch data from GitLab.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use console::style;
use indicatif::{ProgressBar, ProgressStyle};
use rusqlite::Connection;
@@ -106,6 +109,9 @@ impl IngestDisplay {
}
/// Run the ingest command.
///
/// `stage_bar` is an optional `ProgressBar` (typically from sync's stage spinner)
/// that will be updated with aggregate progress across all projects.
pub async fn run_ingest(
config: &Config,
resource_type: &str,
@@ -113,14 +119,23 @@ pub async fn run_ingest(
force: bool,
full: bool,
display: IngestDisplay,
stage_bar: Option<ProgressBar>,
) -> Result<IngestResult> {
let run_id = uuid::Uuid::new_v4().simple().to_string();
let run_id = &run_id[..8];
let span = tracing::info_span!("ingest", %run_id, %resource_type);
run_ingest_inner(config, resource_type, project_filter, force, full, display)
.instrument(span)
.await
run_ingest_inner(
config,
resource_type,
project_filter,
force,
full,
display,
stage_bar,
)
.instrument(span)
.await
}
/// Inner implementation of run_ingest, instrumented with a root span.
@@ -131,6 +146,7 @@ async fn run_ingest_inner(
force: bool,
full: bool,
display: IngestDisplay,
stage_bar: Option<ProgressBar>,
) -> Result<IngestResult> {
// Validate resource type early
if resource_type != "issues" && resource_type != "mrs" {
@@ -237,6 +253,14 @@ async fn run_ingest_inner(
let concurrency = config.sync.primary_concurrency as usize;
let resource_type_owned = resource_type.to_string();
// Aggregate counters for stage_bar updates (shared across concurrent projects)
let agg_fetched = Arc::new(AtomicUsize::new(0));
let agg_discussions = Arc::new(AtomicUsize::new(0));
let agg_disc_total = Arc::new(AtomicUsize::new(0));
let agg_events = Arc::new(AtomicUsize::new(0));
let agg_events_total = Arc::new(AtomicUsize::new(0));
let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden);
use futures::stream::{self, StreamExt};
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
@@ -248,6 +272,12 @@ async fn run_ingest_inner(
let path = path.clone();
let local_project_id = *local_project_id;
let gitlab_project_id = *gitlab_project_id;
let stage_bar = stage_bar.clone();
let agg_fetched = Arc::clone(&agg_fetched);
let agg_discussions = Arc::clone(&agg_discussions);
let agg_disc_total = Arc::clone(&agg_disc_total);
let agg_events = Arc::clone(&agg_events);
let agg_events_total = Arc::clone(&agg_events_total);
async move {
let proj_conn = create_connection(&db_path)?;
@@ -286,28 +316,70 @@ async fn run_ingest_inner(
let spinner_clone = spinner.clone();
let disc_bar_clone = disc_bar.clone();
let stage_bar_clone = stage_bar.clone();
let agg_fetched_clone = Arc::clone(&agg_fetched);
let agg_discussions_clone = Arc::clone(&agg_discussions);
let agg_disc_total_clone = Arc::clone(&agg_disc_total);
let agg_events_clone = Arc::clone(&agg_events);
let agg_events_total_clone = Arc::clone(&agg_events_total);
let path_for_cb = path.clone();
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
Box::new(|_| {})
} else {
Box::new(move |event: ProgressEvent| match event {
ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => {
// Spinner already showing fetch message
}
ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => {
let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total;
spinner_clone.set_message(format!(
"{path_for_cb}: {total} {type_label} fetched"
));
stage_bar_clone.set_message(format!(
"Fetching {type_label}... ({agg} fetched across projects)"
));
}
ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => {
spinner_clone.set_message(format!(
"{path_for_cb}: {count} fetched so far..."
));
}
ProgressEvent::DiscussionSyncStarted { total } => {
spinner_clone.finish_and_clear();
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
disc_bar_clone.set_length(total as u64);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
stage_bar_clone.set_message(format!(
"Syncing discussions... (0/{agg_total})"
));
}
ProgressEvent::DiscussionSynced { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
stage_bar_clone.set_message(format!(
"Syncing discussions... ({agg}/{agg_total})"
));
}
ProgressEvent::DiscussionSyncComplete => {
disc_bar_clone.finish_and_clear();
}
ProgressEvent::MrDiscussionSyncStarted { total } => {
spinner_clone.finish_and_clear();
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
disc_bar_clone.set_length(total as u64);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
stage_bar_clone.set_message(format!(
"Syncing discussions... (0/{agg_total})"
));
}
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
stage_bar_clone.set_message(format!(
"Syncing discussions... ({agg}/{agg_total})"
));
}
ProgressEvent::MrDiscussionSyncComplete => {
disc_bar_clone.finish_and_clear();
@@ -322,14 +394,22 @@ async fn run_ingest_inner(
.progress_chars("=> "),
);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
agg_events_total_clone.fetch_add(total, Ordering::Relaxed);
stage_bar_clone.set_message(
"Fetching resource events...".to_string()
);
}
ProgressEvent::ResourceEventFetched { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1;
let agg_total = agg_events_total_clone.load(Ordering::Relaxed);
stage_bar_clone.set_message(format!(
"Fetching resource events... ({agg}/{agg_total})"
));
}
ProgressEvent::ResourceEventsFetchComplete { .. } => {
disc_bar_clone.finish_and_clear();
}
_ => {}
})
};