diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index 744bbc0..bbd4f2d 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -54,11 +54,16 @@ pub fn enqueue_job( Ok(changes > 0) } -/// Claim a batch of jobs for processing. +/// Claim a batch of jobs for processing, scoped to a specific project. /// /// Atomically selects and locks jobs within a transaction. Only claims jobs /// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. -pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> { +pub fn claim_jobs( + conn: &Connection, + job_type: &str, + project_id: i64, + batch_size: usize, +) -> Result<Vec<PendingJob>> { if batch_size == 0 { return Ok(Vec::new()); } @@ -73,6 +78,7 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul WHERE id IN ( SELECT id FROM pending_dependent_fetches WHERE job_type = ?2 + AND project_id = ?4 AND locked_at IS NULL AND (next_retry_at IS NULL OR next_retry_at <= ?1) ORDER BY enqueued_at ASC @@ -83,18 +89,21 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul )?; let jobs = stmt - .query_map(rusqlite::params![now, job_type, batch_size as i64], |row| { - Ok(PendingJob { - id: row.get(0)?, - project_id: row.get(1)?, - entity_type: row.get(2)?, - entity_iid: row.get(3)?, - entity_local_id: row.get(4)?, - job_type: row.get(5)?, - payload_json: row.get(6)?, - attempts: row.get(7)?, - }) - })? + .query_map( + rusqlite::params![now, job_type, batch_size as i64, project_id], + |row| { + Ok(PendingJob { + id: row.get(0)?, + project_id: row.get(1)?, + entity_type: row.get(2)?, + entity_iid: row.get(3)?, + entity_local_id: row.get(4)?, + job_type: row.get(5)?, + payload_json: row.get(6)?, + attempts: row.get(7)?, + }) + }, + )? 
.collect::<Result<Vec<_>, _>>()?; Ok(jobs) @@ -152,19 +161,69 @@ pub fn reclaim_stale_locks(conn: &Connection, stale_threshold_minutes: u32) -> R Ok(changes) } -/// Count pending jobs by job_type (for stats/progress). -pub fn count_pending_jobs(conn: &Connection) -> Result<HashMap<String, usize>> { - let mut stmt = conn.prepare_cached( - "SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type", - )?; - +/// Count pending jobs by job_type, optionally scoped to a project. +pub fn count_pending_jobs( + conn: &Connection, + project_id: Option<i64>, +) -> Result<HashMap<String, usize>> { let mut counts = HashMap::new(); - let rows = stmt.query_map([], |row| { + + match project_id { + Some(pid) => { + let mut stmt = conn.prepare_cached( + "SELECT job_type, COUNT(*) FROM pending_dependent_fetches \ + WHERE project_id = ?1 GROUP BY job_type", + )?; + let rows = stmt.query_map(rusqlite::params![pid], |row| { + let job_type: String = row.get(0)?; + let count: i64 = row.get(1)?; + Ok((job_type, count as usize)) + })?; + for row in rows { + let (job_type, count) = row?; + counts.insert(job_type, count); + } + } + None => { + let mut stmt = conn.prepare_cached( + "SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type", + )?; + let rows = stmt.query_map([], |row| { + let job_type: String = row.get(0)?; + let count: i64 = row.get(1)?; + Ok((job_type, count as usize)) + })?; + for row in rows { + let (job_type, count) = row?; + counts.insert(job_type, count); + } + } + } + + Ok(counts) +} + +/// Count jobs that are actually claimable right now, by job_type. +/// +/// Only counts jobs where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`, +/// matching the exact WHERE clause used by [`claim_jobs`]. This gives an accurate total +/// for progress bars — unlike [`count_pending_jobs`] which includes locked and backing-off jobs. 
+pub fn count_claimable_jobs(conn: &Connection, project_id: i64) -> Result<HashMap<String, usize>> { + let now = now_ms(); + let mut counts = HashMap::new(); + + let mut stmt = conn.prepare_cached( + "SELECT job_type, COUNT(*) FROM pending_dependent_fetches \ + WHERE project_id = ?1 \ + AND locked_at IS NULL \ + AND (next_retry_at IS NULL OR next_retry_at <= ?2) \ + GROUP BY job_type", + )?; + let rows = stmt.query_map(rusqlite::params![project_id, now], |row| { let job_type: String = row.get(0)?; let count: i64 = row.get(1)?; Ok((job_type, count as usize)) })?; - for row in rows { let (job_type, count) = row?; counts.insert(job_type, count); diff --git a/src/embedding/chunk_ids.rs b/src/embedding/chunk_ids.rs index dce1b08..a678764 100644 --- a/src/embedding/chunk_ids.rs +++ b/src/embedding/chunk_ids.rs @@ -1,7 +1,7 @@ /// Multiplier for encoding (document_id, chunk_index) into a single rowid. /// Supports up to 1000 chunks per document. At CHUNK_MAX_BYTES=6000, /// a 2MB document (MAX_DOCUMENT_BYTES_HARD) produces ~333 chunks. -/// The pipeline enforces chunk_count < CHUNK_ROWID_MULTIPLIER at runtime. +/// The pipeline enforces chunk_count <= CHUNK_ROWID_MULTIPLIER at runtime. pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000; /// Encode (document_id, chunk_index) into a sqlite-vec rowid. 
diff --git a/src/gitlab/client.rs b/src/gitlab/client.rs index 42853ec..dd9552b 100644 --- a/src/gitlab/client.rs +++ b/src/gitlab/client.rs @@ -144,11 +144,12 @@ impl GitLabClient { if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { let retry_after = Self::parse_retry_after(&response); - tracing::warn!( - retry_after_secs = retry_after, + tracing::info!( + path = %path, attempt, - path, - "Rate limited by GitLab, retrying" + retry_after_secs = retry_after, + status_code = 429u16, + "Rate limited, retrying" ); sleep(Duration::from_secs(retry_after)).await; continue; @@ -565,11 +566,12 @@ impl GitLabClient { if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { let retry_after = Self::parse_retry_after(&response); - tracing::warn!( - retry_after_secs = retry_after, + tracing::info!( + path = %path, attempt, - path, - "Rate limited by GitLab, retrying" + retry_after_secs = retry_after, + status_code = 429u16, + "Rate limited, retrying" ); sleep(Duration::from_secs(retry_after)).await; continue; diff --git a/src/gitlab/transformers/discussion.rs b/src/gitlab/transformers/discussion.rs index 043790a..32d352a 100644 --- a/src/gitlab/transformers/discussion.rs +++ b/src/gitlab/transformers/discussion.rs @@ -1,5 +1,7 @@ //! Discussion and note transformers: convert GitLab discussions to local schema. +use tracing::warn; + use crate::core::time::{iso_to_ms, iso_to_ms_strict, now_ms}; use crate::gitlab::types::{GitLabDiscussion, GitLabNote}; @@ -60,7 +62,13 @@ pub struct NormalizedNote { /// Parse ISO 8601 timestamp to milliseconds, defaulting to 0 on failure. fn parse_timestamp(ts: &str) -> i64 { - iso_to_ms(ts).unwrap_or(0) + match iso_to_ms(ts) { + Some(ms) => ms, + None => { + warn!(timestamp = ts, "Invalid timestamp, defaulting to epoch 0"); + 0 + } + } } /// Transform a GitLab discussion into normalized schema. 
diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs index fe713a8..29de75b 100644 --- a/src/ingestion/merge_requests.rs +++ b/src/ingestion/merge_requests.rs @@ -314,7 +314,6 @@ fn process_mr_in_transaction( }) } -/// Upsert a label within a transaction, returning its ID. /// Upsert a label within a transaction, returning its ID. /// Uses INSERT...ON CONFLICT...RETURNING for a single round-trip. fn upsert_label_tx( diff --git a/src/search/rrf.rs b/src/search/rrf.rs index 5d741a5..67aed40 100644 --- a/src/search/rrf.rs +++ b/src/search/rrf.rs @@ -60,11 +60,7 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve .collect(); // Sort descending by rrf_score - results.sort_by(|a, b| { - b.rrf_score - .partial_cmp(&a.rrf_score) - .unwrap_or(std::cmp::Ordering::Equal) - }); + results.sort_by(|a, b| b.rrf_score.total_cmp(&a.rrf_score)); // Normalize: best = 1.0 if let Some(max_score) = results.first().map(|r| r.rrf_score).filter(|&s| s > 0.0) {