//! Generic dependent fetch queue for resource events, MR closes, and MR diffs.
//!
//! Provides enqueue, claim, complete, fail (with exponential backoff), and
//! stale lock reclamation operations against the `pending_dependent_fetches` table.

use std::collections::HashMap;

use rusqlite::Connection;

use super::error::Result;
use super::time::now_ms;

/// A pending job from the dependent fetch queue.
#[derive(Debug)]
pub struct PendingJob {
    pub id: i64,
    pub project_id: i64,
    pub entity_type: String,
    pub entity_iid: i64,
    pub entity_local_id: i64,
    pub job_type: String,
    // Job-specific payload stored as a JSON string; `None` when the job needs none.
    pub payload_json: Option<String>,
    // Number of failed attempts so far; drives the retry backoff in `fail_job`.
    pub attempts: i32,
}

/// Enqueue a dependent fetch job. Idempotent via UNIQUE constraint (INSERT OR IGNORE).
///
/// Returns `true` if actually inserted (not deduped).
///
/// # Errors
///
/// Returns an error if the INSERT fails for any reason other than the
/// UNIQUE-constraint dedupe (which `OR IGNORE` swallows by design).
pub fn enqueue_job(
    conn: &Connection,
    project_id: i64,
    entity_type: &str,
    entity_iid: i64,
    entity_local_id: i64,
    job_type: &str,
    payload_json: Option<&str>,
) -> Result<bool> {
    let now = now_ms();
    // `changes` is 0 when OR IGNORE deduped the row, 1 when it was inserted.
    let changes = conn.execute(
        "INSERT OR IGNORE INTO pending_dependent_fetches \
         (project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, enqueued_at) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        rusqlite::params![
            project_id,
            entity_type,
            entity_iid,
            entity_local_id,
            job_type,
            payload_json,
            now
        ],
    )?;
    Ok(changes > 0)
}

/// Claim a batch of jobs for processing.
///
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result> { if batch_size == 0 { return Ok(Vec::new()); } let now = now_ms(); // Find available jobs let mut select_stmt = conn.prepare_cached( "SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts FROM pending_dependent_fetches WHERE job_type = ?1 AND locked_at IS NULL AND (next_retry_at IS NULL OR next_retry_at <= ?2) ORDER BY enqueued_at ASC LIMIT ?3", )?; let jobs: Vec = select_stmt .query_map( rusqlite::params![job_type, now, batch_size as i64], |row| { Ok(PendingJob { id: row.get(0)?, project_id: row.get(1)?, entity_type: row.get(2)?, entity_iid: row.get(3)?, entity_local_id: row.get(4)?, job_type: row.get(5)?, payload_json: row.get(6)?, attempts: row.get(7)?, }) }, )? .collect::, _>>()?; // Lock the claimed jobs if !jobs.is_empty() { let ids: Vec = jobs.iter().map(|j| j.id.to_string()).collect(); let placeholders = ids.join(","); conn.execute( &format!( "UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})" ), rusqlite::params![now], )?; } Ok(jobs) } /// Mark a job as complete (DELETE the row). pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> { conn.execute( "DELETE FROM pending_dependent_fetches WHERE id = ?1", rusqlite::params![job_id], )?; Ok(()) } /// Mark a job as failed. Increments attempts, sets next_retry_at with exponential /// backoff, clears locked_at, and records the error. /// /// Backoff: 30s * 2^(attempts-1), capped at 480s. 
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> { let now = now_ms(); // Get current attempts let current_attempts: i32 = conn .query_row( "SELECT attempts FROM pending_dependent_fetches WHERE id = ?1", rusqlite::params![job_id], |row| row.get(0), ) .unwrap_or(0); let new_attempts = current_attempts + 1; let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000); let next_retry = now + backoff_ms; conn.execute( "UPDATE pending_dependent_fetches SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3 WHERE id = ?4", rusqlite::params![new_attempts, next_retry, error, job_id], )?; Ok(()) } /// Reclaim stale locks (locked_at older than threshold). /// /// Returns count of reclaimed jobs. pub fn reclaim_stale_locks(conn: &Connection, stale_threshold_minutes: u32) -> Result { let threshold_ms = now_ms() - (i64::from(stale_threshold_minutes) * 60 * 1000); let changes = conn.execute( "UPDATE pending_dependent_fetches SET locked_at = NULL WHERE locked_at < ?1", rusqlite::params![threshold_ms], )?; Ok(changes) } /// Count pending jobs by job_type (for stats/progress). pub fn count_pending_jobs(conn: &Connection) -> Result> { let mut stmt = conn.prepare_cached( "SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type", )?; let mut counts = HashMap::new(); let rows = stmt.query_map([], |row| { let job_type: String = row.get(0)?; let count: i64 = row.get(1)?; Ok((job_type, count as usize)) })?; for row in rows { let (job_type, count) = row?; counts.insert(job_type, count); } Ok(counts) }