//! E2E runtime acceptance tests for the asupersync migration. //! //! Proves the full runtime lifecycle works end-to-end: //! 1. RuntimeBuilder creates a working runtime with IO and timers //! 2. ShutdownSignal cancellation mid-flow stops processing cleanly //! 3. Resume: second run picks up where the cancelled first run left off //! 4. Structured tracing events fire with expected fields //! 5. No DB corruption across cancel + resume cycle //! 6. Real HTTP ingestion pipeline (GitLabClient -> pagination -> DB writes) use std::io::{Read, Write}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use futures::future::join_all; use rusqlite::Connection; use serde_json::json; use lore::core::config::{ Config, EmbeddingConfig, GitLabConfig, LoggingConfig, ProjectConfig, ScoringConfig, StorageConfig, SyncConfig, }; use lore::core::db::{create_connection, run_migrations}; use lore::core::shutdown::ShutdownSignal; use lore::gitlab::GitLabClient; use lore::ingestion::ingest_issues; fn run, T>(f: F) -> T { asupersync::runtime::RuntimeBuilder::new() .build() .unwrap() .block_on(f) } fn setup_db() -> Connection { let conn = create_connection(std::path::Path::new(":memory:")).unwrap(); run_migrations(&conn).unwrap(); conn.execute( "INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url) VALUES (1, 100, 'test/repo', 'https://git.example.com/test/repo')", [], ) .unwrap(); conn } /// Simulates a multi-phase sync pipeline. Returns the number of phases completed /// and the IIDs that were "synced" (written to DB). /// /// If `cancel_after_batches` is `Some(n)`, the signal is cancelled after `n` /// batches complete, causing the loop to exit before processing subsequent chunks. 
async fn run_sync_pipeline( conn: &Connection, signal: &ShutdownSignal, project_id: i64, all_iids: &[i64], batch_size: usize, cancel_after_batches: Option, ) -> (usize, Vec) { let mut phases_completed = 0usize; let mut synced_iids = Vec::new(); for chunk in all_iids.chunks(batch_size) { if signal.is_cancelled() { break; } // Phase: concurrent fetch (simulated) let fetched: Vec = { let futs: Vec<_> = chunk .iter() .map(|&iid| async move { asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(1)) .await; iid }) .collect(); join_all(futs).await }; if signal.is_cancelled() { break; } // Phase: DB write (transactional) let tx = conn.unchecked_transaction().unwrap(); for &iid in &fetched { tx.execute( "INSERT OR IGNORE INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (?1, ?2, ?3, 'issue', 'opened', 'bot', 1000, 2000, 3000)", rusqlite::params![iid + 10000, project_id, iid], ) .unwrap(); } tx.commit().unwrap(); synced_iids.extend(&fetched); phases_completed += 1; // Simulate Ctrl+C after N batches (deterministic cancellation) if cancel_after_batches.is_some_and(|limit| phases_completed >= limit) { signal.cancel(); } } (phases_completed, synced_iids) } // ─────────────────────────────────────────────────────────────────── // Test 1: Full runtime lifecycle — build, spawn, block_on, drop // ─────────────────────────────────────────────────────────────────── #[test] fn runtime_lifecycle_build_run_drop() { let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap(); let handle = rt.handle(); let counter = Arc::new(AtomicUsize::new(0)); let c = Arc::clone(&counter); // Spawn a background task via handle handle.spawn(async move { asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(5)).await; c.fetch_add(1, Ordering::Relaxed); }); // block_on drives the reactor let result = rt.block_on(async { asupersync::time::sleep(asupersync::time::wall_now(), 
Duration::from_millis(20)).await; 42 }); assert_eq!(result, 42, "block_on should return the async result"); assert_eq!( counter.load(Ordering::Relaxed), 1, "spawned task should complete during block_on" ); // rt drops here — no panics, no leaks } // ─────────────────────────────────────────────────────────────────── // Test 2: Cancel mid-flow, then resume — proves idempotent sync // ─────────────────────────────────────────────────────────────────── #[test] fn cancel_then_resume_completes_all_work() { let conn = setup_db(); let all_iids: Vec = (1..=15).collect(); let batch_size = 5; // 3 batches of 5 // Run 1: cancel after first batch (deterministic — no timing dependency) let (phases_r1, synced_r1) = run(async { let signal = ShutdownSignal::new(); run_sync_pipeline(&conn, &signal, 1, &all_iids, batch_size, Some(1)).await }); assert_eq!( phases_r1, 1, "exactly one batch should complete before cancel" ); assert_eq!( synced_r1.len(), batch_size, "only first batch should be synced" ); // Verify DB state matches what was reported let count_r1: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!( count_r1, synced_r1.len() as i64, "DB count should match synced IIDs" ); // Run 2: resume — find remaining IIDs and sync them let already_synced: Vec = { let mut stmt = conn .prepare("SELECT iid FROM issues WHERE project_id = 1 ORDER BY iid") .unwrap(); stmt.query_map([], |row| row.get(0)) .unwrap() .map(|r| r.unwrap()) .collect() }; let remaining: Vec = all_iids .iter() .filter(|iid| !already_synced.contains(iid)) .copied() .collect(); assert!( !remaining.is_empty(), "there should be remaining work after cancellation" ); let (phases_r2, synced_r2) = run(async { let signal = ShutdownSignal::new(); // fresh signal, no cancel run_sync_pipeline(&conn, &signal, 1, &remaining, batch_size, None).await }); assert!( phases_r2 >= 1, "resume run should process remaining batches" ); // Verify ALL 15 issues are now in the 
DB let count_final: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!( count_final, 15, "all 15 issues should be in DB after cancel + resume" ); // Verify no duplicates (INSERT OR IGNORE should prevent) let total_synced = synced_r1.len() + synced_r2.len(); assert_eq!( total_synced, 15, "combined synced count should equal total issues" ); } // ─────────────────────────────────────────────────────────────────── // Test 3: Structured tracing events fire with expected fields // ─────────────────────────────────────────────────────────────────── #[test] fn structured_tracing_captures_phase_transitions() { use tracing_subscriber::layer::SubscriberExt; // Collect tracing events into a shared buffer let events: Arc>> = Arc::new(Mutex::new(Vec::new())); let events_clone = Arc::clone(&events); let layer = tracing_subscriber::fmt::layer() .json() .with_writer(move || EventWriter(Arc::clone(&events_clone))) .with_target(false); let subscriber = tracing_subscriber::registry().with(layer); let _guard = tracing::subscriber::set_default(subscriber); run(async { let span = tracing::info_span!( "sync_pipeline", run_id = "test-run-001", project = "test/repo" ); let _enter = span.enter(); tracing::info!(phase = "fetch_issues", entity_count = 10, "phase started"); asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(1)).await; tracing::info!( phase = "fetch_issues", entity_count = 10, success = 8, skipped = 2, "phase completed" ); tracing::info!(phase = "cancelled", reason = "ctrl_c", "pipeline stopped"); }); let captured = events.lock().unwrap(); assert!( captured.len() >= 3, "should capture at least 3 tracing events, got {}", captured.len() ); // Verify structured fields are present in the JSON output let all_text = captured.join("\n"); assert!( all_text.contains("sync_pipeline"), "span name should appear in output" ); assert!( all_text.contains("test-run-001"), "run_id field should appear 
in output" ); assert!( all_text.contains("fetch_issues"), "phase field should appear in output" ); assert!( all_text.contains("ctrl_c"), "cancellation reason should appear in output" ); } /// A Write impl that appends each line to a shared Vec. struct EventWriter(Arc>>); impl std::io::Write for EventWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { if let Ok(s) = std::str::from_utf8(buf) { let trimmed = s.trim(); if !trimmed.is_empty() { self.0.lock().unwrap().push(trimmed.to_string()); } } Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { Ok(()) } } // ─────────────────────────────────────────────────────────────────── // Test 4: Concurrent fan-out under asupersync with timing proof // ─────────────────────────────────────────────────────────────────── #[test] fn concurrent_fanout_runs_in_parallel() { run(async { let start = Instant::now(); // 10 tasks each sleeping 50ms — should complete in ~50ms if parallel let futs: Vec<_> = (0..10) .map(|_| async { asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(50)) .await; }) .collect(); join_all(futs).await; let elapsed = start.elapsed(); // If sequential, this would take 500ms+. Parallel should be well under 200ms. 
assert!( elapsed < Duration::from_millis(200), "fan-out should run concurrently, took {:?}", elapsed ); }); } // ─────────────────────────────────────────────────────────────────── // Test 5: DB integrity after multiple runtime instantiations // ─────────────────────────────────────────────────────────────────── #[test] fn db_integrity_across_runtime_restarts() { let conn = setup_db(); // Run 1: insert issues 1-5 { let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap(); rt.block_on(async { let tx = conn.unchecked_transaction().unwrap(); for iid in 1..=5 { tx.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (?1, 1, ?2, 'issue', 'opened', 'bot', 1000, 2000, 3000)", rusqlite::params![iid + 10000, iid], ) .unwrap(); } tx.commit().unwrap(); }); // rt drops here — runtime fully torn down } // Run 2: new runtime, verify data survives, insert more { let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap(); rt.block_on(async { let count: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!(count, 5, "run 1 data should persist"); let tx = conn.unchecked_transaction().unwrap(); for iid in 6..=10 { tx.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at ) VALUES (?1, 1, ?2, 'issue', 'opened', 'bot', 1000, 2000, 3000)", rusqlite::params![iid + 10000, iid], ) .unwrap(); } tx.commit().unwrap(); }); } // Verify final state without a runtime (sync query) let count: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = 1", [], |row| row.get(0), ) .unwrap(); assert_eq!(count, 10, "both runs should have persisted correctly"); } // ═══════════════════════════════════════════════════════════════════ // Real HTTP pipeline tests — exercise GitLabClient -> ingest_issues // 
═══════════════════════════════════════════════════════════════════ fn test_config() -> Config { Config { gitlab: GitLabConfig { base_url: "https://gitlab.example.com".to_string(), token_env_var: "GITLAB_TOKEN".to_string(), token: None, username: None, }, projects: vec![ProjectConfig { path: "group/project".to_string(), }], default_project: None, sync: SyncConfig::default(), storage: StorageConfig::default(), embedding: EmbeddingConfig::default(), logging: LoggingConfig::default(), scoring: ScoringConfig::default(), } } /// Build a GitLab-style issue JSON object with deterministic fields. fn make_issue_json(id: i64, iid: i64, updated_at: &str) -> serde_json::Value { json!({ "id": id, "iid": iid, "project_id": 100, "title": format!("Issue {iid}"), "description": format!("Description for issue {iid}"), "state": "opened", "created_at": "2024-01-01T00:00:00.000Z", "updated_at": updated_at, "closed_at": null, "author": { "id": 1, "username": "testbot", "name": "Test Bot" }, "assignees": [], "labels": ["backend"], "milestone": null, "due_date": null, "web_url": format!("https://git.example.com/test/repo/-/issues/{iid}") }) } /// Drain HTTP request bytes from a TCP stream until the end-of-headers marker. fn drain_http_request(stream: &mut std::net::TcpStream) { let mut buf = [0u8; 8192]; let mut accumulated = Vec::new(); loop { let n = stream.read(&mut buf).unwrap(); if n == 0 { break; } accumulated.extend_from_slice(&buf[..n]); if accumulated.windows(4).any(|w| w == b"\r\n\r\n") { break; } } } /// Write an HTTP response with the given status, headers, and JSON body. 
fn write_http_response(
    stream: &mut std::net::TcpStream,
    status: u16,
    reason: &str,
    extra_headers: &[(&str, &str)],
    body: &str,
) {
    // Backslash continuations strip the leading whitespace of the next
    // line, so the header block contains no stray indentation.
    let mut header_block = format!(
        "HTTP/1.1 {status} {reason}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {}\r\n",
        body.len()
    );
    for (k, v) in extra_headers {
        header_block.push_str(&format!("{k}: {v}\r\n"));
    }
    // Connection: close so the client doesn't wait for keep-alive reuse.
    header_block.push_str("Connection: close\r\n\r\n");
    stream.write_all(header_block.as_bytes()).unwrap();
    stream.write_all(body.as_bytes()).unwrap();
    stream.flush().unwrap();
}

/// Spin up a mock GitLab server that handles `request_count` sequential HTTP
/// connections. Each connection is handled by `handler_fn(connection_index, stream)`.
/// Returns the `http://127.0.0.1:{port}` base URL.
// NOTE(review): the generic parameter list was garbled in the checked-in
// file (`fn mock_gitlab_server(...) -> String where F: ...` with no `<F>`);
// restored as `<F>`, which the `where` clause already requires.
fn mock_gitlab_server<F>(request_count: usize, handler_fn: F) -> String
where
    F: Fn(usize, &mut std::net::TcpStream) + Send + 'static,
{
    // Port 0 lets the OS pick a free port, so parallel tests never collide.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        for i in 0..request_count {
            let Ok((mut stream, _)) = listener.accept() else {
                break;
            };
            handler_fn(i, &mut stream);
        }
        // Thread (and listener) die after request_count connections; a
        // client that connects again gets a refused connection.
    });
    format!("http://127.0.0.1:{port}")
}

// ───────────────────────────────────────────────────────────────────
// Test 6: Full HTTP ingestion pipeline — paginated issues via mock
// ───────────────────────────────────────────────────────────────────
#[test]
fn http_pipeline_ingest_issues_paginated() {
    // Serve 2 pages of issues: page 1 has 3, page 2 has 2 (no x-next-page = done).
    let base = mock_gitlab_server(2, |i, stream| {
        drain_http_request(stream);
        match i {
            0 => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(1001, 1, "2024-06-01T00:00:00.000Z"),
                    make_issue_json(1002, 2, "2024-06-02T00:00:00.000Z"),
                    make_issue_json(1003, 3, "2024-06-03T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[("x-next-page", "2")], &body);
            }
            _ => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(1004, 4, "2024-06-04T00:00:00.000Z"),
                    make_issue_json(1005, 5, "2024-06-05T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[], &body);
            }
        }
    });

    let conn = setup_db();
    let config = test_config();

    run(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        let result = ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap();

        assert_eq!(result.fetched, 5, "should fetch all 5 issues across 2 pages");
        assert_eq!(result.upserted, 5, "should upsert all 5 issues");

        let db_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(db_count, 5, "DB should contain 5 issues");

        // Verify sync cursor was persisted
        let cursor_ts: i64 = conn
            .query_row(
                "SELECT updated_at_cursor FROM sync_cursors
                 WHERE project_id = 1 AND resource_type = 'issues'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(cursor_ts > 0, "sync cursor should be set after ingestion");

        // Verify labels created
        let label_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM labels WHERE project_id = 1 AND name = 'backend'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(label_count, 1, "backend label should exist");

        let summary = json!({
            "test": "http_pipeline_ingest_issues_paginated",
            "fetched": result.fetched,
            "upserted": result.upserted,
            "labels_created": result.labels_created,
            "db_count": db_count,
            "cursor_set": cursor_ts > 0,
        });
        eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
    });
}

// ───────────────────────────────────────────────────────────────────
// Test 7: Cancellation preserves DB integrity during real ingestion
// ───────────────────────────────────────────────────────────────────
#[test]
fn http_pipeline_cancel_preserves_integrity() {
    // NOTE(review): requests_served is incremented by the handler but never
    // asserted on — presumably kept for debugging; consider asserting or removing.
    let requests_served = Arc::new(AtomicUsize::new(0));
    let requests_clone = Arc::clone(&requests_served);

    // Serve 2 pages. The test cancels the signal immediately, so the stream
    // should stop early. Whatever was committed must be consistent.
    let base = mock_gitlab_server(2, move |i, stream| {
        drain_http_request(stream);
        requests_clone.fetch_add(1, Ordering::Relaxed);
        match i {
            0 => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(2001, 10, "2024-07-01T00:00:00.000Z"),
                    make_issue_json(2002, 11, "2024-07-02T00:00:00.000Z"),
                    make_issue_json(2003, 12, "2024-07-03T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[("x-next-page", "2")], &body);
            }
            _ => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(2004, 13, "2024-07-04T00:00:00.000Z"),
                    make_issue_json(2005, 14, "2024-07-05T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[], &body);
            }
        }
    });

    let conn = setup_db();
    let config = test_config();

    run(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        // Cancel immediately — signal is checked between each streamed issue
        signal.cancel();

        // Cancellation is expected to yield Ok with partial counts, not Err.
        let result = ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap();

        // Key invariant: DB count must match reported upsert count
        let db_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(
            db_count as usize, result.upserted,
            "DB count must equal upserted count (no partial data)"
        );

        // If anything was upserted, cursor must exist
        if result.upserted > 0 {
            let cursor_exists: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM sync_cursors WHERE project_id = 1 AND resource_type = 'issues'",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert!(
                cursor_exists,
                "cursor should exist when items were upserted"
            );
        }

        let summary = json!({
            "test": "http_pipeline_cancel_preserves_integrity",
            "fetched": result.fetched,
            "upserted": result.upserted,
            "db_count": db_count,
            "integrity_ok": db_count as usize == result.upserted,
        });
        eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
    });
}

// ───────────────────────────────────────────────────────────────────
// Test 8: Resume via cursor — second run deduplicates correctly
// ───────────────────────────────────────────────────────────────────
#[test]
fn http_pipeline_resume_via_cursor() {
    let conn = setup_db();
    let config = test_config();

    // --- Run 1: Ingest 3 issues ---
    let base1 = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(3001, 20, "2024-08-01T00:00:00.000Z"),
            make_issue_json(3002, 21, "2024-08-02T00:00:00.000Z"),
            make_issue_json(3003, 22, "2024-08-03T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });

    let run1 = run(async {
        let client = GitLabClient::new(&base1, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });
    assert_eq!(run1.upserted, 3, "run 1 should upsert 3 issues");

    // --- Run 2: Same 3 + 2 new issues. Cursor should skip the first 3. ---
    // The first 3 carry the same updated_at timestamps as run 1, so the
    // cursor from run 1 is what makes them count as fetched-but-not-upserted.
    let base2 = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(3001, 20, "2024-08-01T00:00:00.000Z"),
            make_issue_json(3002, 21, "2024-08-02T00:00:00.000Z"),
            make_issue_json(3003, 22, "2024-08-03T00:00:00.000Z"),
            make_issue_json(3004, 23, "2024-08-04T00:00:00.000Z"),
            make_issue_json(3005, 24, "2024-08-05T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });

    let run2 = run(async {
        let client = GitLabClient::new(&base2, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });
    assert_eq!(run2.fetched, 5, "run 2 should fetch all 5 from API");
    assert_eq!(run2.upserted, 2, "run 2 should only upsert 2 new issues");

    let total: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(total, 5, "DB should have 5 total issues after resume");

    let distinct: i64 = conn
        .query_row(
            "SELECT COUNT(DISTINCT iid) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(distinct, 5, "no duplicate IIDs");

    let summary = json!({
        "test": "http_pipeline_resume_via_cursor",
        "run1_upserted": run1.upserted,
        "run2_fetched": run2.fetched,
        "run2_upserted": run2.upserted,
        "total": total,
        "no_duplicates": distinct == 5,
    });
    eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
}

// ───────────────────────────────────────────────────────────────────
// Test 9: Runtime quiescence — no leaked tasks after real ingestion
// ───────────────────────────────────────────────────────────────────
#[test]
fn http_pipeline_runtime_quiescence() {
    let base = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(4001, 30, "2024-09-01T00:00:00.000Z"),
            make_issue_json(4002, 31, "2024-09-02T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });

    let conn = setup_db();
    let config = test_config();

    // Uses an explicit runtime (not the `run` helper) so the drop is visible
    // and deliberate in this test.
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    let result = rt.block_on(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });

    // Runtime drops cleanly — if tasks leaked, this would hang or panic
    drop(rt);

    assert_eq!(result.upserted, 2);
    let count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(count, 2, "data committed before runtime drop");

    let summary = json!({
        "test": "http_pipeline_runtime_quiescence",
        "upserted": result.upserted,
        "db_count": count,
        "runtime_dropped_cleanly": true,
    });
    eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
}