diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index bbd4f2d..7fc1263 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -122,28 +122,30 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> { /// Mark a job as failed. Increments attempts, sets next_retry_at with exponential /// backoff, clears locked_at, and records the error. /// -/// Backoff: 30s * 2^(attempts-1), capped at 480s. +/// Backoff: 30s * 2^(attempts), capped at 480s. Uses a single atomic UPDATE +/// to avoid a read-then-write race on the `attempts` counter. pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> { let now = now_ms(); - // Get current attempts (propagate error if job no longer exists) - let current_attempts: i32 = conn.query_row( - "SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", - rusqlite::params![job_id], - |row| row.get(0), - )?; - - let new_attempts = current_attempts + 1; - let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000); - let next_retry = now + backoff_ms; - - conn.execute( + // Atomic increment + backoff calculation in one UPDATE. + // MIN(attempts, 4) caps the shift to prevent overflow; the overall + // backoff is clamped to 480 000 ms via MIN(..., 480000). + let changes = conn.execute( "UPDATE pending_dependent_fetches - SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3 - WHERE id = ?4", - rusqlite::params![new_attempts, next_retry, error, job_id], + SET attempts = attempts + 1, + next_retry_at = ?1 + MIN(30000 * (1 << MIN(attempts, 4)), 480000), + locked_at = NULL, + last_error = ?2 + WHERE id = ?3", + rusqlite::params![now, error, job_id], )?; + if changes == 0 { + return Err(crate::core::error::LoreError::Other( + "fail_job: job not found (may have been reclaimed or completed)".into(), + )); + } + Ok(()) } diff --git a/src/embedding/pipeline.rs b/src/embedding/pipeline.rs index 56e7132..74de771 100644 --- a/src/embedding/pipeline.rs +++ b/src/embedding/pipeline.rs @@ -66,227 +66,33 @@ pub async fn embed_documents( // process crashes mid-page, the savepoint is never released and // SQLite rolls back — preventing partial document states where old // embeddings are cleared but new ones haven't been written yet. + // + // We use a closure + match to ensure the savepoint is always + // rolled back on error — bare `execute_batch("SAVEPOINT")` with `?` + // propagation would leak the savepoint and leave the connection in + // a broken transactional state. conn.execute_batch("SAVEPOINT embed_page")?; - - // Build chunk work items for this page - let mut all_chunks: Vec = Vec::new(); - let mut page_normal_docs: usize = 0; - - for doc in &pending { - // Always advance the cursor, even for skipped docs, to avoid re-fetching - last_id = doc.document_id; - - if doc.content_text.is_empty() { - result.skipped += 1; - processed += 1; - continue; + let page_result = embed_page( + conn, + client, + model_name, + &pending, + &mut result, + &mut last_id, + &mut processed, + total, + &progress_callback, + ) + .await; + match page_result { + Ok(()) => { + conn.execute_batch("RELEASE embed_page")?; } - - let chunks = split_into_chunks(&doc.content_text); - let total_chunks = chunks.len(); - - // Overflow guard: skip documents that produce too many chunks. - // Must run BEFORE clear_document_embeddings so existing embeddings - // are preserved when we skip. - if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER { - warn!( - doc_id = doc.document_id, - chunk_count = total_chunks, - max = CHUNK_ROWID_MULTIPLIER, - "Document produces too many chunks, skipping to prevent rowid collision" - ); - // Record a sentinel error so the document is not re-detected as - // pending on subsequent runs (prevents infinite re-processing). - record_embedding_error( - conn, - doc.document_id, - 0, // sentinel chunk_index - &doc.content_hash, - "overflow-sentinel", - model_name, - &format!( - "Document produces {} chunks, exceeding max {}", - total_chunks, CHUNK_ROWID_MULTIPLIER - ), - )?; - result.skipped += 1; - processed += 1; - if let Some(ref cb) = progress_callback { - cb(processed, total); - } - continue; - } - - // Don't clear existing embeddings here — defer until the first - // successful chunk embedding so that if ALL chunks for a document - // fail, old embeddings survive instead of leaving zero data. - - for (chunk_index, text) in chunks { - all_chunks.push(ChunkWork { - doc_id: doc.document_id, - chunk_index, - total_chunks, - doc_hash: doc.content_hash.clone(), - chunk_hash: sha256_hash(&text), - text, - }); - } - - page_normal_docs += 1; - // Don't fire progress here — wait until embedding completes below. - } - - // Track documents whose old embeddings have been cleared. - // We defer clearing until the first successful chunk embedding so - // that if ALL chunks for a document fail, old embeddings survive. - let mut cleared_docs: HashSet = HashSet::new(); - - // Process chunks in batches of BATCH_SIZE - for batch in all_chunks.chunks(BATCH_SIZE) { - let texts: Vec = batch.iter().map(|c| c.text.clone()).collect(); - - match client.embed_batch(texts).await { - Ok(embeddings) => { - for (i, embedding) in embeddings.iter().enumerate() { - if i >= batch.len() { - break; - } - let chunk = &batch[i]; - - if embedding.len() != EXPECTED_DIMS { - warn!( - doc_id = chunk.doc_id, - chunk_index = chunk.chunk_index, - got_dims = embedding.len(), - expected = EXPECTED_DIMS, - "Dimension mismatch, skipping" - ); - record_embedding_error( - conn, - chunk.doc_id, - chunk.chunk_index, - &chunk.doc_hash, - &chunk.chunk_hash, - model_name, - &format!( - "Dimension mismatch: got {}, expected {}", - embedding.len(), - EXPECTED_DIMS - ), - )?; - result.failed += 1; - continue; - } - - // Clear old embeddings on first successful chunk for this document - if !cleared_docs.contains(&chunk.doc_id) { - clear_document_embeddings(conn, chunk.doc_id)?; - cleared_docs.insert(chunk.doc_id); - } - - store_embedding( - conn, - chunk.doc_id, - chunk.chunk_index, - &chunk.doc_hash, - &chunk.chunk_hash, - model_name, - embedding, - chunk.total_chunks, - )?; - result.embedded += 1; - } - } - Err(e) => { - // Batch failed — retry each chunk individually so one - // oversized chunk doesn't poison the entire batch. - let err_str = e.to_string(); - let err_lower = err_str.to_lowercase(); - // Ollama error messages vary across versions. Match broadly - // against known patterns to detect context-window overflow. - let is_context_error = err_lower.contains("context length") - || err_lower.contains("too long") - || err_lower.contains("maximum context") - || err_lower.contains("token limit") - || err_lower.contains("exceeds") - || (err_lower.contains("413") && err_lower.contains("http")); - - if is_context_error && batch.len() > 1 { - warn!( - "Batch failed with context length error, retrying chunks individually" - ); - for chunk in batch { - match client.embed_batch(vec![chunk.text.clone()]).await { - Ok(embeddings) - if !embeddings.is_empty() - && embeddings[0].len() == EXPECTED_DIMS => - { - // Clear old embeddings on first successful chunk - if !cleared_docs.contains(&chunk.doc_id) { - clear_document_embeddings(conn, chunk.doc_id)?; - cleared_docs.insert(chunk.doc_id); - } - - store_embedding( - conn, - chunk.doc_id, - chunk.chunk_index, - &chunk.doc_hash, - &chunk.chunk_hash, - model_name, - &embeddings[0], - chunk.total_chunks, - )?; - result.embedded += 1; - } - _ => { - warn!( - doc_id = chunk.doc_id, - chunk_index = chunk.chunk_index, - chunk_bytes = chunk.text.len(), - "Chunk too large for model context window" - ); - record_embedding_error( - conn, - chunk.doc_id, - chunk.chunk_index, - &chunk.doc_hash, - &chunk.chunk_hash, - model_name, - "Chunk exceeds model context window", - )?; - result.failed += 1; - } - } - } - } else { - warn!(error = %e, "Batch embedding failed"); - for chunk in batch { - record_embedding_error( - conn, - chunk.doc_id, - chunk.chunk_index, - &chunk.doc_hash, - &chunk.chunk_hash, - model_name, - &e.to_string(), - )?; - result.failed += 1; - } - } - } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page"); + return Err(e); } } - - // Fire progress for all normal documents after embedding completes. - // This ensures progress reflects actual embedding work, not just chunking. - processed += page_normal_docs; - if let Some(ref cb) = progress_callback { - cb(processed, total); - } - - // Commit all DB writes for this page atomically. - conn.execute_batch("RELEASE embed_page")?; } info!( @@ -303,6 +109,240 @@ pub async fn embed_documents( Ok(result) } +/// Process a single page of pending documents within an active savepoint. +/// +/// All `?` propagation from this function is caught by the caller, which +/// rolls back the savepoint on error. +#[allow(clippy::too_many_arguments)] +async fn embed_page( + conn: &Connection, + client: &OllamaClient, + model_name: &str, + pending: &[crate::embedding::change_detector::PendingDocument], + result: &mut EmbedResult, + last_id: &mut i64, + processed: &mut usize, + total: usize, + progress_callback: &Option>, +) -> Result<()> { + // Build chunk work items for this page + let mut all_chunks: Vec = Vec::new(); + let mut page_normal_docs: usize = 0; + + for doc in pending { + // Always advance the cursor, even for skipped docs, to avoid re-fetching + *last_id = doc.document_id; + + if doc.content_text.is_empty() { + result.skipped += 1; + *processed += 1; + continue; + } + + let chunks = split_into_chunks(&doc.content_text); + let total_chunks = chunks.len(); + + // Overflow guard: skip documents that produce too many chunks. + // Must run BEFORE clear_document_embeddings so existing embeddings + // are preserved when we skip. + if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER { + warn!( + doc_id = doc.document_id, + chunk_count = total_chunks, + max = CHUNK_ROWID_MULTIPLIER, + "Document produces too many chunks, skipping to prevent rowid collision" + ); + // Record a sentinel error so the document is not re-detected as + // pending on subsequent runs (prevents infinite re-processing). + record_embedding_error( + conn, + doc.document_id, + 0, // sentinel chunk_index + &doc.content_hash, + "overflow-sentinel", + model_name, + &format!( + "Document produces {} chunks, exceeding max {}", + total_chunks, CHUNK_ROWID_MULTIPLIER + ), + )?; + result.skipped += 1; + *processed += 1; + if let Some(cb) = progress_callback { + cb(*processed, total); + } + continue; + } + + // Don't clear existing embeddings here — defer until the first + // successful chunk embedding so that if ALL chunks for a document + // fail, old embeddings survive instead of leaving zero data. + + for (chunk_index, text) in chunks { + all_chunks.push(ChunkWork { + doc_id: doc.document_id, + chunk_index, + total_chunks, + doc_hash: doc.content_hash.clone(), + chunk_hash: sha256_hash(&text), + text, + }); + } + + page_normal_docs += 1; + // Don't fire progress here — wait until embedding completes below. + } + + // Track documents whose old embeddings have been cleared. + // We defer clearing until the first successful chunk embedding so + // that if ALL chunks for a document fail, old embeddings survive. + let mut cleared_docs: HashSet = HashSet::new(); + + // Process chunks in batches of BATCH_SIZE + for batch in all_chunks.chunks(BATCH_SIZE) { + let texts: Vec = batch.iter().map(|c| c.text.clone()).collect(); + + match client.embed_batch(texts).await { + Ok(embeddings) => { + for (i, embedding) in embeddings.iter().enumerate() { + if i >= batch.len() { + break; + } + let chunk = &batch[i]; + + if embedding.len() != EXPECTED_DIMS { + warn!( + doc_id = chunk.doc_id, + chunk_index = chunk.chunk_index, + got_dims = embedding.len(), + expected = EXPECTED_DIMS, + "Dimension mismatch, skipping" + ); + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &format!( + "Dimension mismatch: got {}, expected {}", + embedding.len(), + EXPECTED_DIMS + ), + )?; + result.failed += 1; + continue; + } + + // Clear old embeddings on first successful chunk for this document + if !cleared_docs.contains(&chunk.doc_id) { + clear_document_embeddings(conn, chunk.doc_id)?; + cleared_docs.insert(chunk.doc_id); + } + + store_embedding( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + embedding, + chunk.total_chunks, + )?; + result.embedded += 1; + } + } + Err(e) => { + // Batch failed — retry each chunk individually so one + // oversized chunk doesn't poison the entire batch. + let err_str = e.to_string(); + let err_lower = err_str.to_lowercase(); + // Ollama error messages vary across versions. Match broadly + // against known patterns to detect context-window overflow. + let is_context_error = err_lower.contains("context length") + || err_lower.contains("too long") + || err_lower.contains("maximum context") + || err_lower.contains("token limit") + || err_lower.contains("exceeds") + || (err_lower.contains("413") && err_lower.contains("http")); + + if is_context_error && batch.len() > 1 { + warn!("Batch failed with context length error, retrying chunks individually"); + for chunk in batch { + match client.embed_batch(vec![chunk.text.clone()]).await { + Ok(embeddings) + if !embeddings.is_empty() + && embeddings[0].len() == EXPECTED_DIMS => + { + // Clear old embeddings on first successful chunk + if !cleared_docs.contains(&chunk.doc_id) { + clear_document_embeddings(conn, chunk.doc_id)?; + cleared_docs.insert(chunk.doc_id); + } + + store_embedding( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &embeddings[0], + chunk.total_chunks, + )?; + result.embedded += 1; + } + _ => { + warn!( + doc_id = chunk.doc_id, + chunk_index = chunk.chunk_index, + chunk_bytes = chunk.text.len(), + "Chunk too large for model context window" + ); + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + "Chunk exceeds model context window", + )?; + result.failed += 1; + } + } + } + } else { + warn!(error = %e, "Batch embedding failed"); + for chunk in batch { + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &e.to_string(), + )?; + result.failed += 1; + } + } + } + } + } + + // Fire progress for all normal documents after embedding completes. + // This ensures progress reflects actual embedding work, not just chunking. + *processed += page_normal_docs; + if let Some(cb) = progress_callback { + cb(*processed, total); + } + + Ok(()) +} + /// Clear all embeddings and metadata for a document. fn clear_document_embeddings(conn: &Connection, document_id: i64) -> Result<()> { conn.execute( diff --git a/src/search/rrf.rs b/src/search/rrf.rs index 67aed40..b8b6e88 100644 --- a/src/search/rrf.rs +++ b/src/search/rrf.rs @@ -33,8 +33,10 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve for (i, &(doc_id, _)) in vector_results.iter().enumerate() { let rank = i + 1; // 1-indexed let entry = scores.entry(doc_id).or_insert((0.0, None, None)); - entry.0 += 1.0 / (RRF_K + rank as f64); + // Only count the first occurrence per list to prevent duplicates + // from inflating the score. if entry.1.is_none() { + entry.0 += 1.0 / (RRF_K + rank as f64); entry.1 = Some(rank); } } @@ -42,8 +44,8 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve for (i, &(doc_id, _)) in fts_results.iter().enumerate() { let rank = i + 1; // 1-indexed let entry = scores.entry(doc_id).or_insert((0.0, None, None)); - entry.0 += 1.0 / (RRF_K + rank as f64); if entry.2.is_none() { + entry.0 += 1.0 / (RRF_K + rank as f64); entry.2 = Some(rank); } }