Files
gitlore/tests/cancellation_tests.rs
teernisse af167e2086 test(asupersync): add cancellation, parity, and E2E acceptance tests
- Add 7 cancellation integration tests (ShutdownSignal, transaction rollback)
- Add 7 HTTP behavior parity tests (redirect, proxy, keep-alive, DNS, TLS)
- Add 9 E2E runtime acceptance tests (lifecycle, cancel+resume, tracing, HTTP pipeline)
- Total: 1190 tests, all passing

Phases 4-5 of asupersync migration.
2026-03-06 16:09:41 -05:00

370 lines
14 KiB
Rust

//! Cancellation integration tests for asupersync runtime migration.
//!
//! Verifies:
//! 1. ShutdownSignal stops fan-out loops cleanly (no task leaks)
//! 2. After runtime completes, no background tasks remain (quiescence)
//! 3. Transaction integrity: cancel during fetch-before-write yields zero partial data
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future::join_all;
use rusqlite::Connection;
use lore::core::db::{create_connection, run_migrations};
use lore::core::shutdown::ShutdownSignal;
/// Build a fresh asupersync runtime and drive the future `f` to completion.
///
/// Each invocation constructs its own runtime so tests stay isolated from
/// one another. Panics if the runtime fails to build.
fn run<F, T>(f: F) -> T
where
    F: std::future::Future<Output = T>,
{
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    rt.block_on(f)
}
/// Open an in-memory SQLite database, run all migrations, and seed one
/// project row; the tests insert issues that reference `project_id = 1`.
fn setup_db() -> Connection {
    let conn = create_connection(std::path::Path::new(":memory:")).unwrap();
    run_migrations(&conn).unwrap();
    // Seed the single project the issue-insert tests point at.
    let seed_sql = "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
VALUES (100, 'test/repo', 'https://git.example.com/test/repo')";
    conn.execute(seed_sql, []).unwrap();
    conn
}
// ───────────────────────────────────────────────────────────────────
// Test 1: ShutdownSignal cancels fan-out mid-batch
// ───────────────────────────────────────────────────────────────────
/// Cancelling the shutdown signal does not abort work that is already in
/// flight: `join_all` still drives every queued future to completion, and
/// the cancelled state remains observable afterwards.
#[test]
fn cancel_mid_fanout_drains_all_inflight_tasks() {
    run(async {
        let signal = ShutdownSignal::new();
        let done = Arc::new(AtomicUsize::new(0));

        // Build a 10-wide fan-out, mirroring the join_all prefetch pattern.
        let mut batch = Vec::with_capacity(10);
        for idx in 0..10 {
            let done = Arc::clone(&done);
            batch.push(async move {
                // Stand-in for real async work.
                asupersync::time::sleep(
                    asupersync::time::wall_now(),
                    std::time::Duration::from_millis(1),
                )
                .await;
                done.fetch_add(1, Ordering::Relaxed);
                idx
            });
        }

        // Fire the signal before the batch is ever polled.
        signal.cancel();

        // In-flight futures are unaffected: every one of them resolves.
        let resolved = join_all(batch).await;
        assert_eq!(resolved.len(), 10, "all futures should resolve");
        assert_eq!(
            done.load(Ordering::Relaxed),
            10,
            "all tasks should have completed"
        );

        // The cancelled state is still visible after the drain.
        assert!(signal.is_cancelled());
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 2: Cancellation loop pattern — signal checked between batches
// ───────────────────────────────────────────────────────────────────
/// The ingestion-loop pattern: the signal is consulted once per batch, at
/// the top of the loop, so a cancel raised during batch N stops the loop
/// before batch N+1 starts.
#[test]
fn cancel_between_batches_stops_processing() {
    run(async {
        let signal = ShutdownSignal::new();
        let batches_done = Arc::new(AtomicUsize::new(0));

        for round in 0..5 {
            // Checked before any new work is started for this batch.
            if signal.is_cancelled() {
                break;
            }

            // Three units of simulated work per batch.
            let mut work = Vec::with_capacity(3);
            for _ in 0..3 {
                work.push(async {
                    asupersync::time::sleep(
                        asupersync::time::wall_now(),
                        std::time::Duration::from_millis(1),
                    )
                    .await;
                });
            }
            join_all(work).await;
            batches_done.fetch_add(1, Ordering::Relaxed);

            // Flip the signal once the first batch has fully finished.
            if round == 0 {
                signal.cancel();
            }
        }

        assert_eq!(
            batches_done.load(Ordering::Relaxed),
            1,
            "only first batch should complete before cancellation stops the loop"
        );
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 3: RuntimeHandle::spawn tasks complete before runtime drops
// ───────────────────────────────────────────────────────────────────
// Verifies that tasks spawned through the runtime handle finish before the
// runtime is dropped (mirrors the background signal-handler pattern).
//
// NOTE(review): synchronisation here is sleep-based — the 50 ms block_on
// sleep is assumed to outlast the five 1 ms task sleeps. If the scheduler
// ever delays the spawned tasks past that window this test turns flaky;
// confirm whether asupersync's spawn returns a join handle that could be
// awaited instead.
#[test]
fn spawned_tasks_complete_before_runtime_drop() {
let completed = Arc::new(AtomicUsize::new(0));
let completed_clone = Arc::clone(&completed);
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
let handle = rt.handle();
// Spawn background tasks (like the signal handler pattern)
for _ in 0..5 {
let c = Arc::clone(&completed_clone);
handle.spawn(async move {
asupersync::time::sleep(
asupersync::time::wall_now(),
std::time::Duration::from_millis(1),
)
.await;
c.fetch_add(1, Ordering::Relaxed);
});
}
rt.block_on(async {
// Give spawned tasks a chance to run
asupersync::time::sleep(
asupersync::time::wall_now(),
std::time::Duration::from_millis(50),
)
.await;
});
// After block_on returns, spawned tasks should have completed
assert_eq!(
completed.load(Ordering::Relaxed),
5,
"all spawned tasks should complete"
);
}
// ───────────────────────────────────────────────────────────────────
// Test 4: Transaction integrity — cancel during fetch, before write
// ───────────────────────────────────────────────────────────────────
/// Transaction integrity under cancellation: with the orchestrator's
/// fetch-then-write loop, a cancel raised after batch 0's fetch lets
/// batch 0 commit in full, then stops the loop before batch 1 fetches —
/// so the database holds exactly one complete batch and no partial data.
#[test]
fn cancel_during_fetch_commits_zero_partial_data() {
    let conn = setup_db();
    run(async {
        let signal = ShutdownSignal::new();
        let mut items_written = 0usize;

        for batch in 0..3 {
            // Signal is consulted once per iteration, before any fetch.
            if signal.is_cancelled() {
                break;
            }

            // Phase 1: fan out five simulated concurrent fetches.
            let fetched: Vec<i64> = {
                let mut fetches = Vec::with_capacity(5);
                for slot in 0..5 {
                    fetches.push(async move {
                        asupersync::time::sleep(
                            asupersync::time::wall_now(),
                            std::time::Duration::from_millis(1),
                        )
                        .await;
                        (batch * 5 + slot) as i64
                    });
                }
                join_all(fetches).await
            };

            // Trip the signal after batch 0's fetch completes; batch 0
            // still writes below, but the top-of-loop check stops batch 1.
            if batch == 0 {
                signal.cancel();
            }

            // Phase 2: serial write of the fetched ids in one transaction.
            let tx = conn.unchecked_transaction().unwrap();
            for iid in &fetched {
                tx.execute(
                    "INSERT INTO issues (
                        gitlab_id, project_id, iid, title, state,
                        author_username, created_at, updated_at, last_seen_at
                    ) VALUES (?1, 1, ?2, 'test', 'opened', 'bot', 1000, 2000, 3000)",
                    rusqlite::params![iid + 1000, iid],
                )
                .unwrap();
            }
            tx.commit().unwrap();
            items_written += fetched.len();
        }

        // Exactly one committed batch, both by counter and by row count.
        assert_eq!(items_written, 5, "only one batch should have been written");
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(count, 5, "exactly one batch of 5 issues in DB");
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 5: SAVEPOINT rollback on cancellation (embedding pattern)
// ───────────────────────────────────────────────────────────────────
/// SAVEPOINT-per-page pattern (matches embed_documents): a released
/// savepoint survives a later cancellation, while the in-progress page is
/// rolled back to its savepoint, leaving only fully-committed pages.
#[test]
fn savepoint_rollback_on_cancel_preserves_prior_data() {
    let conn = setup_db();
    run(async {
        // Note: this body contains no awaits; the runtime wrapper keeps the
        // test shape consistent with its async siblings.
        let signal = ShutdownSignal::new();

        // Page 1: open a savepoint, write, release — durably committed.
        conn.execute_batch("SAVEPOINT embed_page").unwrap();
        conn.execute(
            "INSERT INTO issues (
                gitlab_id, project_id, iid, title, state,
                author_username, created_at, updated_at, last_seen_at
            ) VALUES (2001, 1, 1, 'page1-issue', 'opened', 'bot', 1000, 2000, 3000)",
            [],
        )
        .unwrap();
        conn.execute_batch("RELEASE embed_page").unwrap();

        // Page 2: write partial data under a fresh savepoint...
        conn.execute_batch("SAVEPOINT embed_page").unwrap();
        conn.execute(
            "INSERT INTO issues (
                gitlab_id, project_id, iid, title, state,
                author_username, created_at, updated_at, last_seen_at
            ) VALUES (2002, 1, 2, 'page2-issue', 'opened', 'bot', 1000, 2000, 3000)",
            [],
        )
        .unwrap();

        // ...then cancel mid-page and discard the incomplete work.
        signal.cancel();
        if signal.is_cancelled() {
            conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page")
                .unwrap();
        }

        // Only the page-1 row should remain.
        let survivors: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(survivors, 1, "only page 1 issue should survive");

        let title: String = conn
            .query_row("SELECT title FROM issues WHERE project_id = 1", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(
            title, "page1-issue",
            "surviving issue should be from page 1"
        );
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 6: Transaction drop (implicit rollback) on error
// ───────────────────────────────────────────────────────────────────
/// A transaction dropped without `commit()` rolls back implicitly: none of
/// its writes are visible afterwards.
#[test]
fn transaction_drop_without_commit_rolls_back() {
    let conn = setup_db();

    // Write inside a transaction scope that ends without a commit.
    {
        let tx = conn.unchecked_transaction().unwrap();
        tx.execute(
            "INSERT INTO issues (
                gitlab_id, project_id, iid, title, state,
                author_username, created_at, updated_at, last_seen_at
            ) VALUES (3001, 1, 1, 'dropped', 'opened', 'bot', 1000, 2000, 3000)",
            [],
        )
        .unwrap();
    } // tx dropped here — Drop performs the rollback

    let remaining: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        remaining, 0,
        "dropped transaction should roll back all writes"
    );
}
// ───────────────────────────────────────────────────────────────────
// Test 7: Signal propagates across clones (thread safety)
// ───────────────────────────────────────────────────────────────────
/// Cancellation raised on the original handle is visible through every
/// clone held by concurrently running tasks.
#[test]
fn signal_propagates_across_async_tasks() {
    run(async {
        let signal = ShutdownSignal::new();

        // Each observer holds its own clone and reports whether it saw the
        // cancellation after a short sleep.
        let mut observers = Vec::with_capacity(5);
        for _ in 0..5 {
            let local = signal.clone();
            observers.push(async move {
                asupersync::time::sleep(
                    asupersync::time::wall_now(),
                    std::time::Duration::from_millis(10),
                )
                .await;
                local.is_cancelled()
            });
        }

        // Cancel through the original handle before the observers finish.
        signal.cancel();

        let seen = join_all(observers).await;
        assert!(
            seen.iter().all(|&cancelled| cancelled),
            "all cloned signals should observe cancellation"
        );
    });
}