fix: Content hash now computed after truncation, atomic job claiming

Two bug fixes:

1. extractor.rs: The content hash was computed on the pre-truncation
   content, meaning the hash stored in the document didn't correspond
   to the actual stored (truncated) content. This would cause change
   detection to miss updates when content changed only within the
   truncated portion. Hash is now computed after truncate_hard_cap()
   so it always matches the persisted content.

2. dependent_queue.rs: claim_jobs() had a TOCTOU race between the
   SELECT that found available jobs and the UPDATE that locked them.
   Under concurrent callers, two drain runs could claim the same job.
   Replaced with a single UPDATE ... RETURNING statement that
   atomically selects and locks jobs in one operation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-03 14:09:22 -05:00
parent bb75a9d228
commit 4c0123426a
2 changed files with 23 additions and 29 deletions

View File

@@ -56,8 +56,8 @@ pub fn enqueue_job(
/// Claim a batch of jobs for processing.
///
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
/// Atomically selects and locks jobs within a transaction. Only claims jobs
/// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
if batch_size == 0 {
return Ok(Vec::new());
@@ -65,19 +65,25 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
let now = now_ms();
// Find available jobs
let mut select_stmt = conn.prepare_cached(
"SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts
FROM pending_dependent_fetches
WHERE job_type = ?1
AND locked_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= ?2)
ORDER BY enqueued_at ASC
LIMIT ?3",
// Use UPDATE ... RETURNING to atomically select and lock in one statement.
// This eliminates the race between SELECT and UPDATE.
let mut stmt = conn.prepare_cached(
"UPDATE pending_dependent_fetches
SET locked_at = ?1
WHERE id IN (
SELECT id FROM pending_dependent_fetches
WHERE job_type = ?2
AND locked_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= ?1)
ORDER BY enqueued_at ASC
LIMIT ?3
)
RETURNING id, project_id, entity_type, entity_iid, entity_local_id,
job_type, payload_json, attempts",
)?;
let jobs: Vec<PendingJob> = select_stmt
.query_map(rusqlite::params![job_type, now, batch_size as i64], |row| {
let jobs = stmt
.query_map(rusqlite::params![now, job_type, batch_size as i64], |row| {
Ok(PendingJob {
id: row.get(0)?,
project_id: row.get(1)?,
@@ -91,18 +97,6 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
// Lock the claimed jobs
if !jobs.is_empty() {
let ids: Vec<String> = jobs.iter().map(|j| j.id.to_string()).collect();
let placeholders = ids.join(",");
conn.execute(
&format!(
"UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})"
),
rusqlite::params![now],
)?;
}
Ok(jobs)
}

View File

@@ -166,12 +166,12 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result<Option
content.push_str(desc);
}
let content_hash = compute_content_hash(&content);
let labels_hash = compute_list_hash(&labels);
let paths_hash = compute_list_hash(&[]); // Issues have no paths
// Apply hard cap truncation for safety
// Apply hard cap truncation for safety, then hash the final stored content
let hard_cap = truncate_hard_cap(&content);
let content_hash = compute_content_hash(&hard_cap.content);
Ok(Some(DocumentData {
source_type: SourceType::Issue,
@@ -281,12 +281,12 @@ pub fn extract_mr_document(conn: &Connection, mr_id: i64) -> Result<Option<Docum
content.push_str(desc);
}
let content_hash = compute_content_hash(&content);
let labels_hash = compute_list_hash(&labels);
let paths_hash = compute_list_hash(&[]);
// Apply hard cap truncation for safety
// Apply hard cap truncation for safety, then hash the final stored content
let hard_cap = truncate_hard_cap(&content);
let content_hash = compute_content_hash(&hard_cap.content);
Ok(Some(DocumentData {
source_type: SourceType::MergeRequest,