From f6d19a94676367fce62b199688a251a928c30eba Mon Sep 17 00:00:00 2001 From: teernisse Date: Wed, 4 Feb 2026 10:01:28 -0500 Subject: [PATCH] feat(sync): Instrument pipeline with tracing spans, run_id correlation, and metrics Add end-to-end observability to the sync and ingest pipelines: Sync command: - Generate UUID-based run_id for each sync invocation, propagated through all child spans for log correlation across stages - Accept MetricsLayer reference to extract hierarchical StageTiming data after pipeline completion for robot-mode performance output - Record sync runs in DB via SyncRunRecorder (start/succeed/fail lifecycle) - Wrap entire sync execution in a root tracing span with run_id field Ingest command: - Wrap run_ingest in an instrumented root span with run_id and resource_type - Add project path prefix to discussion progress bars for multi-project clarity - Reset resource_events_synced_for_updated_at on --full re-sync Sync status: - Expand from single last_run to configurable recent runs list (default 10) - Parse and expose StageTiming metrics from stored metrics_json - Add run_id, total_items_processed, total_errors to SyncRunInfo - Add mr_count to DataSummary for complete entity coverage Orchestrator: - Add #[instrument] with structured fields to issue and MR ingestion functions - Record items_processed, items_skipped, errors on span close for MetricsLayer - Emit granular progress events (IssuesFetchStarted, IssuesFetchComplete) - Pass project_id through to drain_resource_events for scoped job claiming Document regenerator and embedding pipeline: - Add #[instrument] spans with items_processed, items_skipped, errors fields - Record final counts on span close for metrics extraction Co-Authored-By: Claude Opus 4.5 --- src/cli/commands/ingest.rs | 33 ++- src/cli/commands/sync.rs | 350 ++++++++++++++++++++------------ src/cli/commands/sync_status.rs | 230 ++++++++++++++------- src/documents/regenerator.rs | 8 +- src/embedding/pipeline.rs | 9 +- src/ingestion/orchestrator.rs | 207 ++++++++++++++++--- 6 files changed, 603 insertions(+), 234 deletions(-) diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index b006f83..9bae9d1 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -5,6 +5,8 @@ use indicatif::{ProgressBar, ProgressStyle}; use rusqlite::Connection; use serde::Serialize; +use tracing::Instrument; + use crate::Config; use crate::core::db::create_connection; use crate::core::error::{LoreError, Result}; @@ -111,6 +113,24 @@ pub async fn run_ingest( force: bool, full: bool, display: IngestDisplay, +) -> Result { + let run_id = uuid::Uuid::new_v4().simple().to_string(); + let run_id = &run_id[..8]; + let span = tracing::info_span!("ingest", %run_id, %resource_type); + + run_ingest_inner(config, resource_type, project_filter, force, full, display) + .instrument(span) + .await +} + +/// Inner implementation of run_ingest, instrumented with a root span. +async fn run_ingest_inner( + config: &Config, + resource_type: &str, + project_filter: Option<&str>, + force: bool, + full: bool, + display: IngestDisplay, ) -> Result { // Validate resource type early if resource_type != "issues" && resource_type != "mrs" { @@ -162,15 +182,15 @@ pub async fn run_ingest( } for (local_project_id, _, path) in &projects { if resource_type == "issues" { - // Reset issue discussion watermarks first so discussions get re-synced + // Reset issue discussion and resource event watermarks so everything gets re-synced conn.execute( - "UPDATE issues SET discussions_synced_for_updated_at = NULL WHERE project_id = ?", + "UPDATE issues SET discussions_synced_for_updated_at = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?", [*local_project_id], )?; } else if resource_type == "mrs" { - // Reset MR discussion watermarks + // Reset MR discussion and resource event watermarks conn.execute( - "UPDATE merge_requests SET discussions_synced_for_updated_at = NULL WHERE project_id = ?", + "UPDATE merge_requests SET discussions_synced_for_updated_at = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?", [*local_project_id], )?; } @@ -255,11 +275,12 @@ pub async fn run_ingest( b.set_style( ProgressStyle::default_bar() .template( - " {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}", + " {spinner:.blue} {prefix:.cyan} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}", ) .unwrap() .progress_chars("=> "), ); + b.set_prefix(path.clone()); b }; @@ -296,7 +317,7 @@ pub async fn run_ingest( disc_bar_clone.set_length(total as u64); disc_bar_clone.set_style( ProgressStyle::default_bar() - .template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") + .template(" {spinner:.blue} {prefix:.cyan} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") .unwrap() .progress_chars("=> "), ); diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index eb75cca..7f9d788 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -3,10 +3,12 @@ use console::style; use indicatif::{ProgressBar, ProgressStyle}; use serde::Serialize; +use tracing::Instrument; use tracing::{info, warn}; use crate::Config; use crate::core::error::Result; +use crate::core::metrics::{MetricsLayer, StageTiming}; use super::embed::run_embed; use super::generate_docs::run_generate_docs; @@ -26,6 +28,8 @@ pub struct SyncOptions { /// Result of the sync command. #[derive(Debug, Default, Serialize)] pub struct SyncResult { + #[serde(skip)] + pub run_id: String, pub issues_updated: usize, pub mrs_updated: usize, pub discussions_fetched: usize, @@ -52,133 +56,162 @@ fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressB } /// Run the full sync pipeline: ingest -> generate-docs -> embed. -pub async fn run_sync(config: &Config, options: SyncOptions) -> Result { - let mut result = SyncResult::default(); - - let ingest_display = if options.robot_mode { - IngestDisplay::silent() - } else { - IngestDisplay::progress_only() - }; - - let total_stages: u8 = if options.no_docs && options.no_embed { - 2 - } else if options.no_docs || options.no_embed { - 3 - } else { - 4 - }; - let mut current_stage: u8 = 0; - - // Stage 1: Ingest issues - current_stage += 1; - let spinner = stage_spinner( - current_stage, - total_stages, - "Fetching issues from GitLab...", - options.robot_mode, - ); - info!("Sync stage {current_stage}/{total_stages}: ingesting issues"); - let issues_result = run_ingest( - config, - "issues", - None, - options.force, - options.full, - ingest_display, - ) - .await?; - result.issues_updated = issues_result.issues_upserted; - result.discussions_fetched += issues_result.discussions_fetched; - result.resource_events_fetched += issues_result.resource_events_fetched; - result.resource_events_failed += issues_result.resource_events_failed; - spinner.finish_and_clear(); - - // Stage 2: Ingest MRs - current_stage += 1; - let spinner = stage_spinner( - current_stage, - total_stages, - "Fetching merge requests from GitLab...", - options.robot_mode, - ); - info!("Sync stage {current_stage}/{total_stages}: ingesting merge requests"); - let mrs_result = run_ingest( - config, - "mrs", - None, - options.force, - options.full, - ingest_display, - ) - .await?; - result.mrs_updated = mrs_result.mrs_upserted; - result.discussions_fetched += mrs_result.discussions_fetched; - result.resource_events_fetched += mrs_result.resource_events_fetched; - result.resource_events_failed += mrs_result.resource_events_failed; - spinner.finish_and_clear(); - - // Stage 3: Generate documents (unless --no-docs) - if !options.no_docs { - current_stage += 1; - let spinner = stage_spinner( - current_stage, - total_stages, - "Processing documents...", - options.robot_mode, - ); - info!("Sync stage {current_stage}/{total_stages}: generating documents"); - let docs_result = run_generate_docs(config, false, None)?; - result.documents_regenerated = docs_result.regenerated; - spinner.finish_and_clear(); - } else { - info!("Sync: skipping document generation (--no-docs)"); - } - - // Stage 4: Embed documents (unless --no-embed) - if !options.no_embed { - current_stage += 1; - let spinner = stage_spinner( - current_stage, - total_stages, - "Generating embeddings...", - options.robot_mode, - ); - info!("Sync stage {current_stage}/{total_stages}: embedding documents"); - match run_embed(config, options.full, false).await { - Ok(embed_result) => { - result.documents_embedded = embed_result.embedded; - spinner.finish_and_clear(); - } - Err(e) => { - // Graceful degradation: Ollama down is a warning, not an error - spinner.finish_and_clear(); - if !options.robot_mode { - eprintln!(" {} Embedding skipped ({})", style("warn").yellow(), e); - } - warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing"); - } +/// +/// `run_id` is an optional correlation ID for log/metrics tracing. +/// When called from `handle_sync_cmd`, this should be the same ID +/// stored in the `sync_runs` table so logs and DB records correlate. +pub async fn run_sync( + config: &Config, + options: SyncOptions, + run_id: Option<&str>, +) -> Result { + let generated_id; + let run_id = match run_id { + Some(id) => id, + None => { + generated_id = uuid::Uuid::new_v4().simple().to_string(); + &generated_id[..8] } - } else { - info!("Sync: skipping embedding (--no-embed)"); + }; + let span = tracing::info_span!("sync", %run_id); + + async move { + let mut result = SyncResult { + run_id: run_id.to_string(), + ..SyncResult::default() + }; + + let ingest_display = if options.robot_mode { + IngestDisplay::silent() + } else { + IngestDisplay::progress_only() + }; + + let total_stages: u8 = if options.no_docs && options.no_embed { + 2 + } else if options.no_docs || options.no_embed { + 3 + } else { + 4 + }; + let mut current_stage: u8 = 0; + + // Stage 1: Ingest issues + current_stage += 1; + let spinner = stage_spinner( + current_stage, + total_stages, + "Fetching issues from GitLab...", + options.robot_mode, + ); + info!("Sync stage {current_stage}/{total_stages}: ingesting issues"); + let issues_result = run_ingest( + config, + "issues", + None, + options.force, + options.full, + ingest_display, + ) + .await?; + result.issues_updated = issues_result.issues_upserted; + result.discussions_fetched += issues_result.discussions_fetched; + result.resource_events_fetched += issues_result.resource_events_fetched; + result.resource_events_failed += issues_result.resource_events_failed; + spinner.finish_and_clear(); + + // Stage 2: Ingest MRs + current_stage += 1; + let spinner = stage_spinner( + current_stage, + total_stages, + "Fetching merge requests from GitLab...", + options.robot_mode, + ); + info!("Sync stage {current_stage}/{total_stages}: ingesting merge requests"); + let mrs_result = run_ingest( + config, + "mrs", + None, + options.force, + options.full, + ingest_display, + ) + .await?; + result.mrs_updated = mrs_result.mrs_upserted; + result.discussions_fetched += mrs_result.discussions_fetched; + result.resource_events_fetched += mrs_result.resource_events_fetched; + result.resource_events_failed += mrs_result.resource_events_failed; + spinner.finish_and_clear(); + + // Stage 3: Generate documents (unless --no-docs) + if !options.no_docs { + current_stage += 1; + let spinner = stage_spinner( + current_stage, + total_stages, + "Processing documents...", + options.robot_mode, + ); + info!("Sync stage {current_stage}/{total_stages}: generating documents"); + let docs_result = run_generate_docs(config, false, None)?; + result.documents_regenerated = docs_result.regenerated; + spinner.finish_and_clear(); + } else { + info!("Sync: skipping document generation (--no-docs)"); + } + + // Stage 4: Embed documents (unless --no-embed) + if !options.no_embed { + current_stage += 1; + let spinner = stage_spinner( + current_stage, + total_stages, + "Generating embeddings...", + options.robot_mode, + ); + info!("Sync stage {current_stage}/{total_stages}: embedding documents"); + match run_embed(config, options.full, false).await { + Ok(embed_result) => { + result.documents_embedded = embed_result.embedded; + spinner.finish_and_clear(); + } + Err(e) => { + // Graceful degradation: Ollama down is a warning, not an error + spinner.finish_and_clear(); + if !options.robot_mode { + eprintln!(" {} Embedding skipped ({})", style("warn").yellow(), e); + } + warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing"); + } + } + } else { + info!("Sync: skipping embedding (--no-embed)"); + } + + info!( + issues = result.issues_updated, + mrs = result.mrs_updated, + discussions = result.discussions_fetched, + resource_events = result.resource_events_fetched, + resource_events_failed = result.resource_events_failed, + docs = result.documents_regenerated, + embedded = result.documents_embedded, + "Sync pipeline complete" + ); + + Ok(result) } - - info!( - issues = result.issues_updated, - mrs = result.mrs_updated, - discussions = result.discussions_fetched, - resource_events = result.resource_events_fetched, - resource_events_failed = result.resource_events_failed, - docs = result.documents_regenerated, - embedded = result.documents_embedded, - "Sync pipeline complete" - ); - - Ok(result) + .instrument(span) + .await } /// Print human-readable sync summary. -pub fn print_sync(result: &SyncResult, elapsed: std::time::Duration) { +pub fn print_sync( + result: &SyncResult, + elapsed: std::time::Duration, + metrics: Option<&MetricsLayer>, +) { println!("{} Sync complete:", style("done").green().bold(),); println!(" Issues updated: {}", result.issues_updated); println!(" MRs updated: {}", result.mrs_updated); @@ -204,6 +237,65 @@ pub fn print_sync(result: &SyncResult, elapsed: std::time::Duration) { ); println!(" Documents embedded: {}", result.documents_embedded); println!(" Elapsed: {:.1}s", elapsed.as_secs_f64()); + + // Print per-stage timing breakdown if metrics are available + if let Some(metrics) = metrics { + let stages = metrics.extract_timings(); + if !stages.is_empty() { + print_timing_summary(&stages); + } + } +} + +/// Print per-stage timing breakdown for interactive users. +fn print_timing_summary(stages: &[StageTiming]) { + println!(); + println!("{}", style("Stage timing:").dim()); + for stage in stages { + for sub in &stage.sub_stages { + print_stage_line(sub, 1); + } + } +} + +/// Print a single stage timing line with indentation. +fn print_stage_line(stage: &StageTiming, depth: usize) { + let indent = " ".repeat(depth); + let name = if let Some(ref project) = stage.project { + format!("{} ({})", stage.name, project) + } else { + stage.name.clone() + }; + let pad_width = 30_usize.saturating_sub(indent.len() + name.len()); + let dots = ".".repeat(pad_width.max(2)); + + let mut suffix = String::new(); + if stage.items_processed > 0 { + suffix.push_str(&format!("{} items", stage.items_processed)); + } + if stage.errors > 0 { + if !suffix.is_empty() { + suffix.push_str(", "); + } + suffix.push_str(&format!("{} errors", stage.errors)); + } + if stage.rate_limit_hits > 0 { + if !suffix.is_empty() { + suffix.push_str(", "); + } + suffix.push_str(&format!("{} rate limits", stage.rate_limit_hits)); + } + + let time_str = format!("{:.1}s", stage.elapsed_ms as f64 / 1000.0); + if suffix.is_empty() { + println!("{indent}{name} {dots} {time_str}"); + } else { + println!("{indent}{name} {dots} {time_str} ({suffix})"); + } + + for sub in &stage.sub_stages { + print_stage_line(sub, depth + 1); + } } /// JSON output for sync. @@ -216,15 +308,23 @@ struct SyncJsonOutput<'a> { #[derive(Serialize)] struct SyncMeta { + run_id: String, elapsed_ms: u64, + #[serde(skip_serializing_if = "Vec::is_empty")] + stages: Vec, } -/// Print JSON robot-mode sync output. -pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64) { +/// Print JSON robot-mode sync output with optional metrics. +pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64, metrics: Option<&MetricsLayer>) { + let stages = metrics.map_or_else(Vec::new, MetricsLayer::extract_timings); let output = SyncJsonOutput { ok: true, data: result, - meta: SyncMeta { elapsed_ms }, + meta: SyncMeta { + run_id: result.run_id.clone(), + elapsed_ms, + stages, + }, }; println!("{}", serde_json::to_string(&output).unwrap()); } diff --git a/src/cli/commands/sync_status.rs b/src/cli/commands/sync_status.rs index aad407e..a68ab92 100644 --- a/src/cli/commands/sync_status.rs +++ b/src/cli/commands/sync_status.rs @@ -7,8 +7,11 @@ use serde::Serialize; use crate::Config; use crate::core::db::create_connection; use crate::core::error::Result; +use crate::core::metrics::StageTiming; use crate::core::paths::get_db_path; -use crate::core::time::ms_to_iso; +use crate::core::time::{format_full_datetime, ms_to_iso}; + +const RECENT_RUNS_LIMIT: usize = 10; /// Sync run information. #[derive(Debug)] @@ -19,6 +22,10 @@ pub struct SyncRunInfo { pub status: String, pub command: String, pub error: Option, + pub run_id: Option, + pub total_items_processed: i64, + pub total_errors: i64, + pub stages: Option>, } /// Cursor position information. @@ -34,6 +41,7 @@ pub struct CursorInfo { #[derive(Debug)] pub struct DataSummary { pub issue_count: i64, + pub mr_count: i64, pub discussion_count: i64, pub note_count: i64, pub system_note_count: i64, @@ -42,7 +50,7 @@ pub struct DataSummary { /// Complete sync status result. #[derive(Debug)] pub struct SyncStatusResult { - pub last_run: Option, + pub runs: Vec, pub cursors: Vec, pub summary: DataSummary, } @@ -52,42 +60,49 @@ pub fn run_sync_status(config: &Config) -> Result { let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; - let last_run = get_last_sync_run(&conn)?; + let runs = get_recent_sync_runs(&conn, RECENT_RUNS_LIMIT)?; let cursors = get_cursor_positions(&conn)?; let summary = get_data_summary(&conn)?; Ok(SyncStatusResult { - last_run, + runs, cursors, summary, }) } -/// Get the most recent sync run. -fn get_last_sync_run(conn: &Connection) -> Result> { +/// Get the most recent sync runs. +fn get_recent_sync_runs(conn: &Connection, limit: usize) -> Result> { let mut stmt = conn.prepare( - "SELECT id, started_at, finished_at, status, command, error + "SELECT id, started_at, finished_at, status, command, error, + run_id, total_items_processed, total_errors, metrics_json FROM sync_runs ORDER BY started_at DESC - LIMIT 1", + LIMIT ?1", )?; - let result = stmt.query_row([], |row| { - Ok(SyncRunInfo { - id: row.get(0)?, - started_at: row.get(1)?, - finished_at: row.get(2)?, - status: row.get(3)?, - command: row.get(4)?, - error: row.get(5)?, - }) - }); + let runs: std::result::Result, _> = stmt + .query_map([limit as i64], |row| { + let metrics_json: Option = row.get(9)?; + let stages: Option> = + metrics_json.and_then(|json| serde_json::from_str(&json).ok()); - match result { - Ok(info) => Ok(Some(info)), - Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(e) => Err(e.into()), - } + Ok(SyncRunInfo { + id: row.get(0)?, + started_at: row.get(1)?, + finished_at: row.get(2)?, + status: row.get(3)?, + command: row.get(4)?, + error: row.get(5)?, + run_id: row.get(6)?, + total_items_processed: row.get::<_, Option>(7)?.unwrap_or(0), + total_errors: row.get::<_, Option>(8)?.unwrap_or(0), + stages, + }) + })? + .collect(); + + Ok(runs?) } /// Get cursor positions for all projects/resource types. @@ -119,6 +134,10 @@ fn get_data_summary(conn: &Connection) -> Result { .query_row("SELECT COUNT(*) FROM issues", [], |row| row.get(0)) .unwrap_or(0); + let mr_count: i64 = conn + .query_row("SELECT COUNT(*) FROM merge_requests", [], |row| row.get(0)) + .unwrap_or(0); + let discussion_count: i64 = conn .query_row("SELECT COUNT(*) FROM discussions", [], |row| row.get(0)) .unwrap_or(0); @@ -133,6 +152,7 @@ fn get_data_summary(conn: &Connection) -> Result { Ok(DataSummary { issue_count, + mr_count, discussion_count, note_count, system_note_count, @@ -149,15 +169,17 @@ fn format_duration(ms: i64) -> String { format!("{}h {}m {}s", hours, minutes % 60, seconds % 60) } else if minutes > 0 { format!("{}m {}s", minutes, seconds % 60) + } else if ms >= 1000 { + format!("{:.1}s", ms as f64 / 1000.0) } else { - format!("{}s", seconds) + format!("{}ms", ms) } } /// Format number with thousands separators. fn format_number(n: i64) -> String { let is_negative = n < 0; - let abs_n = n.abs(); + let abs_n = n.unsigned_abs(); let s = abs_n.to_string(); let chars: Vec = s.chars().collect(); let mut result = String::new(); @@ -176,7 +198,10 @@ fn format_number(n: i64) -> String { result } -/// JSON output structures for robot mode. +// ============================================================================ +// JSON output structures for robot mode +// ============================================================================ + #[derive(Serialize)] struct SyncStatusJsonOutput { ok: bool, @@ -185,7 +210,7 @@ struct SyncStatusJsonOutput { #[derive(Serialize)] struct SyncStatusJsonData { - last_sync: Option, + runs: Vec, cursors: Vec, summary: SummaryJsonInfo, } @@ -201,7 +226,13 @@ struct SyncRunJsonInfo { #[serde(skip_serializing_if = "Option::is_none")] duration_ms: Option, #[serde(skip_serializing_if = "Option::is_none")] + run_id: Option, + total_items_processed: i64, + total_errors: i64, + #[serde(skip_serializing_if = "Option::is_none")] error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + stages: Option>, } #[derive(Serialize)] @@ -217,6 +248,7 @@ struct CursorJsonInfo { #[derive(Serialize)] struct SummaryJsonInfo { issues: i64, + merge_requests: i64, discussions: i64, notes: i64, system_notes: i64, @@ -224,18 +256,26 @@ struct SummaryJsonInfo { /// Print sync status as JSON (robot mode). pub fn print_sync_status_json(result: &SyncStatusResult) { - let last_sync = result.last_run.as_ref().map(|run| { - let duration_ms = run.finished_at.map(|f| f - run.started_at); - SyncRunJsonInfo { - id: run.id, - status: run.status.clone(), - command: run.command.clone(), - started_at: ms_to_iso(run.started_at), - completed_at: run.finished_at.map(ms_to_iso), - duration_ms, - error: run.error.clone(), - } - }); + let runs = result + .runs + .iter() + .map(|run| { + let duration_ms = run.finished_at.map(|f| f - run.started_at); + SyncRunJsonInfo { + id: run.id, + status: run.status.clone(), + command: run.command.clone(), + started_at: ms_to_iso(run.started_at), + completed_at: run.finished_at.map(ms_to_iso), + duration_ms, + run_id: run.run_id.clone(), + total_items_processed: run.total_items_processed, + total_errors: run.total_errors, + error: run.error.clone(), + stages: run.stages.clone(), + } + }) + .collect(); let cursors = result .cursors @@ -251,10 +291,11 @@ pub fn print_sync_status_json(result: &SyncStatusResult) { let output = SyncStatusJsonOutput { ok: true, data: SyncStatusJsonData { - last_sync, + runs, cursors, summary: SummaryJsonInfo { issues: result.summary.issue_count, + merge_requests: result.summary.mr_count, discussions: result.summary.discussion_count, notes: result.summary.note_count - result.summary.system_note_count, system_notes: result.summary.system_note_count, @@ -265,41 +306,25 @@ pub fn print_sync_status_json(result: &SyncStatusResult) { println!("{}", serde_json::to_string(&output).unwrap()); } +// ============================================================================ +// Human-readable output +// ============================================================================ + /// Print sync status result. pub fn print_sync_status(result: &SyncStatusResult) { - // Last Sync section - println!("{}", style("Last Sync").bold().underlined()); + // Recent Runs section + println!("{}", style("Recent Sync Runs").bold().underlined()); println!(); - match &result.last_run { - Some(run) => { - let status_styled = match run.status.as_str() { - "succeeded" => style(&run.status).green(), - "failed" => style(&run.status).red(), - "running" => style(&run.status).yellow(), - _ => style(&run.status).dim(), - }; - - println!(" Status: {}", status_styled); - println!(" Command: {}", run.command); - println!(" Started: {}", ms_to_iso(run.started_at)); - - if let Some(finished) = run.finished_at { - println!(" Completed: {}", ms_to_iso(finished)); - let duration = finished - run.started_at; - println!(" Duration: {}", format_duration(duration)); - } - - if let Some(error) = &run.error { - println!(" Error: {}", style(error).red()); - } - } - None => { - println!(" {}", style("No sync runs recorded yet.").dim()); - println!( - " {}", - style("Run 'lore ingest --type=issues' to start.").dim() - ); + if result.runs.is_empty() { + println!(" {}", style("No sync runs recorded yet.").dim()); + println!( + " {}", + style("Run 'lore sync' or 'lore ingest' to start.").dim() + ); + } else { + for run in &result.runs { + print_run_line(run); } } @@ -344,6 +369,10 @@ pub fn print_sync_status(result: &SyncStatusResult) { " Issues: {}", style(format_number(result.summary.issue_count)).bold() ); + println!( + " MRs: {}", + style(format_number(result.summary.mr_count)).bold() + ); println!( " Discussions: {}", style(format_number(result.summary.discussion_count)).bold() @@ -361,14 +390,63 @@ pub fn print_sync_status(result: &SyncStatusResult) { ); } +/// Print a single run as a compact one-liner. +fn print_run_line(run: &SyncRunInfo) { + let status_styled = match run.status.as_str() { + "succeeded" => style(&run.status).green(), + "failed" => style(&run.status).red(), + "running" => style(&run.status).yellow(), + _ => style(&run.status).dim(), + }; + + let run_label = run + .run_id + .as_deref() + .map_or_else(|| format!("#{}", run.id), |id| format!("Run {id}")); + + let duration = run.finished_at.map(|f| format_duration(f - run.started_at)); + + let time = format_full_datetime(run.started_at); + + let mut parts = vec![ + format!("{}", style(run_label).bold()), + format!("{status_styled}"), + format!("{}", style(&run.command).dim()), + time, + ]; + + if let Some(d) = duration { + parts.push(d); + } else { + parts.push("in progress".to_string()); + } + + if run.total_items_processed > 0 { + parts.push(format!("{} items", run.total_items_processed)); + } + + if run.total_errors > 0 { + parts.push(format!( + "{}", + style(format!("{} errors", run.total_errors)).red() + )); + } + + println!(" {}", parts.join(" | ")); + + if let Some(error) = &run.error { + println!(" {}", style(error).red()); + } +} + #[cfg(test)] mod tests { use super::*; #[test] fn format_duration_handles_seconds() { - assert_eq!(format_duration(5_000), "5s"); - assert_eq!(format_duration(59_000), "59s"); + assert_eq!(format_duration(5_000), "5.0s"); + assert_eq!(format_duration(59_000), "59.0s"); } #[test] @@ -385,6 +463,12 @@ mod tests { assert_eq!(format_duration(3_723_000), "1h 2m 3s"); } + #[test] + fn format_duration_handles_milliseconds() { + assert_eq!(format_duration(500), "500ms"); + assert_eq!(format_duration(0), "0ms"); + } + #[test] fn format_number_adds_thousands_separators() { assert_eq!(format_number(1000), "1,000"); diff --git a/src/documents/regenerator.rs b/src/documents/regenerator.rs index e84844d..d39eb8f 100644 --- a/src/documents/regenerator.rs +++ b/src/documents/regenerator.rs @@ -1,6 +1,6 @@ use rusqlite::Connection; use rusqlite::OptionalExtension; -use tracing::{debug, warn}; +use tracing::{debug, instrument, warn}; use crate::core::error::Result; use crate::documents::{ @@ -21,6 +21,7 @@ pub struct RegenerateResult { /// /// Uses per-item error handling (fail-soft) and drains the queue completely /// via a bounded batch loop. Each dirty item is processed independently. +#[instrument(skip(conn), fields(items_processed, items_skipped, errors))] pub fn regenerate_dirty_documents(conn: &Connection) -> Result { let mut result = RegenerateResult::default(); @@ -61,6 +62,10 @@ pub fn regenerate_dirty_documents(conn: &Connection) -> Result "Document regeneration complete" ); + tracing::Span::current().record("items_processed", result.regenerated); + tracing::Span::current().record("items_skipped", result.unchanged); + tracing::Span::current().record("errors", result.errored); + Ok(result) } @@ -282,6 +287,7 @@ mod tests { updated_at INTEGER NOT NULL, last_seen_at INTEGER NOT NULL, discussions_synced_for_updated_at INTEGER, + resource_events_synced_for_updated_at INTEGER, web_url TEXT, raw_payload_id INTEGER ); diff --git a/src/embedding/pipeline.rs b/src/embedding/pipeline.rs index cd90816..56e7132 100644 --- a/src/embedding/pipeline.rs +++ b/src/embedding/pipeline.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use rusqlite::Connection; use sha2::{Digest, Sha256}; -use tracing::{info, warn}; +use tracing::{info, instrument, warn}; use crate::core::error::Result; use crate::embedding::change_detector::{count_pending_documents, find_pending_documents}; @@ -37,6 +37,7 @@ struct ChunkWork { /// /// Processes batches of BATCH_SIZE texts per Ollama API call. /// Uses keyset pagination over documents (DB_PAGE_SIZE per page). +#[instrument(skip(conn, client, progress_callback), fields(%model_name, items_processed, items_skipped, errors))] pub async fn embed_documents( conn: &Connection, client: &OllamaClient, @@ -87,7 +88,7 @@ pub async fn embed_documents( // Overflow guard: skip documents that produce too many chunks. // Must run BEFORE clear_document_embeddings so existing embeddings // are preserved when we skip. - if total_chunks as i64 >= CHUNK_ROWID_MULTIPLIER { + if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER { warn!( doc_id = doc.document_id, chunk_count = total_chunks, @@ -295,6 +296,10 @@ pub async fn embed_documents( "Embedding pipeline complete" ); + tracing::Span::current().record("items_processed", result.embedded); + tracing::Span::current().record("items_skipped", result.skipped); + tracing::Span::current().record("errors", result.failed); + Ok(result) } diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 20a9794..d34cd52 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -7,11 +7,11 @@ use futures::future::join_all; use rusqlite::Connection; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use crate::Config; use crate::core::dependent_queue::{ - claim_jobs, complete_job, count_pending_jobs, enqueue_job, fail_job, reclaim_stale_locks, + claim_jobs, complete_job, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, }; use crate::core::error::Result; use crate::gitlab::GitLabClient; @@ -108,6 +108,10 @@ pub async fn ingest_project_issues( } /// Ingest all issues and their discussions for a project with progress reporting. +#[instrument( + skip(conn, client, config, progress), + fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) +)] pub async fn ingest_project_issues_with_progress( conn: &Connection, client: &GitLabClient, @@ -124,12 +128,17 @@ pub async fn ingest_project_issues_with_progress( }; // Step 1: Ingest issues + emit(ProgressEvent::IssuesFetchStarted); let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?; result.issues_fetched = issue_result.fetched; result.issues_upserted = issue_result.upserted; result.labels_created = issue_result.labels_created; + emit(ProgressEvent::IssuesFetchComplete { + total: result.issues_fetched, + }); + // Step 2: Sync discussions for issues that need it let issues_needing_sync = issue_result.issues_needing_discussion_sync; @@ -189,8 +198,15 @@ pub async fn ingest_project_issues_with_progress( } // Drain the queue - let drain_result = - drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?; + let drain_result = drain_resource_events( + conn, + client, + config, + project_id, + gitlab_project_id, + &progress, + ) + .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; } @@ -208,6 +224,10 @@ pub async fn ingest_project_issues_with_progress( "Project ingestion complete" ); + tracing::Span::current().record("items_processed", result.issues_upserted); + tracing::Span::current().record("items_skipped", result.issues_skipped_discussion_sync); + tracing::Span::current().record("errors", result.resource_events_failed); + Ok(result) } @@ -280,6 +300,10 @@ pub async fn ingest_project_merge_requests( } /// Ingest all merge requests and their discussions for a project with progress reporting. +#[instrument( + skip(conn, client, config, progress), + fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) +)] pub async fn ingest_project_merge_requests_with_progress( conn: &Connection, client: &GitLabClient, @@ -380,8 +404,15 @@ pub async fn ingest_project_merge_requests_with_progress( debug!(enqueued, "Enqueued resource events jobs for MRs"); } - let drain_result = - drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?; + let drain_result = drain_resource_events( + conn, + client, + config, + project_id, + gitlab_project_id, + &progress, + ) + .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; } @@ -400,6 +431,10 @@ pub async fn ingest_project_merge_requests_with_progress( "MR project ingestion complete" ); + tracing::Span::current().record("items_processed", result.mrs_upserted); + tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync); + tracing::Span::current().record("errors", result.resource_events_failed); + Ok(result) } @@ -455,6 +490,7 @@ async fn sync_mr_discussions_sequential( pub struct DrainResult { pub fetched: usize, pub failed: usize, + pub skipped_not_found: usize, } /// Enqueue resource_events jobs for all entities of a given type in a project. @@ -466,21 +502,60 @@ fn enqueue_resource_events_for_entity_type( project_id: i64, entity_type: &str, ) -> Result { - // Query all entities for this project and enqueue resource_events jobs. - // The UNIQUE constraint on pending_dependent_fetches makes this idempotent - - // already-queued entities are silently skipped via INSERT OR IGNORE. + // Clean up obsolete jobs: remove resource_events jobs for entities whose + // watermark is already current (updated_at <= resource_events_synced_for_updated_at). + // These are leftover from prior runs that failed after watermark-stamping but + // before job deletion, or from entities that no longer need syncing. + // We intentionally keep jobs for entities that still need syncing (including + // in-progress or failed-with-backoff jobs) to preserve retry state. + match entity_type { + "issue" => { + conn.execute( + "DELETE FROM pending_dependent_fetches \ + WHERE project_id = ?1 AND entity_type = 'issue' AND job_type = 'resource_events' \ + AND entity_local_id IN ( \ + SELECT id FROM issues \ + WHERE project_id = ?1 \ + AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ + )", + [project_id], + )?; + } + "merge_request" => { + conn.execute( + "DELETE FROM pending_dependent_fetches \ + WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'resource_events' \ + AND entity_local_id IN ( \ + SELECT id FROM merge_requests \ + WHERE project_id = ?1 \ + AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ + )", + [project_id], + )?; + } + _ => {} + } + + // Enqueue resource_events jobs only for entities whose updated_at exceeds + // their last resource event sync watermark. // // Use separate hardcoded queries per entity type to avoid format!-based SQL. let entities: Vec<(i64, i64)> = match entity_type { "issue" => { - let mut stmt = - conn.prepare_cached("SELECT id, iid FROM issues WHERE project_id = ?1")?; + let mut stmt = conn.prepare_cached( + "SELECT id, iid FROM issues \ + WHERE project_id = ?1 \ + AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", + )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? } "merge_request" => { - let mut stmt = - conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?; + let mut stmt = conn.prepare_cached( + "SELECT id, iid FROM merge_requests \ + WHERE project_id = ?1 \ + AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", + )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? } @@ -509,10 +584,15 @@ fn enqueue_resource_events_for_entity_type( /// /// Processes jobs sequentially since `rusqlite::Connection` is not `Send`. /// Uses exponential backoff on failure via `fail_job`. +#[instrument( + skip(conn, client, config, progress), + fields(project_id, gitlab_project_id, items_processed, errors) +)] async fn drain_resource_events( conn: &Connection, client: &GitLabClient, config: &Config, + project_id: i64, gitlab_project_id: i64, progress: &Option, ) -> Result { @@ -525,9 +605,15 @@ async fn drain_resource_events( info!(reclaimed, "Reclaimed stale resource event locks"); } - // Count total pending jobs for progress reporting - let pending_counts = count_pending_jobs(conn)?; - let total_pending = pending_counts.get("resource_events").copied().unwrap_or(0); + // Count only claimable jobs (unlocked, past retry backoff) for accurate progress. + // Using count_pending_jobs here would inflate the total with locked/backing-off + // jobs that can't be claimed in this drain run, causing the progress bar to + // never reach 100%. + let claimable_counts = count_claimable_jobs(conn, project_id)?; + let total_pending = claimable_counts + .get("resource_events") + .copied() + .unwrap_or(0); if total_pending == 0 { return Ok(result); @@ -547,15 +633,19 @@ async fn drain_resource_events( let mut seen_job_ids = std::collections::HashSet::new(); loop { - let jobs = claim_jobs(conn, "resource_events", batch_size)?; + let jobs = claim_jobs(conn, "resource_events", project_id, batch_size)?; if jobs.is_empty() { break; } + // Track whether any job in this batch was actually new. If every + // claimed job was already seen, break to avoid an infinite loop + // (can happen with clock skew or zero-backoff edge cases). + let mut any_new_in_batch = false; + for job in &jobs { // Guard against re-processing a job that was failed and re-claimed - // within the same drain run (shouldn't happen due to backoff, but - // defensive against clock skew or zero-backoff edge cases). + // within the same drain run. if !seen_job_ids.insert(job.id) { warn!( job_id = job.id, @@ -563,6 +653,7 @@ async fn drain_resource_events( ); continue; } + any_new_in_batch = true; match client .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) @@ -582,6 +673,11 @@ async fn drain_resource_events( match store_result { Ok(()) => { complete_job(conn, job.id)?; + update_resource_event_watermark( + conn, + &job.entity_type, + job.entity_local_id, + )?; result.fetched += 1; } Err(e) => { @@ -597,14 +693,34 @@ async fn drain_resource_events( } } Err(e) => { - warn!( - entity_type = %job.entity_type, - entity_iid = job.entity_iid, - error = %e, - "Failed to fetch resource events from GitLab" - ); - fail_job(conn, job.id, &e.to_string())?; - result.failed += 1; + // Only 404 (not found) is truly permanent -- the resource + // events endpoint doesn't exist for this entity. Stamp the + // watermark so we skip it next run. All other errors + // (403, auth, network) get backoff retry. + if e.is_permanent_api_error() { + debug!( + entity_type = %job.entity_type, + entity_iid = job.entity_iid, + error = %e, + "Permanent API error for resource events, marking complete" + ); + complete_job(conn, job.id)?; + update_resource_event_watermark( + conn, + &job.entity_type, + job.entity_local_id, + )?; + result.skipped_not_found += 1; + } else { + warn!( + entity_type = %job.entity_type, + entity_iid = job.entity_iid, + error = %e, + "Failed to fetch resource events from GitLab" + ); + fail_job(conn, job.id, &e.to_string())?; + result.failed += 1; + } } } @@ -614,6 +730,12 @@ async fn drain_resource_events( total: total_pending, }); } + + // If every job in this batch was already seen, stop to prevent spinning. + if !any_new_in_batch { + warn!("All claimed jobs were already processed, breaking drain loop"); + break; + } } emit(ProgressEvent::ResourceEventsFetchComplete { @@ -629,6 +751,9 @@ async fn drain_resource_events( ); } + tracing::Span::current().record("items_processed", result.fetched); + tracing::Span::current().record("errors", result.failed); + Ok(result) } @@ -680,6 +805,33 @@ fn store_resource_events( Ok(()) } +/// Update the resource event watermark for an entity after successful event fetch. +/// +/// Sets `resource_events_synced_for_updated_at = updated_at` so the entity +/// won't be re-enqueued until its `updated_at` advances again. +fn update_resource_event_watermark( + conn: &Connection, + entity_type: &str, + entity_local_id: i64, +) -> Result<()> { + match entity_type { + "issue" => { + conn.execute( + "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", + [entity_local_id], + )?; + } + "merge_request" => { + conn.execute( + "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", + [entity_local_id], + )?; + } + _ => {} + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -722,6 +874,7 @@ mod tests { let result = DrainResult::default(); assert_eq!(result.fetched, 0); assert_eq!(result.failed, 0); + assert_eq!(result.skipped_not_found, 0); } #[test]