fix: Savepoint leak in embedding pipeline, atomic fail_job, RRF dedup

Three correctness fixes found during peer code review:

Embedding pipeline savepoint leak (HIGH severity):
The SAVEPOINT embed_page / RELEASE embed_page pattern had ~10 `?`
propagation points between them. Any error from record_embedding_error,
clear_document_embeddings, or store_embedding would exit the function
without rolling back, leaving the SQLite connection in a broken
transactional state and causing cascading failures for the rest of the
session. Fixed by extracting page processing into `embed_page()` and
wrapping with explicit rollback-on-error handling.

Dependent queue fail_job race (MEDIUM severity):
fail_job performed a SELECT followed by a separate UPDATE on the
attempts counter without a transaction. Under concurrent lock
reclamation, the attempts value read by the SELECT could be stale by
the time the UPDATE ran. Replaced with a
single atomic UPDATE that increments attempts and computes exponential
backoff entirely in SQL, also halving DB round-trips. Added explicit
error when the job no longer exists.

RRF duplicate document score inflation (MEDIUM severity):
If a retriever returned the same document_id multiple times, the RRF
score accumulated a rank contribution for every occurrence while the
stored rank reflected only the first occurrence. Moved the score
accumulation inside the `if is_none` guard so only the first occurrence
per list contributes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-04 14:16:38 -05:00
parent 266ed78e73
commit 1fdc6d03cc
3 changed files with 279 additions and 235 deletions

View File

@@ -122,28 +122,30 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential /// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
/// backoff, clears locked_at, and records the error. /// backoff, clears locked_at, and records the error.
/// ///
/// Backoff: 30s * 2^(attempts-1), capped at 480s. /// Backoff: 30s * 2^(attempts before this increment), capped at 480s. Uses a single atomic UPDATE
/// to avoid a read-then-write race on the `attempts` counter.
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> { pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
let now = now_ms(); let now = now_ms();
// Get current attempts (propagate error if job no longer exists) // Atomic increment + backoff calculation in one UPDATE.
let current_attempts: i32 = conn.query_row( // MIN(attempts, 4) caps the shift to prevent overflow; the overall
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", // backoff is clamped to 480 000 ms via MIN(..., 480000).
rusqlite::params![job_id], let changes = conn.execute(
|row| row.get(0),
)?;
let new_attempts = current_attempts + 1;
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
let next_retry = now + backoff_ms;
conn.execute(
"UPDATE pending_dependent_fetches "UPDATE pending_dependent_fetches
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3 SET attempts = attempts + 1,
WHERE id = ?4", next_retry_at = ?1 + MIN(30000 * (1 << MIN(attempts, 4)), 480000),
rusqlite::params![new_attempts, next_retry, error, job_id], locked_at = NULL,
last_error = ?2
WHERE id = ?3",
rusqlite::params![now, error, job_id],
)?; )?;
if changes == 0 {
return Err(crate::core::error::LoreError::Other(
"fail_job: job not found (may have been reclaimed or completed)".into(),
));
}
Ok(()) Ok(())
} }

View File

@@ -66,19 +66,76 @@ pub async fn embed_documents(
// process crashes mid-page, the savepoint is never released and // process crashes mid-page, the savepoint is never released and
// SQLite rolls back — preventing partial document states where old // SQLite rolls back — preventing partial document states where old
// embeddings are cleared but new ones haven't been written yet. // embeddings are cleared but new ones haven't been written yet.
//
// We run the page through the `embed_page` helper and match on its
// result so the savepoint is rolled back whenever page processing
// fails — bare `execute_batch("SAVEPOINT")` with `?` propagation would
// leak the savepoint and leave the connection in a broken
// transactional state.
conn.execute_batch("SAVEPOINT embed_page")?; conn.execute_batch("SAVEPOINT embed_page")?;
let page_result = embed_page(
conn,
client,
model_name,
&pending,
&mut result,
&mut last_id,
&mut processed,
total,
&progress_callback,
)
.await;
match page_result {
Ok(()) => {
conn.execute_batch("RELEASE embed_page")?;
}
Err(e) => {
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
return Err(e);
}
}
}
info!(
embedded = result.embedded,
failed = result.failed,
skipped = result.skipped,
"Embedding pipeline complete"
);
tracing::Span::current().record("items_processed", result.embedded);
tracing::Span::current().record("items_skipped", result.skipped);
tracing::Span::current().record("errors", result.failed);
Ok(result)
}
/// Process a single page of pending documents within an active savepoint.
///
/// All `?` propagation from this function is caught by the caller, which
/// rolls back the savepoint on error.
#[allow(clippy::too_many_arguments)]
async fn embed_page(
conn: &Connection,
client: &OllamaClient,
model_name: &str,
pending: &[crate::embedding::change_detector::PendingDocument],
result: &mut EmbedResult,
last_id: &mut i64,
processed: &mut usize,
total: usize,
progress_callback: &Option<Box<dyn Fn(usize, usize)>>,
) -> Result<()> {
// Build chunk work items for this page // Build chunk work items for this page
let mut all_chunks: Vec<ChunkWork> = Vec::new(); let mut all_chunks: Vec<ChunkWork> = Vec::new();
let mut page_normal_docs: usize = 0; let mut page_normal_docs: usize = 0;
for doc in &pending { for doc in pending {
// Always advance the cursor, even for skipped docs, to avoid re-fetching // Always advance the cursor, even for skipped docs, to avoid re-fetching
last_id = doc.document_id; *last_id = doc.document_id;
if doc.content_text.is_empty() { if doc.content_text.is_empty() {
result.skipped += 1; result.skipped += 1;
processed += 1; *processed += 1;
continue; continue;
} }
@@ -110,9 +167,9 @@ pub async fn embed_documents(
), ),
)?; )?;
result.skipped += 1; result.skipped += 1;
processed += 1; *processed += 1;
if let Some(ref cb) = progress_callback { if let Some(cb) = progress_callback {
cb(processed, total); cb(*processed, total);
} }
continue; continue;
} }
@@ -212,9 +269,7 @@ pub async fn embed_documents(
|| (err_lower.contains("413") && err_lower.contains("http")); || (err_lower.contains("413") && err_lower.contains("http"));
if is_context_error && batch.len() > 1 { if is_context_error && batch.len() > 1 {
warn!( warn!("Batch failed with context length error, retrying chunks individually");
"Batch failed with context length error, retrying chunks individually"
);
for chunk in batch { for chunk in batch {
match client.embed_batch(vec![chunk.text.clone()]).await { match client.embed_batch(vec![chunk.text.clone()]).await {
Ok(embeddings) Ok(embeddings)
@@ -280,27 +335,12 @@ pub async fn embed_documents(
// Fire progress for all normal documents after embedding completes. // Fire progress for all normal documents after embedding completes.
// This ensures progress reflects actual embedding work, not just chunking. // This ensures progress reflects actual embedding work, not just chunking.
processed += page_normal_docs; *processed += page_normal_docs;
if let Some(ref cb) = progress_callback { if let Some(cb) = progress_callback {
cb(processed, total); cb(*processed, total);
} }
// Commit all DB writes for this page atomically. Ok(())
conn.execute_batch("RELEASE embed_page")?;
}
info!(
embedded = result.embedded,
failed = result.failed,
skipped = result.skipped,
"Embedding pipeline complete"
);
tracing::Span::current().record("items_processed", result.embedded);
tracing::Span::current().record("items_skipped", result.skipped);
tracing::Span::current().record("errors", result.failed);
Ok(result)
} }
/// Clear all embeddings and metadata for a document. /// Clear all embeddings and metadata for a document.

View File

@@ -33,8 +33,10 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
for (i, &(doc_id, _)) in vector_results.iter().enumerate() { for (i, &(doc_id, _)) in vector_results.iter().enumerate() {
let rank = i + 1; // 1-indexed let rank = i + 1; // 1-indexed
let entry = scores.entry(doc_id).or_insert((0.0, None, None)); let entry = scores.entry(doc_id).or_insert((0.0, None, None));
entry.0 += 1.0 / (RRF_K + rank as f64); // Only count the first occurrence per list to prevent duplicates
// from inflating the score.
if entry.1.is_none() { if entry.1.is_none() {
entry.0 += 1.0 / (RRF_K + rank as f64);
entry.1 = Some(rank); entry.1 = Some(rank);
} }
} }
@@ -42,8 +44,8 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
for (i, &(doc_id, _)) in fts_results.iter().enumerate() { for (i, &(doc_id, _)) in fts_results.iter().enumerate() {
let rank = i + 1; // 1-indexed let rank = i + 1; // 1-indexed
let entry = scores.entry(doc_id).or_insert((0.0, None, None)); let entry = scores.entry(doc_id).or_insert((0.0, None, None));
entry.0 += 1.0 / (RRF_K + rank as f64);
if entry.2.is_none() { if entry.2.is_none() {
entry.0 += 1.0 / (RRF_K + rank as f64);
entry.2 = Some(rank); entry.2 = Some(rank);
} }
} }