From 329c8f453980ec1eaeca41e3a8ab88f9c723519b Mon Sep 17 00:00:00 2001 From: teernisse Date: Wed, 4 Feb 2026 10:01:28 -0500 Subject: [PATCH] feat(observability): Add metrics, logging, and sync-run core modules Introduce the foundational observability layer for the sync pipeline: - MetricsLayer: Custom tracing subscriber layer that captures span timing and structured fields, materializing them into a hierarchical Vec tree for robot-mode performance data output - logging: Dual-layer subscriber infrastructure with configurable stderr verbosity (-v/-vv/-vvv) and always-on JSON file logging with daily rotation and configurable retention (default 30 days) - SyncRunRecorder: Compile-time enforced lifecycle recorder for sync_runs table (start -> succeed|fail), with correlation IDs and aggregate counts - LoggingConfig: New config section for log_dir, retention_days, and file_logging toggle - get_log_dir(): Path helper for log directory resolution - is_permanent_api_error(): Distinguish retryable vs permanent API failures (only 404 is truly permanent; 403/auth errors may be environmental) - Fix ConfigNotFound exit code: 20 -> 2, restoring the sequential exit-code scheme (InternalError=1, ConfigNotFound=2, ConfigInvalid=3, ...) Database changes: - Migration 013: Add resource_events_synced_for_updated_at watermark columns to issues and merge_requests tables for incremental resource event sync - Migration 014: Enrich sync_runs with run_id correlation ID, aggregate counts (total_items_processed, total_errors), and run_id index - Wrap file-based migrations in savepoints for rollback safety Dependencies: Add uuid (run_id generation), tracing-appender (file logging) Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 93 +++ Cargo.toml | 3 +- migrations/013_resource_event_watermarks.sql | 10 + migrations/014_sync_runs_enrichment.sql | 12 + src/core/config.rs | 38 ++ src/core/db.rs | 30 +- src/core/error.rs | 11 +- src/core/logging.rs | 217 +++++++ src/core/metrics.rs | 609 +++++++++++++++++++ src/core/mod.rs | 3 + src/core/paths.rs | 9 + src/core/sync_run.rs | 232 +++++++ 12 files changed, 1263 insertions(+), 4 deletions(-) 
create mode 100644 migrations/013_resource_event_watermarks.sql create mode 100644 migrations/014_sync_runs_enrichment.sql create mode 100644 src/core/logging.rs create mode 100644 src/core/metrics.rs create mode 100644 src/core/sync_run.rs diff --git a/Cargo.lock b/Cargo.lock index ea9a279..0d446b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crossterm" version = "0.29.0" @@ -347,6 +362,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", +] + [[package]] name = "dialoguer" version = "0.12.0" @@ -1108,6 +1132,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "url", "urlencoding", @@ -1183,6 +1208,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-conv" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" + [[package]] name = "num-traits" version = "0.2.19" @@ -1343,6 +1374,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1878,6 +1915,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -2003,6 +2071,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf" +dependencies = [ + "crossbeam-channel", + "thiserror", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.31" @@ -2035,6 +2115,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" @@ -2045,12 +2135,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] 
[[package]] diff --git a/Cargo.toml b/Cargo.toml index 32b5cf6..3af10e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,8 @@ libc = "0.2" # Logging tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tracing-appender = "0.2" [dev-dependencies] tempfile = "3" diff --git a/migrations/013_resource_event_watermarks.sql b/migrations/013_resource_event_watermarks.sql new file mode 100644 index 0000000..ad9d641 --- /dev/null +++ b/migrations/013_resource_event_watermarks.sql @@ -0,0 +1,10 @@ +-- Migration 013: Add resource event sync watermarks +-- Mirrors the discussions_synced_for_updated_at pattern so that only entities +-- whose updated_at exceeds the last resource event sync get re-enqueued. + +ALTER TABLE issues ADD COLUMN resource_events_synced_for_updated_at INTEGER; +ALTER TABLE merge_requests ADD COLUMN resource_events_synced_for_updated_at INTEGER; + +-- Update schema version +INSERT INTO schema_version (version, applied_at, description) +VALUES (13, strftime('%s', 'now') * 1000, 'Add resource event sync watermarks to issues and merge_requests'); diff --git a/migrations/014_sync_runs_enrichment.sql b/migrations/014_sync_runs_enrichment.sql new file mode 100644 index 0000000..dc83412 --- /dev/null +++ b/migrations/014_sync_runs_enrichment.sql @@ -0,0 +1,12 @@ +-- Migration 014: sync_runs enrichment for observability +-- Adds correlation ID and aggregate counts for queryable sync history + +ALTER TABLE sync_runs ADD COLUMN run_id TEXT; +ALTER TABLE sync_runs ADD COLUMN total_items_processed INTEGER DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN total_errors INTEGER DEFAULT 0; + +-- Index for correlation queries (find run by run_id from logs) +CREATE INDEX IF NOT EXISTS idx_sync_runs_run_id ON sync_runs(run_id); + +INSERT INTO schema_version (version, applied_at, description) +VALUES (14, strftime('%s', 'now') * 1000, 'Sync runs enrichment for 
observability'); diff --git a/src/core/config.rs b/src/core/config.rs index 57d1e60..6cae328 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -120,6 +120,41 @@ impl Default for EmbeddingConfig { } } +/// Logging and observability settings. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct LoggingConfig { + /// Directory for log files. Default: ~/.local/share/lore/logs/ + #[serde(rename = "logDir")] + pub log_dir: Option, + + /// Days to retain log files. Default: 30. Set to 0 to disable file logging. + #[serde(rename = "retentionDays", default = "default_retention_days")] + pub retention_days: u32, + + /// Enable JSON log files. Default: true. + #[serde(rename = "fileLogging", default = "default_file_logging")] + pub file_logging: bool, +} + +fn default_retention_days() -> u32 { + 30 +} + +fn default_file_logging() -> bool { + true +} + +impl Default for LoggingConfig { + fn default() -> Self { + Self { + log_dir: None, + retention_days: default_retention_days(), + file_logging: default_file_logging(), + } + } +} + /// Main configuration structure. #[derive(Debug, Clone, Deserialize)] pub struct Config { @@ -134,6 +169,9 @@ pub struct Config { #[serde(default)] pub embedding: EmbeddingConfig, + + #[serde(default)] + pub logging: LoggingConfig, } impl Config { diff --git a/src/core/db.rs b/src/core/db.rs index d7cca89..c2b7079 100644 --- a/src/core/db.rs +++ b/src/core/db.rs @@ -47,6 +47,10 @@ const MIGRATIONS: &[(&str, &str)] = &[ "013", include_str!("../../migrations/013_resource_event_watermarks.sql"), ), + ( + "014", + include_str!("../../migrations/014_sync_runs_enrichment.sql"), + ), ]; /// Create a database connection with production-grade pragmas. 
@@ -190,13 +194,35 @@ pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Resu let sql = fs::read_to_string(entry.path())?; - conn.execute_batch(&sql) + // Wrap each migration in a savepoint to prevent partial application, + // matching the safety guarantees of run_migrations(). + let savepoint_name = format!("migration_{}", version); + conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name)) .map_err(|e| LoreError::MigrationFailed { version, - message: e.to_string(), + message: format!("Failed to create savepoint: {}", e), source: Some(e), })?; + match conn.execute_batch(&sql) { + Ok(()) => { + conn.execute_batch(&format!("RELEASE {}", savepoint_name)) + .map_err(|e| LoreError::MigrationFailed { + version, + message: format!("Failed to release savepoint: {}", e), + source: Some(e), + })?; + } + Err(e) => { + let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name)); + return Err(LoreError::MigrationFailed { + version, + message: e.to_string(), + source: Some(e), + }); + } + } + info!(version, file = %filename_str, "Migration applied"); } diff --git a/src/core/error.rs b/src/core/error.rs index 8e42e5f..f80d580 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -59,7 +59,7 @@ impl ErrorCode { pub fn exit_code(&self) -> i32 { match self { Self::InternalError => 1, - Self::ConfigNotFound => 20, + Self::ConfigNotFound => 2, Self::ConfigInvalid => 3, Self::TokenNotSet => 4, Self::GitLabAuthFailed => 5, @@ -240,6 +240,15 @@ impl LoreError { } } + /// Whether this error represents a permanent API failure that should not be retried. + /// + /// Only 404 (not found) is truly permanent: the resource doesn't exist and never will. + /// 403 and auth errors are NOT permanent — they may be environmental (VPN down, + /// token rotation, temporary restrictions) and should be retried with backoff. + pub fn is_permanent_api_error(&self) -> bool { + matches!(self, Self::GitLabNotFound { .. }) + } + /// Get the exit code for this error. 
pub fn exit_code(&self) -> i32 { self.code().exit_code() diff --git a/src/core/logging.rs b/src/core/logging.rs new file mode 100644 index 0000000..a34d724 --- /dev/null +++ b/src/core/logging.rs @@ -0,0 +1,217 @@ +//! Logging infrastructure: dual-layer subscriber setup and log file retention. +//! +//! Provides a layered tracing subscriber with: +//! - **stderr layer**: Human-readable or JSON format, controlled by `-v` flags +//! - **file layer**: Always-on JSON output to daily-rotated log files + +use std::fs; +use std::path::Path; + +use tracing_subscriber::EnvFilter; + +/// Build an `EnvFilter` from the verbosity count. +/// +/// | Count | App Level | Dep Level | +/// |-------|-----------|-----------| +/// | 0 | INFO | WARN | +/// | 1 | DEBUG | WARN | +/// | 2 | DEBUG | INFO | +/// | 3+ | TRACE | DEBUG | +pub fn build_stderr_filter(verbose: u8, quiet: bool) -> EnvFilter { + // RUST_LOG always wins if set + if std::env::var("RUST_LOG").is_ok() { + return EnvFilter::from_default_env(); + } + + // -q overrides -v for stderr + if quiet { + return EnvFilter::new("lore=warn,error"); + } + + let directives = match verbose { + 0 => "lore=info,warn", + 1 => "lore=debug,warn", + 2 => "lore=debug,info", + _ => "lore=trace,debug", + }; + + EnvFilter::new(directives) +} + +/// Build an `EnvFilter` for the file layer. +/// +/// Always captures DEBUG+ for `lore::*` and WARN+ for dependencies, +/// unless `RUST_LOG` is set (which overrides everything). +pub fn build_file_filter() -> EnvFilter { + if std::env::var("RUST_LOG").is_ok() { + return EnvFilter::from_default_env(); + } + + EnvFilter::new("lore=debug,warn") +} + +/// Delete log files older than `retention_days` from the given directory. +/// +/// Only deletes files matching the `lore.YYYY-MM-DD.log` pattern. +/// Returns the number of files deleted. 
+pub fn cleanup_old_logs(log_dir: &Path, retention_days: u32) -> usize { + if retention_days == 0 || !log_dir.exists() { + return 0; + } + + let cutoff = chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days)); + let cutoff_date = cutoff.format("%Y-%m-%d").to_string(); + let mut deleted = 0; + + let entries = match fs::read_dir(log_dir) { + Ok(entries) => entries, + Err(_) => return 0, + }; + + for entry in entries.flatten() { + let file_name = entry.file_name(); + let name = file_name.to_string_lossy(); + + // Match pattern: lore.YYYY-MM-DD.log or lore.YYYY-MM-DD (tracing-appender format) + if let Some(date_str) = extract_log_date(&name) + && date_str < cutoff_date + && fs::remove_file(entry.path()).is_ok() + { + deleted += 1; + } + } + + deleted +} + +/// Extract the date portion from a log filename. +/// +/// Matches: `lore.YYYY-MM-DD.log` or `lore.YYYY-MM-DD` +fn extract_log_date(filename: &str) -> Option { + let rest = filename.strip_prefix("lore.")?; + + // Must have at least YYYY-MM-DD (10 ASCII chars). + // Use get() to avoid panicking on non-ASCII filenames. 
+ let date_part = rest.get(..10)?; + + // Validate it looks like a date + let parts: Vec<&str> = date_part.split('-').collect(); + if parts.len() != 3 || parts[0].len() != 4 || parts[1].len() != 2 || parts[2].len() != 2 { + return None; + } + + // Check all parts are numeric (also ensures ASCII) + if !parts.iter().all(|p| p.chars().all(|c| c.is_ascii_digit())) { + return None; + } + + // After the date, must be end-of-string or ".log" + let suffix = rest.get(10..)?; + if suffix.is_empty() || suffix == ".log" { + Some(date_part.to_string()) + } else { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use tempfile::TempDir; + + #[test] + fn test_extract_log_date_with_extension() { + assert_eq!( + extract_log_date("lore.2026-02-04.log"), + Some("2026-02-04".to_string()) + ); + } + + #[test] + fn test_extract_log_date_without_extension() { + assert_eq!( + extract_log_date("lore.2026-02-04"), + Some("2026-02-04".to_string()) + ); + } + + #[test] + fn test_extract_log_date_rejects_non_log_files() { + assert_eq!(extract_log_date("other.txt"), None); + assert_eq!(extract_log_date("lore.config.json"), None); + assert_eq!(extract_log_date("lore.db"), None); + } + + #[test] + fn test_extract_log_date_rejects_invalid_dates() { + assert_eq!(extract_log_date("lore.not-a-date.log"), None); + assert_eq!(extract_log_date("lore.20260204.log"), None); + } + + #[test] + fn test_cleanup_old_logs_deletes_old_files() { + let dir = TempDir::new().unwrap(); + + // Create old log files (well before any reasonable retention) + File::create(dir.path().join("lore.2020-01-01.log")).unwrap(); + File::create(dir.path().join("lore.2020-01-15.log")).unwrap(); + + // Create a recent log file (today) + let today = chrono::Utc::now().format("%Y-%m-%d").to_string(); + let recent_name = format!("lore.{today}.log"); + File::create(dir.path().join(&recent_name)).unwrap(); + + // Create a non-log file that should NOT be deleted + 
File::create(dir.path().join("other.txt")).unwrap(); + + let deleted = cleanup_old_logs(dir.path(), 7); + + assert_eq!(deleted, 2); + assert!(!dir.path().join("lore.2020-01-01.log").exists()); + assert!(!dir.path().join("lore.2020-01-15.log").exists()); + assert!(dir.path().join(&recent_name).exists()); + assert!(dir.path().join("other.txt").exists()); + } + + #[test] + fn test_cleanup_old_logs_zero_retention_is_noop() { + let dir = TempDir::new().unwrap(); + File::create(dir.path().join("lore.2020-01-01.log")).unwrap(); + + let deleted = cleanup_old_logs(dir.path(), 0); + assert_eq!(deleted, 0); + assert!(dir.path().join("lore.2020-01-01.log").exists()); + } + + #[test] + fn test_cleanup_old_logs_nonexistent_dir() { + let deleted = cleanup_old_logs(Path::new("/nonexistent/dir"), 7); + assert_eq!(deleted, 0); + } + + #[test] + fn test_build_stderr_filter_default() { + // Can't easily assert filter contents, but verify it doesn't panic + let _filter = build_stderr_filter(0, false); + } + + #[test] + fn test_build_stderr_filter_verbose_levels() { + let _f0 = build_stderr_filter(0, false); + let _f1 = build_stderr_filter(1, false); + let _f2 = build_stderr_filter(2, false); + let _f3 = build_stderr_filter(3, false); + } + + #[test] + fn test_build_stderr_filter_quiet_overrides_verbose() { + // Quiet should win over verbose + let _filter = build_stderr_filter(3, true); + } + + #[test] + fn test_build_file_filter() { + let _filter = build_file_filter(); + } +} diff --git a/src/core/metrics.rs b/src/core/metrics.rs new file mode 100644 index 0000000..3430596 --- /dev/null +++ b/src/core/metrics.rs @@ -0,0 +1,609 @@ +//! Performance metrics types and tracing layer for sync pipeline observability. +//! +//! Provides: +//! - [`StageTiming`]: Serializable timing/counter data for pipeline stages +//! 
- [`MetricsLayer`]: Custom tracing subscriber layer that captures span timing + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use serde::{Deserialize, Serialize}; +use tracing::Subscriber; +use tracing::span::{Attributes, Id, Record}; +use tracing_subscriber::layer::{Context, Layer}; +use tracing_subscriber::registry::LookupSpan; + +/// Returns true when value is zero (for serde `skip_serializing_if`). +fn is_zero(v: &usize) -> bool { + *v == 0 +} + +/// Timing and counter data for a single pipeline stage. +/// +/// Supports nested sub-stages for hierarchical timing breakdowns. +/// Fields with zero/empty values are omitted from JSON output to +/// keep robot-mode payloads compact. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StageTiming { + pub name: String, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub project: Option, + pub elapsed_ms: u64, + pub items_processed: usize, + #[serde(skip_serializing_if = "is_zero", default)] + pub items_skipped: usize, + #[serde(skip_serializing_if = "is_zero", default)] + pub errors: usize, + #[serde(skip_serializing_if = "is_zero", default)] + pub rate_limit_hits: usize, + #[serde(skip_serializing_if = "is_zero", default)] + pub retries: usize, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub sub_stages: Vec, +} + +// ============================================================================ +// MetricsLayer: custom tracing subscriber layer +// ============================================================================ + +/// Internal data tracked per open span. +struct SpanData { + name: String, + parent_id: Option, + start: Instant, + fields: HashMap, + rate_limit_hits: usize, + retries: usize, +} + +/// Completed span data with its original ID and parent ID. +struct CompletedSpan { + id: u64, + parent_id: Option, + timing: StageTiming, +} + +/// Custom tracing layer that captures span timing and structured fields. 
+/// +/// Collects data from `#[instrument]` spans and materializes it into +/// a `Vec` tree via [`extract_timings`]. +/// +/// Thread-safe via `Arc>` — suitable for concurrent span operations. +#[derive(Debug, Clone)] +pub struct MetricsLayer { + spans: Arc>>, + completed: Arc>>, +} + +impl Default for MetricsLayer { + fn default() -> Self { + Self::new() + } +} + +impl MetricsLayer { + pub fn new() -> Self { + Self { + spans: Arc::new(Mutex::new(HashMap::new())), + completed: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Extract timing tree for a completed run. + /// + /// Returns the top-level stages with sub-stages nested. + /// Call after the root span closes. + pub fn extract_timings(&self) -> Vec { + let completed = self.completed.lock().unwrap_or_else(|e| e.into_inner()); + if completed.is_empty() { + return Vec::new(); + } + + // Build children map: parent_id -> Vec + let mut children_map: HashMap> = HashMap::new(); + let mut roots = Vec::new(); + let mut id_to_timing: HashMap = HashMap::new(); + + // First pass: collect all timings by ID + for entry in completed.iter() { + id_to_timing.insert(entry.id, entry.timing.clone()); + } + + // Second pass: process in reverse order (children close before parents) + // to build the tree bottom-up + for entry in completed.iter() { + // Attach any children that were collected for this span + if let Some(timing) = id_to_timing.get_mut(&entry.id) + && let Some(children) = children_map.remove(&entry.id) + { + timing.sub_stages = children; + } + + if let Some(parent_id) = entry.parent_id { + // This is a child span — attach to parent's children + if let Some(timing) = id_to_timing.remove(&entry.id) { + children_map.entry(parent_id).or_default().push(timing); + } + } + } + + // Remaining entries in id_to_timing are roots + for entry in completed.iter() { + if entry.parent_id.is_none() + && let Some(mut timing) = id_to_timing.remove(&entry.id) + { + if let Some(children) = children_map.remove(&entry.id) { + 
timing.sub_stages = children; + } + roots.push(timing); + } + } + + roots + } +} + +/// Visitor that extracts field values from span attributes. +struct FieldVisitor<'a>(&'a mut HashMap); + +impl tracing::field::Visit for FieldVisitor<'_> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + serde_json::Value::String(format!("{value:?}")), + ); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.0.insert( + field.name().to_string(), + serde_json::Value::Number(value.into()), + ); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.0.insert( + field.name().to_string(), + serde_json::Value::Number(value.into()), + ); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.0.insert( + field.name().to_string(), + serde_json::Value::String(value.to_string()), + ); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.0 + .insert(field.name().to_string(), serde_json::Value::Bool(value)); + } +} + +/// Visitor that extracts event fields for rate-limit/retry detection. 
+#[derive(Default)] +struct EventVisitor { + status_code: Option, + message: String, +} + +impl tracing::field::Visit for EventVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.message = format!("{value:?}"); + } + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + if field.name() == "status_code" { + self.status_code = Some(value); + } + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.message = value.to_string(); + } + } +} + +impl Layer for MetricsLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let parent_id = ctx + .span(id) + .and_then(|s| s.parent().map(|p| p.id().into_u64())); + + let mut fields = HashMap::new(); + let mut visitor = FieldVisitor(&mut fields); + attrs.record(&mut visitor); + + self.spans.lock().unwrap_or_else(|e| e.into_inner()).insert( + id.into_u64(), + SpanData { + name: attrs.metadata().name().to_string(), + parent_id, + start: Instant::now(), + fields, + rate_limit_hits: 0, + retries: 0, + }, + ); + } + + fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) { + if let Some(data) = self + .spans + .lock() + .unwrap_or_else(|e| e.into_inner()) + .get_mut(&id.into_u64()) + { + let mut visitor = FieldVisitor(&mut data.fields); + values.record(&mut visitor); + } + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + // Count rate-limit and retry events on the current span + if let Some(span_ref) = ctx.event_span(event) { + let id = span_ref.id(); + if let Some(data) = self + .spans + .lock() + .unwrap_or_else(|e| e.into_inner()) + .get_mut(&id.into_u64()) + { + let mut visitor = EventVisitor::default(); + event.record(&mut visitor); + + if visitor.status_code == Some(429) { + data.rate_limit_hits += 1; + } + if 
visitor.message.contains("retrying") || visitor.message.contains("Retrying") { + data.retries += 1; + } + } + } + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + if let Some(data) = self + .spans + .lock() + .unwrap_or_else(|e| e.into_inner()) + .remove(&id.into_u64()) + { + let elapsed = data.start.elapsed(); + let timing = StageTiming { + name: data.name, + project: data + .fields + .get("project") + .and_then(|v| v.as_str()) + .map(String::from), + elapsed_ms: elapsed.as_millis() as u64, + items_processed: data + .fields + .get("items_processed") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as usize, + items_skipped: data + .fields + .get("items_skipped") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as usize, + errors: data + .fields + .get("errors") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as usize, + rate_limit_hits: data.rate_limit_hits, + retries: data.retries, + sub_stages: vec![], + }; + self.completed + .lock() + .unwrap_or_else(|e| e.into_inner()) + .push(CompletedSpan { + id: id.into_u64(), + parent_id: data.parent_id, + timing, + }); + } + } +} + +// Manual Debug impl since SpanData and CompletedSpan don't derive Debug +impl std::fmt::Debug for SpanData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SpanData") + .field("name", &self.name) + .field("parent_id", &self.parent_id) + .field("fields", &self.fields.keys().collect::>()) + .finish() + } +} + +impl std::fmt::Debug for CompletedSpan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CompletedSpan") + .field("id", &self.id) + .field("parent_id", &self.parent_id) + .field("timing", &self.timing.name) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tracing_subscriber::layer::SubscriberExt; + + #[test] + fn test_stage_timing_serialization() { + let timing = StageTiming { + name: "sync".to_string(), + project: None, + elapsed_ms: 1500, + items_processed: 42, + items_skipped: 3, + 
errors: 1, + rate_limit_hits: 2, + retries: 5, + sub_stages: vec![StageTiming { + name: "ingest_issues".to_string(), + project: Some("group/repo".to_string()), + elapsed_ms: 800, + items_processed: 30, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![], + }], + }; + + let json = serde_json::to_value(&timing).unwrap(); + assert_eq!(json["name"], "sync"); + assert_eq!(json["elapsed_ms"], 1500); + assert_eq!(json["items_processed"], 42); + assert_eq!(json["items_skipped"], 3); + assert_eq!(json["errors"], 1); + assert_eq!(json["rate_limit_hits"], 2); + assert_eq!(json["retries"], 5); + + // Sub-stage present + let sub = &json["sub_stages"][0]; + assert_eq!(sub["name"], "ingest_issues"); + assert_eq!(sub["project"], "group/repo"); + assert_eq!(sub["elapsed_ms"], 800); + } + + #[test] + fn test_stage_timing_zero_fields_omitted() { + let timing = StageTiming { + name: "embed".to_string(), + project: None, + elapsed_ms: 500, + items_processed: 10, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![], + }; + + let json = serde_json::to_value(&timing).unwrap(); + let obj = json.as_object().unwrap(); + + // Zero fields must be absent + assert!(!obj.contains_key("items_skipped")); + assert!(!obj.contains_key("errors")); + assert!(!obj.contains_key("rate_limit_hits")); + assert!(!obj.contains_key("retries")); + assert!(!obj.contains_key("sub_stages")); + assert!(!obj.contains_key("project")); + + // Required fields always present + assert!(obj.contains_key("name")); + assert!(obj.contains_key("elapsed_ms")); + assert!(obj.contains_key("items_processed")); + } + + #[test] + fn test_stage_timing_empty_sub_stages() { + let timing = StageTiming { + name: "docs".to_string(), + project: None, + elapsed_ms: 200, + items_processed: 5, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![], + }; + + let json_str = serde_json::to_string(&timing).unwrap(); + 
assert!(!json_str.contains("sub_stages")); + } + + #[test] + fn test_stage_timing_debug_clone() { + let timing = StageTiming { + name: "test".to_string(), + project: None, + elapsed_ms: 100, + items_processed: 1, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![], + }; + + let cloned = timing.clone(); + assert_eq!(cloned.name, "test"); + + let debug_str = format!("{timing:?}"); + assert!(debug_str.contains("test")); + } + + #[test] + fn test_stage_timing_nested_sub_stages() { + let timing = StageTiming { + name: "sync".to_string(), + project: None, + elapsed_ms: 3000, + items_processed: 100, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![StageTiming { + name: "ingest".to_string(), + project: None, + elapsed_ms: 2000, + items_processed: 80, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![StageTiming { + name: "discussions".to_string(), + project: Some("a/b".to_string()), + elapsed_ms: 1000, + items_processed: 50, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![], + }], + }], + }; + + let json = serde_json::to_value(&timing).unwrap(); + let nested = &json["sub_stages"][0]["sub_stages"][0]; + assert_eq!(nested["name"], "discussions"); + assert_eq!(nested["project"], "a/b"); + } + + #[test] + fn test_rate_limit_fields_omitted_when_zero() { + let timing = StageTiming { + name: "ingest".to_string(), + project: None, + elapsed_ms: 100, + items_processed: 5, + items_skipped: 0, + errors: 0, + rate_limit_hits: 0, + retries: 0, + sub_stages: vec![], + }; + + let json_str = serde_json::to_string(&timing).unwrap(); + assert!(!json_str.contains("rate_limit_hits")); + assert!(!json_str.contains("retries")); + } + + #[test] + fn test_rate_limit_fields_present_when_nonzero() { + let timing = StageTiming { + name: "ingest".to_string(), + project: None, + elapsed_ms: 100, + items_processed: 5, + items_skipped: 0, + errors: 0, 
+ rate_limit_hits: 3, + retries: 7, + sub_stages: vec![], + }; + + let json = serde_json::to_value(&timing).unwrap(); + assert_eq!(json["rate_limit_hits"], 3); + assert_eq!(json["retries"], 7); + } + + #[test] + fn test_metrics_layer_single_span() { + let metrics = MetricsLayer::new(); + let subscriber = tracing_subscriber::registry().with(metrics.clone()); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!("test_stage"); + let _guard = span.enter(); + // Simulate work + }); + + let timings = metrics.extract_timings(); + assert_eq!(timings.len(), 1); + assert_eq!(timings[0].name, "test_stage"); + assert!(timings[0].elapsed_ms < 100); // Should be near-instant + } + + #[test] + fn test_metrics_layer_nested_spans() { + let metrics = MetricsLayer::new(); + let subscriber = tracing_subscriber::registry().with(metrics.clone()); + + tracing::subscriber::with_default(subscriber, || { + let parent = tracing::info_span!("parent"); + let _parent_guard = parent.enter(); + { + let child = tracing::info_span!("child"); + let _child_guard = child.enter(); + } + }); + + let timings = metrics.extract_timings(); + assert_eq!(timings.len(), 1); + assert_eq!(timings[0].name, "parent"); + assert_eq!(timings[0].sub_stages.len(), 1); + assert_eq!(timings[0].sub_stages[0].name, "child"); + } + + #[test] + fn test_metrics_layer_parallel_spans() { + let metrics = MetricsLayer::new(); + let subscriber = tracing_subscriber::registry().with(metrics.clone()); + + tracing::subscriber::with_default(subscriber, || { + let parent = tracing::info_span!("parent"); + let _parent_guard = parent.enter(); + { + let child_a = tracing::info_span!("child_a"); + let _a = child_a.enter(); + } + { + let child_b = tracing::info_span!("child_b"); + let _b = child_b.enter(); + } + }); + + let timings = metrics.extract_timings(); + assert_eq!(timings.len(), 1); + assert_eq!(timings[0].sub_stages.len(), 2); + + let names: Vec<&str> = timings[0] + .sub_stages + .iter() + 
.map(|s| s.name.as_str()) + .collect(); + assert!(names.contains(&"child_a")); + assert!(names.contains(&"child_b")); + } + + #[test] + fn test_metrics_layer_empty() { + let metrics = MetricsLayer::new(); + let timings = metrics.extract_timings(); + assert!(timings.is_empty()); + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index ea29ee1..6ec90b0 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -7,9 +7,12 @@ pub mod dependent_queue; pub mod error; pub mod events_db; pub mod lock; +pub mod logging; +pub mod metrics; pub mod paths; pub mod payloads; pub mod project; +pub mod sync_run; pub mod time; pub use config::Config; diff --git a/src/core/paths.rs b/src/core/paths.rs index b2db216..a3a3a04 100644 --- a/src/core/paths.rs +++ b/src/core/paths.rs @@ -52,6 +52,15 @@ pub fn get_db_path(config_override: Option<&str>) -> PathBuf { get_data_dir().join("lore.db") } +/// Get the log directory path. +/// Uses config override if provided, otherwise uses default in data dir. +pub fn get_log_dir(config_override: Option<&str>) -> PathBuf { + if let Some(path) = config_override { + return PathBuf::from(path); + } + get_data_dir().join("logs") +} + /// Get the backup directory path. /// Uses config override if provided, otherwise uses default in data dir. pub fn get_backup_dir(config_override: Option<&str>) -> PathBuf { diff --git a/src/core/sync_run.rs b/src/core/sync_run.rs new file mode 100644 index 0000000..53442dd --- /dev/null +++ b/src/core/sync_run.rs @@ -0,0 +1,232 @@ +//! Sync run lifecycle recorder. +//! +//! Encapsulates the INSERT-on-start, UPDATE-on-finish lifecycle for the +//! `sync_runs` table, enabling sync history tracking and observability. + +use rusqlite::Connection; + +use super::error::Result; +use super::metrics::StageTiming; +use super::time::now_ms; + +/// Records a single sync run's lifecycle in the `sync_runs` table. 
+///
+/// Created via [`start`](Self::start), then finalized with either
+/// [`succeed`](Self::succeed) or [`fail`](Self::fail). Both finalizers
+/// consume `self` to enforce single-use at compile time.
+pub struct SyncRunRecorder {
+    row_id: i64,
+}
+
+impl SyncRunRecorder {
+    /// Insert a new `sync_runs` row with `status='running'`.
+    pub fn start(conn: &Connection, command: &str, run_id: &str) -> Result<Self> {
+        let now = now_ms();
+        conn.execute(
+            "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, run_id)
+             VALUES (?1, ?2, 'running', ?3, ?4)",
+            rusqlite::params![now, now, command, run_id],
+        )?;
+        let row_id = conn.last_insert_rowid();
+        Ok(Self { row_id })
+    }
+
+    /// Mark run as succeeded with full metrics.
+    pub fn succeed(
+        self,
+        conn: &Connection,
+        metrics: &[StageTiming],
+        total_items: usize,
+        total_errors: usize,
+    ) -> Result<()> {
+        let now = now_ms();
+        let metrics_json = serde_json::to_string(metrics).unwrap_or_else(|_| "[]".to_string());
+        conn.execute(
+            "UPDATE sync_runs
+             SET finished_at = ?1, status = 'succeeded',
+                 metrics_json = ?2, total_items_processed = ?3, total_errors = ?4
+             WHERE id = ?5",
+            rusqlite::params![
+                now,
+                metrics_json,
+                total_items as i64,
+                total_errors as i64,
+                self.row_id
+            ],
+        )?;
+        Ok(())
+    }
+
+    /// Mark run as failed with error message and optional partial metrics.
+    pub fn fail(
+        self,
+        conn: &Connection,
+        error: &str,
+        metrics: Option<&[StageTiming]>,
+    ) -> Result<()> {
+        let now = now_ms();
+        let metrics_json =
+            metrics.map(|m| serde_json::to_string(m).unwrap_or_else(|_| "[]".to_string()));
+        conn.execute(
+            "UPDATE sync_runs
+             SET finished_at = ?1, status = 'failed', error = ?2,
+                 metrics_json = ?3
+             WHERE id = ?4",
+            rusqlite::params![now, error, metrics_json, self.row_id],
+        )?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::core::db::{create_connection, run_migrations};
+    use std::path::Path;
+
+    fn setup_test_db() -> Connection {
+        let conn = create_connection(Path::new(":memory:")).unwrap();
+        run_migrations(&conn).unwrap();
+        conn
+    }
+
+    #[test]
+    fn test_sync_run_recorder_start() {
+        let conn = setup_test_db();
+        let recorder = SyncRunRecorder::start(&conn, "sync", "abc12345").unwrap();
+        assert!(recorder.row_id > 0);
+
+        let (status, command, run_id): (String, String, String) = conn
+            .query_row(
+                "SELECT status, command, run_id FROM sync_runs WHERE id = ?1",
+                [recorder.row_id],
+                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
+            )
+            .unwrap();
+
+        assert_eq!(status, "running");
+        assert_eq!(command, "sync");
+        assert_eq!(run_id, "abc12345");
+    }
+
+    #[test]
+    fn test_sync_run_recorder_succeed() {
+        let conn = setup_test_db();
+        let recorder = SyncRunRecorder::start(&conn, "sync", "def67890").unwrap();
+        let row_id = recorder.row_id;
+
+        let metrics = vec![StageTiming {
+            name: "ingest".to_string(),
+            project: None,
+            elapsed_ms: 1200,
+            items_processed: 50,
+            items_skipped: 0,
+            errors: 2,
+            rate_limit_hits: 0,
+            retries: 0,
+            sub_stages: vec![],
+        }];
+
+        recorder.succeed(&conn, &metrics, 50, 2).unwrap();
+
+        let (status, finished_at, metrics_json, total_items, total_errors): (
+            String,
+            Option<i64>,
+            Option<String>,
+            i64,
+            i64,
+        ) = conn
+            .query_row(
+                "SELECT status, finished_at, metrics_json, total_items_processed, total_errors
+                 FROM sync_runs WHERE id = ?1",
+                [row_id],
+                |row| {
+                    Ok((
+                        row.get(0)?,
+                        row.get(1)?,
+                        row.get(2)?,
+                        row.get(3)?,
+                        row.get(4)?,
+                    ))
+                },
+            )
+            .unwrap();
+
+        assert_eq!(status, "succeeded");
+        assert!(finished_at.is_some());
+        assert!(metrics_json.is_some());
+        assert_eq!(total_items, 50);
+        assert_eq!(total_errors, 2);
+
+        // Verify metrics_json is parseable
+        let parsed: Vec<StageTiming> = serde_json::from_str(&metrics_json.unwrap()).unwrap();
+        assert_eq!(parsed.len(), 1);
+        assert_eq!(parsed[0].name, "ingest");
+    }
+
+    #[test]
+    fn test_sync_run_recorder_fail() {
+        let conn = setup_test_db();
+        let recorder = SyncRunRecorder::start(&conn, "ingest issues", "fail0001").unwrap();
+        let row_id = recorder.row_id;
+
+        recorder.fail(&conn, "GitLab auth failed", None).unwrap();
+
+        let (status, finished_at, error, metrics_json): (
+            String,
+            Option<i64>,
+            Option<String>,
+            Option<String>,
+        ) = conn
+            .query_row(
+                "SELECT status, finished_at, error, metrics_json
+                 FROM sync_runs WHERE id = ?1",
+                [row_id],
+                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
+            )
+            .unwrap();
+
+        assert_eq!(status, "failed");
+        assert!(finished_at.is_some());
+        assert_eq!(error.as_deref(), Some("GitLab auth failed"));
+        assert!(metrics_json.is_none());
+    }
+
+    #[test]
+    fn test_sync_run_recorder_fail_with_partial_metrics() {
+        let conn = setup_test_db();
+        let recorder = SyncRunRecorder::start(&conn, "sync", "part0001").unwrap();
+        let row_id = recorder.row_id;
+
+        let partial_metrics = vec![StageTiming {
+            name: "ingest_issues".to_string(),
+            project: Some("group/repo".to_string()),
+            elapsed_ms: 800,
+            items_processed: 30,
+            items_skipped: 0,
+            errors: 0,
+            rate_limit_hits: 1,
+            retries: 0,
+            sub_stages: vec![],
+        }];
+
+        recorder
+            .fail(&conn, "Embedding failed", Some(&partial_metrics))
+            .unwrap();
+
+        let (status, metrics_json): (String, Option<String>) = conn
+            .query_row(
+                "SELECT status, metrics_json FROM sync_runs WHERE id = ?1",
+                [row_id],
+                |row| Ok((row.get(0)?, row.get(1)?)),
+            )
+            .unwrap();
+
+        assert_eq!(status, "failed");
+        assert!(metrics_json.is_some());
+
+        let parsed: Vec<StageTiming> = serde_json::from_str(&metrics_json.unwrap()).unwrap();
+        assert_eq!(parsed.len(), 1);
+        assert_eq!(parsed[0].name, "ingest_issues");
+    }
+}