feat(embed): docs_embedded tracking, buffer reuse, retry hardening

Embedding pipeline improvements building on the concurrent batching
foundation:

- Track docs_embedded vs chunks_embedded separately. A document counts
  as embedded only when ALL its chunks succeed, giving accurate
  progress reporting. The sync command reads docs_embedded for its
  document count.

- Reuse a single Vec<u8> buffer (embed_buf) across all store_embedding
  calls instead of allocating per chunk. Eliminates ~3KB allocation per
  768-dim embedding.

- Detect and record errors when Ollama silently returns fewer
  embeddings than inputs (batch mismatch). Previously these dropped
  chunks were invisible.

- Improve retry error messages: distinguish "retry returned unexpected
  result" (wrong dims/count) from "retry request failed" (network
  error) instead of generic "chunk too large" message.

- Convert all hot-path SQL from conn.execute() to prepare_cached() for
  statement cache reuse (clear_document_embeddings, store_embedding,
  record_embedding_error).

- Record embedding_metadata errors for empty documents so they don't
  appear as perpetually pending on subsequent runs.

- Accept concurrency parameter (configurable via config.embedding.concurrency)
  instead of hardcoded EMBED_CONCURRENCY=2.

- Add schema version pre-flight check in embed command to fail fast
  with actionable error instead of cryptic SQL errors.

- Fix --retry-failed to use DELETE instead of UPDATE. UPDATE clears
  last_error but the row still matches config params in the LEFT JOIN,
  making the doc permanently invisible to find_pending_documents.
  DELETE removes the row entirely so the LEFT JOIN returns NULL.
  Regression test added (old_update_approach_leaves_doc_invisible).

- Add chunking forward-progress guard: after floor_char_boundary()
  rounds backward, ensure start advances by at least one full
  character to prevent infinite loops on multi-byte sequences
  (box-drawing chars, smart quotes). Test cases cover the exact
  patterns that caused production hangs on document 18526.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-06 22:42:08 -05:00
parent 39cb0cb087
commit c2036c64e9
5 changed files with 497 additions and 66 deletions

View File

@@ -83,3 +83,148 @@ pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result<i6
)?;
Ok(count)
}
#[cfg(test)]
mod tests {
use std::path::Path;
use super::*;
use crate::core::db::{create_connection, run_migrations};
use crate::embedding::pipeline::record_embedding_error;
const MODEL: &str = "nomic-embed-text";
fn setup_db() -> Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
conn
}
fn insert_test_project(conn: &Connection) -> i64 {
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
VALUES (1, 'group/test', 'https://gitlab.example.com/group/test')",
[],
)
.unwrap();
conn.last_insert_rowid()
}
fn insert_test_document(conn: &Connection, project_id: i64, content: &str) -> i64 {
conn.execute(
"INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash)
VALUES ('issue', 1, ?1, ?2, 'hash123')",
rusqlite::params![project_id, content],
)
.unwrap();
conn.last_insert_rowid()
}
#[test]
fn retry_failed_delete_makes_doc_pending_again() {
let conn = setup_db();
let proj_id = insert_test_project(&conn);
let doc_id = insert_test_document(&conn, proj_id, "some text content");
// Doc starts as pending
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
assert_eq!(pending.len(), 1, "Doc should be pending initially");
// Record an error — doc should no longer be pending
record_embedding_error(
&conn,
doc_id,
0,
"hash123",
"chunkhash",
MODEL,
"test error",
)
.unwrap();
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
assert!(
pending.is_empty(),
"Doc with error metadata should not be pending"
);
// DELETE error rows (mimicking --retry-failed) — doc should become pending again
conn.execute_batch(
"DELETE FROM embeddings WHERE rowid / 1000 IN (
SELECT DISTINCT document_id FROM embedding_metadata
WHERE last_error IS NOT NULL
);
DELETE FROM embedding_metadata WHERE last_error IS NOT NULL;",
)
.unwrap();
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
assert_eq!(pending.len(), 1, "Doc should be pending again after DELETE");
assert_eq!(pending[0].document_id, doc_id);
}
#[test]
fn empty_doc_with_error_not_pending() {
let conn = setup_db();
let proj_id = insert_test_project(&conn);
let doc_id = insert_test_document(&conn, proj_id, "");
// Empty doc starts as pending
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
assert_eq!(pending.len(), 1, "Empty doc should be pending initially");
// Record an error for the empty doc
record_embedding_error(
&conn,
doc_id,
0,
"hash123",
"empty",
MODEL,
"Document has empty content",
)
.unwrap();
// Should no longer be pending
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
assert!(
pending.is_empty(),
"Empty doc with error metadata should not be pending"
);
}
#[test]
fn old_update_approach_leaves_doc_invisible() {
// This test demonstrates WHY we use DELETE instead of UPDATE.
// UPDATE clears last_error but the row still matches config params,
// so the doc stays "not pending" — permanently invisible.
let conn = setup_db();
let proj_id = insert_test_project(&conn);
let doc_id = insert_test_document(&conn, proj_id, "some text content");
// Record an error
record_embedding_error(
&conn,
doc_id,
0,
"hash123",
"chunkhash",
MODEL,
"test error",
)
.unwrap();
// Old approach: UPDATE to clear error
conn.execute(
"UPDATE embedding_metadata SET last_error = NULL, attempt_count = 0
WHERE last_error IS NOT NULL",
[],
)
.unwrap();
// Doc is NOT pending — it's permanently invisible! This is the bug.
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
assert!(
pending.is_empty(),
"UPDATE approach leaves doc invisible (this proves the bug)"
);
}
}

View File

@@ -41,9 +41,19 @@ pub fn split_into_chunks(content: &str) -> Vec<(usize, String)> {
split_at
}
.max(1);
let old_start = start;
start += advance;
// Ensure start lands on a char boundary after overlap subtraction
start = floor_char_boundary(content, start);
// Guarantee forward progress: multi-byte chars can cause
// floor_char_boundary to round back to old_start
if start <= old_start {
start = old_start
+ content[old_start..]
.chars()
.next()
.map_or(1, |c| c.len_utf8());
}
chunk_index += 1;
}
@@ -219,4 +229,105 @@ mod tests {
let chunks = split_into_chunks(&content);
assert!(chunks.len() >= 2);
}
#[test]
fn test_box_drawing_heavy_content() {
// Simulates a document with many box-drawing characters (3-byte UTF-8)
// like the ─ (U+2500) character found in markdown tables
let mut content = String::new();
// Normal text header
content.push_str("# Title\n\nSome description text.\n\n");
// Table header with box drawing
content.push('┌');
for _ in 0..200 {
content.push('─');
}
content.push('┬');
for _ in 0..200 {
content.push('─');
}
content.push_str("\n"); // clippy: push_str is correct here (multi-char)
// Table rows
for row in 0..50 {
content.push_str(&format!("│ row {:<194}│ data {:<193}\n", row, row));
content.push('├');
for _ in 0..200 {
content.push('─');
}
content.push('┼');
for _ in 0..200 {
content.push('─');
}
content.push_str("\n"); // push_str for multi-char
}
content.push('└');
for _ in 0..200 {
content.push('─');
}
content.push('┴');
for _ in 0..200 {
content.push('─');
}
content.push_str("\n"); // push_str for multi-char
eprintln!(
"Content size: {} bytes, {} chars",
content.len(),
content.chars().count()
);
let start = std::time::Instant::now();
let chunks = split_into_chunks(&content);
let elapsed = start.elapsed();
eprintln!(
"Chunking took {:?}, produced {} chunks",
elapsed,
chunks.len()
);
// Should complete in reasonable time
assert!(
elapsed.as_secs() < 5,
"Chunking took too long: {:?}",
elapsed
);
assert!(!chunks.is_empty());
}
#[test]
fn test_real_doc_18526_pattern() {
// Reproduce exact pattern: long lines of ─ (3 bytes each, no spaces)
// followed by newlines, creating a pattern where chunk windows
// land in spaceless regions
let mut content = String::new();
content.push_str("Header text with spaces\n\n");
// Create a very long line of ─ chars (2000+ bytes, exceeding CHUNK_MAX_BYTES)
for _ in 0..800 {
content.push('─'); // 3 bytes each = 2400 bytes
}
content.push('\n');
content.push_str("Some more text.\n\n");
// Another long run
for _ in 0..800 {
content.push('─');
}
content.push('\n');
content.push_str("End text.\n");
eprintln!("Content size: {} bytes", content.len());
let start = std::time::Instant::now();
let chunks = split_into_chunks(&content);
let elapsed = start.elapsed();
eprintln!(
"Chunking took {:?}, produced {} chunks",
elapsed,
chunks.len()
);
assert!(
elapsed.as_secs() < 2,
"Chunking took too long: {:?}",
elapsed
);
assert!(!chunks.is_empty());
}
}

View File

@@ -1,9 +1,9 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use futures::future::join_all;
use rusqlite::Connection;
use sha2::{Digest, Sha256};
use tracing::{info, instrument, warn};
use tracing::{debug, info, instrument, warn};
use crate::core::error::Result;
use crate::core::shutdown::ShutdownSignal;
@@ -14,11 +14,12 @@ use crate::embedding::ollama::OllamaClient;
const BATCH_SIZE: usize = 32;
const DB_PAGE_SIZE: usize = 500;
const EMBED_CONCURRENCY: usize = 2;
pub const DEFAULT_EMBED_CONCURRENCY: usize = 4;
#[derive(Debug, Default)]
pub struct EmbedResult {
pub embedded: usize,
pub chunks_embedded: usize,
pub docs_embedded: usize,
pub failed: usize,
pub skipped: usize,
}
@@ -37,6 +38,7 @@ pub async fn embed_documents(
conn: &Connection,
client: &OllamaClient,
model_name: &str,
concurrency: usize,
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
signal: &ShutdownSignal,
) -> Result<EmbedResult> {
@@ -57,16 +59,22 @@ pub async fn embed_documents(
break;
}
info!(last_id, "Querying pending documents...");
let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, model_name)?;
if pending.is_empty() {
break;
}
info!(
count = pending.len(),
"Found pending documents, starting page"
);
conn.execute_batch("SAVEPOINT embed_page")?;
let page_result = embed_page(
conn,
client,
model_name,
concurrency,
&pending,
&mut result,
&mut last_id,
@@ -77,8 +85,20 @@ pub async fn embed_documents(
)
.await;
match page_result {
Ok(()) if signal.is_cancelled() => {
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
info!("Rolled back incomplete page to preserve data integrity");
}
Ok(()) => {
conn.execute_batch("RELEASE embed_page")?;
let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)");
info!(
chunks_embedded = result.chunks_embedded,
failed = result.failed,
skipped = result.skipped,
total,
"Page complete"
);
}
Err(e) => {
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
@@ -88,13 +108,13 @@ pub async fn embed_documents(
}
info!(
embedded = result.embedded,
chunks_embedded = result.chunks_embedded,
failed = result.failed,
skipped = result.skipped,
"Embedding pipeline complete"
);
tracing::Span::current().record("items_processed", result.embedded);
tracing::Span::current().record("items_processed", result.chunks_embedded);
tracing::Span::current().record("items_skipped", result.skipped);
tracing::Span::current().record("errors", result.failed);
@@ -106,6 +126,7 @@ async fn embed_page(
conn: &Connection,
client: &OllamaClient,
model_name: &str,
concurrency: usize,
pending: &[crate::embedding::change_detector::PendingDocument],
result: &mut EmbedResult,
last_id: &mut i64,
@@ -116,17 +137,50 @@ async fn embed_page(
) -> Result<()> {
let mut all_chunks: Vec<ChunkWork> = Vec::with_capacity(pending.len() * 3);
let mut page_normal_docs: usize = 0;
let mut chunks_needed: HashMap<i64, usize> = HashMap::with_capacity(pending.len());
let mut chunks_stored: HashMap<i64, usize> = HashMap::with_capacity(pending.len());
debug!(count = pending.len(), "Starting chunking loop");
for doc in pending {
*last_id = doc.document_id;
if doc.content_text.is_empty() {
record_embedding_error(
conn,
doc.document_id,
0,
&doc.content_hash,
"empty",
model_name,
"Document has empty content",
)?;
result.skipped += 1;
*processed += 1;
continue;
}
if page_normal_docs.is_multiple_of(50) {
debug!(
doc_id = doc.document_id,
doc_num = page_normal_docs,
content_bytes = doc.content_text.len(),
"Chunking document"
);
}
if page_normal_docs.is_multiple_of(100) {
info!(
doc_id = doc.document_id,
content_bytes = doc.content_text.len(),
docs_so_far = page_normal_docs,
"Chunking document"
);
}
let chunks = split_into_chunks(&doc.content_text);
debug!(
doc_id = doc.document_id,
chunk_count = chunks.len(),
"Chunked"
);
let total_chunks = chunks.len();
if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER {
@@ -156,6 +210,8 @@ async fn embed_page(
continue;
}
chunks_needed.insert(doc.document_id, total_chunks);
for (chunk_index, text) in chunks {
all_chunks.push(ChunkWork {
doc_id: doc.document_id,
@@ -170,12 +226,30 @@ async fn embed_page(
page_normal_docs += 1;
}
debug!(total_chunks = all_chunks.len(), "Chunking loop done");
let mut cleared_docs: HashSet<i64> = HashSet::with_capacity(pending.len());
let mut embed_buf: Vec<u8> = Vec::with_capacity(EXPECTED_DIMS * 4);
// Split chunks into batches, then process batches in concurrent groups
let batches: Vec<&[ChunkWork]> = all_chunks.chunks(BATCH_SIZE).collect();
debug!(
batches = batches.len(),
concurrency, "Starting Ollama requests"
);
info!(
chunks = all_chunks.len(),
batches = batches.len(),
docs = page_normal_docs,
"Chunking complete, starting Ollama requests"
);
for concurrent_group in batches.chunks(EMBED_CONCURRENCY) {
info!(
batches = batches.len(),
concurrency, "About to start Ollama request loop"
);
for (group_idx, concurrent_group) in batches.chunks(concurrency).enumerate() {
debug!(group_idx, "Starting concurrent group");
if signal.is_cancelled() {
info!("Shutdown requested during embedding, stopping mid-page");
break;
@@ -193,6 +267,11 @@ async fn embed_page(
.map(|texts| client.embed_batch(texts))
.collect();
let api_results = join_all(futures).await;
debug!(
group_idx,
results = api_results.len(),
"Ollama group complete"
);
// Phase 3: Serial DB writes for each batch result
for (batch, api_result) in concurrent_group.iter().zip(api_results) {
@@ -243,8 +322,35 @@ async fn embed_page(
model_name,
embedding,
chunk.total_chunks,
&mut embed_buf,
)?;
result.embedded += 1;
result.chunks_embedded += 1;
*chunks_stored.entry(chunk.doc_id).or_insert(0) += 1;
}
// Record errors for chunks that Ollama silently dropped
if embeddings.len() < batch.len() {
warn!(
returned = embeddings.len(),
expected = batch.len(),
"Ollama returned fewer embeddings than inputs"
);
for chunk in &batch[embeddings.len()..] {
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&format!(
"Batch mismatch: got {} of {} embeddings",
embeddings.len(),
batch.len()
),
)?;
result.failed += 1;
}
}
}
Err(e) => {
@@ -281,15 +387,24 @@ async fn embed_page(
model_name,
&embeddings[0],
chunk.total_chunks,
&mut embed_buf,
)?;
result.embedded += 1;
result.chunks_embedded += 1;
*chunks_stored.entry(chunk.doc_id).or_insert(0) += 1;
}
_ => {
Ok(embeddings) => {
let got_dims = embeddings.first().map_or(0, std::vec::Vec::len);
let reason = format!(
"Retry failed: got {} embeddings, first has {} dims (expected {})",
embeddings.len(),
got_dims,
EXPECTED_DIMS
);
warn!(
doc_id = chunk.doc_id,
chunk_index = chunk.chunk_index,
chunk_bytes = chunk.text.len(),
"Chunk too large for model context window"
%reason,
"Chunk retry returned unexpected result"
);
record_embedding_error(
conn,
@@ -298,7 +413,27 @@ async fn embed_page(
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
"Chunk exceeds model context window",
&reason,
)?;
result.failed += 1;
}
Err(retry_err) => {
let reason = format!("Retry failed: {}", retry_err);
warn!(
doc_id = chunk.doc_id,
chunk_index = chunk.chunk_index,
chunk_bytes = chunk.text.len(),
error = %retry_err,
"Chunk retry request failed"
);
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&reason,
)?;
result.failed += 1;
}
@@ -324,6 +459,13 @@ async fn embed_page(
}
}
// Count docs where all chunks were successfully stored
for (doc_id, needed) in &chunks_needed {
if chunks_stored.get(doc_id).copied().unwrap_or(0) == *needed {
result.docs_embedded += 1;
}
}
*processed += page_normal_docs;
if let Some(cb) = progress_callback {
cb(*processed, total);
@@ -333,17 +475,13 @@ async fn embed_page(
}
fn clear_document_embeddings(conn: &Connection, document_id: i64) -> Result<()> {
conn.execute(
"DELETE FROM embedding_metadata WHERE document_id = ?1",
[document_id],
)?;
conn.prepare_cached("DELETE FROM embedding_metadata WHERE document_id = ?1")?
.execute([document_id])?;
let start_rowid = encode_rowid(document_id, 0);
let end_rowid = encode_rowid(document_id + 1, 0);
conn.execute(
"DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2",
rusqlite::params![start_rowid, end_rowid],
)?;
conn.prepare_cached("DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2")?
.execute(rusqlite::params![start_rowid, end_rowid])?;
Ok(())
}
@@ -358,18 +496,18 @@ fn store_embedding(
model_name: &str,
embedding: &[f32],
total_chunks: usize,
embed_buf: &mut Vec<u8>,
) -> Result<()> {
let rowid = encode_rowid(doc_id, chunk_index as i64);
let mut embedding_bytes = Vec::with_capacity(embedding.len() * 4);
embed_buf.clear();
embed_buf.reserve(embedding.len() * 4);
for f in embedding {
embedding_bytes.extend_from_slice(&f.to_le_bytes());
embed_buf.extend_from_slice(&f.to_le_bytes());
}
conn.execute(
"INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)",
rusqlite::params![rowid, embedding_bytes],
)?;
conn.prepare_cached("INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)")?
.execute(rusqlite::params![rowid, &embed_buf[..]])?;
let chunk_count: Option<i64> = if chunk_index == 0 {
Some(total_chunks as i64)
@@ -378,28 +516,28 @@ fn store_embedding(
};
let now = chrono::Utc::now().timestamp_millis();
conn.execute(
conn.prepare_cached(
"INSERT OR REPLACE INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
created_at, attempt_count, last_error, chunk_max_bytes, chunk_count)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL, ?8, ?9)",
rusqlite::params![
doc_id,
chunk_index as i64,
model_name,
EXPECTED_DIMS as i64,
doc_hash,
chunk_hash,
now,
CHUNK_MAX_BYTES as i64,
chunk_count
],
)?;
)?
.execute(rusqlite::params![
doc_id,
chunk_index as i64,
model_name,
EXPECTED_DIMS as i64,
doc_hash,
chunk_hash,
now,
CHUNK_MAX_BYTES as i64,
chunk_count
])?;
Ok(())
}
fn record_embedding_error(
pub(crate) fn record_embedding_error(
conn: &Connection,
doc_id: i64,
chunk_index: usize,
@@ -409,7 +547,7 @@ fn record_embedding_error(
error: &str,
) -> Result<()> {
let now = chrono::Utc::now().timestamp_millis();
conn.execute(
conn.prepare_cached(
"INSERT INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
created_at, attempt_count, last_error, last_attempt_at, chunk_max_bytes)
@@ -419,18 +557,18 @@ fn record_embedding_error(
last_error = ?8,
last_attempt_at = ?7,
chunk_max_bytes = ?9",
rusqlite::params![
doc_id,
chunk_index as i64,
model_name,
EXPECTED_DIMS as i64,
doc_hash,
chunk_hash,
now,
error,
CHUNK_MAX_BYTES as i64
],
)?;
)?
.execute(rusqlite::params![
doc_id,
chunk_index as i64,
model_name,
EXPECTED_DIMS as i64,
doc_hash,
chunk_hash,
now,
error,
CHUNK_MAX_BYTES as i64
])?;
Ok(())
}