Files
gitlore/src/cli/commands/sync.rs
teernisse 9ec1344945 feat(surgical-sync): add per-IID surgical sync pipeline with preflight validation
Add the ability to sync specific issues or merge requests by IID without
running a full incremental sync. This enables fast, targeted data refresh
for individual entities — useful for agent workflows, debugging, and
real-time investigation of specific issues or MRs.

Architecture:
- New CLI flags: --issue <IID> and --mr <IID> (repeatable, up to 100 total)
  scoped to a single project via -p/--project
- Preflight phase validates all IIDs exist on GitLab before any DB writes,
  with TOCTOU-aware soft verification at ingest time
- 6-stage pipeline: preflight -> fetch -> ingest -> dependents -> docs -> embed
- Each stage is cancellation-aware via ShutdownSignal
- Dedicated SyncRunRecorder extensions track surgical-specific counters
  (issues_fetched, mrs_ingested, docs_regenerated, etc.)

New modules:
- src/ingestion/surgical.rs: Core surgical fetch/ingest/dependent logic
  with preflight_fetch(), ingest_issue_by_iid(), ingest_mr_by_iid(),
  and fetch_dependents_for_{issue,mr}()
- src/cli/commands/sync_surgical.rs: Full CLI orchestrator with progress
  spinners, human/robot output, and cancellation handling
- src/embedding/pipeline.rs: embed_documents_by_ids() for scoped embedding
- src/documents/regenerator.rs: regenerate_dirty_documents_for_sources()
  for scoped document regeneration

Database changes:
- Migration 027: Extends sync_runs with mode, phase, surgical_iids_json,
  per-entity counters, and cancelled_at column
- New indexes: idx_sync_runs_mode_started, idx_sync_runs_status_phase_started

GitLab client:
- get_issue_by_iid() and get_mr_by_iid() single-entity fetch methods

Error handling:
- New SurgicalPreflightFailed error variant with entity_type, iid, project,
  and reason fields. Shares exit code 6 with GitLabNotFound.

Includes comprehensive test coverage:
- 645 lines of surgical ingestion tests (wiremock-based)
- 184 lines of scoped embedding tests
- 85 lines of scoped regeneration tests
- 113 lines of GitLab client single-entity tests
- 236 lines of sync_run surgical column/counter tests
- Unit tests for SyncOptions, error codes, and CLI validation
2026-02-18 16:28:21 -05:00

1202 lines
40 KiB
Rust

use crate::cli::render::{self, Icons, Theme, format_number};
use serde::Serialize;
use std::time::Instant;
use tracing::Instrument;
use tracing::{debug, warn};
use crate::Config;
use crate::cli::progress::{format_stage_line, nested_progress, stage_spinner_v2};
use crate::core::error::Result;
use crate::core::metrics::{MetricsLayer, StageTiming};
use crate::core::shutdown::ShutdownSignal;
use super::embed::run_embed;
use super::generate_docs::run_generate_docs;
use super::ingest::{
DryRunPreview, IngestDisplay, ProjectStatusEnrichment, ProjectSummary, run_ingest,
run_ingest_dry_run,
};
use super::sync_surgical::run_sync_surgical;
/// Flags and targets that drive a single `sync` invocation.
#[derive(Debug, Default)]
pub struct SyncOptions {
    /// Forwarded to the ingest, document-generation and embedding stages.
    pub full: bool,
    /// Forwarded to the ingest stages.
    pub force: bool,
    /// Skip the embedding stage.
    pub no_embed: bool,
    /// Skip the document-generation stage.
    pub no_docs: bool,
    // NOTE(review): not read anywhere in this file — presumably consumed by a caller
    // or by the surgical pipeline; confirm.
    pub no_events: bool,
    /// Selects silent ingest display and is passed to the spinner/progress helpers.
    pub robot_mode: bool,
    /// Print a preview instead of running the pipeline.
    pub dry_run: bool,
    /// Issue IIDs to sync; a non-empty list makes the run surgical.
    pub issue_iids: Vec<u64>,
    /// Merge-request IIDs to sync; a non-empty list makes the run surgical.
    pub mr_iids: Vec<u64>,
    // NOTE(review): `project` and `preflight_only` are not read in this file; they
    // are handed to the surgical pipeline together with the rest of the options.
    pub project: Option<String>,
    pub preflight_only: bool,
}

impl SyncOptions {
    /// Cap on surgical targets (100). This file only defines the value; the
    /// enforcement site is not visible here.
    pub const MAX_SURGICAL_TARGETS: usize = 100;

    /// Returns `true` when at least one issue or merge-request IID was requested.
    pub fn is_surgical(&self) -> bool {
        let no_targets = self.issue_iids.is_empty() && self.mr_iids.is_empty();
        !no_targets
    }
}
/// IIDs carried in a sync result's `surgical_iids` field, grouped by entity kind.
///
/// Serialized with the keys `issues` and `merge_requests`.
#[derive(Debug, Default, Serialize)]
pub struct SurgicalIids {
/// Issue IIDs.
pub issues: Vec<u64>,
/// Merge-request IIDs.
pub merge_requests: Vec<u64>,
}
/// Per-entity outcome record carried in [`SyncResult::entity_results`].
///
/// The values are free-form strings; this file's tests use entity types such as
/// `"issue"` / `"merge_request"` and outcomes such as `"synced"` / `"skipped_toctou"`.
#[derive(Debug, Serialize)]
pub struct EntitySyncResult {
/// Kind of entity this record describes.
pub entity_type: String,
/// IID of the entity.
pub iid: u64,
/// Outcome label for the entity.
pub outcome: String,
/// Error text, if any; omitted from JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// TOCTOU explanation, if any; omitted from JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub toctou_reason: Option<String>,
}
/// Aggregated counters produced by a sync run.
///
/// Serialized as the `data` payload of the JSON output. Fields marked
/// `#[serde(skip)]` never appear there, and the optional surgical fields are
/// omitted while they are `None`.
#[derive(Debug, Default, Serialize)]
pub struct SyncResult {
/// Run identifier; skipped in `data` and emitted via `meta.run_id` by `print_sync_json`.
#[serde(skip)]
pub run_id: String,
/// Taken from the issues ingest result (`issues_upserted`).
pub issues_updated: usize,
/// Taken from the MR ingest result (`mrs_upserted`).
pub mrs_updated: usize,
/// Summed over the issues and MR ingest stages.
pub discussions_fetched: usize,
/// Summed over the issues and MR ingest stages.
pub resource_events_fetched: usize,
/// Summed over the issues and MR ingest stages.
pub resource_events_failed: usize,
/// Taken from the MR ingest stage.
pub mr_diffs_fetched: usize,
/// Taken from the MR ingest stage.
pub mr_diffs_failed: usize,
/// Taken from the document-generation result (`regenerated`).
pub documents_regenerated: usize,
/// Taken from the document-generation result (`errored`).
pub documents_errored: usize,
/// Taken from the embedding result (`docs_embedded`).
pub documents_embedded: usize,
/// Taken from the embedding result (`failed`).
pub embedding_failed: usize,
/// Taken from the issues ingest stage.
pub status_enrichment_errors: usize,
/// Sum of `enriched` over the per-project status enrichment entries.
pub statuses_enriched: usize,
// The four optional fields below are never assigned in this file.
// NOTE(review): presumably populated by the surgical pipeline — confirm there.
#[serde(skip_serializing_if = "Option::is_none")]
pub surgical_mode: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub surgical_iids: Option<SurgicalIids>,
#[serde(skip_serializing_if = "Option::is_none")]
pub entity_results: Option<Vec<EntitySyncResult>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub preflight_only: Option<bool>,
/// Per-project summaries from the issues ingest stage; not serialized.
#[serde(skip)]
pub issue_projects: Vec<ProjectSummary>,
/// Per-project summaries from the MR ingest stage; not serialized.
#[serde(skip)]
pub mr_projects: Vec<ProjectSummary>,
}
/// Alias for [`Theme::color_icon`] to keep call sites concise.
///
/// Both arguments are forwarded unchanged. Call sites in this file pass a
/// warning icon together with `has_errors == true`, or a success icon together
/// with `has_errors == false`.
fn color_icon(icon: &str, has_errors: bool) -> String {
Theme::color_icon(icon, has_errors)
}
/// Runs the sync pipeline: issues ingest, MR ingest, document generation, embedding.
///
/// * When `options.is_surgical()` is true the call is routed to `run_sync_surgical`
///   and nothing below runs.
/// * When `run_id` is `None`, the first 8 characters of a freshly generated
///   simple-format UUID v4 are used as the run id.
/// * When `options.dry_run` is set, the result of `run_sync_dry_run` is returned
///   instead of running any stage.
/// * After the Issues and MRs stages the shutdown `signal` is checked; if it is
///   cancelled, the partial result collected so far is returned as `Ok`.
///
/// # Errors
/// Errors from `run_ingest` and `run_generate_docs` are propagated with `?`.
/// An error from `run_embed` is not propagated: it is shown as a "skipped" stage
/// line and logged with `warn!`, and the sync still returns `Ok`.
///
// NOTE(review): `options.no_events` is not read anywhere in this function.
// NOTE(review): stage lines are emitted through `println!` (via `emit_stage_line`)
// even when `options.robot_mode` is true — confirm this is intended for robot output.
pub async fn run_sync(
config: &Config,
options: SyncOptions,
run_id: Option<&str>,
signal: &ShutdownSignal,
) -> Result<SyncResult> {
// Surgical dispatch: if any IIDs specified, route to surgical pipeline
if options.is_surgical() {
return run_sync_surgical(config, options, run_id, signal).await;
}
// `generated_id` is declared outside the match so the borrowed 8-char slice
// stays valid for the rest of the function.
let generated_id;
let run_id = match run_id {
Some(id) => id,
None => {
generated_id = uuid::Uuid::new_v4().simple().to_string();
&generated_id[..8]
}
};
let span = tracing::info_span!("sync", %run_id);
async move {
let mut result = SyncResult {
run_id: run_id.to_string(),
..SyncResult::default()
};
// Handle dry_run mode - show preview without making any changes
if options.dry_run {
return run_sync_dry_run(config, &options).await;
}
let ingest_display = if options.robot_mode {
IngestDisplay::silent()
} else {
IngestDisplay::progress_only()
};
// ── Stage: Issues ──
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Issues", "fetching...", options.robot_mode);
debug!("Sync: ingesting issues");
let issues_result = run_ingest(
config,
"issues",
None,
options.force,
options.full,
false, // dry_run - sync has its own dry_run handling
ingest_display,
Some(spinner.clone()),
signal,
)
.await?;
// Fold the issues ingest counters into the aggregate result.
result.issues_updated = issues_result.issues_upserted;
result.discussions_fetched += issues_result.discussions_fetched;
result.resource_events_fetched += issues_result.resource_events_fetched;
result.resource_events_failed += issues_result.resource_events_failed;
result.status_enrichment_errors += issues_result.status_enrichment_errors;
for sep in &issues_result.status_enrichment_projects {
result.statuses_enriched += sep.enriched;
}
result.issue_projects = issues_result.project_summaries;
let issues_elapsed = stage_start.elapsed();
// Human mode only: print the "Status" line and its per-project rows. This
// happens before the Issues stage line itself is emitted below.
if !options.robot_mode {
let (status_summary, status_has_errors) =
summarize_status_enrichment(&issues_result.status_enrichment_projects);
let status_icon = color_icon(
if status_has_errors {
Icons::warning()
} else {
Icons::success()
},
status_has_errors,
);
let mut status_lines = vec![format_stage_line(
&status_icon,
"Status",
&status_summary,
issues_elapsed,
)];
status_lines.extend(status_sub_rows(&issues_result.status_enrichment_projects));
print_static_lines(&status_lines);
}
let mut issues_summary = format!(
"{} issues from {} {}",
format_number(result.issues_updated as i64),
issues_result.projects_synced,
if issues_result.projects_synced == 1 { "project" } else { "projects" }
);
append_failures(
&mut issues_summary,
&[
("event failures", issues_result.resource_events_failed),
("status errors", issues_result.status_enrichment_errors),
],
);
let issues_icon = color_icon(
if issues_result.resource_events_failed > 0 || issues_result.status_enrichment_errors > 0
{
Icons::warning()
} else {
Icons::success()
},
issues_result.resource_events_failed > 0 || issues_result.status_enrichment_errors > 0,
);
// Robot mode gets the single stage line; human mode also gets per-project rows.
if options.robot_mode {
emit_stage_line(&spinner, &issues_icon, "Issues", &issues_summary, issues_elapsed);
} else {
let sub_rows = issue_sub_rows(&result.issue_projects);
emit_stage_block(
&spinner,
&issues_icon,
"Issues",
&issues_summary,
issues_elapsed,
&sub_rows,
);
}
if signal.is_cancelled() {
debug!("Shutdown requested after issues stage, returning partial sync results");
return Ok(result);
}
// ── Stage: MRs ──
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "MRs", "fetching...", options.robot_mode);
debug!("Sync: ingesting merge requests");
let mrs_result = run_ingest(
config,
"mrs",
None,
options.force,
options.full,
false, // dry_run - sync has its own dry_run handling
ingest_display,
Some(spinner.clone()),
signal,
)
.await?;
// Fold the MR ingest counters into the aggregate result.
result.mrs_updated = mrs_result.mrs_upserted;
result.discussions_fetched += mrs_result.discussions_fetched;
result.resource_events_fetched += mrs_result.resource_events_fetched;
result.resource_events_failed += mrs_result.resource_events_failed;
result.mr_diffs_fetched += mrs_result.mr_diffs_fetched;
result.mr_diffs_failed += mrs_result.mr_diffs_failed;
result.mr_projects = mrs_result.project_summaries;
let mrs_elapsed = stage_start.elapsed();
let mut mrs_summary = format!(
"{} merge requests from {} {}",
format_number(result.mrs_updated as i64),
mrs_result.projects_synced,
if mrs_result.projects_synced == 1 { "project" } else { "projects" }
);
append_failures(
&mut mrs_summary,
&[
("event failures", mrs_result.resource_events_failed),
("diff failures", mrs_result.mr_diffs_failed),
],
);
let mrs_icon = color_icon(
if mrs_result.resource_events_failed > 0 || mrs_result.mr_diffs_failed > 0 {
Icons::warning()
} else {
Icons::success()
},
mrs_result.resource_events_failed > 0 || mrs_result.mr_diffs_failed > 0,
);
if options.robot_mode {
emit_stage_line(&spinner, &mrs_icon, "MRs", &mrs_summary, mrs_elapsed);
} else {
let sub_rows = mr_sub_rows(&result.mr_projects);
emit_stage_block(&spinner, &mrs_icon, "MRs", &mrs_summary, mrs_elapsed, &sub_rows);
}
if signal.is_cancelled() {
debug!("Shutdown requested after MRs stage, returning partial sync results");
return Ok(result);
}
// ── Stage: Docs ──
if !options.no_docs {
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Docs", "generating...", options.robot_mode);
debug!("Sync: generating documents");
let docs_bar = nested_progress("Docs", 0, options.robot_mode);
let docs_bar_clone = docs_bar.clone();
// Progress callback: the bar is only updated once a non-zero total is reported.
let docs_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
if total > 0 {
docs_bar_clone.set_length(total as u64);
docs_bar_clone.set_position(processed as u64);
}
});
let docs_result = run_generate_docs(config, options.full, None, Some(docs_cb))?;
result.documents_regenerated = docs_result.regenerated;
result.documents_errored = docs_result.errored;
docs_bar.finish_and_clear();
let mut docs_summary = format!(
"{} documents generated",
format_number(result.documents_regenerated as i64),
);
append_failures(&mut docs_summary, &[("errors", docs_result.errored)]);
let docs_icon = color_icon(
if docs_result.errored > 0 {
Icons::warning()
} else {
Icons::success()
},
docs_result.errored > 0,
);
emit_stage_line(&spinner, &docs_icon, "Docs", &docs_summary, stage_start.elapsed());
} else {
debug!("Sync: skipping document generation (--no-docs)");
}
// NOTE(review): there is no `signal.is_cancelled()` check between Docs and Embed;
// the signal is only handed to `run_embed` below.
// ── Stage: Embed ──
if !options.no_embed {
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Embed", "preparing...", options.robot_mode);
debug!("Sync: embedding documents");
let embed_bar = nested_progress("Embed", 0, options.robot_mode);
let embed_bar_clone = embed_bar.clone();
let embed_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
if total > 0 {
embed_bar_clone.set_length(total as u64);
embed_bar_clone.set_position(processed as u64);
}
});
// An embedding failure is deliberately non-fatal: see the `Err` arm.
match run_embed(config, options.full, false, Some(embed_cb), signal).await {
Ok(embed_result) => {
result.documents_embedded = embed_result.docs_embedded;
result.embedding_failed = embed_result.failed;
embed_bar.finish_and_clear();
// The stage line reports chunks, while the result records documents embedded.
let mut embed_summary = format!(
"{} chunks embedded",
format_number(embed_result.chunks_embedded as i64),
);
let mut tail_parts = Vec::new();
if embed_result.failed > 0 {
tail_parts.push(format!("{} failed", embed_result.failed));
}
if embed_result.skipped > 0 {
tail_parts.push(format!("{} skipped", embed_result.skipped));
}
if !tail_parts.is_empty() {
embed_summary.push_str(&format!(" ({})", tail_parts.join(", ")));
}
let embed_icon = color_icon(
if embed_result.failed > 0 {
Icons::warning()
} else {
Icons::success()
},
embed_result.failed > 0,
);
emit_stage_line(
&spinner,
&embed_icon,
"Embed",
&embed_summary,
stage_start.elapsed(),
);
}
Err(e) => {
// Report the stage as skipped and keep going; the embedded/failed
// counters in `result` stay at their defaults.
embed_bar.finish_and_clear();
let warn_summary = format!("skipped ({})", e);
let warn_icon = color_icon(Icons::warning(), true);
emit_stage_line(
&spinner,
&warn_icon,
"Embed",
&warn_summary,
stage_start.elapsed(),
);
warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing");
}
}
} else {
debug!("Sync: skipping embedding (--no-embed)");
}
debug!(
issues = result.issues_updated,
mrs = result.mrs_updated,
discussions = result.discussions_fetched,
resource_events = result.resource_events_fetched,
resource_events_failed = result.resource_events_failed,
mr_diffs = result.mr_diffs_fetched,
mr_diffs_failed = result.mr_diffs_failed,
docs = result.documents_regenerated,
embedded = result.documents_embedded,
"Sync pipeline complete"
);
Ok(result)
}
.instrument(span)
.await
}
pub fn print_sync(
result: &SyncResult,
elapsed: std::time::Duration,
metrics: Option<&MetricsLayer>,
show_timings: bool,
) {
let has_data = result.issues_updated > 0
|| result.mrs_updated > 0
|| result.discussions_fetched > 0
|| result.resource_events_fetched > 0
|| result.mr_diffs_fetched > 0
|| result.documents_regenerated > 0
|| result.documents_embedded > 0
|| result.statuses_enriched > 0;
let has_failures = result.resource_events_failed > 0
|| result.mr_diffs_failed > 0
|| result.status_enrichment_errors > 0
|| result.documents_errored > 0
|| result.embedding_failed > 0;
if !has_data && !has_failures {
println!(
"\n {} ({})\n",
Theme::dim().render("Already up to date"),
Theme::timing().render(&format!("{:.1}s", elapsed.as_secs_f64()))
);
} else {
let headline = if has_failures {
Theme::warning().bold().render("Sync completed with issues")
} else {
Theme::success().bold().render("Synced")
};
println!(
"\n {} {} issues and {} MRs in {}",
headline,
Theme::info()
.bold()
.render(&result.issues_updated.to_string()),
Theme::info().bold().render(&result.mrs_updated.to_string()),
Theme::timing().render(&format!("{:.1}s", elapsed.as_secs_f64()))
);
// Detail: supporting counts, compact middle-dot format, zero-suppressed
let mut details: Vec<String> = Vec::new();
if result.discussions_fetched > 0 {
details.push(format!(
"{} {}",
Theme::info().render(&result.discussions_fetched.to_string()),
Theme::dim().render("discussions")
));
}
if result.resource_events_fetched > 0 {
details.push(format!(
"{} {}",
Theme::info().render(&result.resource_events_fetched.to_string()),
Theme::dim().render("events")
));
}
if result.mr_diffs_fetched > 0 {
details.push(format!(
"{} {}",
Theme::info().render(&result.mr_diffs_fetched.to_string()),
Theme::dim().render("diffs")
));
}
if result.statuses_enriched > 0 {
details.push(format!(
"{} {}",
Theme::info().render(&result.statuses_enriched.to_string()),
Theme::dim().render("statuses updated")
));
}
if !details.is_empty() {
let sep = Theme::dim().render(" \u{b7} ");
println!(" {}", details.join(&sep));
}
// Documents: regeneration + embedding as a second detail line
let mut doc_parts: Vec<String> = Vec::new();
if result.documents_regenerated > 0 {
doc_parts.push(format!(
"{} {}",
Theme::info().render(&result.documents_regenerated.to_string()),
Theme::dim().render("docs regenerated")
));
}
if result.documents_embedded > 0 {
doc_parts.push(format!(
"{} {}",
Theme::info().render(&result.documents_embedded.to_string()),
Theme::dim().render("embedded")
));
}
if result.documents_errored > 0 {
doc_parts
.push(Theme::error().render(&format!("{} doc errors", result.documents_errored)));
}
if !doc_parts.is_empty() {
let sep = Theme::dim().render(" \u{b7} ");
println!(" {}", doc_parts.join(&sep));
}
// Errors: visually prominent, only if non-zero
let mut errors: Vec<String> = Vec::new();
if result.resource_events_failed > 0 {
errors.push(format!("{} event failures", result.resource_events_failed));
}
if result.mr_diffs_failed > 0 {
errors.push(format!("{} diff failures", result.mr_diffs_failed));
}
if result.status_enrichment_errors > 0 {
errors.push(format!("{} status errors", result.status_enrichment_errors));
}
if result.embedding_failed > 0 {
errors.push(format!("{} embedding failures", result.embedding_failed));
}
if !errors.is_empty() {
println!(" {}", Theme::error().render(&errors.join(" \u{b7} ")));
}
println!();
}
if let Some(metrics) = metrics {
let stages = metrics.extract_timings();
if should_print_timings(show_timings, &stages) {
print_timing_summary(&stages);
}
}
}
/// Builds one indented detail row per project for the Issues stage.
///
/// Each row is the project path padded to 30 columns followed by middle-dot
/// separated segments; segments other than the issue count appear only when
/// their condition holds, and failure segments use the warning theme.
fn issue_sub_rows(projects: &[ProjectSummary]) -> Vec<String> {
    let mut rows = Vec::with_capacity(projects.len());
    for project in projects {
        let noun = if project.items_upserted == 1 {
            "issue"
        } else {
            "issues"
        };
        let mut segments: Vec<String> = vec![format!("{} {}", project.items_upserted, noun)];
        if project.discussions_synced > 0 {
            segments.push(format!("{} discussions", project.discussions_synced));
        }
        // Shown whenever statuses were seen, even if none ended up enriched.
        if project.statuses_seen > 0 || project.statuses_enriched > 0 {
            segments.push(format!("{} statuses updated", project.statuses_enriched));
        }
        if project.events_fetched > 0 {
            segments.push(format!("{} events", project.events_fetched));
        }
        if project.status_errors > 0 {
            let text = format!("{} status errors", project.status_errors);
            segments.push(Theme::warning().render(&text));
        }
        if project.events_failed > 0 {
            let text = format!("{} event failures", project.events_failed);
            segments.push(Theme::warning().render(&text));
        }
        let separator = Theme::dim().render(" \u{b7} ");
        let joined = segments.join(&separator);
        let padded_path = Theme::muted().render(&format!("{:<30}", project.path));
        rows.push(format!(" {padded_path} {joined}"));
    }
    rows
}
/// Builds one indented detail row per project for the Status line.
///
/// The error count is `partial_errors` plus one when `error` is set. A skip
/// note is added only when there are no errors and the mode is `"skipped"`.
fn status_sub_rows(projects: &[ProjectStatusEnrichment]) -> Vec<String> {
    let mut rows = Vec::with_capacity(projects.len());
    for project in projects {
        // `Option::iter().count()` contributes 1 exactly when `error` is `Some`.
        let error_count = project.partial_errors + project.error.iter().count();
        let mut segments: Vec<String> = vec![format!("{} statuses updated", project.enriched)];
        if project.cleared > 0 {
            segments.push(format!("{} cleared", project.cleared));
        }
        if project.seen > 0 {
            segments.push(format!("{} seen", project.seen));
        }
        if error_count > 0 {
            let text = format!("{} errors", error_count);
            segments.push(Theme::warning().render(&text));
        } else if project.mode == "skipped" {
            let note = match &project.reason {
                Some(reason) => format!("skipped ({reason})"),
                None => "skipped".to_string(),
            };
            segments.push(Theme::dim().render(&note));
        }
        let separator = Theme::dim().render(" \u{b7} ");
        let joined = segments.join(&separator);
        let padded_path = Theme::muted().render(&format!("{:<30}", project.path));
        rows.push(format!(" {padded_path} {joined}"));
    }
    rows
}
/// Builds one indented detail row per project for the MRs stage.
///
/// Each row is the project path padded to 30 columns followed by middle-dot
/// separated segments; segments other than the MR count are zero-suppressed,
/// and failure segments use the warning theme.
fn mr_sub_rows(projects: &[ProjectSummary]) -> Vec<String> {
    let mut rows = Vec::with_capacity(projects.len());
    for project in projects {
        let noun = if project.items_upserted == 1 { "MR" } else { "MRs" };
        let mut segments: Vec<String> = vec![format!("{} {}", project.items_upserted, noun)];
        if project.discussions_synced > 0 {
            segments.push(format!("{} discussions", project.discussions_synced));
        }
        if project.mr_diffs_fetched > 0 {
            segments.push(format!("{} diffs", project.mr_diffs_fetched));
        }
        if project.events_fetched > 0 {
            segments.push(format!("{} events", project.events_fetched));
        }
        if project.mr_diffs_failed > 0 {
            let text = format!("{} diff failures", project.mr_diffs_failed);
            segments.push(Theme::warning().render(&text));
        }
        if project.events_failed > 0 {
            let text = format!("{} event failures", project.events_failed);
            segments.push(Theme::warning().render(&text));
        }
        let separator = Theme::dim().render(" \u{b7} ");
        let joined = segments.join(&separator);
        let padded_path = Theme::muted().render(&format!("{:<30}", project.path));
        rows.push(format!(" {padded_path} {joined}"));
    }
    rows
}
/// Clears the stage spinner `pb`, then prints the formatted one-line stage summary.
fn emit_stage_line(
    pb: &indicatif::ProgressBar,
    icon: &str,
    label: &str,
    summary: &str,
    elapsed: std::time::Duration,
) {
    pb.finish_and_clear();
    let line = format_stage_line(icon, label, summary, elapsed);
    print_static_lines(std::slice::from_ref(&line));
}
/// Clears the stage spinner `pb`, then prints the stage summary line followed by
/// every entry of `sub_rows`, all in one `print_static_lines` call.
fn emit_stage_block(
    pb: &indicatif::ProgressBar,
    icon: &str,
    label: &str,
    summary: &str,
    elapsed: std::time::Duration,
    sub_rows: &[String],
) {
    pb.finish_and_clear();
    let header = format_stage_line(icon, label, summary, elapsed);
    let block: Vec<String> = std::iter::once(header)
        .chain(sub_rows.iter().cloned())
        .collect();
    print_static_lines(&block);
}
/// Prints each line to stdout inside the shared multi-progress `suspend` call.
fn print_static_lines(lines: &[String]) {
    crate::cli::progress::multi().suspend(|| lines.iter().for_each(|line| println!("{line}")));
}
/// Returns `true` only when timings were requested and at least one stage exists.
fn should_print_timings(show_timings: bool, stages: &[StageTiming]) -> bool {
    if stages.is_empty() {
        return false;
    }
    show_timings
}
/// Appends a parenthesised, comma-separated list of non-zero failure counts to `summary`.
///
/// Every `(label, count)` pair with `count > 0` becomes a warning-themed
/// `"<count> <label>"` entry, in input order. `summary` is left untouched when
/// every count is zero or `failures` is empty.
fn append_failures(summary: &mut String, failures: &[(&str, usize)]) {
    // Filter before rendering so zero-count entries are never formatted or themed
    // (the previous `then_some(...)` built the string eagerly and then dropped it).
    let rendered: Vec<String> = failures
        .iter()
        .filter(|(_, count)| *count > 0)
        .map(|(label, count)| Theme::warning().render(&format!("{count} {label}")))
        .collect();
    if rendered.is_empty() {
        return;
    }
    // Append the pieces directly instead of formatting one more temporary string.
    summary.push_str(" (");
    summary.push_str(&rendered.join(", "));
    summary.push(')');
}
/// Condenses per-project status enrichment into a one-line summary.
///
/// Returns the middle-dot separated summary text and a flag that is `true` when
/// any project reported errors. A project's errors are its `partial_errors` plus
/// one if `error` is set. With no errors, a `skipped` marker is appended when
/// there are no projects or every project has mode `"skipped"`.
fn summarize_status_enrichment(projects: &[ProjectStatusEnrichment]) -> (String, bool) {
    let mut enriched_total: usize = 0;
    let mut seen_total: usize = 0;
    let mut cleared_total: usize = 0;
    let mut error_total: usize = 0;
    let mut skipped_projects: usize = 0;
    for project in projects {
        enriched_total += project.enriched;
        seen_total += project.seen;
        cleared_total += project.cleared;
        error_total += project.partial_errors + usize::from(project.error.is_some());
        if project.mode == "skipped" {
            skipped_projects += 1;
        }
    }

    let has_errors = error_total > 0;
    let mut segments = vec![format!(
        "{} statuses updated",
        format_number(enriched_total as i64)
    )];
    if cleared_total > 0 {
        segments.push(format!("{} cleared", format_number(cleared_total as i64)));
    }
    if seen_total > 0 {
        segments.push(format!("{} seen", format_number(seen_total as i64)));
    }
    if has_errors {
        segments.push(format!("{} errors", format_number(error_total as i64)));
    } else if projects.is_empty() || skipped_projects == projects.len() {
        segments.push("skipped".to_string());
    }
    (segments.join(" \u{b7} "), has_errors)
}
/// Prints the divider returned by `render::section_divider` for `title`.
fn section(title: &str) {
    let divider = render::section_divider(title);
    println!("{divider}");
}
/// Prints the "Timing" section.
///
/// Only the sub-stages of each top-level stage are printed (at depth 1); the
/// top-level stages themselves get no line of their own.
fn print_timing_summary(stages: &[StageTiming]) {
    section("Timing");
    stages
        .iter()
        .flat_map(|stage| stage.sub_stages.iter())
        .for_each(|sub| print_stage_line(sub, 1));
}
fn print_stage_line(stage: &StageTiming, depth: usize) {
let indent = " ".repeat(depth);
let name = if let Some(ref project) = stage.project {
format!("{} ({})", stage.name, project)
} else {
stage.name.clone()
};
let pad_width = 30_usize.saturating_sub(indent.len() + name.len());
let dots = Theme::dim().render(&".".repeat(pad_width.max(2)));
let time_str = Theme::bold().render(&format!("{:.1}s", stage.elapsed_ms as f64 / 1000.0));
let mut parts: Vec<String> = Vec::new();
if stage.items_processed > 0 {
parts.push(format!("{} items", stage.items_processed));
}
if stage.errors > 0 {
parts.push(Theme::error().render(&format!("{} errors", stage.errors)));
}
if stage.rate_limit_hits > 0 {
parts.push(Theme::warning().render(&format!("{} rate limits", stage.rate_limit_hits)));
}
if parts.is_empty() {
println!("{indent}{name} {dots} {time_str}");
} else {
let suffix = parts.join(" \u{b7} ");
println!("{indent}{name} {dots} {time_str} ({suffix})");
}
for sub in &stage.sub_stages {
print_stage_line(sub, depth + 1);
}
}
/// JSON envelope emitted by `print_sync_json`.
#[derive(Serialize)]
struct SyncJsonOutput<'a> {
// Always set to `true` by `print_sync_json`.
ok: bool,
// Borrowed sync result serialized as the payload.
data: &'a SyncResult,
// Run id, elapsed time and optional stage timings.
meta: SyncMeta,
}
/// Metadata section of the sync JSON envelope.
#[derive(Serialize)]
struct SyncMeta {
// Copied from `SyncResult::run_id`, which is itself skipped during serialization.
run_id: String,
// Elapsed time in milliseconds, as supplied by the caller.
elapsed_ms: u64,
// Stage timings; the key is omitted entirely when the list is empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
stages: Vec<StageTiming>,
}
/// Prints `result` to stdout as a single-line JSON envelope.
///
/// `meta.stages` holds the timings extracted from `metrics`, or is empty when no
/// metrics layer is supplied. A serialization failure is reported on stderr
/// rather than returned.
pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64, metrics: Option<&MetricsLayer>) {
    let timings = match metrics {
        Some(layer) => layer.extract_timings(),
        None => Vec::new(),
    };
    let meta = SyncMeta {
        run_id: result.run_id.clone(),
        elapsed_ms,
        stages: timings,
    };
    let envelope = SyncJsonOutput {
        ok: true,
        data: result,
        meta,
    };
    match serde_json::to_string(&envelope) {
        Err(e) => eprintln!("Error serializing to JSON: {e}"),
        Ok(json) => println!("{json}"),
    }
}
/// Preview data gathered by a dry-run sync.
#[derive(Debug, Default, Serialize)]
pub struct SyncDryRunResult {
/// Dry-run preview for the issues ingest.
pub issues_preview: DryRunPreview,
/// Dry-run preview for the merge-request ingest.
pub mrs_preview: DryRunPreview,
/// `true` unless document generation was disabled (`no_docs`).
pub would_generate_docs: bool,
/// `true` unless embedding was disabled (`no_embed`).
pub would_embed: bool,
}
/// Collects the issue and MR dry-run previews, prints them (JSON in robot mode,
/// human-readable otherwise) and returns an empty `SyncResult`.
///
/// # Errors
/// Propagates any error from `run_ingest_dry_run`; issues are previewed first.
async fn run_sync_dry_run(config: &Config, options: &SyncOptions) -> Result<SyncResult> {
    // Field initialisers run in written order, so issues are still previewed before MRs.
    let preview = SyncDryRunResult {
        issues_preview: run_ingest_dry_run(config, "issues", None, options.full)?,
        mrs_preview: run_ingest_dry_run(config, "mrs", None, options.full)?,
        would_generate_docs: !options.no_docs,
        would_embed: !options.no_embed,
    };
    match options.robot_mode {
        true => print_sync_dry_run_json(&preview),
        false => print_sync_dry_run(&preview),
    }
    // Nothing was synced, so the caller gets a default (all-zero) result.
    Ok(SyncResult::default())
}
/// Prints the human-readable dry-run preview: a banner, one section each for
/// issues and merge requests, and a "Pipeline" line where skipped stages are dimmed.
pub fn print_sync_dry_run(result: &SyncDryRunResult) {
    let banner = Theme::info().bold().render("Dry run");
    let disclaimer = Theme::dim().render("(no changes will be made)");
    println!("\n {} {}", banner, disclaimer);

    print_dry_run_entity("Issues", &result.issues_preview);
    print_dry_run_entity("Merge Requests", &result.mrs_preview);

    // Pipeline stages
    section("Pipeline");
    let stage_label = |enabled: bool, active: &str, skipped: &str| -> String {
        if enabled {
            active.to_string()
        } else {
            Theme::dim().render(skipped)
        }
    };
    let stages = [
        stage_label(
            result.would_generate_docs,
            "generate-docs",
            "generate-docs (skip)",
        ),
        stage_label(result.would_embed, "embed", "embed (skip)"),
    ];
    println!(" {}", stages.join(" \u{b7} "));
}
/// Prints one dry-run section: the sync mode with the project count, then one
/// line per project showing whether it has a cursor and, when non-zero, how
/// many items already exist.
fn print_dry_run_entity(label: &str, preview: &DryRunPreview) {
    section(label);
    let is_full = preview.sync_mode == "full";
    let mode = if is_full {
        Theme::warning().render("full")
    } else {
        Theme::success().render("incremental")
    };
    println!(" {} \u{b7} {} projects", mode, preview.projects.len());

    for project in &preview.projects {
        // Projects without a cursor are labelled as an initial sync.
        let sync_status = if project.has_cursor {
            Theme::success().render("incremental")
        } else {
            Theme::warning().render("initial sync")
        };
        let mut line = format!(" {} \u{b7} {}", &project.path, sync_status);
        if project.existing_count > 0 {
            line.push_str(&format!(" \u{b7} {} existing", project.existing_count));
        }
        println!("{line}");
    }
}
/// JSON envelope emitted by `print_sync_dry_run_json`.
#[derive(Serialize)]
struct SyncDryRunJsonOutput {
// Always set to `true` by `print_sync_dry_run_json`.
ok: bool,
// Always set to `true` by `print_sync_dry_run_json`.
dry_run: bool,
// Stage list payload.
data: SyncDryRunJsonData,
}
/// Payload of the dry-run JSON envelope.
#[derive(Serialize)]
struct SyncDryRunJsonData {
// One entry per pipeline stage, in the order built by `print_sync_dry_run_json`.
stages: Vec<SyncDryRunStage>,
}
/// One pipeline stage entry in the dry-run JSON output.
#[derive(Serialize)]
struct SyncDryRunStage {
// Stage identifier, e.g. "ingest_issues" or "embed".
name: String,
// Whether the stage would run in a real sync.
would_run: bool,
// Ingest preview for the stage; the key is omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
preview: Option<DryRunPreview>,
}
pub fn print_sync_dry_run_json(result: &SyncDryRunResult) {
let output = SyncDryRunJsonOutput {
ok: true,
dry_run: true,
data: SyncDryRunJsonData {
stages: vec![
SyncDryRunStage {
name: "ingest_issues".to_string(),
would_run: true,
preview: Some(result.issues_preview.clone()),
},
SyncDryRunStage {
name: "ingest_mrs".to_string(),
would_run: true,
preview: Some(result.mrs_preview.clone()),
},
SyncDryRunStage {
name: "generate_docs".to_string(),
would_run: result.would_generate_docs,
preview: None,
},
SyncDryRunStage {
name: "embed".to_string(),
would_run: result.would_embed,
preview: None,
},
],
},
};
match serde_json::to_string(&output) {
Ok(json) => println!("{json}"),
Err(e) => eprintln!("Error serializing to JSON: {e}"),
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline options: every flag off and no surgical targets.
    fn default_options() -> SyncOptions {
        SyncOptions {
            full: false,
            force: false,
            no_embed: false,
            no_docs: false,
            no_events: false,
            robot_mode: false,
            dry_run: false,
            issue_iids: vec![],
            mr_iids: vec![],
            project: None,
            preflight_only: false,
        }
    }

    // ── append_failures ──

    #[test]
    fn append_failures_skips_zeroes() {
        let mut text = String::from("base");
        append_failures(&mut text, &[("errors", 0), ("failures", 0)]);
        assert_eq!(text, "base");
    }

    #[test]
    fn append_failures_renders_non_zero_counts() {
        let mut text = String::from("base");
        append_failures(&mut text, &[("errors", 2), ("failures", 1)]);
        for needle in ["base", "2 errors", "1 failures"] {
            assert!(text.contains(needle));
        }
    }

    // ── summarize_status_enrichment ──

    #[test]
    fn summarize_status_enrichment_reports_skipped_when_all_skipped() {
        let skipped_project = ProjectStatusEnrichment {
            path: "vs/typescript-code".to_string(),
            mode: "skipped".to_string(),
            reason: None,
            seen: 0,
            enriched: 0,
            cleared: 0,
            without_widget: 0,
            partial_errors: 0,
            first_partial_error: None,
            error: None,
        };
        let (text, errored) = summarize_status_enrichment(&[skipped_project]);
        assert!(text.contains("0 statuses updated"));
        assert!(text.contains("skipped"));
        assert!(!errored);
    }

    #[test]
    fn summarize_status_enrichment_reports_errors() {
        let failing_project = ProjectStatusEnrichment {
            path: "vs/typescript-code".to_string(),
            mode: "fetched".to_string(),
            reason: None,
            seen: 3,
            enriched: 1,
            cleared: 1,
            without_widget: 0,
            partial_errors: 2,
            first_partial_error: None,
            error: Some("boom".to_string()),
        };
        let (text, errored) = summarize_status_enrichment(&[failing_project]);
        for needle in ["1 statuses updated", "1 cleared", "3 seen", "3 errors"] {
            assert!(text.contains(needle));
        }
        assert!(errored);
    }

    // ── should_print_timings ──

    #[test]
    fn should_print_timings_only_when_enabled_and_non_empty() {
        let timing = StageTiming {
            name: "x".to_string(),
            elapsed_ms: 10,
            items_processed: 0,
            items_skipped: 0,
            errors: 0,
            rate_limit_hits: 0,
            retries: 0,
            project: None,
            sub_stages: vec![],
        };
        let stages = [timing];
        assert!(should_print_timings(true, &stages));
        assert!(!should_print_timings(false, &stages));
        assert!(!should_print_timings(true, &[]));
    }

    // ── per-project sub rows ──

    #[test]
    fn issue_sub_rows_include_project_and_statuses() {
        let summary = ProjectSummary {
            path: "vs/typescript-code".to_string(),
            items_upserted: 2,
            discussions_synced: 0,
            events_fetched: 0,
            events_failed: 0,
            statuses_enriched: 1,
            statuses_seen: 5,
            status_errors: 0,
            mr_diffs_fetched: 0,
            mr_diffs_failed: 0,
        };
        let rows = issue_sub_rows(&[summary]);
        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert!(row.contains("vs/typescript-code"));
        assert!(row.contains("2 issues"));
        assert!(row.contains("1 statuses updated"));
    }

    #[test]
    fn mr_sub_rows_include_project_and_diff_failures() {
        let summary = ProjectSummary {
            path: "vs/python-code".to_string(),
            items_upserted: 3,
            discussions_synced: 0,
            events_fetched: 0,
            events_failed: 0,
            statuses_enriched: 0,
            statuses_seen: 0,
            status_errors: 0,
            mr_diffs_fetched: 4,
            mr_diffs_failed: 1,
        };
        let rows = mr_sub_rows(&[summary]);
        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert!(row.contains("vs/python-code"));
        assert!(row.contains("3 MRs"));
        assert!(row.contains("4 diffs"));
        assert!(row.contains("1 diff failures"));
    }

    #[test]
    fn status_sub_rows_include_project_and_skip_reason() {
        let skipped_project = ProjectStatusEnrichment {
            path: "vs/python-code".to_string(),
            mode: "skipped".to_string(),
            reason: Some("disabled".to_string()),
            seen: 0,
            enriched: 0,
            cleared: 0,
            without_widget: 0,
            partial_errors: 0,
            first_partial_error: None,
            error: None,
        };
        let rows = status_sub_rows(&[skipped_project]);
        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert!(row.contains("vs/python-code"));
        assert!(row.contains("0 statuses updated"));
        assert!(row.contains("skipped (disabled)"));
    }

    // ── SyncOptions::is_surgical ──

    #[test]
    fn is_surgical_with_issues() {
        let options = SyncOptions {
            issue_iids: vec![1],
            ..default_options()
        };
        assert!(options.is_surgical());
    }

    #[test]
    fn is_surgical_with_mrs() {
        let options = SyncOptions {
            mr_iids: vec![10],
            ..default_options()
        };
        assert!(options.is_surgical());
    }

    #[test]
    fn is_surgical_empty() {
        assert!(!default_options().is_surgical());
    }

    #[test]
    fn max_surgical_targets_is_100() {
        assert_eq!(SyncOptions::MAX_SURGICAL_TARGETS, 100);
    }

    // ── serialization of the surgical fields ──

    #[test]
    fn sync_result_default_omits_surgical_fields() {
        let value = serde_json::to_value(&SyncResult::default()).unwrap();
        for key in [
            "surgical_mode",
            "surgical_iids",
            "entity_results",
            "preflight_only",
        ] {
            assert!(value.get(key).is_none());
        }
    }

    #[test]
    fn sync_result_with_surgical_fields_serializes_correctly() {
        let synced = EntitySyncResult {
            entity_type: "issue".to_string(),
            iid: 7,
            outcome: "synced".to_string(),
            error: None,
            toctou_reason: None,
        };
        let skipped = EntitySyncResult {
            entity_type: "issue".to_string(),
            iid: 42,
            outcome: "skipped_toctou".to_string(),
            error: None,
            toctou_reason: Some("updated_at changed".to_string()),
        };
        let surgical = SyncResult {
            surgical_mode: Some(true),
            surgical_iids: Some(SurgicalIids {
                issues: vec![7, 42],
                merge_requests: vec![10],
            }),
            entity_results: Some(vec![synced, skipped]),
            preflight_only: Some(false),
            ..SyncResult::default()
        };
        let value = serde_json::to_value(&surgical).unwrap();
        assert_eq!(value["surgical_mode"], true);
        assert_eq!(value["surgical_iids"]["issues"], serde_json::json!([7, 42]));
        assert_eq!(value["entity_results"].as_array().unwrap().len(), 2);
        assert_eq!(value["entity_results"][1]["outcome"], "skipped_toctou");
        assert_eq!(value["preflight_only"], false);
    }

    #[test]
    fn entity_sync_result_omits_none_fields() {
        let record = EntitySyncResult {
            entity_type: "merge_request".to_string(),
            iid: 10,
            outcome: "synced".to_string(),
            error: None,
            toctou_reason: None,
        };
        let value = serde_json::to_value(&record).unwrap();
        assert!(value.get("error").is_none());
        assert!(value.get("toctou_reason").is_none());
        assert!(value.get("entity_type").is_some());
    }

    #[test]
    fn is_surgical_with_both_issues_and_mrs() {
        let options = SyncOptions {
            issue_iids: vec![1, 2],
            mr_iids: vec![10],
            ..default_options()
        };
        assert!(options.is_surgical());
    }

    #[test]
    fn is_not_surgical_with_only_project() {
        let options = SyncOptions {
            project: Some("group/repo".to_string()),
            ..default_options()
        };
        assert!(!options.is_surgical());
    }
}