Files
gitlore/crates/lore-tui/tests/pagination_race_test.rs
teernisse 01491b4180 feat(tui): add soak + pagination race tests (bd-14hv)
7 soak tests: 50k-event sustained load, watchdog timeout, render
interleaving, screen cycling, mode oscillation, depth bounds, multi-seed.
7 pagination race tests: concurrent read/write with snapshot fence,
multi-reader, within-fence writes, stress 1000 iterations.
2026-02-19 07:49:22 -05:00

672 lines
20 KiB
Rust

//! Concurrent pagination/write race tests (bd-14hv).
//!
//! Proves that the keyset pagination + snapshot fence mechanism prevents
//! duplicate or skipped rows when a writer inserts new issues concurrently
//! with a reader paginating through the issue list.
//!
//! Architecture:
//! - DbManager (3 readers + 1 writer) with WAL mode
//! - Reader threads: paginate using `fetch_issue_list()` with keyset cursor
//! - Writer thread: INSERT new issues concurrently
//! - Assertions: no duplicate IIDs, snapshot fence excludes new writes
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use rusqlite::Connection;
use lore_tui::action::fetch_issue_list;
use lore_tui::db::DbManager;
use lore_tui::state::issue_list::{IssueFilter, IssueListState, SortField, SortOrder};
// ---------------------------------------------------------------------------
// Test infrastructure
// ---------------------------------------------------------------------------
static DB_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Build a unique database path for one test under the system temp directory.
///
/// The file name combines the process id, the current thread id, and a
/// process-wide counter, so two calls within one test run never return the
/// same path.
///
/// No test in this file deletes its database afterwards, and operating
/// systems reuse process ids, so a leftover database from an earlier run may
/// already sit at the chosen path. It is removed here, together with the
/// `-wal` / `-shm` side files SQLite keeps next to a WAL-mode database, so
/// that schema creation always starts from an empty file.
///
/// # Panics
///
/// Panics if the test directory cannot be created, or if a leftover file
/// exists but cannot be removed.
fn test_db_path() -> PathBuf {
    let n = DB_COUNTER.fetch_add(1, Ordering::Relaxed);
    let dir = std::env::temp_dir().join("lore-tui-pagination-tests");
    std::fs::create_dir_all(&dir).expect("create test dir");
    let path = dir.join(format!(
        "race-{}-{:?}-{n}.db",
        std::process::id(),
        std::thread::current().id(),
    ));

    // Clear out anything a previous run left behind at this exact path.
    for suffix in ["", "-wal", "-shm"] {
        let mut stale = path.clone().into_os_string();
        stale.push(suffix);
        match std::fs::remove_file(&stale) {
            Ok(()) => {}
            // The usual case: nothing was left over.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => panic!("remove stale test db file {:?}: {}", stale, err),
        }
    }

    path
}
/// Set up the tables the issue list queries read from.
///
/// Runs one batch that creates `projects`, `issues`, `labels`, and
/// `issue_labels`, then inserts a single project row (`group/project`).
///
/// # Panics
///
/// Panics with "create schema" if the batch fails.
fn create_schema(conn: &Connection) {
    // Each line ends in `\n\`: the escape keeps the newline in the SQL text,
    // the trailing backslash drops the source indentation that follows.
    const SCHEMA_SQL: &str = "\n\
        CREATE TABLE projects (\n\
        id INTEGER PRIMARY KEY,\n\
        gitlab_project_id INTEGER UNIQUE NOT NULL,\n\
        path_with_namespace TEXT NOT NULL\n\
        );\n\
        CREATE TABLE issues (\n\
        id INTEGER PRIMARY KEY,\n\
        gitlab_id INTEGER UNIQUE NOT NULL,\n\
        project_id INTEGER NOT NULL,\n\
        iid INTEGER NOT NULL,\n\
        title TEXT,\n\
        state TEXT NOT NULL,\n\
        author_username TEXT,\n\
        created_at INTEGER NOT NULL,\n\
        updated_at INTEGER NOT NULL,\n\
        last_seen_at INTEGER NOT NULL\n\
        );\n\
        CREATE TABLE labels (\n\
        id INTEGER PRIMARY KEY,\n\
        gitlab_id INTEGER,\n\
        project_id INTEGER NOT NULL,\n\
        name TEXT NOT NULL,\n\
        color TEXT,\n\
        description TEXT\n\
        );\n\
        CREATE TABLE issue_labels (\n\
        issue_id INTEGER NOT NULL,\n\
        label_id INTEGER NOT NULL,\n\
        PRIMARY KEY(issue_id, label_id)\n\
        );\n\
        INSERT INTO projects (gitlab_project_id, path_with_namespace)\n\
        VALUES (1, 'group/project');\n";

    conn.execute_batch(SCHEMA_SQL).expect("create schema");
}
/// Insert `count` opened issues whose IIDs count upward from `start_iid`.
///
/// The issue at offset `i` is stamped `base_ts - i * 1000` in `created_at`,
/// `updated_at`, and `last_seen_at`, so later IIDs are progressively older
/// and a newest-first listing has a deterministic order. Each row uses
/// `iid * 100` as its `gitlab_id` and "Issue {iid}" as its title.
///
/// # Panics
///
/// Panics if the statement cannot be prepared or any insert fails.
fn seed_issues(conn: &Connection, start_iid: i64, count: i64, base_ts: i64) {
    const INSERT_SQL: &str = "INSERT INTO issues (gitlab_id, project_id, iid, title, state,\n\
        author_username, created_at, updated_at, last_seen_at)\n\
        VALUES (?1, 1, ?2, ?3, 'opened', 'alice', ?4, ?4, ?4)";

    let mut insert = conn.prepare(INSERT_SQL).expect("prepare insert");

    let rows = (0..count).map(|offset| (start_iid + offset, base_ts - (offset * 1000)));
    for (iid, timestamp) in rows {
        let gitlab_id = iid * 100;
        let title = format!("Issue {iid}");
        insert
            .execute(rusqlite::params![gitlab_id, iid, title, timestamp])
            .expect("insert issue");
    }
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
/// Baseline: walk the whole issue list while no writer is running.
///
/// Seeds 200 issues, follows the keyset cursor page by page, and checks that
/// 200 IIDs came back in total and that all 200 of them are distinct.
#[test]
fn test_pagination_no_duplicates_baseline() {
    let base_ts = 1_700_000_000_000_i64;
    let db_path = test_db_path();
    let db = DbManager::open(&db_path).expect("open db");
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 200, base_ts);
        Ok(())
    })
    .unwrap();

    // Follow the cursor until the list is exhausted, recording every IID in
    // the order it was returned.
    let filter = IssueFilter::default();
    let mut state = IssueListState::default();
    let mut seen_iids: Vec<i64> = Vec::new();
    loop {
        let fetched = db.with_reader(|conn| {
            fetch_issue_list(
                conn,
                &filter,
                SortField::UpdatedAt,
                SortOrder::Desc,
                state.next_cursor.as_ref(),
                state.snapshot_fence,
            )
        });
        let page = fetched.expect("fetch page");
        if page.rows.is_empty() {
            break;
        }
        seen_iids.extend(page.rows.iter().map(|row| row.iid));
        state.apply_page(page);
        // No cursor after applying the page means there is nothing left.
        if state.next_cursor.is_none() {
            break;
        }
    }

    // 200 distinct values out of 200 returned: each IID showed up once.
    let distinct: HashSet<i64> = seen_iids.iter().copied().collect();
    let distinct_count = distinct.len();
    let total_count = seen_iids.len();
    assert_eq!(
        distinct_count, 200,
        "Expected 200 unique IIDs, got {}",
        distinct_count
    );
    assert_eq!(
        total_count, 200,
        "Expected 200 total IIDs, got {} (duplicates present)",
        total_count
    );
}
/// A writer adds brand-new issues, all stamped later than the seeded data,
/// while a reader pages through the list under its snapshot fence.
///
/// Asserted: the reader never sees an IID twice, and it sees every one of the
/// 200 seeded issues. How many writer rows reach the reader is only printed
/// as a diagnostic, not asserted.
#[test]
fn test_pagination_no_duplicates_with_concurrent_writes() {
    let base_ts = 1_700_000_000_000_i64;
    let db_path = test_db_path();
    let db = Arc::new(DbManager::open(&db_path).expect("open db"));

    // 200 pre-existing issues for the reader to walk.
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 200, base_ts);
        Ok(())
    })
    .unwrap();

    // Both threads block here first so that they start working together.
    let start = Arc::new(Barrier::new(2));

    // Writer: 10 batches of 10 issues, IIDs 1000..=1099, each stamped later
    // than any seeded row.
    let writer = {
        let db = Arc::clone(&db);
        let start = Arc::clone(&start);
        std::thread::spawn(move || {
            start.wait();
            for batch in 0..10 {
                db.with_writer(|conn| {
                    for i in 0..10 {
                        let offset = batch * 10 + i;
                        let iid = 1000 + offset;
                        let ts = base_ts + 100_000 + offset * 1000;
                        conn.execute(
                            "INSERT INTO issues (gitlab_id, project_id, iid, title, state,\n\
                             author_username, created_at, updated_at, last_seen_at)\n\
                             VALUES (?1, 1, ?2, ?3, 'opened', 'writer', ?4, ?4, ?4)",
                            rusqlite::params![iid * 100, iid, format!("New {iid}"), ts],
                        )?;
                    }
                    Ok(())
                })
                .expect("writer batch");
                // Give the reader a chance to run between batches.
                std::thread::yield_now();
            }
        })
    };

    // Reader: follow the cursor to the end, carrying the fence along.
    let reader = {
        let db = Arc::clone(&db);
        let start = Arc::clone(&start);
        std::thread::spawn(move || {
            start.wait();
            let filter = IssueFilter::default();
            let mut state = IssueListState::default();
            let mut seen_iids: Vec<i64> = Vec::new();
            loop {
                let fetched = db.with_reader(|conn| {
                    fetch_issue_list(
                        conn,
                        &filter,
                        SortField::UpdatedAt,
                        SortOrder::Desc,
                        state.next_cursor.as_ref(),
                        state.snapshot_fence,
                    )
                });
                let page = fetched.expect("fetch page");
                if page.rows.is_empty() {
                    break;
                }
                seen_iids.extend(page.rows.iter().map(|row| row.iid));
                state.apply_page(page);
                // Give the writer a chance to run between pages.
                std::thread::yield_now();
                if state.next_cursor.is_none() {
                    break;
                }
            }
            seen_iids
        })
    };

    writer.join().expect("writer thread");
    let seen_iids = reader.join().expect("reader thread");

    // The invariant that matters: nothing was returned twice.
    let distinct: HashSet<i64> = seen_iids.iter().copied().collect();
    assert_eq!(
        seen_iids.len(),
        distinct.len(),
        "Duplicate IIDs found in pagination results"
    );

    // None of the seeded issues was skipped.
    for iid in 1..=200 {
        assert!(
            distinct.contains(&iid),
            "Original issue {iid} missing from pagination"
        );
    }

    // Writer rows may land on the first page, fetched before any fence
    // exists. That must not cause duplicates; the count is reported only as
    // a diagnostic.
    let writer_count = seen_iids.iter().filter(|iid| **iid >= 1000).count();
    eprintln!("Writer issues visible through fence: {writer_count} (expected: few or zero)");
}
/// Four reader threads page through the same 100 seeded issues at once.
///
/// Each reader keeps its own pagination state and must come back with all
/// 100 IIDs, none of them repeated.
#[test]
fn test_multiple_concurrent_readers() {
    let base_ts = 1_700_000_000_000_i64;
    let db_path = test_db_path();
    let db = Arc::new(DbManager::open(&db_path).expect("open db"));
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 100, base_ts);
        Ok(())
    })
    .unwrap();

    // All four readers wait on the barrier so they start paging together.
    let start = Arc::new(Barrier::new(4));

    // `collect` spawns every reader before any of them is joined below.
    let readers: Vec<_> = (0..4)
        .map(|reader_id| {
            let db = Arc::clone(&db);
            let start = Arc::clone(&start);
            std::thread::spawn(move || {
                start.wait();
                let filter = IssueFilter::default();
                let mut state = IssueListState::default();
                let mut seen_iids: Vec<i64> = Vec::new();
                loop {
                    let fetched = db.with_reader(|conn| {
                        fetch_issue_list(
                            conn,
                            &filter,
                            SortField::UpdatedAt,
                            SortOrder::Desc,
                            state.next_cursor.as_ref(),
                            state.snapshot_fence,
                        )
                    });
                    let page = fetched
                        .unwrap_or_else(|e| panic!("reader {reader_id} fetch failed: {e}"));
                    if page.rows.is_empty() {
                        break;
                    }
                    seen_iids.extend(page.rows.iter().map(|row| row.iid));
                    state.apply_page(page);
                    if state.next_cursor.is_none() {
                        break;
                    }
                }
                seen_iids
            })
        })
        .collect();

    for (i, reader) in readers.into_iter().enumerate() {
        let seen_iids = reader
            .join()
            .unwrap_or_else(|_| panic!("reader {i} panicked"));
        let distinct: HashSet<i64> = seen_iids.iter().copied().collect();
        let distinct_count = distinct.len();
        assert_eq!(seen_iids.len(), distinct_count, "Reader {i} got duplicates");
        assert_eq!(
            distinct_count, 100,
            "Reader {i} missed issues: got {}",
            distinct_count
        );
    }
}
/// `reset_pagination()` drops the snapshot fence, and the next unfenced read
/// picks up rows written in the meantime.
///
/// Steps: load 10 issues (which sets a fence), insert 5 later-stamped
/// issues, confirm a fenced read still counts 10, reset, and confirm an
/// unfenced read counts 15.
#[test]
fn test_snapshot_fence_invalidated_on_refresh() {
    let base_ts = 1_700_000_000_000_i64;
    let db_path = test_db_path();
    let db = DbManager::open(&db_path).expect("open db");
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 10, base_ts);
        Ok(())
    })
    .unwrap();

    let filter = IssueFilter::default();
    // Every read in this test is a first page (no cursor); only the fence
    // passed in differs from call to call.
    let fetch_first_page = |fence| {
        db.with_reader(|conn| {
            fetch_issue_list(
                conn,
                &filter,
                SortField::UpdatedAt,
                SortOrder::Desc,
                None,
                fence,
            )
        })
        .unwrap()
    };

    // Applying the initial page is what establishes the fence.
    let mut state = IssueListState::default();
    state.apply_page(fetch_first_page(None));
    assert_eq!(state.rows.len(), 10);
    assert!(state.snapshot_fence.is_some());

    // Five more issues arrive, stamped later than everything seeded so far.
    db.with_writer(|conn| {
        seed_issues(conn, 100, 5, base_ts + 500_000);
        Ok(())
    })
    .unwrap();

    // Fence still in place: the new rows stay hidden.
    let fenced_page = fetch_first_page(state.snapshot_fence);
    assert_eq!(
        fenced_page.total_count, 10,
        "Fence should exclude new issues"
    );

    // A manual refresh clears the fence.
    state.reset_pagination();
    assert!(state.snapshot_fence.is_none());

    // Fence gone: the new rows are counted too.
    let refreshed_page = fetch_first_page(state.snapshot_fence);
    assert_eq!(
        refreshed_page.total_count, 15,
        "After refresh, should see all 15 issues"
    );
}
/// A writer inserts issues whose timestamps fall INSIDE the range already
/// covered by the seeded data while a reader is paginating.
///
/// Edge case being documented: the snapshot fence compares timestamps, it is
/// not a transaction boundary, so a row written with `updated_at <= fence`
/// CAN show up on a later page. The keyset cursor must still keep any row
/// from being returned twice.
///
/// Asserted: no IID repeats, and all 100 seeded issues are returned.
#[test]
fn test_concurrent_write_within_fence_range() {
    let base_ts = 1_700_000_000_000_i64;
    let db_path = test_db_path();
    let db = Arc::new(DbManager::open(&db_path).expect("open db"));

    // 100 issues, stamped from base_ts down to base_ts - 99000.
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 100, base_ts);
        Ok(())
    })
    .unwrap();

    let start = Arc::new(Barrier::new(2));

    // Writer: 20 single-row inserts (IIDs 500..=519) stamped in the middle of
    // the seeded timestamp range.
    let writer = {
        let db = Arc::clone(&db);
        let start = Arc::clone(&start);
        std::thread::spawn(move || {
            start.wait();
            for i in 0..20 {
                db.with_writer(|conn| {
                    let iid = 500 + i;
                    let ts = base_ts - 50_000 - i * 100;
                    conn.execute(
                        "INSERT INTO issues (gitlab_id, project_id, iid, title, state,\n\
                         author_username, created_at, updated_at, last_seen_at)\n\
                         VALUES (?1, 1, ?2, ?3, 'opened', 'writer', ?4, ?4, ?4)",
                        rusqlite::params![iid * 100, iid, format!("Mid {iid}"), ts],
                    )?;
                    Ok(())
                })
                .expect("writer insert");
                std::thread::yield_now();
            }
        })
    };

    // Reader: follow the cursor to the end, carrying the fence along.
    let reader = {
        let db = Arc::clone(&db);
        let start = Arc::clone(&start);
        std::thread::spawn(move || {
            start.wait();
            let filter = IssueFilter::default();
            let mut state = IssueListState::default();
            let mut seen_iids: Vec<i64> = Vec::new();
            loop {
                let fetched = db.with_reader(|conn| {
                    fetch_issue_list(
                        conn,
                        &filter,
                        SortField::UpdatedAt,
                        SortOrder::Desc,
                        state.next_cursor.as_ref(),
                        state.snapshot_fence,
                    )
                });
                let page = fetched.expect("fetch");
                if page.rows.is_empty() {
                    break;
                }
                seen_iids.extend(page.rows.iter().map(|row| row.iid));
                state.apply_page(page);
                std::thread::yield_now();
                if state.next_cursor.is_none() {
                    break;
                }
            }
            seen_iids
        })
    };

    writer.join().expect("writer");
    let seen_iids = reader.join().expect("reader");

    // Whatever the interleaving was, nothing may come back twice.
    let distinct: HashSet<i64> = seen_iids.iter().copied().collect();
    assert_eq!(
        seen_iids.len(),
        distinct.len(),
        "No duplicate IIDs should appear even with concurrent in-range writes"
    );

    // The seeded issues must all have been returned.
    for iid in 1..=100 {
        assert!(distinct.contains(&iid), "Original issue {iid} missing");
    }
}
/// Stress test: 1000 write-then-read cycles against one database.
///
/// Each cycle inserts one issue stamped later than every seeded row, then
/// fetches the first page with no cursor and no fence and checks that the
/// page holds no repeated IID.
///
/// The write and the read run back to back on the test thread; nothing here
/// is shared across threads. The IID therefore comes straight from the loop
/// index (1000, 1001, ...) and the database handle is owned directly.
#[test]
fn test_pagination_stress_1000_iterations() {
    let path = test_db_path();
    let db = DbManager::open(&path).expect("open db");
    let base_ts = 1_700_000_000_000_i64;
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 100, base_ts);
        Ok(())
    })
    .unwrap();

    // The filter never changes, so build it once for all cycles.
    let filter = IssueFilter::default();

    for iteration in 0..1000_i64 {
        // Write: one new issue per cycle, IIDs counting up from 1000.
        let next_iid = 1000 + iteration;
        db.with_writer(|conn| {
            let ts = base_ts + 100_000 + next_iid * 100;
            conn.execute(
                "INSERT INTO issues (gitlab_id, project_id, iid, title, state,\n\
                 author_username, created_at, updated_at, last_seen_at)\n\
                 VALUES (?1, 1, ?2, ?3, 'opened', 'stress', ?4, ?4, ?4)",
                rusqlite::params![next_iid * 100, next_iid, format!("Stress {next_iid}"), ts],
            )?;
            Ok(())
        })
        .expect("stress write");

        // Read: first page only, then look for repeats within that page.
        let page = db
            .with_reader(|conn| {
                fetch_issue_list(
                    conn,
                    &filter,
                    SortField::UpdatedAt,
                    SortOrder::Desc,
                    None,
                    None,
                )
            })
            .unwrap_or_else(|e| panic!("iteration {iteration}: fetch failed: {e}"));
        let iids: Vec<i64> = page.rows.iter().map(|r| r.iid).collect();
        let unique: HashSet<i64> = iids.iter().copied().collect();
        assert_eq!(
            iids.len(),
            unique.len(),
            "Iteration {iteration}: duplicates within a single page"
        );
    }
}
/// Rows written after the fence was taken leave an in-progress view alone.
///
/// Loads 50 issues (which sets the fence), inserts 20 later-stamped issues,
/// then checks that the stored fence and loaded rows are unchanged and that
/// a fenced re-fetch still counts 50.
#[test]
fn test_background_writes_dont_invalidate_fence() {
    let base_ts = 1_700_000_000_000_i64;
    let db_path = test_db_path();
    let db = DbManager::open(&db_path).expect("open db");
    db.with_writer(|conn| {
        create_schema(conn);
        seed_issues(conn, 1, 50, base_ts);
        Ok(())
    })
    .unwrap();

    let filter = IssueFilter::default();
    // Both reads are first pages (no cursor); only the fence differs.
    let fetch_first_page = |fence| {
        db.with_reader(|conn| {
            fetch_issue_list(
                conn,
                &filter,
                SortField::UpdatedAt,
                SortOrder::Desc,
                None,
                fence,
            )
        })
        .unwrap()
    };

    // Applying the initial page establishes the fence.
    let mut state = IssueListState::default();
    state.apply_page(fetch_first_page(None));
    let fence_before = state.snapshot_fence;

    // Stand-in for a background sync: 20 issues stamped well after the seed.
    db.with_writer(|conn| {
        seed_issues(conn, 200, 20, base_ts + 1_000_000);
        Ok(())
    })
    .unwrap();

    // Nothing touched the in-memory state, so fence and rows are as before.
    assert_eq!(state.snapshot_fence, fence_before);
    assert_eq!(state.rows.len(), 50);

    // A fenced re-fetch still reports only the original 50.
    let fenced = fetch_first_page(state.snapshot_fence);
    assert_eq!(fenced.total_count, 50);
}