Files
gitlore/src/http.rs
teernisse e8d6c5b15f feat(runtime): replace tokio+reqwest with asupersync async runtime
- Add HTTP adapter layer (src/http.rs) wrapping asupersync h1 client
- Migrate gitlab client, graphql, and ollama to HTTP adapter
- Swap entrypoint from #[tokio::main] to RuntimeBuilder::new().block_on()
- Rewrite signal handler for asupersync (RuntimeHandle::spawn + ctrl_c())
- Migrate rate limiter sleeps to asupersync::time::sleep(wall_now(), d)
- Add asupersync-native HTTP integration tests
- Convert timeline_seed_tests to RuntimeBuilder pattern

Phases 1-3 of asupersync migration (atomic: code won't compile without all pieces).
2026-03-06 15:57:20 -05:00

319 lines
9.5 KiB
Rust

use std::time::Duration;
use asupersync::http::h1::{
ClientError, HttpClient, HttpClientConfig, Method, Response as RawResponse,
};
use asupersync::http::pool::PoolConfig;
use asupersync::time::{timeout, wall_now};
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::core::error::{LoreError, NetworkErrorKind, Result};
const MAX_RESPONSE_BODY_BYTES: usize = 64 * 1024 * 1024; // 64 MiB
pub struct Client {
inner: HttpClient,
timeout: Duration,
}
#[derive(Debug)]
pub struct Response {
pub status: u16,
pub reason: String,
pub headers: Vec<(String, String)>,
body: Vec<u8>,
}
impl Client {
pub fn with_timeout(timeout: Duration) -> Self {
Self {
inner: HttpClient::with_config(HttpClientConfig {
pool_config: PoolConfig {
max_connections_per_host: 6,
max_total_connections: 100,
idle_timeout: Duration::from_secs(90),
..Default::default()
},
..Default::default()
}),
timeout,
}
}
pub async fn get(&self, url: &str, headers: &[(&str, &str)]) -> Result<Response> {
self.execute(Method::Get, url, headers, Vec::new()).await
}
pub async fn get_with_query(
&self,
url: &str,
params: &[(&str, String)],
headers: &[(&str, &str)],
) -> Result<Response> {
let full_url = append_query_params(url, params);
self.execute(Method::Get, &full_url, headers, Vec::new())
.await
}
pub async fn post_json<T: Serialize>(
&self,
url: &str,
headers: &[(&str, &str)],
body: &T,
) -> Result<Response> {
let body_bytes = serde_json::to_vec(body)
.map_err(|e| LoreError::Other(format!("JSON serialization failed: {e}")))?;
let mut all_headers: Vec<(&str, &str)> = headers.to_vec();
all_headers.push(("Content-Type", "application/json"));
self.execute(Method::Post, url, &all_headers, body_bytes)
.await
}
async fn execute(
&self,
method: Method,
url: &str,
headers: &[(&str, &str)],
body: Vec<u8>,
) -> Result<Response> {
let header_tuples: Vec<(String, String)> = headers
.iter()
.map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
.collect();
let raw: RawResponse = timeout(
wall_now(),
self.timeout,
self.inner.request(method, url, header_tuples, body),
)
.await
.map_err(|_| LoreError::GitLabNetworkError {
base_url: url.to_string(),
kind: NetworkErrorKind::Timeout,
detail: Some(format!("Request timed out after {:?}", self.timeout)),
})?
.map_err(|e| LoreError::GitLabNetworkError {
base_url: url.to_string(),
kind: classify_transport_error(&e),
detail: Some(format!("{e:?}")),
})?;
if raw.body.len() > MAX_RESPONSE_BODY_BYTES {
return Err(LoreError::Other(format!(
"Response body too large: {} bytes (max {MAX_RESPONSE_BODY_BYTES})",
raw.body.len(),
)));
}
Ok(Response {
status: raw.status,
reason: raw.reason,
headers: raw.headers,
body: raw.body,
})
}
}
impl Response {
pub fn is_success(&self) -> bool {
(200..300).contains(&self.status)
}
pub fn json<T: DeserializeOwned>(&self) -> Result<T> {
serde_json::from_slice(&self.body)
.map_err(|e| LoreError::Other(format!("JSON parse error: {e}")))
}
pub fn text(self) -> Result<String> {
String::from_utf8(self.body)
.map_err(|e| LoreError::Other(format!("UTF-8 decode error: {e}")))
}
pub fn header(&self, name: &str) -> Option<&str> {
self.headers
.iter()
.find(|(k, _)| k.eq_ignore_ascii_case(name))
.map(|(_, v)| v.as_str())
}
pub fn headers_all(&self, name: &str) -> Vec<&str> {
self.headers
.iter()
.filter(|(k, _)| k.eq_ignore_ascii_case(name))
.map(|(_, v)| v.as_str())
.collect()
}
}
fn classify_transport_error(e: &ClientError) -> NetworkErrorKind {
match e {
ClientError::DnsError(_) => NetworkErrorKind::DnsResolution,
ClientError::ConnectError(_) => NetworkErrorKind::ConnectionRefused,
ClientError::TlsError(_) => NetworkErrorKind::Tls,
_ => NetworkErrorKind::Other,
}
}
fn append_query_params(url: &str, params: &[(&str, String)]) -> String {
if params.is_empty() {
return url.to_string();
}
let query: String = params
.iter()
.map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
.collect::<Vec<_>>()
.join("&");
let (base, fragment) = match url.split_once('#') {
Some((b, f)) => (b, Some(f)),
None => (url, None),
};
let with_query = if base.contains('?') {
format!("{base}&{query}")
} else {
format!("{base}?{query}")
};
match fragment {
Some(f) => format!("{with_query}#{f}"),
None => with_query,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn append_query_params_empty_returns_unchanged() {
let url = "https://example.com/api";
assert_eq!(append_query_params(url, &[]), url);
}
#[test]
fn append_query_params_adds_question_mark() {
let result = append_query_params(
"https://example.com/api",
&[("page", "1".into()), ("per_page", "20".into())],
);
assert_eq!(result, "https://example.com/api?page=1&per_page=20");
}
#[test]
fn append_query_params_existing_query_uses_ampersand() {
let result = append_query_params(
"https://example.com/api?state=opened",
&[("page", "2".into())],
);
assert_eq!(result, "https://example.com/api?state=opened&page=2");
}
#[test]
fn append_query_params_preserves_fragment() {
let result =
append_query_params("https://example.com/api#section", &[("key", "val".into())]);
assert_eq!(result, "https://example.com/api?key=val#section");
}
#[test]
fn append_query_params_encodes_special_chars() {
let result =
append_query_params("https://example.com/api", &[("labels[]", "bug fix".into())]);
assert_eq!(result, "https://example.com/api?labels%5B%5D=bug%20fix");
}
#[test]
fn append_query_params_repeated_keys() {
let result = append_query_params(
"https://example.com/api",
&[("labels[]", "bug".into()), ("labels[]", "urgent".into())],
);
assert_eq!(
result,
"https://example.com/api?labels%5B%5D=bug&labels%5B%5D=urgent"
);
}
#[test]
fn response_header_case_insensitive() {
let resp = Response {
status: 200,
reason: "OK".into(),
headers: vec![
("Content-Type".into(), "application/json".into()),
("X-Page".into(), "1".into()),
],
body: Vec::new(),
};
assert_eq!(resp.header("content-type"), Some("application/json"));
assert_eq!(resp.header("CONTENT-TYPE"), Some("application/json"));
assert_eq!(resp.header("Content-Type"), Some("application/json"));
assert_eq!(resp.header("x-page"), Some("1"));
assert_eq!(resp.header("X-Missing"), None);
}
#[test]
fn response_headers_all_returns_multiple_values() {
let resp = Response {
status: 200,
reason: "OK".into(),
headers: vec![
("Link".into(), "<url1>; rel=\"next\"".into()),
("Link".into(), "<url2>; rel=\"last\"".into()),
("Content-Type".into(), "application/json".into()),
],
body: Vec::new(),
};
let links = resp.headers_all("link");
assert_eq!(links.len(), 2);
assert_eq!(links[0], "<url1>; rel=\"next\"");
assert_eq!(links[1], "<url2>; rel=\"last\"");
}
#[test]
fn response_is_success_range() {
for status in [200, 201, 204, 299] {
let resp = Response {
status,
reason: String::new(),
headers: Vec::new(),
body: Vec::new(),
};
assert!(resp.is_success(), "status {status} should be success");
}
for status in [100, 199, 300, 301, 400, 404, 500] {
let resp = Response {
status,
reason: String::new(),
headers: Vec::new(),
body: Vec::new(),
};
assert!(!resp.is_success(), "status {status} should not be success");
}
}
#[test]
fn classify_dns_error() {
let err = ClientError::DnsError(std::io::Error::other("dns failed"));
assert_eq!(
classify_transport_error(&err),
NetworkErrorKind::DnsResolution
);
}
#[test]
fn classify_connect_error() {
let err = ClientError::ConnectError(std::io::Error::other("refused"));
assert_eq!(
classify_transport_error(&err),
NetworkErrorKind::ConnectionRefused
);
}
#[test]
fn classify_tls_error() {
let err = ClientError::TlsError("bad cert".into());
assert_eq!(classify_transport_error(&err), NetworkErrorKind::Tls);
}
}