use console::style; use rusqlite::Connection; use serde::Serialize; use crate::Config; use crate::cli::robot::RobotMeta; use crate::core::db::create_connection; use crate::core::error::Result; use crate::core::paths::get_db_path; #[derive(Debug, Default, Serialize)] pub struct StatsResult { pub documents: DocumentStats, pub embeddings: EmbeddingStats, pub fts: FtsStats, pub queues: QueueStats, #[serde(skip_serializing_if = "Option::is_none")] pub integrity: Option, } #[derive(Debug, Default, Serialize)] pub struct DocumentStats { pub total: i64, pub issues: i64, pub merge_requests: i64, pub discussions: i64, pub truncated: i64, } #[derive(Debug, Default, Serialize)] pub struct EmbeddingStats { pub embedded_documents: i64, pub total_chunks: i64, pub coverage_pct: f64, } #[derive(Debug, Default, Serialize)] pub struct FtsStats { pub indexed: i64, } #[derive(Debug, Default, Serialize)] pub struct QueueStats { pub dirty_sources: i64, pub dirty_sources_failed: i64, pub pending_discussion_fetches: i64, pub pending_discussion_fetches_failed: i64, pub pending_dependent_fetches: i64, pub pending_dependent_fetches_failed: i64, pub pending_dependent_fetches_stuck: i64, } #[derive(Debug, Default, Serialize)] pub struct IntegrityResult { pub ok: bool, pub fts_doc_mismatch: bool, pub orphan_embeddings: i64, pub stale_metadata: i64, pub orphan_state_events: i64, pub orphan_label_events: i64, pub orphan_milestone_events: i64, pub queue_stuck_locks: i64, pub queue_max_attempts: i64, #[serde(skip_serializing_if = "Option::is_none")] pub repair: Option, } #[derive(Debug, Default, Serialize)] pub struct RepairResult { pub fts_rebuilt: bool, pub orphans_deleted: i64, pub stale_cleared: i64, pub dry_run: bool, } pub fn run_stats(config: &Config, check: bool, repair: bool, dry_run: bool) -> Result { let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; let mut result = StatsResult::default(); // Single-scan conditional aggregate: 5 sequential COUNT(*) → 1 table scan let (total, issues, mrs, discussions, truncated) = conn .query_row( "SELECT COUNT(*), COALESCE(SUM(CASE WHEN source_type = 'issue' THEN 1 END), 0), COALESCE(SUM(CASE WHEN source_type = 'merge_request' THEN 1 END), 0), COALESCE(SUM(CASE WHEN source_type = 'discussion' THEN 1 END), 0), COALESCE(SUM(CASE WHEN is_truncated = 1 THEN 1 END), 0) FROM documents", [], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, row.get::<_, i64>(3)?, row.get::<_, i64>(4)?, )) }, ) .unwrap_or((0, 0, 0, 0, 0)); result.documents.total = total; result.documents.issues = issues; result.documents.merge_requests = mrs; result.documents.discussions = discussions; result.documents.truncated = truncated; if table_exists(&conn, "embedding_metadata") { // Single scan: COUNT(DISTINCT) + COUNT(*) in one pass let (embedded, chunks) = conn .query_row( "SELECT COUNT(DISTINCT document_id), COUNT(*) FROM embedding_metadata WHERE last_error IS NULL", [], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), ) .unwrap_or((0, 0)); result.embeddings.embedded_documents = embedded; result.embeddings.total_chunks = chunks; result.embeddings.coverage_pct = if result.documents.total > 0 { (embedded as f64 / result.documents.total as f64) * 100.0 } else { 0.0 }; } // FTS5 shadow table is a regular B-tree with one row per document — // 19x faster than scanning the virtual table for COUNT(*) result.fts.indexed = count_query(&conn, "SELECT COUNT(*) FROM documents_fts_docsize")?; // Single scan: 2 conditional counts on dirty_sources let (ds_pending, ds_failed) = conn .query_row( "SELECT COALESCE(SUM(CASE WHEN last_error IS NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN last_error IS NOT NULL THEN 1 END), 0) FROM dirty_sources", [], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), ) .unwrap_or((0, 0)); result.queues.dirty_sources = ds_pending; result.queues.dirty_sources_failed = ds_failed; if table_exists(&conn, "pending_discussion_fetches") { let (pdf_pending, pdf_failed) = conn .query_row( "SELECT COALESCE(SUM(CASE WHEN last_error IS NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN last_error IS NOT NULL THEN 1 END), 0) FROM pending_discussion_fetches", [], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), ) .unwrap_or((0, 0)); result.queues.pending_discussion_fetches = pdf_pending; result.queues.pending_discussion_fetches_failed = pdf_failed; } if table_exists(&conn, "pending_dependent_fetches") { let (pf_pending, pf_failed, pf_stuck) = conn .query_row( "SELECT COALESCE(SUM(CASE WHEN last_error IS NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN last_error IS NOT NULL THEN 1 END), 0), COALESCE(SUM(CASE WHEN locked_at IS NOT NULL THEN 1 END), 0) FROM pending_dependent_fetches", [], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, i64>(1)?, row.get::<_, i64>(2)?, )) }, ) .unwrap_or((0, 0, 0)); result.queues.pending_dependent_fetches = pf_pending; result.queues.pending_dependent_fetches_failed = pf_failed; result.queues.pending_dependent_fetches_stuck = pf_stuck; } #[allow(clippy::field_reassign_with_default)] if check { let mut integrity = IntegrityResult::default(); integrity.fts_doc_mismatch = result.fts.indexed != result.documents.total; if table_exists(&conn, "embeddings") { integrity.orphan_embeddings = count_query( &conn, "SELECT COUNT(*) FROM embedding_metadata em WHERE NOT EXISTS (SELECT 1 FROM documents d WHERE d.id = em.document_id)", )?; } if table_exists(&conn, "embedding_metadata") { integrity.stale_metadata = count_query( &conn, "SELECT COUNT(*) FROM embedding_metadata em JOIN documents d ON d.id = em.document_id WHERE em.chunk_index = 0 AND em.document_hash != d.content_hash", )?; } if table_exists(&conn, "resource_state_events") { integrity.orphan_state_events = count_query( &conn, "SELECT COUNT(*) FROM resource_state_events rse WHERE (rse.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rse.issue_id)) OR (rse.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rse.merge_request_id))", )?; } if table_exists(&conn, "resource_label_events") { integrity.orphan_label_events = count_query( &conn, "SELECT COUNT(*) FROM resource_label_events rle WHERE (rle.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rle.issue_id)) OR (rle.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rle.merge_request_id))", )?; } if table_exists(&conn, "resource_milestone_events") { integrity.orphan_milestone_events = count_query( &conn, "SELECT COUNT(*) FROM resource_milestone_events rme WHERE (rme.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rme.issue_id)) OR (rme.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rme.merge_request_id))", )?; } if table_exists(&conn, "pending_dependent_fetches") { integrity.queue_stuck_locks = count_query( &conn, "SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL", )?; integrity.queue_max_attempts = count_query( &conn, "SELECT COALESCE(MAX(attempts), 0) FROM pending_dependent_fetches", )?; } let orphan_events = integrity.orphan_state_events + integrity.orphan_label_events + integrity.orphan_milestone_events; integrity.ok = !integrity.fts_doc_mismatch && integrity.orphan_embeddings == 0 && integrity.stale_metadata == 0 && orphan_events == 0; if repair { let mut repair_result = RepairResult::default(); repair_result.dry_run = dry_run; if integrity.fts_doc_mismatch { if !dry_run { conn.execute( "INSERT INTO documents_fts(documents_fts) VALUES('rebuild')", [], )?; } repair_result.fts_rebuilt = true; } if integrity.orphan_embeddings > 0 && table_exists(&conn, "embedding_metadata") { if !dry_run { let deleted = conn.execute( "DELETE FROM embedding_metadata WHERE NOT EXISTS (SELECT 1 FROM documents d WHERE d.id = embedding_metadata.document_id)", [], )?; repair_result.orphans_deleted = deleted as i64; if table_exists(&conn, "embeddings") { let _ = conn.execute( "DELETE FROM embeddings WHERE rowid / 1000 NOT IN (SELECT id FROM documents)", [], ); } } else { repair_result.orphans_deleted = integrity.orphan_embeddings; } } if integrity.stale_metadata > 0 && table_exists(&conn, "embedding_metadata") { if !dry_run { let cleared = conn.execute( "DELETE FROM embedding_metadata WHERE document_id IN ( SELECT em.document_id FROM embedding_metadata em JOIN documents d ON d.id = em.document_id WHERE em.chunk_index = 0 AND em.document_hash != d.content_hash )", [], )?; repair_result.stale_cleared = cleared as i64; } else { repair_result.stale_cleared = integrity.stale_metadata; } } integrity.repair = Some(repair_result); } result.integrity = Some(integrity); } Ok(result) } fn count_query(conn: &Connection, sql: &str) -> Result { let count: i64 = conn.query_row(sql, [], |row| row.get(0)).unwrap_or(0); Ok(count) } fn table_exists(conn: &Connection, table: &str) -> bool { conn.query_row( "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", [table], |row| row.get::<_, i64>(0), ) .unwrap_or(0) > 0 } pub fn print_stats(result: &StatsResult) { println!("{}", style("Documents").cyan().bold()); println!(" Total: {}", result.documents.total); println!(" Issues: {}", result.documents.issues); println!(" Merge Requests: {}", result.documents.merge_requests); println!(" Discussions: {}", result.documents.discussions); if result.documents.truncated > 0 { println!( " Truncated: {}", style(result.documents.truncated).yellow() ); } println!(); println!("{}", style("Search Index").cyan().bold()); println!(" FTS indexed: {}", result.fts.indexed); println!( " Embedding coverage: {:.1}% ({}/{})", result.embeddings.coverage_pct, result.embeddings.embedded_documents, result.documents.total ); if result.embeddings.total_chunks > 0 { println!(" Total chunks: {}", result.embeddings.total_chunks); } println!(); println!("{}", style("Queues").cyan().bold()); println!( " Dirty sources: {} pending, {} failed", result.queues.dirty_sources, result.queues.dirty_sources_failed ); println!( " Discussion fetch: {} pending, {} failed", result.queues.pending_discussion_fetches, result.queues.pending_discussion_fetches_failed ); if result.queues.pending_dependent_fetches > 0 || result.queues.pending_dependent_fetches_failed > 0 || result.queues.pending_dependent_fetches_stuck > 0 { println!( " Dependent fetch: {} pending, {} failed, {} stuck", result.queues.pending_dependent_fetches, result.queues.pending_dependent_fetches_failed, result.queues.pending_dependent_fetches_stuck ); } if let Some(ref integrity) = result.integrity { println!(); let status = if integrity.ok { style("OK").green().bold() } else { style("ISSUES FOUND").red().bold() }; println!("{} Integrity: {}", style("Check").cyan().bold(), status); if integrity.fts_doc_mismatch { println!(" {} FTS/document count mismatch", style("!").red()); } if integrity.orphan_embeddings > 0 { println!( " {} {} orphan embeddings", style("!").red(), integrity.orphan_embeddings ); } if integrity.stale_metadata > 0 { println!( " {} {} stale embedding metadata", style("!").red(), integrity.stale_metadata ); } let orphan_events = integrity.orphan_state_events + integrity.orphan_label_events + integrity.orphan_milestone_events; if orphan_events > 0 { println!( " {} {} orphan resource events (state: {}, label: {}, milestone: {})", style("!").red(), orphan_events, integrity.orphan_state_events, integrity.orphan_label_events, integrity.orphan_milestone_events ); } if integrity.queue_stuck_locks > 0 { println!( " {} {} stuck queue locks", style("!").yellow(), integrity.queue_stuck_locks ); } if integrity.queue_max_attempts > 3 { println!( " {} max queue retry attempts: {}", style("!").yellow(), integrity.queue_max_attempts ); } if let Some(ref repair) = integrity.repair { println!(); if repair.dry_run { println!( "{} {}", style("Repair").cyan().bold(), style("(dry run - no changes made)").yellow() ); } else { println!("{}", style("Repair").cyan().bold()); } let action = if repair.dry_run { style("would fix").yellow() } else { style("fixed").green() }; if repair.fts_rebuilt { println!(" {} FTS index rebuilt", action); } if repair.orphans_deleted > 0 { println!( " {} {} orphan embeddings deleted", action, repair.orphans_deleted ); } if repair.stale_cleared > 0 { println!( " {} {} stale metadata entries cleared", action, repair.stale_cleared ); } if !repair.fts_rebuilt && repair.orphans_deleted == 0 && repair.stale_cleared == 0 { println!(" No issues to repair."); } } } } #[derive(Serialize)] struct StatsJsonOutput { ok: bool, data: StatsResult, meta: RobotMeta, } pub fn print_stats_json(result: &StatsResult, elapsed_ms: u64) { let output = StatsJsonOutput { ok: true, data: StatsResult { documents: DocumentStats { ..result.documents }, embeddings: EmbeddingStats { ..result.embeddings }, fts: FtsStats { ..result.fts }, queues: QueueStats { ..result.queues }, integrity: result.integrity.as_ref().map(|i| IntegrityResult { ok: i.ok, fts_doc_mismatch: i.fts_doc_mismatch, orphan_embeddings: i.orphan_embeddings, stale_metadata: i.stale_metadata, orphan_state_events: i.orphan_state_events, orphan_label_events: i.orphan_label_events, orphan_milestone_events: i.orphan_milestone_events, queue_stuck_locks: i.queue_stuck_locks, queue_max_attempts: i.queue_max_attempts, repair: i.repair.as_ref().map(|r| RepairResult { fts_rebuilt: r.fts_rebuilt, orphans_deleted: r.orphans_deleted, stale_cleared: r.stale_cleared, dry_run: r.dry_run, }), }), }, meta: RobotMeta { elapsed_ms }, }; println!("{}", serde_json::to_string(&output).unwrap()); }