diff --git a/src/core/backoff.rs b/src/core/backoff.rs index 99462c3..77ea430 100644 --- a/src/core/backoff.rs +++ b/src/core/backoff.rs @@ -1,7 +1,7 @@ use rand::Rng; pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 { - let capped_attempts = attempt_count.min(30) as u32; + let capped_attempts = attempt_count.clamp(0, 30) as u32; let base_delay_ms = 1000_i64.saturating_mul(1 << capped_attempts); let capped_delay_ms = base_delay_ms.min(3_600_000); diff --git a/src/core/events_db.rs b/src/core/events_db.rs index 58c0900..be77fdb 100644 --- a/src/core/events_db.rs +++ b/src/core/events_db.rs @@ -141,42 +141,36 @@ fn resolve_entity_ids( pub fn count_events(conn: &Connection) -> Result<EventCounts> { let mut counts = EventCounts::default(); - let row: (i64, i64) = conn - .query_row( - "SELECT + let row: (i64, i64) = conn.query_row( + "SELECT COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END), COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END) FROM resource_state_events", - [], - |row| Ok((row.get(0)?, row.get(1)?)), - ) - .unwrap_or((0, 0)); + [], + |row| Ok((row.get(0)?, row.get(1)?)), + )?; counts.state_issue = row.0 as usize; counts.state_mr = row.1 as usize; - let row: (i64, i64) = conn - .query_row( - "SELECT + let row: (i64, i64) = conn.query_row( + "SELECT COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END), COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END) FROM resource_label_events", - [], - |row| Ok((row.get(0)?, row.get(1)?)), - ) - .unwrap_or((0, 0)); + [], + |row| Ok((row.get(0)?, row.get(1)?)), + )?; counts.label_issue = row.0 as usize; counts.label_mr = row.1 as usize; - let row: (i64, i64) = conn - .query_row( - "SELECT + let row: (i64, i64) = conn.query_row( + "SELECT COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END), COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END) FROM resource_milestone_events", - [], - |row| Ok((row.get(0)?, row.get(1)?)), - ) - .unwrap_or((0, 0)); + [], + |row| Ok((row.get(0)?, row.get(1)?)), + )?;
counts.milestone_issue = row.0 as usize; counts.milestone_mr = row.1 as usize; diff --git a/src/core/note_parser.rs b/src/core/note_parser.rs index d04e8a9..f7af7be 100644 --- a/src/core/note_parser.rs +++ b/src/core/note_parser.rs @@ -106,8 +106,7 @@ pub fn extract_refs_from_system_notes(conn: &Connection, project_id: i64) -> Res entity_id: row.get(3)?, }) })? - .filter_map(|r| r.ok()) - .collect(); + .collect::<Result<Vec<_>, _>>()?; if notes.is_empty() { return Ok(result); } @@ -193,7 +192,10 @@ fn noteable_type_to_entity_type(noteable_type: &str) -> &str { match noteable_type { "Issue" => "issue", "MergeRequest" => "merge_request", - _ => "issue", + other => { + debug!(noteable_type = %other, "Unknown noteable_type, defaulting to issue"); + "issue" + } } } diff --git a/src/core/payloads.rs b/src/core/payloads.rs index 5012360..b8e6420 100644 --- a/src/core/payloads.rs +++ b/src/core/payloads.rs @@ -2,6 +2,7 @@ use flate2::Compression; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use rusqlite::Connection; +use rusqlite::OptionalExtension; use sha2::{Digest, Sha256}; use std::io::{Read, Write}; @@ -35,7 +36,7 @@ pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result< ), |row| row.get(0), ) - .ok(); + .optional()?; if let Some(id) = existing { return Ok(id); } @@ -74,7 +75,7 @@ pub fn read_payload(conn: &Connection, id: i64) -> Result i64 { } pub fn decode_rowid(rowid: i64) -> (i64, i64) { - debug_assert!( + assert!( rowid >= 0, "decode_rowid called with negative rowid: {rowid}" ); diff --git a/src/ingestion/dirty_tracker.rs b/src/ingestion/dirty_tracker.rs index 9809021..43eb3e4 100644 --- a/src/ingestion/dirty_tracker.rs +++ b/src/ingestion/dirty_tracker.rs @@ -1,4 +1,5 @@ use rusqlite::Connection; +use rusqlite::OptionalExtension; use crate::core::backoff::compute_next_attempt_at; use crate::core::error::Result; @@ -88,11 +89,17 @@ pub fn record_dirty_error( error: &str, ) -> Result<()> { let now = now_ms(); - let attempt_count: i64 =
conn.query_row( - "SELECT attempt_count FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2", - rusqlite::params![source_type.as_str(), source_id], - |row| row.get(0), - )?; + let attempt_count: Option<i64> = conn + .query_row( + "SELECT attempt_count FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2", + rusqlite::params![source_type.as_str(), source_id], + |row| row.get(0), + ) + .optional()?; + + let Some(attempt_count) = attempt_count else { + return Ok(()); + }; let new_attempt = attempt_count + 1; let next_at = compute_next_attempt_at(now, new_attempt); diff --git a/src/search/vector.rs b/src/search/vector.rs index b7de7e9..c4d2bd5 100644 --- a/src/search/vector.rs +++ b/src/search/vector.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use rusqlite::Connection; +use rusqlite::OptionalExtension; use crate::core::error::Result; use crate::embedding::chunk_ids::decode_rowid; @@ -11,7 +12,7 @@ pub struct VectorResult { pub distance: f64, } -fn max_chunks_per_document(conn: &Connection) -> i64 { +fn max_chunks_per_document(conn: &Connection) -> Result<i64> { let stored: Option<i64> = conn .query_row( "SELECT MAX(chunk_count) FROM embedding_metadata @@ -19,21 +20,24 @@ fn max_chunks_per_document(conn: &Connection) -> i64 { [], |row| row.get(0), ) - .unwrap_or(None); + .optional()? + .flatten(); if let Some(max) = stored { - return max; + return Ok(max); } - conn.query_row( - "SELECT COALESCE(MAX(cnt), 1) FROM ( + Ok(conn + .query_row( + "SELECT COALESCE(MAX(cnt), 1) FROM ( SELECT COUNT(*) as cnt FROM embedding_metadata WHERE last_error IS NULL GROUP BY document_id )", - [], - |row| row.get(0), - ) - .unwrap_or(1) + [], + |row| row.get(0), + ) + .optional()?
+ .unwrap_or(1)) } pub fn search_vector( @@ -50,7 +54,7 @@ pub fn search_vector( .flat_map(|f| f.to_le_bytes()) .collect(); - let max_chunks = max_chunks_per_document(conn).max(1); + let max_chunks = max_chunks_per_document(conn)?.max(1); let multiplier = ((max_chunks.unsigned_abs() as usize * 3 / 2) + 1).clamp(8, 200); let k = (limit * multiplier).min(10_000);