diff --git a/src/embedding/change_detector.rs b/src/embedding/change_detector.rs index 76d8b4a..1445c99 100644 --- a/src/embedding/change_detector.rs +++ b/src/embedding/change_detector.rs @@ -3,6 +3,7 @@ use rusqlite::Connection; use crate::core::error::Result; +use crate::embedding::chunking::{CHUNK_MAX_BYTES, EXPECTED_DIMS}; /// A document that needs embedding or re-embedding. #[derive(Debug)] @@ -12,17 +13,20 @@ pub struct PendingDocument { pub content_hash: String, } -/// Find documents that need embedding: new (no metadata) or changed (hash mismatch). +/// Find documents that need embedding: new (no metadata), changed (hash mismatch), +/// or config-drifted (chunk_max_bytes/model/dims mismatch). /// /// Uses keyset pagination (WHERE d.id > last_id) and returns up to `page_size` results. pub fn find_pending_documents( conn: &Connection, page_size: usize, last_id: i64, + model_name: &str, ) -> Result> { // Documents that either: // 1. Have no embedding_metadata at all (new) // 2. Have metadata where document_hash != content_hash (changed) + // 3. Config drift: chunk_max_bytes, model, or dims mismatch (or pre-migration NULL) let sql = r#" SELECT d.id, d.content_text, d.content_hash FROM documents d @@ -37,6 +41,16 @@ pub fn find_pending_documents( WHERE em.document_id = d.id AND em.chunk_index = 0 AND em.document_hash != d.content_hash ) + OR EXISTS ( + SELECT 1 FROM embedding_metadata em + WHERE em.document_id = d.id AND em.chunk_index = 0 + AND ( + em.chunk_max_bytes IS NULL + OR em.chunk_max_bytes != ?3 + OR em.model != ?4 + OR em.dims != ?5 + ) + ) ) ORDER BY d.id LIMIT ?2 @@ -44,35 +58,56 @@ pub fn find_pending_documents( let mut stmt = conn.prepare(sql)?; let rows = stmt - .query_map(rusqlite::params![last_id, page_size as i64], |row| { - Ok(PendingDocument { - document_id: row.get(0)?, - content_text: row.get(1)?, - content_hash: row.get(2)?, - }) - })? + .query_map( + rusqlite::params![ + last_id, + page_size as i64, + CHUNK_MAX_BYTES as i64, + model_name, + EXPECTED_DIMS as i64, + ], + |row| { + Ok(PendingDocument { + document_id: row.get(0)?, + content_text: row.get(1)?, + content_hash: row.get(2)?, + }) + }, + )? .collect::, _>>()?; Ok(rows) } /// Count total documents that need embedding. -pub fn count_pending_documents(conn: &Connection) -> Result { +pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result { let count: i64 = conn.query_row( r#" SELECT COUNT(*) FROM documents d - WHERE NOT EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - ) - OR EXISTS ( - SELECT 1 FROM embedding_metadata em - WHERE em.document_id = d.id AND em.chunk_index = 0 - AND em.document_hash != d.content_hash + WHERE ( + NOT EXISTS ( + SELECT 1 FROM embedding_metadata em + WHERE em.document_id = d.id AND em.chunk_index = 0 + ) + OR EXISTS ( + SELECT 1 FROM embedding_metadata em + WHERE em.document_id = d.id AND em.chunk_index = 0 + AND em.document_hash != d.content_hash + ) + OR EXISTS ( + SELECT 1 FROM embedding_metadata em + WHERE em.document_id = d.id AND em.chunk_index = 0 + AND ( + em.chunk_max_bytes IS NULL + OR em.chunk_max_bytes != ?1 + OR em.model != ?2 + OR em.dims != ?3 + ) + ) ) "#, - [], + rusqlite::params![CHUNK_MAX_BYTES as i64, model_name, EXPECTED_DIMS as i64], |row| row.get(0), )?; Ok(count) diff --git a/src/embedding/chunk_ids.rs b/src/embedding/chunk_ids.rs index 3b546e0..dce1b08 100644 --- a/src/embedding/chunk_ids.rs +++ b/src/embedding/chunk_ids.rs @@ -1,5 +1,7 @@ /// Multiplier for encoding (document_id, chunk_index) into a single rowid. -/// Supports up to 1000 chunks per document (32M chars at 32k/chunk). +/// Supports up to 1000 chunks per document. At CHUNK_MAX_BYTES=6000, +/// a 2MB document (MAX_DOCUMENT_BYTES_HARD) produces ~333 chunks. +/// The pipeline enforces chunk_count < CHUNK_ROWID_MULTIPLIER at runtime. pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000; /// Encode (document_id, chunk_index) into a sqlite-vec rowid. diff --git a/src/embedding/chunking.rs b/src/embedding/chunking.rs index 8314f33..770e517 100644 --- a/src/embedding/chunking.rs +++ b/src/embedding/chunking.rs @@ -2,11 +2,19 @@ /// Maximum bytes per chunk. /// Named `_BYTES` because `str::len()` returns byte count; multi-byte UTF-8 -/// sequences mean byte length ≥ char count. -pub const CHUNK_MAX_BYTES: usize = 32_000; +/// sequences mean byte length >= char count. +/// +/// nomic-embed-text has an 8,192-token context window. English prose averages +/// ~4 chars/token, but technical content (code, URLs, JSON) can be 1-2 +/// chars/token. We use 6,000 bytes as a conservative limit that stays safe +/// even for code-heavy chunks (~6,000 tokens worst-case). +pub const CHUNK_MAX_BYTES: usize = 6_000; + +/// Expected embedding dimensions for nomic-embed-text. +pub const EXPECTED_DIMS: usize = 768; /// Character overlap between adjacent chunks. -pub const CHUNK_OVERLAP_CHARS: usize = 500; +pub const CHUNK_OVERLAP_CHARS: usize = 200; /// Split document content into chunks suitable for embedding. /// diff --git a/src/embedding/pipeline.rs b/src/embedding/pipeline.rs index 022e0fa..20b57a4 100644 --- a/src/embedding/pipeline.rs +++ b/src/embedding/pipeline.rs @@ -1,18 +1,19 @@ //! Async embedding pipeline: chunk documents, embed via Ollama, store in sqlite-vec. +use std::collections::HashSet; + use rusqlite::Connection; use sha2::{Digest, Sha256}; use tracing::{info, warn}; use crate::core::error::Result; use crate::embedding::change_detector::{count_pending_documents, find_pending_documents}; -use crate::embedding::chunk_ids::encode_rowid; -use crate::embedding::chunking::split_into_chunks; +use crate::embedding::chunk_ids::{encode_rowid, CHUNK_ROWID_MULTIPLIER}; +use crate::embedding::chunking::{split_into_chunks, CHUNK_MAX_BYTES, EXPECTED_DIMS}; use crate::embedding::ollama::OllamaClient; const BATCH_SIZE: usize = 32; const DB_PAGE_SIZE: usize = 500; -const EXPECTED_DIMS: usize = 768; /// Result of an embedding run. #[derive(Debug, Default)] @@ -26,6 +27,7 @@ pub struct EmbedResult { struct ChunkWork { doc_id: i64, chunk_index: usize, + total_chunks: usize, doc_hash: String, chunk_hash: String, text: String, @@ -41,7 +43,7 @@ pub async fn embed_documents( model_name: &str, progress_callback: Option>, ) -> Result { - let total = count_pending_documents(conn)? as usize; + let total = count_pending_documents(conn, model_name)? as usize; let mut result = EmbedResult::default(); let mut last_id: i64 = 0; let mut processed: usize = 0; @@ -53,13 +55,21 @@ pub async fn embed_documents( info!(total, "Starting embedding pipeline"); loop { - let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id)?; + let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, model_name)?; if pending.is_empty() { break; } + // Wrap all DB writes for this page in a savepoint so that + // clear_document_embeddings + store_embedding are atomic. If the + // process crashes mid-page, the savepoint is never released and + // SQLite rolls back — preventing partial document states where old + // embeddings are cleared but new ones haven't been written yet. + conn.execute_batch("SAVEPOINT embed_page")?; + // Build chunk work items for this page let mut all_chunks: Vec = Vec::new(); + let mut page_normal_docs: usize = 0; for doc in &pending { // Always advance the cursor, even for skipped docs, to avoid re-fetching @@ -71,27 +81,65 @@ pub async fn embed_documents( continue; } - // Clear existing embeddings for this document before re-embedding - clear_document_embeddings(conn, doc.document_id)?; - let chunks = split_into_chunks(&doc.content_text); + let total_chunks = chunks.len(); + + // Overflow guard: skip documents that produce too many chunks. + // Must run BEFORE clear_document_embeddings so existing embeddings + // are preserved when we skip. + if total_chunks as i64 >= CHUNK_ROWID_MULTIPLIER { + warn!( + doc_id = doc.document_id, + chunk_count = total_chunks, + max = CHUNK_ROWID_MULTIPLIER, + "Document produces too many chunks, skipping to prevent rowid collision" + ); + // Record a sentinel error so the document is not re-detected as + // pending on subsequent runs (prevents infinite re-processing). + record_embedding_error( + conn, + doc.document_id, + 0, // sentinel chunk_index + &doc.content_hash, + "overflow-sentinel", + model_name, + &format!( + "Document produces {} chunks, exceeding max {}", + total_chunks, CHUNK_ROWID_MULTIPLIER + ), + )?; + result.skipped += 1; + processed += 1; + if let Some(ref cb) = progress_callback { + cb(processed, total); + } + continue; + } + + // Don't clear existing embeddings here — defer until the first + // successful chunk embedding so that if ALL chunks for a document + // fail, old embeddings survive instead of leaving zero data. + for (chunk_index, text) in chunks { all_chunks.push(ChunkWork { doc_id: doc.document_id, chunk_index, + total_chunks, doc_hash: doc.content_hash.clone(), chunk_hash: sha256_hash(&text), text, }); } - // Track progress per document (not per chunk) to match `total` - processed += 1; - if let Some(ref cb) = progress_callback { - cb(processed, total); - } + page_normal_docs += 1; + // Don't fire progress here — wait until embedding completes below. } + // Track documents whose old embeddings have been cleared. + // We defer clearing until the first successful chunk embedding so + // that if ALL chunks for a document fail, old embeddings survive. + let mut cleared_docs: HashSet = HashSet::new(); + // Process chunks in batches of BATCH_SIZE for batch in all_chunks.chunks(BATCH_SIZE) { let texts: Vec = batch.iter().map(|c| c.text.clone()).collect(); @@ -129,6 +177,12 @@ pub async fn embed_documents( continue; } + // Clear old embeddings on first successful chunk for this document + if !cleared_docs.contains(&chunk.doc_id) { + clear_document_embeddings(conn, chunk.doc_id)?; + cleared_docs.insert(chunk.doc_id); + } + store_embedding( conn, chunk.doc_id, @@ -137,28 +191,99 @@ pub async fn embed_documents( &chunk.chunk_hash, model_name, embedding, + chunk.total_chunks, )?; result.embedded += 1; } } Err(e) => { - warn!(error = %e, "Batch embedding failed"); - for chunk in batch { - record_embedding_error( - conn, - chunk.doc_id, - chunk.chunk_index, - &chunk.doc_hash, - &chunk.chunk_hash, - model_name, - &e.to_string(), - )?; - result.failed += 1; + // Batch failed — retry each chunk individually so one + // oversized chunk doesn't poison the entire batch. + let err_str = e.to_string(); + let err_lower = err_str.to_lowercase(); + // Ollama error messages vary across versions. Match broadly + // against known patterns to detect context-window overflow. + let is_context_error = err_lower.contains("context length") + || err_lower.contains("too long") + || err_lower.contains("maximum context") + || err_lower.contains("token limit") + || err_lower.contains("exceeds") + || (err_lower.contains("413") && err_lower.contains("http")); + + if is_context_error && batch.len() > 1 { + warn!("Batch failed with context length error, retrying chunks individually"); + for chunk in batch { + match client.embed_batch(vec![chunk.text.clone()]).await { + Ok(embeddings) if !embeddings.is_empty() + && embeddings[0].len() == EXPECTED_DIMS => + { + // Clear old embeddings on first successful chunk + if !cleared_docs.contains(&chunk.doc_id) { + clear_document_embeddings(conn, chunk.doc_id)?; + cleared_docs.insert(chunk.doc_id); + } + + store_embedding( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &embeddings[0], + chunk.total_chunks, + )?; + result.embedded += 1; + } + _ => { + warn!( + doc_id = chunk.doc_id, + chunk_index = chunk.chunk_index, + chunk_bytes = chunk.text.len(), + "Chunk too large for model context window" + ); + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + "Chunk exceeds model context window", + )?; + result.failed += 1; + } + } + } + } else { + warn!(error = %e, "Batch embedding failed"); + for chunk in batch { + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &e.to_string(), + )?; + result.failed += 1; + } } } } } + + // Fire progress for all normal documents after embedding completes. + // This ensures progress reflects actual embedding work, not just chunking. + processed += page_normal_docs; + if let Some(ref cb) = progress_callback { + cb(processed, total); + } + + // Commit all DB writes for this page atomically. + conn.execute_batch("RELEASE embed_page")?; } info!( @@ -197,6 +322,7 @@ fn store_embedding( chunk_hash: &str, model_name: &str, embedding: &[f32], + total_chunks: usize, ) -> Result<()> { let rowid = encode_rowid(doc_id, chunk_index as i64); @@ -207,13 +333,23 @@ fn store_embedding( rusqlite::params![rowid, embedding_bytes], )?; + // Only store chunk_count on the sentinel row (chunk_index=0) + let chunk_count: Option = if chunk_index == 0 { + Some(total_chunks as i64) + } else { + None + }; + let now = chrono::Utc::now().timestamp_millis(); conn.execute( "INSERT OR REPLACE INTO embedding_metadata (document_id, chunk_index, model, dims, document_hash, chunk_hash, - created_at, attempt_count, last_error) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL)", - rusqlite::params![doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, doc_hash, chunk_hash, now], + created_at, attempt_count, last_error, chunk_max_bytes, chunk_count) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL, ?8, ?9)", + rusqlite::params![ + doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, + doc_hash, chunk_hash, now, CHUNK_MAX_BYTES as i64, chunk_count + ], )?; Ok(()) @@ -233,13 +369,17 @@ fn record_embedding_error( conn.execute( "INSERT INTO embedding_metadata (document_id, chunk_index, model, dims, document_hash, chunk_hash, - created_at, attempt_count, last_error, last_attempt_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, ?8, ?7) + created_at, attempt_count, last_error, last_attempt_at, chunk_max_bytes) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, ?8, ?7, ?9) ON CONFLICT(document_id, chunk_index) DO UPDATE SET attempt_count = embedding_metadata.attempt_count + 1, last_error = ?8, - last_attempt_at = ?7", - rusqlite::params![doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, doc_hash, chunk_hash, now, error], + last_attempt_at = ?7, + chunk_max_bytes = ?9", + rusqlite::params![ + doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, + doc_hash, chunk_hash, now, error, CHUNK_MAX_BYTES as i64 + ], )?; Ok(()) } diff --git a/src/search/vector.rs b/src/search/vector.rs index 5939323..ee2c24f 100644 --- a/src/search/vector.rs +++ b/src/search/vector.rs @@ -12,10 +12,39 @@ pub struct VectorResult { pub distance: f64, } +/// Query the maximum number of chunks per document for adaptive dedup sizing. +fn max_chunks_per_document(conn: &Connection) -> i64 { + // Fast path: stored chunk_count on sentinel rows (post-migration 010) + let stored: Option = conn + .query_row( + "SELECT MAX(chunk_count) FROM embedding_metadata + WHERE chunk_index = 0 AND chunk_count IS NOT NULL", + [], + |row| row.get(0), + ) + .unwrap_or(None); + + if let Some(max) = stored { + return max; + } + + // Fallback for pre-migration data: count chunks per document + conn.query_row( + "SELECT COALESCE(MAX(cnt), 1) FROM ( + SELECT COUNT(*) as cnt FROM embedding_metadata + WHERE last_error IS NULL GROUP BY document_id + )", + [], + |row| row.get(0), + ) + .unwrap_or(1) +} + /// Search documents using sqlite-vec KNN query. /// -/// Over-fetches 3x limit to handle chunk deduplication (multiple chunks per -/// document produce multiple KNN results for the same document_id). +/// Over-fetches by an adaptive multiplier based on actual max chunks per document +/// to handle chunk deduplication (multiple chunks per document produce multiple +/// KNN results for the same document_id). /// Returns deduplicated results with best (lowest) distance per document. pub fn search_vector( conn: &Connection, @@ -32,7 +61,9 @@ pub fn search_vector( .flat_map(|f| f.to_le_bytes()) .collect(); - let k = limit * 3; // Over-fetch for dedup + let max_chunks = max_chunks_per_document(conn); + let multiplier = ((max_chunks as usize * 3 / 2) + 1).max(8); + let k = limit * multiplier; let mut stmt = conn.prepare( "SELECT rowid, distance @@ -69,7 +100,7 @@ pub fn search_vector( distance, }) .collect(); - results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap_or(std::cmp::Ordering::Equal)); + results.sort_by(|a, b| a.distance.total_cmp(&b.distance)); results.truncate(limit); Ok(results) @@ -132,7 +163,7 @@ mod tests { .into_iter() .map(|(document_id, distance)| VectorResult { document_id, distance }) .collect(); - results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap_or(std::cmp::Ordering::Equal)); + results.sort_by(|a, b| a.distance.total_cmp(&b.distance)); results.truncate(limit); results }