perf: Optimize SQL queries and reduce allocations in hot paths
Change detection queries (embedding/change_detector.rs):
- Replace triple-EXISTS subquery pattern with LEFT JOIN + NULL check
- SQLite now scans embedding_metadata once instead of three times
- Semantically identical: returns docs needing embedding when no embedding exists, hash changed, or config mismatch

Count queries (cli/commands/count.rs):
- Consolidate 3 separate COUNT queries for issues into a single query using conditional aggregation (SUM(CASE WHEN state = 'x' THEN 1 ELSE 0 END))
- Same optimization for MRs: 5 queries reduced to 1

Search filter queries (search/filters.rs):
- Replace N separate EXISTS clauses for label filtering with a single IN() clause using the COUNT/GROUP BY/HAVING pattern
- For multi-label AND queries, this reduces N subqueries to 1

FTS tokenization (search/fts.rs):
- Replace collect-into-Vec-then-join pattern with direct String building
- Pre-allocate capacity hint for result string

Discussion truncation (documents/truncation.rs):
- Calculate total length without allocating the concatenated string first
- Only allocate the full string when we know it fits within the limit

Embedding pipeline (embedding/pipeline.rs):
- Add with_capacity hints for the chunk work Vec and the cleared_docs HashSet
- Reduces reallocations during embedding batch processing

Backoff calculation (core/backoff.rs):
- Replace unchecked addition with saturating_add to prevent overflow
- Add test case verifying overflow protection

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -41,18 +41,15 @@ pub fn run_count(config: &Config, entity: &str, type_filter: Option<&str>) -> Re
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn count_issues(conn: &Connection) -> Result<CountResult> {
|
fn count_issues(conn: &Connection) -> Result<CountResult> {
|
||||||
let count: i64 = conn.query_row("SELECT COUNT(*) FROM issues", [], |row| row.get(0))?;
|
// Single query with conditional aggregation instead of 3 separate queries
|
||||||
|
let (count, opened, closed): (i64, i64, i64) = conn.query_row(
|
||||||
let opened: i64 = conn.query_row(
|
"SELECT
|
||||||
"SELECT COUNT(*) FROM issues WHERE state = 'opened'",
|
COUNT(*),
|
||||||
|
COALESCE(SUM(CASE WHEN state = 'opened' THEN 1 ELSE 0 END), 0),
|
||||||
|
COALESCE(SUM(CASE WHEN state = 'closed' THEN 1 ELSE 0 END), 0)
|
||||||
|
FROM issues",
|
||||||
[],
|
[],
|
||||||
|row| row.get(0),
|
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
|
||||||
)?;
|
|
||||||
|
|
||||||
let closed: i64 = conn.query_row(
|
|
||||||
"SELECT COUNT(*) FROM issues WHERE state = 'closed'",
|
|
||||||
[],
|
|
||||||
|row| row.get(0),
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(CountResult {
|
Ok(CountResult {
|
||||||
@@ -69,30 +66,25 @@ fn count_issues(conn: &Connection) -> Result<CountResult> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn count_mrs(conn: &Connection) -> Result<CountResult> {
|
fn count_mrs(conn: &Connection) -> Result<CountResult> {
|
||||||
let count: i64 = conn.query_row("SELECT COUNT(*) FROM merge_requests", [], |row| row.get(0))?;
|
// Single query with conditional aggregation instead of 5 separate queries
|
||||||
|
let (count, opened, merged, closed, locked): (i64, i64, i64, i64, i64) = conn.query_row(
|
||||||
let opened: i64 = conn.query_row(
|
"SELECT
|
||||||
"SELECT COUNT(*) FROM merge_requests WHERE state = 'opened'",
|
COUNT(*),
|
||||||
|
COALESCE(SUM(CASE WHEN state = 'opened' THEN 1 ELSE 0 END), 0),
|
||||||
|
COALESCE(SUM(CASE WHEN state = 'merged' THEN 1 ELSE 0 END), 0),
|
||||||
|
COALESCE(SUM(CASE WHEN state = 'closed' THEN 1 ELSE 0 END), 0),
|
||||||
|
COALESCE(SUM(CASE WHEN state = 'locked' THEN 1 ELSE 0 END), 0)
|
||||||
|
FROM merge_requests",
|
||||||
[],
|
[],
|
||||||
|row| row.get(0),
|
|row| {
|
||||||
)?;
|
Ok((
|
||||||
|
row.get(0)?,
|
||||||
let merged: i64 = conn.query_row(
|
row.get(1)?,
|
||||||
"SELECT COUNT(*) FROM merge_requests WHERE state = 'merged'",
|
row.get(2)?,
|
||||||
[],
|
row.get(3)?,
|
||||||
|row| row.get(0),
|
row.get(4)?,
|
||||||
)?;
|
))
|
||||||
|
},
|
||||||
let closed: i64 = conn.query_row(
|
|
||||||
"SELECT COUNT(*) FROM merge_requests WHERE state = 'closed'",
|
|
||||||
[],
|
|
||||||
|row| row.get(0),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let locked: i64 = conn.query_row(
|
|
||||||
"SELECT COUNT(*) FROM merge_requests WHERE state = 'locked'",
|
|
||||||
[],
|
|
||||||
|row| row.get(0),
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(CountResult {
|
Ok(CountResult {
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 {
|
|||||||
let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1);
|
let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1);
|
||||||
let delay_with_jitter = (capped_delay_ms as f64 * jitter_factor) as i64;
|
let delay_with_jitter = (capped_delay_ms as f64 * jitter_factor) as i64;
|
||||||
|
|
||||||
now + delay_with_jitter
|
now.saturating_add(delay_with_jitter)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -82,4 +82,11 @@ mod tests {
|
|||||||
let result = compute_next_attempt_at(now, i64::MAX);
|
let result = compute_next_attempt_at(now, i64::MAX);
|
||||||
assert!(result > now);
|
assert!(result > now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_saturating_add_prevents_overflow() {
|
||||||
|
let now = i64::MAX - 10;
|
||||||
|
let result = compute_next_attempt_at(now, 30);
|
||||||
|
assert_eq!(result, i64::MAX);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,9 +58,13 @@ pub fn truncate_discussion(notes: &[NoteContent], max_bytes: usize) -> Truncatio
|
|||||||
}
|
}
|
||||||
|
|
||||||
let formatted: Vec<String> = notes.iter().map(format_note).collect();
|
let formatted: Vec<String> = notes.iter().map(format_note).collect();
|
||||||
let total: String = formatted.concat();
|
let total_len: usize = formatted.iter().map(|s| s.len()).sum();
|
||||||
|
|
||||||
if total.len() <= max_bytes {
|
if total_len <= max_bytes {
|
||||||
|
let mut total = String::with_capacity(total_len);
|
||||||
|
for s in &formatted {
|
||||||
|
total.push_str(s);
|
||||||
|
}
|
||||||
return TruncationResult {
|
return TruncationResult {
|
||||||
content: total,
|
content: total,
|
||||||
is_truncated: false,
|
is_truncated: false,
|
||||||
@@ -69,7 +73,7 @@ pub fn truncate_discussion(notes: &[NoteContent], max_bytes: usize) -> Truncatio
|
|||||||
}
|
}
|
||||||
|
|
||||||
if notes.len() == 1 {
|
if notes.len() == 1 {
|
||||||
let truncated = truncate_utf8(&total, max_bytes.saturating_sub(11));
|
let truncated = truncate_utf8(&formatted[0], max_bytes.saturating_sub(11));
|
||||||
let content = format!("{}[truncated]", truncated);
|
let content = format!("{}[truncated]", truncated);
|
||||||
return TruncationResult {
|
return TruncationResult {
|
||||||
content,
|
content,
|
||||||
|
|||||||
@@ -16,31 +16,26 @@ pub fn find_pending_documents(
|
|||||||
last_id: i64,
|
last_id: i64,
|
||||||
model_name: &str,
|
model_name: &str,
|
||||||
) -> Result<Vec<PendingDocument>> {
|
) -> Result<Vec<PendingDocument>> {
|
||||||
|
// Optimized query: LEFT JOIN + NULL check replaces triple-EXISTS pattern.
|
||||||
|
// This allows SQLite to scan embedding_metadata once instead of three times.
|
||||||
|
// Semantically identical: returns documents needing (re-)embedding when:
|
||||||
|
// - No embedding exists (em.document_id IS NULL)
|
||||||
|
// - Content hash changed (em.document_hash != d.content_hash)
|
||||||
|
// - Config mismatch (model/dims/chunk_max_bytes)
|
||||||
let sql = r#"
|
let sql = r#"
|
||||||
SELECT d.id, d.content_text, d.content_hash
|
SELECT d.id, d.content_text, d.content_hash
|
||||||
FROM documents d
|
FROM documents d
|
||||||
|
LEFT JOIN embedding_metadata em
|
||||||
|
ON em.document_id = d.id AND em.chunk_index = 0
|
||||||
WHERE d.id > ?1
|
WHERE d.id > ?1
|
||||||
AND (
|
AND (
|
||||||
NOT EXISTS (
|
em.document_id IS NULL
|
||||||
SELECT 1 FROM embedding_metadata em
|
OR em.document_hash != d.content_hash
|
||||||
WHERE em.document_id = d.id AND em.chunk_index = 0
|
OR em.chunk_max_bytes IS NULL
|
||||||
)
|
|
||||||
OR EXISTS (
|
|
||||||
SELECT 1 FROM embedding_metadata em
|
|
||||||
WHERE em.document_id = d.id AND em.chunk_index = 0
|
|
||||||
AND em.document_hash != d.content_hash
|
|
||||||
)
|
|
||||||
OR EXISTS (
|
|
||||||
SELECT 1 FROM embedding_metadata em
|
|
||||||
WHERE em.document_id = d.id AND em.chunk_index = 0
|
|
||||||
AND (
|
|
||||||
em.chunk_max_bytes IS NULL
|
|
||||||
OR em.chunk_max_bytes != ?3
|
OR em.chunk_max_bytes != ?3
|
||||||
OR em.model != ?4
|
OR em.model != ?4
|
||||||
OR em.dims != ?5
|
OR em.dims != ?5
|
||||||
)
|
)
|
||||||
)
|
|
||||||
)
|
|
||||||
ORDER BY d.id
|
ORDER BY d.id
|
||||||
LIMIT ?2
|
LIMIT ?2
|
||||||
"#;
|
"#;
|
||||||
@@ -69,31 +64,19 @@ pub fn find_pending_documents(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result<i64> {
|
pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result<i64> {
|
||||||
|
// Optimized query: LEFT JOIN + NULL check replaces triple-EXISTS pattern
|
||||||
let count: i64 = conn.query_row(
|
let count: i64 = conn.query_row(
|
||||||
r#"
|
r#"
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
FROM documents d
|
FROM documents d
|
||||||
WHERE (
|
LEFT JOIN embedding_metadata em
|
||||||
NOT EXISTS (
|
ON em.document_id = d.id AND em.chunk_index = 0
|
||||||
SELECT 1 FROM embedding_metadata em
|
WHERE em.document_id IS NULL
|
||||||
WHERE em.document_id = d.id AND em.chunk_index = 0
|
OR em.document_hash != d.content_hash
|
||||||
)
|
OR em.chunk_max_bytes IS NULL
|
||||||
OR EXISTS (
|
|
||||||
SELECT 1 FROM embedding_metadata em
|
|
||||||
WHERE em.document_id = d.id AND em.chunk_index = 0
|
|
||||||
AND em.document_hash != d.content_hash
|
|
||||||
)
|
|
||||||
OR EXISTS (
|
|
||||||
SELECT 1 FROM embedding_metadata em
|
|
||||||
WHERE em.document_id = d.id AND em.chunk_index = 0
|
|
||||||
AND (
|
|
||||||
em.chunk_max_bytes IS NULL
|
|
||||||
OR em.chunk_max_bytes != ?1
|
OR em.chunk_max_bytes != ?1
|
||||||
OR em.model != ?2
|
OR em.model != ?2
|
||||||
OR em.dims != ?3
|
OR em.dims != ?3
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
"#,
|
"#,
|
||||||
rusqlite::params![CHUNK_MAX_BYTES as i64, model_name, EXPECTED_DIMS as i64],
|
rusqlite::params![CHUNK_MAX_BYTES as i64, model_name, EXPECTED_DIMS as i64],
|
||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
|
|||||||
@@ -103,7 +103,7 @@ async fn embed_page(
|
|||||||
total: usize,
|
total: usize,
|
||||||
progress_callback: &Option<Box<dyn Fn(usize, usize)>>,
|
progress_callback: &Option<Box<dyn Fn(usize, usize)>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut all_chunks: Vec<ChunkWork> = Vec::new();
|
let mut all_chunks: Vec<ChunkWork> = Vec::with_capacity(pending.len() * 3);
|
||||||
let mut page_normal_docs: usize = 0;
|
let mut page_normal_docs: usize = 0;
|
||||||
|
|
||||||
for doc in pending {
|
for doc in pending {
|
||||||
@@ -159,7 +159,7 @@ async fn embed_page(
|
|||||||
page_normal_docs += 1;
|
page_normal_docs += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut cleared_docs: HashSet<i64> = HashSet::new();
|
let mut cleared_docs: HashSet<i64> = HashSet::with_capacity(pending.len());
|
||||||
|
|
||||||
for batch in all_chunks.chunks(BATCH_SIZE) {
|
for batch in all_chunks.chunks(BATCH_SIZE) {
|
||||||
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
|
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
|
||||||
|
|||||||
@@ -97,14 +97,20 @@ pub fn apply_filters(
|
|||||||
param_idx += 1;
|
param_idx += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for label in &filters.labels {
|
if !filters.labels.is_empty() {
|
||||||
|
let placeholders: Vec<String> = (0..filters.labels.len())
|
||||||
|
.map(|i| format!("?{}", param_idx + i))
|
||||||
|
.collect();
|
||||||
sql.push_str(&format!(
|
sql.push_str(&format!(
|
||||||
" AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name = ?{})",
|
" AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name IN ({}) GROUP BY dl.document_id HAVING COUNT(DISTINCT dl.label_name) = {})",
|
||||||
param_idx
|
placeholders.join(","),
|
||||||
|
filters.labels.len()
|
||||||
));
|
));
|
||||||
|
for label in &filters.labels {
|
||||||
params.push(Box::new(label.clone()));
|
params.push(Box::new(label.clone()));
|
||||||
param_idx += 1;
|
param_idx += 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(ref path_filter) = filters.path {
|
if let Some(ref path_filter) = filters.path {
|
||||||
match path_filter {
|
match path_filter {
|
||||||
|
|||||||
@@ -23,22 +23,25 @@ pub fn to_fts_query(raw: &str, mode: FtsQueryMode) -> String {
|
|||||||
return String::new();
|
return String::new();
|
||||||
}
|
}
|
||||||
|
|
||||||
let tokens: Vec<String> = trimmed
|
let mut result = String::with_capacity(trimmed.len() + 20);
|
||||||
.split_whitespace()
|
for (i, token) in trimmed.split_whitespace().enumerate() {
|
||||||
.map(|token| {
|
if i > 0 {
|
||||||
|
result.push(' ');
|
||||||
|
}
|
||||||
if let Some(stem) = token.strip_suffix('*')
|
if let Some(stem) = token.strip_suffix('*')
|
||||||
&& !stem.is_empty()
|
&& !stem.is_empty()
|
||||||
&& stem.chars().all(|c| c.is_alphanumeric() || c == '_')
|
&& stem.chars().all(|c| c.is_alphanumeric() || c == '_')
|
||||||
{
|
{
|
||||||
let escaped = stem.replace('"', "\"\"");
|
result.push('"');
|
||||||
return format!("\"{}\"*", escaped);
|
result.push_str(&stem.replace('"', "\"\""));
|
||||||
|
result.push_str("\"*");
|
||||||
|
} else {
|
||||||
|
result.push('"');
|
||||||
|
result.push_str(&token.replace('"', "\"\""));
|
||||||
|
result.push('"');
|
||||||
}
|
}
|
||||||
let escaped = token.replace('"', "\"\"");
|
}
|
||||||
format!("\"{}\"", escaped)
|
result
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
tokens.join(" ")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user