diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index e16640d..744bbc0 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -117,14 +117,12 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> { pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> { let now = now_ms(); - // Get current attempts - let current_attempts: i32 = conn - .query_row( - "SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", - rusqlite::params![job_id], - |row| row.get(0), - ) - .unwrap_or(0); + // Get current attempts (propagate error if job no longer exists) + let current_attempts: i32 = conn.query_row( + "SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", + rusqlite::params![job_id], + |row| row.get(0), + )?; let new_attempts = current_attempts + 1; let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000); diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index b5ceca4..20a9794 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -466,22 +466,26 @@ fn enqueue_resource_events_for_entity_type( project_id: i64, entity_type: &str, ) -> Result { - let (table, id_col) = match entity_type { - "issue" => ("issues", "id"), - "merge_request" => ("merge_requests", "id"), - _ => return Ok(0), - }; - // Query all entities for this project and enqueue resource_events jobs. // The UNIQUE constraint on pending_dependent_fetches makes this idempotent - // already-queued entities are silently skipped via INSERT OR IGNORE. - let mut stmt = conn.prepare_cached(&format!( - "SELECT {id_col}, iid FROM {table} WHERE project_id = ?1" - ))?; - - let entities: Vec<(i64, i64)> = stmt - .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? - .collect::, _>>()?; + // + // Use separate hardcoded queries per entity type to avoid format!-based SQL. + let entities: Vec<(i64, i64)> = match entity_type { + "issue" => { + let mut stmt = + conn.prepare_cached("SELECT id, iid FROM issues WHERE project_id = ?1")?; + stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()? + } + "merge_request" => { + let mut stmt = + conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?; + stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()? + } + _ => return Ok(0), + }; let mut enqueued = 0; for (local_id, iid) in &entities {