Compare commits
3 Commits
a65ea2f56f
...
925ec9f574
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
925ec9f574 | ||
|
|
1fdc6d03cc | ||
|
|
266ed78e73 |
@@ -418,7 +418,11 @@ async fn check_ollama(config: Option<&Config>) -> OllamaCheck {
|
||||
.map(|m| m.name.split(':').next().unwrap_or(&m.name))
|
||||
.collect();
|
||||
|
||||
if !model_names.iter().any(|m| *m == model) {
|
||||
// Strip tag from configured model name too (e.g.
|
||||
// "nomic-embed-text:v1.5" → "nomic-embed-text") so both
|
||||
// sides are compared at the same granularity.
|
||||
let model_base = model.split(':').next().unwrap_or(model);
|
||||
if !model_names.contains(&model_base) {
|
||||
return OllamaCheck {
|
||||
result: CheckResult {
|
||||
status: CheckStatus::Warning,
|
||||
|
||||
@@ -19,10 +19,13 @@ pub struct EmbedCommandResult {
|
||||
}
|
||||
|
||||
/// Run the embed command.
|
||||
///
|
||||
/// `progress_callback` reports `(processed, total)` as documents are embedded.
|
||||
pub async fn run_embed(
|
||||
config: &Config,
|
||||
full: bool,
|
||||
retry_failed: bool,
|
||||
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
|
||||
) -> Result<EmbedCommandResult> {
|
||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||
let conn = create_connection(&db_path)?;
|
||||
@@ -58,7 +61,7 @@ pub async fn run_embed(
|
||||
}
|
||||
|
||||
let model_name = &config.embedding.model;
|
||||
let result = embed_documents(&conn, &client, model_name, None).await?;
|
||||
let result = embed_documents(&conn, &client, model_name, progress_callback).await?;
|
||||
|
||||
Ok(EmbedCommandResult {
|
||||
embedded: result.embedded,
|
||||
|
||||
@@ -28,10 +28,13 @@ pub struct GenerateDocsResult {
|
||||
///
|
||||
/// Default mode: process only existing dirty_sources entries.
|
||||
/// Full mode: seed dirty_sources with ALL entities, then drain.
|
||||
///
|
||||
/// `progress_callback` reports `(processed, estimated_total)` as documents are generated.
|
||||
pub fn run_generate_docs(
|
||||
config: &Config,
|
||||
full: bool,
|
||||
project_filter: Option<&str>,
|
||||
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
|
||||
) -> Result<GenerateDocsResult> {
|
||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||
let conn = create_connection(&db_path)?;
|
||||
@@ -46,7 +49,8 @@ pub fn run_generate_docs(
|
||||
result.seeded += seed_dirty(&conn, SourceType::Discussion, project_filter)?;
|
||||
}
|
||||
|
||||
let regen = regenerate_dirty_documents(&conn)?;
|
||||
let regen =
|
||||
regenerate_dirty_documents(&conn, progress_callback.as_ref().map(|cb| cb.as_ref()))?;
|
||||
result.regenerated = regen.regenerated;
|
||||
result.unchanged = regen.unchanged;
|
||||
result.errored = regen.errored;
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
//! Ingest command - fetch data from GitLab.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use console::style;
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use rusqlite::Connection;
|
||||
@@ -106,6 +109,9 @@ impl IngestDisplay {
|
||||
}
|
||||
|
||||
/// Run the ingest command.
|
||||
///
|
||||
/// `stage_bar` is an optional `ProgressBar` (typically from sync's stage spinner)
|
||||
/// that will be updated with aggregate progress across all projects.
|
||||
pub async fn run_ingest(
|
||||
config: &Config,
|
||||
resource_type: &str,
|
||||
@@ -113,12 +119,21 @@ pub async fn run_ingest(
|
||||
force: bool,
|
||||
full: bool,
|
||||
display: IngestDisplay,
|
||||
stage_bar: Option<ProgressBar>,
|
||||
) -> Result<IngestResult> {
|
||||
let run_id = uuid::Uuid::new_v4().simple().to_string();
|
||||
let run_id = &run_id[..8];
|
||||
let span = tracing::info_span!("ingest", %run_id, %resource_type);
|
||||
|
||||
run_ingest_inner(config, resource_type, project_filter, force, full, display)
|
||||
run_ingest_inner(
|
||||
config,
|
||||
resource_type,
|
||||
project_filter,
|
||||
force,
|
||||
full,
|
||||
display,
|
||||
stage_bar,
|
||||
)
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
@@ -131,6 +146,7 @@ async fn run_ingest_inner(
|
||||
force: bool,
|
||||
full: bool,
|
||||
display: IngestDisplay,
|
||||
stage_bar: Option<ProgressBar>,
|
||||
) -> Result<IngestResult> {
|
||||
// Validate resource type early
|
||||
if resource_type != "issues" && resource_type != "mrs" {
|
||||
@@ -237,6 +253,14 @@ async fn run_ingest_inner(
|
||||
let concurrency = config.sync.primary_concurrency as usize;
|
||||
let resource_type_owned = resource_type.to_string();
|
||||
|
||||
// Aggregate counters for stage_bar updates (shared across concurrent projects)
|
||||
let agg_fetched = Arc::new(AtomicUsize::new(0));
|
||||
let agg_discussions = Arc::new(AtomicUsize::new(0));
|
||||
let agg_disc_total = Arc::new(AtomicUsize::new(0));
|
||||
let agg_events = Arc::new(AtomicUsize::new(0));
|
||||
let agg_events_total = Arc::new(AtomicUsize::new(0));
|
||||
let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden);
|
||||
|
||||
use futures::stream::{self, StreamExt};
|
||||
|
||||
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
|
||||
@@ -248,6 +272,12 @@ async fn run_ingest_inner(
|
||||
let path = path.clone();
|
||||
let local_project_id = *local_project_id;
|
||||
let gitlab_project_id = *gitlab_project_id;
|
||||
let stage_bar = stage_bar.clone();
|
||||
let agg_fetched = Arc::clone(&agg_fetched);
|
||||
let agg_discussions = Arc::clone(&agg_discussions);
|
||||
let agg_disc_total = Arc::clone(&agg_disc_total);
|
||||
let agg_events = Arc::clone(&agg_events);
|
||||
let agg_events_total = Arc::clone(&agg_events_total);
|
||||
|
||||
async move {
|
||||
let proj_conn = create_connection(&db_path)?;
|
||||
@@ -286,28 +316,70 @@ async fn run_ingest_inner(
|
||||
|
||||
let spinner_clone = spinner.clone();
|
||||
let disc_bar_clone = disc_bar.clone();
|
||||
let stage_bar_clone = stage_bar.clone();
|
||||
let agg_fetched_clone = Arc::clone(&agg_fetched);
|
||||
let agg_discussions_clone = Arc::clone(&agg_discussions);
|
||||
let agg_disc_total_clone = Arc::clone(&agg_disc_total);
|
||||
let agg_events_clone = Arc::clone(&agg_events);
|
||||
let agg_events_total_clone = Arc::clone(&agg_events_total);
|
||||
let path_for_cb = path.clone();
|
||||
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
|
||||
Box::new(|_| {})
|
||||
} else {
|
||||
Box::new(move |event: ProgressEvent| match event {
|
||||
ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => {
|
||||
// Spinner already showing fetch message
|
||||
}
|
||||
ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => {
|
||||
let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||
spinner_clone.set_message(format!(
|
||||
"{path_for_cb}: {total} {type_label} fetched"
|
||||
));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Fetching {type_label}... ({agg} fetched across projects)"
|
||||
));
|
||||
}
|
||||
ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => {
|
||||
spinner_clone.set_message(format!(
|
||||
"{path_for_cb}: {count} fetched so far..."
|
||||
));
|
||||
}
|
||||
ProgressEvent::DiscussionSyncStarted { total } => {
|
||||
spinner_clone.finish_and_clear();
|
||||
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... (0/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::DiscussionSynced { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... ({agg}/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::DiscussionSyncComplete => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::MrDiscussionSyncStarted { total } => {
|
||||
spinner_clone.finish_and_clear();
|
||||
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... (0/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... ({agg}/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::MrDiscussionSyncComplete => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
@@ -322,14 +394,22 @@ async fn run_ingest_inner(
|
||||
.progress_chars("=> "),
|
||||
);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||
agg_events_total_clone.fetch_add(total, Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(
|
||||
"Fetching resource events...".to_string()
|
||||
);
|
||||
}
|
||||
ProgressEvent::ResourceEventFetched { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let agg_total = agg_events_total_clone.load(Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Fetching resource events... ({agg}/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::ResourceEventsFetchComplete { .. } => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
_ => {}
|
||||
})
|
||||
};
|
||||
|
||||
|
||||
@@ -40,6 +40,9 @@ pub struct SyncResult {
|
||||
}
|
||||
|
||||
/// Create a styled spinner for a sync stage.
|
||||
///
|
||||
/// Uses `{prefix}` for the `[N/M]` stage label so callers can update `{msg}`
|
||||
/// independently without losing the stage context.
|
||||
fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar {
|
||||
if robot_mode {
|
||||
return ProgressBar::hidden();
|
||||
@@ -47,11 +50,12 @@ fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressB
|
||||
let pb = crate::cli::progress::multi().add(ProgressBar::new_spinner());
|
||||
pb.set_style(
|
||||
ProgressStyle::default_spinner()
|
||||
.template("{spinner:.blue} {msg}")
|
||||
.template("{spinner:.blue} {prefix} {msg}")
|
||||
.expect("valid template"),
|
||||
);
|
||||
pb.enable_steady_tick(std::time::Duration::from_millis(80));
|
||||
pb.set_message(format!("[{stage}/{total}] {msg}"));
|
||||
pb.set_prefix(format!("[{stage}/{total}]"));
|
||||
pb.set_message(msg.to_string());
|
||||
pb
|
||||
}
|
||||
|
||||
@@ -112,6 +116,7 @@ pub async fn run_sync(
|
||||
options.force,
|
||||
options.full,
|
||||
ingest_display,
|
||||
Some(spinner.clone()),
|
||||
)
|
||||
.await?;
|
||||
result.issues_updated = issues_result.issues_upserted;
|
||||
@@ -136,6 +141,7 @@ pub async fn run_sync(
|
||||
options.force,
|
||||
options.full,
|
||||
ingest_display,
|
||||
Some(spinner.clone()),
|
||||
)
|
||||
.await?;
|
||||
result.mrs_updated = mrs_result.mrs_upserted;
|
||||
@@ -154,7 +160,15 @@ pub async fn run_sync(
|
||||
options.robot_mode,
|
||||
);
|
||||
info!("Sync stage {current_stage}/{total_stages}: generating documents");
|
||||
let docs_result = run_generate_docs(config, false, None)?;
|
||||
let docs_spinner = spinner.clone();
|
||||
let docs_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
|
||||
if total > 0 {
|
||||
docs_spinner.set_message(format!(
|
||||
"Processing documents... ({processed}/{total})"
|
||||
));
|
||||
}
|
||||
});
|
||||
let docs_result = run_generate_docs(config, false, None, Some(docs_cb))?;
|
||||
result.documents_regenerated = docs_result.regenerated;
|
||||
spinner.finish_and_clear();
|
||||
} else {
|
||||
@@ -171,7 +185,13 @@ pub async fn run_sync(
|
||||
options.robot_mode,
|
||||
);
|
||||
info!("Sync stage {current_stage}/{total_stages}: embedding documents");
|
||||
match run_embed(config, options.full, false).await {
|
||||
let embed_spinner = spinner.clone();
|
||||
let embed_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
|
||||
embed_spinner.set_message(format!(
|
||||
"Embedding documents... ({processed}/{total})"
|
||||
));
|
||||
});
|
||||
match run_embed(config, options.full, false, Some(embed_cb)).await {
|
||||
Ok(embed_result) => {
|
||||
result.documents_embedded = embed_result.embedded;
|
||||
spinner.finish_and_clear();
|
||||
|
||||
@@ -122,28 +122,30 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
|
||||
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
|
||||
/// backoff, clears locked_at, and records the error.
|
||||
///
|
||||
/// Backoff: 30s * 2^(attempts-1), capped at 480s.
|
||||
/// Backoff: 30s * 2^(attempts), capped at 480s. Uses a single atomic UPDATE
|
||||
/// to avoid a read-then-write race on the `attempts` counter.
|
||||
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
|
||||
let now = now_ms();
|
||||
|
||||
// Get current attempts (propagate error if job no longer exists)
|
||||
let current_attempts: i32 = conn.query_row(
|
||||
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
|
||||
rusqlite::params![job_id],
|
||||
|row| row.get(0),
|
||||
)?;
|
||||
|
||||
let new_attempts = current_attempts + 1;
|
||||
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
|
||||
let next_retry = now + backoff_ms;
|
||||
|
||||
conn.execute(
|
||||
// Atomic increment + backoff calculation in one UPDATE.
|
||||
// MIN(attempts, 4) caps the shift to prevent overflow; the overall
|
||||
// backoff is clamped to 480 000 ms via MIN(..., 480000).
|
||||
let changes = conn.execute(
|
||||
"UPDATE pending_dependent_fetches
|
||||
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3
|
||||
WHERE id = ?4",
|
||||
rusqlite::params![new_attempts, next_retry, error, job_id],
|
||||
SET attempts = attempts + 1,
|
||||
next_retry_at = ?1 + MIN(30000 * (1 << MIN(attempts, 4)), 480000),
|
||||
locked_at = NULL,
|
||||
last_error = ?2
|
||||
WHERE id = ?3",
|
||||
rusqlite::params![now, error, job_id],
|
||||
)?;
|
||||
|
||||
if changes == 0 {
|
||||
return Err(crate::core::error::LoreError::Other(
|
||||
"fail_job: job not found (may have been reclaimed or completed)".into(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -21,16 +21,37 @@ pub struct RegenerateResult {
|
||||
///
|
||||
/// Uses per-item error handling (fail-soft) and drains the queue completely
|
||||
/// via a bounded batch loop. Each dirty item is processed independently.
|
||||
#[instrument(skip(conn), fields(items_processed, items_skipped, errors))]
|
||||
pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult> {
|
||||
///
|
||||
/// `progress_callback` reports `(processed, estimated_total)` after each item.
|
||||
#[instrument(
|
||||
skip(conn, progress_callback),
|
||||
fields(items_processed, items_skipped, errors)
|
||||
)]
|
||||
pub fn regenerate_dirty_documents(
|
||||
conn: &Connection,
|
||||
progress_callback: Option<&dyn Fn(usize, usize)>,
|
||||
) -> Result<RegenerateResult> {
|
||||
let mut result = RegenerateResult::default();
|
||||
|
||||
// Estimated total for progress reporting. Recount each loop iteration
|
||||
// so the denominator grows if new items are enqueued during processing
|
||||
// (the queue can grow while we drain it). We use max() so the value
|
||||
// never shrinks — preventing the progress fraction from going backwards.
|
||||
let mut estimated_total: usize = 0;
|
||||
|
||||
loop {
|
||||
let dirty = get_dirty_sources(conn)?;
|
||||
if dirty.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Recount remaining + already-processed to get the true total.
|
||||
let remaining: usize = conn
|
||||
.query_row("SELECT COUNT(*) FROM dirty_sources", [], |row| row.get(0))
|
||||
.unwrap_or(0_i64) as usize;
|
||||
let processed_so_far = result.regenerated + result.unchanged + result.errored;
|
||||
estimated_total = estimated_total.max(processed_so_far + remaining);
|
||||
|
||||
for (source_type, source_id) in &dirty {
|
||||
match regenerate_one(conn, *source_type, *source_id) {
|
||||
Ok(changed) => {
|
||||
@@ -52,6 +73,11 @@ pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult>
|
||||
result.errored += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let processed = result.regenerated + result.unchanged + result.errored;
|
||||
if let Some(cb) = progress_callback {
|
||||
cb(processed, estimated_total);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,7 +149,9 @@ fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO upsert_doc");
|
||||
// ROLLBACK TO restores the savepoint but leaves it active.
|
||||
// RELEASE removes it so the connection is clean for the next call.
|
||||
let _ = conn.execute_batch("ROLLBACK TO upsert_doc; RELEASE upsert_doc");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
@@ -358,7 +386,7 @@ mod tests {
|
||||
).unwrap();
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
|
||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
||||
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||
assert_eq!(result.regenerated, 1);
|
||||
assert_eq!(result.unchanged, 0);
|
||||
assert_eq!(result.errored, 0);
|
||||
@@ -385,12 +413,12 @@ mod tests {
|
||||
|
||||
// First regeneration creates the document
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
let r1 = regenerate_dirty_documents(&conn).unwrap();
|
||||
let r1 = regenerate_dirty_documents(&conn, None).unwrap();
|
||||
assert_eq!(r1.regenerated, 1);
|
||||
|
||||
// Second regeneration — same data, should be unchanged
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
let r2 = regenerate_dirty_documents(&conn).unwrap();
|
||||
let r2 = regenerate_dirty_documents(&conn, None).unwrap();
|
||||
assert_eq!(r2.unchanged, 1);
|
||||
assert_eq!(r2.regenerated, 0);
|
||||
}
|
||||
@@ -403,7 +431,7 @@ mod tests {
|
||||
[],
|
||||
).unwrap();
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
regenerate_dirty_documents(&conn).unwrap();
|
||||
regenerate_dirty_documents(&conn, None).unwrap();
|
||||
|
||||
// Delete the issue and re-mark dirty
|
||||
conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
|
||||
@@ -411,7 +439,7 @@ mod tests {
|
||||
conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
|
||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
||||
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||
assert_eq!(result.regenerated, 1); // Deletion counts as "changed"
|
||||
|
||||
let count: i64 = conn
|
||||
@@ -431,7 +459,7 @@ mod tests {
|
||||
mark_dirty(&conn, SourceType::Issue, i).unwrap();
|
||||
}
|
||||
|
||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
||||
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||
assert_eq!(result.regenerated, 10);
|
||||
|
||||
// Queue should be empty
|
||||
@@ -459,11 +487,11 @@ mod tests {
|
||||
|
||||
// First run creates document
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
regenerate_dirty_documents(&conn).unwrap();
|
||||
regenerate_dirty_documents(&conn, None).unwrap();
|
||||
|
||||
// Second run — triple hash match, should skip ALL writes
|
||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
||||
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||
assert_eq!(result.unchanged, 1);
|
||||
|
||||
// Labels should still be present (not deleted and re-inserted)
|
||||
|
||||
@@ -66,19 +66,76 @@ pub async fn embed_documents(
|
||||
// process crashes mid-page, the savepoint is never released and
|
||||
// SQLite rolls back — preventing partial document states where old
|
||||
// embeddings are cleared but new ones haven't been written yet.
|
||||
//
|
||||
// We use a closure + match to ensure the savepoint is always
|
||||
// rolled back on error — bare `execute_batch("SAVEPOINT")` with `?`
|
||||
// propagation would leak the savepoint and leave the connection in
|
||||
// a broken transactional state.
|
||||
conn.execute_batch("SAVEPOINT embed_page")?;
|
||||
let page_result = embed_page(
|
||||
conn,
|
||||
client,
|
||||
model_name,
|
||||
&pending,
|
||||
&mut result,
|
||||
&mut last_id,
|
||||
&mut processed,
|
||||
total,
|
||||
&progress_callback,
|
||||
)
|
||||
.await;
|
||||
match page_result {
|
||||
Ok(()) => {
|
||||
conn.execute_batch("RELEASE embed_page")?;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
embedded = result.embedded,
|
||||
failed = result.failed,
|
||||
skipped = result.skipped,
|
||||
"Embedding pipeline complete"
|
||||
);
|
||||
|
||||
tracing::Span::current().record("items_processed", result.embedded);
|
||||
tracing::Span::current().record("items_skipped", result.skipped);
|
||||
tracing::Span::current().record("errors", result.failed);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Process a single page of pending documents within an active savepoint.
|
||||
///
|
||||
/// All `?` propagation from this function is caught by the caller, which
|
||||
/// rolls back the savepoint on error.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn embed_page(
|
||||
conn: &Connection,
|
||||
client: &OllamaClient,
|
||||
model_name: &str,
|
||||
pending: &[crate::embedding::change_detector::PendingDocument],
|
||||
result: &mut EmbedResult,
|
||||
last_id: &mut i64,
|
||||
processed: &mut usize,
|
||||
total: usize,
|
||||
progress_callback: &Option<Box<dyn Fn(usize, usize)>>,
|
||||
) -> Result<()> {
|
||||
// Build chunk work items for this page
|
||||
let mut all_chunks: Vec<ChunkWork> = Vec::new();
|
||||
let mut page_normal_docs: usize = 0;
|
||||
|
||||
for doc in &pending {
|
||||
for doc in pending {
|
||||
// Always advance the cursor, even for skipped docs, to avoid re-fetching
|
||||
last_id = doc.document_id;
|
||||
*last_id = doc.document_id;
|
||||
|
||||
if doc.content_text.is_empty() {
|
||||
result.skipped += 1;
|
||||
processed += 1;
|
||||
*processed += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -110,9 +167,9 @@ pub async fn embed_documents(
|
||||
),
|
||||
)?;
|
||||
result.skipped += 1;
|
||||
processed += 1;
|
||||
if let Some(ref cb) = progress_callback {
|
||||
cb(processed, total);
|
||||
*processed += 1;
|
||||
if let Some(cb) = progress_callback {
|
||||
cb(*processed, total);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -212,9 +269,7 @@ pub async fn embed_documents(
|
||||
|| (err_lower.contains("413") && err_lower.contains("http"));
|
||||
|
||||
if is_context_error && batch.len() > 1 {
|
||||
warn!(
|
||||
"Batch failed with context length error, retrying chunks individually"
|
||||
);
|
||||
warn!("Batch failed with context length error, retrying chunks individually");
|
||||
for chunk in batch {
|
||||
match client.embed_batch(vec![chunk.text.clone()]).await {
|
||||
Ok(embeddings)
|
||||
@@ -280,27 +335,12 @@ pub async fn embed_documents(
|
||||
|
||||
// Fire progress for all normal documents after embedding completes.
|
||||
// This ensures progress reflects actual embedding work, not just chunking.
|
||||
processed += page_normal_docs;
|
||||
if let Some(ref cb) = progress_callback {
|
||||
cb(processed, total);
|
||||
*processed += page_normal_docs;
|
||||
if let Some(cb) = progress_callback {
|
||||
cb(*processed, total);
|
||||
}
|
||||
|
||||
// Commit all DB writes for this page atomically.
|
||||
conn.execute_batch("RELEASE embed_page")?;
|
||||
}
|
||||
|
||||
info!(
|
||||
embedded = result.embedded,
|
||||
failed = result.failed,
|
||||
skipped = result.skipped,
|
||||
"Embedding pipeline complete"
|
||||
);
|
||||
|
||||
tracing::Span::current().record("items_processed", result.embedded);
|
||||
tracing::Span::current().record("items_skipped", result.skipped);
|
||||
tracing::Span::current().record("errors", result.failed);
|
||||
|
||||
Ok(result)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clear all embeddings and metadata for a document.
|
||||
|
||||
@@ -122,6 +122,7 @@ impl GitLabClient {
|
||||
/// Make an authenticated API request with automatic 429 retry.
|
||||
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
|
||||
let url = format!("{}{}", self.base_url, path);
|
||||
let mut last_response = None;
|
||||
|
||||
for attempt in 0..=Self::MAX_RETRIES {
|
||||
let delay = self.rate_limiter.lock().await.check_delay();
|
||||
@@ -155,10 +156,15 @@ impl GitLabClient {
|
||||
continue;
|
||||
}
|
||||
|
||||
return self.handle_response(response, path).await;
|
||||
last_response = Some(response);
|
||||
break;
|
||||
}
|
||||
|
||||
unreachable!("loop always returns")
|
||||
// Safety: the loop always executes at least once (0..=MAX_RETRIES)
|
||||
// and either sets last_response+break, or continues (only when
|
||||
// attempt < MAX_RETRIES). The final iteration always reaches break.
|
||||
self.handle_response(last_response.expect("retry loop ran at least once"), path)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Parse retry-after header from a 429 response, defaulting to 60s.
|
||||
@@ -543,6 +549,7 @@ impl GitLabClient {
|
||||
params: &[(&str, String)],
|
||||
) -> Result<(T, HeaderMap)> {
|
||||
let url = format!("{}{}", self.base_url, path);
|
||||
let mut last_response = None;
|
||||
|
||||
for attempt in 0..=Self::MAX_RETRIES {
|
||||
let delay = self.rate_limiter.lock().await.check_delay();
|
||||
@@ -577,12 +584,14 @@ impl GitLabClient {
|
||||
continue;
|
||||
}
|
||||
|
||||
let headers = response.headers().clone();
|
||||
let body = self.handle_response(response, path).await?;
|
||||
return Ok((body, headers));
|
||||
last_response = Some(response);
|
||||
break;
|
||||
}
|
||||
|
||||
unreachable!("loop always returns")
|
||||
let response = last_response.expect("retry loop ran at least once");
|
||||
let headers = response.headers().clone();
|
||||
let body = self.handle_response(response, path).await?;
|
||||
Ok((body, headers))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -501,6 +501,7 @@ async fn handle_ingest(
|
||||
force,
|
||||
full,
|
||||
display,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -527,6 +528,7 @@ async fn handle_ingest(
|
||||
force,
|
||||
full,
|
||||
display,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -537,6 +539,7 @@ async fn handle_ingest(
|
||||
force,
|
||||
full,
|
||||
display,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1225,7 +1228,7 @@ async fn handle_generate_docs(
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config = Config::load(config_override)?;
|
||||
|
||||
let result = run_generate_docs(&config, args.full, args.project.as_deref())?;
|
||||
let result = run_generate_docs(&config, args.full, args.project.as_deref(), None)?;
|
||||
if robot_mode {
|
||||
print_generate_docs_json(&result);
|
||||
} else {
|
||||
@@ -1242,7 +1245,7 @@ async fn handle_embed(
|
||||
let config = Config::load(config_override)?;
|
||||
let full = args.full && !args.no_full;
|
||||
let retry_failed = args.retry_failed && !args.no_retry_failed;
|
||||
let result = run_embed(&config, full, retry_failed).await?;
|
||||
let result = run_embed(&config, full, retry_failed, None).await?;
|
||||
if robot_mode {
|
||||
print_embed_json(&result);
|
||||
} else {
|
||||
|
||||
@@ -33,8 +33,10 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
|
||||
for (i, &(doc_id, _)) in vector_results.iter().enumerate() {
|
||||
let rank = i + 1; // 1-indexed
|
||||
let entry = scores.entry(doc_id).or_insert((0.0, None, None));
|
||||
entry.0 += 1.0 / (RRF_K + rank as f64);
|
||||
// Only count the first occurrence per list to prevent duplicates
|
||||
// from inflating the score.
|
||||
if entry.1.is_none() {
|
||||
entry.0 += 1.0 / (RRF_K + rank as f64);
|
||||
entry.1 = Some(rank);
|
||||
}
|
||||
}
|
||||
@@ -42,8 +44,8 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
|
||||
for (i, &(doc_id, _)) in fts_results.iter().enumerate() {
|
||||
let rank = i + 1; // 1-indexed
|
||||
let entry = scores.entry(doc_id).or_insert((0.0, None, None));
|
||||
entry.0 += 1.0 / (RRF_K + rank as f64);
|
||||
if entry.2.is_none() {
|
||||
entry.0 += 1.0 / (RRF_K + rank as f64);
|
||||
entry.2 = Some(rank);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user