Files
gitlore/tests/http_integration.rs
teernisse e8d6c5b15f feat(runtime): replace tokio+reqwest with asupersync async runtime
- Add HTTP adapter layer (src/http.rs) wrapping asupersync h1 client
- Migrate gitlab client, graphql, and ollama to HTTP adapter
- Swap entrypoint from #[tokio::main] to RuntimeBuilder::new().block_on()
- Rewrite signal handler for asupersync (RuntimeHandle::spawn + ctrl_c())
- Migrate rate limiter sleeps to asupersync::time::sleep(wall_now(), d)
- Add asupersync-native HTTP integration tests
- Convert timeline_seed_tests to RuntimeBuilder pattern

Phases 1-3 of asupersync migration (atomic: code won't compile without all pieces).
2026-03-06 15:57:20 -05:00

242 lines
7.9 KiB
Rust

use std::io::{Read, Write};
use std::net::TcpListener;
use std::time::Duration;
use lore::http::Client;
/// Spin up a one-shot TCP server that replies with `response_bytes` to the first
/// connection, then shuts down. Returns the `http://127.0.0.1:{port}` base URL.
fn oneshot_server(response_bytes: Vec<u8>) -> String {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        // Drain the request so the client doesn't get a broken pipe.
        // Accumulate across reads so the end-of-headers marker (double CRLF)
        // is detected even when it straddles two `read` calls, and stop on
        // EOF (read == 0) instead of spinning forever if the peer hangs up.
        // For simplicity we don't parse Content-Length; our test requests
        // have tiny/no bodies.
        let mut request = Vec::new();
        let mut buf = [0u8; 4096];
        loop {
            let n = stream.read(&mut buf).unwrap();
            if n == 0 {
                break; // Peer closed before finishing headers.
            }
            request.extend_from_slice(&buf[..n]);
            if request.windows(4).any(|w| w == b"\r\n\r\n") {
                break;
            }
        }
        stream.write_all(&response_bytes).unwrap();
        stream.flush().unwrap();
        // Drop closes the connection.
    });
    format!("http://127.0.0.1:{port}")
}
/// Build a complete HTTP/1.1 response carrying a JSON body: status line,
/// `Content-Type: application/json`, a correct `Content-Length`, then `body`.
fn json_response(status: u16, reason: &str, body: &str) -> Vec<u8> {
    let mut response = format!(
        "HTTP/1.1 {status} {reason}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
        body.len()
    )
    .into_bytes();
    response.extend_from_slice(body.as_bytes());
    response
}
/// Build a fresh asupersync runtime and drive `f` to completion on it.
fn run<T, F>(f: F) -> T
where
    F: std::future::Future<Output = T>,
{
    let runtime = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    runtime.block_on(f)
}
// -------------------------------------------------------------------
// Test 1: GET with headers + JSON response
// -------------------------------------------------------------------
#[test]
fn get_with_headers_and_json_response() {
    let body = r#"{"ok":true,"data":"hello"}"#;
    // Hand-rolled response so we can include the custom X-Custom header.
    let raw = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nX-Custom: test-value\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );
    let base = oneshot_server(raw.into_bytes());
    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));
        let url = format!("{base}/api/test");
        let headers = [("Accept", "application/json")];
        let resp = client.get(&url, &headers).await.unwrap();
        // Status and header round-trip (header lookup is lowercase).
        assert!(resp.is_success());
        assert_eq!(resp.status, 200);
        assert_eq!(resp.header("x-custom"), Some("test-value"));
        // The body deserializes back into exactly what we served.
        let parsed: serde_json::Value = resp.json().unwrap();
        assert_eq!(parsed["ok"], true);
        assert_eq!(parsed["data"], "hello");
    });
}
// -------------------------------------------------------------------
// Test 2: POST with JSON body
// -------------------------------------------------------------------
#[test]
fn post_json_body_round_trip() {
    let base = oneshot_server(json_response(200, "OK", r#"{"received":true}"#));
    run(async {
        // Serializable request payload mirroring a typical embed request.
        #[derive(serde::Serialize)]
        struct Payload {
            model: String,
            input: Vec<String>,
        }
        let client = Client::with_timeout(Duration::from_secs(5));
        let payload = Payload {
            model: String::from("test-model"),
            input: vec![String::from("hello"), String::from("world")],
        };
        let url = format!("{base}/api/embed");
        let resp = client.post_json(&url, &[], &payload).await.unwrap();
        assert!(resp.is_success());
        let parsed: serde_json::Value = resp.json().unwrap();
        assert_eq!(parsed["received"], true);
    });
}
// -------------------------------------------------------------------
// Test 3: Non-success status code (429) with Retry-After header
// -------------------------------------------------------------------
#[test]
fn non_success_status_with_retry_after() {
    let body = r#"{"error":"rate limited"}"#;
    // Hand-rolled response so we can include the Retry-After header.
    let raw = format!(
        "HTTP/1.1 429 Too Many Requests\r\nRetry-After: 30\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );
    let base = oneshot_server(raw.into_bytes());
    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));
        let url = format!("{base}/api/data");
        let resp = client.get(&url, &[]).await.unwrap();
        // 4xx is surfaced as a response, not an Err; body is still readable.
        assert!(!resp.is_success());
        assert_eq!(resp.status, 429);
        assert_eq!(resp.header("retry-after"), Some("30"));
        let parsed: serde_json::Value = resp.json().unwrap();
        assert_eq!(parsed["error"], "rate limited");
    });
}
// -------------------------------------------------------------------
// Test 4: Timeout fires correctly
// -------------------------------------------------------------------
#[test]
fn timeout_fires_on_slow_server() {
    // Server accepts the connection but never writes a byte, so only the
    // client-side timeout can end the request.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let base = format!("http://127.0.0.1:{}", listener.local_addr().unwrap().port());
    std::thread::spawn(move || {
        let (_stream, _) = listener.accept().unwrap();
        // Keep the connection alive well past the client's 200 ms timeout.
        std::thread::sleep(Duration::from_secs(30));
    });
    run(async {
        let client = Client::with_timeout(Duration::from_millis(200));
        let result = client.get(&format!("{base}/api/slow"), &[]).await;
        assert!(result.is_err(), "expected timeout error");
        let err_str = format!("{:?}", result.unwrap_err());
        let lowered = err_str.to_lowercase();
        // The error should mention timeout in some form.
        assert!(
            lowered.contains("timeout") || lowered.contains("timed out"),
            "error should mention timeout, got: {err_str}"
        );
    });
}
// -------------------------------------------------------------------
// Test 5: Large response rejection (64 MiB guard)
// -------------------------------------------------------------------
#[test]
fn large_response_rejected() {
    // The server advertises a Content-Length of 64 MiB + 1 — just over the
    // client's guard — then streams zeros in 1 MiB chunks so the test never
    // allocates the whole body at once. The client should abort with a
    // body-too-large error rather than buffer it all.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        // Drain request headers, accumulating across reads so the double-CRLF
        // terminator is found even when it straddles two reads; bail out on
        // EOF (read == 0) instead of spinning forever.
        let mut request = Vec::new();
        let mut buf = [0u8; 4096];
        loop {
            let n = stream.read(&mut buf).unwrap();
            if n == 0 {
                return; // Client vanished before sending a full request.
            }
            request.extend_from_slice(&buf[..n]);
            if request.windows(4).any(|w| w == b"\r\n\r\n") {
                break;
            }
        }
        let oversized = 64 * 1024 * 1024 + 1; // 64 MiB + 1
        let header = format!("HTTP/1.1 200 OK\r\nContent-Length: {oversized}\r\n\r\n");
        stream.write_all(header.as_bytes()).unwrap();
        // Stream zeros in chunks to avoid a huge allocation.
        let chunk = vec![0u8; 1024 * 1024]; // 1 MiB chunks
        for _ in 0..65 {
            if stream.write_all(&chunk).is_err() {
                break; // Client may close early once it hits the guard.
            }
        }
    });
    let base = format!("http://127.0.0.1:{port}");
    run(async {
        let client = Client::with_timeout(Duration::from_secs(30));
        let result = client.get(&format!("{base}/api/huge"), &[]).await;
        assert!(result.is_err(), "expected large-response rejection");
        let err_str = format!("{:?}", result.unwrap_err());
        assert!(
            err_str.contains("too large")
                || err_str.contains("Response body")
                || err_str.contains("BodyTooLarge"),
            "error should mention body size, got: {err_str}"
        );
    });
}