From 4ee99c1677708fad2c74721d3070669b133907ea Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Tue, 3 Feb 2026 17:36:45 -0500 Subject: [PATCH] fix: Propagate queue errors, eliminate format!-based SQL construction Two hardening changes to the dependent queue and orchestrator: - dependent_queue::fail_job now propagates the rusqlite error via ? instead of silently falling back to 0 attempts when the job row is missing. A missing job is a real bug that should surface, not be masked by unwrap_or(0) which would cause infinite retries at the base backoff interval. - orchestrator::enqueue_resource_events_for_entity_type replaces format!-based SQL ("SELECT {id_col} FROM {table}") with separate hardcoded queries per entity type. While the original values were not user-controlled, hardcoded SQL is clearer about intent and eliminates a class of injection risk entirely. Co-Authored-By: Claude Opus 4.5 --- src/core/dependent_queue.rs | 14 ++++++-------- src/ingestion/orchestrator.rs | 30 +++++++++++++++++------------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index e16640d..744bbc0 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -117,14 +117,12 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> { pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> { let now = now_ms(); - // Get current attempts - let current_attempts: i32 = conn - .query_row( - "SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", - rusqlite::params![job_id], - |row| row.get(0), - ) - .unwrap_or(0); + // Get current attempts (propagate error if job no longer exists) + let current_attempts: i32 = conn.query_row( + "SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", + rusqlite::params![job_id], + |row| row.get(0), + )?; let new_attempts = current_attempts + 1; let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000); diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index b5ceca4..20a9794 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -466,22 +466,26 @@ fn enqueue_resource_events_for_entity_type( project_id: i64, entity_type: &str, ) -> Result { - let (table, id_col) = match entity_type { - "issue" => ("issues", "id"), - "merge_request" => ("merge_requests", "id"), - _ => return Ok(0), - }; - // Query all entities for this project and enqueue resource_events jobs. // The UNIQUE constraint on pending_dependent_fetches makes this idempotent - // already-queued entities are silently skipped via INSERT OR IGNORE. - let mut stmt = conn.prepare_cached(&format!( - "SELECT {id_col}, iid FROM {table} WHERE project_id = ?1" - ))?; - - let entities: Vec<(i64, i64)> = stmt - .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? - .collect::, _>>()?; + // + // Use separate hardcoded queries per entity type to avoid format!-based SQL. + let entities: Vec<(i64, i64)> = match entity_type { + "issue" => { + let mut stmt = + conn.prepare_cached("SELECT id, iid FROM issues WHERE project_id = ?1")?; + stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()? + } + "merge_request" => { + let mut stmt = + conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?; + stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()? + } + _ => return Ok(0), + }; let mut enqueued = 0; for (local_id, iid) in &entities {