feat(embedding): Add Ollama-powered vector embedding pipeline

Implements the embedding module that generates vector representations
of documents using a local Ollama instance with the nomic-embed-text
model. These embeddings enable semantic (vector) search and the hybrid
search mode that fuses lexical and semantic results via RRF.

Key components:

- embedding::ollama: HTTP client for the Ollama /api/embeddings
  endpoint. Handles connection errors with actionable error messages
  (OllamaUnavailable, OllamaModelNotFound) and validates response
  dimensions.

- embedding::chunking: Splits long documents into overlapping
  paragraph-aware chunks for embedding. Uses a configurable max token
  estimate (8192 default for nomic-embed-text) with 10% overlap to
  preserve cross-chunk context.

- embedding::chunk_ids: Encodes chunk identity as
  doc_id * 1000 + chunk_index for the embeddings table rowid. This
  allows vector search to map results back to documents and
  deduplicate by doc_id efficiently.

- embedding::change_detector: Compares document content_hash against
  stored embedding hashes to skip re-embedding unchanged documents,
  making incremental embedding runs fast.

- embedding::pipeline: Orchestrates the full embedding flow: detect
  changed documents, chunk them, call Ollama in configurable
  concurrency (default 4), store results. Supports --retry-failed
  to re-attempt previously failed embeddings.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-01-30 15:46:30 -05:00
parent 20edff4ab1
commit 723703bed9
6 changed files with 810 additions and 0 deletions

251
src/embedding/pipeline.rs Normal file
View File

@@ -0,0 +1,251 @@
//! Async embedding pipeline: chunk documents, embed via Ollama, store in sqlite-vec.
use rusqlite::Connection;
use sha2::{Digest, Sha256};
use tracing::{info, warn};
use crate::core::error::Result;
use crate::embedding::change_detector::{count_pending_documents, find_pending_documents};
use crate::embedding::chunk_ids::encode_rowid;
use crate::embedding::chunking::split_into_chunks;
use crate::embedding::ollama::OllamaClient;
const BATCH_SIZE: usize = 32;
const DB_PAGE_SIZE: usize = 500;
const EXPECTED_DIMS: usize = 768;
/// Result of an embedding run.
///
/// Counts are per-unit as noted on each field: `embedded`/`failed` count
/// chunks, `skipped` counts whole documents.
#[derive(Debug, Default)]
pub struct EmbedResult {
// Number of chunk vectors successfully stored.
pub embedded: usize,
// Number of chunks whose embedding failed (recorded for later retry).
pub failed: usize,
// Number of documents skipped because their content text was empty.
pub skipped: usize,
}
/// Work item: a single chunk to embed.
struct ChunkWork {
// Owning document's id (used with `chunk_index` to build the rowid).
doc_id: i64,
// Zero-based position of this chunk within the document.
chunk_index: usize,
// content_hash of the whole source document, for change detection.
doc_hash: String,
// SHA-256 hex digest of this chunk's text.
chunk_hash: String,
// The chunk text sent to Ollama for embedding.
text: String,
}
/// Run the embedding pipeline: find pending documents, chunk, embed, store.
///
/// Pages through pending documents with keyset pagination (`DB_PAGE_SIZE`
/// documents per page), splits each document into chunks, and sends chunk
/// texts to Ollama in batches of `BATCH_SIZE`. Successful vectors are stored
/// via `store_embedding`; failures are recorded via `record_embedding_error`
/// so a later retry run can pick them up.
///
/// `progress_callback`, if given, is invoked as `(processed_docs, total_docs)`
/// once per document while the page's chunk list is being built — i.e. before
/// that page's embeddings are actually fetched.
///
/// # Errors
/// Returns an error on any database failure. Ollama API failures are NOT
/// fatal: they are recorded per chunk and counted in `EmbedResult::failed`.
pub async fn embed_documents(
    conn: &Connection,
    client: &OllamaClient,
    model_name: &str,
    progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<EmbedResult> {
    let total = count_pending_documents(conn)? as usize;
    let mut result = EmbedResult::default();
    let mut last_id: i64 = 0;
    let mut processed: usize = 0;
    if total == 0 {
        return Ok(result);
    }
    info!(total, "Starting embedding pipeline");
    loop {
        let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id)?;
        if pending.is_empty() {
            break;
        }
        // Build chunk work items for this page.
        let mut all_chunks: Vec<ChunkWork> = Vec::new();
        for doc in &pending {
            // Always advance the cursor, even for skipped docs, to avoid
            // re-fetching the same page forever.
            last_id = doc.document_id;
            if doc.content_text.is_empty() {
                result.skipped += 1;
                processed += 1;
                continue;
            }
            // Clear existing embeddings for this document before re-embedding.
            clear_document_embeddings(conn, doc.document_id)?;
            for (chunk_index, text) in split_into_chunks(&doc.content_text) {
                all_chunks.push(ChunkWork {
                    doc_id: doc.document_id,
                    chunk_index,
                    doc_hash: doc.content_hash.clone(),
                    chunk_hash: sha256_hash(&text),
                    text,
                });
            }
            // Track progress per document (not per chunk) to match `total`.
            processed += 1;
            if let Some(ref cb) = progress_callback {
                cb(processed, total);
            }
        }
        // Embed the page's chunks in batches of BATCH_SIZE texts per API call.
        for batch in all_chunks.chunks(BATCH_SIZE) {
            let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
            match client.embed_batch(texts).await {
                Ok(embeddings) => {
                    // Pair every requested chunk with its returned vector.
                    // The response may contain fewer vectors than requested;
                    // previously such trailing chunks were silently dropped
                    // (neither stored nor counted as failed, so retry runs
                    // never saw them). Now each missing vector is recorded
                    // as a failure. Extra vectors beyond the batch, if any,
                    // are ignored.
                    for (i, chunk) in batch.iter().enumerate() {
                        match embeddings.get(i) {
                            Some(embedding) if embedding.len() == EXPECTED_DIMS => {
                                store_embedding(
                                    conn,
                                    chunk.doc_id,
                                    chunk.chunk_index,
                                    &chunk.doc_hash,
                                    &chunk.chunk_hash,
                                    model_name,
                                    embedding,
                                )?;
                                result.embedded += 1;
                            }
                            Some(embedding) => {
                                warn!(
                                    doc_id = chunk.doc_id,
                                    chunk_index = chunk.chunk_index,
                                    got_dims = embedding.len(),
                                    expected = EXPECTED_DIMS,
                                    "Dimension mismatch, skipping"
                                );
                                record_embedding_error(
                                    conn,
                                    chunk.doc_id,
                                    chunk.chunk_index,
                                    &chunk.doc_hash,
                                    &chunk.chunk_hash,
                                    model_name,
                                    &format!(
                                        "Dimension mismatch: got {}, expected {}",
                                        embedding.len(),
                                        EXPECTED_DIMS
                                    ),
                                )?;
                                result.failed += 1;
                            }
                            None => {
                                warn!(
                                    doc_id = chunk.doc_id,
                                    chunk_index = chunk.chunk_index,
                                    "No embedding returned for chunk, recording failure"
                                );
                                record_embedding_error(
                                    conn,
                                    chunk.doc_id,
                                    chunk.chunk_index,
                                    &chunk.doc_hash,
                                    &chunk.chunk_hash,
                                    model_name,
                                    "Ollama returned fewer embeddings than requested",
                                )?;
                                result.failed += 1;
                            }
                        }
                    }
                }
                Err(e) => {
                    // Whole-batch failure: record every chunk so retry works.
                    warn!(error = %e, "Batch embedding failed");
                    for chunk in batch {
                        record_embedding_error(
                            conn,
                            chunk.doc_id,
                            chunk.chunk_index,
                            &chunk.doc_hash,
                            &chunk.chunk_hash,
                            model_name,
                            &e.to_string(),
                        )?;
                        result.failed += 1;
                    }
                }
            }
        }
    }
    info!(
        embedded = result.embedded,
        failed = result.failed,
        skipped = result.skipped,
        "Embedding pipeline complete"
    );
    Ok(result)
}
/// Remove every stored vector and metadata row belonging to `document_id`.
///
/// Metadata rows are keyed directly by document id. Vector rows live in the
/// half-open rowid range `[encode_rowid(id, 0), encode_rowid(id + 1, 0))`,
/// since chunk rowids for one document are contiguous under the encoding.
fn clear_document_embeddings(conn: &Connection, document_id: i64) -> Result<()> {
    conn.execute(
        "DELETE FROM embedding_metadata WHERE document_id = ?1",
        [document_id],
    )?;
    let lo = encode_rowid(document_id, 0);
    let hi = encode_rowid(document_id + 1, 0);
    conn.execute(
        "DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2",
        rusqlite::params![lo, hi],
    )?;
    Ok(())
}
/// Persist one chunk's embedding vector plus its metadata row.
///
/// The vector is serialized as little-endian f32 bytes (the layout the
/// embeddings table expects) under a rowid derived from `doc_id` and
/// `chunk_index`. Metadata records the model, dimensions, both hashes, and
/// a creation timestamp; a prior error row for the same chunk is replaced.
fn store_embedding(
    conn: &Connection,
    doc_id: i64,
    chunk_index: usize,
    doc_hash: &str,
    chunk_hash: &str,
    model_name: &str,
    embedding: &[f32],
) -> Result<()> {
    let rowid = encode_rowid(doc_id, chunk_index as i64);
    // 4 bytes per f32 — preallocate the exact blob size.
    let mut blob = Vec::with_capacity(embedding.len() * 4);
    for value in embedding {
        blob.extend_from_slice(&value.to_le_bytes());
    }
    conn.execute(
        "INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)",
        rusqlite::params![rowid, blob],
    )?;
    let now = chrono::Utc::now().timestamp_millis();
    conn.execute(
        "INSERT OR REPLACE INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
created_at, attempt_count, last_error)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL)",
        rusqlite::params![doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, doc_hash, chunk_hash, now],
    )?;
    Ok(())
}
/// Record an embedding error in metadata for later retry.
///
/// Upserts a metadata row for (doc_id, chunk_index): the first failure
/// inserts a row with attempt_count = 1; subsequent failures for the same
/// chunk increment attempt_count and refresh last_error / last_attempt_at.
/// `dims` is recorded as EXPECTED_DIMS (the intended dimensionality), even
/// when the failure itself was a dimension mismatch.
fn record_embedding_error(
conn: &Connection,
doc_id: i64,
chunk_index: usize,
doc_hash: &str,
chunk_hash: &str,
model_name: &str,
error: &str,
) -> Result<()> {
// Single timestamp (?7) reused for both created_at and last_attempt_at.
let now = chrono::Utc::now().timestamp_millis();
conn.execute(
"INSERT INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
created_at, attempt_count, last_error, last_attempt_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, ?8, ?7)
ON CONFLICT(document_id, chunk_index) DO UPDATE SET
attempt_count = embedding_metadata.attempt_count + 1,
last_error = ?8,
last_attempt_at = ?7",
rusqlite::params![doc_id, chunk_index as i64, model_name, EXPECTED_DIMS as i64, doc_hash, chunk_hash, now, error],
)?;
Ok(())
}
/// Lowercase hex-encoded SHA-256 digest of `input`'s UTF-8 bytes.
fn sha256_hash(input: &str) -> String {
    // `Digest::digest` performs the update + finalize in one call.
    format!("{:x}", Sha256::digest(input.as_bytes()))
}