- Add HTTP adapter layer (src/http.rs) wrapping asupersync h1 client - Migrate gitlab client, graphql, and ollama to HTTP adapter - Swap entrypoint from #[tokio::main] to RuntimeBuilder::new().block_on() - Rewrite signal handler for asupersync (RuntimeHandle::spawn + ctrl_c()) - Migrate rate limiter sleeps to asupersync::time::sleep(wall_now(), d) - Add asupersync-native HTTP integration tests - Convert timeline_seed_tests to RuntimeBuilder pattern Phases 1-3 of asupersync migration (atomic: code won't compile without all pieces).
242 lines
7.9 KiB
Rust
242 lines
7.9 KiB
Rust
use std::io::{Read, Write};
|
|
use std::net::TcpListener;
|
|
use std::time::Duration;
|
|
|
|
use lore::http::Client;
|
|
|
|
/// Spin up a one-shot TCP server that replies with `response_bytes` to the first
/// connection, then shuts down. Returns the `http://127.0.0.1:{port}` base URL.
///
/// The server runs on a detached background thread. It reads the request until
/// it has seen the end-of-headers marker (`\r\n\r\n`) or the peer stops sending,
/// then writes `response_bytes` verbatim and closes the connection. Request
/// bodies are not parsed (no `Content-Length` handling); test requests are
/// expected to have tiny or no bodies.
///
/// # Panics
///
/// Panics if the listener cannot be bound. The background thread panics if
/// accepting, reading from, or writing to the connection fails.
fn oneshot_server(response_bytes: Vec<u8>) -> String {
    // Port 0 asks the OS for any free port; read back the one it picked.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();

    std::thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();

        // Drain the request so the client doesn't get a broken pipe.
        let mut buf = [0u8; 4096];
        // Holds the last (up to) 3 bytes of earlier reads followed by the
        // current read, so a terminator split across reads is still detected.
        let mut carry: Vec<u8> = Vec::with_capacity(buf.len() + 3);
        loop {
            let n = stream.read(&mut buf).unwrap();
            if n == 0 {
                // Peer closed (or half-closed) before finishing the headers.
                // Stop reading instead of spinning on zero-length reads.
                break;
            }
            carry.extend_from_slice(&buf[..n]);
            if carry.windows(4).any(|w| w == b"\r\n\r\n") {
                break;
            }
            // Keep only the tail that could start a terminator.
            let keep_from = carry.len().saturating_sub(3);
            carry.drain(..keep_from);
        }

        stream.write_all(&response_bytes).unwrap();
        stream.flush().unwrap();
        // Drop closes the connection.
    });

    format!("http://127.0.0.1:{port}")
}
|
|
|
|
/// Build the raw bytes of an HTTP/1.1 response carrying a JSON `body`.
///
/// The status line uses `status` and `reason`; `Content-Type` is fixed to
/// `application/json` and `Content-Length` is the byte length of `body`.
fn json_response(status: u16, reason: &str, body: &str) -> Vec<u8> {
    let payload = body.as_bytes();
    let head = format!(
        "HTTP/1.1 {status} {reason}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {len}\r\n\
         \r\n",
        len = payload.len()
    );
    // Head first, then the body bytes, in one contiguous buffer.
    [head.as_bytes(), payload].concat()
}
|
|
|
|
fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
|
|
asupersync::runtime::RuntimeBuilder::new()
|
|
.build()
|
|
.unwrap()
|
|
.block_on(f)
|
|
}
|
|
|
|
// -------------------------------------------------------------------
|
|
// Test 1: GET with headers + JSON response
|
|
// -------------------------------------------------------------------
|
|
|
|
#[test]
fn get_with_headers_and_json_response() {
    // Hand-built response so a custom header rides along with the JSON body.
    let payload = r#"{"ok":true,"data":"hello"}"#;
    let wire = format!(
        "HTTP/1.1 200 OK\r\n\
         Content-Type: application/json\r\n\
         X-Custom: test-value\r\n\
         Content-Length: {}\r\n\
         \r\n\
         {payload}",
        payload.len()
    );
    let base = oneshot_server(wire.into_bytes());
    let url = format!("{base}/api/test");

    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));
        let resp = client
            .get(&url, &[("Accept", "application/json")])
            .await
            .unwrap();

        // Status and headers first.
        assert!(resp.is_success());
        assert_eq!(resp.status, 200);
        // The server sent `X-Custom`; the lookup uses the lowercase name.
        assert_eq!(resp.header("x-custom"), Some("test-value"));

        // Then the decoded JSON body.
        let json: serde_json::Value = resp.json().unwrap();
        assert_eq!(json["ok"], true);
        assert_eq!(json["data"], "hello");
    });
}
|
|
|
|
// -------------------------------------------------------------------
|
|
// Test 2: POST with JSON body
|
|
// -------------------------------------------------------------------
|
|
|
|
#[test]
fn post_json_body_round_trip() {
    /// Minimal serializable request body sent by this test.
    #[derive(serde::Serialize)]
    struct Payload {
        model: String,
        input: Vec<String>,
    }

    let base = oneshot_server(json_response(200, "OK", r#"{"received":true}"#));
    let url = format!("{base}/api/embed");

    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));

        let request = Payload {
            model: String::from("test-model"),
            input: vec![String::from("hello"), String::from("world")],
        };

        // No extra request headers; the body is serialized by `post_json`.
        let resp = client.post_json(&url, &[], &request).await.unwrap();

        assert!(resp.is_success());
        let json: serde_json::Value = resp.json().unwrap();
        assert_eq!(json["received"], true);
    });
}
|
|
|
|
// -------------------------------------------------------------------
|
|
// Test 3: Non-success status code (429) with Retry-After header
|
|
// -------------------------------------------------------------------
|
|
|
|
#[test]
fn non_success_status_with_retry_after() {
    // A 429 reply that carries both a Retry-After header and a JSON error body.
    let payload = r#"{"error":"rate limited"}"#;
    let wire = format!(
        "HTTP/1.1 429 Too Many Requests\r\n\
         Retry-After: 30\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {}\r\n\
         \r\n\
         {payload}",
        payload.len()
    );
    let base = oneshot_server(wire.into_bytes());
    let url = format!("{base}/api/data");

    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));
        // The request itself must succeed: a 429 is a response, not an error.
        let resp = client.get(&url, &[]).await.unwrap();

        assert!(!resp.is_success());
        assert_eq!(resp.status, 429);
        assert_eq!(resp.header("retry-after"), Some("30"));

        // The error body is still available as JSON.
        let json: serde_json::Value = resp.json().unwrap();
        assert_eq!(json["error"], "rate limited");
    });
}
|
|
|
|
// -------------------------------------------------------------------
|
|
// Test 4: Timeout fires correctly
|
|
// -------------------------------------------------------------------
|
|
|
|
#[test]
fn timeout_fires_on_slow_server() {
    // Server accepts the connection but never writes a response.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        // Bind the stream to a named variable so it stays open (is not
        // dropped) for the whole sleep.
        let (_held_open, _) = listener.accept().unwrap();
        std::thread::sleep(Duration::from_secs(30));
    });
    let url = format!("http://127.0.0.1:{port}/api/slow");

    run(async {
        let client = Client::with_timeout(Duration::from_millis(200));
        let outcome = client.get(&url, &[]).await;

        assert!(outcome.is_err(), "expected timeout error");
        let err_str = format!("{:?}", outcome.unwrap_err());

        // The error should mention timeout (either spelling, any case).
        let lowered = err_str.to_lowercase();
        let mentions_timeout = lowered.contains("timeout") || lowered.contains("timed out");
        assert!(
            mentions_timeout,
            "error should mention timeout, got: {err_str}"
        );
    });
}
|
|
|
|
// -------------------------------------------------------------------
|
|
// Test 5: Large response rejection (64 MiB guard)
|
|
// -------------------------------------------------------------------
|
|
|
|
#[test]
fn large_response_rejected() {
    // The server declares a Content-Length of 64 MiB + 1 byte and then streams
    // exactly that many zero bytes. The body is written in 1 MiB slices of a
    // single reusable buffer, so the test never allocates the full payload.
    // (This is plain Content-Length framing, not chunked transfer encoding.)
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();

    std::thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();

        // Drain request headers. `carry` keeps the last (up to) 3 bytes of
        // earlier reads so a `\r\n\r\n` split across reads is still found.
        let mut buf = [0u8; 4096];
        let mut carry: Vec<u8> = Vec::with_capacity(buf.len() + 3);
        loop {
            let n = stream.read(&mut buf).unwrap();
            if n == 0 {
                // Peer stopped sending before the terminator; don't spin on
                // zero-length reads.
                break;
            }
            carry.extend_from_slice(&buf[..n]);
            if carry.windows(4).any(|w| w == b"\r\n\r\n") {
                break;
            }
            let keep_from = carry.len().saturating_sub(3);
            carry.drain(..keep_from);
        }

        let oversized: usize = 64 * 1024 * 1024 + 1; // 64 MiB + 1
        let header = format!(
            "HTTP/1.1 200 OK\r\n\
             Content-Length: {oversized}\r\n\
             \r\n"
        );
        stream.write_all(header.as_bytes()).unwrap();

        // Stream zeros in 1 MiB pieces until the declared length is reached.
        let chunk = vec![0u8; 1024 * 1024];
        let mut remaining = oversized;
        while remaining > 0 {
            let len = remaining.min(chunk.len());
            if stream.write_all(&chunk[..len]).is_err() {
                break; // Client may close early.
            }
            remaining -= len;
        }
    });

    let base = format!("http://127.0.0.1:{port}");

    run(async {
        // Generous timeout so the size guard, not the timer, ends the request.
        let client = Client::with_timeout(Duration::from_secs(30));
        let result = client.get(&format!("{base}/api/huge"), &[]).await;

        assert!(result.is_err(), "expected large-response rejection");
        let err_str = format!("{:?}", result.unwrap_err());
        // Accept any of the spellings the size-limit error may be rendered with.
        assert!(
            err_str.contains("too large")
                || err_str.contains("Response body")
                || err_str.contains("BodyTooLarge"),
            "error should mention body size, got: {err_str}"
        );
    });
}
|