use rusqlite::Connection; use rusqlite::OptionalExtension; use crate::core::backoff::compute_next_attempt_at; use crate::core::error::Result; use crate::core::time::now_ms; use crate::documents::SourceType; const DIRTY_SOURCES_BATCH_SIZE: usize = 500; pub fn mark_dirty_tx( tx: &rusqlite::Transaction<'_>, source_type: SourceType, source_id: i64, ) -> Result<()> { tx.execute( "INSERT INTO dirty_sources (source_type, source_id, queued_at) VALUES (?1, ?2, ?3) ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0, last_attempt_at = NULL, last_error = NULL, next_attempt_at = NULL", rusqlite::params![source_type.as_str(), source_id, now_ms()], )?; Ok(()) } pub fn mark_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> { conn.execute( "INSERT INTO dirty_sources (source_type, source_id, queued_at) VALUES (?1, ?2, ?3) ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0, last_attempt_at = NULL, last_error = NULL, next_attempt_at = NULL", rusqlite::params![source_type.as_str(), source_id, now_ms()], )?; Ok(()) } pub fn get_dirty_sources(conn: &Connection) -> Result> { let now = now_ms(); let mut stmt = conn.prepare( "SELECT source_type, source_id FROM dirty_sources WHERE next_attempt_at IS NULL OR next_attempt_at <= ?1 ORDER BY attempt_count ASC, queued_at ASC LIMIT ?2", )?; let rows = stmt .query_map( rusqlite::params![now, DIRTY_SOURCES_BATCH_SIZE as i64], |row| { let st_str: String = row.get(0)?; let source_id: i64 = row.get(1)?; Ok((st_str, source_id)) }, )? .collect::, _>>()?; let mut results = Vec::with_capacity(rows.len()); for (st_str, source_id) in rows { let source_type = SourceType::parse(&st_str).ok_or_else(|| { crate::core::error::LoreError::Other(format!( "Invalid source_type in dirty_sources: {}", st_str )) })?; results.push((source_type, source_id)); } Ok(results) } pub fn clear_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> { conn.execute( "DELETE FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2", rusqlite::params![source_type.as_str(), source_id], )?; Ok(()) } pub fn record_dirty_error( conn: &Connection, source_type: SourceType, source_id: i64, error: &str, ) -> Result<()> { let now = now_ms(); let attempt_count: Option = conn .query_row( "SELECT attempt_count FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2", rusqlite::params![source_type.as_str(), source_id], |row| row.get(0), ) .optional()?; let Some(attempt_count) = attempt_count else { return Ok(()); }; let new_attempt = attempt_count + 1; let next_at = compute_next_attempt_at(now, new_attempt); conn.execute( "UPDATE dirty_sources SET attempt_count = ?1, last_attempt_at = ?2, last_error = ?3, next_attempt_at = ?4 WHERE source_type = ?5 AND source_id = ?6", rusqlite::params![ new_attempt, now, error, next_at, source_type.as_str(), source_id ], )?; Ok(()) } #[cfg(test)] #[path = "dirty_tracker_tests.rs"] mod tests;