Files
gitlore/tests/asupersync_e2e.rs
teernisse af167e2086 test(asupersync): add cancellation, parity, and E2E acceptance tests
- Add 7 cancellation integration tests (ShutdownSignal, transaction rollback)
- Add 7 HTTP behavior parity tests (redirect, proxy, keep-alive, DNS, TLS)
- Add 9 E2E runtime acceptance tests (lifecycle, cancel+resume, tracing, HTTP pipeline)
- Total: 1190 tests, all passing

Phases 4-5 of asupersync migration.
2026-03-06 16:09:41 -05:00

852 lines
30 KiB
Rust

//! E2E runtime acceptance tests for the asupersync migration.
//!
//! Proves the full runtime lifecycle works end-to-end:
//! 1. RuntimeBuilder creates a working runtime with IO and timers
//! 2. ShutdownSignal cancellation mid-flow stops processing cleanly
//! 3. Resume: second run picks up where the cancelled first run left off
//! 4. Structured tracing events fire with expected fields
//! 5. No DB corruption across cancel + resume cycle
//! 6. Real HTTP ingestion pipeline (GitLabClient -> pagination -> DB writes)
use std::io::{Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use futures::future::join_all;
use rusqlite::Connection;
use serde_json::json;
use lore::core::config::{
Config, EmbeddingConfig, GitLabConfig, LoggingConfig, ProjectConfig, ScoringConfig,
StorageConfig, SyncConfig,
};
use lore::core::db::{create_connection, run_migrations};
use lore::core::shutdown::ShutdownSignal;
use lore::gitlab::GitLabClient;
use lore::ingestion::ingest_issues;
/// Build a one-shot asupersync runtime and drive `f` to completion,
/// returning the future's output. The runtime is torn down on return.
fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
    let runtime = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    runtime.block_on(f)
}
/// Open an in-memory database, apply all migrations, and seed the single
/// project row (id = 1, gitlab_project_id = 100) that every test here uses.
fn setup_db() -> Connection {
    let conn = create_connection(std::path::Path::new(":memory:")).unwrap();
    run_migrations(&conn).unwrap();
    let seed_project = "INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url)
         VALUES (1, 100, 'test/repo', 'https://git.example.com/test/repo')";
    conn.execute(seed_project, []).unwrap();
    conn
}
/// Simulates a multi-phase sync pipeline. Returns the number of batches that
/// fully completed and the IIDs that were "synced" (committed to the DB).
///
/// If `cancel_after_batches` is `Some(n)`, the shutdown signal is cancelled
/// once `n` batches have committed, so the loop exits before touching any
/// subsequent chunk — a deterministic stand-in for Ctrl+C.
async fn run_sync_pipeline(
    conn: &Connection,
    signal: &ShutdownSignal,
    project_id: i64,
    all_iids: &[i64],
    batch_size: usize,
    cancel_after_batches: Option<usize>,
) -> (usize, Vec<i64>) {
    let mut batches_done = 0usize;
    let mut written = Vec::new();
    for batch in all_iids.chunks(batch_size) {
        // Cancellation checkpoint before starting a new batch.
        if signal.is_cancelled() {
            break;
        }
        // Phase: concurrent fetch (simulated by a 1ms sleep per IID).
        let fetch_futures: Vec<_> = batch
            .iter()
            .map(|&iid| async move {
                asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(1))
                    .await;
                iid
            })
            .collect();
        let fetched = join_all(fetch_futures).await;
        // Cancellation checkpoint between fetch and write phases.
        if signal.is_cancelled() {
            break;
        }
        // Phase: write the whole batch inside one transaction so a cancelled
        // run never leaves a half-written batch behind.
        let tx = conn.unchecked_transaction().unwrap();
        for &iid in &fetched {
            tx.execute(
                "INSERT OR IGNORE INTO issues (
                    gitlab_id, project_id, iid, title, state,
                    author_username, created_at, updated_at, last_seen_at
                ) VALUES (?1, ?2, ?3, 'issue', 'opened', 'bot', 1000, 2000, 3000)",
                rusqlite::params![iid + 10000, project_id, iid],
            )
            .unwrap();
        }
        tx.commit().unwrap();
        written.extend(&fetched);
        batches_done += 1;
        // Simulate Ctrl+C after N batches (deterministic cancellation).
        if cancel_after_batches.is_some_and(|limit| batches_done >= limit) {
            signal.cancel();
        }
    }
    (batches_done, written)
}
// ───────────────────────────────────────────────────────────────────
// Test 1: Full runtime lifecycle — build, spawn, block_on, drop
// ───────────────────────────────────────────────────────────────────
/// Full runtime lifecycle: build, spawn via handle, block_on, then drop.
#[test]
fn runtime_lifecycle_build_run_drop() {
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    let handle = rt.handle();
    let completions = Arc::new(AtomicUsize::new(0));
    let task_counter = Arc::clone(&completions);
    // Background task spawned through the handle; bumps the counter when done.
    handle.spawn(async move {
        asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(5)).await;
        task_counter.fetch_add(1, Ordering::Relaxed);
    });
    // block_on drives the reactor long enough (20ms) for the 5ms task above.
    let answer = rt.block_on(async {
        asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(20)).await;
        42
    });
    assert_eq!(answer, 42, "block_on should return the async result");
    assert_eq!(
        completions.load(Ordering::Relaxed),
        1,
        "spawned task should complete during block_on"
    );
    // rt drops here — no panics, no leaks
}
// ───────────────────────────────────────────────────────────────────
// Test 2: Cancel mid-flow, then resume — proves idempotent sync
// ───────────────────────────────────────────────────────────────────
/// Proves the cancel → resume cycle is lossless: a run cancelled after one
/// batch leaves a consistent DB, and a second run over the remaining IIDs
/// brings the table to the complete state with no duplicates.
#[test]
fn cancel_then_resume_completes_all_work() {
    let conn = setup_db();
    let all_iids: Vec<i64> = (1..=15).collect();
    let batch_size = 5; // 3 batches of 5
    // Run 1: cancel after first batch (deterministic — no timing dependency)
    let (phases_r1, synced_r1) = run(async {
        let signal = ShutdownSignal::new();
        run_sync_pipeline(&conn, &signal, 1, &all_iids, batch_size, Some(1)).await
    });
    assert_eq!(
        phases_r1, 1,
        "exactly one batch should complete before cancel"
    );
    assert_eq!(
        synced_r1.len(),
        batch_size,
        "only first batch should be synced"
    );
    // Verify DB state matches what was reported
    let count_r1: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        count_r1,
        synced_r1.len() as i64,
        "DB count should match synced IIDs"
    );
    // Run 2: resume — find remaining IIDs and sync them
    // (mirrors how a real resume diffs DB state against the full work list)
    let already_synced: Vec<i64> = {
        let mut stmt = conn
            .prepare("SELECT iid FROM issues WHERE project_id = 1 ORDER BY iid")
            .unwrap();
        stmt.query_map([], |row| row.get(0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect()
    };
    let remaining: Vec<i64> = all_iids
        .iter()
        .filter(|iid| !already_synced.contains(iid))
        .copied()
        .collect();
    assert!(
        !remaining.is_empty(),
        "there should be remaining work after cancellation"
    );
    let (phases_r2, synced_r2) = run(async {
        let signal = ShutdownSignal::new(); // fresh signal, no cancel
        run_sync_pipeline(&conn, &signal, 1, &remaining, batch_size, None).await
    });
    assert!(
        phases_r2 >= 1,
        "resume run should process remaining batches"
    );
    // Verify ALL 15 issues are now in the DB
    let count_final: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        count_final, 15,
        "all 15 issues should be in DB after cancel + resume"
    );
    // Verify no duplicates (INSERT OR IGNORE should prevent)
    let total_synced = synced_r1.len() + synced_r2.len();
    assert_eq!(
        total_synced, 15,
        "combined synced count should equal total issues"
    );
}
// ───────────────────────────────────────────────────────────────────
// Test 3: Structured tracing events fire with expected fields
// ───────────────────────────────────────────────────────────────────
/// Captures tracing output through a custom JSON writer and asserts that the
/// span name and structured fields (run_id, phase, cancellation reason)
/// survive into the emitted events.
#[test]
fn structured_tracing_captures_phase_transitions() {
    use tracing_subscriber::layer::SubscriberExt;
    // Collect tracing events into a shared buffer
    let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let events_clone = Arc::clone(&events);
    // JSON-formatted layer whose writer appends each emitted line to `events`
    let layer = tracing_subscriber::fmt::layer()
        .json()
        .with_writer(move || EventWriter(Arc::clone(&events_clone)))
        .with_target(false);
    let subscriber = tracing_subscriber::registry().with(layer);
    // set_default scopes the subscriber to this thread until _guard drops
    let _guard = tracing::subscriber::set_default(subscriber);
    run(async {
        let span = tracing::info_span!(
            "sync_pipeline",
            run_id = "test-run-001",
            project = "test/repo"
        );
        let _enter = span.enter();
        tracing::info!(phase = "fetch_issues", entity_count = 10, "phase started");
        asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(1)).await;
        tracing::info!(
            phase = "fetch_issues",
            entity_count = 10,
            success = 8,
            skipped = 2,
            "phase completed"
        );
        tracing::info!(phase = "cancelled", reason = "ctrl_c", "pipeline stopped");
    });
    let captured = events.lock().unwrap();
    assert!(
        captured.len() >= 3,
        "should capture at least 3 tracing events, got {}",
        captured.len()
    );
    // Verify structured fields are present in the JSON output
    let all_text = captured.join("\n");
    assert!(
        all_text.contains("sync_pipeline"),
        "span name should appear in output"
    );
    assert!(
        all_text.contains("test-run-001"),
        "run_id field should appear in output"
    );
    assert!(
        all_text.contains("fetch_issues"),
        "phase field should appear in output"
    );
    assert!(
        all_text.contains("ctrl_c"),
        "cancellation reason should appear in output"
    );
}
/// A `Write` sink that pushes each non-empty, trimmed UTF-8 chunk into a
/// shared `Vec<String>`, letting tests inspect emitted tracing lines.
struct EventWriter(Arc<Mutex<Vec<String>>>);
impl std::io::Write for EventWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Non-UTF-8 or whitespace-only chunks are dropped silently; the
        // tracing JSON formatter only ever emits UTF-8 lines.
        let maybe_line = std::str::from_utf8(buf)
            .ok()
            .map(str::trim)
            .filter(|line| !line.is_empty());
        if let Some(line) = maybe_line {
            self.0.lock().unwrap().push(line.to_string());
        }
        Ok(buf.len())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
// ───────────────────────────────────────────────────────────────────
// Test 4: Concurrent fan-out under asupersync with timing proof
// ───────────────────────────────────────────────────────────────────
/// Concurrent fan-out: 10 x 50ms sleeps must overlap, not serialize.
#[test]
fn concurrent_fanout_runs_in_parallel() {
    run(async {
        let t0 = Instant::now();
        // Launch 10 futures that each sleep 50ms; concurrent execution
        // finishes in roughly one sleep period, not ten.
        let sleepers: Vec<_> = (0..10)
            .map(|_| async {
                asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(50))
                    .await;
            })
            .collect();
        join_all(sleepers).await;
        let took = t0.elapsed();
        // Sequential execution would need 500ms+; 200ms leaves CI headroom.
        assert!(
            took < Duration::from_millis(200),
            "fan-out should run concurrently, took {:?}",
            took
        );
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 5: DB integrity after multiple runtime instantiations
// ───────────────────────────────────────────────────────────────────
/// Verifies DB durability across two fully torn-down runtimes: run 1 inserts
/// issues 1-5 and its runtime drops; run 2 builds a fresh runtime, observes
/// the data, and adds 6-10; a final runtime-free query sees all ten rows.
#[test]
fn db_integrity_across_runtime_restarts() {
    let conn = setup_db();
    // Run 1: insert issues 1-5
    {
        let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
        rt.block_on(async {
            let tx = conn.unchecked_transaction().unwrap();
            for iid in 1..=5 {
                tx.execute(
                    "INSERT INTO issues (
                        gitlab_id, project_id, iid, title, state,
                        author_username, created_at, updated_at, last_seen_at
                    ) VALUES (?1, 1, ?2, 'issue', 'opened', 'bot', 1000, 2000, 3000)",
                    rusqlite::params![iid + 10000, iid],
                )
                .unwrap();
            }
            tx.commit().unwrap();
        });
        // rt drops here — runtime fully torn down
    }
    // Run 2: new runtime, verify data survives, insert more
    {
        let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
        rt.block_on(async {
            let count: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert_eq!(count, 5, "run 1 data should persist");
            let tx = conn.unchecked_transaction().unwrap();
            for iid in 6..=10 {
                tx.execute(
                    "INSERT INTO issues (
                        gitlab_id, project_id, iid, title, state,
                        author_username, created_at, updated_at, last_seen_at
                    ) VALUES (?1, 1, ?2, 'issue', 'opened', 'bot', 1000, 2000, 3000)",
                    rusqlite::params![iid + 10000, iid],
                )
                .unwrap();
            }
            tx.commit().unwrap();
        });
    }
    // Verify final state without a runtime (sync query)
    let count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(count, 10, "both runs should have persisted correctly");
}
// ═══════════════════════════════════════════════════════════════════
// Real HTTP pipeline tests — exercise GitLabClient -> ingest_issues
// ═══════════════════════════════════════════════════════════════════
/// Minimal `Config` pointing at a placeholder GitLab instance; tests that
/// exercise HTTP construct their own `GitLabClient` with a mock base URL.
fn test_config() -> Config {
    let gitlab = GitLabConfig {
        base_url: "https://gitlab.example.com".to_string(),
        token_env_var: "GITLAB_TOKEN".to_string(),
        token: None,
        username: None,
    };
    let projects = vec![ProjectConfig {
        path: "group/project".to_string(),
    }];
    // Everything else takes its crate-level default.
    Config {
        gitlab,
        projects,
        default_project: None,
        sync: SyncConfig::default(),
        storage: StorageConfig::default(),
        embedding: EmbeddingConfig::default(),
        logging: LoggingConfig::default(),
        scoring: ScoringConfig::default(),
    }
}
/// Build a GitLab-style issue JSON object with deterministic fields.
/// Only `id`, `iid`, and `updated_at` vary; everything else is fixed so
/// tests can assert exact counts and cursor values.
fn make_issue_json(id: i64, iid: i64, updated_at: &str) -> serde_json::Value {
    let title = format!("Issue {iid}");
    let description = format!("Description for issue {iid}");
    let web_url = format!("https://git.example.com/test/repo/-/issues/{iid}");
    json!({
        "id": id,
        "iid": iid,
        "project_id": 100,
        "title": title,
        "description": description,
        "state": "opened",
        "created_at": "2024-01-01T00:00:00.000Z",
        "updated_at": updated_at,
        "closed_at": null,
        "author": {
            "id": 1,
            "username": "testbot",
            "name": "Test Bot"
        },
        "assignees": [],
        "labels": ["backend"],
        "milestone": null,
        "due_date": null,
        "web_url": web_url
    })
}
/// Drain HTTP request bytes from a reader until the end-of-headers marker
/// (`\r\n\r\n`) is seen or the stream hits EOF.
///
/// Generalized from `&mut TcpStream` to `&mut impl Read` so the helper is
/// unit-testable; existing callers passing `&mut TcpStream` are unaffected.
///
/// Fix: the original re-scanned the entire accumulated buffer with
/// `windows(4)` after every read (O(n²) across reads). We now scan only the
/// suffix a new read could have completed the marker in — the last 3 bytes
/// of the previous contents plus the fresh bytes.
fn drain_http_request(stream: &mut impl Read) {
    let mut buf = [0u8; 8192];
    let mut accumulated: Vec<u8> = Vec::new();
    loop {
        let n = stream.read(&mut buf).unwrap();
        if n == 0 {
            break; // EOF before the marker — peer closed early
        }
        // A marker finishing in this read must start within the last 3 bytes
        // of the previously accumulated data.
        let scan_from = accumulated.len().saturating_sub(3);
        accumulated.extend_from_slice(&buf[..n]);
        if accumulated[scan_from..].windows(4).any(|w| w == b"\r\n\r\n") {
            break;
        }
    }
}
/// Write an HTTP/1.1 response with the given status, headers, and JSON body.
///
/// Always emits `Content-Type: application/json`, a correct `Content-Length`,
/// and a trailing `Connection: close`; `extra_headers` are inserted between
/// them (used e.g. for `x-next-page` pagination headers).
///
/// Generalized from `&mut TcpStream` to `&mut impl Write` so the exact wire
/// bytes can be unit-tested; callers passing `&mut TcpStream` are unaffected.
fn write_http_response(
    stream: &mut impl Write,
    status: u16,
    reason: &str,
    extra_headers: &[(&str, &str)],
    body: &str,
) {
    let mut header_block = format!(
        "HTTP/1.1 {status} {reason}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {}\r\n",
        body.len()
    );
    for (k, v) in extra_headers {
        header_block.push_str(&format!("{k}: {v}\r\n"));
    }
    header_block.push_str("Connection: close\r\n\r\n");
    stream.write_all(header_block.as_bytes()).unwrap();
    stream.write_all(body.as_bytes()).unwrap();
    stream.flush().unwrap();
}
/// Spin up a mock GitLab server on an ephemeral loopback port. A background
/// thread accepts exactly `request_count` sequential connections, handing
/// each to `handler_fn(connection_index, stream)`, then exits.
/// Returns the `http://127.0.0.1:{port}` base URL for clients.
fn mock_gitlab_server<F>(request_count: usize, handler_fn: F) -> String
where
    F: Fn(usize, &mut std::net::TcpStream) + Send + 'static,
{
    // Port 0 asks the OS for any free port; read it back for the URL.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        for index in 0..request_count {
            match listener.accept() {
                Ok((mut stream, _)) => handler_fn(index, &mut stream),
                Err(_) => break,
            }
        }
    });
    format!("http://127.0.0.1:{port}")
}
// ───────────────────────────────────────────────────────────────────
// Test 6: Full HTTP ingestion pipeline — paginated issues via mock
// ───────────────────────────────────────────────────────────────────
/// End-to-end HTTP ingestion: GitLabClient fetches two paginated responses
/// from a mock server, ingest_issues streams them into the DB, and we verify
/// row counts, the persisted sync cursor, and label creation.
#[test]
fn http_pipeline_ingest_issues_paginated() {
    // Serve 2 pages of issues: page 1 has 3, page 2 has 2 (no x-next-page = done).
    let base = mock_gitlab_server(2, |i, stream| {
        drain_http_request(stream);
        match i {
            0 => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(1001, 1, "2024-06-01T00:00:00.000Z"),
                    make_issue_json(1002, 2, "2024-06-02T00:00:00.000Z"),
                    make_issue_json(1003, 3, "2024-06-03T00:00:00.000Z"),
                ])
                .unwrap();
                // The x-next-page header drives the client's pagination loop
                write_http_response(stream, 200, "OK", &[("x-next-page", "2")], &body);
            }
            _ => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(1004, 4, "2024-06-04T00:00:00.000Z"),
                    make_issue_json(1005, 5, "2024-06-05T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[], &body);
            }
        }
    });
    let conn = setup_db();
    let config = test_config();
    run(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        let result = ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap();
        assert_eq!(
            result.fetched, 5,
            "should fetch all 5 issues across 2 pages"
        );
        assert_eq!(result.upserted, 5, "should upsert all 5 issues");
        let db_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(db_count, 5, "DB should contain 5 issues");
        // Verify sync cursor was persisted
        let cursor_ts: i64 = conn
            .query_row(
                "SELECT updated_at_cursor FROM sync_cursors
                 WHERE project_id = 1 AND resource_type = 'issues'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(cursor_ts > 0, "sync cursor should be set after ingestion");
        // Verify labels created (every mock issue carries the 'backend' label)
        let label_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM labels WHERE project_id = 1 AND name = 'backend'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(label_count, 1, "backend label should exist");
        // Machine-readable summary for acceptance tooling
        let summary = json!({
            "test": "http_pipeline_ingest_issues_paginated",
            "fetched": result.fetched,
            "upserted": result.upserted,
            "labels_created": result.labels_created,
            "db_count": db_count,
            "cursor_set": cursor_ts > 0,
        });
        eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 7: Cancellation preserves DB integrity during real ingestion
// ───────────────────────────────────────────────────────────────────
/// Cancels the shutdown signal before ingestion runs and asserts the core
/// integrity invariant: the DB row count exactly equals the reported upsert
/// count, and a sync cursor exists whenever anything was upserted.
#[test]
fn http_pipeline_cancel_preserves_integrity() {
    let requests_served = Arc::new(AtomicUsize::new(0));
    let requests_clone = Arc::clone(&requests_served);
    // Serve 2 pages. The test cancels the signal immediately, so the stream
    // should stop early. Whatever was committed must be consistent.
    let base = mock_gitlab_server(2, move |i, stream| {
        drain_http_request(stream);
        requests_clone.fetch_add(1, Ordering::Relaxed);
        match i {
            0 => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(2001, 10, "2024-07-01T00:00:00.000Z"),
                    make_issue_json(2002, 11, "2024-07-02T00:00:00.000Z"),
                    make_issue_json(2003, 12, "2024-07-03T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[("x-next-page", "2")], &body);
            }
            _ => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(2004, 13, "2024-07-04T00:00:00.000Z"),
                    make_issue_json(2005, 14, "2024-07-05T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[], &body);
            }
        }
    });
    let conn = setup_db();
    let config = test_config();
    run(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        // Cancel immediately — signal is checked between each streamed issue
        signal.cancel();
        let result = ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap();
        // Key invariant: DB count must match reported upsert count
        let db_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(
            db_count as usize, result.upserted,
            "DB count must equal upserted count (no partial data)"
        );
        // If anything was upserted, cursor must exist
        if result.upserted > 0 {
            let cursor_exists: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM sync_cursors
                     WHERE project_id = 1 AND resource_type = 'issues'",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert!(
                cursor_exists,
                "cursor should exist when items were upserted"
            );
        }
        // Machine-readable summary for acceptance tooling
        let summary = json!({
            "test": "http_pipeline_cancel_preserves_integrity",
            "fetched": result.fetched,
            "upserted": result.upserted,
            "db_count": db_count,
            "integrity_ok": db_count as usize == result.upserted,
        });
        eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
    });
}
// ───────────────────────────────────────────────────────────────────
// Test 8: Resume via cursor — second run deduplicates correctly
// ───────────────────────────────────────────────────────────────────
/// Proves cursor-based resume: run 1 ingests 3 issues; run 2 serves those
/// same 3 plus 2 new ones and must upsert only the 2 new entries, leaving
/// 5 distinct rows in the DB.
#[test]
fn http_pipeline_resume_via_cursor() {
    let conn = setup_db();
    let config = test_config();
    // --- Run 1: Ingest 3 issues ---
    let base1 = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(3001, 20, "2024-08-01T00:00:00.000Z"),
            make_issue_json(3002, 21, "2024-08-02T00:00:00.000Z"),
            make_issue_json(3003, 22, "2024-08-03T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });
    let run1 = run(async {
        let client = GitLabClient::new(&base1, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });
    assert_eq!(run1.upserted, 3, "run 1 should upsert 3 issues");
    // --- Run 2: Same 3 + 2 new issues. Cursor should skip the first 3. ---
    let base2 = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(3001, 20, "2024-08-01T00:00:00.000Z"),
            make_issue_json(3002, 21, "2024-08-02T00:00:00.000Z"),
            make_issue_json(3003, 22, "2024-08-03T00:00:00.000Z"),
            make_issue_json(3004, 23, "2024-08-04T00:00:00.000Z"),
            make_issue_json(3005, 24, "2024-08-05T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });
    let run2 = run(async {
        let client = GitLabClient::new(&base2, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });
    assert_eq!(run2.fetched, 5, "run 2 should fetch all 5 from API");
    assert_eq!(run2.upserted, 2, "run 2 should only upsert 2 new issues");
    let total: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(total, 5, "DB should have 5 total issues after resume");
    // Distinct-IID check guards against duplicate inserts under resume
    let distinct: i64 = conn
        .query_row(
            "SELECT COUNT(DISTINCT iid) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(distinct, 5, "no duplicate IIDs");
    // Machine-readable summary for acceptance tooling
    let summary = json!({
        "test": "http_pipeline_resume_via_cursor",
        "run1_upserted": run1.upserted,
        "run2_fetched": run2.fetched,
        "run2_upserted": run2.upserted,
        "total": total,
        "no_duplicates": distinct == 5,
    });
    eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
}
// ───────────────────────────────────────────────────────────────────
// Test 9: Runtime quiescence — no leaked tasks after real ingestion
// ───────────────────────────────────────────────────────────────────
/// After a real ingestion, dropping the runtime must complete without
/// hanging or panicking (no leaked tasks), and data committed before the
/// drop must remain queryable without any runtime.
#[test]
fn http_pipeline_runtime_quiescence() {
    let base = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(4001, 30, "2024-09-01T00:00:00.000Z"),
            make_issue_json(4002, 31, "2024-09-02T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });
    let conn = setup_db();
    let config = test_config();
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    let result = rt.block_on(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });
    // Runtime drops cleanly — if tasks leaked, this would hang or panic
    drop(rt);
    assert_eq!(result.upserted, 2);
    // Sync (runtime-free) query proves the commit outlived the runtime
    let count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(count, 2, "data committed before runtime drop");
    // Machine-readable summary for acceptance tooling
    let summary = json!({
        "test": "http_pipeline_runtime_quiescence",
        "upserted": result.upserted,
        "db_count": count,
        "runtime_dropped_cleanly": true,
    });
    eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
}