fix: Propagate queue errors, eliminate format!-based SQL construction
Two hardening changes to the dependent queue and orchestrator:
- dependent_queue::fail_job now propagates the rusqlite error via ?
instead of silently falling back to 0 attempts when the job row is
missing. A missing job is a real bug that should surface, not be
masked by unwrap_or(0) which would cause infinite retries at the
base backoff interval.
- orchestrator::enqueue_resource_events_for_entity_type replaces
format!-based SQL ("SELECT {id_col} FROM {table}") with separate
hardcoded queries per entity type. While the original values were
not user-controlled, hardcoded SQL is clearer about intent and
eliminates a class of injection risk entirely.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -117,14 +117,12 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
|
|||||||
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
|
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
|
||||||
let now = now_ms();
|
let now = now_ms();
|
||||||
|
|
||||||
// Get current attempts
|
// Get current attempts (propagate error if job no longer exists)
|
||||||
let current_attempts: i32 = conn
|
let current_attempts: i32 = conn.query_row(
|
||||||
.query_row(
|
|
||||||
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
|
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
|
||||||
rusqlite::params![job_id],
|
rusqlite::params![job_id],
|
||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)
|
)?;
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
let new_attempts = current_attempts + 1;
|
let new_attempts = current_attempts + 1;
|
||||||
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
|
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
|
||||||
|
|||||||
@@ -466,22 +466,26 @@ fn enqueue_resource_events_for_entity_type(
|
|||||||
project_id: i64,
|
project_id: i64,
|
||||||
entity_type: &str,
|
entity_type: &str,
|
||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
let (table, id_col) = match entity_type {
|
|
||||||
"issue" => ("issues", "id"),
|
|
||||||
"merge_request" => ("merge_requests", "id"),
|
|
||||||
_ => return Ok(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Query all entities for this project and enqueue resource_events jobs.
|
// Query all entities for this project and enqueue resource_events jobs.
|
||||||
// The UNIQUE constraint on pending_dependent_fetches makes this idempotent -
|
// The UNIQUE constraint on pending_dependent_fetches makes this idempotent -
|
||||||
// already-queued entities are silently skipped via INSERT OR IGNORE.
|
// already-queued entities are silently skipped via INSERT OR IGNORE.
|
||||||
let mut stmt = conn.prepare_cached(&format!(
|
//
|
||||||
"SELECT {id_col}, iid FROM {table} WHERE project_id = ?1"
|
// Use separate hardcoded queries per entity type to avoid format!-based SQL.
|
||||||
))?;
|
let entities: Vec<(i64, i64)> = match entity_type {
|
||||||
|
"issue" => {
|
||||||
let entities: Vec<(i64, i64)> = stmt
|
let mut stmt =
|
||||||
.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
|
conn.prepare_cached("SELECT id, iid FROM issues WHERE project_id = ?1")?;
|
||||||
.collect::<std::result::Result<Vec<_>, _>>()?;
|
stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
|
||||||
|
.collect::<std::result::Result<Vec<_>, _>>()?
|
||||||
|
}
|
||||||
|
"merge_request" => {
|
||||||
|
let mut stmt =
|
||||||
|
conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?;
|
||||||
|
stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
|
||||||
|
.collect::<std::result::Result<Vec<_>, _>>()?
|
||||||
|
}
|
||||||
|
_ => return Ok(0),
|
||||||
|
};
|
||||||
|
|
||||||
let mut enqueued = 0;
|
let mut enqueued = 0;
|
||||||
for (local_id, iid) in &entities {
|
for (local_id, iid) in &entities {
|
||||||
|
|||||||
Reference in New Issue
Block a user