fix: Project-scoped job claiming, structured rate-limit logging, RRF total_cmp

Targeted fixes across multiple subsystems:

dependent_queue:
- Add project_id parameter to claim_jobs() for project-scoped job claiming,
  preventing cross-project job theft during concurrent multi-project ingestion
- Add project_id parameter to count_pending_jobs() with optional scoping
  (None returns global counts, Some(pid) returns per-project counts)

gitlab/client:
- Downgrade rate-limit log from warn to info (429s are expected operational
  behavior, not warnings) and add structured fields (path, status_code)
  for better log filtering and aggregation

gitlab/transformers/discussion:
- Log a tracing::warn when a timestamp fails to parse; the fallback to
  epoch 0 is kept, but data quality issues are now visible in logs

ingestion/merge_requests:
- Remove duplicate doc comment on upsert_label_tx

search/rrf:
- Replace partial_cmp().unwrap_or() with total_cmp() for f64 sorting,
  eliminating the NaN edge case entirely (total_cmp implements the IEEE 754
  totalOrder predicate, so NaN values sort deterministically)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
teernisse
2026-02-04 10:01:28 -05:00
parent f6d19a9467
commit 86a51cddef
6 changed files with 102 additions and 38 deletions

View File

@@ -54,11 +54,16 @@ pub fn enqueue_job(
Ok(changes > 0) Ok(changes > 0)
} }
/// Claim a batch of jobs for processing. /// Claim a batch of jobs for processing, scoped to a specific project.
/// ///
/// Atomically selects and locks jobs within a transaction. Only claims jobs /// Atomically selects and locks jobs within a transaction. Only claims jobs
/// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. /// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> { pub fn claim_jobs(
conn: &Connection,
job_type: &str,
project_id: i64,
batch_size: usize,
) -> Result<Vec<PendingJob>> {
if batch_size == 0 { if batch_size == 0 {
return Ok(Vec::new()); return Ok(Vec::new());
} }
@@ -73,6 +78,7 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
WHERE id IN ( WHERE id IN (
SELECT id FROM pending_dependent_fetches SELECT id FROM pending_dependent_fetches
WHERE job_type = ?2 WHERE job_type = ?2
AND project_id = ?4
AND locked_at IS NULL AND locked_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= ?1) AND (next_retry_at IS NULL OR next_retry_at <= ?1)
ORDER BY enqueued_at ASC ORDER BY enqueued_at ASC
@@ -83,7 +89,9 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
)?; )?;
let jobs = stmt let jobs = stmt
.query_map(rusqlite::params![now, job_type, batch_size as i64], |row| { .query_map(
rusqlite::params![now, job_type, batch_size as i64, project_id],
|row| {
Ok(PendingJob { Ok(PendingJob {
id: row.get(0)?, id: row.get(0)?,
project_id: row.get(1)?, project_id: row.get(1)?,
@@ -94,7 +102,8 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
payload_json: row.get(6)?, payload_json: row.get(6)?,
attempts: row.get(7)?, attempts: row.get(7)?,
}) })
})? },
)?
.collect::<std::result::Result<Vec<_>, _>>()?; .collect::<std::result::Result<Vec<_>, _>>()?;
Ok(jobs) Ok(jobs)
@@ -152,19 +161,69 @@ pub fn reclaim_stale_locks(conn: &Connection, stale_threshold_minutes: u32) -> R
Ok(changes) Ok(changes)
} }
/// Count pending jobs by job_type (for stats/progress). /// Count pending jobs by job_type, optionally scoped to a project.
pub fn count_pending_jobs(conn: &Connection) -> Result<HashMap<String, usize>> { pub fn count_pending_jobs(
conn: &Connection,
project_id: Option<i64>,
) -> Result<HashMap<String, usize>> {
let mut counts = HashMap::new();
match project_id {
Some(pid) => {
let mut stmt = conn.prepare_cached(
"SELECT job_type, COUNT(*) FROM pending_dependent_fetches \
WHERE project_id = ?1 GROUP BY job_type",
)?;
let rows = stmt.query_map(rusqlite::params![pid], |row| {
let job_type: String = row.get(0)?;
let count: i64 = row.get(1)?;
Ok((job_type, count as usize))
})?;
for row in rows {
let (job_type, count) = row?;
counts.insert(job_type, count);
}
}
None => {
let mut stmt = conn.prepare_cached( let mut stmt = conn.prepare_cached(
"SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type", "SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type",
)?; )?;
let mut counts = HashMap::new();
let rows = stmt.query_map([], |row| { let rows = stmt.query_map([], |row| {
let job_type: String = row.get(0)?; let job_type: String = row.get(0)?;
let count: i64 = row.get(1)?; let count: i64 = row.get(1)?;
Ok((job_type, count as usize)) Ok((job_type, count as usize))
})?; })?;
for row in rows {
let (job_type, count) = row?;
counts.insert(job_type, count);
}
}
}
Ok(counts)
}
/// Count jobs that are actually claimable right now, by job_type.
///
/// Only counts jobs where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`,
/// matching the exact WHERE clause used by [`claim_jobs`]. This gives an accurate total
/// for progress bars — unlike [`count_pending_jobs`] which includes locked and backing-off jobs.
pub fn count_claimable_jobs(conn: &Connection, project_id: i64) -> Result<HashMap<String, usize>> {
let now = now_ms();
let mut counts = HashMap::new();
let mut stmt = conn.prepare_cached(
"SELECT job_type, COUNT(*) FROM pending_dependent_fetches \
WHERE project_id = ?1 \
AND locked_at IS NULL \
AND (next_retry_at IS NULL OR next_retry_at <= ?2) \
GROUP BY job_type",
)?;
let rows = stmt.query_map(rusqlite::params![project_id, now], |row| {
let job_type: String = row.get(0)?;
let count: i64 = row.get(1)?;
Ok((job_type, count as usize))
})?;
for row in rows { for row in rows {
let (job_type, count) = row?; let (job_type, count) = row?;
counts.insert(job_type, count); counts.insert(job_type, count);

View File

@@ -1,7 +1,7 @@
/// Multiplier for encoding (document_id, chunk_index) into a single rowid. /// Multiplier for encoding (document_id, chunk_index) into a single rowid.
/// Supports up to 1000 chunks per document. At CHUNK_MAX_BYTES=6000, /// Supports up to 1000 chunks per document. At CHUNK_MAX_BYTES=6000,
/// a 2MB document (MAX_DOCUMENT_BYTES_HARD) produces ~333 chunks. /// a 2MB document (MAX_DOCUMENT_BYTES_HARD) produces ~333 chunks.
/// The pipeline enforces chunk_count < CHUNK_ROWID_MULTIPLIER at runtime. /// The pipeline enforces chunk_count <= CHUNK_ROWID_MULTIPLIER at runtime.
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000; pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;
/// Encode (document_id, chunk_index) into a sqlite-vec rowid. /// Encode (document_id, chunk_index) into a sqlite-vec rowid.

View File

@@ -144,11 +144,12 @@ impl GitLabClient {
if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES {
let retry_after = Self::parse_retry_after(&response); let retry_after = Self::parse_retry_after(&response);
tracing::warn!( tracing::info!(
retry_after_secs = retry_after, path = %path,
attempt, attempt,
path, retry_after_secs = retry_after,
"Rate limited by GitLab, retrying" status_code = 429u16,
"Rate limited, retrying"
); );
sleep(Duration::from_secs(retry_after)).await; sleep(Duration::from_secs(retry_after)).await;
continue; continue;
@@ -565,11 +566,12 @@ impl GitLabClient {
if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES {
let retry_after = Self::parse_retry_after(&response); let retry_after = Self::parse_retry_after(&response);
tracing::warn!( tracing::info!(
retry_after_secs = retry_after, path = %path,
attempt, attempt,
path, retry_after_secs = retry_after,
"Rate limited by GitLab, retrying" status_code = 429u16,
"Rate limited, retrying"
); );
sleep(Duration::from_secs(retry_after)).await; sleep(Duration::from_secs(retry_after)).await;
continue; continue;

View File

@@ -1,5 +1,7 @@
//! Discussion and note transformers: convert GitLab discussions to local schema. //! Discussion and note transformers: convert GitLab discussions to local schema.
use tracing::warn;
use crate::core::time::{iso_to_ms, iso_to_ms_strict, now_ms}; use crate::core::time::{iso_to_ms, iso_to_ms_strict, now_ms};
use crate::gitlab::types::{GitLabDiscussion, GitLabNote}; use crate::gitlab::types::{GitLabDiscussion, GitLabNote};
@@ -60,7 +62,13 @@ pub struct NormalizedNote {
/// Parse ISO 8601 timestamp to milliseconds, defaulting to 0 on failure. /// Parse ISO 8601 timestamp to milliseconds, defaulting to 0 on failure.
fn parse_timestamp(ts: &str) -> i64 { fn parse_timestamp(ts: &str) -> i64 {
iso_to_ms(ts).unwrap_or(0) match iso_to_ms(ts) {
Some(ms) => ms,
None => {
warn!(timestamp = ts, "Invalid timestamp, defaulting to epoch 0");
0
}
}
} }
/// Transform a GitLab discussion into normalized schema. /// Transform a GitLab discussion into normalized schema.

View File

@@ -314,7 +314,6 @@ fn process_mr_in_transaction(
}) })
} }
/// Upsert a label within a transaction, returning its ID.
/// Upsert a label within a transaction, returning its ID. /// Upsert a label within a transaction, returning its ID.
/// Uses INSERT...ON CONFLICT...RETURNING for a single round-trip. /// Uses INSERT...ON CONFLICT...RETURNING for a single round-trip.
fn upsert_label_tx( fn upsert_label_tx(

View File

@@ -60,11 +60,7 @@ pub fn rank_rrf(vector_results: &[(i64, f64)], fts_results: &[(i64, f64)]) -> Ve
.collect(); .collect();
// Sort descending by rrf_score // Sort descending by rrf_score
results.sort_by(|a, b| { results.sort_by(|a, b| b.rrf_score.total_cmp(&a.rrf_score));
b.rrf_score
.partial_cmp(&a.rrf_score)
.unwrap_or(std::cmp::Ordering::Equal)
});
// Normalize: best = 1.0 // Normalize: best = 1.0
if let Some(max_score) = results.first().map(|r| r.rrf_score).filter(|&s| s > 0.0) { if let Some(max_score) = results.first().map(|r| r.rrf_score).filter(|&s| s > 0.0) {