From 4c0123426af619c184af5d3ea80a9f48a169b87e Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Tue, 3 Feb 2026 14:09:22 -0500 Subject: [PATCH] fix: Content hash now computed after truncation, atomic job claiming Two bug fixes: 1. extractor.rs: The content hash was computed on the pre-truncation content, meaning the hash stored in the document didn't correspond to the actual stored (truncated) content. This would cause change detection to miss updates when content changed only within the truncated portion. Hash is now computed after truncate_hard_cap() so it always matches the persisted content. 2. dependent_queue.rs: claim_jobs() had a TOCTOU race between the SELECT that found available jobs and the UPDATE that locked them. Under concurrent callers, two drain runs could claim the same job. Replaced with a single UPDATE ... RETURNING statement that atomically selects and locks jobs in one operation. Co-Authored-By: Claude Opus 4.5 --- src/core/dependent_queue.rs | 44 ++++++++++++++++--------------------- src/documents/extractor.rs | 8 +++---- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index 8ffad0e..e16640d 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -56,8 +56,8 @@ pub fn enqueue_job( /// Claim a batch of jobs for processing. /// -/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where -/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. +/// Atomically selects and locks jobs within a transaction. Only claims jobs +/// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result> { if batch_size == 0 { return Ok(Vec::new()); @@ -65,19 +65,25 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul let now = now_ms(); - // Find available jobs - let mut select_stmt = conn.prepare_cached( - "SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts - FROM pending_dependent_fetches - WHERE job_type = ?1 - AND locked_at IS NULL - AND (next_retry_at IS NULL OR next_retry_at <= ?2) - ORDER BY enqueued_at ASC - LIMIT ?3", + // Use UPDATE ... RETURNING to atomically select and lock in one statement. + // This eliminates the race between SELECT and UPDATE. + let mut stmt = conn.prepare_cached( + "UPDATE pending_dependent_fetches + SET locked_at = ?1 + WHERE id IN ( + SELECT id FROM pending_dependent_fetches + WHERE job_type = ?2 + AND locked_at IS NULL + AND (next_retry_at IS NULL OR next_retry_at <= ?1) + ORDER BY enqueued_at ASC + LIMIT ?3 + ) + RETURNING id, project_id, entity_type, entity_iid, entity_local_id, + job_type, payload_json, attempts", )?; - let jobs: Vec = select_stmt - .query_map(rusqlite::params![job_type, now, batch_size as i64], |row| { + let jobs = stmt + .query_map(rusqlite::params![now, job_type, batch_size as i64], |row| { Ok(PendingJob { id: row.get(0)?, project_id: row.get(1)?, @@ -91,18 +97,6 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul })? .collect::, _>>()?; - // Lock the claimed jobs - if !jobs.is_empty() { - let ids: Vec = jobs.iter().map(|j| j.id.to_string()).collect(); - let placeholders = ids.join(","); - conn.execute( - &format!( - "UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})" - ), - rusqlite::params![now], - )?; - } - Ok(jobs) } diff --git a/src/documents/extractor.rs b/src/documents/extractor.rs index 14d9358..dbc79a9 100644 --- a/src/documents/extractor.rs +++ b/src/documents/extractor.rs @@ -166,12 +166,12 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result