feat(runtime): replace tokio+reqwest with asupersync async runtime

- Add HTTP adapter layer (src/http.rs) wrapping asupersync h1 client
- Migrate gitlab client, graphql, and ollama to HTTP adapter
- Swap entrypoint from #[tokio::main] to RuntimeBuilder::new().block_on()
- Rewrite signal handler for asupersync (RuntimeHandle::spawn + ctrl_c())
- Migrate rate limiter sleeps to asupersync::time::sleep(wall_now(), d)
- Add asupersync-native HTTP integration tests
- Convert timeline_seed_tests to RuntimeBuilder pattern

Phases 1-3 of asupersync migration (atomic: code won't compile without all pieces).
This commit is contained in:
teernisse
2026-03-06 15:23:55 -05:00
parent bf977eca1a
commit e8d6c5b15f
16 changed files with 1974 additions and 1189 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1 +1 @@
bd-8con
bd-26km

950
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -29,15 +29,11 @@ lipgloss = { package = "charmed-lipgloss", version = "0.2", default-features = f
open = "5"
# HTTP
reqwest = { version = "0.12", features = ["json"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "signal"] }
# Async runtime (asupersync migration candidate)
asupersync = { version = "0.2", features = ["tls", "tls-native-roots", "proc-macros"] }
asupersync = { version = "0.2", features = ["tls", "tls-native-roots"] }
# Async streaming for pagination
async-stream = "0.3"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
# Utilities
thiserror = "2"
@@ -63,6 +59,7 @@ tracing-appender = "0.2"
[dev-dependencies]
tempfile = "3"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
wiremock = "0.6"
[profile.release]

View File

@@ -162,6 +162,7 @@ async fn handle_ingest(
robot_mode: bool,
quiet: bool,
metrics: &MetricsLayer,
rt_handle: &asupersync::runtime::RuntimeHandle,
) -> Result<(), Box<dyn std::error::Error>> {
let start = std::time::Instant::now();
let dry_run = args.dry_run && !args.no_dry_run;
@@ -212,7 +213,7 @@ async fn handle_ingest(
let recorder = SyncRunRecorder::start(&recorder_conn, &command, run_id_short)?;
let signal = ShutdownSignal::new();
install_ctrl_c_handler(signal.clone());
install_ctrl_c_handler(rt_handle, signal.clone());
let ingest_result: std::result::Result<(), Box<dyn std::error::Error>> = async {
match args.entity.as_deref() {
@@ -1516,6 +1517,7 @@ async fn handle_embed(
config_override: Option<&str>,
args: EmbedArgs,
robot_mode: bool,
rt_handle: &asupersync::runtime::RuntimeHandle,
) -> Result<(), Box<dyn std::error::Error>> {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -1526,7 +1528,7 @@ async fn handle_embed(
let retry_failed = args.retry_failed && !args.no_retry_failed;
let signal = ShutdownSignal::new();
install_ctrl_c_handler(signal.clone());
install_ctrl_c_handler(rt_handle, signal.clone());
let embed_bar = lore::cli::progress::nested_progress("Embedding", 0, robot_mode);
let bar_clone = embed_bar.clone();
@@ -1565,6 +1567,7 @@ async fn handle_sync_cmd(
args: SyncArgs,
robot_mode: bool,
metrics: &MetricsLayer,
rt_handle: &asupersync::runtime::RuntimeHandle,
) -> Result<(), Box<dyn std::error::Error>> {
let dry_run = args.dry_run && !args.no_dry_run;
@@ -1671,7 +1674,7 @@ async fn handle_sync_cmd(
// Skip the normal recorder setup and let the dispatch handle everything.
if options.is_surgical() {
let signal = ShutdownSignal::new();
install_ctrl_c_handler(signal.clone());
install_ctrl_c_handler(rt_handle, signal.clone());
let start = std::time::Instant::now();
match run_sync(&config, options, None, &signal).await {
@@ -1695,7 +1698,7 @@ async fn handle_sync_cmd(
let recorder = SyncRunRecorder::start(&recorder_conn, "sync", run_id_short)?;
let signal = ShutdownSignal::new();
install_ctrl_c_handler(signal.clone());
install_ctrl_c_handler(rt_handle, signal.clone());
let start = std::time::Instant::now();
match run_sync(&config, options, Some(run_id_short), &signal).await {

View File

@@ -385,25 +385,11 @@ async fn check_ollama(config: Option<&Config>) -> OllamaCheck {
let base_url = &config.embedding.base_url;
let model = &config.embedding.model;
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(2))
.build()
{
Ok(client) => client,
Err(e) => {
return OllamaCheck {
result: CheckResult {
status: CheckStatus::Warning,
message: Some(format!("Failed to build HTTP client: {e}")),
},
url: Some(base_url.clone()),
model: Some(model.clone()),
};
}
};
let client = crate::http::Client::with_timeout(std::time::Duration::from_secs(2));
let url = format!("{base_url}/api/tags");
match client.get(format!("{base_url}/api/tags")).send().await {
Ok(response) if response.status().is_success() => {
match client.get(&url, &[]).await {
Ok(response) if response.is_success() => {
#[derive(serde::Deserialize)]
struct TagsResponse {
models: Option<Vec<ModelInfo>>,
@@ -413,7 +399,7 @@ async fn check_ollama(config: Option<&Config>) -> OllamaCheck {
name: String,
}
match response.json::<TagsResponse>().await {
match response.json::<TagsResponse>() {
Ok(data) => {
let models = data.models.unwrap_or_default();
let model_names: Vec<&str> = models
@@ -462,7 +448,7 @@ async fn check_ollama(config: Option<&Config>) -> OllamaCheck {
Ok(response) => OllamaCheck {
result: CheckResult {
status: CheckStatus::Warning,
message: Some(format!("Ollama responded with {}", response.status())),
message: Some(format!("Ollama responded with {}", response.status)),
},
url: Some(base_url.clone()),
model: Some(model.clone()),

View File

@@ -271,11 +271,11 @@ async fn run_ingest_inner(
let token = config.gitlab.resolve_token()?;
let client = GitLabClient::new(
let client = Arc::new(GitLabClient::new(
&config.gitlab.base_url,
&token,
Some(config.sync.requests_per_second),
);
));
let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?;
@@ -352,7 +352,7 @@ async fn run_ingest_inner(
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
.map(|(local_project_id, gitlab_project_id, path)| {
let client = client.clone();
let client = Arc::clone(&client);
let db_path = db_path.clone();
let config = config.clone();
let resource_type = resource_type_owned.clone();

View File

@@ -1,16 +1,18 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use asupersync::runtime::RuntimeHandle;
/// Spawn a background task that listens for Ctrl+C.
///
/// First press: cancels `signal` and prints an interrupt message.
/// Second press: force-exits with code 130.
pub fn install_ctrl_c_handler(signal: ShutdownSignal) {
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
pub fn install_ctrl_c_handler(handle: &RuntimeHandle, signal: ShutdownSignal) {
    // Spawned as a detached background task on the runtime; it lives for the
    // remainder of the process.
    handle.spawn(async move {
        // First Ctrl+C: tell the user we heard them, then request a graceful
        // shutdown via the shared signal so in-flight work can finish.
        let _ = asupersync::signal::ctrl_c().await;
        eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
        signal.cancel();
        // Second Ctrl+C: stop waiting and hard-exit with 130, the conventional
        // code for death-by-SIGINT (128 + 2).
        let _ = asupersync::signal::ctrl_c().await;
        std::process::exit(130);
    });
}

View File

@@ -1,9 +1,8 @@
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::warn;
use crate::core::error::{LoreError, Result};
use crate::http::Client;
pub struct OllamaConfig {
pub base_url: String,
@@ -51,17 +50,7 @@ struct ModelInfo {
impl OllamaClient {
pub fn new(config: OllamaConfig) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(config.timeout_secs))
.build()
.unwrap_or_else(|e| {
warn!(
error = %e,
"Failed to build configured Ollama HTTP client; falling back to default client"
);
Client::new()
});
let client = Client::with_timeout(Duration::from_secs(config.timeout_secs));
Self { client, config }
}
@@ -70,22 +59,17 @@ impl OllamaClient {
let response =
self.client
.get(&url)
.send()
.get(&url, &[])
.await
.map_err(|e| LoreError::OllamaUnavailable {
base_url: self.config.base_url.clone(),
detail: Some(format!("{e:?}")),
})?;
let tags: TagsResponse =
response
.json()
.await
.map_err(|e| LoreError::OllamaUnavailable {
base_url: self.config.base_url.clone(),
detail: Some(format!("{e:?}")),
})?;
let tags: TagsResponse = response.json().map_err(|e| LoreError::OllamaUnavailable {
base_url: self.config.base_url.clone(),
detail: Some(format!("{e:?}")),
})?;
let model_found = tags.models.iter().any(|m| {
m.name == self.config.model || m.name.starts_with(&format!("{}:", self.config.model))
@@ -110,49 +94,36 @@ impl OllamaClient {
let response = self
.client
.post(&url)
.json(&request)
.send()
.post_json(&url, &[], &request)
.await
.map_err(|e| LoreError::OllamaUnavailable {
base_url: self.config.base_url.clone(),
detail: Some(format!("{e:?}")),
})?;
let status = response.status();
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
if !response.is_success() {
let status = response.status;
let body = response.text().unwrap_or_default();
return Err(LoreError::EmbeddingFailed {
document_id: 0,
reason: format!("HTTP {}: {}", status, body),
reason: format!("HTTP {status}: {body}"),
});
}
let embed_response: EmbedResponse =
response
.json()
.await
.map_err(|e| LoreError::EmbeddingFailed {
document_id: 0,
reason: format!("Failed to parse embed response: {}", e),
})?;
response.json().map_err(|e| LoreError::EmbeddingFailed {
document_id: 0,
reason: format!("Failed to parse embed response: {e}"),
})?;
Ok(embed_response.embeddings)
}
}
pub async fn check_ollama_health(base_url: &str) -> bool {
let client = Client::builder()
.timeout(Duration::from_secs(5))
.build()
.ok();
let Some(client) = client else {
return false;
};
let client = Client::with_timeout(Duration::from_secs(5));
let url = format!("{base_url}/api/tags");
client.get(&url).send().await.is_ok()
client.get(&url, &[]).await.is_ok_and(|r| r.is_success())
}
#[cfg(test)]

View File

@@ -1,20 +1,19 @@
use asupersync::time::{sleep, wall_now};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use reqwest::header::{ACCEPT, HeaderMap, HeaderValue};
use reqwest::{Client, Response, StatusCode};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{debug, warn};
use tracing::debug;
use super::types::{
GitLabDiscussion, GitLabIssue, GitLabIssueRef, GitLabLabelEvent, GitLabMergeRequest,
GitLabMilestoneEvent, GitLabMrDiff, GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
};
use crate::core::error::{LoreError, Result};
use crate::http;
struct RateLimiter {
last_request: Instant,
@@ -56,9 +55,8 @@ fn rand_jitter() -> u64 {
(n ^ nanos) % 50
}
#[derive(Clone)]
pub struct GitLabClient {
client: Client,
client: http::Client,
base_url: String,
token: String,
rate_limiter: Arc<Mutex<RateLimiter>>,
@@ -66,27 +64,8 @@ pub struct GitLabClient {
impl GitLabClient {
pub fn new(base_url: &str, token: &str, requests_per_second: Option<f64>) -> Self {
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let client = Client::builder()
.default_headers(headers.clone())
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_else(|e| {
warn!(
error = %e,
"Failed to build configured HTTP client; falling back to default client with timeout"
);
Client::builder()
.default_headers(headers)
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_else(|_| Client::new())
});
Self {
client,
client: http::Client::with_timeout(Duration::from_secs(30)),
base_url: base_url.trim_end_matches('/').to_string(),
token: token.to_string(),
rate_limiter: Arc::new(Mutex::new(RateLimiter::new(
@@ -142,24 +121,23 @@ impl GitLabClient {
limiter.check_delay()
};
if let Some(d) = delay {
sleep(d).await;
sleep(wall_now(), d).await;
}
debug!(url = %url, attempt, "GitLab request");
let response = self
.client
.get(&url)
.header("PRIVATE-TOKEN", &self.token)
.send()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
kind: crate::core::error::NetworkErrorKind::Other,
detail: Some(format!("{e:?}")),
})?;
.get(
&url,
&[
("PRIVATE-TOKEN", self.token.as_str()),
("Accept", "application/json"),
],
)
.await?;
if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES {
if response.status == 429 && attempt < Self::MAX_RETRIES {
let retry_after = Self::parse_retry_after(&response);
tracing::info!(
path = %path,
@@ -168,7 +146,7 @@ impl GitLabClient {
status_code = 429u16,
"Rate limited, retrying"
);
sleep(Duration::from_secs(retry_after)).await;
sleep(wall_now(), Duration::from_secs(retry_after)).await;
continue;
}
@@ -177,60 +155,35 @@ impl GitLabClient {
}
self.handle_response(last_response.expect("retry loop ran at least once"), path)
.await
}
fn parse_retry_after(response: &Response) -> u64 {
fn parse_retry_after(response: &http::Response) -> u64 {
response
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.header("retry-after")
.and_then(|s| s.parse().ok())
.unwrap_or(60)
}
async fn handle_response<T: serde::de::DeserializeOwned>(
fn handle_response<T: serde::de::DeserializeOwned>(
&self,
response: Response,
response: http::Response,
path: &str,
) -> Result<T> {
match response.status() {
StatusCode::UNAUTHORIZED => Err(LoreError::GitLabAuthFailed),
StatusCode::NOT_FOUND => Err(LoreError::GitLabNotFound {
match response.status {
401 => Err(LoreError::GitLabAuthFailed),
404 => Err(LoreError::GitLabNotFound {
resource: path.to_string(),
}),
StatusCode::TOO_MANY_REQUESTS => {
429 => {
let retry_after = Self::parse_retry_after(&response);
Err(LoreError::GitLabRateLimited { retry_after })
}
status if status.is_success() => {
let text = response
.text()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
kind: crate::core::error::NetworkErrorKind::Other,
detail: Some(format!("{e:?}")),
})?;
serde_json::from_str(&text).map_err(|e| {
let preview = if text.len() > 500 {
&text[..text.floor_char_boundary(500)]
} else {
&text
};
LoreError::Other(format!(
"Failed to decode response from {path}: {e}\nResponse preview: {preview}"
))
})
}
status => Err(LoreError::Other(format!(
"GitLab API error: {} {}",
status.as_u16(),
status.canonical_reason().unwrap_or("Unknown")
_ if response.is_success() => response.json::<T>().map_err(|e| {
LoreError::Other(format!("Failed to decode response from {path}: {e}"))
}),
s => Err(LoreError::Other(format!(
"GitLab API error: {s} {}",
response.reason
))),
}
}
@@ -278,9 +231,7 @@ impl GitLabClient {
yield Ok(issue);
}
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
let next_page = header_value(&headers, "x-next-page")
.and_then(|s| s.parse::<u32>().ok());
match next_page {
@@ -334,9 +285,7 @@ impl GitLabClient {
yield Ok(discussion);
}
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
let next_page = header_value(&headers, "x-next-page")
.and_then(|s| s.parse::<u32>().ok());
match next_page {
@@ -439,10 +388,7 @@ impl GitLabClient {
.await?;
let link_next = parse_link_header_next(&headers);
let x_next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
let x_next_page = header_value(&headers, "x-next-page").and_then(|s| s.parse::<u32>().ok());
let full_page = items.len() as u32 == per_page;
let (next_page, is_last_page) = match (link_next.is_some(), x_next_page, full_page) {
@@ -490,9 +436,7 @@ impl GitLabClient {
}
let link_next = parse_link_header_next(&headers);
let x_next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
let x_next_page = header_value(&headers, "x-next-page")
.and_then(|s| s.parse::<u32>().ok());
let should_continue = match (link_next.is_some(), x_next_page, full_page) {
@@ -528,7 +472,7 @@ impl GitLabClient {
&self,
path: &str,
params: &[(&str, String)],
) -> Result<(T, HeaderMap)> {
) -> Result<(T, Vec<(String, String)>)> {
let url = format!("{}{}", self.base_url, path);
let mut last_response = None;
@@ -544,25 +488,24 @@ impl GitLabClient {
limiter.check_delay()
};
if let Some(d) = delay {
sleep(d).await;
sleep(wall_now(), d).await;
}
debug!(url = %url, ?params, attempt, "GitLab paginated request");
let response = self
.client
.get(&url)
.query(params)
.header("PRIVATE-TOKEN", &self.token)
.send()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
kind: crate::core::error::NetworkErrorKind::Other,
detail: Some(format!("{e:?}")),
})?;
.get_with_query(
&url,
params,
&[
("PRIVATE-TOKEN", self.token.as_str()),
("Accept", "application/json"),
],
)
.await?;
if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES {
if response.status == 429 && attempt < Self::MAX_RETRIES {
let retry_after = Self::parse_retry_after(&response);
tracing::info!(
path = %path,
@@ -571,7 +514,7 @@ impl GitLabClient {
status_code = 429u16,
"Rate limited, retrying"
);
sleep(Duration::from_secs(retry_after)).await;
sleep(wall_now(), Duration::from_secs(retry_after)).await;
continue;
}
@@ -580,8 +523,8 @@ impl GitLabClient {
}
let response = last_response.expect("retry loop ran at least once");
let headers = response.headers().clone();
let body = self.handle_response(response, path).await?;
let headers = response.headers.clone();
let body = self.handle_response(response, path)?;
Ok((body, headers))
}
}
@@ -640,10 +583,8 @@ impl GitLabClient {
let full_page = items.len() as u32 == per_page;
results.extend(items);
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
let next_page =
header_value(&headers, "x-next-page").and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(next) if next > page => page = next,
@@ -788,22 +729,26 @@ pub struct MergeRequestPage {
pub is_last_page: bool,
}
fn parse_link_header_next(headers: &HeaderMap) -> Option<String> {
fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
headers
.get("link")
.and_then(|v| v.to_str().ok())
.and_then(|link_str| {
for part in link_str.split(',') {
let part = part.trim();
if (part.contains("rel=\"next\"") || part.contains("rel=next"))
&& let Some(start) = part.find('<')
&& let Some(end) = part.find('>')
{
return Some(part[start + 1..end].to_string());
}
.iter()
.find(|(k, _)| k.eq_ignore_ascii_case(name))
.map(|(_, v)| v.as_str())
}
fn parse_link_header_next(headers: &[(String, String)]) -> Option<String> {
header_value(headers, "link").and_then(|link_str| {
for part in link_str.split(',') {
let part = part.trim();
if (part.contains("rel=\"next\"") || part.contains("rel=next"))
&& let Some(start) = part.find('<')
&& let Some(end) = part.find('>')
{
return Some(part[start + 1..end].to_string());
}
None
})
}
None
})
}
fn coalesce_not_found<T>(result: Result<Vec<T>>) -> Result<Vec<T>> {
@@ -863,13 +808,10 @@ mod tests {
#[test]
fn parse_link_header_extracts_next_url() {
let mut headers = HeaderMap::new();
headers.insert(
"link",
HeaderValue::from_static(
r#"<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2>; rel="next", <https://gitlab.example.com/api/v4/projects/1/merge_requests?page=5>; rel="last""#,
),
);
let headers = vec![(
"link".to_string(),
r#"<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2>; rel="next", <https://gitlab.example.com/api/v4/projects/1/merge_requests?page=5>; rel="last""#.to_string(),
)];
let result = parse_link_header_next(&headers);
assert_eq!(
@@ -880,11 +822,10 @@ mod tests {
#[test]
fn parse_link_header_handles_unquoted_rel() {
let mut headers = HeaderMap::new();
headers.insert(
"link",
HeaderValue::from_static(r#"<https://example.com/next>; rel=next"#),
);
let headers = vec![(
"link".to_string(),
r#"<https://example.com/next>; rel=next"#.to_string(),
)];
let result = parse_link_header_next(&headers);
assert_eq!(result, Some("https://example.com/next".to_string()));
@@ -892,11 +833,10 @@ mod tests {
#[test]
fn parse_link_header_returns_none_when_no_next() {
let mut headers = HeaderMap::new();
headers.insert(
"link",
HeaderValue::from_static(r#"<https://example.com/last>; rel="last""#),
);
let headers = vec![(
"link".to_string(),
r#"<https://example.com/last>; rel="last""#.to_string(),
)];
let result = parse_link_header_next(&headers);
assert!(result.is_none());
@@ -904,7 +844,7 @@ mod tests {
#[test]
fn parse_link_header_returns_none_when_missing() {
let headers = HeaderMap::new();
let headers: Vec<(String, String)> = vec![];
let result = parse_link_header_next(&headers);
assert!(result.is_none());
}

View File

@@ -1,10 +1,10 @@
use reqwest::Client;
use serde::Deserialize;
use serde_json::Value;
use std::time::{Duration, SystemTime};
use tracing::warn;
use crate::core::error::LoreError;
use crate::http::Client;
pub struct GraphqlClient {
http: Client,
@@ -21,13 +21,8 @@ pub struct GraphqlQueryResult {
impl GraphqlClient {
pub fn new(base_url: &str, token: &str) -> Self {
let http = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_else(|_| Client::new());
Self {
http,
http: Client::with_timeout(Duration::from_secs(30)),
base_url: base_url.trim_end_matches('/').to_string(),
token: token.to_string(),
}
@@ -45,23 +40,13 @@ impl GraphqlClient {
"variables": variables,
});
let bearer = format!("Bearer {}", self.token);
let response = self
.http
.post(&url)
.header("Authorization", format!("Bearer {}", self.token))
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
kind: crate::core::error::NetworkErrorKind::Other,
detail: Some(format!("{e:?}")),
})?;
.post_json(&url, &[("Authorization", bearer.as_str())], &body)
.await?;
let status = response.status();
match status.as_u16() {
match response.status {
401 | 403 => return Err(LoreError::GitLabAuthFailed),
404 => {
return Err(LoreError::GitLabNotFound {
@@ -73,14 +58,13 @@ impl GraphqlClient {
return Err(LoreError::GitLabRateLimited { retry_after });
}
s if s >= 400 => {
return Err(LoreError::Other(format!("GraphQL HTTP {status}")));
return Err(LoreError::Other(format!("GraphQL HTTP {s}")));
}
_ => {}
}
let json: Value = response
.json()
.await
.map_err(|e| LoreError::Other(format!("Failed to parse GraphQL response: {e}")))?;
let errors = json.get("errors").and_then(|e| e.as_array());
@@ -117,12 +101,8 @@ impl GraphqlClient {
}
}
fn parse_retry_after(response: &reqwest::Response) -> u64 {
let header = match response
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
{
fn parse_retry_after(response: &crate::http::Response) -> u64 {
let header = match response.header("retry-after") {
Some(s) => s,
None => return 60,
};

318
src/http.rs Normal file
View File

@@ -0,0 +1,318 @@
use std::time::Duration;
use asupersync::http::h1::{
ClientError, HttpClient, HttpClientConfig, Method, Response as RawResponse,
};
use asupersync::http::pool::PoolConfig;
use asupersync::time::{timeout, wall_now};
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::core::error::{LoreError, NetworkErrorKind, Result};
// Hard upper bound on a buffered response body; anything larger is rejected
// rather than held in memory (see `Client::execute`).
const MAX_RESPONSE_BODY_BYTES: usize = 64 * 1024 * 1024; // 64 MiB
/// Thin HTTP adapter over the asupersync h1 client.
///
/// Wraps a pooled `HttpClient` and applies a single wall-clock timeout to
/// every request issued through it.
pub struct Client {
    // Underlying asupersync HTTP/1 client (connection-pooled).
    inner: HttpClient,
    // Per-request deadline enforced in `execute`.
    timeout: Duration,
}
/// A fully-buffered HTTP response.
///
/// The body is read to completion before this type is constructed
/// (bounded by `MAX_RESPONSE_BODY_BYTES`); accessors like `json`/`text`
/// therefore never perform I/O.
#[derive(Debug)]
pub struct Response {
    /// Numeric HTTP status code (e.g. 200, 404).
    pub status: u16,
    /// Reason phrase from the status line (e.g. "OK").
    pub reason: String,
    /// Raw header pairs in wire order; names are matched
    /// case-insensitively via `header`/`headers_all`.
    pub headers: Vec<(String, String)>,
    // Raw body bytes; private — consumers go through `json`/`text`.
    body: Vec<u8>,
}
impl Client {
    /// Build a client whose every request is bounded by `timeout`.
    ///
    /// Uses a fixed pool shape (6 connections per host, 100 total,
    /// 90 s idle timeout) suitable for this application's API traffic.
    pub fn with_timeout(timeout: Duration) -> Self {
        let pool = PoolConfig {
            max_connections_per_host: 6,
            max_total_connections: 100,
            idle_timeout: Duration::from_secs(90),
            ..Default::default()
        };
        let config = HttpClientConfig {
            pool_config: pool,
            ..Default::default()
        };
        Self {
            inner: HttpClient::with_config(config),
            timeout,
        }
    }

    /// Issue a GET to `url` with the given extra headers.
    pub async fn get(&self, url: &str, headers: &[(&str, &str)]) -> Result<Response> {
        self.execute(Method::Get, url, headers, Vec::new()).await
    }

    /// Issue a GET with `params` URL-encoded and appended to `url`.
    pub async fn get_with_query(
        &self,
        url: &str,
        params: &[(&str, String)],
        headers: &[(&str, &str)],
    ) -> Result<Response> {
        let target = append_query_params(url, params);
        self.execute(Method::Get, &target, headers, Vec::new())
            .await
    }

    /// POST `body` serialized as JSON; a `Content-Type: application/json`
    /// header is appended to the caller-supplied headers.
    pub async fn post_json<T: Serialize>(
        &self,
        url: &str,
        headers: &[(&str, &str)],
        body: &T,
    ) -> Result<Response> {
        let payload = serde_json::to_vec(body)
            .map_err(|e| LoreError::Other(format!("JSON serialization failed: {e}")))?;
        let mut with_content_type: Vec<(&str, &str)> = headers.to_vec();
        with_content_type.push(("Content-Type", "application/json"));
        self.execute(Method::Post, url, &with_content_type, payload)
            .await
    }

    /// Run one request end-to-end: enforce the client timeout, translate
    /// transport failures into `LoreError::GitLabNetworkError`, and reject
    /// oversized bodies before handing the buffered response to the caller.
    async fn execute(
        &self,
        method: Method,
        url: &str,
        headers: &[(&str, &str)],
        body: Vec<u8>,
    ) -> Result<Response> {
        // The inner client wants owned header pairs.
        let owned_headers: Vec<(String, String)> = headers
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect();
        let request = self.inner.request(method, url, owned_headers, body);
        let raw: RawResponse = timeout(wall_now(), self.timeout, request)
            .await
            // Outer error: the deadline elapsed before the request finished.
            .map_err(|_| LoreError::GitLabNetworkError {
                base_url: url.to_string(),
                kind: NetworkErrorKind::Timeout,
                detail: Some(format!("Request timed out after {:?}", self.timeout)),
            })?
            // Inner error: the transport itself failed (DNS/connect/TLS/...).
            .map_err(|e| LoreError::GitLabNetworkError {
                base_url: url.to_string(),
                kind: classify_transport_error(&e),
                detail: Some(format!("{e:?}")),
            })?;
        // Refuse to hand back bodies over the global cap.
        if raw.body.len() > MAX_RESPONSE_BODY_BYTES {
            return Err(LoreError::Other(format!(
                "Response body too large: {} bytes (max {MAX_RESPONSE_BODY_BYTES})",
                raw.body.len(),
            )));
        }
        Ok(Response {
            status: raw.status,
            reason: raw.reason,
            headers: raw.headers,
            body: raw.body,
        })
    }
}
impl Response {
    /// True for any 2xx status code.
    pub fn is_success(&self) -> bool {
        self.status >= 200 && self.status < 300
    }

    /// Deserialize the buffered body as JSON into `T`.
    pub fn json<T: DeserializeOwned>(&self) -> Result<T> {
        serde_json::from_slice(&self.body)
            .map_err(|e| LoreError::Other(format!("JSON parse error: {e}")))
    }

    /// Consume the response and return its body as UTF-8 text.
    pub fn text(self) -> Result<String> {
        String::from_utf8(self.body)
            .map_err(|e| LoreError::Other(format!("UTF-8 decode error: {e}")))
    }

    /// First header value whose name matches `name` case-insensitively.
    pub fn header(&self, name: &str) -> Option<&str> {
        for (key, value) in &self.headers {
            if key.eq_ignore_ascii_case(name) {
                return Some(value.as_str());
            }
        }
        None
    }

    /// Every header value whose name matches `name` case-insensitively,
    /// in wire order (e.g. repeated `Link` headers).
    pub fn headers_all(&self, name: &str) -> Vec<&str> {
        let mut values = Vec::new();
        for (key, value) in &self.headers {
            if key.eq_ignore_ascii_case(name) {
                values.push(value.as_str());
            }
        }
        values
    }
}
/// Map an asupersync transport error onto the coarse network-error taxonomy
/// used by `LoreError::GitLabNetworkError`. Anything we do not specifically
/// recognize falls through to `Other`.
fn classify_transport_error(e: &ClientError) -> NetworkErrorKind {
    if matches!(e, ClientError::DnsError(_)) {
        NetworkErrorKind::DnsResolution
    } else if matches!(e, ClientError::ConnectError(_)) {
        NetworkErrorKind::ConnectionRefused
    } else if matches!(e, ClientError::TlsError(_)) {
        NetworkErrorKind::Tls
    } else {
        NetworkErrorKind::Other
    }
}
/// Append URL-encoded query parameters to `url`.
///
/// Preserves an existing fragment (`#...`), joins with `&` when the URL
/// already carries a query string, and returns the URL unchanged when
/// `params` is empty.
fn append_query_params(url: &str, params: &[(&str, String)]) -> String {
    if params.is_empty() {
        return url.to_string();
    }
    // Percent-encode each key/value pair, then join into one query string.
    let mut encoded_pairs = Vec::with_capacity(params.len());
    for (key, value) in params {
        encoded_pairs.push(format!(
            "{}={}",
            urlencoding::encode(key),
            urlencoding::encode(value)
        ));
    }
    let query = encoded_pairs.join("&");
    // Split off a fragment so the query lands before it, per URL syntax.
    let (base, fragment) = url
        .split_once('#')
        .map_or((url, None), |(b, f)| (b, Some(f)));
    let separator = if base.contains('?') { '&' } else { '?' };
    match fragment {
        Some(f) => format!("{base}{separator}{query}#{f}"),
        None => format!("{base}{separator}{query}"),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // An empty parameter slice must leave the URL byte-identical.
    #[test]
    fn append_query_params_empty_returns_unchanged() {
        let url = "https://example.com/api";
        assert_eq!(append_query_params(url, &[]), url);
    }

    // No existing query string: the first pair is introduced with '?'.
    #[test]
    fn append_query_params_adds_question_mark() {
        let result = append_query_params(
            "https://example.com/api",
            &[("page", "1".into()), ("per_page", "20".into())],
        );
        assert_eq!(result, "https://example.com/api?page=1&per_page=20");
    }

    // Existing query string: new pairs are joined with '&', not a second '?'.
    #[test]
    fn append_query_params_existing_query_uses_ampersand() {
        let result = append_query_params(
            "https://example.com/api?state=opened",
            &[("page", "2".into())],
        );
        assert_eq!(result, "https://example.com/api?state=opened&page=2");
    }

    // A '#fragment' must stay at the very end, after the appended query.
    #[test]
    fn append_query_params_preserves_fragment() {
        let result =
            append_query_params("https://example.com/api#section", &[("key", "val".into())]);
        assert_eq!(result, "https://example.com/api?key=val#section");
    }

    // Brackets and spaces must be percent-encoded ('[' -> %5B, ' ' -> %20).
    #[test]
    fn append_query_params_encodes_special_chars() {
        let result =
            append_query_params("https://example.com/api", &[("labels[]", "bug fix".into())]);
        assert_eq!(result, "https://example.com/api?labels%5B%5D=bug%20fix");
    }

    // Repeated keys (array-style params) are kept as separate pairs in order.
    #[test]
    fn append_query_params_repeated_keys() {
        let result = append_query_params(
            "https://example.com/api",
            &[("labels[]", "bug".into()), ("labels[]", "urgent".into())],
        );
        assert_eq!(
            result,
            "https://example.com/api?labels%5B%5D=bug&labels%5B%5D=urgent"
        );
    }

    // Header lookup ignores ASCII case and returns None for absent names.
    #[test]
    fn response_header_case_insensitive() {
        let resp = Response {
            status: 200,
            reason: "OK".into(),
            headers: vec![
                ("Content-Type".into(), "application/json".into()),
                ("X-Page".into(), "1".into()),
            ],
            body: Vec::new(),
        };
        assert_eq!(resp.header("content-type"), Some("application/json"));
        assert_eq!(resp.header("CONTENT-TYPE"), Some("application/json"));
        assert_eq!(resp.header("Content-Type"), Some("application/json"));
        assert_eq!(resp.header("x-page"), Some("1"));
        assert_eq!(resp.header("X-Missing"), None);
    }

    // headers_all returns every value for a repeated name, preserving order.
    #[test]
    fn response_headers_all_returns_multiple_values() {
        let resp = Response {
            status: 200,
            reason: "OK".into(),
            headers: vec![
                ("Link".into(), "<url1>; rel=\"next\"".into()),
                ("Link".into(), "<url2>; rel=\"last\"".into()),
                ("Content-Type".into(), "application/json".into()),
            ],
            body: Vec::new(),
        };
        let links = resp.headers_all("link");
        assert_eq!(links.len(), 2);
        assert_eq!(links[0], "<url1>; rel=\"next\"");
        assert_eq!(links[1], "<url2>; rel=\"last\"");
    }

    // is_success covers exactly [200, 300): boundary values on both sides.
    #[test]
    fn response_is_success_range() {
        for status in [200, 201, 204, 299] {
            let resp = Response {
                status,
                reason: String::new(),
                headers: Vec::new(),
                body: Vec::new(),
            };
            assert!(resp.is_success(), "status {status} should be success");
        }
        for status in [100, 199, 300, 301, 400, 404, 500] {
            let resp = Response {
                status,
                reason: String::new(),
                headers: Vec::new(),
                body: Vec::new(),
            };
            assert!(!resp.is_success(), "status {status} should not be success");
        }
    }

    // Each transport-error variant maps to its dedicated NetworkErrorKind.
    #[test]
    fn classify_dns_error() {
        let err = ClientError::DnsError(std::io::Error::other("dns failed"));
        assert_eq!(
            classify_transport_error(&err),
            NetworkErrorKind::DnsResolution
        );
    }

    #[test]
    fn classify_connect_error() {
        let err = ClientError::ConnectError(std::io::Error::other("refused"));
        assert_eq!(
            classify_transport_error(&err),
            NetworkErrorKind::ConnectionRefused
        );
    }

    #[test]
    fn classify_tls_error() {
        let err = ClientError::TlsError("bad cert".into());
        assert_eq!(classify_transport_error(&err), NetworkErrorKind::Tls);
    }
}

View File

@@ -3,6 +3,7 @@ pub mod core;
pub mod documents;
pub mod embedding;
pub mod gitlab;
pub mod http;
pub mod ingestion;
pub mod search;
#[cfg(test)]

View File

@@ -52,8 +52,18 @@ use lore::core::trace::run_trace;
use lore::ingestion::storage::queue::release_all_locked_jobs;
use lore::ingestion::storage::sync_run::SyncRunRecorder;
#[tokio::main]
async fn main() {
fn main() {
let rt = match asupersync::runtime::RuntimeBuilder::new().build() {
Ok(rt) => rt,
Err(e) => {
eprintln!("Failed to build async runtime: {e}");
std::process::exit(1);
}
};
let rt_handle = rt.handle();
rt.block_on(async {
#[cfg(unix)]
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
@@ -245,9 +255,9 @@ async fn main() {
.await
}
Some(Commands::Stats(args)) => handle_stats(cli.config.as_deref(), args, robot_mode).await,
Some(Commands::Embed(args)) => handle_embed(cli.config.as_deref(), args, robot_mode).await,
Some(Commands::Embed(args)) => handle_embed(cli.config.as_deref(), args, robot_mode, &rt_handle).await,
Some(Commands::Sync(args)) => {
handle_sync_cmd(cli.config.as_deref(), args, robot_mode, &metrics_layer).await
handle_sync_cmd(cli.config.as_deref(), args, robot_mode, &metrics_layer, &rt_handle).await
}
Some(Commands::Ingest(args)) => {
handle_ingest(
@@ -256,6 +266,7 @@ async fn main() {
robot_mode,
quiet,
&metrics_layer,
&rt_handle,
)
.await
}
@@ -414,6 +425,7 @@ async fn main() {
if let Err(e) = result {
handle_error(e, robot_mode);
}
}); // rt.block_on
}
include!("app/dispatch.rs");

View File

@@ -85,343 +85,384 @@ fn insert_note(
conn.last_insert_rowid()
}
#[tokio::test]
async fn test_seed_empty_query_returns_empty() {
let conn = setup_test_db();
let result = seed_timeline(&conn, None, "", None, None, 50, 10)
.await
.unwrap();
assert!(result.seed_entities.is_empty());
assert!(result.evidence_notes.is_empty());
#[test]
fn test_seed_empty_query_returns_empty() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let result = seed_timeline(&conn, None, "", None, None, 50, 10)
.await
.unwrap();
assert!(result.seed_entities.is_empty());
assert!(result.evidence_notes.is_empty());
});
}
#[tokio::test]
async fn test_seed_no_matches_returns_empty() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
insert_document(
&conn,
"issue",
issue_id,
project_id,
"unrelated content here",
);
#[test]
fn test_seed_no_matches_returns_empty() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
insert_document(
&conn,
"issue",
issue_id,
project_id,
"unrelated content here",
);
let result = seed_timeline(&conn, None, "nonexistent_xyzzy_query", None, None, 50, 10)
.await
.unwrap();
assert!(result.seed_entities.is_empty());
let result = seed_timeline(&conn, None, "nonexistent_xyzzy_query", None, None, 50, 10)
.await
.unwrap();
assert!(result.seed_entities.is_empty());
});
}
#[tokio::test]
async fn test_seed_finds_issue() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 42);
insert_document(
&conn,
"issue",
issue_id,
project_id,
"authentication error in login flow",
);
#[test]
fn test_seed_finds_issue() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 42);
insert_document(
&conn,
"issue",
issue_id,
project_id,
"authentication error in login flow",
);
let result = seed_timeline(&conn, None, "authentication", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "issue");
assert_eq!(result.seed_entities[0].entity_iid, 42);
assert_eq!(result.seed_entities[0].project_path, "group/project");
let result = seed_timeline(&conn, None, "authentication", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "issue");
assert_eq!(result.seed_entities[0].entity_iid, 42);
assert_eq!(result.seed_entities[0].project_path, "group/project");
});
}
#[tokio::test]
async fn test_seed_finds_mr() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let mr_id = insert_test_mr(&conn, project_id, 99);
insert_document(
&conn,
"merge_request",
mr_id,
project_id,
"fix authentication bug",
);
#[test]
fn test_seed_finds_mr() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let mr_id = insert_test_mr(&conn, project_id, 99);
insert_document(
&conn,
"merge_request",
mr_id,
project_id,
"fix authentication bug",
);
let result = seed_timeline(&conn, None, "authentication", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "merge_request");
assert_eq!(result.seed_entities[0].entity_iid, 99);
let result = seed_timeline(&conn, None, "authentication", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "merge_request");
assert_eq!(result.seed_entities[0].entity_iid, 99);
});
}
#[tokio::test]
async fn test_seed_deduplicates_entities() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 10);
#[test]
fn test_seed_deduplicates_entities() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 10);
// Two documents referencing the same issue
insert_document(
&conn,
"issue",
issue_id,
project_id,
"authentication error first doc",
);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"authentication error second doc",
);
let result = seed_timeline(&conn, None, "authentication", None, None, 50, 10)
.await
.unwrap();
// Should deduplicate: both map to the same issue
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_iid, 10);
}
#[tokio::test]
async fn test_seed_resolves_discussion_to_parent() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 7);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline failed",
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "issue");
assert_eq!(result.seed_entities[0].entity_iid, 7);
}
#[tokio::test]
async fn test_seed_evidence_capped() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
// Create 15 discussion documents with notes about "deployment"
for i in 0..15 {
// Two documents referencing the same issue
insert_document(
&conn,
"issue",
issue_id,
project_id,
"authentication error first doc",
);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
&format!("deployment issue number {i}"),
"authentication error second doc",
);
insert_note(
let result = seed_timeline(&conn, None, "authentication", None, None, 50, 10)
.await
.unwrap();
// Should deduplicate: both map to the same issue
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_iid, 10);
});
}
#[test]
fn test_seed_resolves_discussion_to_parent() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 7);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
&format!("deployment note {i}"),
false,
"deployment pipeline failed",
);
}
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 5)
.await
.unwrap();
assert!(result.evidence_notes.len() <= 5);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "issue");
assert_eq!(result.seed_entities[0].entity_iid, 7);
});
}
#[tokio::test]
async fn test_seed_evidence_snippet_truncated() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment configuration",
);
#[test]
fn test_seed_evidence_capped() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let long_body = "x".repeat(500);
insert_note(&conn, disc_id, project_id, &long_body, false);
// Create 15 discussion documents with notes about "deployment"
for i in 0..15 {
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
&format!("deployment issue number {i}"),
);
insert_note(
&conn,
disc_id,
project_id,
&format!("deployment note {i}"),
false,
);
}
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert!(!result.evidence_notes.is_empty());
if let TimelineEventType::NoteEvidence { snippet, .. } = &result.evidence_notes[0].event_type {
assert!(snippet.chars().count() <= 200);
} else {
panic!("Expected NoteEvidence");
}
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 5)
.await
.unwrap();
assert!(result.evidence_notes.len() <= 5);
});
}
#[tokio::test]
async fn test_seed_respects_project_filter() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
#[test]
fn test_seed_evidence_snippet_truncated() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment configuration",
);
// Insert a second project
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) VALUES (2, 'other/repo', 'https://gitlab.com/other/repo')",
[],
)
.unwrap();
let project2_id = conn.last_insert_rowid();
let long_body = "x".repeat(500);
insert_note(&conn, disc_id, project_id, &long_body, false);
let issue1_id = insert_test_issue(&conn, project_id, 1);
insert_document(
&conn,
"issue",
issue1_id,
project_id,
"authentication error",
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert!(!result.evidence_notes.is_empty());
if let TimelineEventType::NoteEvidence { snippet, .. } =
&result.evidence_notes[0].event_type
{
assert!(snippet.chars().count() <= 200);
} else {
panic!("Expected NoteEvidence");
}
});
}
let issue2_id = insert_test_issue(&conn, project2_id, 2);
insert_document(
&conn,
"issue",
issue2_id,
project2_id,
"authentication error",
);
#[test]
fn test_seed_respects_project_filter() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
// Filter to project 1 only
let result = seed_timeline(
&conn,
None,
"authentication",
Some(project_id),
None,
50,
10,
)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].project_path, "group/project");
// Insert a second project
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) VALUES (2, 'other/repo', 'https://gitlab.com/other/repo')",
[],
)
.unwrap();
let project2_id = conn.last_insert_rowid();
let issue1_id = insert_test_issue(&conn, project_id, 1);
insert_document(
&conn,
"issue",
issue1_id,
project_id,
"authentication error",
);
let issue2_id = insert_test_issue(&conn, project2_id, 2);
insert_document(
&conn,
"issue",
issue2_id,
project2_id,
"authentication error",
);
// Filter to project 1 only
let result = seed_timeline(
&conn,
None,
"authentication",
Some(project_id),
None,
50,
10,
)
.await
.unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].project_path, "group/project");
});
}
// ─── Matched discussion tests ───────────────────────────────────────────────
#[tokio::test]
async fn test_seed_captures_matched_discussions_from_discussion_doc() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline authentication",
);
#[test]
fn test_seed_captures_matched_discussions_from_discussion_doc() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline authentication",
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.matched_discussions.len(), 1);
assert_eq!(result.matched_discussions[0].discussion_id, disc_id);
assert_eq!(result.matched_discussions[0].entity_type, "issue");
assert_eq!(result.matched_discussions[0].entity_id, issue_id);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.matched_discussions.len(), 1);
assert_eq!(result.matched_discussions[0].discussion_id, disc_id);
assert_eq!(result.matched_discussions[0].entity_type, "issue");
assert_eq!(result.matched_discussions[0].entity_id, issue_id);
});
}
#[tokio::test]
async fn test_seed_captures_matched_discussions_from_note_doc() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
let note_id = insert_note(&conn, disc_id, project_id, "note about deployment", false);
insert_document(
&conn,
"note",
note_id,
project_id,
"deployment configuration details",
);
#[test]
fn test_seed_captures_matched_discussions_from_note_doc() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
let note_id = insert_note(&conn, disc_id, project_id, "note about deployment", false);
insert_document(
&conn,
"note",
note_id,
project_id,
"deployment configuration details",
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(
result.matched_discussions.len(),
1,
"Note doc should resolve to parent discussion"
);
assert_eq!(result.matched_discussions[0].discussion_id, disc_id);
assert_eq!(result.matched_discussions[0].entity_type, "issue");
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(
result.matched_discussions.len(),
1,
"Note doc should resolve to parent discussion"
);
assert_eq!(result.matched_discussions[0].discussion_id, disc_id);
assert_eq!(result.matched_discussions[0].entity_type, "issue");
});
}
#[tokio::test]
async fn test_seed_deduplicates_matched_discussions() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
#[test]
fn test_seed_deduplicates_matched_discussions() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
// Two docs referencing the same discussion
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline first doc",
);
let note_id = insert_note(&conn, disc_id, project_id, "deployment note", false);
insert_document(
&conn,
"note",
note_id,
project_id,
"deployment pipeline second doc",
);
// Two docs referencing the same discussion
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline first doc",
);
let note_id = insert_note(&conn, disc_id, project_id, "deployment note", false);
insert_document(
&conn,
"note",
note_id,
project_id,
"deployment pipeline second doc",
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(
result.matched_discussions.len(),
1,
"Same discussion_id from two docs should deduplicate"
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(
result.matched_discussions.len(),
1,
"Same discussion_id from two docs should deduplicate"
);
});
}
#[tokio::test]
async fn test_seed_matched_discussions_have_correct_parent_entity() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let mr_id = insert_test_mr(&conn, project_id, 99);
let disc_id = insert_discussion(&conn, project_id, None, Some(mr_id));
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline for merge request",
);
#[test]
fn test_seed_matched_discussions_have_correct_parent_entity() {
let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
rt.block_on(async {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let mr_id = insert_test_mr(&conn, project_id, 99);
let disc_id = insert_discussion(&conn, project_id, None, Some(mr_id));
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline for merge request",
);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.matched_discussions.len(), 1);
assert_eq!(result.matched_discussions[0].entity_type, "merge_request");
assert_eq!(result.matched_discussions[0].entity_id, mr_id);
let result = seed_timeline(&conn, None, "deployment", None, None, 50, 10)
.await
.unwrap();
assert_eq!(result.matched_discussions.len(), 1);
assert_eq!(result.matched_discussions[0].entity_type, "merge_request");
assert_eq!(result.matched_discussions[0].entity_id, mr_id);
});
}
// ─── seed_timeline_direct tests ─────────────────────────────────────────────

241
tests/http_integration.rs Normal file
View File

@@ -0,0 +1,241 @@
use std::io::{Read, Write};
use std::net::TcpListener;
use std::time::Duration;
use lore::http::Client;
/// Spin up a one-shot TCP server that replies with `response_bytes` to the first
/// connection, then shuts down. Returns the `http://127.0.0.1:{port}` base URL.
///
/// The server drains the request headers before responding so the client does
/// not observe a broken pipe mid-request.
fn oneshot_server(response_bytes: Vec<u8>) -> String {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        // Drain the request so the client doesn't get a broken pipe.
        // Accumulate across reads so an end-of-headers marker ("\r\n\r\n")
        // split between two reads is still detected, and stop on EOF or a
        // read error instead of spinning forever on a closed socket.
        // For simplicity we don't parse Content-Length; our test requests
        // have tiny/no bodies.
        let mut request = Vec::new();
        let mut buf = [0u8; 4096];
        loop {
            match stream.read(&mut buf) {
                Ok(0) | Err(_) => break, // peer closed or reset; reply anyway
                Ok(n) => {
                    request.extend_from_slice(&buf[..n]);
                    if request.windows(4).any(|w| w == b"\r\n\r\n") {
                        break;
                    }
                }
            }
        }
        // Best-effort write: the client may already have gone away, and a
        // panic in this detached thread would be silently swallowed anyway.
        let _ = stream.write_all(&response_bytes);
        let _ = stream.flush();
        // Drop closes the connection.
    });
    format!("http://127.0.0.1:{port}")
}
/// Build a minimal HTTP/1.1 response with a JSON content type and an exact
/// `Content-Length` for `body`, returned as raw wire bytes.
fn json_response(status: u16, reason: &str, body: &str) -> Vec<u8> {
    let mut wire = format!("HTTP/1.1 {status} {reason}\r\n").into_bytes();
    wire.extend_from_slice(b"Content-Type: application/json\r\n");
    wire.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes());
    wire.extend_from_slice(body.as_bytes());
    wire
}
/// Drive the future `f` to completion on a freshly built asupersync runtime.
fn run<F: std::future::Future<Output = T>, T>(f: F) -> T {
    let rt = asupersync::runtime::RuntimeBuilder::new().build().unwrap();
    rt.block_on(f)
}
// -------------------------------------------------------------------
// Test 1: GET with headers + JSON response
// -------------------------------------------------------------------
#[test]
fn get_with_headers_and_json_response() {
    // GET with a request header; server replies 200 + JSON + a custom header.
    let payload = r#"{"ok":true,"data":"hello"}"#;
    let wire = format!(
        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nX-Custom: test-value\r\nContent-Length: {}\r\n\r\n{payload}",
        payload.len()
    );
    let origin = oneshot_server(wire.into_bytes());
    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));
        let url = format!("{origin}/api/test");
        let response = client
            .get(&url, &[("Accept", "application/json")])
            .await
            .unwrap();
        assert_eq!(response.status, 200);
        assert!(response.is_success());
        // Header lookup is by lowercase name.
        assert_eq!(response.header("x-custom"), Some("test-value"));
        let json: serde_json::Value = response.json().unwrap();
        assert_eq!(json["ok"], true);
        assert_eq!(json["data"], "hello");
    });
}
// -------------------------------------------------------------------
// Test 2: POST with JSON body
// -------------------------------------------------------------------
#[test]
fn post_json_body_round_trip() {
    // POST a serde-serialized body and read back the JSON reply.
    let origin = oneshot_server(json_response(200, "OK", r#"{"received":true}"#));
    run(async {
        #[derive(serde::Serialize)]
        struct Payload {
            model: String,
            input: Vec<String>,
        }
        let body = Payload {
            model: String::from("test-model"),
            input: vec![String::from("hello"), String::from("world")],
        };
        let client = Client::with_timeout(Duration::from_secs(5));
        let url = format!("{origin}/api/embed");
        let response = client.post_json(&url, &[], &body).await.unwrap();
        assert!(response.is_success());
        let json: serde_json::Value = response.json().unwrap();
        assert_eq!(json["received"], true);
    });
}
// -------------------------------------------------------------------
// Test 3: Non-success status code (429) with Retry-After header
// -------------------------------------------------------------------
#[test]
fn non_success_status_with_retry_after() {
    // A 429 must surface its status, body, and the Retry-After header.
    let payload = r#"{"error":"rate limited"}"#;
    let wire = format!(
        "HTTP/1.1 429 Too Many Requests\r\nRetry-After: 30\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{payload}",
        payload.len()
    );
    let origin = oneshot_server(wire.into_bytes());
    run(async {
        let client = Client::with_timeout(Duration::from_secs(5));
        let url = format!("{origin}/api/data");
        let response = client.get(&url, &[]).await.unwrap();
        assert_eq!(response.status, 429);
        assert!(!response.is_success());
        assert_eq!(response.header("retry-after"), Some("30"));
        let json: serde_json::Value = response.json().unwrap();
        assert_eq!(json["error"], "rate limited");
    });
}
// -------------------------------------------------------------------
// Test 4: Timeout fires correctly
// -------------------------------------------------------------------
#[test]
fn timeout_fires_on_slow_server() {
    // Server accepts the connection but never writes a byte, so the client's
    // own timeout must fire.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    std::thread::spawn(move || {
        let (_held_open, _) = listener.accept().unwrap();
        // Keep the socket open without responding.
        std::thread::sleep(Duration::from_secs(30));
    });
    let base = format!("http://127.0.0.1:{}", addr.port());
    run(async {
        let client = Client::with_timeout(Duration::from_millis(200));
        let outcome = client.get(&format!("{base}/api/slow"), &[]).await;
        assert!(outcome.is_err(), "expected timeout error");
        let rendered = format!("{:?}", outcome.unwrap_err());
        let lowered = rendered.to_lowercase();
        // The error should mention timeout.
        assert!(
            lowered.contains("timeout") || lowered.contains("timed out"),
            "error should mention timeout, got: {rendered}"
        );
    });
}
// -------------------------------------------------------------------
// Test 5: Large response rejection (64 MiB guard)
// -------------------------------------------------------------------
#[test]
fn large_response_rejected() {
    // Advertise a Content-Length just past the 64 MiB guard and stream the
    // body in 1 MiB chunks so the test itself never allocates the full payload.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();
    std::thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        // Drain request headers. Accumulate across reads so a "\r\n\r\n"
        // split between two reads is still detected, and bail out on
        // EOF/error instead of looping forever on a closed socket.
        let mut request = Vec::new();
        let mut buf = [0u8; 4096];
        loop {
            match stream.read(&mut buf) {
                Ok(0) | Err(_) => return, // peer went away: nothing to serve
                Ok(n) => {
                    request.extend_from_slice(&buf[..n]);
                    if request.windows(4).any(|w| w == b"\r\n\r\n") {
                        break;
                    }
                }
            }
        }
        let oversized = 64 * 1024 * 1024 + 1; // 64 MiB + 1
        let header = format!("HTTP/1.1 200 OK\r\nContent-Length: {oversized}\r\n\r\n");
        // Best-effort writes: the client is expected to abort once its
        // body-size guard trips, and a panic in this detached thread would
        // be silently swallowed anyway.
        if stream.write_all(header.as_bytes()).is_err() {
            return;
        }
        // Stream zeros in chunks to avoid a huge allocation.
        let chunk = vec![0u8; 1024 * 1024]; // 1 MiB chunks
        for _ in 0..65 {
            if stream.write_all(&chunk).is_err() {
                break; // Client may close early.
            }
        }
    });
    let base = format!("http://127.0.0.1:{port}");
    run(async {
        let client = Client::with_timeout(Duration::from_secs(30));
        let result = client.get(&format!("{base}/api/huge"), &[]).await;
        assert!(result.is_err(), "expected large-response rejection");
        let err_str = format!("{:?}", result.unwrap_err());
        assert!(
            err_str.contains("too large")
                || err_str.contains("Response body")
                || err_str.contains("BodyTooLarge"),
            "error should mention body size, got: {err_str}"
        );
    });
}