//! Performance metrics types and tracing layer for sync pipeline observability. //! //! Provides: //! - [`StageTiming`]: Serializable timing/counter data for pipeline stages //! - [`MetricsLayer`]: Custom tracing subscriber layer that captures span timing use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Instant; use serde::{Deserialize, Serialize}; use tracing::Subscriber; use tracing::span::{Attributes, Id, Record}; use tracing_subscriber::layer::{Context, Layer}; use tracing_subscriber::registry::LookupSpan; /// Returns true when value is zero (for serde `skip_serializing_if`). fn is_zero(v: &usize) -> bool { *v == 0 } /// Timing and counter data for a single pipeline stage. /// /// Supports nested sub-stages for hierarchical timing breakdowns. /// Fields with zero/empty values are omitted from JSON output to /// keep robot-mode payloads compact. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StageTiming { pub name: String, #[serde(skip_serializing_if = "Option::is_none", default)] pub project: Option, pub elapsed_ms: u64, pub items_processed: usize, #[serde(skip_serializing_if = "is_zero", default)] pub items_skipped: usize, #[serde(skip_serializing_if = "is_zero", default)] pub errors: usize, #[serde(skip_serializing_if = "is_zero", default)] pub rate_limit_hits: usize, #[serde(skip_serializing_if = "is_zero", default)] pub retries: usize, #[serde(skip_serializing_if = "Vec::is_empty", default)] pub sub_stages: Vec, } // ============================================================================ // MetricsLayer: custom tracing subscriber layer // ============================================================================ /// Internal data tracked per open span. struct SpanData { name: String, parent_id: Option, start: Instant, fields: HashMap, rate_limit_hits: usize, retries: usize, } /// Completed span data with its original ID and parent ID. struct CompletedSpan { id: u64, parent_id: Option, timing: StageTiming, } /// Custom tracing layer that captures span timing and structured fields. /// /// Collects data from `#[instrument]` spans and materializes it into /// a `Vec` tree via [`extract_timings`]. /// /// Thread-safe via `Arc>` — suitable for concurrent span operations. #[derive(Debug, Clone)] pub struct MetricsLayer { spans: Arc>>, completed: Arc>>, } impl Default for MetricsLayer { fn default() -> Self { Self::new() } } impl MetricsLayer { pub fn new() -> Self { Self { spans: Arc::new(Mutex::new(HashMap::new())), completed: Arc::new(Mutex::new(Vec::new())), } } /// Extract timing tree for a completed run. /// /// Returns the top-level stages with sub-stages nested. /// Call after the root span closes. pub fn extract_timings(&self) -> Vec { let completed = self.completed.lock().unwrap_or_else(|e| e.into_inner()); if completed.is_empty() { return Vec::new(); } // Build children map: parent_id -> Vec let mut children_map: HashMap> = HashMap::new(); let mut roots = Vec::new(); let mut id_to_timing: HashMap = HashMap::new(); // First pass: collect all timings by ID for entry in completed.iter() { id_to_timing.insert(entry.id, entry.timing.clone()); } // Second pass: process in reverse order (children close before parents) // to build the tree bottom-up for entry in completed.iter() { // Attach any children that were collected for this span if let Some(timing) = id_to_timing.get_mut(&entry.id) && let Some(children) = children_map.remove(&entry.id) { timing.sub_stages = children; } if let Some(parent_id) = entry.parent_id { // This is a child span — attach to parent's children if let Some(timing) = id_to_timing.remove(&entry.id) { children_map.entry(parent_id).or_default().push(timing); } } } // Remaining entries in id_to_timing are roots for entry in completed.iter() { if entry.parent_id.is_none() && let Some(mut timing) = id_to_timing.remove(&entry.id) { if let Some(children) = children_map.remove(&entry.id) { timing.sub_stages = children; } roots.push(timing); } } roots } } /// Visitor that extracts field values from span attributes. struct FieldVisitor<'a>(&'a mut HashMap); impl tracing::field::Visit for FieldVisitor<'_> { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { self.0.insert( field.name().to_string(), serde_json::Value::String(format!("{value:?}")), ); } fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { self.0.insert( field.name().to_string(), serde_json::Value::Number(value.into()), ); } fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { self.0.insert( field.name().to_string(), serde_json::Value::Number(value.into()), ); } fn record_str(&mut self, field: &tracing::field::Field, value: &str) { self.0.insert( field.name().to_string(), serde_json::Value::String(value.to_string()), ); } fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { self.0 .insert(field.name().to_string(), serde_json::Value::Bool(value)); } } /// Visitor that extracts event fields for rate-limit/retry detection. #[derive(Default)] struct EventVisitor { status_code: Option, message: String, } impl tracing::field::Visit for EventVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { if field.name() == "message" { self.message = format!("{value:?}"); } } fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { if field.name() == "status_code" { self.status_code = Some(value); } } fn record_str(&mut self, field: &tracing::field::Field, value: &str) { if field.name() == "message" { self.message = value.to_string(); } } } impl Layer for MetricsLayer where S: Subscriber + for<'a> LookupSpan<'a>, { fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { let parent_id = ctx .span(id) .and_then(|s| s.parent().map(|p| p.id().into_u64())); let mut fields = HashMap::new(); let mut visitor = FieldVisitor(&mut fields); attrs.record(&mut visitor); self.spans.lock().unwrap_or_else(|e| e.into_inner()).insert( id.into_u64(), SpanData { name: attrs.metadata().name().to_string(), parent_id, start: Instant::now(), fields, rate_limit_hits: 0, retries: 0, }, ); } fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) { if let Some(data) = self .spans .lock() .unwrap_or_else(|e| e.into_inner()) .get_mut(&id.into_u64()) { let mut visitor = FieldVisitor(&mut data.fields); values.record(&mut visitor); } } fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { // Count rate-limit and retry events on the current span if let Some(span_ref) = ctx.event_span(event) { let id = span_ref.id(); if let Some(data) = self .spans .lock() .unwrap_or_else(|e| e.into_inner()) .get_mut(&id.into_u64()) { let mut visitor = EventVisitor::default(); event.record(&mut visitor); if visitor.status_code == Some(429) { data.rate_limit_hits += 1; } if visitor.message.contains("retrying") || visitor.message.contains("Retrying") { data.retries += 1; } } } } fn on_close(&self, id: Id, _ctx: Context<'_, S>) { if let Some(data) = self .spans .lock() .unwrap_or_else(|e| e.into_inner()) .remove(&id.into_u64()) { let elapsed = data.start.elapsed(); let timing = StageTiming { name: data.name, project: data .fields .get("project") .and_then(|v| v.as_str()) .map(String::from), elapsed_ms: elapsed.as_millis() as u64, items_processed: data .fields .get("items_processed") .and_then(|v| v.as_u64()) .unwrap_or(0) as usize, items_skipped: data .fields .get("items_skipped") .and_then(|v| v.as_u64()) .unwrap_or(0) as usize, errors: data .fields .get("errors") .and_then(|v| v.as_u64()) .unwrap_or(0) as usize, rate_limit_hits: data.rate_limit_hits, retries: data.retries, sub_stages: vec![], }; self.completed .lock() .unwrap_or_else(|e| e.into_inner()) .push(CompletedSpan { id: id.into_u64(), parent_id: data.parent_id, timing, }); } } } // Manual Debug impl since SpanData and CompletedSpan don't derive Debug impl std::fmt::Debug for SpanData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SpanData") .field("name", &self.name) .field("parent_id", &self.parent_id) .field("fields", &self.fields.keys().collect::>()) .finish() } } impl std::fmt::Debug for CompletedSpan { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CompletedSpan") .field("id", &self.id) .field("parent_id", &self.parent_id) .field("timing", &self.timing.name) .finish() } } #[cfg(test)] mod tests { use super::*; use tracing_subscriber::layer::SubscriberExt; #[test] fn test_stage_timing_serialization() { let timing = StageTiming { name: "sync".to_string(), project: None, elapsed_ms: 1500, items_processed: 42, items_skipped: 3, errors: 1, rate_limit_hits: 2, retries: 5, sub_stages: vec![StageTiming { name: "ingest_issues".to_string(), project: Some("group/repo".to_string()), elapsed_ms: 800, items_processed: 30, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![], }], }; let json = serde_json::to_value(&timing).unwrap(); assert_eq!(json["name"], "sync"); assert_eq!(json["elapsed_ms"], 1500); assert_eq!(json["items_processed"], 42); assert_eq!(json["items_skipped"], 3); assert_eq!(json["errors"], 1); assert_eq!(json["rate_limit_hits"], 2); assert_eq!(json["retries"], 5); // Sub-stage present let sub = &json["sub_stages"][0]; assert_eq!(sub["name"], "ingest_issues"); assert_eq!(sub["project"], "group/repo"); assert_eq!(sub["elapsed_ms"], 800); } #[test] fn test_stage_timing_zero_fields_omitted() { let timing = StageTiming { name: "embed".to_string(), project: None, elapsed_ms: 500, items_processed: 10, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![], }; let json = serde_json::to_value(&timing).unwrap(); let obj = json.as_object().unwrap(); // Zero fields must be absent assert!(!obj.contains_key("items_skipped")); assert!(!obj.contains_key("errors")); assert!(!obj.contains_key("rate_limit_hits")); assert!(!obj.contains_key("retries")); assert!(!obj.contains_key("sub_stages")); assert!(!obj.contains_key("project")); // Required fields always present assert!(obj.contains_key("name")); assert!(obj.contains_key("elapsed_ms")); assert!(obj.contains_key("items_processed")); } #[test] fn test_stage_timing_empty_sub_stages() { let timing = StageTiming { name: "docs".to_string(), project: None, elapsed_ms: 200, items_processed: 5, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![], }; let json_str = serde_json::to_string(&timing).unwrap(); assert!(!json_str.contains("sub_stages")); } #[test] fn test_stage_timing_debug_clone() { let timing = StageTiming { name: "test".to_string(), project: None, elapsed_ms: 100, items_processed: 1, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![], }; let cloned = timing.clone(); assert_eq!(cloned.name, "test"); let debug_str = format!("{timing:?}"); assert!(debug_str.contains("test")); } #[test] fn test_stage_timing_nested_sub_stages() { let timing = StageTiming { name: "sync".to_string(), project: None, elapsed_ms: 3000, items_processed: 100, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![StageTiming { name: "ingest".to_string(), project: None, elapsed_ms: 2000, items_processed: 80, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![StageTiming { name: "discussions".to_string(), project: Some("a/b".to_string()), elapsed_ms: 1000, items_processed: 50, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![], }], }], }; let json = serde_json::to_value(&timing).unwrap(); let nested = &json["sub_stages"][0]["sub_stages"][0]; assert_eq!(nested["name"], "discussions"); assert_eq!(nested["project"], "a/b"); } #[test] fn test_rate_limit_fields_omitted_when_zero() { let timing = StageTiming { name: "ingest".to_string(), project: None, elapsed_ms: 100, items_processed: 5, items_skipped: 0, errors: 0, rate_limit_hits: 0, retries: 0, sub_stages: vec![], }; let json_str = serde_json::to_string(&timing).unwrap(); assert!(!json_str.contains("rate_limit_hits")); assert!(!json_str.contains("retries")); } #[test] fn test_rate_limit_fields_present_when_nonzero() { let timing = StageTiming { name: "ingest".to_string(), project: None, elapsed_ms: 100, items_processed: 5, items_skipped: 0, errors: 0, rate_limit_hits: 3, retries: 7, sub_stages: vec![], }; let json = serde_json::to_value(&timing).unwrap(); assert_eq!(json["rate_limit_hits"], 3); assert_eq!(json["retries"], 7); } #[test] fn test_metrics_layer_single_span() { let metrics = MetricsLayer::new(); let subscriber = tracing_subscriber::registry().with(metrics.clone()); tracing::subscriber::with_default(subscriber, || { let span = tracing::info_span!("test_stage"); let _guard = span.enter(); // Simulate work }); let timings = metrics.extract_timings(); assert_eq!(timings.len(), 1); assert_eq!(timings[0].name, "test_stage"); assert!(timings[0].elapsed_ms < 100); // Should be near-instant } #[test] fn test_metrics_layer_nested_spans() { let metrics = MetricsLayer::new(); let subscriber = tracing_subscriber::registry().with(metrics.clone()); tracing::subscriber::with_default(subscriber, || { let parent = tracing::info_span!("parent"); let _parent_guard = parent.enter(); { let child = tracing::info_span!("child"); let _child_guard = child.enter(); } }); let timings = metrics.extract_timings(); assert_eq!(timings.len(), 1); assert_eq!(timings[0].name, "parent"); assert_eq!(timings[0].sub_stages.len(), 1); assert_eq!(timings[0].sub_stages[0].name, "child"); } #[test] fn test_metrics_layer_parallel_spans() { let metrics = MetricsLayer::new(); let subscriber = tracing_subscriber::registry().with(metrics.clone()); tracing::subscriber::with_default(subscriber, || { let parent = tracing::info_span!("parent"); let _parent_guard = parent.enter(); { let child_a = tracing::info_span!("child_a"); let _a = child_a.enter(); } { let child_b = tracing::info_span!("child_b"); let _b = child_b.enter(); } }); let timings = metrics.extract_timings(); assert_eq!(timings.len(), 1); assert_eq!(timings[0].sub_stages.len(), 2); let names: Vec<&str> = timings[0] .sub_stages .iter() .map(|s| s.name.as_str()) .collect(); assert!(names.contains(&"child_a")); assert!(names.contains(&"child_b")); } #[test] fn test_metrics_layer_empty() { let metrics = MetricsLayer::new(); let timings = metrics.extract_timings(); assert!(timings.is_empty()); } }