Removes module-level doc comments (//! lines) and excessive inline doc comments that were duplicating information already evident from:
- Function/struct names (self-documenting code)
- Type signatures (the "what" is clear from types)
- Implementation context (the "how" is clear from code)

Affected modules:
- cli/* - Removed command descriptions duplicating clap help text
- core/* - Removed module headers and obvious function docs
- documents/* - Removed extractor/regenerator/truncation docs
- embedding/* - Removed pipeline and chunking docs
- gitlab/* - Removed client and transformer docs (kept type definitions)
- ingestion/* - Removed orchestrator and ingestion docs
- search/* - Removed FTS and vector search docs

Philosophy: Code should be self-documenting. Comments should explain "why" (business decisions, non-obvious constraints), not "what" (which the code itself shows). This change reduces noise and maintenance burden while keeping the codebase just as understandable.

Retains comments for:
- Non-obvious business logic
- Important safety invariants
- Complex algorithm explanations
- Public API boundaries where generated docs matter

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
567 lines
17 KiB
Rust
567 lines
17 KiB
Rust
use std::collections::HashMap;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::Instant;
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
use tracing::Subscriber;
|
|
use tracing::span::{Attributes, Id, Record};
|
|
use tracing_subscriber::layer::{Context, Layer};
|
|
use tracing_subscriber::registry::LookupSpan;
|
|
|
|
/// Serde predicate for `skip_serializing_if`: omits zero-valued counters
/// from the serialized output to keep the JSON compact.
fn is_zero(v: &usize) -> bool {
    matches!(*v, 0)
}
|
|
|
|
/// Timing and counter metrics for one pipeline stage, serialized to JSON.
///
/// Zero-valued counters, `None` project, and empty `sub_stages` are omitted
/// from the serialized form via `skip_serializing_if`, so output stays
/// compact; `default` keeps deserialization of such compact JSON working.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageTiming {
    /// Span name of the stage (e.g. "sync", "ingest_issues").
    pub name: String,
    /// Project identifier recorded on the span's `project` field, if any.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub project: Option<String>,
    /// Wall-clock duration of the stage in milliseconds.
    pub elapsed_ms: u64,
    /// Number of items processed (taken from the span's `items_processed` field).
    pub items_processed: usize,
    /// Items skipped; omitted from JSON when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub items_skipped: usize,
    /// Errors encountered; omitted from JSON when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub errors: usize,
    /// HTTP 429 events observed while this span was current; omitted when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub rate_limit_hits: usize,
    /// Retry log events observed while this span was current; omitted when zero.
    #[serde(skip_serializing_if = "is_zero", default)]
    pub retries: usize,
    /// Timings of child spans nested under this stage; omitted when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub sub_stages: Vec<StageTiming>,
}
|
|
|
|
/// Mutable bookkeeping for a span that is currently open.
struct SpanData {
    name: String,
    /// `Id::into_u64` of the parent span, if any.
    parent_id: Option<u64>,
    /// Creation time; elapsed is measured from span creation to close,
    /// so it includes any time the span spent exited but not closed.
    start: Instant,
    /// Span fields captured as JSON values, both at creation and via
    /// later `record` calls.
    fields: HashMap<String, serde_json::Value>,
    /// Count of events with `status_code = 429` seen inside this span.
    rate_limit_hits: usize,
    /// Count of "retrying"/"Retrying" messages seen inside this span.
    retries: usize,
}
|
|
|
|
/// A closed span, retaining the ids needed to rebuild the parent/child
/// tree in `extract_timings`.
struct CompletedSpan {
    /// `Id::into_u64` of the span itself.
    id: u64,
    /// `Id::into_u64` of the parent span, if any.
    parent_id: Option<u64>,
    /// The finished timing; `sub_stages` is filled in during extraction.
    timing: StageTiming,
}
|
|
|
|
/// Tracing layer that records per-span timings and counters, building a
/// tree of [`StageTiming`] values retrievable via `extract_timings`.
///
/// Clones share the same underlying state (`Arc`), so the clone attached
/// to a subscriber and the handle kept by the caller observe the same data.
#[derive(Debug, Clone)]
pub struct MetricsLayer {
    /// Spans that are currently open, keyed by `Id::into_u64`.
    spans: Arc<Mutex<HashMap<u64, SpanData>>>,
    /// Spans that have closed, in close order (children before parents).
    completed: Arc<Mutex<Vec<CompletedSpan>>>,
}
|
|
|
|
impl Default for MetricsLayer {
    /// Equivalent to [`MetricsLayer::new`].
    fn default() -> Self {
        Self::new()
    }
}
|
|
|
|
impl MetricsLayer {
    /// Creates an empty layer with no open or completed spans.
    pub fn new() -> Self {
        Self {
            spans: Arc::new(Mutex::new(HashMap::new())),
            completed: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Assembles the completed spans into a forest of root [`StageTiming`]s
    /// with children nested under `sub_stages`.
    ///
    /// Correctness relies on `completed` being in close order: a span's
    /// children close (and are recorded) before the span itself, so by the
    /// time a span is visited its children are already in `children_map`.
    pub fn extract_timings(&self) -> Vec<StageTiming> {
        // unwrap_or_else(into_inner): tolerate a poisoned lock — metrics are
        // best-effort diagnostics and should never panic the caller.
        let completed = self.completed.lock().unwrap_or_else(|e| e.into_inner());
        if completed.is_empty() {
            return Vec::new();
        }

        // parent span id -> timings of its already-processed children
        let mut children_map: HashMap<u64, Vec<StageTiming>> = HashMap::new();
        let mut roots = Vec::new();
        // span id -> timing that has not yet been attached to a parent
        let mut id_to_timing: HashMap<u64, StageTiming> = HashMap::new();

        for entry in completed.iter() {
            id_to_timing.insert(entry.id, entry.timing.clone());
        }

        // First pass: give each span its collected children, then move
        // non-root spans under their parent's bucket in `children_map`.
        for entry in completed.iter() {
            if let Some(timing) = id_to_timing.get_mut(&entry.id)
                && let Some(children) = children_map.remove(&entry.id)
            {
                timing.sub_stages = children;
            }

            if let Some(parent_id) = entry.parent_id
                && let Some(timing) = id_to_timing.remove(&entry.id)
            {
                children_map.entry(parent_id).or_default().push(timing);
            }
        }

        // Second pass: anything without a parent is a root; attach any
        // children that accumulated under it before returning.
        for entry in completed.iter() {
            if entry.parent_id.is_none()
                && let Some(mut timing) = id_to_timing.remove(&entry.id)
            {
                if let Some(children) = children_map.remove(&entry.id) {
                    timing.sub_stages = children;
                }
                roots.push(timing);
            }
        }

        roots
    }
}
|
|
|
|
/// Field visitor that copies a span's recorded fields into a
/// name -> `serde_json::Value` map.
struct FieldVisitor<'a>(&'a mut HashMap<String, serde_json::Value>);
|
|
|
|
impl tracing::field::Visit for FieldVisitor<'_> {
|
|
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
|
self.0.insert(
|
|
field.name().to_string(),
|
|
serde_json::Value::String(format!("{value:?}")),
|
|
);
|
|
}
|
|
|
|
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
|
self.0.insert(
|
|
field.name().to_string(),
|
|
serde_json::Value::Number(value.into()),
|
|
);
|
|
}
|
|
|
|
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
|
self.0.insert(
|
|
field.name().to_string(),
|
|
serde_json::Value::Number(value.into()),
|
|
);
|
|
}
|
|
|
|
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
|
self.0.insert(
|
|
field.name().to_string(),
|
|
serde_json::Value::String(value.to_string()),
|
|
);
|
|
}
|
|
|
|
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
|
self.0
|
|
.insert(field.name().to_string(), serde_json::Value::Bool(value));
|
|
}
|
|
}
|
|
|
|
/// Event visitor that extracts only the two fields the layer cares about:
/// a numeric `status_code` and the event `message` text.
#[derive(Default)]
struct EventVisitor {
    /// Value of a `status_code` field, if the event carried one.
    status_code: Option<u64>,
    /// The event's `message` field, or empty if none was recorded.
    message: String,
}
|
|
|
|
impl tracing::field::Visit for EventVisitor {
    /// Captures the `message` field when it arrives as a `Debug` value
    /// (the common case for `tracing` event macros).
    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        match field.name() {
            "message" => self.message = format!("{value:?}"),
            _ => {}
        }
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        match field.name() {
            "status_code" => self.status_code = Some(value),
            _ => {}
        }
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        match field.name() {
            "message" => self.message = value.to_owned(),
            _ => {}
        }
    }
}
|
|
|
|
impl<S> Layer<S> for MetricsLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Records the new span's name, parent id, and initial fields, and
    /// starts its timer.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let parent_id = ctx
            .span(id)
            .and_then(|s| s.parent().map(|p| p.id().into_u64()));

        let mut fields = HashMap::new();
        let mut visitor = FieldVisitor(&mut fields);
        attrs.record(&mut visitor);

        // unwrap_or_else(into_inner): recover from a poisoned lock — metrics
        // are best-effort and must never panic the tracing pipeline.
        self.spans.lock().unwrap_or_else(|e| e.into_inner()).insert(
            id.into_u64(),
            SpanData {
                name: attrs.metadata().name().to_string(),
                parent_id,
                start: Instant::now(),
                fields,
                rate_limit_hits: 0,
                retries: 0,
            },
        );
    }

    /// Merges fields recorded after span creation (via `Span::record`)
    /// into the stored field map, overwriting earlier values by name.
    fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
        if let Some(data) = self
            .spans
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get_mut(&id.into_u64())
        {
            let mut visitor = FieldVisitor(&mut data.fields);
            values.record(&mut visitor);
        }
    }

    /// Inspects events emitted inside a span to count rate-limit hits
    /// (an event with `status_code = 429`) and retries (message text
    /// containing "retrying" or "Retrying" — a heuristic tied to the
    /// wording of the retry log lines elsewhere in this crate).
    fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
        if let Some(span_ref) = ctx.event_span(event) {
            let id = span_ref.id();
            if let Some(data) = self
                .spans
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .get_mut(&id.into_u64())
            {
                let mut visitor = EventVisitor::default();
                event.record(&mut visitor);

                if visitor.status_code == Some(429) {
                    data.rate_limit_hits += 1;
                }
                if visitor.message.contains("retrying") || visitor.message.contains("Retrying") {
                    data.retries += 1;
                }
            }
        }
    }

    /// On close, converts the span's accumulated data into a [`StageTiming`]
    /// (pulling well-known counters out of the recorded fields, defaulting
    /// to 0 when absent) and appends it to `completed`. Children close
    /// before their parents, which `extract_timings` relies on.
    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
        if let Some(data) = self
            .spans
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&id.into_u64())
        {
            let elapsed = data.start.elapsed();
            let timing = StageTiming {
                name: data.name,
                project: data
                    .fields
                    .get("project")
                    .and_then(|v| v.as_str())
                    .map(String::from),
                // Truncating cast is fine: u64 millis covers ~584M years.
                elapsed_ms: elapsed.as_millis() as u64,
                items_processed: data
                    .fields
                    .get("items_processed")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as usize,
                items_skipped: data
                    .fields
                    .get("items_skipped")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as usize,
                errors: data
                    .fields
                    .get("errors")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as usize,
                rate_limit_hits: data.rate_limit_hits,
                retries: data.retries,
                // Filled in later by extract_timings.
                sub_stages: vec![],
            };
            self.completed
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .push(CompletedSpan {
                    id: id.into_u64(),
                    parent_id: data.parent_id,
                    timing,
                });
        }
    }
}
|
|
|
|
impl std::fmt::Debug for SpanData {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("SpanData")
|
|
.field("name", &self.name)
|
|
.field("parent_id", &self.parent_id)
|
|
.field("fields", &self.fields.keys().collect::<Vec<_>>())
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for CompletedSpan {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("CompletedSpan")
|
|
.field("id", &self.id)
|
|
.field("parent_id", &self.parent_id)
|
|
.field("timing", &self.timing.name)
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;

    /// Builds a `StageTiming` with all optional counters zeroed and no
    /// project/sub-stages, so each test only spells out the fields it cares
    /// about via struct-update syntax. (Replaces seven copies of the full
    /// ten-field literal.)
    fn base_timing(name: &str, elapsed_ms: u64, items_processed: usize) -> StageTiming {
        StageTiming {
            name: name.to_string(),
            project: None,
            elapsed_ms,
            items_processed,
            items_skipped: 0,
            errors: 0,
            rate_limit_hits: 0,
            retries: 0,
            sub_stages: vec![],
        }
    }

    #[test]
    fn test_stage_timing_serialization() {
        let timing = StageTiming {
            items_skipped: 3,
            errors: 1,
            rate_limit_hits: 2,
            retries: 5,
            sub_stages: vec![StageTiming {
                project: Some("group/repo".to_string()),
                ..base_timing("ingest_issues", 800, 30)
            }],
            ..base_timing("sync", 1500, 42)
        };

        let json = serde_json::to_value(&timing).unwrap();
        assert_eq!(json["name"], "sync");
        assert_eq!(json["elapsed_ms"], 1500);
        assert_eq!(json["items_processed"], 42);
        assert_eq!(json["items_skipped"], 3);
        assert_eq!(json["errors"], 1);
        assert_eq!(json["rate_limit_hits"], 2);
        assert_eq!(json["retries"], 5);

        let sub = &json["sub_stages"][0];
        assert_eq!(sub["name"], "ingest_issues");
        assert_eq!(sub["project"], "group/repo");
        assert_eq!(sub["elapsed_ms"], 800);
    }

    #[test]
    fn test_stage_timing_zero_fields_omitted() {
        let json = serde_json::to_value(base_timing("embed", 500, 10)).unwrap();
        let obj = json.as_object().unwrap();

        // Zero counters and empty/None optionals must be skipped entirely.
        assert!(!obj.contains_key("items_skipped"));
        assert!(!obj.contains_key("errors"));
        assert!(!obj.contains_key("rate_limit_hits"));
        assert!(!obj.contains_key("retries"));
        assert!(!obj.contains_key("sub_stages"));
        assert!(!obj.contains_key("project"));

        // Mandatory fields are always present.
        assert!(obj.contains_key("name"));
        assert!(obj.contains_key("elapsed_ms"));
        assert!(obj.contains_key("items_processed"));
    }

    #[test]
    fn test_stage_timing_empty_sub_stages() {
        let json_str = serde_json::to_string(&base_timing("docs", 200, 5)).unwrap();
        assert!(!json_str.contains("sub_stages"));
    }

    #[test]
    fn test_stage_timing_debug_clone() {
        let timing = base_timing("test", 100, 1);

        let cloned = timing.clone();
        assert_eq!(cloned.name, "test");

        let debug_str = format!("{timing:?}");
        assert!(debug_str.contains("test"));
    }

    #[test]
    fn test_stage_timing_nested_sub_stages() {
        let leaf = StageTiming {
            project: Some("a/b".to_string()),
            ..base_timing("discussions", 1000, 50)
        };
        let mid = StageTiming {
            sub_stages: vec![leaf],
            ..base_timing("ingest", 2000, 80)
        };
        let timing = StageTiming {
            sub_stages: vec![mid],
            ..base_timing("sync", 3000, 100)
        };

        let json = serde_json::to_value(&timing).unwrap();
        let nested = &json["sub_stages"][0]["sub_stages"][0];
        assert_eq!(nested["name"], "discussions");
        assert_eq!(nested["project"], "a/b");
    }

    #[test]
    fn test_rate_limit_fields_omitted_when_zero() {
        let json_str = serde_json::to_string(&base_timing("ingest", 100, 5)).unwrap();
        assert!(!json_str.contains("rate_limit_hits"));
        assert!(!json_str.contains("retries"));
    }

    #[test]
    fn test_rate_limit_fields_present_when_nonzero() {
        let timing = StageTiming {
            rate_limit_hits: 3,
            retries: 7,
            ..base_timing("ingest", 100, 5)
        };

        let json = serde_json::to_value(&timing).unwrap();
        assert_eq!(json["rate_limit_hits"], 3);
        assert_eq!(json["retries"], 7);
    }

    #[test]
    fn test_metrics_layer_single_span() {
        let metrics = MetricsLayer::new();
        let subscriber = tracing_subscriber::registry().with(metrics.clone());

        tracing::subscriber::with_default(subscriber, || {
            let span = tracing::info_span!("test_stage");
            let _guard = span.enter();
        });

        let timings = metrics.extract_timings();
        assert_eq!(timings.len(), 1);
        assert_eq!(timings[0].name, "test_stage");
        assert!(timings[0].elapsed_ms < 100);
    }

    #[test]
    fn test_metrics_layer_nested_spans() {
        let metrics = MetricsLayer::new();
        let subscriber = tracing_subscriber::registry().with(metrics.clone());

        tracing::subscriber::with_default(subscriber, || {
            let parent = tracing::info_span!("parent");
            let _parent_guard = parent.enter();
            {
                let child = tracing::info_span!("child");
                let _child_guard = child.enter();
            }
        });

        let timings = metrics.extract_timings();
        assert_eq!(timings.len(), 1);
        assert_eq!(timings[0].name, "parent");
        assert_eq!(timings[0].sub_stages.len(), 1);
        assert_eq!(timings[0].sub_stages[0].name, "child");
    }

    #[test]
    fn test_metrics_layer_parallel_spans() {
        let metrics = MetricsLayer::new();
        let subscriber = tracing_subscriber::registry().with(metrics.clone());

        tracing::subscriber::with_default(subscriber, || {
            let parent = tracing::info_span!("parent");
            let _parent_guard = parent.enter();
            {
                let child_a = tracing::info_span!("child_a");
                let _a = child_a.enter();
            }
            {
                let child_b = tracing::info_span!("child_b");
                let _b = child_b.enter();
            }
        });

        let timings = metrics.extract_timings();
        assert_eq!(timings.len(), 1);
        assert_eq!(timings[0].sub_stages.len(), 2);

        let names: Vec<&str> = timings[0]
            .sub_stages
            .iter()
            .map(|s| s.name.as_str())
            .collect();
        assert!(names.contains(&"child_a"));
        assert!(names.contains(&"child_b"));
    }

    #[test]
    fn test_metrics_layer_empty() {
        assert!(MetricsLayer::new().extract_timings().is_empty());
    }
}
|