use crate::cli::render::{self, Icons, Theme, format_number}; use serde::Serialize; use std::time::Instant; use tracing::Instrument; use tracing::{debug, warn}; use crate::Config; use crate::cli::progress::{finish_stage, nested_progress, stage_spinner_v2}; use crate::core::error::Result; use crate::core::metrics::{MetricsLayer, StageTiming}; use crate::core::shutdown::ShutdownSignal; use super::embed::run_embed; use super::generate_docs::run_generate_docs; use super::ingest::{DryRunPreview, IngestDisplay, ProjectSummary, run_ingest, run_ingest_dry_run}; #[derive(Debug, Default)] pub struct SyncOptions { pub full: bool, pub force: bool, pub no_embed: bool, pub no_docs: bool, pub no_events: bool, pub robot_mode: bool, pub dry_run: bool, } #[derive(Debug, Default, Serialize)] pub struct SyncResult { #[serde(skip)] pub run_id: String, pub issues_updated: usize, pub mrs_updated: usize, pub discussions_fetched: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, pub mr_diffs_fetched: usize, pub mr_diffs_failed: usize, pub documents_regenerated: usize, pub documents_embedded: usize, pub status_enrichment_errors: usize, pub statuses_enriched: usize, #[serde(skip)] pub issue_projects: Vec, #[serde(skip)] pub mr_projects: Vec, } pub async fn run_sync( config: &Config, options: SyncOptions, run_id: Option<&str>, signal: &ShutdownSignal, ) -> Result { let generated_id; let run_id = match run_id { Some(id) => id, None => { generated_id = uuid::Uuid::new_v4().simple().to_string(); &generated_id[..8] } }; let span = tracing::info_span!("sync", %run_id); async move { let mut result = SyncResult { run_id: run_id.to_string(), ..SyncResult::default() }; // Handle dry_run mode - show preview without making any changes if options.dry_run { return run_sync_dry_run(config, &options).await; } let ingest_display = if options.robot_mode { IngestDisplay::silent() } else { IngestDisplay::progress_only() }; // ── Stage: Issues ── let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Issues", "fetching...", options.robot_mode); debug!("Sync: ingesting issues"); let issues_result = run_ingest( config, "issues", None, options.force, options.full, false, // dry_run - sync has its own dry_run handling ingest_display, Some(spinner.clone()), signal, ) .await?; result.issues_updated = issues_result.issues_upserted; result.discussions_fetched += issues_result.discussions_fetched; result.resource_events_fetched += issues_result.resource_events_fetched; result.resource_events_failed += issues_result.resource_events_failed; result.status_enrichment_errors += issues_result.status_enrichment_errors; for sep in &issues_result.status_enrichment_projects { result.statuses_enriched += sep.enriched; } result.issue_projects = issues_result.project_summaries; let issues_summary = format!( "{} issues from {} {}", format_number(result.issues_updated as i64), issues_result.projects_synced, if issues_result.projects_synced == 1 { "project" } else { "projects" } ); finish_stage(&spinner, Icons::success(), "Issues", &issues_summary, stage_start.elapsed()); if !options.robot_mode { print_issue_sub_rows(&result.issue_projects); } if signal.is_cancelled() { debug!("Shutdown requested after issues stage, returning partial sync results"); return Ok(result); } // ── Stage: MRs ── let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "MRs", "fetching...", options.robot_mode); debug!("Sync: ingesting merge requests"); let mrs_result = run_ingest( config, "mrs", None, options.force, options.full, false, // dry_run - sync has its own dry_run handling ingest_display, Some(spinner.clone()), signal, ) .await?; result.mrs_updated = mrs_result.mrs_upserted; result.discussions_fetched += mrs_result.discussions_fetched; result.resource_events_fetched += mrs_result.resource_events_fetched; result.resource_events_failed += mrs_result.resource_events_failed; result.mr_diffs_fetched += mrs_result.mr_diffs_fetched; result.mr_diffs_failed += mrs_result.mr_diffs_failed; result.mr_projects = mrs_result.project_summaries; let mrs_summary = format!( "{} merge requests from {} {}", format_number(result.mrs_updated as i64), mrs_result.projects_synced, if mrs_result.projects_synced == 1 { "project" } else { "projects" } ); finish_stage(&spinner, Icons::success(), "MRs", &mrs_summary, stage_start.elapsed()); if !options.robot_mode { print_mr_sub_rows(&result.mr_projects); } if signal.is_cancelled() { debug!("Shutdown requested after MRs stage, returning partial sync results"); return Ok(result); } // ── Stage: Docs ── if !options.no_docs { let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Docs", "generating...", options.robot_mode); debug!("Sync: generating documents"); let docs_bar = nested_progress("Docs", 0, options.robot_mode); let docs_bar_clone = docs_bar.clone(); let docs_cb: Box = Box::new(move |processed, total| { if total > 0 { docs_bar_clone.set_length(total as u64); docs_bar_clone.set_position(processed as u64); } }); let docs_result = run_generate_docs(config, options.full, None, Some(docs_cb))?; result.documents_regenerated = docs_result.regenerated; docs_bar.finish_and_clear(); let docs_summary = format!( "{} documents generated", format_number(result.documents_regenerated as i64), ); finish_stage(&spinner, Icons::success(), "Docs", &docs_summary, stage_start.elapsed()); } else { debug!("Sync: skipping document generation (--no-docs)"); } // ── Stage: Embed ── if !options.no_embed { let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Embed", "preparing...", options.robot_mode); debug!("Sync: embedding documents"); let embed_bar = nested_progress("Embed", 0, options.robot_mode); let embed_bar_clone = embed_bar.clone(); let embed_cb: Box = Box::new(move |processed, total| { if total > 0 { embed_bar_clone.set_length(total as u64); embed_bar_clone.set_position(processed as u64); } }); match run_embed(config, options.full, false, Some(embed_cb), signal).await { Ok(embed_result) => { result.documents_embedded = embed_result.docs_embedded; embed_bar.finish_and_clear(); let embed_summary = format!( "{} chunks embedded", format_number(embed_result.chunks_embedded as i64), ); finish_stage(&spinner, Icons::success(), "Embed", &embed_summary, stage_start.elapsed()); } Err(e) => { embed_bar.finish_and_clear(); let warn_summary = format!("skipped ({})", e); finish_stage(&spinner, Icons::warning(), "Embed", &warn_summary, stage_start.elapsed()); warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing"); } } } else { debug!("Sync: skipping embedding (--no-embed)"); } debug!( issues = result.issues_updated, mrs = result.mrs_updated, discussions = result.discussions_fetched, resource_events = result.resource_events_fetched, resource_events_failed = result.resource_events_failed, mr_diffs = result.mr_diffs_fetched, mr_diffs_failed = result.mr_diffs_failed, docs = result.documents_regenerated, embedded = result.documents_embedded, "Sync pipeline complete" ); Ok(result) } .instrument(span) .await } pub fn print_sync( result: &SyncResult, elapsed: std::time::Duration, metrics: Option<&MetricsLayer>, ) { let has_data = result.issues_updated > 0 || result.mrs_updated > 0 || result.discussions_fetched > 0 || result.resource_events_fetched > 0 || result.mr_diffs_fetched > 0 || result.documents_regenerated > 0 || result.documents_embedded > 0 || result.statuses_enriched > 0; if !has_data { println!( "\n {} ({:.1}s)\n", Theme::dim().render("Already up to date"), elapsed.as_secs_f64() ); } else { // Headline: what happened, how long println!( "\n {} {} issues and {} MRs in {:.1}s", Theme::success().bold().render("Synced"), Theme::bold().render(&result.issues_updated.to_string()), Theme::bold().render(&result.mrs_updated.to_string()), elapsed.as_secs_f64() ); // Detail: supporting counts, compact middle-dot format, zero-suppressed let mut details: Vec = Vec::new(); if result.discussions_fetched > 0 { details.push(format!("{} discussions", result.discussions_fetched)); } if result.resource_events_fetched > 0 { details.push(format!("{} events", result.resource_events_fetched)); } if result.mr_diffs_fetched > 0 { details.push(format!("{} diffs", result.mr_diffs_fetched)); } if result.statuses_enriched > 0 { details.push(format!("{} statuses updated", result.statuses_enriched)); } if !details.is_empty() { println!(" {}", Theme::dim().render(&details.join(" \u{b7} "))); } // Documents: regeneration + embedding as a second detail line let mut doc_parts: Vec = Vec::new(); if result.documents_regenerated > 0 { doc_parts.push(format!("{} docs regenerated", result.documents_regenerated)); } if result.documents_embedded > 0 { doc_parts.push(format!("{} embedded", result.documents_embedded)); } if !doc_parts.is_empty() { println!(" {}", Theme::dim().render(&doc_parts.join(" \u{b7} "))); } // Errors: visually prominent, only if non-zero let mut errors: Vec = Vec::new(); if result.resource_events_failed > 0 { errors.push(format!("{} event failures", result.resource_events_failed)); } if result.mr_diffs_failed > 0 { errors.push(format!("{} diff failures", result.mr_diffs_failed)); } if result.status_enrichment_errors > 0 { errors.push(format!("{} status errors", result.status_enrichment_errors)); } if !errors.is_empty() { println!(" {}", Theme::error().render(&errors.join(" \u{b7} "))); } println!(); } if let Some(metrics) = metrics { let stages = metrics.extract_timings(); if !stages.is_empty() { print_timing_summary(&stages); } } } fn print_issue_sub_rows(projects: &[ProjectSummary]) { if projects.len() <= 1 { return; } for p in projects { let mut parts: Vec = Vec::new(); parts.push(format!( "{} {}", p.items_upserted, if p.items_upserted == 1 { "issue" } else { "issues" } )); if p.discussions_synced > 0 { parts.push(format!("{} discussions", p.discussions_synced)); } if p.statuses_enriched > 0 { parts.push(format!("{} statuses updated", p.statuses_enriched)); } if p.events_fetched > 0 { parts.push(format!("{} events", p.events_fetched)); } let detail = parts.join(" \u{b7} "); let _ = crate::cli::progress::multi().println(format!( " {}", Theme::dim().render(&format!("{:<30} {}", p.path, detail)) )); } } fn print_mr_sub_rows(projects: &[ProjectSummary]) { if projects.len() <= 1 { return; } for p in projects { let mut parts: Vec = Vec::new(); parts.push(format!( "{} {}", p.items_upserted, if p.items_upserted == 1 { "MR" } else { "MRs" } )); if p.discussions_synced > 0 { parts.push(format!("{} discussions", p.discussions_synced)); } if p.mr_diffs_fetched > 0 { parts.push(format!("{} diffs", p.mr_diffs_fetched)); } if p.events_fetched > 0 { parts.push(format!("{} events", p.events_fetched)); } let detail = parts.join(" \u{b7} "); let _ = crate::cli::progress::multi().println(format!( " {}", Theme::dim().render(&format!("{:<30} {}", p.path, detail)) )); } } fn section(title: &str) { println!("{}", render::section_divider(title)); } fn print_timing_summary(stages: &[StageTiming]) { section("Timing"); for stage in stages { for sub in &stage.sub_stages { print_stage_line(sub, 1); } } } fn print_stage_line(stage: &StageTiming, depth: usize) { let indent = " ".repeat(depth); let name = if let Some(ref project) = stage.project { format!("{} ({})", stage.name, project) } else { stage.name.clone() }; let pad_width = 30_usize.saturating_sub(indent.len() + name.len()); let dots = Theme::dim().render(&".".repeat(pad_width.max(2))); let time_str = Theme::bold().render(&format!("{:.1}s", stage.elapsed_ms as f64 / 1000.0)); let mut parts: Vec = Vec::new(); if stage.items_processed > 0 { parts.push(format!("{} items", stage.items_processed)); } if stage.errors > 0 { parts.push(Theme::error().render(&format!("{} errors", stage.errors))); } if stage.rate_limit_hits > 0 { parts.push(Theme::warning().render(&format!("{} rate limits", stage.rate_limit_hits))); } if parts.is_empty() { println!("{indent}{name} {dots} {time_str}"); } else { let suffix = parts.join(" \u{b7} "); println!("{indent}{name} {dots} {time_str} ({suffix})"); } for sub in &stage.sub_stages { print_stage_line(sub, depth + 1); } } #[derive(Serialize)] struct SyncJsonOutput<'a> { ok: bool, data: &'a SyncResult, meta: SyncMeta, } #[derive(Serialize)] struct SyncMeta { run_id: String, elapsed_ms: u64, #[serde(skip_serializing_if = "Vec::is_empty")] stages: Vec, } pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64, metrics: Option<&MetricsLayer>) { let stages = metrics.map_or_else(Vec::new, MetricsLayer::extract_timings); let output = SyncJsonOutput { ok: true, data: result, meta: SyncMeta { run_id: result.run_id.clone(), elapsed_ms, stages, }, }; println!("{}", serde_json::to_string(&output).unwrap()); } #[derive(Debug, Default, Serialize)] pub struct SyncDryRunResult { pub issues_preview: DryRunPreview, pub mrs_preview: DryRunPreview, pub would_generate_docs: bool, pub would_embed: bool, } async fn run_sync_dry_run(config: &Config, options: &SyncOptions) -> Result { // Get dry run previews for both issues and MRs let issues_preview = run_ingest_dry_run(config, "issues", None, options.full)?; let mrs_preview = run_ingest_dry_run(config, "mrs", None, options.full)?; let dry_result = SyncDryRunResult { issues_preview, mrs_preview, would_generate_docs: !options.no_docs, would_embed: !options.no_embed, }; if options.robot_mode { print_sync_dry_run_json(&dry_result); } else { print_sync_dry_run(&dry_result); } // Return an empty SyncResult since this is just a preview Ok(SyncResult::default()) } pub fn print_sync_dry_run(result: &SyncDryRunResult) { println!( "\n {} {}", Theme::info().bold().render("Dry run"), Theme::dim().render("(no changes will be made)") ); print_dry_run_entity("Issues", &result.issues_preview); print_dry_run_entity("Merge Requests", &result.mrs_preview); // Pipeline stages section("Pipeline"); let mut stages: Vec = Vec::new(); if result.would_generate_docs { stages.push("generate-docs".to_string()); } else { stages.push(Theme::dim().render("generate-docs (skip)")); } if result.would_embed { stages.push("embed".to_string()); } else { stages.push(Theme::dim().render("embed (skip)")); } println!(" {}", stages.join(" \u{b7} ")); } fn print_dry_run_entity(label: &str, preview: &DryRunPreview) { section(label); let mode = if preview.sync_mode == "full" { Theme::warning().render("full") } else { Theme::success().render("incremental") }; println!(" {} \u{b7} {} projects", mode, preview.projects.len()); for project in &preview.projects { let sync_status = if !project.has_cursor { Theme::warning().render("initial sync") } else { Theme::success().render("incremental") }; if project.existing_count > 0 { println!( " {} \u{b7} {} \u{b7} {} existing", &project.path, sync_status, project.existing_count ); } else { println!(" {} \u{b7} {}", &project.path, sync_status); } } } #[derive(Serialize)] struct SyncDryRunJsonOutput { ok: bool, dry_run: bool, data: SyncDryRunJsonData, } #[derive(Serialize)] struct SyncDryRunJsonData { stages: Vec, } #[derive(Serialize)] struct SyncDryRunStage { name: String, would_run: bool, #[serde(skip_serializing_if = "Option::is_none")] preview: Option, } pub fn print_sync_dry_run_json(result: &SyncDryRunResult) { let output = SyncDryRunJsonOutput { ok: true, dry_run: true, data: SyncDryRunJsonData { stages: vec![ SyncDryRunStage { name: "ingest_issues".to_string(), would_run: true, preview: Some(result.issues_preview.clone()), }, SyncDryRunStage { name: "ingest_mrs".to_string(), would_run: true, preview: Some(result.mrs_preview.clone()), }, SyncDryRunStage { name: "generate_docs".to_string(), would_run: result.would_generate_docs, preview: None, }, SyncDryRunStage { name: "embed".to_string(), would_run: result.would_embed, preview: None, }, ], }, }; println!("{}", serde_json::to_string(&output).unwrap()); }