feat(observability): Add metrics, logging, and sync-run core modules

Introduce the foundational observability layer for the sync pipeline:

- MetricsLayer: Custom tracing subscriber layer that captures span timing
  and structured fields, materializing them into a hierarchical
  Vec<StageTiming> tree for robot-mode performance data output
- logging: Dual-layer subscriber infrastructure with configurable stderr
  verbosity (-v/-vv/-vvv) and always-on JSON file logging with daily
  rotation and configurable retention (default 30 days)
- SyncRunRecorder: Compile-time enforced lifecycle recorder for sync_runs
  table (start -> succeed|fail), with correlation IDs and aggregate counts
- LoggingConfig: New config section for log_dir, retention_days, and
  file_logging toggle
- get_log_dir(): Path helper for log directory resolution
- is_permanent_api_error(): Distinguish retryable vs permanent API failures
  (only 404 is truly permanent; 403/auth errors may be environmental)

Database changes:
- Migration 013: Add resource_events_synced_for_updated_at watermark columns
  to issues and merge_requests tables for incremental resource event sync
- Migration 014: Enrich sync_runs with run_id correlation ID, aggregate
  counts (total_items_processed, total_errors), and run_id index
- Wrap file-based migrations in savepoints for rollback safety

Dependencies: Add uuid (run_id generation), tracing-appender (file logging)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
teernisse
2026-02-04 10:01:28 -05:00
parent ee5c5f9645
commit 329c8f4539
12 changed files with 1263 additions and 4 deletions

93
Cargo.lock generated
View File

@@ -296,6 +296,21 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]] [[package]]
name = "crossterm" name = "crossterm"
version = "0.29.0" version = "0.29.0"
@@ -347,6 +362,15 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
[[package]]
name = "deranged"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587"
dependencies = [
"powerfmt",
]
[[package]] [[package]]
name = "dialoguer" name = "dialoguer"
version = "0.12.0" version = "0.12.0"
@@ -1108,6 +1132,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
"tracing-appender",
"tracing-subscriber", "tracing-subscriber",
"url", "url",
"urlencoding", "urlencoding",
@@ -1183,6 +1208,12 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "num-conv"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@@ -1343,6 +1374,12 @@ dependencies = [
"zerovec", "zerovec",
] ]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.21" version = "0.2.21"
@@ -1878,6 +1915,37 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "time"
version = "0.3.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde_core",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "time-macros"
version = "0.2.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4"
dependencies = [
"num-conv",
"time-core",
]
[[package]] [[package]]
name = "tinystr" name = "tinystr"
version = "0.8.2" version = "0.8.2"
@@ -2003,6 +2071,18 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-appender"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf"
dependencies = [
"crossbeam-channel",
"thiserror",
"time",
"tracing-subscriber",
]
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.31" version = "0.1.31"
@@ -2035,6 +2115,16 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.22" version = "0.3.22"
@@ -2045,12 +2135,15 @@ dependencies = [
"nu-ansi-term", "nu-ansi-term",
"once_cell", "once_cell",
"regex-automata", "regex-automata",
"serde",
"serde_json",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing", "tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
"tracing-serde",
] ]
[[package]] [[package]]

View File

@@ -52,7 +52,8 @@ libc = "0.2"
# Logging # Logging
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-appender = "0.2"
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"

View File

@@ -0,0 +1,10 @@
-- Migration 013: Add resource event sync watermarks
-- Mirrors the discussions_synced_for_updated_at pattern so that only entities
-- whose updated_at exceeds the last resource event sync get re-enqueued.
-- Existing rows get NULL for the new column (no DEFAULT given); presumably
-- NULL is treated as "never synced" by the sync query — confirm against caller.
ALTER TABLE issues ADD COLUMN resource_events_synced_for_updated_at INTEGER;
ALTER TABLE merge_requests ADD COLUMN resource_events_synced_for_updated_at INTEGER;
-- Update schema version (applied_at is epoch milliseconds)
INSERT INTO schema_version (version, applied_at, description)
VALUES (13, strftime('%s', 'now') * 1000, 'Add resource event sync watermarks to issues and merge_requests');

View File

@@ -0,0 +1,12 @@
-- Migration 014: sync_runs enrichment for observability
-- Adds correlation ID and aggregate counts for queryable sync history.
-- run_id is nullable TEXT: rows written before this migration stay valid.
ALTER TABLE sync_runs ADD COLUMN run_id TEXT;
ALTER TABLE sync_runs ADD COLUMN total_items_processed INTEGER DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN total_errors INTEGER DEFAULT 0;
-- Index for correlation queries (find run by run_id from logs)
CREATE INDEX IF NOT EXISTS idx_sync_runs_run_id ON sync_runs(run_id);
-- Update schema version (applied_at is epoch milliseconds)
INSERT INTO schema_version (version, applied_at, description)
VALUES (14, strftime('%s', 'now') * 1000, 'Sync runs enrichment for observability');

View File

@@ -120,6 +120,41 @@ impl Default for EmbeddingConfig {
} }
} }
/// Logging and observability settings.
///
/// All keys are camelCase on the wire (`logDir`, `retentionDays`,
/// `fileLogging`); missing keys fall back to the values in [`Default`].
#[derive(Debug, Clone, Deserialize)]
#[serde(default, rename_all = "camelCase")]
pub struct LoggingConfig {
    /// Directory for log files. Default: ~/.local/share/lore/logs/
    pub log_dir: Option<String>,
    /// Days to retain log files. Default: 30. Set to 0 to disable file logging.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    /// Enable JSON log files. Default: true.
    #[serde(default = "default_file_logging")]
    pub file_logging: bool,
}

/// Default retention window: 30 days of log files.
fn default_retention_days() -> u32 {
    30
}

/// File logging is on unless the user explicitly disables it.
fn default_file_logging() -> bool {
    true
}

impl Default for LoggingConfig {
    fn default() -> Self {
        LoggingConfig {
            log_dir: None,
            retention_days: default_retention_days(),
            file_logging: default_file_logging(),
        }
    }
}
/// Main configuration structure. /// Main configuration structure.
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct Config { pub struct Config {
@@ -134,6 +169,9 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub embedding: EmbeddingConfig, pub embedding: EmbeddingConfig,
#[serde(default)]
pub logging: LoggingConfig,
} }
impl Config { impl Config {

View File

@@ -47,6 +47,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
"013", "013",
include_str!("../../migrations/013_resource_event_watermarks.sql"), include_str!("../../migrations/013_resource_event_watermarks.sql"),
), ),
(
"014",
include_str!("../../migrations/014_sync_runs_enrichment.sql"),
),
]; ];
/// Create a database connection with production-grade pragmas. /// Create a database connection with production-grade pragmas.
@@ -190,13 +194,35 @@ pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Resu
let sql = fs::read_to_string(entry.path())?; let sql = fs::read_to_string(entry.path())?;
conn.execute_batch(&sql) // Wrap each migration in a savepoint to prevent partial application,
// matching the safety guarantees of run_migrations().
let savepoint_name = format!("migration_{}", version);
conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
.map_err(|e| LoreError::MigrationFailed { .map_err(|e| LoreError::MigrationFailed {
version, version,
message: e.to_string(), message: format!("Failed to create savepoint: {}", e),
source: Some(e), source: Some(e),
})?; })?;
match conn.execute_batch(&sql) {
Ok(()) => {
conn.execute_batch(&format!("RELEASE {}", savepoint_name))
.map_err(|e| LoreError::MigrationFailed {
version,
message: format!("Failed to release savepoint: {}", e),
source: Some(e),
})?;
}
Err(e) => {
let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name));
return Err(LoreError::MigrationFailed {
version,
message: e.to_string(),
source: Some(e),
});
}
}
info!(version, file = %filename_str, "Migration applied"); info!(version, file = %filename_str, "Migration applied");
} }

View File

@@ -59,7 +59,7 @@ impl ErrorCode {
pub fn exit_code(&self) -> i32 { pub fn exit_code(&self) -> i32 {
match self { match self {
Self::InternalError => 1, Self::InternalError => 1,
Self::ConfigNotFound => 20, Self::ConfigNotFound => 2,
Self::ConfigInvalid => 3, Self::ConfigInvalid => 3,
Self::TokenNotSet => 4, Self::TokenNotSet => 4,
Self::GitLabAuthFailed => 5, Self::GitLabAuthFailed => 5,
@@ -240,6 +240,15 @@ impl LoreError {
} }
} }
    /// Whether this error represents a permanent API failure that should not be retried.
    ///
    /// Only 404 (not found) is truly permanent: the resource doesn't exist and never will.
    /// 403 and auth errors are NOT permanent — they may be environmental (VPN down,
    /// token rotation, temporary restrictions) and should be retried with backoff.
    ///
    /// NOTE(review): if new GitLab 4xx error variants are added to this enum,
    /// revisit whether they belong in this predicate.
    pub fn is_permanent_api_error(&self) -> bool {
        matches!(self, Self::GitLabNotFound { .. })
    }
/// Get the exit code for this error. /// Get the exit code for this error.
pub fn exit_code(&self) -> i32 { pub fn exit_code(&self) -> i32 {
self.code().exit_code() self.code().exit_code()

217
src/core/logging.rs Normal file
View File

@@ -0,0 +1,217 @@
//! Logging infrastructure: dual-layer subscriber setup and log file retention.
//!
//! Provides a layered tracing subscriber with:
//! - **stderr layer**: Human-readable or JSON format, controlled by `-v` flags
//! - **file layer**: Always-on JSON output to daily-rotated log files
use std::fs;
use std::path::Path;
use tracing_subscriber::EnvFilter;
/// Build an `EnvFilter` from the verbosity count.
///
/// `RUST_LOG`, when set, overrides everything; otherwise `-q` wins over
/// any number of `-v` flags.
///
/// | Count | App Level | Dep Level |
/// |-------|-----------|-----------|
/// | 0     | INFO      | WARN      |
/// | 1     | DEBUG     | WARN      |
/// | 2     | DEBUG     | INFO      |
/// | 3+    | TRACE     | DEBUG     |
pub fn build_stderr_filter(verbose: u8, quiet: bool) -> EnvFilter {
    // RUST_LOG always wins if set.
    if std::env::var("RUST_LOG").is_ok() {
        return EnvFilter::from_default_env();
    }

    let directives = if quiet {
        // -q overrides -v for stderr.
        "lore=warn,error"
    } else if verbose == 0 {
        "lore=info,warn"
    } else if verbose == 1 {
        "lore=debug,warn"
    } else if verbose == 2 {
        "lore=debug,info"
    } else {
        "lore=trace,debug"
    };
    EnvFilter::new(directives)
}
/// Build an `EnvFilter` for the file layer.
///
/// Always captures DEBUG+ for `lore::*` and WARN+ for dependencies,
/// unless `RUST_LOG` is set (which overrides everything).
pub fn build_file_filter() -> EnvFilter {
    match std::env::var("RUST_LOG") {
        Ok(_) => EnvFilter::from_default_env(),
        Err(_) => EnvFilter::new("lore=debug,warn"),
    }
}
/// Delete log files older than `retention_days` from the given directory.
///
/// Only files matching the `lore.YYYY-MM-DD.log` (or extension-less
/// `lore.YYYY-MM-DD`) pattern are touched; everything else is left alone.
/// All I/O errors are swallowed — cleanup is best-effort and must never
/// abort the caller.
///
/// Returns the number of files deleted.
pub fn cleanup_old_logs(log_dir: &Path, retention_days: u32) -> usize {
    if retention_days == 0 || !log_dir.exists() {
        return 0;
    }

    // ISO dates compare correctly as strings, so a lexicographic test
    // against the formatted cutoff is sufficient.
    let cutoff_date = (chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days)))
        .format("%Y-%m-%d")
        .to_string();

    let Ok(entries) = fs::read_dir(log_dir) else {
        return 0;
    };

    let mut deleted = 0;
    for entry in entries.flatten() {
        let file_name = entry.file_name();
        let name = file_name.to_string_lossy();
        // Match pattern: lore.YYYY-MM-DD.log or lore.YYYY-MM-DD (tracing-appender format)
        if let Some(date_str) = extract_log_date(&name) {
            if date_str < cutoff_date && fs::remove_file(entry.path()).is_ok() {
                deleted += 1;
            }
        }
    }
    deleted
}
/// Extract the date portion from a log filename.
///
/// Accepts `lore.YYYY-MM-DD.log` and `lore.YYYY-MM-DD`; everything else
/// yields `None`.
fn extract_log_date(filename: &str) -> Option<String> {
    let rest = filename.strip_prefix("lore.")?;

    // The first 10 bytes must be the date. `get` (instead of slicing) keeps
    // this panic-free when byte 10 falls inside a multi-byte character.
    let date_part = rest.get(..10)?;
    let suffix = rest.get(10..)?;

    // After the date, only end-of-string or a ".log" extension is allowed.
    if !suffix.is_empty() && suffix != ".log" {
        return None;
    }

    // Shape check: '-' separators at positions 4 and 7, ASCII digits
    // everywhere else. This enforces the YYYY-MM-DD layout (and pure ASCII)
    // in one pass.
    let looks_like_date = date_part
        .char_indices()
        .all(|(i, c)| if i == 4 || i == 7 { c == '-' } else { c.is_ascii_digit() });

    looks_like_date.then(|| date_part.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use tempfile::TempDir;

    // --- extract_log_date --------------------------------------------------

    #[test]
    fn test_extract_log_date_with_extension() {
        assert_eq!(
            extract_log_date("lore.2026-02-04.log"),
            Some("2026-02-04".to_string())
        );
    }

    #[test]
    fn test_extract_log_date_without_extension() {
        // tracing-appender's daily rotation writes files without ".log"
        assert_eq!(
            extract_log_date("lore.2026-02-04"),
            Some("2026-02-04".to_string())
        );
    }

    #[test]
    fn test_extract_log_date_rejects_non_log_files() {
        assert_eq!(extract_log_date("other.txt"), None);
        assert_eq!(extract_log_date("lore.config.json"), None);
        assert_eq!(extract_log_date("lore.db"), None);
    }

    #[test]
    fn test_extract_log_date_rejects_invalid_dates() {
        assert_eq!(extract_log_date("lore.not-a-date.log"), None);
        assert_eq!(extract_log_date("lore.20260204.log"), None);
    }

    // --- cleanup_old_logs --------------------------------------------------

    #[test]
    fn test_cleanup_old_logs_deletes_old_files() {
        let dir = TempDir::new().unwrap();
        // Create old log files (well before any reasonable retention)
        File::create(dir.path().join("lore.2020-01-01.log")).unwrap();
        File::create(dir.path().join("lore.2020-01-15.log")).unwrap();
        // Create a recent log file (today)
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let recent_name = format!("lore.{today}.log");
        File::create(dir.path().join(&recent_name)).unwrap();
        // Create a non-log file that should NOT be deleted
        File::create(dir.path().join("other.txt")).unwrap();

        let deleted = cleanup_old_logs(dir.path(), 7);

        assert_eq!(deleted, 2);
        assert!(!dir.path().join("lore.2020-01-01.log").exists());
        assert!(!dir.path().join("lore.2020-01-15.log").exists());
        assert!(dir.path().join(&recent_name).exists());
        assert!(dir.path().join("other.txt").exists());
    }

    #[test]
    fn test_cleanup_old_logs_zero_retention_is_noop() {
        // retention_days == 0 disables cleanup entirely
        let dir = TempDir::new().unwrap();
        File::create(dir.path().join("lore.2020-01-01.log")).unwrap();
        let deleted = cleanup_old_logs(dir.path(), 0);
        assert_eq!(deleted, 0);
        assert!(dir.path().join("lore.2020-01-01.log").exists());
    }

    #[test]
    fn test_cleanup_old_logs_nonexistent_dir() {
        let deleted = cleanup_old_logs(Path::new("/nonexistent/dir"), 7);
        assert_eq!(deleted, 0);
    }

    // --- filters -----------------------------------------------------------
    // EnvFilter has no introspection API, so these are construction smoke
    // tests: building a filter must not panic for any flag combination.

    #[test]
    fn test_build_stderr_filter_default() {
        // Can't easily assert filter contents, but verify it doesn't panic
        let _filter = build_stderr_filter(0, false);
    }

    #[test]
    fn test_build_stderr_filter_verbose_levels() {
        let _f0 = build_stderr_filter(0, false);
        let _f1 = build_stderr_filter(1, false);
        let _f2 = build_stderr_filter(2, false);
        let _f3 = build_stderr_filter(3, false);
    }

    #[test]
    fn test_build_stderr_filter_quiet_overrides_verbose() {
        // Quiet should win over verbose
        let _filter = build_stderr_filter(3, true);
    }

    #[test]
    fn test_build_file_filter() {
        let _filter = build_file_filter();
    }
}

609
src/core/metrics.rs Normal file
View File

@@ -0,0 +1,609 @@
//! Performance metrics types and tracing layer for sync pipeline observability.
//!
//! Provides:
//! - [`StageTiming`]: Serializable timing/counter data for pipeline stages
//! - [`MetricsLayer`]: Custom tracing subscriber layer that captures span timing
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use serde::{Deserialize, Serialize};
use tracing::Subscriber;
use tracing::span::{Attributes, Id, Record};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::registry::LookupSpan;
/// Returns true when value is zero (for serde `skip_serializing_if`).
fn is_zero(v: &usize) -> bool {
    matches!(*v, 0)
}
/// Timing and counter data for a single pipeline stage.
///
/// Supports nested sub-stages for hierarchical timing breakdowns.
/// Fields with zero/empty values are omitted from JSON output to
/// keep robot-mode payloads compact.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageTiming {
    /// Stage identifier (the tracing span's name).
    pub name: String,
    /// Project path the stage worked on, if the span recorded a `project` field.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub project: Option<String>,
    /// Wall-clock duration of the stage in milliseconds.
    pub elapsed_ms: u64,
    /// Items processed (0 when the span never recorded the field).
    pub items_processed: usize,
    /// Items skipped; omitted from JSON when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub items_skipped: usize,
    /// Errors encountered; omitted from JSON when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub errors: usize,
    /// HTTP 429 events observed while the span was current; omitted when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub rate_limit_hits: usize,
    /// Retry events observed while the span was current; omitted when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub retries: usize,
    /// Nested child stages; omitted from JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub sub_stages: Vec<StageTiming>,
}
// ============================================================================
// MetricsLayer: custom tracing subscriber layer
// ============================================================================
/// Internal data tracked per open span.
struct SpanData {
    /// Span name from its metadata.
    name: String,
    /// Enclosing span's ID, if any; drives tree reconstruction after close.
    parent_id: Option<u64>,
    /// Instant the span was opened; elapsed time is measured on close.
    start: Instant,
    /// Structured fields recorded on the span (attributes plus later `record`s).
    fields: HashMap<String, serde_json::Value>,
    /// Count of HTTP 429 events seen while this span was current.
    rate_limit_hits: usize,
    /// Count of retry events seen while this span was current.
    retries: usize,
}
/// Completed span data with its original ID and parent ID.
struct CompletedSpan {
    /// The span's tracing ID (u64 form).
    id: u64,
    /// Parent span ID, or `None` for a root stage.
    parent_id: Option<u64>,
    /// Timing snapshot materialized when the span closed.
    timing: StageTiming,
}
/// Custom tracing layer that captures span timing and structured fields.
///
/// Collects data from `#[instrument]` spans and materializes it into
/// a `Vec<StageTiming>` tree via [`extract_timings`].
///
/// Thread-safe via `Arc<Mutex<>>` — suitable for concurrent span operations.
#[derive(Debug, Clone)]
pub struct MetricsLayer {
    /// Spans currently open, keyed by span ID.
    spans: Arc<Mutex<HashMap<u64, SpanData>>>,
    /// Closed spans, appended in close order (children close before parents).
    completed: Arc<Mutex<Vec<CompletedSpan>>>,
}
impl Default for MetricsLayer {
fn default() -> Self {
Self::new()
}
}
impl MetricsLayer {
    /// Create an empty layer with no open or completed spans.
    pub fn new() -> Self {
        Self {
            spans: Arc::new(Mutex::new(HashMap::new())),
            completed: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Extract timing tree for a completed run.
    ///
    /// Returns the top-level stages with sub-stages nested.
    /// Call after the root span closes.
    pub fn extract_timings(&self) -> Vec<StageTiming> {
        // Metrics are best-effort: recover the data even if another thread
        // panicked while holding the lock (poisoned mutex).
        let completed = self.completed.lock().unwrap_or_else(|e| e.into_inner());
        if completed.is_empty() {
            return Vec::new();
        }
        // Build children map: parent_id -> Vec<StageTiming>
        let mut children_map: HashMap<u64, Vec<StageTiming>> = HashMap::new();
        let mut roots = Vec::new();
        let mut id_to_timing: HashMap<u64, StageTiming> = HashMap::new();
        // First pass: collect all timings by ID
        for entry in completed.iter() {
            id_to_timing.insert(entry.id, entry.timing.clone());
        }
        // Second pass: walk in close order (children closed before their
        // parents), so each span's children are already in children_map by
        // the time the span itself is visited — the tree is built bottom-up.
        for entry in completed.iter() {
            // Attach any children that were collected for this span
            if let Some(timing) = id_to_timing.get_mut(&entry.id)
                && let Some(children) = children_map.remove(&entry.id)
            {
                timing.sub_stages = children;
            }
            if let Some(parent_id) = entry.parent_id {
                // This is a child span — attach to parent's children
                if let Some(timing) = id_to_timing.remove(&entry.id) {
                    children_map.entry(parent_id).or_default().push(timing);
                }
            }
        }
        // Remaining entries in id_to_timing are roots
        for entry in completed.iter() {
            if entry.parent_id.is_none()
                && let Some(mut timing) = id_to_timing.remove(&entry.id)
            {
                if let Some(children) = children_map.remove(&entry.id) {
                    timing.sub_stages = children;
                }
                roots.push(timing);
            }
        }
        roots
    }
}
/// Visitor that extracts field values from span attributes.
///
/// Each recorded field is stored as a `serde_json::Value` under its name;
/// a later recording of the same field overwrites the earlier one.
struct FieldVisitor<'a>(&'a mut HashMap<String, serde_json::Value>);

impl FieldVisitor<'_> {
    /// Insert one field value under the field's name.
    fn store(&mut self, field: &tracing::field::Field, value: serde_json::Value) {
        self.0.insert(field.name().to_string(), value);
    }
}

impl tracing::field::Visit for FieldVisitor<'_> {
    // Fallback for types without a dedicated callback: keep the Debug rendering.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        self.store(field, serde_json::Value::String(format!("{value:?}")));
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.store(field, serde_json::Value::Number(value.into()));
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.store(field, serde_json::Value::Number(value.into()));
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.store(field, serde_json::Value::String(value.to_string()));
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.store(field, serde_json::Value::Bool(value));
    }
}
/// Visitor that extracts event fields for rate-limit/retry detection.
///
/// Only two fields are of interest: a numeric `status_code` and the
/// event's `message`; everything else is ignored.
#[derive(Default)]
struct EventVisitor {
    status_code: Option<u64>,
    message: String,
}

impl tracing::field::Visit for EventVisitor {
    // Fallback path: the message often arrives as a Debug-formatted value.
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        if let "message" = field.name() {
            self.message = format!("{value:?}");
        }
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        if let "status_code" = field.name() {
            self.status_code = Some(value);
        }
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        if let "message" = field.name() {
            self.message = value.to_string();
        }
    }
}
impl<S> Layer<S> for MetricsLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Span opened: capture its name, contextual parent, start instant, and
    /// any fields supplied at creation time.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        // Resolve the parent through the registry (contextual parent).
        let parent_id = ctx
            .span(id)
            .and_then(|s| s.parent().map(|p| p.id().into_u64()));
        let mut fields = HashMap::new();
        let mut visitor = FieldVisitor(&mut fields);
        attrs.record(&mut visitor);
        // unwrap_or_else(into_inner): metrics are best-effort, so keep going
        // even if the mutex was poisoned by a panicking thread.
        self.spans.lock().unwrap_or_else(|e| e.into_inner()).insert(
            id.into_u64(),
            SpanData {
                name: attrs.metadata().name().to_string(),
                parent_id,
                start: Instant::now(),
                fields,
                rate_limit_hits: 0,
                retries: 0,
            },
        );
    }

    /// Fields recorded after creation (e.g. `span.record(...)`) are merged
    /// into the span's field map, overwriting earlier values.
    fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
        if let Some(data) = self
            .spans
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get_mut(&id.into_u64())
        {
            let mut visitor = FieldVisitor(&mut data.fields);
            values.record(&mut visitor);
        }
    }

    /// Events are inspected for rate-limit (status_code == 429) and retry
    /// ("retrying"/"Retrying" in the message) signals, counted on the
    /// event's enclosing span.
    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        // Count rate-limit and retry events on the current span
        if let Some(span_ref) = ctx.event_span(event) {
            let id = span_ref.id();
            if let Some(data) = self
                .spans
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .get_mut(&id.into_u64())
            {
                let mut visitor = EventVisitor::default();
                event.record(&mut visitor);
                if visitor.status_code == Some(429) {
                    data.rate_limit_hits += 1;
                }
                if visitor.message.contains("retrying") || visitor.message.contains("Retrying") {
                    data.retries += 1;
                }
            }
        }
    }

    /// Span closed: materialize a `StageTiming` from the accumulated data
    /// and move it to the completed list (in close order).
    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
        if let Some(data) = self
            .spans
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&id.into_u64())
        {
            let elapsed = data.start.elapsed();
            let timing = StageTiming {
                name: data.name,
                project: data
                    .fields
                    .get("project")
                    .and_then(|v| v.as_str())
                    .map(String::from),
                // Truncating cast is fine: u64 millis covers ~584M years.
                elapsed_ms: elapsed.as_millis() as u64,
                // Counter fields default to 0 when the span never recorded them.
                items_processed: data
                    .fields
                    .get("items_processed")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as usize,
                items_skipped: data
                    .fields
                    .get("items_skipped")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as usize,
                errors: data
                    .fields
                    .get("errors")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as usize,
                rate_limit_hits: data.rate_limit_hits,
                retries: data.retries,
                // Children are attached later, in extract_timings().
                sub_stages: vec![],
            };
            self.completed
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .push(CompletedSpan {
                    id: id.into_u64(),
                    parent_id: data.parent_id,
                    timing,
                });
        }
    }
}
// Manual Debug impl since SpanData and CompletedSpan don't derive Debug.
// Only identifying information is surfaced: field *keys* (not values), and
// start/rate_limit_hits/retries are intentionally omitted for compactness.
impl std::fmt::Debug for SpanData {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpanData")
            .field("name", &self.name)
            .field("parent_id", &self.parent_id)
            .field("fields", &self.fields.keys().collect::<Vec<_>>())
            .finish()
    }
}
impl std::fmt::Debug for CompletedSpan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Summarize the timing by its stage name rather than the full tree.
        f.debug_struct("CompletedSpan")
            .field("id", &self.id)
            .field("parent_id", &self.parent_id)
            .field("timing", &self.timing.name)
            .finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;

    /// Build a `StageTiming` with all optional counters zeroed and no
    /// children; tests mutate the fields they care about.
    fn timing(name: &str, elapsed_ms: u64, items_processed: usize) -> StageTiming {
        StageTiming {
            name: name.to_string(),
            project: None,
            elapsed_ms,
            items_processed,
            items_skipped: 0,
            errors: 0,
            rate_limit_hits: 0,
            retries: 0,
            sub_stages: vec![],
        }
    }

    #[test]
    fn test_stage_timing_serialization() {
        let mut sub = timing("ingest_issues", 800, 30);
        sub.project = Some("group/repo".to_string());

        let mut root = timing("sync", 1500, 42);
        root.items_skipped = 3;
        root.errors = 1;
        root.rate_limit_hits = 2;
        root.retries = 5;
        root.sub_stages = vec![sub];

        let json = serde_json::to_value(&root).unwrap();
        assert_eq!(json["name"], "sync");
        assert_eq!(json["elapsed_ms"], 1500);
        assert_eq!(json["items_processed"], 42);
        assert_eq!(json["items_skipped"], 3);
        assert_eq!(json["errors"], 1);
        assert_eq!(json["rate_limit_hits"], 2);
        assert_eq!(json["retries"], 5);

        // Sub-stage present
        let nested = &json["sub_stages"][0];
        assert_eq!(nested["name"], "ingest_issues");
        assert_eq!(nested["project"], "group/repo");
        assert_eq!(nested["elapsed_ms"], 800);
    }

    #[test]
    fn test_stage_timing_zero_fields_omitted() {
        let json = serde_json::to_value(timing("embed", 500, 10)).unwrap();
        let obj = json.as_object().unwrap();

        // Zero fields must be absent
        assert!(!obj.contains_key("items_skipped"));
        assert!(!obj.contains_key("errors"));
        assert!(!obj.contains_key("rate_limit_hits"));
        assert!(!obj.contains_key("retries"));
        assert!(!obj.contains_key("sub_stages"));
        assert!(!obj.contains_key("project"));

        // Required fields always present
        assert!(obj.contains_key("name"));
        assert!(obj.contains_key("elapsed_ms"));
        assert!(obj.contains_key("items_processed"));
    }

    #[test]
    fn test_stage_timing_empty_sub_stages() {
        let json_str = serde_json::to_string(&timing("docs", 200, 5)).unwrap();
        assert!(!json_str.contains("sub_stages"));
    }

    #[test]
    fn test_stage_timing_debug_clone() {
        let original = timing("test", 100, 1);
        let cloned = original.clone();
        assert_eq!(cloned.name, "test");
        let debug_str = format!("{original:?}");
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_stage_timing_nested_sub_stages() {
        let mut leaf = timing("discussions", 1000, 50);
        leaf.project = Some("a/b".to_string());
        let mut mid = timing("ingest", 2000, 80);
        mid.sub_stages = vec![leaf];
        let mut root = timing("sync", 3000, 100);
        root.sub_stages = vec![mid];

        let json = serde_json::to_value(&root).unwrap();
        let nested = &json["sub_stages"][0]["sub_stages"][0];
        assert_eq!(nested["name"], "discussions");
        assert_eq!(nested["project"], "a/b");
    }

    #[test]
    fn test_rate_limit_fields_omitted_when_zero() {
        let json_str = serde_json::to_string(&timing("ingest", 100, 5)).unwrap();
        assert!(!json_str.contains("rate_limit_hits"));
        assert!(!json_str.contains("retries"));
    }

    #[test]
    fn test_rate_limit_fields_present_when_nonzero() {
        let mut t = timing("ingest", 100, 5);
        t.rate_limit_hits = 3;
        t.retries = 7;
        let json = serde_json::to_value(&t).unwrap();
        assert_eq!(json["rate_limit_hits"], 3);
        assert_eq!(json["retries"], 7);
    }

    #[test]
    fn test_metrics_layer_single_span() {
        let metrics = MetricsLayer::new();
        let subscriber = tracing_subscriber::registry().with(metrics.clone());
        tracing::subscriber::with_default(subscriber, || {
            let span = tracing::info_span!("test_stage");
            let _guard = span.enter();
            // Simulate work
        });

        let timings = metrics.extract_timings();
        assert_eq!(timings.len(), 1);
        assert_eq!(timings[0].name, "test_stage");
        assert!(timings[0].elapsed_ms < 100); // Should be near-instant
    }

    #[test]
    fn test_metrics_layer_nested_spans() {
        let metrics = MetricsLayer::new();
        let subscriber = tracing_subscriber::registry().with(metrics.clone());
        tracing::subscriber::with_default(subscriber, || {
            let parent = tracing::info_span!("parent");
            let _parent_guard = parent.enter();
            {
                let child = tracing::info_span!("child");
                let _child_guard = child.enter();
            }
        });

        let timings = metrics.extract_timings();
        assert_eq!(timings.len(), 1);
        assert_eq!(timings[0].name, "parent");
        assert_eq!(timings[0].sub_stages.len(), 1);
        assert_eq!(timings[0].sub_stages[0].name, "child");
    }

    #[test]
    fn test_metrics_layer_parallel_spans() {
        let metrics = MetricsLayer::new();
        let subscriber = tracing_subscriber::registry().with(metrics.clone());
        tracing::subscriber::with_default(subscriber, || {
            let parent = tracing::info_span!("parent");
            let _parent_guard = parent.enter();
            {
                let child_a = tracing::info_span!("child_a");
                let _a = child_a.enter();
            }
            {
                let child_b = tracing::info_span!("child_b");
                let _b = child_b.enter();
            }
        });

        let timings = metrics.extract_timings();
        assert_eq!(timings.len(), 1);
        assert_eq!(timings[0].sub_stages.len(), 2);
        let names: Vec<&str> = timings[0]
            .sub_stages
            .iter()
            .map(|s| s.name.as_str())
            .collect();
        assert!(names.contains(&"child_a"));
        assert!(names.contains(&"child_b"));
    }

    #[test]
    fn test_metrics_layer_empty() {
        let metrics = MetricsLayer::new();
        assert!(metrics.extract_timings().is_empty());
    }
}

View File

@@ -7,9 +7,12 @@ pub mod dependent_queue;
pub mod error;
pub mod events_db;
pub mod lock;
pub mod logging;
pub mod metrics;
pub mod paths;
pub mod payloads;
pub mod project;
pub mod sync_run;
pub mod time;
pub use config::Config;

View File

@@ -52,6 +52,15 @@ pub fn get_db_path(config_override: Option<&str>) -> PathBuf {
    get_data_dir().join("lore.db")
}
/// Get the log directory path.
/// Uses config override if provided, otherwise uses default in data dir.
pub fn get_log_dir(config_override: Option<&str>) -> PathBuf {
    match config_override {
        Some(path) => PathBuf::from(path),
        None => get_data_dir().join("logs"),
    }
}
/// Get the backup directory path.
/// Uses config override if provided, otherwise uses default in data dir.
pub fn get_backup_dir(config_override: Option<&str>) -> PathBuf {

232
src/core/sync_run.rs Normal file
View File

@@ -0,0 +1,232 @@
//! Sync run lifecycle recorder.
//!
//! Encapsulates the INSERT-on-start, UPDATE-on-finish lifecycle for the
//! `sync_runs` table, enabling sync history tracking and observability.
use rusqlite::Connection;
use super::error::Result;
use super::metrics::StageTiming;
use super::time::now_ms;
/// Records a single sync run's lifecycle in the `sync_runs` table.
///
/// Created via [`start`](Self::start), then finalized with either
/// [`succeed`](Self::succeed) or [`fail`](Self::fail). Both finalizers
/// consume `self` to enforce single-use at compile time.
pub struct SyncRunRecorder {
    // Primary key (`sync_runs.id`) of the row inserted by `start`;
    // used as the WHERE target when the run is finalized.
    row_id: i64,
}
impl SyncRunRecorder {
/// Insert a new `sync_runs` row with `status='running'`.
pub fn start(conn: &Connection, command: &str, run_id: &str) -> Result<Self> {
let now = now_ms();
conn.execute(
"INSERT INTO sync_runs (started_at, heartbeat_at, status, command, run_id)
VALUES (?1, ?2, 'running', ?3, ?4)",
rusqlite::params![now, now, command, run_id],
)?;
let row_id = conn.last_insert_rowid();
Ok(Self { row_id })
}
/// Mark run as succeeded with full metrics.
pub fn succeed(
self,
conn: &Connection,
metrics: &[StageTiming],
total_items: usize,
total_errors: usize,
) -> Result<()> {
let now = now_ms();
let metrics_json = serde_json::to_string(metrics).unwrap_or_else(|_| "[]".to_string());
conn.execute(
"UPDATE sync_runs
SET finished_at = ?1, status = 'succeeded',
metrics_json = ?2, total_items_processed = ?3, total_errors = ?4
WHERE id = ?5",
rusqlite::params![
now,
metrics_json,
total_items as i64,
total_errors as i64,
self.row_id
],
)?;
Ok(())
}
/// Mark run as failed with error message and optional partial metrics.
pub fn fail(
self,
conn: &Connection,
error: &str,
metrics: Option<&[StageTiming]>,
) -> Result<()> {
let now = now_ms();
let metrics_json =
metrics.map(|m| serde_json::to_string(m).unwrap_or_else(|_| "[]".to_string()));
conn.execute(
"UPDATE sync_runs
SET finished_at = ?1, status = 'failed', error = ?2,
metrics_json = ?3
WHERE id = ?4",
rusqlite::params![now, error, metrics_json, self.row_id],
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::{create_connection, run_migrations};
use std::path::Path;
fn setup_test_db() -> Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
conn
}
#[test]
fn test_sync_run_recorder_start() {
let conn = setup_test_db();
let recorder = SyncRunRecorder::start(&conn, "sync", "abc12345").unwrap();
assert!(recorder.row_id > 0);
let (status, command, run_id): (String, String, String) = conn
.query_row(
"SELECT status, command, run_id FROM sync_runs WHERE id = ?1",
[recorder.row_id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.unwrap();
assert_eq!(status, "running");
assert_eq!(command, "sync");
assert_eq!(run_id, "abc12345");
}
#[test]
fn test_sync_run_recorder_succeed() {
let conn = setup_test_db();
let recorder = SyncRunRecorder::start(&conn, "sync", "def67890").unwrap();
let row_id = recorder.row_id;
let metrics = vec![StageTiming {
name: "ingest".to_string(),
project: None,
elapsed_ms: 1200,
items_processed: 50,
items_skipped: 0,
errors: 2,
rate_limit_hits: 0,
retries: 0,
sub_stages: vec![],
}];
recorder.succeed(&conn, &metrics, 50, 2).unwrap();
let (status, finished_at, metrics_json, total_items, total_errors): (
String,
Option<i64>,
Option<String>,
i64,
i64,
) = conn
.query_row(
"SELECT status, finished_at, metrics_json, total_items_processed, total_errors
FROM sync_runs WHERE id = ?1",
[row_id],
|row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get(2)?,
row.get(3)?,
row.get(4)?,
))
},
)
.unwrap();
assert_eq!(status, "succeeded");
assert!(finished_at.is_some());
assert!(metrics_json.is_some());
assert_eq!(total_items, 50);
assert_eq!(total_errors, 2);
// Verify metrics_json is parseable
let parsed: Vec<StageTiming> = serde_json::from_str(&metrics_json.unwrap()).unwrap();
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].name, "ingest");
}
#[test]
fn test_sync_run_recorder_fail() {
let conn = setup_test_db();
let recorder = SyncRunRecorder::start(&conn, "ingest issues", "fail0001").unwrap();
let row_id = recorder.row_id;
recorder.fail(&conn, "GitLab auth failed", None).unwrap();
let (status, finished_at, error, metrics_json): (
String,
Option<i64>,
Option<String>,
Option<String>,
) = conn
.query_row(
"SELECT status, finished_at, error, metrics_json
FROM sync_runs WHERE id = ?1",
[row_id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.unwrap();
assert_eq!(status, "failed");
assert!(finished_at.is_some());
assert_eq!(error.as_deref(), Some("GitLab auth failed"));
assert!(metrics_json.is_none());
}
#[test]
fn test_sync_run_recorder_fail_with_partial_metrics() {
let conn = setup_test_db();
let recorder = SyncRunRecorder::start(&conn, "sync", "part0001").unwrap();
let row_id = recorder.row_id;
let partial_metrics = vec![StageTiming {
name: "ingest_issues".to_string(),
project: Some("group/repo".to_string()),
elapsed_ms: 800,
items_processed: 30,
items_skipped: 0,
errors: 0,
rate_limit_hits: 1,
retries: 0,
sub_stages: vec![],
}];
recorder
.fail(&conn, "Embedding failed", Some(&partial_metrics))
.unwrap();
let (status, metrics_json): (String, Option<String>) = conn
.query_row(
"SELECT status, metrics_json FROM sync_runs WHERE id = ?1",
[row_id],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap();
assert_eq!(status, "failed");
assert!(metrics_json.is_some());
let parsed: Vec<StageTiming> = serde_json::from_str(&metrics_json.unwrap()).unwrap();
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].name, "ingest_issues");
}
}