use crate::cli::render::{self, Theme}; use rusqlite::Connection; use serde::Serialize; use crate::Config; use crate::cli::robot::RobotMeta; use crate::core::db::create_connection; use crate::core::error::Result; use crate::core::paths::get_db_path; #[derive(Debug, Default, Serialize)] pub struct StatsResult { pub documents: DocumentStats, pub embeddings: EmbeddingStats, pub fts: FtsStats, pub queues: QueueStats, #[serde(skip_serializing_if = "Option::is_none")] pub integrity: Option, } #[derive(Debug, Default, Serialize)] pub struct DocumentStats { pub total: i64, pub issues: i64, pub merge_requests: i64, pub discussions: i64, pub truncated: i64, } #[derive(Debug, Default, Serialize)] pub struct EmbeddingStats { pub embedded_documents: i64, pub total_chunks: i64, pub coverage_pct: f64, } #[derive(Debug, Default, Serialize)] pub struct FtsStats { pub indexed: i64, } #[derive(Debug, Default, Serialize)] pub struct QueueStats { pub dirty_sources: i64, pub dirty_sources_failed: i64, pub pending_discussion_fetches: i64, pub pending_discussion_fetches_failed: i64, pub pending_dependent_fetches: i64, pub pending_dependent_fetches_failed: i64, pub pending_dependent_fetches_stuck: i64, } #[derive(Debug, Default, Serialize)] pub struct IntegrityResult { pub ok: bool, pub fts_doc_mismatch: bool, pub orphan_embeddings: i64, pub stale_metadata: i64, pub orphan_state_events: i64, pub orphan_label_events: i64, pub orphan_milestone_events: i64, pub queue_stuck_locks: i64, pub queue_max_attempts: i64, #[serde(skip_serializing_if = "Option::is_none")] pub repair: Option, } #[derive(Debug, Default, Serialize)] pub struct RepairResult { pub fts_rebuilt: bool, pub orphans_deleted: i64, pub stale_cleared: i64, pub dry_run: bool, } pub fn run_stats(config: &Config, check: bool, repair: bool, dry_run: bool) -> Result { let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; let mut result = StatsResult::default(); // Single-scan conditional aggregate: 5 sequential COUNT(*) → 1 table scan let (total, issues, mrs, discussions, truncated) = conn .query_row( "SELECT COUNT(*), COALESCE(SUM(CASE WHEN source_type = 'issue' THEN 1 END), 0), COALESCE(SUM(CASE WHEN source_type = 'merge_request' THEN 1 END), 0), COALESCE(SUM(CASE WHEN source_type = 'discussion' THEN 1 END), 0), COALESCE(SUM(CASE WHEN is_truncated = 1 THEN 1 END), 0) FROM documents", [], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, row.get::<_, i64>(3)?, row.get::<_, i64>(4)?, )) }, ) .unwrap_or((0, 0, 0, 0, 0)); result.documents.total = total; result.documents.issues = issues; result.documents.merge_requests = mrs; result.documents.discussions = discussions; result.documents.truncated = truncated; if table_exists(&conn, "embedding_metadata") { // Single scan: COUNT(DISTINCT) + COUNT(*) in one pass let (embedded, chunks) = conn .query_row( "SELECT COUNT(DISTINCT document_id), COUNT(*) FROM embedding_metadata WHERE last_error IS NULL", [], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), ) .unwrap_or((0, 0)); result.embeddings.embedded_documents = embedded; result.embeddings.total_chunks = chunks; result.embeddings.coverage_pct = if result.documents.total > 0 { (embedded as f64 / result.documents.total as f64) * 100.0 } else { 0.0 }; } // FTS5 shadow table is a regular B-tree with one row per document — // 19x faster than scanning the virtual table for COUNT(*) result.fts.indexed = count_query(&conn, "SELECT COUNT(*) FROM documents_fts_docsize")?; // Single scan: 2 conditional counts on dirty_sources let (ds_pending, ds_failed) = conn .query_row( "SELECT COALESCE(SUM(CASE WHEN last_error IS NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN last_error IS NOT NULL THEN 1 END), 0) FROM dirty_sources", [], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), ) .unwrap_or((0, 0)); result.queues.dirty_sources = ds_pending; result.queues.dirty_sources_failed = ds_failed; if table_exists(&conn, "pending_discussion_fetches") { let (pdf_pending, pdf_failed) = conn .query_row( "SELECT COALESCE(SUM(CASE WHEN last_error IS NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN last_error IS NOT NULL THEN 1 END), 0) FROM pending_discussion_fetches", [], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), ) .unwrap_or((0, 0)); result.queues.pending_discussion_fetches = pdf_pending; result.queues.pending_discussion_fetches_failed = pdf_failed; } if table_exists(&conn, "pending_dependent_fetches") { let (pf_pending, pf_failed, pf_stuck) = conn .query_row( "SELECT COALESCE(SUM(CASE WHEN last_error IS NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN last_error IS NOT NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN locked_at IS NOT NULL THEN 1 END), 0) FROM pending_dependent_fetches", [], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, )) }, ) .unwrap_or((0, 0, 0)); result.queues.pending_dependent_fetches = pf_pending; result.queues.pending_dependent_fetches_failed = pf_failed; result.queues.pending_dependent_fetches_stuck = pf_stuck; } #[allow(clippy::field_reassign_with_default)] if check { let mut integrity = IntegrityResult::default(); integrity.fts_doc_mismatch = result.fts.indexed != result.documents.total; if table_exists(&conn, "embeddings") { integrity.orphan_embeddings = count_query( &conn, "SELECT COUNT(*) FROM embedding_metadata em WHERE NOT EXISTS (SELECT 1 FROM documents d WHERE d.id = em.document_id)", )?; } if table_exists(&conn, "embedding_metadata") { integrity.stale_metadata = count_query( &conn, "SELECT COUNT(*) FROM embedding_metadata em JOIN documents d ON d.id = em.document_id WHERE em.chunk_index = 0 AND em.document_hash != d.content_hash", )?; } if table_exists(&conn, "resource_state_events") { integrity.orphan_state_events = count_query( &conn, "SELECT COUNT(*) FROM resource_state_events rse WHERE (rse.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rse.issue_id)) OR (rse.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rse.merge_request_id))", )?; } if table_exists(&conn, "resource_label_events") { integrity.orphan_label_events = count_query( &conn, "SELECT COUNT(*) FROM resource_label_events rle WHERE (rle.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rle.issue_id)) OR (rle.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rle.merge_request_id))", )?; } if table_exists(&conn, "resource_milestone_events") { integrity.orphan_milestone_events = count_query( &conn, "SELECT COUNT(*) FROM resource_milestone_events rme WHERE (rme.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rme.issue_id)) OR (rme.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rme.merge_request_id))", )?; } if table_exists(&conn, "pending_dependent_fetches") { integrity.queue_stuck_locks = count_query( &conn, "SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL", )?; integrity.queue_max_attempts = count_query( &conn, "SELECT COALESCE(MAX(attempts), 0) FROM pending_dependent_fetches", )?; } let orphan_events = integrity.orphan_state_events + integrity.orphan_label_events + integrity.orphan_milestone_events; integrity.ok = !integrity.fts_doc_mismatch && integrity.orphan_embeddings == 0 && integrity.stale_metadata == 0 && orphan_events == 0; if repair { let mut repair_result = RepairResult::default(); repair_result.dry_run = dry_run; if integrity.fts_doc_mismatch { if !dry_run { conn.execute( "INSERT INTO documents_fts(documents_fts) VALUES('rebuild')", [], )?; } repair_result.fts_rebuilt = true; } if integrity.orphan_embeddings > 0 && table_exists(&conn, "embedding_metadata") { if !dry_run { let deleted = conn.execute( "DELETE FROM embedding_metadata WHERE NOT EXISTS (SELECT 1 FROM documents d WHERE d.id = embedding_metadata.document_id)", [], )?; repair_result.orphans_deleted = deleted as i64; if table_exists(&conn, "embeddings") { let _ = conn.execute( "DELETE FROM embeddings WHERE rowid / 1000 NOT IN (SELECT id FROM documents)", [], ); } } else { repair_result.orphans_deleted = integrity.orphan_embeddings; } } if integrity.stale_metadata > 0 && table_exists(&conn, "embedding_metadata") { if !dry_run { let cleared = conn.execute( "DELETE FROM embedding_metadata WHERE document_id IN ( SELECT em.document_id FROM embedding_metadata em JOIN documents d ON d.id = em.document_id WHERE em.chunk_index = 0 AND em.document_hash != d.content_hash )", [], )?; repair_result.stale_cleared = cleared as i64; } else { repair_result.stale_cleared = integrity.stale_metadata; } } integrity.repair = Some(repair_result); } result.integrity = Some(integrity); } Ok(result) } fn count_query(conn: &Connection, sql: &str) -> Result { let count: i64 = conn.query_row(sql, [], |row| row.get(0)).unwrap_or(0); Ok(count) } fn table_exists(conn: &Connection, table: &str) -> bool { conn.query_row( "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", [table], |row| row.get::<_, i64>(0), ) .unwrap_or(0) > 0 } fn section(title: &str) { println!("{}", render::section_divider(title)); } pub fn print_stats(result: &StatsResult) { section("Documents"); let mut parts = vec![format!( "{} total", render::format_number(result.documents.total) )]; if result.documents.issues > 0 { parts.push(format!( "{} issues", render::format_number(result.documents.issues) )); } if result.documents.merge_requests > 0 { parts.push(format!( "{} MRs", render::format_number(result.documents.merge_requests) )); } if result.documents.discussions > 0 { parts.push(format!( "{} discussions", render::format_number(result.documents.discussions) )); } println!(" {}", parts.join(" \u{b7} ")); if result.documents.truncated > 0 { println!( " {}", Theme::warning().render(&format!( "{} truncated", render::format_number(result.documents.truncated) )) ); } section("Search Index"); println!( " {} FTS indexed", render::format_number(result.fts.indexed) ); let coverage_color = if result.embeddings.coverage_pct >= 95.0 { Theme::success().render(&format!("{:.0}%", result.embeddings.coverage_pct)) } else if result.embeddings.coverage_pct >= 50.0 { Theme::warning().render(&format!("{:.0}%", result.embeddings.coverage_pct)) } else { Theme::error().render(&format!("{:.0}%", result.embeddings.coverage_pct)) }; println!( " {} embedding coverage ({}/{})", coverage_color, render::format_number(result.embeddings.embedded_documents), render::format_number(result.documents.total), ); if result.embeddings.total_chunks > 0 { println!( " {}", Theme::dim().render(&format!( "{} chunks", render::format_number(result.embeddings.total_chunks) )) ); } // Queues: only show if there's anything to report let has_queue_activity = result.queues.dirty_sources > 0 || result.queues.dirty_sources_failed > 0 || result.queues.pending_discussion_fetches > 0 || result.queues.pending_discussion_fetches_failed > 0 || result.queues.pending_dependent_fetches > 0 || result.queues.pending_dependent_fetches_failed > 0; if has_queue_activity { section("Queues"); if result.queues.dirty_sources > 0 || result.queues.dirty_sources_failed > 0 { let mut q = Vec::new(); if result.queues.dirty_sources > 0 { q.push(format!("{} pending", result.queues.dirty_sources)); } if result.queues.dirty_sources_failed > 0 { q.push( Theme::error() .render(&format!("{} failed", result.queues.dirty_sources_failed)), ); } println!(" dirty sources: {}", q.join(", ")); } if result.queues.pending_discussion_fetches > 0 || result.queues.pending_discussion_fetches_failed > 0 { let mut q = Vec::new(); if result.queues.pending_discussion_fetches > 0 { q.push(format!( "{} pending", result.queues.pending_discussion_fetches )); } if result.queues.pending_discussion_fetches_failed > 0 { q.push(Theme::error().render(&format!( "{} failed", result.queues.pending_discussion_fetches_failed ))); } println!(" discussion fetch: {}", q.join(", ")); } if result.queues.pending_dependent_fetches > 0 || result.queues.pending_dependent_fetches_failed > 0 { let mut q = Vec::new(); if result.queues.pending_dependent_fetches > 0 { q.push(format!( "{} pending", result.queues.pending_dependent_fetches )); } if result.queues.pending_dependent_fetches_failed > 0 { q.push(Theme::error().render(&format!( "{} failed", result.queues.pending_dependent_fetches_failed ))); } if result.queues.pending_dependent_fetches_stuck > 0 { q.push(Theme::warning().render(&format!( "{} stuck", result.queues.pending_dependent_fetches_stuck ))); } println!(" dependent fetch: {}", q.join(", ")); } } else { section("Queues"); println!(" {}", Theme::success().render("all clear")); } if let Some(ref integrity) = result.integrity { section("Integrity"); if integrity.ok { println!( " {} all checks passed", Theme::success().render("\u{2713}") ); } else { if integrity.fts_doc_mismatch { println!( " {} FTS/document count mismatch", Theme::error().render("\u{2717}") ); } if integrity.orphan_embeddings > 0 { println!( " {} {} orphan embeddings", Theme::error().render("\u{2717}"), integrity.orphan_embeddings ); } if integrity.stale_metadata > 0 { println!( " {} {} stale embedding metadata", Theme::error().render("\u{2717}"), integrity.stale_metadata ); } let orphan_events = integrity.orphan_state_events + integrity.orphan_label_events + integrity.orphan_milestone_events; if orphan_events > 0 { println!( " {} {} orphan resource events", Theme::error().render("\u{2717}"), orphan_events ); } if integrity.queue_stuck_locks > 0 { println!( " {} {} stuck queue locks", Theme::warning().render("!"), integrity.queue_stuck_locks ); } } if let Some(ref repair) = integrity.repair { println!(); if repair.dry_run { println!( " {} {}", Theme::bold().render("Repair"), Theme::warning().render("(dry run)") ); } else { println!(" {}", Theme::bold().render("Repair")); } let action = if repair.dry_run { Theme::warning().render("would fix") } else { Theme::success().render("fixed") }; if repair.fts_rebuilt { println!(" {} FTS index rebuilt", action); } if repair.orphans_deleted > 0 { println!( " {} {} orphan embeddings deleted", action, repair.orphans_deleted ); } if repair.stale_cleared > 0 { println!( " {} {} stale metadata cleared", action, repair.stale_cleared ); } if !repair.fts_rebuilt && repair.orphans_deleted == 0 && repair.stale_cleared == 0 { println!(" {}", Theme::dim().render("nothing to repair")); } } } println!(); } #[derive(Serialize)] struct StatsJsonOutput { ok: bool, data: StatsResult, meta: RobotMeta, } pub fn print_stats_json(result: &StatsResult, elapsed_ms: u64) { let output = StatsJsonOutput { ok: true, data: StatsResult { documents: DocumentStats { ..result.documents }, embeddings: EmbeddingStats { ..result.embeddings }, fts: FtsStats { ..result.fts }, queues: QueueStats { ..result.queues }, integrity: result.integrity.as_ref().map(|i| IntegrityResult { ok: i.ok, fts_doc_mismatch: i.fts_doc_mismatch, orphan_embeddings: i.orphan_embeddings, stale_metadata: i.stale_metadata, orphan_state_events: i.orphan_state_events, orphan_label_events: i.orphan_label_events, orphan_milestone_events: i.orphan_milestone_events, queue_stuck_locks: i.queue_stuck_locks, queue_max_attempts: i.queue_max_attempts, repair: i.repair.as_ref().map(|r| RepairResult { fts_rebuilt: r.fts_rebuilt, orphans_deleted: r.orphans_deleted, stale_cleared: r.stale_cleared, dry_run: r.dry_run, }), }), }, meta: RobotMeta { elapsed_ms }, }; println!("{}", serde_json::to_string(&output).unwrap()); }