diff --git a/src/cli/commands/stats.rs b/src/cli/commands/stats.rs index 8ec3e46..d16c1bc 100644 --- a/src/cli/commands/stats.rs +++ b/src/cli/commands/stats.rs @@ -47,6 +47,9 @@ pub struct QueueStats { pub dirty_sources_failed: i64, pub pending_discussion_fetches: i64, pub pending_discussion_fetches_failed: i64, + pub pending_dependent_fetches: i64, + pub pending_dependent_fetches_failed: i64, + pub pending_dependent_fetches_stuck: i64, } #[derive(Debug, Default, Serialize)] @@ -55,6 +58,11 @@ pub struct IntegrityResult { pub fts_doc_mismatch: bool, pub orphan_embeddings: i64, pub stale_metadata: i64, + pub orphan_state_events: i64, + pub orphan_label_events: i64, + pub orphan_milestone_events: i64, + pub queue_stuck_locks: i64, + pub queue_max_attempts: i64, #[serde(skip_serializing_if = "Option::is_none")] pub repair: Option, } @@ -127,6 +135,21 @@ pub fn run_stats( )?; } + if table_exists(&conn, "pending_dependent_fetches") { + result.queues.pending_dependent_fetches = count_query( + &conn, + "SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NULL", + )?; + result.queues.pending_dependent_fetches_failed = count_query( + &conn, + "SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NOT NULL", + )?; + result.queues.pending_dependent_fetches_stuck = count_query( + &conn, + "SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL", + )?; + } + // Integrity check if check { let mut integrity = IntegrityResult::default(); @@ -153,9 +176,52 @@ pub fn run_stats( )?; } + // Orphaned resource events (FK targets missing) + if table_exists(&conn, "resource_state_events") { + integrity.orphan_state_events = count_query( + &conn, + "SELECT COUNT(*) FROM resource_state_events rse + WHERE (rse.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rse.issue_id)) + OR (rse.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rse.merge_request_id))", + )?; + } + if table_exists(&conn, "resource_label_events") { + integrity.orphan_label_events = count_query( + &conn, + "SELECT COUNT(*) FROM resource_label_events rle + WHERE (rle.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rle.issue_id)) + OR (rle.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rle.merge_request_id))", + )?; + } + if table_exists(&conn, "resource_milestone_events") { + integrity.orphan_milestone_events = count_query( + &conn, + "SELECT COUNT(*) FROM resource_milestone_events rme + WHERE (rme.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rme.issue_id)) + OR (rme.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rme.merge_request_id))", + )?; + } + + // Queue health: stuck locks and max retry attempts + if table_exists(&conn, "pending_dependent_fetches") { + integrity.queue_stuck_locks = count_query( + &conn, + "SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL", + )?; + integrity.queue_max_attempts = count_query( + &conn, + "SELECT COALESCE(MAX(attempts), 0) FROM pending_dependent_fetches", + )?; + } + + let orphan_events = integrity.orphan_state_events + + integrity.orphan_label_events + + integrity.orphan_milestone_events; + integrity.ok = !integrity.fts_doc_mismatch && integrity.orphan_embeddings == 0 - && integrity.stale_metadata == 0; + && integrity.stale_metadata == 0 + && orphan_events == 0; // Repair if repair { @@ -260,6 +326,17 @@ pub fn print_stats(result: &StatsResult) { result.queues.pending_discussion_fetches, result.queues.pending_discussion_fetches_failed ); + if result.queues.pending_dependent_fetches > 0 + || result.queues.pending_dependent_fetches_failed > 0 + || result.queues.pending_dependent_fetches_stuck > 0 + { + println!( + " Dependent fetch: {} pending, {} failed, {} stuck", + result.queues.pending_dependent_fetches, + result.queues.pending_dependent_fetches_failed, + result.queues.pending_dependent_fetches_stuck + ); + } if let Some(ref integrity) = result.integrity { println!(); @@ -287,6 +364,33 @@ pub fn print_stats(result: &StatsResult) { integrity.stale_metadata ); } + let orphan_events = integrity.orphan_state_events + + integrity.orphan_label_events + + integrity.orphan_milestone_events; + if orphan_events > 0 { + println!( + " {} {} orphan resource events (state: {}, label: {}, milestone: {})", + style("!").red(), + orphan_events, + integrity.orphan_state_events, + integrity.orphan_label_events, + integrity.orphan_milestone_events + ); + } + if integrity.queue_stuck_locks > 0 { + println!( + " {} {} stuck queue locks", + style("!").yellow(), + integrity.queue_stuck_locks + ); + } + if integrity.queue_max_attempts > 3 { + println!( + " {} max queue retry attempts: {}", + style("!").yellow(), + integrity.queue_max_attempts + ); + } if let Some(ref repair) = integrity.repair { println!(); @@ -336,6 +440,11 @@ pub fn print_stats_json(result: &StatsResult) { fts_doc_mismatch: i.fts_doc_mismatch, orphan_embeddings: i.orphan_embeddings, stale_metadata: i.stale_metadata, + orphan_state_events: i.orphan_state_events, + orphan_label_events: i.orphan_label_events, + orphan_milestone_events: i.orphan_milestone_events, + queue_stuck_locks: i.queue_stuck_locks, + queue_max_attempts: i.queue_max_attempts, repair: i.repair.as_ref().map(|r| RepairResult { fts_rebuilt: r.fts_rebuilt, orphans_deleted: r.orphans_deleted,