diff --git a/src/cli/commands/embed.rs b/src/cli/commands/embed.rs index 330c70d..e46f88f 100644 --- a/src/cli/commands/embed.rs +++ b/src/cli/commands/embed.rs @@ -19,10 +19,13 @@ pub struct EmbedCommandResult { } /// Run the embed command. +/// +/// `progress_callback` reports `(processed, total)` as documents are embedded. pub async fn run_embed( config: &Config, full: bool, retry_failed: bool, + progress_callback: Option>, ) -> Result { let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; @@ -58,7 +61,7 @@ pub async fn run_embed( } let model_name = &config.embedding.model; - let result = embed_documents(&conn, &client, model_name, None).await?; + let result = embed_documents(&conn, &client, model_name, progress_callback).await?; Ok(EmbedCommandResult { embedded: result.embedded, diff --git a/src/cli/commands/generate_docs.rs b/src/cli/commands/generate_docs.rs index 166dc27..64476f6 100644 --- a/src/cli/commands/generate_docs.rs +++ b/src/cli/commands/generate_docs.rs @@ -28,10 +28,13 @@ pub struct GenerateDocsResult { /// /// Default mode: process only existing dirty_sources entries. /// Full mode: seed dirty_sources with ALL entities, then drain. +/// +/// `progress_callback` reports `(processed, estimated_total)` as documents are generated. pub fn run_generate_docs( config: &Config, full: bool, project_filter: Option<&str>, + progress_callback: Option>, ) -> Result { let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; @@ -46,7 +49,8 @@ pub fn run_generate_docs( result.seeded += seed_dirty(&conn, SourceType::Discussion, project_filter)?; } - let regen = regenerate_dirty_documents(&conn)?; + let regen = + regenerate_dirty_documents(&conn, progress_callback.as_ref().map(|cb| cb.as_ref()))?; result.regenerated = regen.regenerated; result.unchanged = regen.unchanged; result.errored = regen.errored; diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index 9bae9d1..4108e33 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -1,5 +1,8 @@ //! Ingest command - fetch data from GitLab. +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + use console::style; use indicatif::{ProgressBar, ProgressStyle}; use rusqlite::Connection; @@ -106,6 +109,9 @@ impl IngestDisplay { } /// Run the ingest command. +/// +/// `stage_bar` is an optional `ProgressBar` (typically from sync's stage spinner) +/// that will be updated with aggregate progress across all projects. pub async fn run_ingest( config: &Config, resource_type: &str, @@ -113,14 +119,23 @@ pub async fn run_ingest( force: bool, full: bool, display: IngestDisplay, + stage_bar: Option, ) -> Result { let run_id = uuid::Uuid::new_v4().simple().to_string(); let run_id = &run_id[..8]; let span = tracing::info_span!("ingest", %run_id, %resource_type); - run_ingest_inner(config, resource_type, project_filter, force, full, display) - .instrument(span) - .await + run_ingest_inner( + config, + resource_type, + project_filter, + force, + full, + display, + stage_bar, + ) + .instrument(span) + .await } /// Inner implementation of run_ingest, instrumented with a root span. @@ -131,6 +146,7 @@ async fn run_ingest_inner( force: bool, full: bool, display: IngestDisplay, + stage_bar: Option, ) -> Result { // Validate resource type early if resource_type != "issues" && resource_type != "mrs" { @@ -237,6 +253,14 @@ async fn run_ingest_inner( let concurrency = config.sync.primary_concurrency as usize; let resource_type_owned = resource_type.to_string(); + // Aggregate counters for stage_bar updates (shared across concurrent projects) + let agg_fetched = Arc::new(AtomicUsize::new(0)); + let agg_discussions = Arc::new(AtomicUsize::new(0)); + let agg_disc_total = Arc::new(AtomicUsize::new(0)); + let agg_events = Arc::new(AtomicUsize::new(0)); + let agg_events_total = Arc::new(AtomicUsize::new(0)); + let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden); + use futures::stream::{self, StreamExt}; let project_results: Vec> = stream::iter(projects.iter()) @@ -248,6 +272,12 @@ async fn run_ingest_inner( let path = path.clone(); let local_project_id = *local_project_id; let gitlab_project_id = *gitlab_project_id; + let stage_bar = stage_bar.clone(); + let agg_fetched = Arc::clone(&agg_fetched); + let agg_discussions = Arc::clone(&agg_discussions); + let agg_disc_total = Arc::clone(&agg_disc_total); + let agg_events = Arc::clone(&agg_events); + let agg_events_total = Arc::clone(&agg_events_total); async move { let proj_conn = create_connection(&db_path)?; @@ -286,28 +316,70 @@ async fn run_ingest_inner( let spinner_clone = spinner.clone(); let disc_bar_clone = disc_bar.clone(); + let stage_bar_clone = stage_bar.clone(); + let agg_fetched_clone = Arc::clone(&agg_fetched); + let agg_discussions_clone = Arc::clone(&agg_discussions); + let agg_disc_total_clone = Arc::clone(&agg_disc_total); + let agg_events_clone = Arc::clone(&agg_events); + let agg_events_total_clone = Arc::clone(&agg_events_total); + let path_for_cb = path.clone(); let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress { Box::new(|_| {}) } else { Box::new(move |event: ProgressEvent| match event { + ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => { + // Spinner already showing fetch message + } + ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => { + let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total; + spinner_clone.set_message(format!( + "{path_for_cb}: {total} {type_label} fetched" + )); + stage_bar_clone.set_message(format!( + "Fetching {type_label}... ({agg} fetched across projects)" + )); + } + ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => { + spinner_clone.set_message(format!( + "{path_for_cb}: {count} fetched so far..." + )); + } ProgressEvent::DiscussionSyncStarted { total } => { spinner_clone.finish_and_clear(); + let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total; disc_bar_clone.set_length(total as u64); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + stage_bar_clone.set_message(format!( + "Syncing discussions... (0/{agg_total})" + )); } ProgressEvent::DiscussionSynced { current, total: _ } => { disc_bar_clone.set_position(current as u64); + let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1; + let agg_total = agg_disc_total_clone.load(Ordering::Relaxed); + stage_bar_clone.set_message(format!( + "Syncing discussions... ({agg}/{agg_total})" + )); } ProgressEvent::DiscussionSyncComplete => { disc_bar_clone.finish_and_clear(); } ProgressEvent::MrDiscussionSyncStarted { total } => { spinner_clone.finish_and_clear(); + let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total; disc_bar_clone.set_length(total as u64); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + stage_bar_clone.set_message(format!( + "Syncing discussions... (0/{agg_total})" + )); } ProgressEvent::MrDiscussionSynced { current, total: _ } => { disc_bar_clone.set_position(current as u64); + let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1; + let agg_total = agg_disc_total_clone.load(Ordering::Relaxed); + stage_bar_clone.set_message(format!( + "Syncing discussions... ({agg}/{agg_total})" + )); } ProgressEvent::MrDiscussionSyncComplete => { disc_bar_clone.finish_and_clear(); @@ -322,14 +394,22 @@ async fn run_ingest_inner( .progress_chars("=> "), ); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + agg_events_total_clone.fetch_add(total, Ordering::Relaxed); + stage_bar_clone.set_message( + "Fetching resource events...".to_string() + ); } ProgressEvent::ResourceEventFetched { current, total: _ } => { disc_bar_clone.set_position(current as u64); + let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1; + let agg_total = agg_events_total_clone.load(Ordering::Relaxed); + stage_bar_clone.set_message(format!( + "Fetching resource events... ({agg}/{agg_total})" + )); } ProgressEvent::ResourceEventsFetchComplete { .. } => { disc_bar_clone.finish_and_clear(); } - _ => {} }) }; diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index 7f9d788..4ce59d7 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -40,6 +40,9 @@ pub struct SyncResult { } /// Create a styled spinner for a sync stage. +/// +/// Uses `{prefix}` for the `[N/M]` stage label so callers can update `{msg}` +/// independently without losing the stage context. fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar { if robot_mode { return ProgressBar::hidden(); @@ -47,11 +50,12 @@ fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressB let pb = crate::cli::progress::multi().add(ProgressBar::new_spinner()); pb.set_style( ProgressStyle::default_spinner() - .template("{spinner:.blue} {msg}") + .template("{spinner:.blue} {prefix} {msg}") .expect("valid template"), ); pb.enable_steady_tick(std::time::Duration::from_millis(80)); - pb.set_message(format!("[{stage}/{total}] {msg}")); + pb.set_prefix(format!("[{stage}/{total}]")); + pb.set_message(msg.to_string()); pb } @@ -112,6 +116,7 @@ pub async fn run_sync( options.force, options.full, ingest_display, + Some(spinner.clone()), ) .await?; result.issues_updated = issues_result.issues_upserted; @@ -136,6 +141,7 @@ pub async fn run_sync( options.force, options.full, ingest_display, + Some(spinner.clone()), ) .await?; result.mrs_updated = mrs_result.mrs_upserted; @@ -154,7 +160,15 @@ pub async fn run_sync( options.robot_mode, ); info!("Sync stage {current_stage}/{total_stages}: generating documents"); - let docs_result = run_generate_docs(config, false, None)?; + let docs_spinner = spinner.clone(); + let docs_cb: Box = Box::new(move |processed, total| { + if total > 0 { + docs_spinner.set_message(format!( + "Processing documents... ({processed}/{total})" + )); + } + }); + let docs_result = run_generate_docs(config, false, None, Some(docs_cb))?; result.documents_regenerated = docs_result.regenerated; spinner.finish_and_clear(); } else { @@ -171,7 +185,13 @@ pub async fn run_sync( options.robot_mode, ); info!("Sync stage {current_stage}/{total_stages}: embedding documents"); - match run_embed(config, options.full, false).await { + let embed_spinner = spinner.clone(); + let embed_cb: Box = Box::new(move |processed, total| { + embed_spinner.set_message(format!( + "Embedding documents... ({processed}/{total})" + )); + }); + match run_embed(config, options.full, false, Some(embed_cb)).await { Ok(embed_result) => { result.documents_embedded = embed_result.embedded; spinner.finish_and_clear(); diff --git a/src/main.rs b/src/main.rs index b065f81..8319258 100644 --- a/src/main.rs +++ b/src/main.rs @@ -501,6 +501,7 @@ async fn handle_ingest( force, full, display, + None, ) .await?; @@ -527,6 +528,7 @@ async fn handle_ingest( force, full, display, + None, ) .await?; @@ -537,6 +539,7 @@ async fn handle_ingest( force, full, display, + None, ) .await?; @@ -1225,7 +1228,7 @@ async fn handle_generate_docs( ) -> Result<(), Box> { let config = Config::load(config_override)?; - let result = run_generate_docs(&config, args.full, args.project.as_deref())?; + let result = run_generate_docs(&config, args.full, args.project.as_deref(), None)?; if robot_mode { print_generate_docs_json(&result); } else { @@ -1242,7 +1245,7 @@ async fn handle_embed( let config = Config::load(config_override)?; let full = args.full && !args.no_full; let retry_failed = args.retry_failed && !args.no_retry_failed; - let result = run_embed(&config, full, retry_failed).await?; + let result = run_embed(&config, full, retry_failed, None).await?; if robot_mode { print_embed_json(&result); } else {