//! Cancellation integration tests for asupersync runtime migration. //! //! Verifies: //! 1. ShutdownSignal stops fan-out loops cleanly (no task leaks) //! 2. After runtime completes, no background tasks remain (quiescence) //! 3. Transaction integrity: cancel during fetch-before-write yields zero partial data use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use futures::future::join_all; use rusqlite::Connection; use lore::core::db::{create_connection, run_migrations}; use lore::core::shutdown::ShutdownSignal; fn run, T>(f: F) -> T { asupersync::runtime::RuntimeBuilder::new() .build() .unwrap() .block_on(f) } fn setup_db() -> Connection { let conn = create_connection(std::path::Path::new(":memory:")).unwrap(); run_migrations(&conn).unwrap(); conn.execute( "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) VALUES (100, 'test/repo', 'https://git.example.com/test/repo')", [], ) .unwrap(); conn } // ─────────────────────────────────────────────────────────────────── // Test 1: ShutdownSignal cancels fan-out mid-batch // ─────────────────────────────────────────────────────────────────── #[test] fn cancel_mid_fanout_drains_all_inflight_tasks() { run(async { let signal = ShutdownSignal::new(); let completed = Arc::new(AtomicUsize::new(0)); // Simulate a batch of 10 concurrent tasks (like join_all prefetch) let futures: Vec<_> = (0..10) .map(|i| { let completed = Arc::clone(&completed); async move { // Simulate some async work asupersync::time::sleep( asupersync::time::wall_now(), std::time::Duration::from_millis(1), ) .await; completed.fetch_add(1, Ordering::Relaxed); i } }) .collect(); // Cancel signal BEFORE awaiting the batch signal.cancel(); // join_all still completes all in-flight futures let results = join_all(futures).await; assert_eq!(results.len(), 10, "all futures should resolve"); assert_eq!( completed.load(Ordering::Relaxed), 10, "all tasks should have completed" ); // After cancellation, the signal should be observable assert!(signal.is_cancelled()); }); } // ─────────────────────────────────────────────────────────────────── // Test 2: Cancellation loop pattern — signal checked between batches // ─────────────────────────────────────────────────────────────────── #[test] fn cancel_between_batches_stops_processing() { run(async { let signal = ShutdownSignal::new(); let batches_processed = Arc::new(AtomicUsize::new(0)); // Simulate the ingestion loop pattern: check signal between batches for batch_num in 0..5 { if signal.is_cancelled() { break; } // Simulate batch work let futures: Vec<_> = (0..3) .map(|_| async { asupersync::time::sleep( asupersync::time::wall_now(), std::time::Duration::from_millis(1), ) .await; }) .collect(); join_all(futures).await; batches_processed.fetch_add(1, Ordering::Relaxed); // Cancel after first batch completes if batch_num == 0 { signal.cancel(); } } assert_eq!( batches_processed.load(Ordering::Relaxed), 1, "only first batch should complete before cancellation stops the loop" ); }); } // ─────────────────────────────────────────────────────────────────── // Test 3: RuntimeHandle::spawn tasks complete before runtime drops // ─────────────────────────────────────────────────────────────────── #[test] fn spawned_tasks_complete_before_runtime_drop() { let completed = Arc::new(AtomicUsize::new(0)); let completed_clone = Arc::clone(&completed); let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap(); let handle = rt.handle(); // Spawn background tasks (like the signal handler pattern) for _ in 0..5 { let c = Arc::clone(&completed_clone); handle.spawn(async move { asupersync::time::sleep( asupersync::time::wall_now(), std::time::Duration::from_millis(1), ) .await; c.fetch_add(1, Ordering::Relaxed); }); } rt.block_on(async { // Give spawned tasks a chance to run asupersync::time::sleep( asupersync::time::wall_now(), std::time::Duration::from_millis(50), ) .await; }); // After block_on returns, spawned tasks should have completed assert_eq!( completed.load(Ordering::Relaxed), 5, "all spawned tasks should complete" ); } // ─────────────────────────────────────────────────────────────────── // Test 4: Transaction integrity — cancel during fetch, before write // ─────────────────────────────────────────────────────────────────── #[test] fn cancel_during_fetch_commits_zero_partial_data() { let conn = setup_db(); run(async { let signal = ShutdownSignal::new(); // Simulate the fetch-then-write pattern from orchestrator let mut items_written = 0usize; for batch_num in 0..3 { if signal.is_cancelled() { break; } // Phase 1: Concurrent fetch (simulated) let fetched: Vec = { let futures: Vec<_> = (0..5) .map(|i| async move { asupersync::time::sleep( asupersync::time::wall_now(), std::time::Duration::from_millis(1), ) .await; (batch_num * 5 + i) as i64 }) .collect(); join_all(futures).await }; // Cancel after first fetch completes but before second batch writes if batch_num == 0 { signal.cancel(); } // Phase 2: Serial DB write (in transaction) let tx = conn.unchecked_transaction().unwrap(); for iid in &fetched { tx.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (?1, 1, ?2, 'test', 'opened', 'bot', 1000, 2000, 3000)", rusqlite::params![iid + 1000, iid], ) .unwrap(); } tx.commit().unwrap(); items_written += fetched.len(); } // Only batch 0's data should be written (signal checked before batch 1 fetch) assert_eq!(items_written, 5, "only one batch should have been written"); let count: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!(count, 5, "exactly one batch of 5 issues in DB"); }); } // ─────────────────────────────────────────────────────────────────── // Test 5: SAVEPOINT rollback on cancellation (embedding pattern) // ─────────────────────────────────────────────────────────────────── #[test] fn savepoint_rollback_on_cancel_preserves_prior_data() { let conn = setup_db(); run(async { let signal = ShutdownSignal::new(); // Write page 1 successfully conn.execute_batch("SAVEPOINT embed_page").unwrap(); conn.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (2001, 1, 1, 'page1-issue', 'opened', 'bot', 1000, 2000, 3000)", [], ) .unwrap(); conn.execute_batch("RELEASE embed_page").unwrap(); // Start page 2, write partial data, then cancel conn.execute_batch("SAVEPOINT embed_page").unwrap(); conn.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (2002, 1, 2, 'page2-issue', 'opened', 'bot', 1000, 2000, 3000)", [], ) .unwrap(); // Cancel mid-page signal.cancel(); // Rollback the incomplete page (matches embed_documents pattern) if signal.is_cancelled() { conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page") .unwrap(); } // Page 1 data survives, page 2 data rolled back let count: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!(count, 1, "only page 1 issue should survive"); let title: String = conn .query_row("SELECT title FROM issues WHERE project_id = 1", [], |row| { row.get(0) }) .unwrap(); assert_eq!( title, "page1-issue", "surviving issue should be from page 1" ); }); } // ─────────────────────────────────────────────────────────────────── // Test 6: Transaction drop (implicit rollback) on error // ─────────────────────────────────────────────────────────────────── #[test] fn transaction_drop_without_commit_rolls_back() { let conn = setup_db(); // Start transaction, write data, then drop without commit { let tx = conn.unchecked_transaction().unwrap(); tx.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (3001, 1, 1, 'dropped', 'opened', 'bot', 1000, 2000, 3000)", [], ) .unwrap(); // tx dropped here without commit } let count: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!(count, 0, "dropped transaction should roll back all writes"); } // ─────────────────────────────────────────────────────────────────── // Test 7: Signal propagates across clones (thread safety) // ─────────────────────────────────────────────────────────────────── #[test] fn signal_propagates_across_async_tasks() { run(async { let signal = ShutdownSignal::new(); // Spawn tasks that observe the signal let futures: Vec<_> = (0..5) .map(|_| { let s = signal.clone(); async move { // Busy-wait briefly, then check asupersync::time::sleep( asupersync::time::wall_now(), std::time::Duration::from_millis(10), ) .await; s.is_cancelled() } }) .collect(); // Cancel before tasks complete their sleep signal.cancel(); let results = join_all(futures).await; assert!( results.iter().all(|&cancelled| cancelled), "all cloned signals should observe cancellation" ); }); }