Compare commits
3 Commits
a65ea2f56f
...
925ec9f574
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
925ec9f574 | ||
|
|
1fdc6d03cc | ||
|
|
266ed78e73 |
@@ -418,7 +418,11 @@ async fn check_ollama(config: Option<&Config>) -> OllamaCheck {
|
|||||||
.map(|m| m.name.split(':').next().unwrap_or(&m.name))
|
.map(|m| m.name.split(':').next().unwrap_or(&m.name))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if !model_names.iter().any(|m| *m == model) {
|
// Strip tag from configured model name too (e.g.
|
||||||
|
// "nomic-embed-text:v1.5" → "nomic-embed-text") so both
|
||||||
|
// sides are compared at the same granularity.
|
||||||
|
let model_base = model.split(':').next().unwrap_or(model);
|
||||||
|
if !model_names.contains(&model_base) {
|
||||||
return OllamaCheck {
|
return OllamaCheck {
|
||||||
result: CheckResult {
|
result: CheckResult {
|
||||||
status: CheckStatus::Warning,
|
status: CheckStatus::Warning,
|
||||||
|
|||||||
@@ -19,10 +19,13 @@ pub struct EmbedCommandResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Run the embed command.
|
/// Run the embed command.
|
||||||
|
///
|
||||||
|
/// `progress_callback` reports `(processed, total)` as documents are embedded.
|
||||||
pub async fn run_embed(
|
pub async fn run_embed(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
full: bool,
|
full: bool,
|
||||||
retry_failed: bool,
|
retry_failed: bool,
|
||||||
|
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
|
||||||
) -> Result<EmbedCommandResult> {
|
) -> Result<EmbedCommandResult> {
|
||||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||||
let conn = create_connection(&db_path)?;
|
let conn = create_connection(&db_path)?;
|
||||||
@@ -58,7 +61,7 @@ pub async fn run_embed(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let model_name = &config.embedding.model;
|
let model_name = &config.embedding.model;
|
||||||
let result = embed_documents(&conn, &client, model_name, None).await?;
|
let result = embed_documents(&conn, &client, model_name, progress_callback).await?;
|
||||||
|
|
||||||
Ok(EmbedCommandResult {
|
Ok(EmbedCommandResult {
|
||||||
embedded: result.embedded,
|
embedded: result.embedded,
|
||||||
|
|||||||
@@ -28,10 +28,13 @@ pub struct GenerateDocsResult {
|
|||||||
///
|
///
|
||||||
/// Default mode: process only existing dirty_sources entries.
|
/// Default mode: process only existing dirty_sources entries.
|
||||||
/// Full mode: seed dirty_sources with ALL entities, then drain.
|
/// Full mode: seed dirty_sources with ALL entities, then drain.
|
||||||
|
///
|
||||||
|
/// `progress_callback` reports `(processed, estimated_total)` as documents are generated.
|
||||||
pub fn run_generate_docs(
|
pub fn run_generate_docs(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
full: bool,
|
full: bool,
|
||||||
project_filter: Option<&str>,
|
project_filter: Option<&str>,
|
||||||
|
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
|
||||||
) -> Result<GenerateDocsResult> {
|
) -> Result<GenerateDocsResult> {
|
||||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||||
let conn = create_connection(&db_path)?;
|
let conn = create_connection(&db_path)?;
|
||||||
@@ -46,7 +49,8 @@ pub fn run_generate_docs(
|
|||||||
result.seeded += seed_dirty(&conn, SourceType::Discussion, project_filter)?;
|
result.seeded += seed_dirty(&conn, SourceType::Discussion, project_filter)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let regen = regenerate_dirty_documents(&conn)?;
|
let regen =
|
||||||
|
regenerate_dirty_documents(&conn, progress_callback.as_ref().map(|cb| cb.as_ref()))?;
|
||||||
result.regenerated = regen.regenerated;
|
result.regenerated = regen.regenerated;
|
||||||
result.unchanged = regen.unchanged;
|
result.unchanged = regen.unchanged;
|
||||||
result.errored = regen.errored;
|
result.errored = regen.errored;
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
//! Ingest command - fetch data from GitLab.
|
//! Ingest command - fetch data from GitLab.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
use console::style;
|
use console::style;
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
@@ -106,6 +109,9 @@ impl IngestDisplay {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Run the ingest command.
|
/// Run the ingest command.
|
||||||
|
///
|
||||||
|
/// `stage_bar` is an optional `ProgressBar` (typically from sync's stage spinner)
|
||||||
|
/// that will be updated with aggregate progress across all projects.
|
||||||
pub async fn run_ingest(
|
pub async fn run_ingest(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
resource_type: &str,
|
resource_type: &str,
|
||||||
@@ -113,12 +119,21 @@ pub async fn run_ingest(
|
|||||||
force: bool,
|
force: bool,
|
||||||
full: bool,
|
full: bool,
|
||||||
display: IngestDisplay,
|
display: IngestDisplay,
|
||||||
|
stage_bar: Option<ProgressBar>,
|
||||||
) -> Result<IngestResult> {
|
) -> Result<IngestResult> {
|
||||||
let run_id = uuid::Uuid::new_v4().simple().to_string();
|
let run_id = uuid::Uuid::new_v4().simple().to_string();
|
||||||
let run_id = &run_id[..8];
|
let run_id = &run_id[..8];
|
||||||
let span = tracing::info_span!("ingest", %run_id, %resource_type);
|
let span = tracing::info_span!("ingest", %run_id, %resource_type);
|
||||||
|
|
||||||
run_ingest_inner(config, resource_type, project_filter, force, full, display)
|
run_ingest_inner(
|
||||||
|
config,
|
||||||
|
resource_type,
|
||||||
|
project_filter,
|
||||||
|
force,
|
||||||
|
full,
|
||||||
|
display,
|
||||||
|
stage_bar,
|
||||||
|
)
|
||||||
.instrument(span)
|
.instrument(span)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -131,6 +146,7 @@ async fn run_ingest_inner(
|
|||||||
force: bool,
|
force: bool,
|
||||||
full: bool,
|
full: bool,
|
||||||
display: IngestDisplay,
|
display: IngestDisplay,
|
||||||
|
stage_bar: Option<ProgressBar>,
|
||||||
) -> Result<IngestResult> {
|
) -> Result<IngestResult> {
|
||||||
// Validate resource type early
|
// Validate resource type early
|
||||||
if resource_type != "issues" && resource_type != "mrs" {
|
if resource_type != "issues" && resource_type != "mrs" {
|
||||||
@@ -237,6 +253,14 @@ async fn run_ingest_inner(
|
|||||||
let concurrency = config.sync.primary_concurrency as usize;
|
let concurrency = config.sync.primary_concurrency as usize;
|
||||||
let resource_type_owned = resource_type.to_string();
|
let resource_type_owned = resource_type.to_string();
|
||||||
|
|
||||||
|
// Aggregate counters for stage_bar updates (shared across concurrent projects)
|
||||||
|
let agg_fetched = Arc::new(AtomicUsize::new(0));
|
||||||
|
let agg_discussions = Arc::new(AtomicUsize::new(0));
|
||||||
|
let agg_disc_total = Arc::new(AtomicUsize::new(0));
|
||||||
|
let agg_events = Arc::new(AtomicUsize::new(0));
|
||||||
|
let agg_events_total = Arc::new(AtomicUsize::new(0));
|
||||||
|
let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden);
|
||||||
|
|
||||||
use futures::stream::{self, StreamExt};
|
use futures::stream::{self, StreamExt};
|
||||||
|
|
||||||
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
|
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
|
||||||
@@ -248,6 +272,12 @@ async fn run_ingest_inner(
|
|||||||
let path = path.clone();
|
let path = path.clone();
|
||||||
let local_project_id = *local_project_id;
|
let local_project_id = *local_project_id;
|
||||||
let gitlab_project_id = *gitlab_project_id;
|
let gitlab_project_id = *gitlab_project_id;
|
||||||
|
let stage_bar = stage_bar.clone();
|
||||||
|
let agg_fetched = Arc::clone(&agg_fetched);
|
||||||
|
let agg_discussions = Arc::clone(&agg_discussions);
|
||||||
|
let agg_disc_total = Arc::clone(&agg_disc_total);
|
||||||
|
let agg_events = Arc::clone(&agg_events);
|
||||||
|
let agg_events_total = Arc::clone(&agg_events_total);
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let proj_conn = create_connection(&db_path)?;
|
let proj_conn = create_connection(&db_path)?;
|
||||||
@@ -286,28 +316,70 @@ async fn run_ingest_inner(
|
|||||||
|
|
||||||
let spinner_clone = spinner.clone();
|
let spinner_clone = spinner.clone();
|
||||||
let disc_bar_clone = disc_bar.clone();
|
let disc_bar_clone = disc_bar.clone();
|
||||||
|
let stage_bar_clone = stage_bar.clone();
|
||||||
|
let agg_fetched_clone = Arc::clone(&agg_fetched);
|
||||||
|
let agg_discussions_clone = Arc::clone(&agg_discussions);
|
||||||
|
let agg_disc_total_clone = Arc::clone(&agg_disc_total);
|
||||||
|
let agg_events_clone = Arc::clone(&agg_events);
|
||||||
|
let agg_events_total_clone = Arc::clone(&agg_events_total);
|
||||||
|
let path_for_cb = path.clone();
|
||||||
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
|
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
|
||||||
Box::new(|_| {})
|
Box::new(|_| {})
|
||||||
} else {
|
} else {
|
||||||
Box::new(move |event: ProgressEvent| match event {
|
Box::new(move |event: ProgressEvent| match event {
|
||||||
|
ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => {
|
||||||
|
// Spinner already showing fetch message
|
||||||
|
}
|
||||||
|
ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => {
|
||||||
|
let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||||
|
spinner_clone.set_message(format!(
|
||||||
|
"{path_for_cb}: {total} {type_label} fetched"
|
||||||
|
));
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Fetching {type_label}... ({agg} fetched across projects)"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => {
|
||||||
|
spinner_clone.set_message(format!(
|
||||||
|
"{path_for_cb}: {count} fetched so far..."
|
||||||
|
));
|
||||||
|
}
|
||||||
ProgressEvent::DiscussionSyncStarted { total } => {
|
ProgressEvent::DiscussionSyncStarted { total } => {
|
||||||
spinner_clone.finish_and_clear();
|
spinner_clone.finish_and_clear();
|
||||||
|
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||||
disc_bar_clone.set_length(total as u64);
|
disc_bar_clone.set_length(total as u64);
|
||||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Syncing discussions... (0/{agg_total})"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
ProgressEvent::DiscussionSynced { current, total: _ } => {
|
ProgressEvent::DiscussionSynced { current, total: _ } => {
|
||||||
disc_bar_clone.set_position(current as u64);
|
disc_bar_clone.set_position(current as u64);
|
||||||
|
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Syncing discussions... ({agg}/{agg_total})"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
ProgressEvent::DiscussionSyncComplete => {
|
ProgressEvent::DiscussionSyncComplete => {
|
||||||
disc_bar_clone.finish_and_clear();
|
disc_bar_clone.finish_and_clear();
|
||||||
}
|
}
|
||||||
ProgressEvent::MrDiscussionSyncStarted { total } => {
|
ProgressEvent::MrDiscussionSyncStarted { total } => {
|
||||||
spinner_clone.finish_and_clear();
|
spinner_clone.finish_and_clear();
|
||||||
|
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||||
disc_bar_clone.set_length(total as u64);
|
disc_bar_clone.set_length(total as u64);
|
||||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Syncing discussions... (0/{agg_total})"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
|
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
|
||||||
disc_bar_clone.set_position(current as u64);
|
disc_bar_clone.set_position(current as u64);
|
||||||
|
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Syncing discussions... ({agg}/{agg_total})"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
ProgressEvent::MrDiscussionSyncComplete => {
|
ProgressEvent::MrDiscussionSyncComplete => {
|
||||||
disc_bar_clone.finish_and_clear();
|
disc_bar_clone.finish_and_clear();
|
||||||
@@ -322,14 +394,22 @@ async fn run_ingest_inner(
|
|||||||
.progress_chars("=> "),
|
.progress_chars("=> "),
|
||||||
);
|
);
|
||||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||||
|
agg_events_total_clone.fetch_add(total, Ordering::Relaxed);
|
||||||
|
stage_bar_clone.set_message(
|
||||||
|
"Fetching resource events...".to_string()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
ProgressEvent::ResourceEventFetched { current, total: _ } => {
|
ProgressEvent::ResourceEventFetched { current, total: _ } => {
|
||||||
disc_bar_clone.set_position(current as u64);
|
disc_bar_clone.set_position(current as u64);
|
||||||
|
let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let agg_total = agg_events_total_clone.load(Ordering::Relaxed);
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Fetching resource events... ({agg}/{agg_total})"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
ProgressEvent::ResourceEventsFetchComplete { .. } => {
|
ProgressEvent::ResourceEventsFetchComplete { .. } => {
|
||||||
disc_bar_clone.finish_and_clear();
|
disc_bar_clone.finish_and_clear();
|
||||||
}
|
}
|
||||||
_ => {}
|
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,9 @@ pub struct SyncResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create a styled spinner for a sync stage.
|
/// Create a styled spinner for a sync stage.
|
||||||
|
///
|
||||||
|
/// Uses `{prefix}` for the `[N/M]` stage label so callers can update `{msg}`
|
||||||
|
/// independently without losing the stage context.
|
||||||
fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar {
|
fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar {
|
||||||
if robot_mode {
|
if robot_mode {
|
||||||
return ProgressBar::hidden();
|
return ProgressBar::hidden();
|
||||||
@@ -47,11 +50,12 @@ fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressB
|
|||||||
let pb = crate::cli::progress::multi().add(ProgressBar::new_spinner());
|
let pb = crate::cli::progress::multi().add(ProgressBar::new_spinner());
|
||||||
pb.set_style(
|
pb.set_style(
|
||||||
ProgressStyle::default_spinner()
|
ProgressStyle::default_spinner()
|
||||||
.template("{spinner:.blue} {msg}")
|
.template("{spinner:.blue} {prefix} {msg}")
|
||||||
.expect("valid template"),
|
.expect("valid template"),
|
||||||
);
|
);
|
||||||
pb.enable_steady_tick(std::time::Duration::from_millis(80));
|
pb.enable_steady_tick(std::time::Duration::from_millis(80));
|
||||||
pb.set_message(format!("[{stage}/{total}] {msg}"));
|
pb.set_prefix(format!("[{stage}/{total}]"));
|
||||||
|
pb.set_message(msg.to_string());
|
||||||
pb
|
pb
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +116,7 @@ pub async fn run_sync(
|
|||||||
options.force,
|
options.force,
|
||||||
options.full,
|
options.full,
|
||||||
ingest_display,
|
ingest_display,
|
||||||
|
Some(spinner.clone()),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
result.issues_updated = issues_result.issues_upserted;
|
result.issues_updated = issues_result.issues_upserted;
|
||||||
@@ -136,6 +141,7 @@ pub async fn run_sync(
|
|||||||
options.force,
|
options.force,
|
||||||
options.full,
|
options.full,
|
||||||
ingest_display,
|
ingest_display,
|
||||||
|
Some(spinner.clone()),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
result.mrs_updated = mrs_result.mrs_upserted;
|
result.mrs_updated = mrs_result.mrs_upserted;
|
||||||
@@ -154,7 +160,15 @@ pub async fn run_sync(
|
|||||||
options.robot_mode,
|
options.robot_mode,
|
||||||
);
|
);
|
||||||
info!("Sync stage {current_stage}/{total_stages}: generating documents");
|
info!("Sync stage {current_stage}/{total_stages}: generating documents");
|
||||||
let docs_result = run_generate_docs(config, false, None)?;
|
let docs_spinner = spinner.clone();
|
||||||
|
let docs_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
|
||||||
|
if total > 0 {
|
||||||
|
docs_spinner.set_message(format!(
|
||||||
|
"Processing documents... ({processed}/{total})"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let docs_result = run_generate_docs(config, false, None, Some(docs_cb))?;
|
||||||
result.documents_regenerated = docs_result.regenerated;
|
result.documents_regenerated = docs_result.regenerated;
|
||||||
spinner.finish_and_clear();
|
spinner.finish_and_clear();
|
||||||
} else {
|
} else {
|
||||||
@@ -171,7 +185,13 @@ pub async fn run_sync(
|
|||||||
options.robot_mode,
|
options.robot_mode,
|
||||||
);
|
);
|
||||||
info!("Sync stage {current_stage}/{total_stages}: embedding documents");
|
info!("Sync stage {current_stage}/{total_stages}: embedding documents");
|
||||||
match run_embed(config, options.full, false).await {
|
let embed_spinner = spinner.clone();
|
||||||
|
let embed_cb: Box<dyn Fn(usize, usize)> = Box::new(move |processed, total| {
|
||||||
|
embed_spinner.set_message(format!(
|
||||||
|
"Embedding documents... ({processed}/{total})"
|
||||||
|
));
|
||||||
|
});
|
||||||
|
match run_embed(config, options.full, false, Some(embed_cb)).await {
|
||||||
Ok(embed_result) => {
|
Ok(embed_result) => {
|
||||||
result.documents_embedded = embed_result.embedded;
|
result.documents_embedded = embed_result.embedded;
|
||||||
spinner.finish_and_clear();
|
spinner.finish_and_clear();
|
||||||
|
|||||||
@@ -122,28 +122,30 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
|
|||||||
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
|
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
|
||||||
/// backoff, clears locked_at, and records the error.
|
/// backoff, clears locked_at, and records the error.
|
||||||
///
|
///
|
||||||
/// Backoff: 30s * 2^(attempts-1), capped at 480s.
|
/// Backoff: 30s * 2^(attempts), capped at 480s. Uses a single atomic UPDATE
|
||||||
|
/// to avoid a read-then-write race on the `attempts` counter.
|
||||||
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
|
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
|
||||||
let now = now_ms();
|
let now = now_ms();
|
||||||
|
|
||||||
// Get current attempts (propagate error if job no longer exists)
|
// Atomic increment + backoff calculation in one UPDATE.
|
||||||
let current_attempts: i32 = conn.query_row(
|
// MIN(attempts, 4) caps the shift to prevent overflow; the overall
|
||||||
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
|
// backoff is clamped to 480 000 ms via MIN(..., 480000).
|
||||||
rusqlite::params![job_id],
|
let changes = conn.execute(
|
||||||
|row| row.get(0),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let new_attempts = current_attempts + 1;
|
|
||||||
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
|
|
||||||
let next_retry = now + backoff_ms;
|
|
||||||
|
|
||||||
conn.execute(
|
|
||||||
"UPDATE pending_dependent_fetches
|
"UPDATE pending_dependent_fetches
|
||||||
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3
|
SET attempts = attempts + 1,
|
||||||
WHERE id = ?4",
|
next_retry_at = ?1 + MIN(30000 * (1 << MIN(attempts, 4)), 480000),
|
||||||
rusqlite::params![new_attempts, next_retry, error, job_id],
|
locked_at = NULL,
|
||||||
|
last_error = ?2
|
||||||
|
WHERE id = ?3",
|
||||||
|
rusqlite::params![now, error, job_id],
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
if changes == 0 {
|
||||||
|
return Err(crate::core::error::LoreError::Other(
|
||||||
|
"fail_job: job not found (may have been reclaimed or completed)".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,16 +21,37 @@ pub struct RegenerateResult {
|
|||||||
///
|
///
|
||||||
/// Uses per-item error handling (fail-soft) and drains the queue completely
|
/// Uses per-item error handling (fail-soft) and drains the queue completely
|
||||||
/// via a bounded batch loop. Each dirty item is processed independently.
|
/// via a bounded batch loop. Each dirty item is processed independently.
|
||||||
#[instrument(skip(conn), fields(items_processed, items_skipped, errors))]
|
///
|
||||||
pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult> {
|
/// `progress_callback` reports `(processed, estimated_total)` after each item.
|
||||||
|
#[instrument(
|
||||||
|
skip(conn, progress_callback),
|
||||||
|
fields(items_processed, items_skipped, errors)
|
||||||
|
)]
|
||||||
|
pub fn regenerate_dirty_documents(
|
||||||
|
conn: &Connection,
|
||||||
|
progress_callback: Option<&dyn Fn(usize, usize)>,
|
||||||
|
) -> Result<RegenerateResult> {
|
||||||
let mut result = RegenerateResult::default();
|
let mut result = RegenerateResult::default();
|
||||||
|
|
||||||
|
// Estimated total for progress reporting. Recount each loop iteration
|
||||||
|
// so the denominator grows if new items are enqueued during processing
|
||||||
|
// (the queue can grow while we drain it). We use max() so the value
|
||||||
|
// never shrinks — preventing the progress fraction from going backwards.
|
||||||
|
let mut estimated_total: usize = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let dirty = get_dirty_sources(conn)?;
|
let dirty = get_dirty_sources(conn)?;
|
||||||
if dirty.is_empty() {
|
if dirty.is_empty() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Recount remaining + already-processed to get the true total.
|
||||||
|
let remaining: usize = conn
|
||||||
|
.query_row("SELECT COUNT(*) FROM dirty_sources", [], |row| row.get(0))
|
||||||
|
.unwrap_or(0_i64) as usize;
|
||||||
|
let processed_so_far = result.regenerated + result.unchanged + result.errored;
|
||||||
|
estimated_total = estimated_total.max(processed_so_far + remaining);
|
||||||
|
|
||||||
for (source_type, source_id) in &dirty {
|
for (source_type, source_id) in &dirty {
|
||||||
match regenerate_one(conn, *source_type, *source_id) {
|
match regenerate_one(conn, *source_type, *source_id) {
|
||||||
Ok(changed) => {
|
Ok(changed) => {
|
||||||
@@ -52,6 +73,11 @@ pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult>
|
|||||||
result.errored += 1;
|
result.errored += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let processed = result.regenerated + result.unchanged + result.errored;
|
||||||
|
if let Some(cb) = progress_callback {
|
||||||
|
cb(processed, estimated_total);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -123,7 +149,9 @@ fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = conn.execute_batch("ROLLBACK TO upsert_doc");
|
// ROLLBACK TO restores the savepoint but leaves it active.
|
||||||
|
// RELEASE removes it so the connection is clean for the next call.
|
||||||
|
let _ = conn.execute_batch("ROLLBACK TO upsert_doc; RELEASE upsert_doc");
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -358,7 +386,7 @@ mod tests {
|
|||||||
).unwrap();
|
).unwrap();
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
|
|
||||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
assert_eq!(result.regenerated, 1);
|
assert_eq!(result.regenerated, 1);
|
||||||
assert_eq!(result.unchanged, 0);
|
assert_eq!(result.unchanged, 0);
|
||||||
assert_eq!(result.errored, 0);
|
assert_eq!(result.errored, 0);
|
||||||
@@ -385,12 +413,12 @@ mod tests {
|
|||||||
|
|
||||||
// First regeneration creates the document
|
// First regeneration creates the document
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
let r1 = regenerate_dirty_documents(&conn).unwrap();
|
let r1 = regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
assert_eq!(r1.regenerated, 1);
|
assert_eq!(r1.regenerated, 1);
|
||||||
|
|
||||||
// Second regeneration — same data, should be unchanged
|
// Second regeneration — same data, should be unchanged
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
let r2 = regenerate_dirty_documents(&conn).unwrap();
|
let r2 = regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
assert_eq!(r2.unchanged, 1);
|
assert_eq!(r2.unchanged, 1);
|
||||||
assert_eq!(r2.regenerated, 0);
|
assert_eq!(r2.regenerated, 0);
|
||||||
}
|
}
|
||||||
@@ -403,7 +431,7 @@ mod tests {
|
|||||||
[],
|
[],
|
||||||
).unwrap();
|
).unwrap();
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
regenerate_dirty_documents(&conn).unwrap();
|
regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
|
|
||||||
// Delete the issue and re-mark dirty
|
// Delete the issue and re-mark dirty
|
||||||
conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
|
conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
|
||||||
@@ -411,7 +439,7 @@ mod tests {
|
|||||||
conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
|
conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
|
|
||||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
assert_eq!(result.regenerated, 1); // Deletion counts as "changed"
|
assert_eq!(result.regenerated, 1); // Deletion counts as "changed"
|
||||||
|
|
||||||
let count: i64 = conn
|
let count: i64 = conn
|
||||||
@@ -431,7 +459,7 @@ mod tests {
|
|||||||
mark_dirty(&conn, SourceType::Issue, i).unwrap();
|
mark_dirty(&conn, SourceType::Issue, i).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
assert_eq!(result.regenerated, 10);
|
assert_eq!(result.regenerated, 10);
|
||||||
|
|
||||||
// Queue should be empty
|
// Queue should be empty
|
||||||
@@ -459,11 +487,11 @@ mod tests {
|
|||||||
|
|
||||||
// First run creates document
|
// First run creates document
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
regenerate_dirty_documents(&conn).unwrap();
|
regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
|
|
||||||
// Second run — triple hash match, should skip ALL writes
|
// Second run — triple hash match, should skip ALL writes
|
||||||
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
|
||||||
let result = regenerate_dirty_documents(&conn).unwrap();
|
let result = regenerate_dirty_documents(&conn, None).unwrap();
|
||||||
assert_eq!(result.unchanged, 1);
|
assert_eq!(result.unchanged, 1);
|
||||||
|
|
||||||
// Labels should still be present (not deleted and re-inserted)
|
// Labels should still be present (not deleted and re-inserted)
|
||||||
|
|||||||
@@ -66,19 +66,76 @@ pub async fn embed_documents(
|
|||||||
// process crashes mid-page, the savepoint is never released and
|
// process crashes mid-page, the savepoint is never released and
|
||||||
// SQLite rolls back — preventing partial document states where old
|
// SQLite rolls back — preventing partial document states where old
|
||||||
// embeddings are cleared but new ones haven't been written yet.
|
// embeddings are cleared but new ones haven't been written yet.
|
||||||
|
//
|
||||||
|
// We use a closure + match to ensure the savepoint is always
|
||||||
|
// rolled back on error — bare `execute_batch("SAVEPOINT")` with `?`
|
||||||
|
// propagation would leak the savepoint and leave the connection in
|
||||||
|
// a broken transactional state.
|
||||||
conn.execute_batch("SAVEPOINT embed_page")?;
|
conn.execute_batch("SAVEPOINT embed_page")?;
|
||||||
|
let page_result = embed_page(
|
||||||
|
conn,
|
||||||
|
client,
|
||||||
|
model_name,
|
||||||
|
&pending,
|
||||||
|
&mut result,
|
||||||
|
&mut last_id,
|
||||||
|
&mut processed,
|
||||||
|
total,
|
||||||
|
&progress_callback,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match page_result {
|
||||||
|
Ok(()) => {
|
||||||
|
conn.execute_batch("RELEASE embed_page")?;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
embedded = result.embedded,
|
||||||
|
failed = result.failed,
|
||||||
|
skipped = result.skipped,
|
||||||
|
"Embedding pipeline complete"
|
||||||
|
);
|
||||||
|
|
||||||
|
tracing::Span::current().record("items_processed", result.embedded);
|
||||||
|
tracing::Span::current().record("items_skipped", result.skipped);
|
||||||
|
tracing::Span::current().record("errors", result.failed);
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process a single page of pending documents within an active savepoint.
|
||||||
|
///
|
||||||
|
/// All `?` propagation from this function is caught by the caller, which
|
||||||
|
/// rolls back the savepoint on error.
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn embed_page(
|
||||||
|
conn: &Connection,
|
||||||
|
client: &OllamaClient,
|
||||||
|
model_name: &str,
|
||||||
|
pending: &[crate::embedding::change_detector::PendingDocument],
|
||||||
|
result: &mut EmbedResult,
|
||||||
|
last_id: &mut i64,
|
||||||
|
processed: &mut usize,
|
||||||
|
total: usize,
|
||||||
|
progress_callback: &Option<Box<dyn Fn(usize, usize)>>,
|
||||||
|
) -> Result<()> {
|
||||||
// Build chunk work items for this page
|
// Build chunk work items for this page
|
||||||
let mut all_chunks: Vec<ChunkWork> = Vec::new();
|
let mut all_chunks: Vec<ChunkWork> = Vec::new();
|
||||||
let mut page_normal_docs: usize = 0;
|
let mut page_normal_docs: usize = 0;
|
||||||
|
|
||||||
for doc in &pending {
|
for doc in pending {
|
||||||
// Always advance the cursor, even for skipped docs, to avoid re-fetching
|
// Always advance the cursor, even for skipped docs, to avoid re-fetching
|
||||||
last_id = doc.document_id;
|
*last_id = doc.document_id;
|
||||||
|
|
||||||
if doc.content_text.is_empty() {
|
if doc.content_text.is_empty() {
|
||||||
result.skipped += 1;
|
result.skipped += 1;
|
||||||
processed += 1;
|
*processed += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,9 +167,9 @@ pub async fn embed_documents(
|
|||||||
),
|
),
|
||||||
)?;
|
)?;
|
||||||
result.skipped += 1;
|
result.skipped += 1;
|
||||||
processed += 1;
|
*processed += 1;
|
||||||
if let Some(ref cb) = progress_callback {
|
if let Some(cb) = progress_callback {
|
||||||
cb(processed, total);
|
cb(*processed, total);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -212,9 +269,7 @@ pub async fn embed_documents(
|
|||||||
|| (err_lower.contains("413") && err_lower.contains("http"));
|
|| (err_lower.contains("413") && err_lower.contains("http"));
|
||||||
|
|
||||||
if is_context_error && batch.len() > 1 {
|
if is_context_error && batch.len() > 1 {
|
||||||
warn!(
|
warn!("Batch failed with context length error, retrying chunks individually");
|
||||||
"Batch failed with context length error, retrying chunks individually"
|
|
||||||
);
|
|
||||||
for chunk in batch {
|
for chunk in batch {
|
||||||
match client.embed_batch(vec![chunk.text.clone()]).await {
|
match client.embed_batch(vec![chunk.text.clone()]).await {
|
||||||
Ok(embeddings)
|
Ok(embeddings)
|
||||||
@@ -280,27 +335,12 @@ pub async fn embed_documents(
|
|||||||
|
|
||||||
// Fire progress for all normal documents after embedding completes.
|
// Fire progress for all normal documents after embedding completes.
|
||||||
// This ensures progress reflects actual embedding work, not just chunking.
|
// This ensures progress reflects actual embedding work, not just chunking.
|
||||||
processed += page_normal_docs;
|
*processed += page_normal_docs;
|
||||||
if let Some(ref cb) = progress_callback {
|
if let Some(cb) = progress_callback {
|
||||||
cb(processed, total);
|
cb(*processed, total);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit all DB writes for this page atomically.
|
Ok(())
|
||||||
conn.execute_batch("RELEASE embed_page")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!(
|
|
||||||
embedded = result.embedded,
|
|
||||||
failed = result.failed,
|
|
||||||
skipped = result.skipped,
|
|
||||||
"Embedding pipeline complete"
|
|
||||||
);
|
|
||||||
|
|
||||||
tracing::Span::current().record("items_processed", result.embedded);
|
|
||||||
tracing::Span::current().record("items_skipped", result.skipped);
|
|
||||||
tracing::Span::current().record("errors", result.failed);
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clear all embeddings and metadata for a document.
|
/// Clear all embeddings and metadata for a document.
|
||||||
|
|||||||
@@ -122,6 +122,7 @@ impl GitLabClient {
|
|||||||
/// Make an authenticated API request with automatic 429 retry.
|
/// Make an authenticated API request with automatic 429 retry.
|
||||||
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
|
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
|
||||||
let url = format!("{}{}", self.base_url, path);
|
let url = format!("{}{}", self.base_url, path);
|
||||||
|
let mut last_response = None;
|
||||||
|
|
||||||
for attempt in 0..=Self::MAX_RETRIES {
|
for attempt in 0..=Self::MAX_RETRIES {
|
||||||
let delay = self.rate_limiter.lock().await.check_delay();
|
let delay = self.rate_limiter.lock().await.check_delay();
|
||||||
@@ -155,10 +156,15 @@ impl GitLabClient {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
return self.handle_response(response, path).await;
|
last_response = Some(response);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
unreachable!("loop always returns")
|
// Safety: the loop always executes at least once (0..=MAX_RETRIES)
|
||||||
|
// and either sets last_response+break, or continues (only when
|
||||||
|
// attempt < MAX_RETRIES). The final iteration always reaches break.
|
||||||
|
self.handle_response(last_response.expect("retry loop ran at least once"), path)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse retry-after header from a 429 response, defaulting to 60s.
|
/// Parse retry-after header from a 429 response, defaulting to 60s.
|
||||||
@@ -543,6 +549,7 @@ impl GitLabClient {
|
|||||||
params: &[(&str, String)],
|
params: &[(&str, String)],
|
||||||
) -> Result<(T, HeaderMap)> {
|
) -> Result<(T, HeaderMap)> {
|
||||||
let url = format!("{}{}", self.base_url, path);
|
let url = format!("{}{}", self.base_url, path);
|
||||||
|
let mut last_response = None;
|
||||||
|
|
||||||
for attempt in 0..=Self::MAX_RETRIES {
|
for attempt in 0..=Self::MAX_RETRIES {
|
||||||
let delay = self.rate_limiter.lock().await.check_delay();
|
let delay = self.rate_limiter.lock().await.check_delay();
|
||||||
@@ -577,12 +584,14 @@ impl GitLabClient {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let headers = response.headers().clone();
|
last_response = Some(response);
|
||||||
let body = self.handle_response(response, path).await?;
|
break;
|
||||||
return Ok((body, headers));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
unreachable!("loop always returns")
|
let response = last_response.expect("retry loop ran at least once");
|
||||||
|
let headers = response.headers().clone();
|
||||||
|
let body = self.handle_response(response, path).await?;
|
||||||
|
Ok((body, headers))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -501,6 +501,7 @@ async fn handle_ingest(
|
|||||||
force,
|
force,
|
||||||
full,
|
full,
|
||||||
display,
|
display,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -527,6 +528,7 @@ async fn handle_ingest(
|
|||||||
force,
|
force,
|
||||||
full,
|
full,
|
||||||
display,
|
display,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -537,6 +539,7 @@ async fn handle_ingest(
|
|||||||
force,
|
force,
|
||||||
full,
|
full,
|
||||||
display,
|
display,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -1225,7 +1228,7 @@ async fn handle_generate_docs(
|
|||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let config = Config::load(config_override)?;
|
let config = Config::load(config_override)?;
|
||||||
|
|
||||||
let result = run_generate_docs(&config, args.full, args.project.as_deref())?;
|
let result = run_generate_docs(&config, args.full, args.project.as_deref(), None)?;
|
||||||
if robot_mode {
|
if robot_mode {
|
||||||
print_generate_docs_json(&result);
|
print_generate_docs_json(&result);
|
||||||
} else {
|
} else {
|
||||||
@@ -1242,7 +1245,7 @@ async fn handle_embed(
|
|||||||
let config = Config::load(config_override)?;
|
let config = Config::load(config_override)?;
|
||||||
let full = args.full && !args.no_full;
|
let full = args.full && !args.no_full;
|
||||||
let retry_failed = args.retry_failed && !args.no_retry_failed;
|
let retry_failed = args.retry_failed && !args.no_retry_failed;
|
||||||
let result = run_embed(&config, full, retry_failed).await?;
|
let result = run_embed(&config, full, retry_failed, None).await?;
|
||||||
if robot_mode {
|
if robot_mode {
|
||||||
print_embed_json(&result);
|
print_embed_json(&result);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -33,8 +33,10 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
|
|||||||
for (i, &(doc_id, _)) in vector_results.iter().enumerate() {
|
for (i, &(doc_id, _)) in vector_results.iter().enumerate() {
|
||||||
let rank = i + 1; // 1-indexed
|
let rank = i + 1; // 1-indexed
|
||||||
let entry = scores.entry(doc_id).or_insert((0.0, None, None));
|
let entry = scores.entry(doc_id).or_insert((0.0, None, None));
|
||||||
entry.0 += 1.0 / (RRF_K + rank as f64);
|
// Only count the first occurrence per list to prevent duplicates
|
||||||
|
// from inflating the score.
|
||||||
if entry.1.is_none() {
|
if entry.1.is_none() {
|
||||||
|
entry.0 += 1.0 / (RRF_K + rank as f64);
|
||||||
entry.1 = Some(rank);
|
entry.1 = Some(rank);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -42,8 +44,8 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
|
|||||||
for (i, &(doc_id, _)) in fts_results.iter().enumerate() {
|
for (i, &(doc_id, _)) in fts_results.iter().enumerate() {
|
||||||
let rank = i + 1; // 1-indexed
|
let rank = i + 1; // 1-indexed
|
||||||
let entry = scores.entry(doc_id).or_insert((0.0, None, None));
|
let entry = scores.entry(doc_id).or_insert((0.0, None, None));
|
||||||
entry.0 += 1.0 / (RRF_K + rank as f64);
|
|
||||||
if entry.2.is_none() {
|
if entry.2.is_none() {
|
||||||
|
entry.0 += 1.0 / (RRF_K + rank as f64);
|
||||||
entry.2 = Some(rank);
|
entry.2 = Some(rank);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user