Embedding pipeline improvements building on the concurrent batching foundation: - Track docs_embedded vs chunks_embedded separately. A document counts as embedded only when ALL its chunks succeed, giving accurate progress reporting. The sync command reads docs_embedded for its document count. - Reuse a single Vec<u8> buffer (embed_buf) across all store_embedding calls instead of allocating per chunk. Eliminates ~3KB allocation per 768-dim embedding. - Detect and record errors when Ollama silently returns fewer embeddings than inputs (batch mismatch). Previously these dropped chunks were invisible. - Improve retry error messages: distinguish "retry returned unexpected result" (wrong dims/count) from "retry request failed" (network error) instead of generic "chunk too large" message. - Convert all hot-path SQL from conn.execute() to prepare_cached() for statement cache reuse (clear_document_embeddings, store_embedding, record_embedding_error). - Record embedding_metadata errors for empty documents so they don't appear as perpetually pending on subsequent runs. - Accept concurrency parameter (configurable via config.embedding.concurrency) instead of hardcoded EMBED_CONCURRENCY=2. - Add schema version pre-flight check in embed command to fail fast with actionable error instead of cryptic SQL errors. - Fix --retry-failed to use DELETE instead of UPDATE. UPDATE clears last_error but the row still matches config params in the LEFT JOIN, making the doc permanently invisible to find_pending_documents. DELETE removes the row entirely so the LEFT JOIN returns NULL. Regression test added (old_update_approach_leaves_doc_invisible). - Add chunking forward-progress guard: after floor_char_boundary() rounds backward, ensure start advances by at least one full character to prevent infinite loops on multi-byte sequences (box-drawing chars, smart quotes). Test cases cover the exact patterns that caused production hangs on document 18526. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
231 lines
7.3 KiB
Rust
use rusqlite::Connection;
|
|
|
|
use crate::core::error::Result;
|
|
use crate::embedding::chunking::{CHUNK_MAX_BYTES, EXPECTED_DIMS};
|
|
|
|
/// A document that still needs (re-)embedding, as returned by
/// `find_pending_documents`.
#[derive(Debug)]
pub struct PendingDocument {
    /// Primary key of the row in `documents`.
    pub document_id: i64,
    /// Full text of the document, to be chunked and embedded.
    pub content_text: String,
    /// Hash of the current content; compared against
    /// `embedding_metadata.document_hash` to detect content changes.
    pub content_hash: String,
}
|
|
|
|
pub fn find_pending_documents(
|
|
conn: &Connection,
|
|
page_size: usize,
|
|
last_id: i64,
|
|
model_name: &str,
|
|
) -> Result<Vec<PendingDocument>> {
|
|
// Optimized query: LEFT JOIN + NULL check replaces triple-EXISTS pattern.
|
|
// This allows SQLite to scan embedding_metadata once instead of three times.
|
|
// Semantically identical: returns documents needing (re-)embedding when:
|
|
// - No embedding exists (em.document_id IS NULL)
|
|
// - Content hash changed (em.document_hash != d.content_hash)
|
|
// - Config mismatch (model/dims/chunk_max_bytes)
|
|
let sql = r#"
|
|
SELECT d.id, d.content_text, d.content_hash
|
|
FROM documents d
|
|
LEFT JOIN embedding_metadata em
|
|
ON em.document_id = d.id AND em.chunk_index = 0
|
|
WHERE d.id > ?1
|
|
AND (
|
|
em.document_id IS NULL
|
|
OR em.document_hash != d.content_hash
|
|
OR em.chunk_max_bytes IS NULL
|
|
OR em.chunk_max_bytes != ?3
|
|
OR em.model != ?4
|
|
OR em.dims != ?5
|
|
)
|
|
ORDER BY d.id
|
|
LIMIT ?2
|
|
"#;
|
|
|
|
let mut stmt = conn.prepare(sql)?;
|
|
let rows = stmt
|
|
.query_map(
|
|
rusqlite::params![
|
|
last_id,
|
|
page_size as i64,
|
|
CHUNK_MAX_BYTES as i64,
|
|
model_name,
|
|
EXPECTED_DIMS as i64,
|
|
],
|
|
|row| {
|
|
Ok(PendingDocument {
|
|
document_id: row.get(0)?,
|
|
content_text: row.get(1)?,
|
|
content_hash: row.get(2)?,
|
|
})
|
|
},
|
|
)?
|
|
.collect::<std::result::Result<Vec<_>, _>>()?;
|
|
|
|
Ok(rows)
|
|
}
|
|
|
|
pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result<i64> {
|
|
// Optimized query: LEFT JOIN + NULL check replaces triple-EXISTS pattern
|
|
let count: i64 = conn.query_row(
|
|
r#"
|
|
SELECT COUNT(*)
|
|
FROM documents d
|
|
LEFT JOIN embedding_metadata em
|
|
ON em.document_id = d.id AND em.chunk_index = 0
|
|
WHERE em.document_id IS NULL
|
|
OR em.document_hash != d.content_hash
|
|
OR em.chunk_max_bytes IS NULL
|
|
OR em.chunk_max_bytes != ?1
|
|
OR em.model != ?2
|
|
OR em.dims != ?3
|
|
"#,
|
|
rusqlite::params![CHUNK_MAX_BYTES as i64, model_name, EXPECTED_DIMS as i64],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(count)
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::core::db::{create_connection, run_migrations};
    use crate::embedding::pipeline::record_embedding_error;

    // Model name used by every test; must match what find_pending_documents
    // is queried with so the `em.model != ?` predicate doesn't fire.
    const MODEL: &str = "nomic-embed-text";

    // Fresh in-memory database with the full migration set applied.
    fn setup_db() -> Connection {
        let conn = create_connection(Path::new(":memory:")).unwrap();
        run_migrations(&conn).unwrap();
        conn
    }

    // Inserts a minimal project row and returns its rowid. Documents
    // reference a project_id, so each test document needs one.
    fn insert_test_project(conn: &Connection) -> i64 {
        conn.execute(
            "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
             VALUES (1, 'group/test', 'https://gitlab.example.com/group/test')",
            [],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    // Inserts a document with the given content and a fixed content_hash
    // ('hash123' — the same literal passed to record_embedding_error below)
    // and returns its rowid.
    fn insert_test_document(conn: &Connection, project_id: i64, content: &str) -> i64 {
        conn.execute(
            "INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash)
             VALUES ('issue', 1, ?1, ?2, 'hash123')",
            rusqlite::params![project_id, content],
        )
        .unwrap();
        conn.last_insert_rowid()
    }

    // Regression coverage for --retry-failed: DELETE-ing the error metadata
    // row makes the LEFT JOIN in find_pending_documents produce NULL, so the
    // document becomes visible (pending) again.
    #[test]
    fn retry_failed_delete_makes_doc_pending_again() {
        let conn = setup_db();
        let proj_id = insert_test_project(&conn);
        let doc_id = insert_test_document(&conn, proj_id, "some text content");

        // Doc starts as pending
        let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
        assert_eq!(pending.len(), 1, "Doc should be pending initially");

        // Record an error — doc should no longer be pending
        record_embedding_error(
            &conn,
            doc_id,
            0,
            "hash123",
            "chunkhash",
            MODEL,
            "test error",
        )
        .unwrap();
        let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
        assert!(
            pending.is_empty(),
            "Doc with error metadata should not be pending"
        );

        // DELETE error rows (mimicking --retry-failed) — doc should become pending again
        // NOTE(review): `rowid / 1000` assumes embeddings rowids encode
        // document_id * 1000 + chunk_index — confirm against store_embedding.
        conn.execute_batch(
            "DELETE FROM embeddings WHERE rowid / 1000 IN (
                SELECT DISTINCT document_id FROM embedding_metadata
                WHERE last_error IS NOT NULL
            );
            DELETE FROM embedding_metadata WHERE last_error IS NOT NULL;",
        )
        .unwrap();
        let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
        assert_eq!(pending.len(), 1, "Doc should be pending again after DELETE");
        assert_eq!(pending[0].document_id, doc_id);
    }

    // Empty documents get an error metadata row recorded so they stop
    // showing up as perpetually pending on subsequent runs.
    #[test]
    fn empty_doc_with_error_not_pending() {
        let conn = setup_db();
        let proj_id = insert_test_project(&conn);
        let doc_id = insert_test_document(&conn, proj_id, "");

        // Empty doc starts as pending
        let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
        assert_eq!(pending.len(), 1, "Empty doc should be pending initially");

        // Record an error for the empty doc
        record_embedding_error(
            &conn,
            doc_id,
            0,
            "hash123",
            "empty",
            MODEL,
            "Document has empty content",
        )
        .unwrap();

        // Should no longer be pending
        let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
        assert!(
            pending.is_empty(),
            "Empty doc with error metadata should not be pending"
        );
    }

    // Negative regression test: documents the failure mode of the OLD
    // --retry-failed implementation. Kept as an executable explanation of
    // why the production code must DELETE rather than UPDATE.
    #[test]
    fn old_update_approach_leaves_doc_invisible() {
        // This test demonstrates WHY we use DELETE instead of UPDATE.
        // UPDATE clears last_error but the row still matches config params,
        // so the doc stays "not pending" — permanently invisible.
        let conn = setup_db();
        let proj_id = insert_test_project(&conn);
        let doc_id = insert_test_document(&conn, proj_id, "some text content");

        // Record an error
        record_embedding_error(
            &conn,
            doc_id,
            0,
            "hash123",
            "chunkhash",
            MODEL,
            "test error",
        )
        .unwrap();

        // Old approach: UPDATE to clear error
        conn.execute(
            "UPDATE embedding_metadata SET last_error = NULL, attempt_count = 0
             WHERE last_error IS NOT NULL",
            [],
        )
        .unwrap();

        // Doc is NOT pending — it's permanently invisible! This is the bug.
        let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
        assert!(
            pending.is_empty(),
            "UPDATE approach leaves doc invisible (this proves the bug)"
        );
    }
}
|