#![allow(dead_code)] //! Sync screen actions — query sync run history and detect running syncs. //! //! With cron-driven syncs as the primary mechanism, the TUI's sync screen //! acts as a status dashboard. These pure query functions read `sync_runs` //! and `projects` to populate the screen. use anyhow::{Context, Result}; use rusqlite::Connection; use crate::clock::Clock; /// How many recent runs to display in the sync history. const HISTORY_LIMIT: usize = 10; /// If a "running" sync hasn't heartbeated in this many milliseconds, /// consider it stale (likely crashed). const STALE_HEARTBEAT_MS: i64 = 120_000; // 2 minutes // --------------------------------------------------------------------------- // Data types // --------------------------------------------------------------------------- /// Overview data for the sync screen. #[derive(Debug, Default)] pub struct SyncOverview { /// Info about a currently running sync, if any. pub running: Option, /// Most recent completed (succeeded or failed) run. pub last_completed: Option, /// Recent sync run history (newest first). pub recent_runs: Vec, /// Configured project paths. pub projects: Vec, } /// A sync that is currently in progress. #[derive(Debug, Clone)] pub struct RunningSyncInfo { /// Row ID in sync_runs. pub id: i64, /// When this sync started (ms epoch). pub started_at: i64, /// Last heartbeat (ms epoch). pub heartbeat_at: i64, /// How long it's been running (ms). pub elapsed_ms: u64, /// Whether the heartbeat is stale (sync may have crashed). pub stale: bool, /// Items processed so far. pub items_processed: u64, } /// Summary of a single sync run. #[derive(Debug, Clone)] pub struct SyncRunInfo { /// Row ID in sync_runs. pub id: i64, /// 'succeeded', 'failed', or 'running'. pub status: String, /// The command that was run (e.g., 'sync', 'ingest issues'). pub command: String, /// When this sync started (ms epoch). pub started_at: i64, /// When this sync finished (ms epoch), if completed. pub finished_at: Option, /// Duration in ms (computed from started_at/finished_at). pub duration_ms: Option, /// Total items processed. pub items_processed: u64, /// Total errors encountered. pub errors: u64, /// Error message if the run failed. pub error: Option, /// Correlation ID for log matching. pub run_id: Option, } // --------------------------------------------------------------------------- // Public API // --------------------------------------------------------------------------- /// Fetch the complete sync overview for the sync screen. /// /// Combines running sync detection, last completed run, recent history, /// and configured projects into a single struct. pub fn fetch_sync_overview(conn: &Connection, clock: &dyn Clock) -> Result { let running = detect_running_sync(conn, clock)?; let recent_runs = fetch_recent_runs(conn, HISTORY_LIMIT)?; let last_completed = recent_runs .iter() .find(|r| r.status == "succeeded" || r.status == "failed") .cloned(); let projects = fetch_configured_projects(conn)?; Ok(SyncOverview { running, last_completed, recent_runs, projects, }) } /// Detect a currently running sync from the `sync_runs` table. /// /// A sync is considered "running" if `status = 'running'`. It's marked /// stale if the heartbeat is older than [`STALE_HEARTBEAT_MS`]. pub fn detect_running_sync( conn: &Connection, clock: &dyn Clock, ) -> Result> { let result = conn.query_row( "SELECT id, started_at, heartbeat_at, total_items_processed FROM sync_runs WHERE status = 'running' ORDER BY id DESC LIMIT 1", [], |row| { let id: i64 = row.get(0)?; let started_at: i64 = row.get(1)?; let heartbeat_at: i64 = row.get(2)?; let items: Option = row.get(3)?; Ok((id, started_at, heartbeat_at, items.unwrap_or(0))) }, ); match result { Ok((id, started_at, heartbeat_at, items)) => { let now = clock.now_ms(); let elapsed_ms = now.saturating_sub(started_at); let stale = (now - heartbeat_at) > STALE_HEARTBEAT_MS; #[allow(clippy::cast_sign_loss)] Ok(Some(RunningSyncInfo { id, started_at, heartbeat_at, elapsed_ms: elapsed_ms as u64, stale, items_processed: items as u64, })) } Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), Err(e) => Err(e).context("detecting running sync"), } } /// Fetch recent sync runs (newest first). pub fn fetch_recent_runs(conn: &Connection, limit: usize) -> Result> { let mut stmt = conn .prepare( "SELECT id, status, command, started_at, finished_at, total_items_processed, total_errors, error, run_id FROM sync_runs ORDER BY id DESC LIMIT ?1", ) .context("preparing sync runs query")?; let rows = stmt .query_map([limit as i64], |row| { let id: i64 = row.get(0)?; let status: String = row.get(1)?; let command: String = row.get(2)?; let started_at: i64 = row.get(3)?; let finished_at: Option = row.get(4)?; let items: Option = row.get(5)?; let errors: Option = row.get(6)?; let error: Option = row.get(7)?; let run_id: Option = row.get(8)?; Ok(( id, status, command, started_at, finished_at, items, errors, error, run_id, )) }) .context("querying sync runs")?; let mut result = Vec::new(); for row in rows { let (id, status, command, started_at, finished_at, items, errors, error, run_id) = row.context("reading sync run row")?; #[allow(clippy::cast_sign_loss)] let duration_ms = finished_at.map(|f| (f - started_at) as u64); #[allow(clippy::cast_sign_loss)] result.push(SyncRunInfo { id, status, command, started_at, finished_at, duration_ms, items_processed: items.unwrap_or(0) as u64, errors: errors.unwrap_or(0) as u64, error, run_id, }); } Ok(result) } /// Fetch configured project paths from the `projects` table. pub fn fetch_configured_projects(conn: &Connection) -> Result> { let mut stmt = conn .prepare("SELECT path_with_namespace FROM projects ORDER BY path_with_namespace") .context("preparing projects query")?; let rows = stmt .query_map([], |row| row.get::<_, String>(0)) .context("querying projects")?; let mut result = Vec::new(); for row in rows { result.push(row.context("reading project row")?); } Ok(result) } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; use crate::clock::FakeClock; /// Create the minimal schema needed for sync queries. fn create_sync_schema(conn: &Connection) { conn.execute_batch( " CREATE TABLE projects ( id INTEGER PRIMARY KEY, gitlab_project_id INTEGER UNIQUE NOT NULL, path_with_namespace TEXT NOT NULL ); CREATE TABLE sync_runs ( id INTEGER PRIMARY KEY, started_at INTEGER NOT NULL, heartbeat_at INTEGER NOT NULL, finished_at INTEGER, status TEXT NOT NULL, command TEXT NOT NULL, error TEXT, metrics_json TEXT, run_id TEXT, total_items_processed INTEGER DEFAULT 0, total_errors INTEGER DEFAULT 0 ); ", ) .expect("create sync schema"); } fn insert_project(conn: &Connection, id: i64, path: &str) { conn.execute( "INSERT INTO projects (id, gitlab_project_id, path_with_namespace) VALUES (?1, ?2, ?3)", rusqlite::params![id, id * 100, path], ) .expect("insert project"); } #[allow(clippy::too_many_arguments)] fn insert_sync_run( conn: &Connection, started_at: i64, finished_at: Option, status: &str, command: &str, items: i64, errors: i64, error: Option<&str>, ) -> i64 { conn.execute( "INSERT INTO sync_runs (started_at, heartbeat_at, finished_at, status, command, total_items_processed, total_errors, error) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", rusqlite::params![ started_at, finished_at.unwrap_or(started_at), finished_at, status, command, items, errors, error, ], ) .expect("insert sync run"); conn.last_insert_rowid() } // ----------------------------------------------------------------------- // detect_running_sync // ----------------------------------------------------------------------- #[test] fn test_detect_running_sync_none_when_empty() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let clock = FakeClock::from_ms(1_700_000_000_000); let result = detect_running_sync(&conn, &clock).unwrap(); assert!(result.is_none()); } #[test] fn test_detect_running_sync_none_when_all_completed() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_sync_run( &conn, now - 60_000, Some(now - 30_000), "succeeded", "sync", 100, 0, None, ); insert_sync_run( &conn, now - 120_000, Some(now - 90_000), "failed", "sync", 50, 2, Some("timeout"), ); let clock = FakeClock::from_ms(now); let result = detect_running_sync(&conn, &clock).unwrap(); assert!(result.is_none()); } #[test] fn test_detect_running_sync_found() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; let started = now - 30_000; // 30 seconds ago // Heartbeat at started_at (fresh since we just set it) conn.execute( "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, total_items_processed) VALUES (?1, ?2, 'running', 'sync', 42)", [started, now - 5_000], // heartbeat 5 seconds ago ) .unwrap(); let clock = FakeClock::from_ms(now); let running = detect_running_sync(&conn, &clock).unwrap().unwrap(); assert_eq!(running.elapsed_ms, 30_000); assert_eq!(running.items_processed, 42); assert!(!running.stale); } #[test] fn test_detect_running_sync_stale_heartbeat() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; let started = now - 300_000; // 5 minutes ago // Heartbeat 3 minutes ago — stale conn.execute( "INSERT INTO sync_runs (started_at, heartbeat_at, status, command) VALUES (?1, ?2, 'running', 'sync')", [started, now - 180_000], ) .unwrap(); let clock = FakeClock::from_ms(now); let running = detect_running_sync(&conn, &clock).unwrap().unwrap(); assert!(running.stale); assert_eq!(running.elapsed_ms, 300_000); } // ----------------------------------------------------------------------- // fetch_recent_runs // ----------------------------------------------------------------------- #[test] fn test_fetch_recent_runs_empty() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let runs = fetch_recent_runs(&conn, 10).unwrap(); assert!(runs.is_empty()); } #[test] fn test_fetch_recent_runs_ordered_newest_first() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_sync_run( &conn, now - 120_000, Some(now - 90_000), "succeeded", "sync", 100, 0, None, ); insert_sync_run( &conn, now - 60_000, Some(now - 30_000), "succeeded", "sync", 200, 0, None, ); let runs = fetch_recent_runs(&conn, 10).unwrap(); assert_eq!(runs.len(), 2); // Newest first (higher id) assert_eq!(runs[0].items_processed, 200); assert_eq!(runs[1].items_processed, 100); } #[test] fn test_fetch_recent_runs_respects_limit() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; for i in 0..5 { insert_sync_run( &conn, now - (5 - i) * 60_000, Some(now - (5 - i) * 60_000 + 30_000), "succeeded", "sync", i * 10, 0, None, ); } let runs = fetch_recent_runs(&conn, 3).unwrap(); assert_eq!(runs.len(), 3); } #[test] fn test_fetch_recent_runs_duration_computed() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_sync_run( &conn, now - 60_000, Some(now - 15_000), "succeeded", "sync", 0, 0, None, ); let runs = fetch_recent_runs(&conn, 10).unwrap(); assert_eq!(runs[0].duration_ms, Some(45_000)); } #[test] fn test_fetch_recent_runs_running_no_duration() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_sync_run(&conn, now - 60_000, None, "running", "sync", 0, 0, None); let runs = fetch_recent_runs(&conn, 10).unwrap(); assert_eq!(runs[0].status, "running"); assert!(runs[0].duration_ms.is_none()); } #[test] fn test_fetch_recent_runs_failed_with_error() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_sync_run( &conn, now - 60_000, Some(now - 30_000), "failed", "sync", 50, 3, Some("network timeout"), ); let runs = fetch_recent_runs(&conn, 10).unwrap(); assert_eq!(runs[0].status, "failed"); assert_eq!(runs[0].errors, 3); assert_eq!(runs[0].error.as_deref(), Some("network timeout")); } // ----------------------------------------------------------------------- // fetch_configured_projects // ----------------------------------------------------------------------- #[test] fn test_fetch_configured_projects_empty() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let projects = fetch_configured_projects(&conn).unwrap(); assert!(projects.is_empty()); } #[test] fn test_fetch_configured_projects_sorted() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); insert_project(&conn, 1, "group/beta"); insert_project(&conn, 2, "group/alpha"); insert_project(&conn, 3, "other/gamma"); let projects = fetch_configured_projects(&conn).unwrap(); assert_eq!(projects, vec!["group/alpha", "group/beta", "other/gamma"]); } // ----------------------------------------------------------------------- // fetch_sync_overview (integration) // ----------------------------------------------------------------------- #[test] fn test_fetch_sync_overview_empty_db() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let clock = FakeClock::from_ms(1_700_000_000_000); let overview = fetch_sync_overview(&conn, &clock).unwrap(); assert!(overview.running.is_none()); assert!(overview.last_completed.is_none()); assert!(overview.recent_runs.is_empty()); assert!(overview.projects.is_empty()); } #[test] fn test_fetch_sync_overview_with_history() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_project(&conn, 1, "group/repo"); insert_sync_run( &conn, now - 120_000, Some(now - 90_000), "succeeded", "sync", 150, 0, None, ); insert_sync_run( &conn, now - 60_000, Some(now - 30_000), "failed", "sync", 50, 2, Some("db locked"), ); let clock = FakeClock::from_ms(now); let overview = fetch_sync_overview(&conn, &clock).unwrap(); assert!(overview.running.is_none()); assert_eq!(overview.recent_runs.len(), 2); assert_eq!(overview.projects, vec!["group/repo"]); // last_completed should be the newest completed run (failed, id=2) let last = overview.last_completed.unwrap(); assert_eq!(last.status, "failed"); assert_eq!(last.errors, 2); } #[test] fn test_fetch_sync_overview_with_running_sync() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; insert_project(&conn, 1, "group/repo"); // A completed run. insert_sync_run( &conn, now - 600_000, Some(now - 570_000), "succeeded", "sync", 200, 0, None, ); // A currently running sync. conn.execute( "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, total_items_processed) VALUES (?1, ?2, 'running', 'sync', 75)", [now - 20_000, now - 2_000], ) .unwrap(); let clock = FakeClock::from_ms(now); let overview = fetch_sync_overview(&conn, &clock).unwrap(); assert!(overview.running.is_some()); let running = overview.running.unwrap(); assert_eq!(running.elapsed_ms, 20_000); assert_eq!(running.items_processed, 75); assert!(!running.stale); // last_completed should find the succeeded run, not the running one. let last = overview.last_completed.unwrap(); assert_eq!(last.status, "succeeded"); assert_eq!(last.items_processed, 200); } #[test] fn test_sync_run_info_with_run_id() { let conn = Connection::open_in_memory().unwrap(); create_sync_schema(&conn); let now = 1_700_000_000_000_i64; conn.execute( "INSERT INTO sync_runs (started_at, heartbeat_at, finished_at, status, command, total_items_processed, total_errors, run_id) VALUES (?1, ?1, ?2, 'succeeded', 'sync', 100, 0, 'abc-123')", [now - 60_000, now - 30_000], ) .unwrap(); let runs = fetch_recent_runs(&conn, 10).unwrap(); assert_eq!(runs[0].run_id.as_deref(), Some("abc-123")); } }