From 72f1cafdcf278d682731e495cd876a8a9d8f2d41 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Thu, 5 Feb 2026 11:21:28 -0500 Subject: [PATCH] perf: Optimize SQL queries and reduce allocations in hot paths Change detection queries (embedding/change_detector.rs): - Replace triple-EXISTS subquery pattern with LEFT JOIN + NULL check - SQLite now scans embedding_metadata once instead of three times - Semantically identical: returns docs needing embedding when no embedding exists, hash changed, or config mismatch Count queries (cli/commands/count.rs): - Consolidate 3 separate COUNT queries for issues into single query using conditional aggregation (CASE WHEN state = 'x' THEN 1) - Same optimization for MRs: 5 queries reduced to 1 Search filter queries (search/filters.rs): - Replace N separate EXISTS clauses for label filtering with single IN() clause with COUNT/GROUP BY HAVING pattern - For multi-label AND queries, this reduces N subqueries to 1 FTS tokenization (search/fts.rs): - Replace collect-into-Vec-then-join pattern with direct String building - Pre-allocate capacity hint for result string Discussion truncation (documents/truncation.rs): - Calculate total length without allocating concatenated string first - Only allocate full string when we know it fits within limit Embedding pipeline (embedding/pipeline.rs): - Add Vec::with_capacity hints for chunk work and cleared_docs hashset - Reduces reallocations during embedding batch processing Backoff calculation (core/backoff.rs): - Replace unchecked addition with saturating_add to prevent overflow - Add test case verifying overflow protection Co-Authored-By: Claude Opus 4.5 --- src/cli/commands/count.rs | 60 +++++++++++++----------------- src/core/backoff.rs | 9 ++++- src/documents/truncation.rs | 10 +++-- src/embedding/change_detector.rs | 63 ++++++++++++-------------------- src/embedding/pipeline.rs | 4 +- src/search/filters.rs | 16 +++++--- src/search/fts.rs | 35 ++++++++++-------- 7 files changed, 96 insertions(+), 101 deletions(-) diff --git a/src/cli/commands/count.rs b/src/cli/commands/count.rs index 668afcb..5a1c3b4 100644 --- a/src/cli/commands/count.rs +++ b/src/cli/commands/count.rs @@ -41,18 +41,15 @@ pub fn run_count(config: &Config, entity: &str, type_filter: Option<&str>) -> Re } fn count_issues(conn: &Connection) -> Result { - let count: i64 = conn.query_row("SELECT COUNT(*) FROM issues", [], |row| row.get(0))?; - - let opened: i64 = conn.query_row( - "SELECT COUNT(*) FROM issues WHERE state = 'opened'", + // Single query with conditional aggregation instead of 3 separate queries + let (count, opened, closed): (i64, i64, i64) = conn.query_row( + "SELECT + COUNT(*), + COALESCE(SUM(CASE WHEN state = 'opened' THEN 1 ELSE 0 END), 0), + COALESCE(SUM(CASE WHEN state = 'closed' THEN 1 ELSE 0 END), 0) + FROM issues", [], - |row| row.get(0), - )?; - - let closed: i64 = conn.query_row( - "SELECT COUNT(*) FROM issues WHERE state = 'closed'", - [], - |row| row.get(0), + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), )?; Ok(CountResult { @@ -69,30 +66,25 @@ fn count_issues(conn: &Connection) -> Result { } fn count_mrs(conn: &Connection) -> Result { - let count: i64 = conn.query_row("SELECT COUNT(*) FROM merge_requests", [], |row| row.get(0))?; - - let opened: i64 = conn.query_row( - "SELECT COUNT(*) FROM merge_requests WHERE state = 'opened'", + // Single query with conditional aggregation instead of 5 separate queries + let (count, opened, merged, closed, locked): (i64, i64, i64, i64, i64) = conn.query_row( + "SELECT + COUNT(*), + COALESCE(SUM(CASE WHEN state = 'opened' THEN 1 ELSE 0 END), 0), + COALESCE(SUM(CASE WHEN state = 'merged' THEN 1 ELSE 0 END), 0), + COALESCE(SUM(CASE WHEN state = 'closed' THEN 1 ELSE 0 END), 0), + COALESCE(SUM(CASE WHEN state = 'locked' THEN 1 ELSE 0 END), 0) + FROM merge_requests", [], - |row| row.get(0), - )?; - - let merged: i64 = conn.query_row( - "SELECT COUNT(*) FROM merge_requests WHERE state = 'merged'", - [], - |row| row.get(0), - )?; - - let closed: i64 = conn.query_row( - "SELECT COUNT(*) FROM merge_requests WHERE state = 'closed'", - [], - |row| row.get(0), - )?; - - let locked: i64 = conn.query_row( - "SELECT COUNT(*) FROM merge_requests WHERE state = 'locked'", - [], - |row| row.get(0), + |row| { + Ok(( + row.get(0)?, + row.get(1)?, + row.get(2)?, + row.get(3)?, + row.get(4)?, + )) + }, )?; Ok(CountResult { diff --git a/src/core/backoff.rs b/src/core/backoff.rs index ffd3981..99462c3 100644 --- a/src/core/backoff.rs +++ b/src/core/backoff.rs @@ -8,7 +8,7 @@ pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 { let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1); let delay_with_jitter = (capped_delay_ms as f64 * jitter_factor) as i64; - now + delay_with_jitter + now.saturating_add(delay_with_jitter) } #[cfg(test)] @@ -82,4 +82,11 @@ mod tests { let result = compute_next_attempt_at(now, i64::MAX); assert!(result > now); } + + #[test] + fn test_saturating_add_prevents_overflow() { + let now = i64::MAX - 10; + let result = compute_next_attempt_at(now, 30); + assert_eq!(result, i64::MAX); + } } diff --git a/src/documents/truncation.rs b/src/documents/truncation.rs index 425e73b..5c08359 100644 --- a/src/documents/truncation.rs +++ b/src/documents/truncation.rs @@ -58,9 +58,13 @@ pub fn truncate_discussion(notes: &[NoteContent], max_bytes: usize) -> Truncatio } let formatted: Vec = notes.iter().map(format_note).collect(); - let total: String = formatted.concat(); + let total_len: usize = formatted.iter().map(|s| s.len()).sum(); - if total.len() <= max_bytes { + if total_len <= max_bytes { + let mut total = String::with_capacity(total_len); + for s in &formatted { + total.push_str(s); + } return TruncationResult { content: total, is_truncated: false, @@ -69,7 +73,7 @@ pub fn truncate_discussion(notes: &[NoteContent], max_bytes: usize) -> Truncatio } if notes.len() == 1 { - let truncated = truncate_utf8(&total, max_bytes.saturating_sub(11)); + let truncated = truncate_utf8(&formatted[0], max_bytes.saturating_sub(11)); let content = format!("{}[truncated]", truncated); return TruncationResult { content, diff --git a/src/embedding/change_detector.rs b/src/embedding/change_detector.rs index 7926b50..4541c82 100644 --- a/src/embedding/change_detector.rs +++ b/src/embedding/change_detector.rs @@ -16,30 +16,25 @@ pub fn find_pending_documents( last_id: i64, model_name: &str, ) -> Result> { + // Optimized query: LEFT JOIN + NULL check replaces triple-EXISTS pattern. + // This allows SQLite to scan embedding_metadata once instead of three times. + // Semantically identical: returns documents needing (re-)embedding when: + // - No embedding exists (em.document_id IS NULL) + // - Content hash changed (em.document_hash != d.content_hash) + // - Config mismatch (model/dims/chunk_max_bytes) let sql = r#" SELECT d.id, d.content_text, d.content_hash FROM documents d + LEFT JOIN embedding_metadata em + ON em.document_id = d.id AND em.chunk_index = 0 WHERE d.id > ?1 AND ( - NOT EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - ) - OR EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - AND em.document_hash != d.content_hash - ) - OR EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - AND ( - em.chunk_max_bytes IS NULL - OR em.chunk_max_bytes != ?3 - OR em.model != ?4 - OR em.dims != ?5 - ) - ) + em.document_id IS NULL + OR em.document_hash != d.content_hash + OR em.chunk_max_bytes IS NULL + OR em.chunk_max_bytes != ?3 + OR em.model != ?4 + OR em.dims != ?5 ) ORDER BY d.id LIMIT ?2 @@ -69,31 +64,19 @@ pub fn find_pending_documents( } pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result { + // Optimized query: LEFT JOIN + NULL check replaces triple-EXISTS pattern let count: i64 = conn.query_row( r#" SELECT COUNT(*) FROM documents d - WHERE ( - NOT EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - ) - OR EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - AND em.document_hash != d.content_hash - ) - OR EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - AND ( - em.chunk_max_bytes IS NULL - OR em.chunk_max_bytes != ?1 - OR em.model != ?2 - OR em.dims != ?3 - ) - ) - ) + LEFT JOIN embedding_metadata em + ON em.document_id = d.id AND em.chunk_index = 0 + WHERE em.document_id IS NULL + OR em.document_hash != d.content_hash + OR em.chunk_max_bytes IS NULL + OR em.chunk_max_bytes != ?1 + OR em.model != ?2 + OR em.dims != ?3 "#, rusqlite::params![CHUNK_MAX_BYTES as i64, model_name, EXPECTED_DIMS as i64], |row| row.get(0), diff --git a/src/embedding/pipeline.rs b/src/embedding/pipeline.rs index 876ebd6..b2f0ec8 100644 --- a/src/embedding/pipeline.rs +++ b/src/embedding/pipeline.rs @@ -103,7 +103,7 @@ async fn embed_page( total: usize, progress_callback: &Option>, ) -> Result<()> { - let mut all_chunks: Vec = Vec::new(); + let mut all_chunks: Vec = Vec::with_capacity(pending.len() * 3); let mut page_normal_docs: usize = 0; for doc in pending { @@ -159,7 +159,7 @@ async fn embed_page( page_normal_docs += 1; } - let mut cleared_docs: HashSet = HashSet::new(); + let mut cleared_docs: HashSet = HashSet::with_capacity(pending.len()); for batch in all_chunks.chunks(BATCH_SIZE) { let texts: Vec = batch.iter().map(|c| c.text.clone()).collect(); diff --git a/src/search/filters.rs b/src/search/filters.rs index 2c30f8d..c615fe8 100644 --- a/src/search/filters.rs +++ b/src/search/filters.rs @@ -97,13 +97,19 @@ pub fn apply_filters( param_idx += 1; } - for label in &filters.labels { + if !filters.labels.is_empty() { + let placeholders: Vec = (0..filters.labels.len()) + .map(|i| format!("?{}", param_idx + i)) + .collect(); sql.push_str(&format!( - " AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name = ?{})", - param_idx + " AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name IN ({}) GROUP BY dl.document_id HAVING COUNT(DISTINCT dl.label_name) = {})", + placeholders.join(","), + filters.labels.len() )); - params.push(Box::new(label.clone())); - param_idx += 1; + for label in &filters.labels { + params.push(Box::new(label.clone())); + param_idx += 1; + } } if let Some(ref path_filter) = filters.path { diff --git a/src/search/fts.rs b/src/search/fts.rs index 1277726..f8a231c 100644 --- a/src/search/fts.rs +++ b/src/search/fts.rs @@ -23,22 +23,25 @@ pub fn to_fts_query(raw: &str, mode: FtsQueryMode) -> String { return String::new(); } - let tokens: Vec = trimmed - .split_whitespace() - .map(|token| { - if let Some(stem) = token.strip_suffix('*') - && !stem.is_empty() - && stem.chars().all(|c| c.is_alphanumeric() || c == '_') - { - let escaped = stem.replace('"', "\"\""); - return format!("\"{}\"*", escaped); - } - let escaped = token.replace('"', "\"\""); - format!("\"{}\"", escaped) - }) - .collect(); - - tokens.join(" ") + let mut result = String::with_capacity(trimmed.len() + 20); + for (i, token) in trimmed.split_whitespace().enumerate() { + if i > 0 { + result.push(' '); + } + if let Some(stem) = token.strip_suffix('*') + && !stem.is_empty() + && stem.chars().all(|c| c.is_alphanumeric() || c == '_') + { + result.push('"'); + result.push_str(&stem.replace('"', "\"\"")); + result.push_str("\"*"); + } else { + result.push('"'); + result.push_str(&token.replace('"', "\"\"")); + result.push('"'); + } + } + result } } }