feat(stats): Extend --check with event FK integrity and queue health diagnostics

Adds two new categories of integrity checks to 'lore stats --check':

Event FK integrity (3 queries):
- Detects orphaned resource_state_events where issue_id or
  merge_request_id points to a non-existent parent entity
- Same check for resource_label_events and resource_milestone_events
- Under normal CASCADE operation these should always be zero; non-zero
  indicates manual DB edits, bugs, or partial migration state

Queue health diagnostics:
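- pending_dependent_fetches counts: pending (no last_error), failed
  (last_error set), and stuck (locked_at set); these are collected on
  every stats run, not only with --check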
- queue_stuck_locks: Jobs with locked_at set (potential worker crashes)
- queue_max_attempts: Highest retry count across all jobs (human output
  warns when > 3, which signals permanently failing jobs)

New IntegrityResult fields: orphan_state_events, orphan_label_events,
orphan_milestone_events, queue_stuck_locks, queue_max_attempts.

New QueueStats fields: pending_dependent_fetches,
pending_dependent_fetches_failed, pending_dependent_fetches_stuck.

Human output marks failures and warnings with colored "!" indicators:
- Red "!" for orphaned events (integrity failure)
- Yellow "!" for stuck locks and high retry counts (warnings)
- Dependent fetch queue line only shown when non-zero

All new queries are guarded by table_exists() checks for graceful
degradation on databases without migration 011 applied.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-03 12:08:15 -05:00
parent 12811683ca
commit 0236ef2776

View File

@@ -47,6 +47,9 @@ pub struct QueueStats {
pub dirty_sources_failed: i64, pub dirty_sources_failed: i64,
pub pending_discussion_fetches: i64, pub pending_discussion_fetches: i64,
pub pending_discussion_fetches_failed: i64, pub pending_discussion_fetches_failed: i64,
pub pending_dependent_fetches: i64,
pub pending_dependent_fetches_failed: i64,
pub pending_dependent_fetches_stuck: i64,
} }
#[derive(Debug, Default, Serialize)] #[derive(Debug, Default, Serialize)]
@@ -55,6 +58,11 @@ pub struct IntegrityResult {
pub fts_doc_mismatch: bool, pub fts_doc_mismatch: bool,
pub orphan_embeddings: i64, pub orphan_embeddings: i64,
pub stale_metadata: i64, pub stale_metadata: i64,
pub orphan_state_events: i64,
pub orphan_label_events: i64,
pub orphan_milestone_events: i64,
pub queue_stuck_locks: i64,
pub queue_max_attempts: i64,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub repair: Option<RepairResult>, pub repair: Option<RepairResult>,
} }
@@ -127,6 +135,21 @@ pub fn run_stats(
)?; )?;
} }
if table_exists(&conn, "pending_dependent_fetches") {
result.queues.pending_dependent_fetches = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NULL",
)?;
result.queues.pending_dependent_fetches_failed = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NOT NULL",
)?;
result.queues.pending_dependent_fetches_stuck = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL",
)?;
}
// Integrity check // Integrity check
if check { if check {
let mut integrity = IntegrityResult::default(); let mut integrity = IntegrityResult::default();
@@ -153,9 +176,52 @@ pub fn run_stats(
)?; )?;
} }
// Orphaned resource events (FK targets missing)
if table_exists(&conn, "resource_state_events") {
integrity.orphan_state_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_state_events rse
WHERE (rse.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rse.issue_id))
OR (rse.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rse.merge_request_id))",
)?;
}
if table_exists(&conn, "resource_label_events") {
integrity.orphan_label_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_label_events rle
WHERE (rle.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rle.issue_id))
OR (rle.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rle.merge_request_id))",
)?;
}
if table_exists(&conn, "resource_milestone_events") {
integrity.orphan_milestone_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_milestone_events rme
WHERE (rme.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rme.issue_id))
OR (rme.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rme.merge_request_id))",
)?;
}
// Queue health: stuck locks and max retry attempts
if table_exists(&conn, "pending_dependent_fetches") {
integrity.queue_stuck_locks = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL",
)?;
integrity.queue_max_attempts = count_query(
&conn,
"SELECT COALESCE(MAX(attempts), 0) FROM pending_dependent_fetches",
)?;
}
let orphan_events = integrity.orphan_state_events
+ integrity.orphan_label_events
+ integrity.orphan_milestone_events;
integrity.ok = !integrity.fts_doc_mismatch integrity.ok = !integrity.fts_doc_mismatch
&& integrity.orphan_embeddings == 0 && integrity.orphan_embeddings == 0
&& integrity.stale_metadata == 0; && integrity.stale_metadata == 0
&& orphan_events == 0;
// Repair // Repair
if repair { if repair {
@@ -260,6 +326,17 @@ pub fn print_stats(result: &StatsResult) {
result.queues.pending_discussion_fetches, result.queues.pending_discussion_fetches,
result.queues.pending_discussion_fetches_failed result.queues.pending_discussion_fetches_failed
); );
if result.queues.pending_dependent_fetches > 0
|| result.queues.pending_dependent_fetches_failed > 0
|| result.queues.pending_dependent_fetches_stuck > 0
{
println!(
" Dependent fetch: {} pending, {} failed, {} stuck",
result.queues.pending_dependent_fetches,
result.queues.pending_dependent_fetches_failed,
result.queues.pending_dependent_fetches_stuck
);
}
if let Some(ref integrity) = result.integrity { if let Some(ref integrity) = result.integrity {
println!(); println!();
@@ -287,6 +364,33 @@ pub fn print_stats(result: &StatsResult) {
integrity.stale_metadata integrity.stale_metadata
); );
} }
let orphan_events = integrity.orphan_state_events
+ integrity.orphan_label_events
+ integrity.orphan_milestone_events;
if orphan_events > 0 {
println!(
" {} {} orphan resource events (state: {}, label: {}, milestone: {})",
style("!").red(),
orphan_events,
integrity.orphan_state_events,
integrity.orphan_label_events,
integrity.orphan_milestone_events
);
}
if integrity.queue_stuck_locks > 0 {
println!(
" {} {} stuck queue locks",
style("!").yellow(),
integrity.queue_stuck_locks
);
}
if integrity.queue_max_attempts > 3 {
println!(
" {} max queue retry attempts: {}",
style("!").yellow(),
integrity.queue_max_attempts
);
}
if let Some(ref repair) = integrity.repair { if let Some(ref repair) = integrity.repair {
println!(); println!();
@@ -336,6 +440,11 @@ pub fn print_stats_json(result: &StatsResult) {
fts_doc_mismatch: i.fts_doc_mismatch, fts_doc_mismatch: i.fts_doc_mismatch,
orphan_embeddings: i.orphan_embeddings, orphan_embeddings: i.orphan_embeddings,
stale_metadata: i.stale_metadata, stale_metadata: i.stale_metadata,
orphan_state_events: i.orphan_state_events,
orphan_label_events: i.orphan_label_events,
orphan_milestone_events: i.orphan_milestone_events,
queue_stuck_locks: i.queue_stuck_locks,
queue_max_attempts: i.queue_max_attempts,
repair: i.repair.as_ref().map(|r| RepairResult { repair: i.repair.as_ref().map(|r| RepairResult {
fts_rebuilt: r.fts_rebuilt, fts_rebuilt: r.fts_rebuilt,
orphans_deleted: r.orphans_deleted, orphans_deleted: r.orphans_deleted,