#[derive(Debug, Default)] pub struct SyncOptions { pub full: bool, pub force: bool, pub no_embed: bool, pub no_docs: bool, pub no_events: bool, pub robot_mode: bool, pub dry_run: bool, pub issue_iids: Vec, pub mr_iids: Vec, pub project: Option, pub preflight_only: bool, } impl SyncOptions { pub const MAX_SURGICAL_TARGETS: usize = 100; pub fn is_surgical(&self) -> bool { !self.issue_iids.is_empty() || !self.mr_iids.is_empty() } } #[derive(Debug, Default, Serialize)] pub struct SurgicalIids { pub issues: Vec, pub merge_requests: Vec, } #[derive(Debug, Serialize)] pub struct EntitySyncResult { pub entity_type: String, pub iid: u64, pub outcome: String, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, #[serde(skip_serializing_if = "Option::is_none")] pub toctou_reason: Option, } #[derive(Debug, Default, Serialize)] pub struct SyncResult { #[serde(skip)] pub run_id: String, pub issues_updated: usize, pub mrs_updated: usize, pub discussions_fetched: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, pub mr_diffs_fetched: usize, pub mr_diffs_failed: usize, pub documents_regenerated: usize, pub documents_errored: usize, pub documents_embedded: usize, pub embedding_failed: usize, pub status_enrichment_errors: usize, pub statuses_enriched: usize, #[serde(skip_serializing_if = "Option::is_none")] pub surgical_mode: Option, #[serde(skip_serializing_if = "Option::is_none")] pub surgical_iids: Option, #[serde(skip_serializing_if = "Option::is_none")] pub entity_results: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub preflight_only: Option, #[serde(skip)] pub issue_projects: Vec, #[serde(skip)] pub mr_projects: Vec, } /// Alias for [`Theme::color_icon`] to keep call sites concise. fn color_icon(icon: &str, has_errors: bool) -> String { Theme::color_icon(icon, has_errors) } pub async fn run_sync( config: &Config, options: SyncOptions, run_id: Option<&str>, signal: &ShutdownSignal, ) -> Result { // Surgical dispatch: if any IIDs specified, route to surgical pipeline if options.is_surgical() { return run_sync_surgical(config, options, run_id, signal).await; } let generated_id; let run_id = match run_id { Some(id) => id, None => { generated_id = uuid::Uuid::new_v4().simple().to_string(); &generated_id[..8] } }; let span = tracing::info_span!("sync", %run_id); async move { let mut result = SyncResult { run_id: run_id.to_string(), ..SyncResult::default() }; // Handle dry_run mode - show preview without making any changes if options.dry_run { return run_sync_dry_run(config, &options).await; } let ingest_display = if options.robot_mode { IngestDisplay::silent() } else { IngestDisplay::progress_only() }; // ── Stage: Issues ── let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Issues", "fetching...", options.robot_mode); debug!("Sync: ingesting issues"); let issues_result = run_ingest( config, "issues", None, options.force, options.full, false, // dry_run - sync has its own dry_run handling ingest_display, Some(spinner.clone()), signal, ) .await?; result.issues_updated = issues_result.issues_upserted; result.discussions_fetched += issues_result.discussions_fetched; result.resource_events_fetched += issues_result.resource_events_fetched; result.resource_events_failed += issues_result.resource_events_failed; result.status_enrichment_errors += issues_result.status_enrichment_errors; for sep in &issues_result.status_enrichment_projects { result.statuses_enriched += sep.enriched; } result.issue_projects = issues_result.project_summaries; let issues_elapsed = stage_start.elapsed(); if !options.robot_mode { let (status_summary, status_has_errors) = summarize_status_enrichment(&issues_result.status_enrichment_projects); let status_icon = color_icon( if status_has_errors { Icons::warning() } else { Icons::success() }, status_has_errors, ); let mut status_lines = vec![format_stage_line( &status_icon, "Status", &status_summary, issues_elapsed, )]; status_lines.extend(status_sub_rows(&issues_result.status_enrichment_projects)); print_static_lines(&status_lines); } let mut issues_summary = format!( "{} issues from {} {}", format_number(result.issues_updated as i64), issues_result.projects_synced, if issues_result.projects_synced == 1 { "project" } else { "projects" } ); append_failures( &mut issues_summary, &[ ("event failures", issues_result.resource_events_failed), ("status errors", issues_result.status_enrichment_errors), ], ); let issues_icon = color_icon( if issues_result.resource_events_failed > 0 || issues_result.status_enrichment_errors > 0 { Icons::warning() } else { Icons::success() }, issues_result.resource_events_failed > 0 || issues_result.status_enrichment_errors > 0, ); if options.robot_mode { emit_stage_line(&spinner, &issues_icon, "Issues", &issues_summary, issues_elapsed); } else { let sub_rows = issue_sub_rows(&result.issue_projects); emit_stage_block( &spinner, &issues_icon, "Issues", &issues_summary, issues_elapsed, &sub_rows, ); } if signal.is_cancelled() { debug!("Shutdown requested after issues stage, returning partial sync results"); return Ok(result); } // ── Stage: MRs ── let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "MRs", "fetching...", options.robot_mode); debug!("Sync: ingesting merge requests"); let mrs_result = run_ingest( config, "mrs", None, options.force, options.full, false, // dry_run - sync has its own dry_run handling ingest_display, Some(spinner.clone()), signal, ) .await?; result.mrs_updated = mrs_result.mrs_upserted; result.discussions_fetched += mrs_result.discussions_fetched; result.resource_events_fetched += mrs_result.resource_events_fetched; result.resource_events_failed += mrs_result.resource_events_failed; result.mr_diffs_fetched += mrs_result.mr_diffs_fetched; result.mr_diffs_failed += mrs_result.mr_diffs_failed; result.mr_projects = mrs_result.project_summaries; let mrs_elapsed = stage_start.elapsed(); let mut mrs_summary = format!( "{} merge requests from {} {}", format_number(result.mrs_updated as i64), mrs_result.projects_synced, if mrs_result.projects_synced == 1 { "project" } else { "projects" } ); append_failures( &mut mrs_summary, &[ ("event failures", mrs_result.resource_events_failed), ("diff failures", mrs_result.mr_diffs_failed), ], ); let mrs_icon = color_icon( if mrs_result.resource_events_failed > 0 || mrs_result.mr_diffs_failed > 0 { Icons::warning() } else { Icons::success() }, mrs_result.resource_events_failed > 0 || mrs_result.mr_diffs_failed > 0, ); if options.robot_mode { emit_stage_line(&spinner, &mrs_icon, "MRs", &mrs_summary, mrs_elapsed); } else { let sub_rows = mr_sub_rows(&result.mr_projects); emit_stage_block(&spinner, &mrs_icon, "MRs", &mrs_summary, mrs_elapsed, &sub_rows); } if signal.is_cancelled() { debug!("Shutdown requested after MRs stage, returning partial sync results"); return Ok(result); } // ── Stage: Docs ── if !options.no_docs { let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Docs", "generating...", options.robot_mode); debug!("Sync: generating documents"); let docs_bar = nested_progress("Docs", 0, options.robot_mode); let docs_bar_clone = docs_bar.clone(); let docs_cb: Box = Box::new(move |processed, total| { if total > 0 { docs_bar_clone.set_length(total as u64); docs_bar_clone.set_position(processed as u64); } }); let docs_result = run_generate_docs(config, options.full, None, Some(docs_cb))?; result.documents_regenerated = docs_result.regenerated; result.documents_errored = docs_result.errored; docs_bar.finish_and_clear(); let mut docs_summary = format!( "{} documents generated", format_number(result.documents_regenerated as i64), ); append_failures(&mut docs_summary, &[("errors", docs_result.errored)]); let docs_icon = color_icon( if docs_result.errored > 0 { Icons::warning() } else { Icons::success() }, docs_result.errored > 0, ); emit_stage_line(&spinner, &docs_icon, "Docs", &docs_summary, stage_start.elapsed()); } else { debug!("Sync: skipping document generation (--no-docs)"); } // ── Stage: Embed ── if !options.no_embed { let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Embed", "preparing...", options.robot_mode); debug!("Sync: embedding documents"); let embed_bar = nested_progress("Embed", 0, options.robot_mode); let embed_bar_clone = embed_bar.clone(); let embed_cb: Box = Box::new(move |processed, total| { if total > 0 { embed_bar_clone.set_length(total as u64); embed_bar_clone.set_position(processed as u64); } }); match run_embed(config, options.full, false, Some(embed_cb), signal).await { Ok(embed_result) => { result.documents_embedded = embed_result.docs_embedded; result.embedding_failed = embed_result.failed; embed_bar.finish_and_clear(); let mut embed_summary = format!( "{} chunks embedded", format_number(embed_result.chunks_embedded as i64), ); let mut tail_parts = Vec::new(); if embed_result.failed > 0 { tail_parts.push(format!("{} failed", embed_result.failed)); } if embed_result.skipped > 0 { tail_parts.push(format!("{} skipped", embed_result.skipped)); } if !tail_parts.is_empty() { embed_summary.push_str(&format!(" ({})", tail_parts.join(", "))); } let embed_icon = color_icon( if embed_result.failed > 0 { Icons::warning() } else { Icons::success() }, embed_result.failed > 0, ); emit_stage_line( &spinner, &embed_icon, "Embed", &embed_summary, stage_start.elapsed(), ); } Err(e) => { embed_bar.finish_and_clear(); let warn_summary = format!("skipped ({})", e); let warn_icon = color_icon(Icons::warning(), true); emit_stage_line( &spinner, &warn_icon, "Embed", &warn_summary, stage_start.elapsed(), ); warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing"); } } } else { debug!("Sync: skipping embedding (--no-embed)"); } debug!( issues = result.issues_updated, mrs = result.mrs_updated, discussions = result.discussions_fetched, resource_events = result.resource_events_fetched, resource_events_failed = result.resource_events_failed, mr_diffs = result.mr_diffs_fetched, mr_diffs_failed = result.mr_diffs_failed, docs = result.documents_regenerated, embedded = result.documents_embedded, "Sync pipeline complete" ); Ok(result) } .instrument(span) .await }