fix(embedding): Harden pipeline against chunk overflow, config drift, and partial failures

Reduces CHUNK_MAX_BYTES from 32KB to 6KB and CHUNK_OVERLAP_CHARS from
500 to 200 to stay within nomic-embed-text's 8,192-token context
window. This commit addresses all downstream consequences of that
reduction:

- Config drift detection: find_pending_documents and
  count_pending_documents now take model_name and compare
  chunk_max_bytes, model, and dims against stored metadata. Documents
  embedded with stale config are automatically re-queued.

- Overflow guard: documents producing >= CHUNK_ROWID_MULTIPLIER chunks
  are skipped with a sentinel error recorded in embedding_metadata,
  preventing both rowid collision and infinite re-processing loops.

- Deferred clearing: old embeddings are no longer cleared before
  attempting new ones. clear_document_embeddings is deferred until the
  first successful chunk embedding, so if all chunks fail the document
  retains its previous embeddings rather than losing all data.

- Savepoints: each page of DB writes is wrapped in a SQLite savepoint
  so a crash mid-page rolls back atomically instead of leaving partial
  state (cleared embeddings with no replacements).

- Per-chunk retry on context overflow: when a batch fails with a
  context-length error, each chunk is retried individually so one
  oversized chunk doesn't poison the entire batch.

- Adaptive dedup in vector search: replaces the static 3x over-fetch
  multiplier with a dynamic one based on actual max chunks per document
  (using the new chunk_count column with a fallback COUNT query for
  pre-migration data). Also replaces partial_cmp with total_cmp for
  f64 distance sorting.

- Stores chunk_max_bytes and chunk_count (on sentinel rows) in
  embedding_metadata to support config drift detection and adaptive
  dedup without runtime queries.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-03 09:35:08 -05:00
parent 2a52594a60
commit 7d07f95d4c
5 changed files with 275 additions and 59 deletions

View File

@@ -1,18 +1,19 @@
//! Async embedding pipeline: chunk documents, embed via Ollama, store in sqlite-vec.
use std::collections::HashSet;
use rusqlite::Connection;
use sha2::{Digest, Sha256};
use tracing::{info, warn};
use crate::core::error::Result;
use crate::embedding::change_detector::{count_pending_documents, find_pending_documents};
use crate::embedding::chunk_ids::encode_rowid;
use crate::embedding::chunking::split_into_chunks;
use crate::embedding::chunk_ids::{encode_rowid, CHUNK_ROWID_MULTIPLIER};
use crate::embedding::chunking::{split_into_chunks, CHUNK_MAX_BYTES, EXPECTED_DIMS};
use crate::embedding::ollama::OllamaClient;
const BATCH_SIZE: usize = 32;
const DB_PAGE_SIZE: usize = 500;
const EXPECTED_DIMS: usize = 768;
/// Result of an embedding run.
#[derive(Debug, Default)]
@@ -26,6 +27,7 @@ pub struct EmbedResult {
struct ChunkWork {
doc_id: i64,
chunk_index: usize,
total_chunks: usize,
doc_hash: String,
chunk_hash: String,
text: String,
@@ -41,7 +43,7 @@ pub async fn embed_documents(
model_name: &str,
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<EmbedResult> {
let total = count_pending_documents(conn)? as usize;
let total = count_pending_documents(conn, model_name)? as usize;
let mut result = EmbedResult::default();
let mut last_id: i64 = 0;
let mut processed: usize = 0;
@@ -53,13 +55,21 @@ pub async fn embed_documents(
info!(total, "Starting embedding pipeline");
loop {
let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id)?;
let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, model_name)?;
if pending.is_empty() {
break;
}
// Wrap all DB writes for this page in a savepoint so that
// clear_document_embeddings + store_embedding are atomic. If the
// process crashes mid-page, the savepoint is never released and
// SQLite rolls back — preventing partial document states where old
// embeddings are cleared but new ones haven't been written yet.
conn.execute_batch("SAVEPOINT embed_page")?;
// Build chunk work items for this page
let mut all_chunks: Vec<ChunkWork> = Vec::new();
let mut page_normal_docs: usize = 0;
for doc in &pending {
// Always advance the cursor, even for skipped docs, to avoid re-fetching
@@ -71,27 +81,65 @@ pub async fn embed_documents(
continue;
}
// Clear existing embeddings for this document before re-embedding
clear_document_embeddings(conn, doc.document_id)?;
let chunks = split_into_chunks(&doc.content_text);
let total_chunks = chunks.len();
// Overflow guard: skip documents that produce too many chunks.
// Must run BEFORE clear_document_embeddings so existing embeddings
// are preserved when we skip.
if total_chunks as i64 >= CHUNK_ROWID_MULTIPLIER {
warn!(
doc_id = doc.document_id,
chunk_count = total_chunks,
max = CHUNK_ROWID_MULTIPLIER,
"Document produces too many chunks, skipping to prevent rowid collision"
);
// Record a sentinel error so the document is not re-detected as
// pending on subsequent runs (prevents infinite re-processing).
record_embedding_error(
conn,
doc.document_id,
0, // sentinel chunk_index
&doc.content_hash,
"overflow-sentinel",
model_name,
&format!(
"Document produces {} chunks, exceeding max {}",
total_chunks, CHUNK_ROWID_MULTIPLIER
),
)?;
result.skipped += 1;
processed += 1;
if let Some(ref cb) = progress_callback {
cb(processed, total);
}
continue;
}
// Don't clear existing embeddings here — defer until the first
// successful chunk embedding so that if ALL chunks for a document
// fail, old embeddings survive instead of leaving zero data.
for (chunk_index, text) in chunks {
all_chunks.push(ChunkWork {
doc_id: doc.document_id,
chunk_index,
total_chunks,
doc_hash: doc.content_hash.clone(),
chunk_hash: sha256_hash(&text),
text,
});
}
// Track progress per document (not per chunk) to match `total`
processed += 1;
if let Some(ref cb) = progress_callback {
cb(processed, total);
}
page_normal_docs += 1;
// Don't fire progress here — wait until embedding completes below.
}
// Track documents whose old embeddings have been cleared.
// We defer clearing until the first successful chunk embedding so
// that if ALL chunks for a document fail, old embeddings survive.
let mut cleared_docs: HashSet<i64> = HashSet::new();
// Process chunks in batches of BATCH_SIZE
for batch in all_chunks.chunks(BATCH_SIZE) {
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
@@ -129,6 +177,12 @@ pub async fn embed_documents(
continue;
}
// Clear old embeddings on first successful chunk for this document
if !cleared_docs.contains(&chunk.doc_id) {
clear_document_embeddings(conn, chunk.doc_id)?;
cleared_docs.insert(chunk.doc_id);
}
store_embedding(
conn,
chunk.doc_id,
@@ -137,28 +191,99 @@ pub async fn embed_documents(
&chunk.chunk_hash,
model_name,
embedding,
chunk.total_chunks,
)?;
result.embedded += 1;
}
}
Err(e) => {
warn!(error = %e, "Batch embedding failed");
for chunk in batch {
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&e.to_string(),
)?;
result.failed += 1;
// Batch failed — retry each chunk individually so one
// oversized chunk doesn't poison the entire batch.
let err_str = e.to_string();
let err_lower = err_str.to_lowercase();
// Ollama error messages vary across versions. Match broadly
// against known patterns to detect context-window overflow.
let is_context_error = err_lower.contains("context length")
|| err_lower.contains("too long")
|| err_lower.contains("maximum context")
|| err_lower.contains("token limit")
|| err_lower.contains("exceeds")
|| (err_lower.contains("413") && err_lower.contains("http"));
if is_context_error && batch.len() > 1 {
warn!("Batch failed with context length error, retrying chunks individually");
for chunk in batch {
match client.embed_batch(vec![chunk.text.clone()]).await {
Ok(embeddings) if !embeddings.is_empty()
&& embeddings[0].len() == EXPECTED_DIMS =>
{
// Clear old embeddings on first successful chunk
if !cleared_docs.contains(&chunk.doc_id) {
clear_document_embeddings(conn, chunk.doc_id)?;
cleared_docs.insert(chunk.doc_id);
}
store_embedding(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&embeddings[0],
chunk.total_chunks,
)?;
result.embedded += 1;
}
_ => {
warn!(
doc_id = chunk.doc_id,
chunk_index = chunk.chunk_index,
chunk_bytes = chunk.text.len(),
"Chunk too large for model context window"
);
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
"Chunk exceeds model context window",
)?;
result.failed += 1;
}
}
}
} else {
warn!(error = %e, "Batch embedding failed");
for chunk in batch {
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&e.to_string(),
)?;
result.failed += 1;
}
}
}
}
}
// Fire progress for all normal documents after embedding completes.
// This ensures progress reflects actual embedding work, not just chunking.
processed += page_normal_docs;
if let Some(ref cb) = progress_callback {
cb(processed, total);
}
// Commit all DB writes for this page atomically.
conn.execute_batch("RELEASE embed_page")?;
}
info!(
@@ -197,6 +322,7 @@ fn store_embedding(
chunk_hash: &str,
model_name: &str,
embedding: &[f32],
total_chunks: usize,
) -> Result<()> {
let rowid = encode_rowid(doc_id, chunk_index as i64);
@@ -207,13 +333,23 @@ fn store_embedding(
rusqlite::params![rowid, embedding_bytes],
)?;
// Only store chunk_count on the sentinel row (chunk_index=0)
let chunk_count: Option<i64> = if chunk_index == 0 {
Some(total_chunks as i64)
} else {
None
};
let now = chrono::Utc::now().timestamp_millis();
conn.execute(
"INSERT OR REPLACE INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
created_at, attempt_count, last_error)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL)",
rusqlite::params![doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, doc_hash, chunk_hash, now],
created_at, attempt_count, last_error, chunk_max_bytes, chunk_count)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL, ?8, ?9)",
rusqlite::params![
doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64,
doc_hash, chunk_hash, now, CHUNK_MAX_BYTES as i64, chunk_count
],
)?;
Ok(())
@@ -233,13 +369,17 @@ fn record_embedding_error(
conn.execute(
"INSERT INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
created_at, attempt_count, last_error, last_attempt_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, ?8, ?7)
created_at, attempt_count, last_error, last_attempt_at, chunk_max_bytes)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, ?8, ?7, ?9)
ON CONFLICT(document_id, chunk_index) DO UPDATE SET
attempt_count = embedding_metadata.attempt_count + 1,
last_error = ?8,
last_attempt_at = ?7",
rusqlite::params![doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, doc_hash, chunk_hash, now, error],
last_attempt_at = ?7,
chunk_max_bytes = ?9",
rusqlite::params![
doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64,
doc_hash, chunk_hash, now, error, CHUNK_MAX_BYTES as i64
],
)?;
Ok(())
}