fix: propagate DB errors instead of silently swallowing them

Replace .unwrap_or(), .ok(), and .filter_map(|r| r.ok()) patterns with
proper error propagation using ? and rusqlite::OptionalExtension where
the query may legitimately return no rows.

Affected areas:
- events_db::count_events: three count queries now propagate errors
  instead of defaulting to (0, 0) on failure
- note_parser::extract_refs_from_system_notes: row iteration errors
  are now propagated instead of silently dropped via filter_map
- note_parser::noteable_type_to_entity_type: unknown types now log a
  debug warning before defaulting to "issue"
- payloads::store_payload/read_payload: use .optional()? instead of
  .ok() to distinguish "no row" from "query failed"
- backoff::compute_next_attempt_at: use .clamp(0, 30) to guard against
  negative attempt_count, not just .min(30)
- search::vector::max_chunks_per_document: returns Result<i64> with
  proper error propagation through .optional()?.flatten()
- embedding::chunk_ids::decode_rowid: promote debug_assert to assert
  since negative rowids indicate data corruption worth failing fast on
- ingestion::dirty_tracker::record_dirty_error: use .optional()? to
  handle missing dirty_sources row gracefully instead of hard error

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-09 10:15:36 -05:00
parent 41504b4941
commit 53ef21d653
7 changed files with 51 additions and 43 deletions

View File

@@ -1,7 +1,7 @@
use rand::Rng;
pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 {
let capped_attempts = attempt_count.min(30) as u32;
let capped_attempts = attempt_count.clamp(0, 30) as u32;
let base_delay_ms = 1000_i64.saturating_mul(1 << capped_attempts);
let capped_delay_ms = base_delay_ms.min(3_600_000);

View File

@@ -141,42 +141,36 @@ fn resolve_entity_ids(
pub fn count_events(conn: &Connection) -> Result<EventCounts> {
let mut counts = EventCounts::default();
let row: (i64, i64) = conn
.query_row(
let row: (i64, i64) = conn.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_state_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
)?;
counts.state_issue = row.0 as usize;
counts.state_mr = row.1 as usize;
let row: (i64, i64) = conn
.query_row(
let row: (i64, i64) = conn.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_label_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
)?;
counts.label_issue = row.0 as usize;
counts.label_mr = row.1 as usize;
let row: (i64, i64) = conn
.query_row(
let row: (i64, i64) = conn.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_milestone_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
)?;
counts.milestone_issue = row.0 as usize;
counts.milestone_mr = row.1 as usize;

View File

@@ -106,8 +106,7 @@ pub fn extract_refs_from_system_notes(conn: &Connection, project_id: i64) -> Res
entity_id: row.get(3)?,
})
})?
.filter_map(|r| r.ok())
.collect();
.collect::<std::result::Result<Vec<_>, _>>()?;
if notes.is_empty() {
return Ok(result);
@@ -193,7 +192,10 @@ fn noteable_type_to_entity_type(noteable_type: &str) -> &str {
match noteable_type {
"Issue" => "issue",
"MergeRequest" => "merge_request",
_ => "issue",
other => {
debug!(noteable_type = %other, "Unknown noteable_type, defaulting to issue");
"issue"
}
}
}

View File

@@ -2,6 +2,7 @@ use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use sha2::{Digest, Sha256};
use std::io::{Read, Write};
@@ -35,7 +36,7 @@ pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result<
),
|row| row.get(0),
)
.ok();
.optional()?;
if let Some(id) = existing {
return Ok(id);
@@ -74,7 +75,7 @@ pub fn read_payload(conn: &Connection, id: i64) -> Result<Option<serde_json::Val
[id],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.ok();
.optional()?;
let Some((encoding, payload_bytes)) = row else {
return Ok(None);

View File

@@ -14,7 +14,7 @@ pub fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
}
pub fn decode_rowid(rowid: i64) -> (i64, i64) {
debug_assert!(
assert!(
rowid >= 0,
"decode_rowid called with negative rowid: {rowid}"
);

View File

@@ -1,4 +1,5 @@
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use crate::core::backoff::compute_next_attempt_at;
use crate::core::error::Result;
@@ -88,11 +89,17 @@ pub fn record_dirty_error(
error: &str,
) -> Result<()> {
let now = now_ms();
let attempt_count: i64 = conn.query_row(
let attempt_count: Option<i64> = conn
.query_row(
"SELECT attempt_count FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2",
rusqlite::params![source_type.as_str(), source_id],
|row| row.get(0),
)?;
)
.optional()?;
let Some(attempt_count) = attempt_count else {
return Ok(());
};
let new_attempt = attempt_count + 1;
let next_at = compute_next_attempt_at(now, new_attempt);

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use crate::core::error::Result;
use crate::embedding::chunk_ids::decode_rowid;
@@ -11,7 +12,7 @@ pub struct VectorResult {
pub distance: f64,
}
fn max_chunks_per_document(conn: &Connection) -> i64 {
fn max_chunks_per_document(conn: &Connection) -> Result<i64> {
let stored: Option<i64> = conn
.query_row(
"SELECT MAX(chunk_count) FROM embedding_metadata
@@ -19,13 +20,15 @@ fn max_chunks_per_document(conn: &Connection) -> i64 {
[],
|row| row.get(0),
)
.unwrap_or(None);
.optional()?
.flatten();
if let Some(max) = stored {
return max;
return Ok(max);
}
conn.query_row(
Ok(conn
.query_row(
"SELECT COALESCE(MAX(cnt), 1) FROM (
SELECT COUNT(*) as cnt FROM embedding_metadata
WHERE last_error IS NULL GROUP BY document_id
@@ -33,7 +36,8 @@ fn max_chunks_per_document(conn: &Connection) -> i64 {
[],
|row| row.get(0),
)
.unwrap_or(1)
.optional()?
.unwrap_or(1))
}
pub fn search_vector(
@@ -50,7 +54,7 @@ pub fn search_vector(
.flat_map(|f| f.to_le_bytes())
.collect();
let max_chunks = max_chunks_per_document(conn).max(1);
let max_chunks = max_chunks_per_document(conn)?.max(1);
let multiplier = ((max_chunks.unsigned_abs() as usize * 3 / 2) + 1).clamp(8, 200);
let k = (limit * multiplier).min(10_000);