//! Concurrent pagination/write race tests (bd-14hv). //! //! Proves that the keyset pagination + snapshot fence mechanism prevents //! duplicate or skipped rows when a writer inserts new issues concurrently //! with a reader paginating through the issue list. //! //! Architecture: //! - DbManager (3 readers + 1 writer) with WAL mode //! - Reader threads: paginate using `fetch_issue_list()` with keyset cursor //! - Writer thread: INSERT new issues concurrently //! - Assertions: no duplicate IIDs, snapshot fence excludes new writes use std::collections::HashSet; use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Barrier}; use rusqlite::Connection; use lore_tui::action::fetch_issue_list; use lore_tui::db::DbManager; use lore_tui::state::issue_list::{IssueFilter, IssueListState, SortField, SortOrder}; // --------------------------------------------------------------------------- // Test infrastructure // --------------------------------------------------------------------------- static DB_COUNTER: AtomicU64 = AtomicU64::new(0); fn test_db_path() -> PathBuf { let n = DB_COUNTER.fetch_add(1, Ordering::Relaxed); let dir = std::env::temp_dir().join("lore-tui-pagination-tests"); std::fs::create_dir_all(&dir).expect("create test dir"); dir.join(format!( "race-{}-{:?}-{n}.db", std::process::id(), std::thread::current().id(), )) } /// Create the schema needed for issue list queries. fn create_schema(conn: &Connection) { conn.execute_batch( " CREATE TABLE projects ( id INTEGER PRIMARY KEY, gitlab_project_id INTEGER UNIQUE NOT NULL, path_with_namespace TEXT NOT NULL ); CREATE TABLE issues ( id INTEGER PRIMARY KEY, gitlab_id INTEGER UNIQUE NOT NULL, project_id INTEGER NOT NULL, iid INTEGER NOT NULL, title TEXT, state TEXT NOT NULL, author_username TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, last_seen_at INTEGER NOT NULL ); CREATE TABLE labels ( id INTEGER PRIMARY KEY, gitlab_id INTEGER, project_id INTEGER NOT NULL, name TEXT NOT NULL, color TEXT, description TEXT ); CREATE TABLE issue_labels ( issue_id INTEGER NOT NULL, label_id INTEGER NOT NULL, PRIMARY KEY(issue_id, label_id) ); INSERT INTO projects (gitlab_project_id, path_with_namespace) VALUES (1, 'group/project'); ", ) .expect("create schema"); } /// Insert N issues with sequential IIDs starting from `start_iid`. /// /// Each issue gets `updated_at = base_ts - (offset * 1000)` to create /// a deterministic ordering for keyset pagination (newest first). fn seed_issues(conn: &Connection, start_iid: i64, count: i64, base_ts: i64) { let mut stmt = conn .prepare( "INSERT INTO issues (gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at) VALUES (?1, 1, ?2, ?3, 'opened', 'alice', ?4, ?4, ?4)", ) .expect("prepare insert"); for i in 0..count { let iid = start_iid + i; let ts = base_ts - (i * 1000); stmt.execute(rusqlite::params![ iid * 100, // gitlab_id iid, format!("Issue {iid}"), ts, ]) .expect("insert issue"); } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- /// Paginate through all issues without concurrent writes. /// /// Baseline: keyset pagination yields every IID exactly once. #[test] fn test_pagination_no_duplicates_baseline() { let path = test_db_path(); let db = DbManager::open(&path).expect("open db"); let base_ts = 1_700_000_000_000_i64; db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 200, base_ts); Ok(()) }) .unwrap(); // Paginate through all issues collecting IIDs. let mut all_iids = Vec::new(); let mut state = IssueListState::default(); let filter = IssueFilter::default(); loop { let page = db .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, state.next_cursor.as_ref(), state.snapshot_fence, ) }) .expect("fetch page"); if page.rows.is_empty() { break; } for row in &page.rows { all_iids.push(row.iid); } state.apply_page(page); if state.next_cursor.is_none() { break; } } // Every IID 1..=200 should appear exactly once. let unique: HashSet = all_iids.iter().copied().collect(); assert_eq!( unique.len(), 200, "Expected 200 unique IIDs, got {}", unique.len() ); assert_eq!( all_iids.len(), 200, "Expected 200 total IIDs, got {} (duplicates present)", all_iids.len() ); } /// Concurrent writer inserts NEW issues (with future timestamps) while /// reader paginates. Snapshot fence should exclude the new rows. #[test] fn test_pagination_no_duplicates_with_concurrent_writes() { let path = test_db_path(); let db = Arc::new(DbManager::open(&path).expect("open db")); let base_ts = 1_700_000_000_000_i64; // Seed 200 issues. db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 200, base_ts); Ok(()) }) .unwrap(); // Barrier to synchronize reader and writer start. let barrier = Arc::new(Barrier::new(2)); // Writer thread: inserts issues with NEWER timestamps (above the fence). let db_w = Arc::clone(&db); let barrier_w = Arc::clone(&barrier); let writer = std::thread::spawn(move || { barrier_w.wait(); for batch in 0..10 { db_w.with_writer(|conn| { for i in 0..10 { let iid = 1000 + batch * 10 + i; // Future timestamp: above the snapshot fence. let ts = base_ts + 100_000 + (batch * 10 + i) * 1000; conn.execute( "INSERT INTO issues (gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at) VALUES (?1, 1, ?2, ?3, 'opened', 'writer', ?4, ?4, ?4)", rusqlite::params![iid * 100, iid, format!("New {iid}"), ts], )?; } Ok(()) }) .expect("writer batch"); // Small yield to interleave with reader. std::thread::yield_now(); } }); // Reader thread: paginate with snapshot fence. let db_r = Arc::clone(&db); let barrier_r = Arc::clone(&barrier); let reader = std::thread::spawn(move || { barrier_r.wait(); let mut all_iids = Vec::new(); let mut state = IssueListState::default(); let filter = IssueFilter::default(); loop { let page = db_r .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, state.next_cursor.as_ref(), state.snapshot_fence, ) }) .expect("fetch page"); if page.rows.is_empty() { break; } for row in &page.rows { all_iids.push(row.iid); } state.apply_page(page); // Yield to let writer interleave. std::thread::yield_now(); if state.next_cursor.is_none() { break; } } all_iids }); writer.join().expect("writer thread"); let all_iids = reader.join().expect("reader thread"); // The critical invariant: NO DUPLICATES. let unique: HashSet = all_iids.iter().copied().collect(); assert_eq!( all_iids.len(), unique.len(), "Duplicate IIDs found in pagination results" ); // All original issues present. for iid in 1..=200 { assert!( unique.contains(&iid), "Original issue {iid} missing from pagination" ); } // Writer issues may appear on the first page (before the fence is // established), but should NOT cause duplicates. Count them as a // diagnostic. let writer_count = all_iids.iter().filter(|&&iid| iid >= 1000).count(); eprintln!("Writer issues visible through fence: {writer_count} (expected: few or zero)"); } /// Multiple concurrent readers paginating simultaneously — no interference. #[test] fn test_multiple_concurrent_readers() { let path = test_db_path(); let db = Arc::new(DbManager::open(&path).expect("open db")); let base_ts = 1_700_000_000_000_i64; db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 100, base_ts); Ok(()) }) .unwrap(); let barrier = Arc::new(Barrier::new(4)); let mut handles = Vec::new(); for reader_id in 0..4 { let db_r = Arc::clone(&db); let barrier_r = Arc::clone(&barrier); handles.push(std::thread::spawn(move || { barrier_r.wait(); let mut all_iids = Vec::new(); let mut state = IssueListState::default(); let filter = IssueFilter::default(); loop { let page = db_r .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, state.next_cursor.as_ref(), state.snapshot_fence, ) }) .unwrap_or_else(|e| panic!("reader {reader_id} fetch failed: {e}")); if page.rows.is_empty() { break; } for row in &page.rows { all_iids.push(row.iid); } state.apply_page(page); if state.next_cursor.is_none() { break; } } all_iids })); } for (i, h) in handles.into_iter().enumerate() { let iids = h.join().unwrap_or_else(|_| panic!("reader {i} panicked")); let unique: HashSet = iids.iter().copied().collect(); assert_eq!(iids.len(), unique.len(), "Reader {i} got duplicates"); assert_eq!( unique.len(), 100, "Reader {i} missed issues: got {}", unique.len() ); } } /// Snapshot fence invalidation: after `reset_pagination()`, the fence is /// cleared and a new read picks up newly written rows. #[test] fn test_snapshot_fence_invalidated_on_refresh() { let path = test_db_path(); let db = DbManager::open(&path).expect("open db"); let base_ts = 1_700_000_000_000_i64; db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 10, base_ts); Ok(()) }) .unwrap(); // First pagination: snapshot fence set. let mut state = IssueListState::default(); let filter = IssueFilter::default(); let page = db .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, None, None, ) }) .unwrap(); state.apply_page(page); assert_eq!(state.rows.len(), 10); assert!(state.snapshot_fence.is_some()); // Writer adds new issues with FUTURE timestamps. db.with_writer(|conn| { seed_issues(conn, 100, 5, base_ts + 500_000); Ok(()) }) .unwrap(); // WITH fence: new issues should NOT appear. let fenced_page = db .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, None, state.snapshot_fence, ) }) .unwrap(); assert_eq!( fenced_page.total_count, 10, "Fence should exclude new issues" ); // Manual refresh: reset_pagination clears the fence. state.reset_pagination(); assert!(state.snapshot_fence.is_none()); // WITHOUT fence: new issues should appear. let refreshed_page = db .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, None, state.snapshot_fence, ) }) .unwrap(); assert_eq!( refreshed_page.total_count, 15, "After refresh, should see all 15 issues" ); } /// Concurrent writer inserts issues with timestamps WITHIN the fence range. /// /// This is the edge case: snapshot fence is timestamp-based, not /// transaction-based, so writes with `updated_at <= fence` CAN appear. /// The keyset cursor still prevents duplicates (no row appears twice), /// but newly inserted rows with old timestamps might appear in later pages. /// /// This test documents the known behavior. #[test] fn test_concurrent_write_within_fence_range() { let path = test_db_path(); let db = Arc::new(DbManager::open(&path).expect("open db")); let base_ts = 1_700_000_000_000_i64; // Seed 100 issues spanning base_ts down to base_ts - 99000. db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 100, base_ts); Ok(()) }) .unwrap(); let barrier = Arc::new(Barrier::new(2)); // Writer: insert issues with timestamps WITHIN the existing range. let db_w = Arc::clone(&db); let barrier_w = Arc::clone(&barrier); let writer = std::thread::spawn(move || { barrier_w.wait(); for i in 0..20 { db_w.with_writer(|conn| { let iid = 500 + i; // Timestamp within the range of existing issues. let ts = base_ts - 50_000 - i * 100; conn.execute( "INSERT INTO issues (gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at) VALUES (?1, 1, ?2, ?3, 'opened', 'writer', ?4, ?4, ?4)", rusqlite::params![iid * 100, iid, format!("Mid {iid}"), ts], )?; Ok(()) }) .expect("writer insert"); std::thread::yield_now(); } }); // Reader: paginate with fence. let db_r = Arc::clone(&db); let barrier_r = Arc::clone(&barrier); let reader = std::thread::spawn(move || { barrier_r.wait(); let mut all_iids = Vec::new(); let mut state = IssueListState::default(); let filter = IssueFilter::default(); loop { let page = db_r .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, state.next_cursor.as_ref(), state.snapshot_fence, ) }) .expect("fetch"); if page.rows.is_empty() { break; } for row in &page.rows { all_iids.push(row.iid); } state.apply_page(page); std::thread::yield_now(); if state.next_cursor.is_none() { break; } } all_iids }); writer.join().expect("writer"); let all_iids = reader.join().expect("reader"); // The critical invariant: NO DUPLICATES regardless of timing. let unique: HashSet = all_iids.iter().copied().collect(); assert_eq!( all_iids.len(), unique.len(), "No duplicate IIDs should appear even with concurrent in-range writes" ); // All original issues must still be present. for iid in 1..=100 { assert!(unique.contains(&iid), "Original issue {iid} missing"); } } /// Stress test: 1000 iterations of concurrent read+write with verification. #[test] fn test_pagination_stress_1000_iterations() { let path = test_db_path(); let db = Arc::new(DbManager::open(&path).expect("open db")); let base_ts = 1_700_000_000_000_i64; db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 100, base_ts); Ok(()) }) .unwrap(); // Run 1000 pagination cycles with concurrent writes. let writer_iid = Arc::new(AtomicU64::new(1000)); for iteration in 0..1000 { // Writer: insert one issue per iteration. let next_iid = writer_iid.fetch_add(1, Ordering::Relaxed) as i64; db.with_writer(|conn| { let ts = base_ts + 100_000 + next_iid * 100; conn.execute( "INSERT INTO issues (gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at) VALUES (?1, 1, ?2, ?3, 'opened', 'stress', ?4, ?4, ?4)", rusqlite::params![next_iid * 100, next_iid, format!("Stress {next_iid}"), ts], )?; Ok(()) }) .expect("stress write"); // Reader: paginate first page, verify no duplicates within that page. let page = db .with_reader(|conn| { fetch_issue_list( conn, &IssueFilter::default(), SortField::UpdatedAt, SortOrder::Desc, None, None, ) }) .unwrap_or_else(|e| panic!("iteration {iteration}: fetch failed: {e}")); let iids: Vec = page.rows.iter().map(|r| r.iid).collect(); let unique: HashSet = iids.iter().copied().collect(); assert_eq!( iids.len(), unique.len(), "Iteration {iteration}: duplicates within a single page" ); } } /// Background writes do NOT invalidate an active snapshot fence. #[test] fn test_background_writes_dont_invalidate_fence() { let path = test_db_path(); let db = DbManager::open(&path).expect("open db"); let base_ts = 1_700_000_000_000_i64; db.with_writer(|conn| { create_schema(conn); seed_issues(conn, 1, 50, base_ts); Ok(()) }) .unwrap(); // Initial pagination sets the fence. let mut state = IssueListState::default(); let filter = IssueFilter::default(); let page = db .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, None, None, ) }) .unwrap(); state.apply_page(page); let original_fence = state.snapshot_fence; // Simulate background sync writing 20 new issues. db.with_writer(|conn| { seed_issues(conn, 200, 20, base_ts + 1_000_000); Ok(()) }) .unwrap(); // The state's fence should be unchanged — background writes are invisible. assert_eq!(state.snapshot_fence, original_fence); assert_eq!(state.rows.len(), 50); // Re-fetch with the existing fence: still sees only original 50. let fenced = db .with_reader(|conn| { fetch_issue_list( conn, &filter, SortField::UpdatedAt, SortOrder::Desc, None, state.snapshot_fence, ) }) .unwrap(); assert_eq!(fenced.total_count, 50); }