diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index 8ffad0e..e16640d 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -56,8 +56,8 @@ pub fn enqueue_job( /// Claim a batch of jobs for processing. /// -/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where -/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. +/// Atomically selects and locks jobs within a transaction. Only claims jobs +/// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result> { if batch_size == 0 { return Ok(Vec::new()); @@ -65,19 +65,25 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul let now = now_ms(); - // Find available jobs - let mut select_stmt = conn.prepare_cached( - "SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts - FROM pending_dependent_fetches - WHERE job_type = ?1 - AND locked_at IS NULL - AND (next_retry_at IS NULL OR next_retry_at <= ?2) - ORDER BY enqueued_at ASC - LIMIT ?3", + // Use UPDATE ... RETURNING to atomically select and lock in one statement. + // This eliminates the race between SELECT and UPDATE. + let mut stmt = conn.prepare_cached( + "UPDATE pending_dependent_fetches + SET locked_at = ?1 + WHERE id IN ( + SELECT id FROM pending_dependent_fetches + WHERE job_type = ?2 + AND locked_at IS NULL + AND (next_retry_at IS NULL OR next_retry_at <= ?1) + ORDER BY enqueued_at ASC + LIMIT ?3 + ) + RETURNING id, project_id, entity_type, entity_iid, entity_local_id, + job_type, payload_json, attempts", )?; - let jobs: Vec = select_stmt - .query_map(rusqlite::params![job_type, now, batch_size as i64], |row| { + let jobs = stmt + .query_map(rusqlite::params![now, job_type, batch_size as i64], |row| { Ok(PendingJob { id: row.get(0)?, project_id: row.get(1)?, @@ -91,18 +97,6 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul })? .collect::, _>>()?; - // Lock the claimed jobs - if !jobs.is_empty() { - let ids: Vec = jobs.iter().map(|j| j.id.to_string()).collect(); - let placeholders = ids.join(","); - conn.execute( - &format!( - "UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})" - ), - rusqlite::params![now], - )?; - } - Ok(jobs) } diff --git a/src/documents/extractor.rs b/src/documents/extractor.rs index 14d9358..dbc79a9 100644 --- a/src/documents/extractor.rs +++ b/src/documents/extractor.rs @@ -166,12 +166,12 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result