Add complete infrastructure for ingesting GitLab Resource Events (state, label, milestone) into local SQLite tables. This enables temporal queries (timeline, file-history, trace) in later gates. - Add migration 011: resource_state/label/milestone_events tables, entity_references table, pending_dependent_fetches queue - Add 6 serde types for GitLab Resource Events API responses - Add fetchResourceEvents config flag with --no-events CLI override - Add fetch_all_pages<T> generic paginator and 6 API endpoint methods - Add DB upsert functions with savepoint atomicity (events_db.rs) - Add dependent fetch queue with exponential backoff (dependent_queue.rs) - Add 'lore count events' command with human table and robot JSON output - Extend 'lore stats --check' with event FK integrity and queue health - Add 8 unit tests for resource event type deserialization Closes: bd-hu3, bd-2e8, bd-2fm, bd-sqw, bd-1uc, bd-tir, bd-3sh, bd-1m8 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
178 lines
5.4 KiB
Rust
178 lines
5.4 KiB
Rust
//! Generic dependent fetch queue for resource events, MR closes, and MR diffs.
|
|
//!
|
|
//! Provides enqueue, claim, complete, fail (with exponential backoff), and
|
|
//! stale lock reclamation operations against the `pending_dependent_fetches` table.
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use rusqlite::Connection;
|
|
|
|
use super::error::Result;
|
|
use super::time::now_ms;
|
|
|
|
/// A pending job from the dependent fetch queue.
///
/// One row of `pending_dependent_fetches`, as read back by [`claim_jobs`].
/// Derives `Clone`/`PartialEq` so jobs can be cheaply duplicated for retry
/// bookkeeping and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingJob {
    /// Primary key of the queue row.
    pub id: i64,
    /// Project the target entity belongs to.
    pub project_id: i64,
    /// Kind of entity the job refers to (stored as text; values come from
    /// enqueue callers — TODO confirm the exact set, e.g. issue/merge_request).
    pub entity_type: String,
    /// Project-scoped iid of the entity.
    pub entity_iid: i64,
    /// Local database row id of the entity.
    pub entity_local_id: i64,
    /// Kind of dependent fetch to perform (used as the claim filter).
    pub job_type: String,
    /// Optional job-specific payload, serialized as JSON.
    pub payload_json: Option<String>,
    /// Failed-attempt count so far; incremented by `fail_job`.
    pub attempts: i32,
}
|
|
|
|
/// Enqueue a dependent fetch job. Idempotent via UNIQUE constraint (INSERT OR IGNORE).
|
|
///
|
|
/// Returns `true` if actually inserted (not deduped).
|
|
pub fn enqueue_job(
|
|
conn: &Connection,
|
|
project_id: i64,
|
|
entity_type: &str,
|
|
entity_iid: i64,
|
|
entity_local_id: i64,
|
|
job_type: &str,
|
|
payload_json: Option<&str>,
|
|
) -> Result<bool> {
|
|
let now = now_ms();
|
|
let changes = conn.execute(
|
|
"INSERT OR IGNORE INTO pending_dependent_fetches
|
|
(project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, enqueued_at)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
|
|
rusqlite::params![project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, now],
|
|
)?;
|
|
|
|
Ok(changes > 0)
|
|
}
|
|
|
|
/// Claim a batch of jobs for processing.
///
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
///
/// NOTE(review): the SELECT and the locking UPDATE are two separate statements
/// with no enclosing transaction, so if another connection/process claims the
/// same job_type concurrently a job could be double-claimed in that window —
/// confirm callers serialize claiming or wrap this in a transaction.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
    // Short-circuit: a zero batch would be a pointless `LIMIT 0` round trip.
    if batch_size == 0 {
        return Ok(Vec::new());
    }

    let now = now_ms();

    // Find available jobs: unlocked, and either never scheduled for retry or
    // past their retry deadline; oldest enqueued first for FIFO fairness.
    let mut select_stmt = conn.prepare_cached(
        "SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts
         FROM pending_dependent_fetches
         WHERE job_type = ?1
           AND locked_at IS NULL
           AND (next_retry_at IS NULL OR next_retry_at <= ?2)
         ORDER BY enqueued_at ASC
         LIMIT ?3",
    )?;

    // Column order matches the SELECT list above; any row-level error aborts
    // the whole claim via the fallible collect.
    let jobs: Vec<PendingJob> = select_stmt
        .query_map(
            rusqlite::params![job_type, now, batch_size as i64],
            |row| {
                Ok(PendingJob {
                    id: row.get(0)?,
                    project_id: row.get(1)?,
                    entity_type: row.get(2)?,
                    entity_iid: row.get(3)?,
                    entity_local_id: row.get(4)?,
                    job_type: row.get(5)?,
                    payload_json: row.get(6)?,
                    attempts: row.get(7)?,
                })
            },
        )?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    // Lock the claimed jobs. Building the IN (...) list via format! is safe
    // from injection here because every element is an i64 rendered with
    // to_string(); parameters cannot be used for a variable-length IN list.
    if !jobs.is_empty() {
        let ids: Vec<String> = jobs.iter().map(|j| j.id.to_string()).collect();
        let placeholders = ids.join(",");
        conn.execute(
            &format!(
                "UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})"
            ),
            rusqlite::params![now],
        )?;
    }

    Ok(jobs)
}
|
|
|
|
/// Mark a job as complete (DELETE the row).
|
|
pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
|
|
conn.execute(
|
|
"DELETE FROM pending_dependent_fetches WHERE id = ?1",
|
|
rusqlite::params![job_id],
|
|
)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
|
|
/// backoff, clears locked_at, and records the error.
|
|
///
|
|
/// Backoff: 30s * 2^(attempts-1), capped at 480s.
|
|
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
|
|
let now = now_ms();
|
|
|
|
// Get current attempts
|
|
let current_attempts: i32 = conn
|
|
.query_row(
|
|
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
|
|
rusqlite::params![job_id],
|
|
|row| row.get(0),
|
|
)
|
|
.unwrap_or(0);
|
|
|
|
let new_attempts = current_attempts + 1;
|
|
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
|
|
let next_retry = now + backoff_ms;
|
|
|
|
conn.execute(
|
|
"UPDATE pending_dependent_fetches
|
|
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3
|
|
WHERE id = ?4",
|
|
rusqlite::params![new_attempts, next_retry, error, job_id],
|
|
)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Reclaim stale locks (locked_at older than threshold).
|
|
///
|
|
/// Returns count of reclaimed jobs.
|
|
pub fn reclaim_stale_locks(conn: &Connection, stale_threshold_minutes: u32) -> Result<usize> {
|
|
let threshold_ms = now_ms() - (i64::from(stale_threshold_minutes) * 60 * 1000);
|
|
|
|
let changes = conn.execute(
|
|
"UPDATE pending_dependent_fetches SET locked_at = NULL WHERE locked_at < ?1",
|
|
rusqlite::params![threshold_ms],
|
|
)?;
|
|
|
|
Ok(changes)
|
|
}
|
|
|
|
/// Count pending jobs by job_type (for stats/progress).
|
|
pub fn count_pending_jobs(conn: &Connection) -> Result<HashMap<String, usize>> {
|
|
let mut stmt = conn.prepare_cached(
|
|
"SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type",
|
|
)?;
|
|
|
|
let mut counts = HashMap::new();
|
|
let rows = stmt.query_map([], |row| {
|
|
let job_type: String = row.get(0)?;
|
|
let count: i64 = row.get(1)?;
|
|
Ok((job_type, count as usize))
|
|
})?;
|
|
|
|
for row in rows {
|
|
let (job_type, count) = row?;
|
|
counts.insert(job_type, count);
|
|
}
|
|
|
|
Ok(counts)
|
|
}
|