3 Commits

Author SHA1 Message Date
Taylor Eernisse
925ec9f574 fix: Retry loop safety, doctor model matching, regenerator robustness
Three defensive improvements from peer code review:

Replace unreachable!() in GitLab client retry loops:
Both request() and request_with_headers() had unreachable!() after
their for loops. While the logic was sound (the final iteration always
reaches the return/break), any refactor to the loop condition would
turn this into a runtime panic. Restructured both loops to store
last_response and break explicitly, making the control flow
self-documenting and leaving a useful .expect() message if the
invariant is ever violated.
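The restructured control flow can be reduced to a small standalone sketch (hypothetical names; the real methods operate on HTTP responses and a rate limiter):

```rust
// Hypothetical reduction of the restructured retry loop. Retryable failures
// `continue` only while attempts remain, so the final iteration always
// stores a result and breaks, and the `.expect()` documents that invariant
// instead of relying on an unreachable!() arm.
fn request_with_retries(
    max_retries: u32,
    mut attempt: impl FnMut(u32) -> Result<String, String>,
) -> Result<String, String> {
    let mut last_response = None;
    for n in 0..=max_retries {
        match attempt(n) {
            // Retryable (e.g. 429): try again unless this was the last attempt.
            Err(_) if n < max_retries => continue,
            outcome => {
                last_response = Some(outcome);
                break;
            }
        }
    }
    // The loop body runs at least once (0..=max_retries is never empty).
    last_response.expect("retry loop ran at least once")
}
```

Any future change to the loop bounds now surfaces as a descriptive `.expect()` panic rather than an `unreachable!()`.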

Doctor model name comparison asymmetry:
Ollama model names were stripped of their tag (:latest, :v1.5) for
comparison, but the configured model name was compared as-is. A config
value like "nomic-embed-text:v1.5" would never match. Now strips the
tag from both sides before comparing.
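The symmetric comparison reduces to stripping everything after the first `:` on both sides (a sketch; the helper name is made up here):

```rust
// Strip an Ollama-style tag: "name:tag" -> "name". split(':').next() always
// yields at least the full string for any input, so unwrap_or is only a
// defensive fallback.
fn base_name(model: &str) -> &str {
    model.split(':').next().unwrap_or(model)
}
```

Both the installed model names and the configured value go through the same stripping before comparison, so tagged and untagged spellings match.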

Regenerator savepoint cleanup and progress accuracy:
- upsert_document's error path did ROLLBACK TO but never RELEASE,
  leaving a dangling savepoint that could nest on the next call. Added
  RELEASE after rollback so the connection is clean.
- estimated_total for progress reporting was computed once at start, but
  the dirty queue can grow during processing. Now recounted on each loop
  iteration, taking max() so the progress fraction never goes backwards.
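The never-shrinking denominator is just a running max over per-iteration recounts (a minimal sketch with made-up counts):

```rust
// Each snapshot is (processed so far, items currently in the queue).
// max() keeps the denominator from shrinking even when the queue drains
// faster than it grows, so processed/estimated_total is monotone.
fn monotone_total(snapshots: &[(usize, usize)]) -> usize {
    let mut estimated_total = 0;
    for &(processed, remaining) in snapshots {
        estimated_total = estimated_total.max(processed + remaining);
    }
    estimated_total
}
```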

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 14:16:54 -05:00
Taylor Eernisse
1fdc6d03cc fix: Savepoint leak in embedding pipeline, atomic fail_job, RRF dedup
Three correctness fixes found during peer code review:

Embedding pipeline savepoint leak (HIGH severity):
The SAVEPOINT embed_page / RELEASE embed_page pattern had ~10 `?`
propagation points between them. Any error from record_embedding_error,
clear_document_embeddings, or store_embedding would exit the function
without rolling back, leaving the SQLite connection in a broken
transactional state and causing cascading failures for the rest of the
session. Fixed by extracting page processing into `embed_page()` and
wrapping with explicit rollback-on-error handling.
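The shape of that fix can be sketched generically (hypothetical helper; the real code wraps `embed_page()` around a named SQLite savepoint):

```rust
// Run `body` inside a savepoint: RELEASE on success, ROLLBACK TO + RELEASE
// on error. Because every `?` inside `body` funnels through this match, an
// early return can no longer leak an open savepoint on the connection.
fn with_savepoint<T>(
    exec: &mut impl FnMut(&str) -> Result<(), String>,
    body: impl FnOnce() -> Result<T, String>,
) -> Result<T, String> {
    exec("SAVEPOINT sp")?;
    match body() {
        Ok(v) => {
            exec("RELEASE sp")?;
            Ok(v)
        }
        Err(e) => {
            // Best-effort cleanup; the original error is what we report.
            let _ = exec("ROLLBACK TO sp; RELEASE sp");
            Err(e)
        }
    }
}
```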

Dependent queue fail_job race (MEDIUM severity):
fail_job performed a SELECT followed by a separate UPDATE on the
attempts counter without a transaction. Under concurrent lock
reclamation, the attempts value could be read stale. Replaced with a
single atomic UPDATE that increments attempts and computes exponential
backoff entirely in SQL, also halving DB round-trips. Added explicit
error when the job no longer exists.
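The backoff schedule the SQL encodes can be checked in plain Rust (constants taken from the commit message: 30s base, 480s cap):

```rust
// 30_000 * 2^min(attempts, 4), clamped at 480_000 ms. min() on the shift
// amount prevents overflow; min() on the product enforces the cap.
fn backoff_ms(attempts: u32) -> i64 {
    (30_000i64 << attempts.min(4)).min(480_000)
}
```

So the schedule is 30s, 60s, 120s, 240s, then a flat 480s for every later attempt.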

RRF duplicate document score inflation (MEDIUM severity):
If a retriever returned the same document_id multiple times, the RRF
score accumulated multiple rank contributions while the rank only
recorded the first occurrence. Moved the score accumulation inside the
`is_none()` guard so only the first occurrence per list contributes.
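The RRF hunk is not among the diffs below, so here is a minimal sketch of the fixed accumulation (hypothetical function, k = 60 assumed; the real code guards on the rank entry being `is_none()`):

```rust
use std::collections::{HashMap, HashSet};

const K: f64 = 60.0;

// Reciprocal Rank Fusion over ranked lists of document ids. A document_id
// appearing twice in the same list contributes only its first (best) rank,
// mirroring the fix that moved score accumulation inside the
// first-occurrence guard.
fn rrf_fuse(lists: &[Vec<u64>]) -> HashMap<u64, f64> {
    let mut scores: HashMap<u64, f64> = HashMap::new();
    for list in lists {
        let mut seen: HashSet<u64> = HashSet::new();
        for (rank, doc_id) in list.iter().enumerate() {
            // insert() returns false for duplicates, so later occurrences
            // in the same list add nothing.
            if seen.insert(*doc_id) {
                *scores.entry(*doc_id).or_insert(0.0) += 1.0 / (K + rank as f64 + 1.0);
            }
        }
    }
    scores
}
```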

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 14:16:38 -05:00
Taylor Eernisse
266ed78e73 feat(sync): Wire progress callbacks through sync pipeline stages
The sync command's stage spinners now show real-time aggregate progress
for each pipeline phase instead of static "syncing..." messages.

- Add `progress_callback` parameter to `run_embed` and
  `run_generate_docs` so callers can receive `(processed, total)` updates
- Add `stage_bar` parameter to `run_ingest` for aggregate progress
  across concurrently-ingested projects using shared AtomicUsize counters
- Update `stage_spinner` to use `{prefix}` for the `[N/M]` label,
  allowing `{msg}` to be updated independently with progress details
- Thread `ProgressBar` clones into each concurrent project task so
  per-entity progress (fetch, discussions, events) is reflected on the
  aggregate spinner
- Pass `None` for progress callbacks at standalone CLI entry points
  (handle_ingest, handle_generate_docs, handle_embed) to preserve
  existing behavior when commands are run outside of sync

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 14:16:21 -05:00
11 changed files with 460 additions and 265 deletions

View File

@@ -418,7 +418,11 @@ async fn check_ollama(config: Option<&Config>) -> OllamaCheck {
.map(|m| m.name.split(':').next().unwrap_or(&m.name))
.collect();
if !model_names.iter().any(|m| *m == model) {
// Strip tag from configured model name too (e.g.
// "nomic-embed-text:v1.5" → "nomic-embed-text") so both
// sides are compared at the same granularity.
let model_base = model.split(':').next().unwrap_or(model);
if !model_names.contains(&model_base) {
return OllamaCheck {
result: CheckResult {
status: CheckStatus::Warning,

View File

@@ -19,10 +19,13 @@ pub struct EmbedCommandResult {
}
/// Run the embed command.
///
/// `progress_callback` reports `(processed, total)` as documents are embedded.
pub async fn run_embed(
config: &Config,
full: bool,
retry_failed: bool,
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<EmbedCommandResult> {
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
@@ -58,7 +61,7 @@ pub async fn run_embed(
}
let model_name = &config.embedding.model;
let result = embed_documents(&conn, &client, model_name, None).await?;
let result = embed_documents(&conn, &client, model_name, progress_callback).await?;
Ok(EmbedCommandResult {
embedded: result.embedded,

View File

@@ -28,10 +28,13 @@ pub struct GenerateDocsResult {
///
/// Default mode: process only existing dirty_sources entries.
/// Full mode: seed dirty_sources with ALL entities, then drain.
///
/// `progress_callback` reports `(processed, estimated_total)` as documents are generated.
pub fn run_generate_docs(
config: &Config,
full: bool,
project_filter: Option<&str>,
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<GenerateDocsResult> {
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
@@ -46,7 +49,8 @@ pub fn run_generate_docs(
result.seeded += seed_dirty(&conn, SourceType::Discussion, project_filter)?;
}
let regen = regenerate_dirty_documents(&conn)?;
let regen =
regenerate_dirty_documents(&conn, progress_callback.as_ref().map(|cb| cb.as_ref()))?;
result.regenerated = regen.regenerated;
result.unchanged = regen.unchanged;
result.errored = regen.errored;

View File

@@ -1,5 +1,8 @@
//! Ingest command - fetch data from GitLab.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use console::style;
use indicatif::{ProgressBar, ProgressStyle};
use rusqlite::Connection;
@@ -106,6 +109,9 @@ impl IngestDisplay {
}
/// Run the ingest command.
///
/// `stage_bar` is an optional `ProgressBar` (typically from sync's stage spinner)
/// that will be updated with aggregate progress across all projects.
pub async fn run_ingest(
config: &Config,
resource_type: &str,
@@ -113,14 +119,23 @@ pub async fn run_ingest(
force: bool,
full: bool,
display: IngestDisplay,
stage_bar: Option<ProgressBar>,
) -> Result<IngestResult> {
let run_id = uuid::Uuid::new_v4().simple().to_string();
let run_id = &run_id[..8];
let span = tracing::info_span!("ingest", %run_id, %resource_type);
run_ingest_inner(config, resource_type, project_filter, force, full, display)
.instrument(span)
.await
run_ingest_inner(
config,
resource_type,
project_filter,
force,
full,
display,
stage_bar,
)
.instrument(span)
.await
}
/// Inner implementation of run_ingest, instrumented with a root span.
@@ -131,6 +146,7 @@ async fn run_ingest_inner(
force: bool,
full: bool,
display: IngestDisplay,
stage_bar: Option<ProgressBar>,
) -> Result<IngestResult> {
// Validate resource type early
if resource_type != "issues" && resource_type != "mrs" {
@@ -237,6 +253,14 @@ async fn run_ingest_inner(
let concurrency = config.sync.primary_concurrency as usize;
let resource_type_owned = resource_type.to_string();
// Aggregate counters for stage_bar updates (shared across concurrent projects)
let agg_fetched = Arc::new(AtomicUsize::new(0));
let agg_discussions = Arc::new(AtomicUsize::new(0));
let agg_disc_total = Arc::new(AtomicUsize::new(0));
let agg_events = Arc::new(AtomicUsize::new(0));
let agg_events_total = Arc::new(AtomicUsize::new(0));
let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden);
use futures::stream::{self, StreamExt};
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
@@ -248,6 +272,12 @@ async fn run_ingest_inner(
let path = path.clone();
let local_project_id = *local_project_id;
let gitlab_project_id = *gitlab_project_id;
let stage_bar = stage_bar.clone();
let agg_fetched = Arc::clone(&agg_fetched);
let agg_discussions = Arc::clone(&agg_discussions);
let agg_disc_total = Arc::clone(&agg_disc_total);
let agg_events = Arc::clone(&agg_events);
let agg_events_total = Arc::clone(&agg_events_total);
async move {
let proj_conn = create_connection(&db_path)?;
@@ -286,28 +316,70 @@ async fn run_ingest_inner(
let spinner_clone = spinner.clone();
let disc_bar_clone = disc_bar.clone();
let stage_bar_clone = stage_bar.clone();
let agg_fetched_clone = Arc::clone(&agg_fetched);
let agg_discussions_clone = Arc::clone(&agg_discussions);
let agg_disc_total_clone = Arc::clone(&agg_disc_total);
let agg_events_clone = Arc::clone(&agg_events);
let agg_events_total_clone = Arc::clone(&agg_events_total);
let path_for_cb = path.clone();
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
Box::new(|_| {})
} else {
Box::new(move |event: ProgressEvent| match event {
ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => {
// Spinner already showing fetch message
}
ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => {
let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total;
spinner_clone.set_message(format!(
"{path_for_cb}: {total} {type_label} fetched"
));
stage_bar_clone.set_message(format!(
"Fetching {type_label}... ({agg} fetched across projects)"
));
}
ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => {
spinner_clone.set_message(format!(
"{path_for_cb}: {count} fetched so far..."
));
}
ProgressEvent::DiscussionSyncStarted { total } => {
spinner_clone.finish_and_clear();
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
disc_bar_clone.set_length(total as u64);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
stage_bar_clone.set_message(format!(
"Syncing discussions... (0/{agg_total})"
));
}
ProgressEvent::DiscussionSynced { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
stage_bar_clone.set_message(format!(
"Syncing discussions... ({agg}/{agg_total})"
));
}
ProgressEvent::DiscussionSyncComplete => {
disc_bar_clone.finish_and_clear();
}
ProgressEvent::MrDiscussionSyncStarted { total } => {
spinner_clone.finish_and_clear();
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
disc_bar_clone.set_length(total as u64);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
stage_bar_clone.set_message(format!(
"Syncing discussions... (0/{agg_total})"
));
}
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
stage_bar_clone.set_message(format!(
"Syncing discussions... ({agg}/{agg_total})"
));
}
ProgressEvent::MrDiscussionSyncComplete => {
disc_bar_clone.finish_and_clear();
@@ -322,14 +394,22 @@ async fn run_ingest_inner(
.progress_chars("=> "),
);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
agg_events_total_clone.fetch_add(total, Ordering::Relaxed);
stage_bar_clone.set_message(
"Fetching resource events...".to_string()
);
}
ProgressEvent::ResourceEventFetched { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1;
let agg_total = agg_events_total_clone.load(Ordering::Relaxed);
stage_bar_clone.set_message(format!(
"Fetching resource events... ({agg}/{agg_total})"
));
}
ProgressEvent::ResourceEventsFetchComplete { .. } => {
disc_bar_clone.finish_and_clear();
}
_ => {}
})
};

View File

@@ -40,6 +40,9 @@ pub struct SyncResult {
}
/// Create a styled spinner for a sync stage.
///
/// Uses `{prefix}` for the `[N/M]` stage label so callers can update `{msg}`
/// independently without losing the stage context.
fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar {
if robot_mode {
return ProgressBar::hidden();
@@ -47,11 +50,12 @@ fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressB
let pb = crate::cli::progress::multi().add(ProgressBar::new_spinner());
pb.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.blue} {msg}")
.template("{spinner:.blue} {prefix} {msg}")
.expect("valid template"),
);
pb.enable_steady_tick(std::time::Duration::from_millis(80));
pb.set_message(format!("[{stage}/{total}] {msg}"));
pb.set_prefix(format!("[{stage}/{total}]"));
pb.set_message(msg.to_string());
pb
}
@@ -112,6 +116,7 @@ pub async fn run_sync(
options.force,
options.full,
ingest_display,
Some(spinner.clone()),
)
.await?;
result.issues_updated = issues_result.issues_upserted;
@@ -136,6 +141,7 @@ pub async fn run_sync(
options.force,
options.full,
ingest_display,
Some(spinner.clone()),
)
.await?;
result.mrs_updated = mrs_result.mrs_upserted;
@@ -154,7 +160,15 @@ pub async fn run_sync(
options.robot_mode,
);
info!("Sync stage {current_stage}/{total_stages}: generating documents");
let docs_result = run_generate_docs(config, false, None)?;
let docs_spinner = spinner.clone();
let docs_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
if total > 0 {
docs_spinner.set_message(format!(
"Processing documents... ({processed}/{total})"
));
}
});
let docs_result = run_generate_docs(config, false, None, Some(docs_cb))?;
result.documents_regenerated = docs_result.regenerated;
spinner.finish_and_clear();
} else {
@@ -171,7 +185,13 @@ pub async fn run_sync(
options.robot_mode,
);
info!("Sync stage {current_stage}/{total_stages}: embedding documents");
match run_embed(config, options.full, false).await {
let embed_spinner = spinner.clone();
let embed_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
embed_spinner.set_message(format!(
"Embedding documents... ({processed}/{total})"
));
});
match run_embed(config, options.full, false, Some(embed_cb)).await {
Ok(embed_result) => {
result.documents_embedded = embed_result.embedded;
spinner.finish_and_clear();

View File

@@ -122,28 +122,30 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
/// backoff, clears locked_at, and records the error.
///
/// Backoff: 30s * 2^(attempts-1), capped at 480s.
/// Backoff: 30s * 2^(attempts), capped at 480s. Uses a single atomic UPDATE
/// to avoid a read-then-write race on the `attempts` counter.
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
let now = now_ms();
// Get current attempts (propagate error if job no longer exists)
let current_attempts: i32 = conn.query_row(
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
rusqlite::params![job_id],
|row| row.get(0),
)?;
let new_attempts = current_attempts + 1;
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
let next_retry = now + backoff_ms;
conn.execute(
// Atomic increment + backoff calculation in one UPDATE.
// MIN(attempts, 4) caps the shift to prevent overflow; the overall
// backoff is clamped to 480,000 ms via MIN(..., 480000).
let changes = conn.execute(
"UPDATE pending_dependent_fetches
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3
WHERE id = ?4",
rusqlite::params![new_attempts, next_retry, error, job_id],
SET attempts = attempts + 1,
next_retry_at = ?1 + MIN(30000 * (1 << MIN(attempts, 4)), 480000),
locked_at = NULL,
last_error = ?2
WHERE id = ?3",
rusqlite::params![now, error, job_id],
)?;
if changes == 0 {
return Err(crate::core::error::LoreError::Other(
"fail_job: job not found (may have been reclaimed or completed)".into(),
));
}
Ok(())
}

View File

@@ -21,16 +21,37 @@ pub struct RegenerateResult {
///
/// Uses per-item error handling (fail-soft) and drains the queue completely
/// via a bounded batch loop. Each dirty item is processed independently.
#[instrument(skip(conn), fields(items_processed, items_skipped, errors))]
pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult> {
///
/// `progress_callback` reports `(processed, estimated_total)` after each item.
#[instrument(
skip(conn, progress_callback),
fields(items_processed, items_skipped, errors)
)]
pub fn regenerate_dirty_documents(
conn: &Connection,
progress_callback: Option<&dyn Fn(usize, usize)>,
) -> Result<RegenerateResult> {
let mut result = RegenerateResult::default();
// Estimated total for progress reporting. Recount each loop iteration
// so the denominator grows if new items are enqueued during processing
// (the queue can grow while we drain it). We use max() so the value
// never shrinks — preventing the progress fraction from going backwards.
let mut estimated_total: usize = 0;
loop {
let dirty = get_dirty_sources(conn)?;
if dirty.is_empty() {
break;
}
// Recount remaining + already-processed to get the true total.
let remaining: usize = conn
.query_row("SELECT COUNT(*) FROM dirty_sources", [], |row| row.get(0))
.unwrap_or(0_i64) as usize;
let processed_so_far = result.regenerated + result.unchanged + result.errored;
estimated_total = estimated_total.max(processed_so_far + remaining);
for (source_type, source_id) in &dirty {
match regenerate_one(conn, *source_type, *source_id) {
Ok(changed) => {
@@ -52,6 +73,11 @@ pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult>
result.errored += 1;
}
}
let processed = result.regenerated + result.unchanged + result.errored;
if let Some(cb) = progress_callback {
cb(processed, estimated_total);
}
}
}
@@ -123,7 +149,9 @@ fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
Ok(())
}
Err(e) => {
let _ = conn.execute_batch("ROLLBACK TO upsert_doc");
// ROLLBACK TO restores the savepoint but leaves it active.
// RELEASE removes it so the connection is clean for the next call.
let _ = conn.execute_batch("ROLLBACK TO upsert_doc; RELEASE upsert_doc");
Err(e)
}
}
@@ -358,7 +386,7 @@ mod tests {
).unwrap();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let result = regenerate_dirty_documents(&conn).unwrap();
let result = regenerate_dirty_documents(&conn, None).unwrap();
assert_eq!(result.regenerated, 1);
assert_eq!(result.unchanged, 0);
assert_eq!(result.errored, 0);
@@ -385,12 +413,12 @@ mod tests {
// First regeneration creates the document
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let r1 = regenerate_dirty_documents(&conn).unwrap();
let r1 = regenerate_dirty_documents(&conn, None).unwrap();
assert_eq!(r1.regenerated, 1);
// Second regeneration — same data, should be unchanged
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let r2 = regenerate_dirty_documents(&conn).unwrap();
let r2 = regenerate_dirty_documents(&conn, None).unwrap();
assert_eq!(r2.unchanged, 1);
assert_eq!(r2.regenerated, 0);
}
@@ -403,7 +431,7 @@ mod tests {
[],
).unwrap();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
regenerate_dirty_documents(&conn).unwrap();
regenerate_dirty_documents(&conn, None).unwrap();
// Delete the issue and re-mark dirty
conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
@@ -411,7 +439,7 @@ mod tests {
conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let result = regenerate_dirty_documents(&conn).unwrap();
let result = regenerate_dirty_documents(&conn, None).unwrap();
assert_eq!(result.regenerated, 1); // Deletion counts as "changed"
let count: i64 = conn
@@ -431,7 +459,7 @@ mod tests {
mark_dirty(&conn, SourceType::Issue, i).unwrap();
}
let result = regenerate_dirty_documents(&conn).unwrap();
let result = regenerate_dirty_documents(&conn, None).unwrap();
assert_eq!(result.regenerated, 10);
// Queue should be empty
@@ -459,11 +487,11 @@ mod tests {
// First run creates document
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
regenerate_dirty_documents(&conn).unwrap();
regenerate_dirty_documents(&conn, None).unwrap();
// Second run — triple hash match, should skip ALL writes
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let result = regenerate_dirty_documents(&conn).unwrap();
let result = regenerate_dirty_documents(&conn, None).unwrap();
assert_eq!(result.unchanged, 1);
// Labels should still be present (not deleted and re-inserted)

View File

@@ -66,227 +66,33 @@ pub async fn embed_documents(
// process crashes mid-page, the savepoint is never released and
// SQLite rolls back — preventing partial document states where old
// embeddings are cleared but new ones haven't been written yet.
//
// We use a closure + match to ensure the savepoint is always
// rolled back on error — bare `execute_batch("SAVEPOINT")` with `?`
// propagation would leak the savepoint and leave the connection in
// a broken transactional state.
conn.execute_batch("SAVEPOINT embed_page")?;
// Build chunk work items for this page
let mut all_chunks: Vec<ChunkWork> = Vec::new();
let mut page_normal_docs: usize = 0;
for doc in &pending {
// Always advance the cursor, even for skipped docs, to avoid re-fetching
last_id = doc.document_id;
if doc.content_text.is_empty() {
result.skipped += 1;
processed += 1;
continue;
let page_result = embed_page(
conn,
client,
model_name,
&pending,
&mut result,
&mut last_id,
&mut processed,
total,
&progress_callback,
)
.await;
match page_result {
Ok(()) => {
conn.execute_batch("RELEASE embed_page")?;
}
let chunks = split_into_chunks(&doc.content_text);
let total_chunks = chunks.len();
// Overflow guard: skip documents that produce too many chunks.
// Must run BEFORE clear_document_embeddings so existing embeddings
// are preserved when we skip.
if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER {
warn!(
doc_id = doc.document_id,
chunk_count = total_chunks,
max = CHUNK_ROWID_MULTIPLIER,
"Document produces too many chunks, skipping to prevent rowid collision"
);
// Record a sentinel error so the document is not re-detected as
// pending on subsequent runs (prevents infinite re-processing).
record_embedding_error(
conn,
doc.document_id,
0, // sentinel chunk_index
&doc.content_hash,
"overflow-sentinel",
model_name,
&format!(
"Document produces {} chunks, exceeding max {}",
total_chunks, CHUNK_ROWID_MULTIPLIER
),
)?;
result.skipped += 1;
processed += 1;
if let Some(ref cb) = progress_callback {
cb(processed, total);
}
continue;
}
// Don't clear existing embeddings here — defer until the first
// successful chunk embedding so that if ALL chunks for a document
// fail, old embeddings survive instead of leaving zero data.
for (chunk_index, text) in chunks {
all_chunks.push(ChunkWork {
doc_id: doc.document_id,
chunk_index,
total_chunks,
doc_hash: doc.content_hash.clone(),
chunk_hash: sha256_hash(&text),
text,
});
}
page_normal_docs += 1;
// Don't fire progress here — wait until embedding completes below.
}
// Track documents whose old embeddings have been cleared.
// We defer clearing until the first successful chunk embedding so
// that if ALL chunks for a document fail, old embeddings survive.
let mut cleared_docs: HashSet<i64> = HashSet::new();
// Process chunks in batches of BATCH_SIZE
for batch in all_chunks.chunks(BATCH_SIZE) {
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
match client.embed_batch(texts).await {
Ok(embeddings) => {
for (i, embedding) in embeddings.iter().enumerate() {
if i >= batch.len() {
break;
}
let chunk = &batch[i];
if embedding.len() != EXPECTED_DIMS {
warn!(
doc_id = chunk.doc_id,
chunk_index = chunk.chunk_index,
got_dims = embedding.len(),
expected = EXPECTED_DIMS,
"Dimension mismatch, skipping"
);
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&format!(
"Dimension mismatch: got {}, expected {}",
embedding.len(),
EXPECTED_DIMS
),
)?;
result.failed += 1;
continue;
}
// Clear old embeddings on first successful chunk for this document
if !cleared_docs.contains(&chunk.doc_id) {
clear_document_embeddings(conn, chunk.doc_id)?;
cleared_docs.insert(chunk.doc_id);
}
store_embedding(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
embedding,
chunk.total_chunks,
)?;
result.embedded += 1;
}
}
Err(e) => {
// Batch failed — retry each chunk individually so one
// oversized chunk doesn't poison the entire batch.
let err_str = e.to_string();
let err_lower = err_str.to_lowercase();
// Ollama error messages vary across versions. Match broadly
// against known patterns to detect context-window overflow.
let is_context_error = err_lower.contains("context length")
|| err_lower.contains("too long")
|| err_lower.contains("maximum context")
|| err_lower.contains("token limit")
|| err_lower.contains("exceeds")
|| (err_lower.contains("413") && err_lower.contains("http"));
if is_context_error && batch.len() > 1 {
warn!(
"Batch failed with context length error, retrying chunks individually"
);
for chunk in batch {
match client.embed_batch(vec![chunk.text.clone()]).await {
Ok(embeddings)
if !embeddings.is_empty()
&& embeddings[0].len() == EXPECTED_DIMS =>
{
// Clear old embeddings on first successful chunk
if !cleared_docs.contains(&chunk.doc_id) {
clear_document_embeddings(conn, chunk.doc_id)?;
cleared_docs.insert(chunk.doc_id);
}
store_embedding(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&embeddings[0],
chunk.total_chunks,
)?;
result.embedded += 1;
}
_ => {
warn!(
doc_id = chunk.doc_id,
chunk_index = chunk.chunk_index,
chunk_bytes = chunk.text.len(),
"Chunk too large for model context window"
);
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
"Chunk exceeds model context window",
)?;
result.failed += 1;
}
}
}
} else {
warn!(error = %e, "Batch embedding failed");
for chunk in batch {
record_embedding_error(
conn,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
model_name,
&e.to_string(),
)?;
result.failed += 1;
}
}
}
Err(e) => {
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
return Err(e);
}
}
// Fire progress for all normal documents after embedding completes.
// This ensures progress reflects actual embedding work, not just chunking.
processed += page_normal_docs;
if let Some(ref cb) = progress_callback {
cb(processed, total);
}
// Commit all DB writes for this page atomically.
conn.execute_batch("RELEASE embed_page")?;
}
info!(
@@ -303,6 +109,240 @@ pub async fn embed_documents(
Ok(result)
}
/// Process a single page of pending documents within an active savepoint.
///
/// All `?` propagation from this function is caught by the caller, which
/// rolls back the savepoint on error.
#[allow(clippy::too_many_arguments)]
async fn embed_page(
conn: &Connection,
client: &OllamaClient,
model_name: &str,
pending: &[crate::embedding::change_detector::PendingDocument],
result: &mut EmbedResult,
last_id: &mut i64,
processed: &mut usize,
total: usize,
progress_callback: &Option<Box<dyn Fn(usize, usize)>>,
) -> Result<()> {
// Build chunk work items for this page
let mut all_chunks: Vec<ChunkWork> = Vec::new();
let mut page_normal_docs: usize = 0;
for doc in pending {
// Always advance the cursor, even for skipped docs, to avoid re-fetching
*last_id = doc.document_id;
if doc.content_text.is_empty() {
result.skipped += 1;
*processed += 1;
continue;
}
let chunks = split_into_chunks(&doc.content_text);
let total_chunks = chunks.len();
// Overflow guard: skip documents that produce too many chunks.
// Must run BEFORE clear_document_embeddings so existing embeddings
// are preserved when we skip.
if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER {
warn!(
doc_id = doc.document_id,
chunk_count = total_chunks,
max = CHUNK_ROWID_MULTIPLIER,
"Document produces too many chunks, skipping to prevent rowid collision"
);
// Record a sentinel error so the document is not re-detected as
// pending on subsequent runs (prevents infinite re-processing).
record_embedding_error(
conn,
doc.document_id,
0, // sentinel chunk_index
&doc.content_hash,
"overflow-sentinel",
model_name,
&format!(
"Document produces {} chunks, exceeding max {}",
total_chunks, CHUNK_ROWID_MULTIPLIER
),
)?;
result.skipped += 1;
*processed += 1;
if let Some(cb) = progress_callback {
cb(*processed, total);
}
continue;
}
// Don't clear existing embeddings here — defer until the first
// successful chunk embedding so that if ALL chunks for a document
// fail, old embeddings survive instead of leaving zero data.
for (chunk_index, text) in chunks {
all_chunks.push(ChunkWork {
doc_id: doc.document_id,
chunk_index,
total_chunks,
doc_hash: doc.content_hash.clone(),
chunk_hash: sha256_hash(&text),
text,
});
}
page_normal_docs += 1;
// Don't fire progress here — wait until embedding completes below.
}
// Track documents whose old embeddings have been cleared.
// We defer clearing until the first successful chunk embedding so
// that if ALL chunks for a document fail, old embeddings survive.
let mut cleared_docs: HashSet<i64> = HashSet::new();
// Process chunks in batches of BATCH_SIZE
for batch in all_chunks.chunks(BATCH_SIZE) {
let texts: Vec<String> = batch.iter().map(|c| c.text.clone()).collect();
match client.embed_batch(texts).await {
Ok(embeddings) => {
for (i, embedding) in embeddings.iter().enumerate() {
if i >= batch.len() {
break;
}
let chunk = &batch[i];
                    if embedding.len() != EXPECTED_DIMS {
                        warn!(
                            doc_id = chunk.doc_id,
                            chunk_index = chunk.chunk_index,
                            got_dims = embedding.len(),
                            expected = EXPECTED_DIMS,
                            "Dimension mismatch, skipping"
                        );
                        record_embedding_error(
                            conn,
                            chunk.doc_id,
                            chunk.chunk_index,
                            &chunk.doc_hash,
                            &chunk.chunk_hash,
                            model_name,
                            &format!(
                                "Dimension mismatch: got {}, expected {}",
                                embedding.len(),
                                EXPECTED_DIMS
                            ),
                        )?;
                        result.failed += 1;
                        continue;
                    }

                    // Clear old embeddings on first successful chunk for this document
                    if !cleared_docs.contains(&chunk.doc_id) {
                        clear_document_embeddings(conn, chunk.doc_id)?;
                        cleared_docs.insert(chunk.doc_id);
                    }

                    store_embedding(
                        conn,
                        chunk.doc_id,
                        chunk.chunk_index,
                        &chunk.doc_hash,
                        &chunk.chunk_hash,
                        model_name,
                        embedding,
                        chunk.total_chunks,
                    )?;
                    result.embedded += 1;
                }
            }
            Err(e) => {
                // Batch failed: retry each chunk individually so one
                // oversized chunk doesn't poison the entire batch.
                let err_str = e.to_string();
                let err_lower = err_str.to_lowercase();
                // Ollama error messages vary across versions. Match broadly
                // against known patterns to detect context-window overflow.
                let is_context_error = err_lower.contains("context length")
                    || err_lower.contains("too long")
                    || err_lower.contains("maximum context")
                    || err_lower.contains("token limit")
                    || err_lower.contains("exceeds")
                    || (err_lower.contains("413") && err_lower.contains("http"));
                if is_context_error && batch.len() > 1 {
                    warn!("Batch failed with context length error, retrying chunks individually");
                    for chunk in batch {
                        match client.embed_batch(vec![chunk.text.clone()]).await {
                            Ok(embeddings)
                                if !embeddings.is_empty()
                                    && embeddings[0].len() == EXPECTED_DIMS =>
                            {
                                // Clear old embeddings on first successful chunk
                                if !cleared_docs.contains(&chunk.doc_id) {
                                    clear_document_embeddings(conn, chunk.doc_id)?;
                                    cleared_docs.insert(chunk.doc_id);
                                }
                                store_embedding(
                                    conn,
                                    chunk.doc_id,
                                    chunk.chunk_index,
                                    &chunk.doc_hash,
                                    &chunk.chunk_hash,
                                    model_name,
                                    &embeddings[0],
                                    chunk.total_chunks,
                                )?;
                                result.embedded += 1;
                            }
                            _ => {
                                warn!(
                                    doc_id = chunk.doc_id,
                                    chunk_index = chunk.chunk_index,
                                    chunk_bytes = chunk.text.len(),
                                    "Chunk too large for model context window"
                                );
                                record_embedding_error(
                                    conn,
                                    chunk.doc_id,
                                    chunk.chunk_index,
                                    &chunk.doc_hash,
                                    &chunk.chunk_hash,
                                    model_name,
                                    "Chunk exceeds model context window",
                                )?;
                                result.failed += 1;
                            }
                        }
                    }
                } else {
                    warn!(error = %e, "Batch embedding failed");
                    for chunk in batch {
                        record_embedding_error(
                            conn,
                            chunk.doc_id,
                            chunk.chunk_index,
                            &chunk.doc_hash,
                            &chunk.chunk_hash,
                            model_name,
                            &e.to_string(),
                        )?;
                        result.failed += 1;
                    }
                }
            }
        }
    }

    // Fire progress for all normal documents after embedding completes.
    // This ensures progress reflects actual embedding work, not just chunking.
    *processed += page_normal_docs;
    if let Some(cb) = progress_callback {
        cb(*processed, total);
    }

    Ok(())
}
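The context-window detection above is a string-matching heuristic. Pulled out as a standalone function (the name `is_context_error` and the `main` harness are illustrative; the patterns are the ones matched in the `Err(e)` arm), it can be exercised in isolation:

```rust
/// Returns true if an embedding-backend error message looks like a
/// context-window overflow. Ollama wording varies across versions,
/// so match broadly, mirroring the checks in the Err(e) arm above.
fn is_context_error(err: &str) -> bool {
    let e = err.to_lowercase();
    e.contains("context length")
        || e.contains("too long")
        || e.contains("maximum context")
        || e.contains("token limit")
        || e.contains("exceeds")
        || (e.contains("413") && e.contains("http"))
}

fn main() {
    // Overflow-shaped messages trigger the per-chunk retry path.
    assert!(is_context_error("input exceeds maximum context length"));
    assert!(is_context_error("HTTP error 413 Payload Too Large"));
    // Unrelated failures fall through to the record-and-fail path.
    assert!(!is_context_error("connection refused"));
}
```

The bare `exceeds` pattern is deliberately broad and can match unrelated messages; the cost of a false positive is only one extra round of per-chunk retries, after which each chunk fails or succeeds on its own.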
/// Clear all embeddings and metadata for a document.
fn clear_document_embeddings(conn: &Connection, document_id: i64) -> Result<()> {
    conn.execute(


@@ -122,6 +122,7 @@ impl GitLabClient {
     /// Make an authenticated API request with automatic 429 retry.
     async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
         let url = format!("{}{}", self.base_url, path);
+        let mut last_response = None;
         for attempt in 0..=Self::MAX_RETRIES {
             let delay = self.rate_limiter.lock().await.check_delay();
@@ -155,10 +156,15 @@ impl GitLabClient {
                 continue;
             }
-            return self.handle_response(response, path).await;
+            last_response = Some(response);
+            break;
         }
-        unreachable!("loop always returns")
+        // Safety: the loop always executes at least once (0..=MAX_RETRIES)
+        // and either sets last_response+break, or continues (only when
+        // attempt < MAX_RETRIES). The final iteration always reaches break.
+        self.handle_response(last_response.expect("retry loop ran at least once"), path)
+            .await
     }

     /// Parse retry-after header from a 429 response, defaulting to 60s.
@@ -543,6 +549,7 @@ impl GitLabClient {
         params: &[(&str, String)],
     ) -> Result<(T, HeaderMap)> {
         let url = format!("{}{}", self.base_url, path);
+        let mut last_response = None;
         for attempt in 0..=Self::MAX_RETRIES {
             let delay = self.rate_limiter.lock().await.check_delay();
@@ -577,12 +584,14 @@ impl GitLabClient {
                 continue;
             }
-            let headers = response.headers().clone();
-            let body = self.handle_response(response, path).await?;
-            return Ok((body, headers));
+            last_response = Some(response);
+            break;
         }
-        unreachable!("loop always returns")
+        let response = last_response.expect("retry loop ran at least once");
+        let headers = response.headers().clone();
+        let body = self.handle_response(response, path).await?;
+        Ok((body, headers))
     }
 }
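The restructured loop reduces to a small pattern: store the final outcome, break, and handle it exactly once after the loop, so there is no `unreachable!()` left to turn into a panic under a future refactor. A minimal sketch with a closure standing in for the HTTP call (the names, the `Err(status)` encoding, and the omission of the retry-after sleep are all simplifications, not the client's real API):

```rust
const MAX_RETRIES: u32 = 3;

// Ok(body) on success, Err(status) on an HTTP error; only 429 retries.
fn request_with_retry<F>(mut send: F) -> Result<String, String>
where
    F: FnMut() -> Result<String, u16>,
{
    let mut last_outcome = None;
    for attempt in 0..=MAX_RETRIES {
        match send() {
            // Rate limited and retries remain: loop again.
            Err(429) if attempt < MAX_RETRIES => continue,
            // Anything else (including a final 429) is the loop's answer.
            outcome => {
                last_outcome = Some(outcome);
                break;
            }
        }
    }
    // The loop runs at least once, and every path either continues
    // (only while attempt < MAX_RETRIES) or breaks, so this holds.
    match last_outcome.expect("retry loop ran at least once") {
        Ok(body) => Ok(body),
        Err(status) => Err(format!("HTTP {status}")),
    }
}

fn main() {
    let mut calls = 0;
    let result = request_with_retry(|| {
        calls += 1;
        if calls < 3 { Err(429) } else { Ok("body".to_string()) }
    });
    assert_eq!(result, Ok("body".to_string()));
    assert_eq!(calls, 3); // two 429s, then success
}
```

The real methods additionally sleep according to the rate limiter and the retry-after header before each attempt; the control-flow shape is the same.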


@@ -501,6 +501,7 @@ async fn handle_ingest(
         force,
         full,
         display,
+        None,
     )
     .await?;
@@ -527,6 +528,7 @@ async fn handle_ingest(
         force,
         full,
         display,
+        None,
     )
     .await?;
@@ -537,6 +539,7 @@ async fn handle_ingest(
         force,
         full,
         display,
+        None,
     )
     .await?;
@@ -1225,7 +1228,7 @@ async fn handle_generate_docs(
 ) -> Result<(), Box<dyn std::error::Error>> {
     let config = Config::load(config_override)?;
-    let result = run_generate_docs(&config, args.full, args.project.as_deref())?;
+    let result = run_generate_docs(&config, args.full, args.project.as_deref(), None)?;
     if robot_mode {
         print_generate_docs_json(&result);
     } else {
@@ -1242,7 +1245,7 @@ async fn handle_embed(
     let config = Config::load(config_override)?;
     let full = args.full && !args.no_full;
     let retry_failed = args.retry_failed && !args.no_retry_failed;
-    let result = run_embed(&config, full, retry_failed).await?;
+    let result = run_embed(&config, full, retry_failed, None).await?;
     if robot_mode {
         print_embed_json(&result);
     } else {
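The trailing `None` arguments thread an optional progress callback through the CLI entry points; the embedding loop fires it via `if let Some(cb) = progress_callback`. A reduced sketch of that pattern (the function and type names here are illustrative, not the crate's API):

```rust
// Optional progress reporting: batch callers pass None, interactive
// callers pass a closure that receives (done, total).
type ProgressFn<'a> = &'a dyn Fn(usize, usize);

fn process_all(items: &[&str], progress: Option<ProgressFn>) -> usize {
    let total = items.len();
    for (i, _item) in items.iter().enumerate() {
        // ... per-item work ...
        if let Some(cb) = progress {
            cb(i + 1, total);
        }
    }
    total
}

fn main() {
    use std::cell::Cell;
    let last = Cell::new((0, 0));
    let n = process_all(&["a", "b", "c"], Some(&|done, total| last.set((done, total))));
    assert_eq!(n, 3);
    assert_eq!(last.get(), (3, 3)); // callback saw the final state
    // None: no reporting, everything still processed.
    assert_eq!(process_all(&["a"], None), 1);
}
```

Taking `Option<&dyn Fn(..)>` rather than a generic keeps every call site free to pass a literal `None` without turbofishing a closure type.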


@@ -33,8 +33,10 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
     for (i, &(doc_id, _)) in vector_results.iter().enumerate() {
         let rank = i + 1; // 1-indexed
         let entry = scores.entry(doc_id).or_insert((0.0, None, None));
-        entry.0 += 1.0 / (RRF_K + rank as f64);
+        // Only count the first occurrence per list to prevent duplicates
+        // from inflating the score.
         if entry.1.is_none() {
+            entry.0 += 1.0 / (RRF_K + rank as f64);
             entry.1 = Some(rank);
         }
     }
@@ -42,8 +44,8 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
     for (i, &(doc_id, _)) in fts_results.iter().enumerate() {
         let rank = i + 1; // 1-indexed
         let entry = scores.entry(doc_id).or_insert((0.0, None, None));
-        entry.0 += 1.0 / (RRF_K + rank as f64);
         if entry.2.is_none() {
+            entry.0 += 1.0 / (RRF_K + rank as f64);
             entry.2 = Some(rank);
         }
     }
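With the score accumulation moved inside the `is_none()` guard, a doc_id repeated within a single result list contributes only its first (best) rank. A self-contained sketch of the fused scoring with that guard (simplified to id-only inputs and a plain score output; `RRF_K = 60.0` is an assumed value, and the `(score, rank, rank)` tuple shape follows the diff):

```rust
use std::collections::HashMap;

const RRF_K: f64 = 60.0; // assumed constant, matching common RRF practice

// Reciprocal-rank fusion over two ranked lists of doc ids, counting only
// the first occurrence of a doc per list so duplicates cannot inflate it.
fn rank_rrf(vector_results: &[i64], fts_results: &[i64]) -> Vec<(i64, f64)> {
    // doc_id -> (fused score, first vector rank, first fts rank)
    let mut scores: HashMap<i64, (f64, Option<usize>, Option<usize>)> = HashMap::new();
    for (i, &doc_id) in vector_results.iter().enumerate() {
        let rank = i + 1; // 1-indexed
        let entry = scores.entry(doc_id).or_insert((0.0, None, None));
        if entry.1.is_none() {
            entry.0 += 1.0 / (RRF_K + rank as f64);
            entry.1 = Some(rank);
        }
    }
    for (i, &doc_id) in fts_results.iter().enumerate() {
        let rank = i + 1;
        let entry = scores.entry(doc_id).or_insert((0.0, None, None));
        if entry.2.is_none() {
            entry.0 += 1.0 / (RRF_K + rank as f64);
            entry.2 = Some(rank);
        }
    }
    let mut ranked: Vec<(i64, f64)> =
        scores.into_iter().map(|(id, (s, _, _))| (id, s)).collect();
    ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
    ranked
}

fn main() {
    // A duplicate within one list scores the same as a single occurrence.
    assert_eq!(rank_rrf(&[1, 1], &[])[0].1, rank_rrf(&[1], &[])[0].1);
    // A doc ranked in both lists beats a doc ranked in only one.
    assert_eq!(rank_rrf(&[1, 2], &[1])[0].0, 1);
}
```

Without the guard, the duplicated entry would score `1/(K+1) + 1/(K+2)` instead of `1/(K+1)`, letting a doc that merely appears twice in one list outrank genuinely cross-confirmed hits.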