diff --git a/src/cli/commands/embed.rs b/src/cli/commands/embed.rs index 9fc0f3c..fd7c241 100644 --- a/src/cli/commands/embed.rs +++ b/src/cli/commands/embed.rs @@ -2,16 +2,17 @@ use console::style; use serde::Serialize; use crate::Config; -use crate::core::db::create_connection; -use crate::core::error::Result; +use crate::core::db::{LATEST_SCHEMA_VERSION, create_connection, get_schema_version}; +use crate::core::error::{LoreError, Result}; use crate::core::paths::get_db_path; use crate::core::shutdown::ShutdownSignal; use crate::embedding::ollama::{OllamaClient, OllamaConfig}; -use crate::embedding::pipeline::embed_documents; +use crate::embedding::pipeline::{DEFAULT_EMBED_CONCURRENCY, embed_documents}; #[derive(Debug, Default, Serialize)] pub struct EmbedCommandResult { - pub embedded: usize, + pub docs_embedded: usize, + pub chunks_embedded: usize, pub failed: usize, pub skipped: usize, } @@ -26,6 +27,18 @@ pub async fn run_embed( let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; + let schema_version = get_schema_version(&conn); + if schema_version < LATEST_SCHEMA_VERSION { + return Err(LoreError::MigrationFailed { + version: schema_version, + message: format!( + "Database is at schema version {schema_version} but {LATEST_SCHEMA_VERSION} is required. \ + Run 'lore migrate' first." + ), + source: None, + }); + } + let ollama_config = OllamaConfig { base_url: config.embedding.base_url.clone(), model: config.embedding.model.clone(), @@ -43,18 +56,39 @@ pub async fn run_embed( COMMIT;", )?; } else if retry_failed { - conn.execute( - "UPDATE embedding_metadata SET last_error = NULL, attempt_count = 0 - WHERE last_error IS NOT NULL", - [], + // DELETE (not UPDATE) so the LEFT JOIN in find_pending_documents returns NULL, + // making the doc appear pending again. UPDATE would leave a matching row that + // still satisfies the config-param check, making the doc permanently invisible. + conn.execute_batch( + "BEGIN; + DELETE FROM embeddings WHERE rowid / 1000 IN ( + SELECT DISTINCT document_id FROM embedding_metadata + WHERE last_error IS NOT NULL + ); + DELETE FROM embedding_metadata WHERE last_error IS NOT NULL; + COMMIT;", )?; } let model_name = &config.embedding.model; - let result = embed_documents(&conn, &client, model_name, progress_callback, signal).await?; + let concurrency = if config.embedding.concurrency > 0 { + config.embedding.concurrency as usize + } else { + DEFAULT_EMBED_CONCURRENCY + }; + let result = embed_documents( + &conn, + &client, + model_name, + concurrency, + progress_callback, + signal, + ) + .await?; Ok(EmbedCommandResult { - embedded: result.embedded, + docs_embedded: result.docs_embedded, + chunks_embedded: result.chunks_embedded, failed: result.failed, skipped: result.skipped, }) @@ -62,7 +96,10 @@ pub async fn run_embed( pub fn print_embed(result: &EmbedCommandResult) { println!("{} Embedding complete", style("done").green().bold(),); - println!(" Embedded: {}", result.embedded); + println!( + " Embedded: {} documents ({} chunks)", + result.docs_embedded, result.chunks_embedded + ); if result.failed > 0 { println!(" Failed: {}", style(result.failed).red()); } diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index 598ec74..391fcd9 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -241,7 +241,7 @@ pub async fn run_sync( }); match run_embed(config, options.full, false, Some(embed_cb), signal).await { Ok(embed_result) => { - result.documents_embedded = embed_result.embedded; + result.documents_embedded = embed_result.docs_embedded; embed_bar.finish_and_clear(); spinner.finish_and_clear(); } diff --git a/src/embedding/change_detector.rs b/src/embedding/change_detector.rs index 4541c82..1cccc6c 100644 --- a/src/embedding/change_detector.rs +++ b/src/embedding/change_detector.rs @@ -83,3 +83,148 @@ pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result Connection { + let conn = create_connection(Path::new(":memory:")).unwrap(); + run_migrations(&conn).unwrap(); + conn + } + + fn insert_test_project(conn: &Connection) -> i64 { + conn.execute( + "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) + VALUES (1, 'group/test', 'https://gitlab.example.com/group/test')", + [], + ) + .unwrap(); + conn.last_insert_rowid() + } + + fn insert_test_document(conn: &Connection, project_id: i64, content: &str) -> i64 { + conn.execute( + "INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash) + VALUES ('issue', 1, ?1, ?2, 'hash123')", + rusqlite::params![project_id, content], + ) + .unwrap(); + conn.last_insert_rowid() + } + + #[test] + fn retry_failed_delete_makes_doc_pending_again() { + let conn = setup_db(); + let proj_id = insert_test_project(&conn); + let doc_id = insert_test_document(&conn, proj_id, "some text content"); + + // Doc starts as pending + let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap(); + assert_eq!(pending.len(), 1, "Doc should be pending initially"); + + // Record an error — doc should no longer be pending + record_embedding_error( + &conn, + doc_id, + 0, + "hash123", + "chunkhash", + MODEL, + "test error", + ) + .unwrap(); + let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap(); + assert!( + pending.is_empty(), + "Doc with error metadata should not be pending" + ); + + // DELETE error rows (mimicking --retry-failed) — doc should become pending again + conn.execute_batch( + "DELETE FROM embeddings WHERE rowid / 1000 IN ( + SELECT DISTINCT document_id FROM embedding_metadata + WHERE last_error IS NOT NULL + ); + DELETE FROM embedding_metadata WHERE last_error IS NOT NULL;", + ) + .unwrap(); + let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap(); + assert_eq!(pending.len(), 1, "Doc should be pending again after DELETE"); + assert_eq!(pending[0].document_id, doc_id); + } + + #[test] + fn empty_doc_with_error_not_pending() { + let conn = setup_db(); + let proj_id = insert_test_project(&conn); + let doc_id = insert_test_document(&conn, proj_id, ""); + + // Empty doc starts as pending + let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap(); + assert_eq!(pending.len(), 1, "Empty doc should be pending initially"); + + // Record an error for the empty doc + record_embedding_error( + &conn, + doc_id, + 0, + "hash123", + "empty", + MODEL, + "Document has empty content", + ) + .unwrap(); + + // Should no longer be pending + let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap(); + assert!( + pending.is_empty(), + "Empty doc with error metadata should not be pending" + ); + } + + #[test] + fn old_update_approach_leaves_doc_invisible() { + // This test demonstrates WHY we use DELETE instead of UPDATE. + // UPDATE clears last_error but the row still matches config params, + // so the doc stays "not pending" — permanently invisible. + let conn = setup_db(); + let proj_id = insert_test_project(&conn); + let doc_id = insert_test_document(&conn, proj_id, "some text content"); + + // Record an error + record_embedding_error( + &conn, + doc_id, + 0, + "hash123", + "chunkhash", + MODEL, + "test error", + ) + .unwrap(); + + // Old approach: UPDATE to clear error + conn.execute( + "UPDATE embedding_metadata SET last_error = NULL, attempt_count = 0 + WHERE last_error IS NOT NULL", + [], + ) + .unwrap(); + + // Doc is NOT pending — it's permanently invisible! This is the bug. + let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap(); + assert!( + pending.is_empty(), + "UPDATE approach leaves doc invisible (this proves the bug)" + ); + } +} diff --git a/src/embedding/chunking.rs b/src/embedding/chunking.rs index b4aecae..4dc4562 100644 --- a/src/embedding/chunking.rs +++ b/src/embedding/chunking.rs @@ -41,9 +41,19 @@ pub fn split_into_chunks(content: &str) -> Vec<(usize, String)> { split_at } .max(1); + let old_start = start; start += advance; // Ensure start lands on a char boundary after overlap subtraction start = floor_char_boundary(content, start); + // Guarantee forward progress: multi-byte chars can cause + // floor_char_boundary to round back to old_start + if start <= old_start { + start = old_start + + content[old_start..] + .chars() + .next() + .map_or(1, |c| c.len_utf8()); + } chunk_index += 1; } @@ -219,4 +229,105 @@ mod tests { let chunks = split_into_chunks(&content); assert!(chunks.len() >= 2); } + + #[test] + fn test_box_drawing_heavy_content() { + // Simulates a document with many box-drawing characters (3-byte UTF-8) + // like the ─ (U+2500) character found in markdown tables + let mut content = String::new(); + // Normal text header + content.push_str("# Title\n\nSome description text.\n\n"); + // Table header with box drawing + content.push('┌'); + for _ in 0..200 { + content.push('─'); + } + content.push('┬'); + for _ in 0..200 { + content.push('─'); + } + content.push_str("┐\n"); // clippy: push_str is correct here (multi-char) + // Table rows + for row in 0..50 { + content.push_str(&format!("│ row {:<194}│ data {:<193}│\n", row, row)); + content.push('├'); + for _ in 0..200 { + content.push('─'); + } + content.push('┼'); + for _ in 0..200 { + content.push('─'); + } + content.push_str("┤\n"); // push_str for multi-char + } + content.push('└'); + for _ in 0..200 { + content.push('─'); + } + content.push('┴'); + for _ in 0..200 { + content.push('─'); + } + content.push_str("┘\n"); // push_str for multi-char + + eprintln!( + "Content size: {} bytes, {} chars", + content.len(), + content.chars().count() + ); + let start = std::time::Instant::now(); + let chunks = split_into_chunks(&content); + let elapsed = start.elapsed(); + eprintln!( + "Chunking took {:?}, produced {} chunks", + elapsed, + chunks.len() + ); + + // Should complete in reasonable time + assert!( + elapsed.as_secs() < 5, + "Chunking took too long: {:?}", + elapsed + ); + assert!(!chunks.is_empty()); + } + + #[test] + fn test_real_doc_18526_pattern() { + // Reproduce exact pattern: long lines of ─ (3 bytes each, no spaces) + // followed by newlines, creating a pattern where chunk windows + // land in spaceless regions + let mut content = String::new(); + content.push_str("Header text with spaces\n\n"); + // Create a very long line of ─ chars (2000+ bytes, exceeding CHUNK_MAX_BYTES) + for _ in 0..800 { + content.push('─'); // 3 bytes each = 2400 bytes + } + content.push('\n'); + content.push_str("Some more text.\n\n"); + // Another long run + for _ in 0..800 { + content.push('─'); + } + content.push('\n'); + content.push_str("End text.\n"); + + eprintln!("Content size: {} bytes", content.len()); + let start = std::time::Instant::now(); + let chunks = split_into_chunks(&content); + let elapsed = start.elapsed(); + eprintln!( + "Chunking took {:?}, produced {} chunks", + elapsed, + chunks.len() + ); + + assert!( + elapsed.as_secs() < 2, + "Chunking took too long: {:?}", + elapsed + ); + assert!(!chunks.is_empty()); + } } diff --git a/src/embedding/pipeline.rs b/src/embedding/pipeline.rs index 1281044..9fd8927 100644 --- a/src/embedding/pipeline.rs +++ b/src/embedding/pipeline.rs @@ -1,9 +1,9 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use futures::future::join_all; use rusqlite::Connection; use sha2::{Digest, Sha256}; -use tracing::{info, instrument, warn}; +use tracing::{debug, info, instrument, warn}; use crate::core::error::Result; use crate::core::shutdown::ShutdownSignal; @@ -14,11 +14,12 @@ use crate::embedding::ollama::OllamaClient; const BATCH_SIZE: usize = 32; const DB_PAGE_SIZE: usize = 500; -const EMBED_CONCURRENCY: usize = 2; +pub const DEFAULT_EMBED_CONCURRENCY: usize = 4; #[derive(Debug, Default)] pub struct EmbedResult { - pub embedded: usize, + pub chunks_embedded: usize, + pub docs_embedded: usize, pub failed: usize, pub skipped: usize, } @@ -37,6 +38,7 @@ pub async fn embed_documents( conn: &Connection, client: &OllamaClient, model_name: &str, + concurrency: usize, progress_callback: Option>, signal: &ShutdownSignal, ) -> Result { @@ -57,16 +59,22 @@ pub async fn embed_documents( break; } + info!(last_id, "Querying pending documents..."); let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, model_name)?; if pending.is_empty() { break; } + info!( + count = pending.len(), + "Found pending documents, starting page" + ); conn.execute_batch("SAVEPOINT embed_page")?; let page_result = embed_page( conn, client, model_name, + concurrency, &pending, &mut result, &mut last_id, @@ -77,8 +85,20 @@ pub async fn embed_documents( ) .await; match page_result { + Ok(()) if signal.is_cancelled() => { + let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page"); + info!("Rolled back incomplete page to preserve data integrity"); + } Ok(()) => { conn.execute_batch("RELEASE embed_page")?; + let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)"); + info!( + chunks_embedded = result.chunks_embedded, + failed = result.failed, + skipped = result.skipped, + total, + "Page complete" + ); } Err(e) => { let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page"); @@ -88,13 +108,13 @@ pub async fn embed_documents( } info!( - embedded = result.embedded, + chunks_embedded = result.chunks_embedded, failed = result.failed, skipped = result.skipped, "Embedding pipeline complete" ); - tracing::Span::current().record("items_processed", result.embedded); + tracing::Span::current().record("items_processed", result.chunks_embedded); tracing::Span::current().record("items_skipped", result.skipped); tracing::Span::current().record("errors", result.failed); @@ -106,6 +126,7 @@ async fn embed_page( conn: &Connection, client: &OllamaClient, model_name: &str, + concurrency: usize, pending: &[crate::embedding::change_detector::PendingDocument], result: &mut EmbedResult, last_id: &mut i64, @@ -116,17 +137,50 @@ async fn embed_page( ) -> Result<()> { let mut all_chunks: Vec = Vec::with_capacity(pending.len() * 3); let mut page_normal_docs: usize = 0; + let mut chunks_needed: HashMap = HashMap::with_capacity(pending.len()); + let mut chunks_stored: HashMap = HashMap::with_capacity(pending.len()); + debug!(count = pending.len(), "Starting chunking loop"); for doc in pending { *last_id = doc.document_id; if doc.content_text.is_empty() { + record_embedding_error( + conn, + doc.document_id, + 0, + &doc.content_hash, + "empty", + model_name, + "Document has empty content", + )?; result.skipped += 1; *processed += 1; continue; } + if page_normal_docs.is_multiple_of(50) { + debug!( + doc_id = doc.document_id, + doc_num = page_normal_docs, + content_bytes = doc.content_text.len(), + "Chunking document" + ); + } + if page_normal_docs.is_multiple_of(100) { + info!( + doc_id = doc.document_id, + content_bytes = doc.content_text.len(), + docs_so_far = page_normal_docs, + "Chunking document" + ); + } let chunks = split_into_chunks(&doc.content_text); + debug!( + doc_id = doc.document_id, + chunk_count = chunks.len(), + "Chunked" + ); let total_chunks = chunks.len(); if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER { @@ -156,6 +210,8 @@ async fn embed_page( continue; } + chunks_needed.insert(doc.document_id, total_chunks); + for (chunk_index, text) in chunks { all_chunks.push(ChunkWork { doc_id: doc.document_id, @@ -170,12 +226,30 @@ async fn embed_page( page_normal_docs += 1; } + debug!(total_chunks = all_chunks.len(), "Chunking loop done"); + let mut cleared_docs: HashSet = HashSet::with_capacity(pending.len()); + let mut embed_buf: Vec = Vec::with_capacity(EXPECTED_DIMS * 4); // Split chunks into batches, then process batches in concurrent groups let batches: Vec<&[ChunkWork]> = all_chunks.chunks(BATCH_SIZE).collect(); + debug!( + batches = batches.len(), + concurrency, "Starting Ollama requests" + ); + info!( + chunks = all_chunks.len(), + batches = batches.len(), + docs = page_normal_docs, + "Chunking complete, starting Ollama requests" + ); - for concurrent_group in batches.chunks(EMBED_CONCURRENCY) { + info!( + batches = batches.len(), + concurrency, "About to start Ollama request loop" + ); + for (group_idx, concurrent_group) in batches.chunks(concurrency).enumerate() { + debug!(group_idx, "Starting concurrent group"); if signal.is_cancelled() { info!("Shutdown requested during embedding, stopping mid-page"); break; @@ -193,6 +267,11 @@ async fn embed_page( .map(|texts| client.embed_batch(texts)) .collect(); let api_results = join_all(futures).await; + debug!( + group_idx, + results = api_results.len(), + "Ollama group complete" + ); // Phase 3: Serial DB writes for each batch result for (batch, api_result) in concurrent_group.iter().zip(api_results) { @@ -243,8 +322,35 @@ async fn embed_page( model_name, embedding, chunk.total_chunks, + &mut embed_buf, )?; - result.embedded += 1; + result.chunks_embedded += 1; + *chunks_stored.entry(chunk.doc_id).or_insert(0) += 1; + } + + // Record errors for chunks that Ollama silently dropped + if embeddings.len() < batch.len() { + warn!( + returned = embeddings.len(), + expected = batch.len(), + "Ollama returned fewer embeddings than inputs" + ); + for chunk in &batch[embeddings.len()..] { + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &format!( + "Batch mismatch: got {} of {} embeddings", + embeddings.len(), + batch.len() + ), + )?; + result.failed += 1; + } } } Err(e) => { @@ -281,15 +387,24 @@ async fn embed_page( model_name, &embeddings[0], chunk.total_chunks, + &mut embed_buf, )?; - result.embedded += 1; + result.chunks_embedded += 1; + *chunks_stored.entry(chunk.doc_id).or_insert(0) += 1; } - _ => { + Ok(embeddings) => { + let got_dims = embeddings.first().map_or(0, std::vec::Vec::len); + let reason = format!( + "Retry failed: got {} embeddings, first has {} dims (expected {})", + embeddings.len(), + got_dims, + EXPECTED_DIMS + ); warn!( doc_id = chunk.doc_id, chunk_index = chunk.chunk_index, - chunk_bytes = chunk.text.len(), - "Chunk too large for model context window" + %reason, + "Chunk retry returned unexpected result" ); record_embedding_error( conn, @@ -298,7 +413,27 @@ async fn embed_page( &chunk.doc_hash, &chunk.chunk_hash, model_name, - "Chunk exceeds model context window", + &reason, + )?; + result.failed += 1; + } + Err(retry_err) => { + let reason = format!("Retry failed: {}", retry_err); + warn!( + doc_id = chunk.doc_id, + chunk_index = chunk.chunk_index, + chunk_bytes = chunk.text.len(), + error = %retry_err, + "Chunk retry request failed" + ); + record_embedding_error( + conn, + chunk.doc_id, + chunk.chunk_index, + &chunk.doc_hash, + &chunk.chunk_hash, + model_name, + &reason, )?; result.failed += 1; } @@ -324,6 +459,13 @@ async fn embed_page( } } + // Count docs where all chunks were successfully stored + for (doc_id, needed) in &chunks_needed { + if chunks_stored.get(doc_id).copied().unwrap_or(0) == *needed { + result.docs_embedded += 1; + } + } + *processed += page_normal_docs; if let Some(cb) = progress_callback { cb(*processed, total); @@ -333,17 +475,13 @@ async fn embed_page( } fn clear_document_embeddings(conn: &Connection, document_id: i64) -> Result<()> { - conn.execute( - "DELETE FROM embedding_metadata WHERE document_id = ?1", - [document_id], - )?; + conn.prepare_cached("DELETE FROM embedding_metadata WHERE document_id = ?1")? + .execute([document_id])?; let start_rowid = encode_rowid(document_id, 0); let end_rowid = encode_rowid(document_id + 1, 0); - conn.execute( - "DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2", - rusqlite::params![start_rowid, end_rowid], - )?; + conn.prepare_cached("DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2")? + .execute(rusqlite::params![start_rowid, end_rowid])?; Ok(()) } @@ -358,18 +496,18 @@ fn store_embedding( model_name: &str, embedding: &[f32], total_chunks: usize, + embed_buf: &mut Vec, ) -> Result<()> { let rowid = encode_rowid(doc_id, chunk_index as i64); - let mut embedding_bytes = Vec::with_capacity(embedding.len() * 4); + embed_buf.clear(); + embed_buf.reserve(embedding.len() * 4); for f in embedding { - embedding_bytes.extend_from_slice(&f.to_le_bytes()); + embed_buf.extend_from_slice(&f.to_le_bytes()); } - conn.execute( - "INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)", - rusqlite::params![rowid, embedding_bytes], - )?; + conn.prepare_cached("INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)")? + .execute(rusqlite::params![rowid, &embed_buf[..]])?; let chunk_count: Option = if chunk_index == 0 { Some(total_chunks as i64) @@ -378,28 +516,28 @@ fn store_embedding( }; let now = chrono::Utc::now().timestamp_millis(); - conn.execute( + conn.prepare_cached( "INSERT OR REPLACE INTO embedding_metadata (document_id, chunk_index, model, dims, document_hash, chunk_hash, created_at, attempt_count, last_error, chunk_max_bytes, chunk_count) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL, ?8, ?9)", - rusqlite::params![ - doc_id, - chunk_index as i64, - model_name, - EXPECTED_DIMS as i64, - doc_hash, - chunk_hash, - now, - CHUNK_MAX_BYTES as i64, - chunk_count - ], - )?; + )? + .execute(rusqlite::params![ + doc_id, + chunk_index as i64, + model_name, + EXPECTED_DIMS as i64, + doc_hash, + chunk_hash, + now, + CHUNK_MAX_BYTES as i64, + chunk_count + ])?; Ok(()) } -fn record_embedding_error( +pub(crate) fn record_embedding_error( conn: &Connection, doc_id: i64, chunk_index: usize, @@ -409,7 +547,7 @@ fn record_embedding_error( error: &str, ) -> Result<()> { let now = chrono::Utc::now().timestamp_millis(); - conn.execute( + conn.prepare_cached( "INSERT INTO embedding_metadata (document_id, chunk_index, model, dims, document_hash, chunk_hash, created_at, attempt_count, last_error, last_attempt_at, chunk_max_bytes) @@ -419,18 +557,18 @@ fn record_embedding_error( last_error = ?8, last_attempt_at = ?7, chunk_max_bytes = ?9", - rusqlite::params![ - doc_id, - chunk_index as i64, - model_name, - EXPECTED_DIMS as i64, - doc_hash, - chunk_hash, - now, - error, - CHUNK_MAX_BYTES as i64 - ], - )?; + )? + .execute(rusqlite::params![ + doc_id, + chunk_index as i64, + model_name, + EXPECTED_DIMS as i64, + doc_hash, + chunk_hash, + now, + error, + CHUNK_MAX_BYTES as i64 + ])?; Ok(()) }