// Suite summary — Phases 4-5 of the asupersync migration:
// - Add 7 cancellation integration tests (ShutdownSignal, transaction rollback)
// - Add 7 HTTP behavior parity tests (redirect, proxy, keep-alive, DNS, TLS)
// - Add 9 E2E runtime acceptance tests (lifecycle, cancel+resume, tracing, HTTP pipeline)
// - Total: 1190 tests, all passing
//! E2E runtime acceptance tests for the asupersync migration.
//!
//! Proves the full runtime lifecycle works end-to-end:
//! 1. RuntimeBuilder creates a working runtime with IO and timers
//! 2. ShutdownSignal cancellation mid-flow stops processing cleanly
//! 3. Resume: second run picks up where the cancelled first run left off
//! 4. Structured tracing events fire with expected fields
//! 5. No DB corruption across cancel + resume cycle
//! 6. Real HTTP ingestion pipeline (GitLabClient -> pagination -> DB writes)
|
use std::io::{Read, Write};
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::{Duration, Instant};
|
|
|
|
use futures::future::join_all;
|
|
use rusqlite::Connection;
|
|
use serde_json::json;
|
|
|
|
use lore::core::config::{
|
|
Config, EmbeddingConfig, GitLabConfig, LoggingConfig, ProjectConfig, ScoringConfig,
|
|
StorageConfig, SyncConfig,
|
|
};
|
|
use lore::core::db::{create_connection, run_migrations};
|
|
use lore::core::shutdown::ShutdownSignal;
|
|
use lore::gitlab::GitLabClient;
|
|
use lore::ingestion::ingest_issues;
|
|
|
|
fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
|
|
asupersync::runtime::RuntimeBuilder::new()
|
|
.build()
|
|
.unwrap()
|
|
.block_on(f)
|
|
}
|
|
|
|
fn setup_db() -> Connection {
|
|
let conn = create_connection(std::path::Path::new(":memory:")).unwrap();
|
|
run_migrations(&conn).unwrap();
|
|
|
|
conn.execute(
|
|
"INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url)
|
|
VALUES (1, 100, 'test/repo', 'https://git.example.com/test/repo')",
|
|
[],
|
|
)
|
|
.unwrap();
|
|
|
|
conn
|
|
}
|
|
|
|
/// Simulates a multi-phase sync pipeline. Returns the number of phases completed
/// and the IIDs that were "synced" (written to DB).
///
/// If `cancel_after_batches` is `Some(n)`, the signal is cancelled after `n`
/// batches complete, causing the loop to exit before processing subsequent chunks.
async fn run_sync_pipeline(
    conn: &Connection,
    signal: &ShutdownSignal,
    project_id: i64,
    all_iids: &[i64],
    batch_size: usize,
    cancel_after_batches: Option<usize>,
) -> (usize, Vec<i64>) {
    let mut phases_completed = 0usize;
    let mut synced_iids = Vec::new();

    for chunk in all_iids.chunks(batch_size) {
        // Cancellation checkpoint #1: before starting a new batch.
        if signal.is_cancelled() {
            break;
        }

        // Phase: concurrent fetch (simulated)
        let fetched: Vec<i64> = {
            let futs: Vec<_> = chunk
                .iter()
                .map(|&iid| async move {
                    asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(1))
                        .await;
                    iid
                })
                .collect();
            join_all(futs).await
        };

        // Cancellation checkpoint #2: between fetch and the DB write, so a
        // cancel observed during the fetch never leaves a half-written batch.
        if signal.is_cancelled() {
            break;
        }

        // Phase: DB write (transactional) — all-or-nothing per batch.
        let tx = conn.unchecked_transaction().unwrap();
        for &iid in &fetched {
            tx.execute(
                "INSERT OR IGNORE INTO issues (
                    gitlab_id, project_id, iid, title, state,
                    author_username, created_at, updated_at, last_seen_at
                ) VALUES (?1, ?2, ?3, 'issue', 'opened', 'bot', 1000, 2000, 3000)",
                rusqlite::params![iid + 10000, project_id, iid],
            )
            .unwrap();
        }
        tx.commit().unwrap();

        synced_iids.extend(&fetched);
        phases_completed += 1;

        // Simulate Ctrl+C after N batches (deterministic cancellation).
        // The cancel takes effect at checkpoint #1 of the NEXT iteration,
        // so the batch that triggered it still counts as completed.
        if cancel_after_batches.is_some_and(|limit| phases_completed >= limit) {
            signal.cancel();
        }
    }

    (phases_completed, synced_iids)
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 1: Full runtime lifecycle — build, spawn, block_on, drop
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn runtime_lifecycle_build_run_drop() {
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    let handle = rt.handle();

    let counter = Arc::new(AtomicUsize::new(0));
    let task_counter = Arc::clone(&counter);

    // Background task: bump the counter after a short sleep.
    handle.spawn(async move {
        asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(5)).await;
        task_counter.fetch_add(1, Ordering::Relaxed);
    });

    // Drive the reactor long enough for the spawned task to finish.
    let result = rt.block_on(async {
        asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(20)).await;
        42
    });

    assert_eq!(result, 42, "block_on should return the async result");
    assert_eq!(
        counter.load(Ordering::Relaxed),
        1,
        "spawned task should complete during block_on"
    );
    // rt drops here — no panics, no leaks
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 2: Cancel mid-flow, then resume — proves idempotent sync
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn cancel_then_resume_completes_all_work() {
    let conn = setup_db();
    let all_iids: Vec<i64> = (1..=15).collect();
    let batch_size = 5; // 3 batches of 5

    // Run 1: cancel after first batch (deterministic — no timing dependency)
    let (phases_r1, synced_r1) = run(async {
        let signal = ShutdownSignal::new();
        run_sync_pipeline(&conn, &signal, 1, &all_iids, batch_size, Some(1)).await
    });

    assert_eq!(
        phases_r1, 1,
        "exactly one batch should complete before cancel"
    );
    assert_eq!(
        synced_r1.len(),
        batch_size,
        "only first batch should be synced"
    );

    // Verify DB state matches what was reported
    let count_r1: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        count_r1,
        synced_r1.len() as i64,
        "DB count should match synced IIDs"
    );

    // Run 2: resume — find remaining IIDs and sync them.
    // The DB itself is the resume cursor: anything already present is skipped.
    let already_synced: Vec<i64> = {
        let mut stmt = conn
            .prepare("SELECT iid FROM issues WHERE project_id = 1 ORDER BY iid")
            .unwrap();
        stmt.query_map([], |row| row.get(0))
            .unwrap()
            .map(|r| r.unwrap())
            .collect()
    };

    let remaining: Vec<i64> = all_iids
        .iter()
        .filter(|iid| !already_synced.contains(iid))
        .copied()
        .collect();

    assert!(
        !remaining.is_empty(),
        "there should be remaining work after cancellation"
    );

    // Second pipeline run on a fresh (never-cancelled) signal and a fresh
    // runtime — `run` builds a new one each call.
    let (phases_r2, synced_r2) = run(async {
        let signal = ShutdownSignal::new(); // fresh signal, no cancel
        run_sync_pipeline(&conn, &signal, 1, &remaining, batch_size, None).await
    });

    assert!(
        phases_r2 >= 1,
        "resume run should process remaining batches"
    );

    // Verify ALL 15 issues are now in the DB
    let count_final: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        count_final, 15,
        "all 15 issues should be in DB after cancel + resume"
    );

    // Verify no duplicates (INSERT OR IGNORE should prevent)
    let total_synced = synced_r1.len() + synced_r2.len();
    assert_eq!(
        total_synced, 15,
        "combined synced count should equal total issues"
    );
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 3: Structured tracing events fire with expected fields
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn structured_tracing_captures_phase_transitions() {
    use tracing_subscriber::layer::SubscriberExt;

    // Collect tracing events into a shared buffer
    let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let events_clone = Arc::clone(&events);

    // JSON formatter writing into EventWriter: each emitted event lands as a
    // string in `events`; targets disabled to keep the output minimal.
    let layer = tracing_subscriber::fmt::layer()
        .json()
        .with_writer(move || EventWriter(Arc::clone(&events_clone)))
        .with_target(false);

    let subscriber = tracing_subscriber::registry().with(layer);

    // Scoped default subscriber: restored when `_guard` drops, so other
    // tests in the same process are unaffected.
    let _guard = tracing::subscriber::set_default(subscriber);

    run(async {
        let span = tracing::info_span!(
            "sync_pipeline",
            run_id = "test-run-001",
            project = "test/repo"
        );
        let _enter = span.enter();

        tracing::info!(phase = "fetch_issues", entity_count = 10, "phase started");

        asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(1)).await;

        tracing::info!(
            phase = "fetch_issues",
            entity_count = 10,
            success = 8,
            skipped = 2,
            "phase completed"
        );

        tracing::info!(phase = "cancelled", reason = "ctrl_c", "pipeline stopped");
    });

    // Three info! events were emitted above; span open/close may add more.
    let captured = events.lock().unwrap();
    assert!(
        captured.len() >= 3,
        "should capture at least 3 tracing events, got {}",
        captured.len()
    );

    // Verify structured fields are present in the JSON output
    let all_text = captured.join("\n");
    assert!(
        all_text.contains("sync_pipeline"),
        "span name should appear in output"
    );
    assert!(
        all_text.contains("test-run-001"),
        "run_id field should appear in output"
    );
    assert!(
        all_text.contains("fetch_issues"),
        "phase field should appear in output"
    );
    assert!(
        all_text.contains("ctrl_c"),
        "cancellation reason should appear in output"
    );
}
|
|
|
|
/// A `Write` impl that appends each non-empty line to a shared Vec.
///
/// The JSON formatter emits one JSON object per line, but a single `write`
/// call may carry more than one line; splitting on newlines keeps one
/// captured event per Vec entry so event-count assertions stay accurate.
struct EventWriter(Arc<Mutex<Vec<String>>>);

impl std::io::Write for EventWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Non-UTF-8 chunks are silently ignored: this sink only captures the
        // formatter's JSON text, and the tests never emit binary output.
        if let Ok(s) = std::str::from_utf8(buf) {
            let mut events = self.0.lock().unwrap();
            // Split per line so a buffer holding several events is not merged
            // into a single captured entry (which would skew `captured.len()`).
            for line in s.lines() {
                let trimmed = line.trim();
                if !trimmed.is_empty() {
                    events.push(trimmed.to_string());
                }
            }
        }
        // Report the whole buffer consumed; a partial-write return would make
        // the caller retry and duplicate output.
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 4: Concurrent fan-out under asupersync with timing proof
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn concurrent_fanout_runs_in_parallel() {
    run(async {
        let started_at = Instant::now();

        // Ten 50ms sleeps awaited together: wall time stays near 50ms only
        // when the runtime actually overlaps them.
        let mut sleepers = Vec::with_capacity(10);
        for _ in 0..10 {
            sleepers.push(async {
                asupersync::time::sleep(asupersync::time::wall_now(), Duration::from_millis(50))
                    .await;
            });
        }

        join_all(sleepers).await;

        let elapsed = started_at.elapsed();
        // If sequential, this would take 500ms+. Parallel should be well under 200ms.
        assert!(
            elapsed < Duration::from_millis(200),
            "fan-out should run concurrently, took {:?}",
            elapsed
        );
    });
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 5: DB integrity after multiple runtime instantiations
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn db_integrity_across_runtime_restarts() {
    let conn = setup_db();

    // Shared helper: insert issues with IIDs in [lo, hi] inside one transaction.
    let insert_range = |lo: i64, hi: i64| {
        let tx = conn.unchecked_transaction().unwrap();
        for iid in lo..=hi {
            tx.execute(
                "INSERT INTO issues (
                    gitlab_id, project_id, iid, title, state,
                    author_username, created_at, updated_at, last_seen_at
                ) VALUES (?1, 1, ?2, 'issue', 'opened', 'bot', 1000, 2000, 3000)",
                rusqlite::params![iid + 10000, iid],
            )
            .unwrap();
        }
        tx.commit().unwrap();
    };

    // Run 1: insert issues 1-5
    {
        let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
        rt.block_on(async {
            insert_range(1, 5);
        });
        // rt drops here — runtime fully torn down
    }

    // Run 2: new runtime, verify data survives, insert more
    {
        let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
        rt.block_on(async {
            let count: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert_eq!(count, 5, "run 1 data should persist");

            insert_range(6, 10);
        });
    }

    // Verify final state without a runtime (sync query)
    let count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(count, 10, "both runs should have persisted correctly");
}
|
|
|
|
// ═══════════════════════════════════════════════════════════════════
|
|
// Real HTTP pipeline tests — exercise GitLabClient -> ingest_issues
|
|
// ═══════════════════════════════════════════════════════════════════
|
|
|
|
fn test_config() -> Config {
|
|
Config {
|
|
gitlab: GitLabConfig {
|
|
base_url: "https://gitlab.example.com".to_string(),
|
|
token_env_var: "GITLAB_TOKEN".to_string(),
|
|
token: None,
|
|
username: None,
|
|
},
|
|
projects: vec![ProjectConfig {
|
|
path: "group/project".to_string(),
|
|
}],
|
|
default_project: None,
|
|
sync: SyncConfig::default(),
|
|
storage: StorageConfig::default(),
|
|
embedding: EmbeddingConfig::default(),
|
|
logging: LoggingConfig::default(),
|
|
scoring: ScoringConfig::default(),
|
|
}
|
|
}
|
|
|
|
/// Build a GitLab-style issue JSON object with deterministic fields.
|
|
fn make_issue_json(id: i64, iid: i64, updated_at: &str) -> serde_json::Value {
|
|
json!({
|
|
"id": id,
|
|
"iid": iid,
|
|
"project_id": 100,
|
|
"title": format!("Issue {iid}"),
|
|
"description": format!("Description for issue {iid}"),
|
|
"state": "opened",
|
|
"created_at": "2024-01-01T00:00:00.000Z",
|
|
"updated_at": updated_at,
|
|
"closed_at": null,
|
|
"author": {
|
|
"id": 1,
|
|
"username": "testbot",
|
|
"name": "Test Bot"
|
|
},
|
|
"assignees": [],
|
|
"labels": ["backend"],
|
|
"milestone": null,
|
|
"due_date": null,
|
|
"web_url": format!("https://git.example.com/test/repo/-/issues/{iid}")
|
|
})
|
|
}
|
|
|
|
/// Drain HTTP request bytes from a TCP stream until the end-of-headers marker.
///
/// Reads until `\r\n\r\n` is seen or the peer closes the connection (read of
/// 0 bytes). Panics on read errors — acceptable for a test-only helper.
fn drain_http_request(stream: &mut std::net::TcpStream) {
    let mut buf = [0u8; 8192];
    let mut accumulated: Vec<u8> = Vec::new();

    loop {
        let n = stream.read(&mut buf).unwrap();
        if n == 0 {
            break; // peer closed before finishing the request
        }
        // Only rescan the tail: the marker can span at most 3 bytes of
        // previously accumulated data plus the new chunk. Rescanning the
        // whole buffer each read is O(total²) on trickled input.
        let scan_from = accumulated.len().saturating_sub(3);
        accumulated.extend_from_slice(&buf[..n]);
        if accumulated[scan_from..].windows(4).any(|w| w == b"\r\n\r\n") {
            break;
        }
    }
}
|
|
|
|
/// Write an HTTP response with the given status, headers, and JSON body.
///
/// Always sends `Content-Type: application/json`, a computed
/// `Content-Length`, any `extra_headers`, and `Connection: close`.
fn write_http_response(
    stream: &mut std::net::TcpStream,
    status: u16,
    reason: &str,
    extra_headers: &[(&str, &str)],
    body: &str,
) {
    let mut response = format!("HTTP/1.1 {status} {reason}\r\n");
    response.push_str("Content-Type: application/json\r\n");
    response.push_str(&format!("Content-Length: {}\r\n", body.len()));
    for (name, value) in extra_headers {
        response.push_str(&format!("{name}: {value}\r\n"));
    }
    response.push_str("Connection: close\r\n\r\n");
    response.push_str(body);

    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
|
|
|
|
/// Spin up a mock GitLab server that handles `request_count` sequential HTTP
/// connections. Each connection is handled by `handler_fn(connection_index, stream)`.
/// Returns the `http://127.0.0.1:{port}` base URL.
///
/// The accept loop runs on a detached background thread; it exits after
/// serving `request_count` connections or on the first accept error.
fn mock_gitlab_server<F>(request_count: usize, handler_fn: F) -> String
where
    F: Fn(usize, &mut std::net::TcpStream) + Send + 'static,
{
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let base_url = format!("http://127.0.0.1:{}", listener.local_addr().unwrap().port());

    std::thread::spawn(move || {
        for connection_index in 0..request_count {
            match listener.accept() {
                Ok((mut stream, _peer)) => handler_fn(connection_index, &mut stream),
                Err(_) => break,
            }
        }
    });

    base_url
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 6: Full HTTP ingestion pipeline — paginated issues via mock
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn http_pipeline_ingest_issues_paginated() {
    // Serve 2 pages of issues: page 1 has 3, page 2 has 2 (no x-next-page = done).
    let base = mock_gitlab_server(2, |i, stream| {
        drain_http_request(stream);
        match i {
            // First connection: page 1, with the x-next-page header that
            // drives the client's pagination loop.
            0 => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(1001, 1, "2024-06-01T00:00:00.000Z"),
                    make_issue_json(1002, 2, "2024-06-02T00:00:00.000Z"),
                    make_issue_json(1003, 3, "2024-06-03T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[("x-next-page", "2")], &body);
            }
            // Second connection: final page, no pagination header.
            _ => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(1004, 4, "2024-06-04T00:00:00.000Z"),
                    make_issue_json(1005, 5, "2024-06-05T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[], &body);
            }
        }
    });

    let conn = setup_db();
    let config = test_config();

    run(async {
        // Client points at the mock server; 1000.0 is presumably a rate
        // limit — TODO confirm against GitLabClient::new's signature.
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();

        let result = ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap();

        assert_eq!(
            result.fetched, 5,
            "should fetch all 5 issues across 2 pages"
        );
        assert_eq!(result.upserted, 5, "should upsert all 5 issues");

        let db_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(db_count, 5, "DB should contain 5 issues");

        // Verify sync cursor was persisted
        let cursor_ts: i64 = conn
            .query_row(
                "SELECT updated_at_cursor FROM sync_cursors
                 WHERE project_id = 1 AND resource_type = 'issues'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(cursor_ts > 0, "sync cursor should be set after ingestion");

        // Verify labels created
        let label_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM labels WHERE project_id = 1 AND name = 'backend'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(label_count, 1, "backend label should exist");

        // Emit a machine-readable summary line to stderr.
        let summary = json!({
            "test": "http_pipeline_ingest_issues_paginated",
            "fetched": result.fetched,
            "upserted": result.upserted,
            "labels_created": result.labels_created,
            "db_count": db_count,
            "cursor_set": cursor_ts > 0,
        });
        eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
    });
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 7: Cancellation preserves DB integrity during real ingestion
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn http_pipeline_cancel_preserves_integrity() {
    // Counts connections actually served; retained for debugging — the test
    // does not assert on it because a pre-cancelled signal may stop the
    // client before any request is made.
    let requests_served = Arc::new(AtomicUsize::new(0));
    let requests_clone = Arc::clone(&requests_served);

    // Serve 2 pages. The test cancels the signal immediately, so the stream
    // should stop early. Whatever was committed must be consistent.
    let base = mock_gitlab_server(2, move |i, stream| {
        drain_http_request(stream);
        requests_clone.fetch_add(1, Ordering::Relaxed);
        match i {
            0 => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(2001, 10, "2024-07-01T00:00:00.000Z"),
                    make_issue_json(2002, 11, "2024-07-02T00:00:00.000Z"),
                    make_issue_json(2003, 12, "2024-07-03T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[("x-next-page", "2")], &body);
            }
            _ => {
                let body = serde_json::to_string(&vec![
                    make_issue_json(2004, 13, "2024-07-04T00:00:00.000Z"),
                    make_issue_json(2005, 14, "2024-07-05T00:00:00.000Z"),
                ])
                .unwrap();
                write_http_response(stream, 200, "OK", &[], &body);
            }
        }
    });

    let conn = setup_db();
    let config = test_config();

    run(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();

        // Cancel immediately — signal is checked between each streamed issue
        signal.cancel();

        // Ingestion must still return Ok: cancellation is a clean early
        // stop, not an error.
        let result = ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap();

        // Key invariant: DB count must match reported upsert count
        let db_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM issues WHERE project_id = 1",
                [],
                |row| row.get(0),
            )
            .unwrap();

        assert_eq!(
            db_count as usize, result.upserted,
            "DB count must equal upserted count (no partial data)"
        );

        // If anything was upserted, cursor must exist
        if result.upserted > 0 {
            let cursor_exists: bool = conn
                .query_row(
                    "SELECT COUNT(*) > 0 FROM sync_cursors
                     WHERE project_id = 1 AND resource_type = 'issues'",
                    [],
                    |row| row.get(0),
                )
                .unwrap();
            assert!(
                cursor_exists,
                "cursor should exist when items were upserted"
            );
        }

        // Emit a machine-readable summary line to stderr.
        let summary = json!({
            "test": "http_pipeline_cancel_preserves_integrity",
            "fetched": result.fetched,
            "upserted": result.upserted,
            "db_count": db_count,
            "integrity_ok": db_count as usize == result.upserted,
        });
        eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
    });
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 8: Resume via cursor — second run deduplicates correctly
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn http_pipeline_resume_via_cursor() {
    let conn = setup_db();
    let config = test_config();

    // --- Run 1: Ingest 3 issues ---
    // Single-page response: no x-next-page header, so pagination stops.
    let base1 = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(3001, 20, "2024-08-01T00:00:00.000Z"),
            make_issue_json(3002, 21, "2024-08-02T00:00:00.000Z"),
            make_issue_json(3003, 22, "2024-08-03T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });

    let run1 = run(async {
        let client = GitLabClient::new(&base1, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });

    assert_eq!(run1.upserted, 3, "run 1 should upsert 3 issues");

    // --- Run 2: Same 3 + 2 new issues. Cursor should skip the first 3. ---
    // A fresh mock server (new port) because the first one only accepted one
    // connection; same DB connection, so run 1's cursor is visible to run 2.
    let base2 = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let body = serde_json::to_string(&vec![
            make_issue_json(3001, 20, "2024-08-01T00:00:00.000Z"),
            make_issue_json(3002, 21, "2024-08-02T00:00:00.000Z"),
            make_issue_json(3003, 22, "2024-08-03T00:00:00.000Z"),
            make_issue_json(3004, 23, "2024-08-04T00:00:00.000Z"),
            make_issue_json(3005, 24, "2024-08-05T00:00:00.000Z"),
        ])
        .unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });

    let run2 = run(async {
        let client = GitLabClient::new(&base2, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });

    // All 5 come over the wire, but only the 2 with updated_at past the
    // cursor are written.
    assert_eq!(run2.fetched, 5, "run 2 should fetch all 5 from API");
    assert_eq!(run2.upserted, 2, "run 2 should only upsert 2 new issues");

    let total: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(total, 5, "DB should have 5 total issues after resume");

    let distinct: i64 = conn
        .query_row(
            "SELECT COUNT(DISTINCT iid) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(distinct, 5, "no duplicate IIDs");

    // Emit a machine-readable summary line to stderr.
    let summary = json!({
        "test": "http_pipeline_resume_via_cursor",
        "run1_upserted": run1.upserted,
        "run2_fetched": run2.fetched,
        "run2_upserted": run2.upserted,
        "total": total,
        "no_duplicates": distinct == 5,
    });
    eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
}
|
|
|
|
// ───────────────────────────────────────────────────────────────────
|
|
// Test 9: Runtime quiescence — no leaked tasks after real ingestion
|
|
// ───────────────────────────────────────────────────────────────────
|
|
|
|
#[test]
fn http_pipeline_runtime_quiescence() {
    // Single-page mock: two issues, no x-next-page header.
    let base = mock_gitlab_server(1, |_i, stream| {
        drain_http_request(stream);
        let issues = vec![
            make_issue_json(4001, 30, "2024-09-01T00:00:00.000Z"),
            make_issue_json(4002, 31, "2024-09-02T00:00:00.000Z"),
        ];
        let body = serde_json::to_string(&issues).unwrap();
        write_http_response(stream, 200, "OK", &[], &body);
    });

    let conn = setup_db();
    let config = test_config();

    // Build the runtime explicitly (instead of via `run`) so teardown is a
    // separate, observable step after block_on returns.
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();

    let result = rt.block_on(async {
        let client = GitLabClient::new(&base, "test-token", Some(1000.0));
        let signal = ShutdownSignal::new();
        ingest_issues(&conn, &client, &config, 1, 100, &signal)
            .await
            .unwrap()
    });

    // Runtime drops cleanly — if tasks leaked, this would hang or panic
    drop(rt);

    assert_eq!(result.upserted, 2);

    let count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(count, 2, "data committed before runtime drop");

    // Emit a machine-readable summary line to stderr.
    let summary = json!({
        "test": "http_pipeline_runtime_quiescence",
        "upserted": result.upserted,
        "db_count": count,
        "runtime_dropped_cleanly": true,
    });
    eprintln!("E2E_SUMMARY: {}", serde_json::to_string(&summary).unwrap());
}
|