Files
gitlore/src/gitlab/client.rs
teernisse e8d6c5b15f feat(runtime): replace tokio+reqwest with asupersync async runtime
- Add HTTP adapter layer (src/http.rs) wrapping asupersync h1 client
- Migrate gitlab client, graphql, and ollama to HTTP adapter
- Swap entrypoint from #[tokio::main] to RuntimeBuilder::new().block_on()
- Rewrite signal handler for asupersync (RuntimeHandle::spawn + ctrl_c())
- Migrate rate limiter sleeps to asupersync::time::sleep(wall_now(), d)
- Add asupersync-native HTTP integration tests
- Convert timeline_seed_tests to RuntimeBuilder pattern

Phases 1-3 of asupersync migration (atomic: code won't compile without all pieces).
2026-03-06 15:57:20 -05:00

852 lines
27 KiB
Rust

use asupersync::time::{sleep, wall_now};
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tracing::debug;
use super::types::{
GitLabDiscussion, GitLabIssue, GitLabIssueRef, GitLabLabelEvent, GitLabMergeRequest,
GitLabMilestoneEvent, GitLabMrDiff, GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
};
use crate::core::error::{LoreError, Result};
use crate::http;
/// Client-side rate limiter enforcing a minimum interval between requests.
struct RateLimiter {
    // When the most recent request was issued — or, after a pending delay,
    // when the delayed caller will resume (see check_delay).
    last_request: Instant,
    // Minimum spacing between consecutive requests, derived from requests/sec.
    min_interval: Duration,
}
impl RateLimiter {
    /// Creates a limiter allowing `requests_per_second` requests
    /// (clamped to at least 0.1 rps, i.e. at most a 10s interval).
    fn new(requests_per_second: f64) -> Self {
        let rps = requests_per_second.max(0.1);
        Self {
            // Start "in the past" so a first request at typical intervals is
            // not delayed. checked_sub avoids the documented panic of
            // `Instant - Duration` when the result would precede the clock's
            // origin (possible shortly after boot on some platforms).
            last_request: Instant::now()
                .checked_sub(Duration::from_secs(1))
                .unwrap_or_else(Instant::now),
            min_interval: Duration::from_secs_f64(1.0 / rps),
        }
    }
    /// Returns how long the caller must sleep before issuing the next
    /// request, or `None` if enough time has already elapsed.
    fn check_delay(&mut self) -> Option<Duration> {
        let elapsed = self.last_request.elapsed();
        if elapsed < self.min_interval {
            // Small random jitter (0–49 ms) de-synchronizes concurrent clients.
            let jitter = Duration::from_millis(rand_jitter());
            let delay = self.min_interval - elapsed + jitter;
            // Record the *future* resume point so back-to-back callers queue
            // behind one another instead of all sleeping the same interval.
            self.last_request = Instant::now() + delay;
            Some(delay)
        } else {
            self.last_request = Instant::now();
            None
        }
    }
}
/// Cheap pseudo-random jitter in [0, 50) ms: mixes a process-wide counter
/// with the wall clock's sub-second nanos. Not cryptographic — only used to
/// spread out request timing.
fn rand_jitter() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos() as u64;
    (n ^ nanos) % 50
}
/// Minimal GitLab REST (v4) client with client-side rate limiting and
/// automatic retry on HTTP 429 responses.
pub struct GitLabClient {
    // HTTP adapter (src/http.rs) wrapping the asupersync h1 client.
    client: http::Client,
    // Instance base URL with any trailing '/' stripped in `new`.
    base_url: String,
    // Sent as the PRIVATE-TOKEN header on every request.
    token: String,
    // Shared across concurrent requests; the std Mutex is held only for a
    // short, non-async critical section (see SAFETY notes in request()).
    rate_limiter: Arc<Mutex<RateLimiter>>,
}
impl GitLabClient {
/// Builds a client for `base_url` (trailing slashes are stripped) using
/// `token` for authentication; `requests_per_second` defaults to 10.0.
pub fn new(base_url: &str, token: &str, requests_per_second: Option<f64>) -> Self {
    const DEFAULT_RPS: f64 = 10.0;
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
    let limiter = RateLimiter::new(requests_per_second.unwrap_or(DEFAULT_RPS));
    Self {
        client: http::Client::with_timeout(REQUEST_TIMEOUT),
        base_url: base_url.trim_end_matches('/').to_string(),
        token: token.to_string(),
        rate_limiter: Arc::new(Mutex::new(limiter)),
    }
}
/// Builds a GraphQL client that reuses this client's base URL and token.
pub fn graphql_client(&self) -> crate::gitlab::graphql::GraphqlClient {
    crate::gitlab::graphql::GraphqlClient::new(&self.base_url, &self.token)
}
/// Fetches the user that owns the configured token (`GET /api/v4/user`).
pub async fn get_current_user(&self) -> Result<GitLabUser> {
    self.request("/api/v4/user").await
}
/// Looks up a project by its full "group/name" path; the path is
/// percent-encoded because it is embedded as a single URL segment.
pub async fn get_project(&self, path_with_namespace: &str) -> Result<GitLabProject> {
    let encoded = urlencoding::encode(path_with_namespace);
    self.request(&format!("/api/v4/projects/{encoded}")).await
}
/// Fetches the GitLab server version (`GET /api/v4/version`).
pub async fn get_version(&self) -> Result<GitLabVersion> {
    self.request("/api/v4/version").await
}
/// Fetches a single issue by its project-scoped `iid`.
pub async fn get_issue_by_iid(&self, project_id: i64, iid: i64) -> Result<GitLabIssue> {
    self.request(&format!("/api/v4/projects/{project_id}/issues/{iid}"))
        .await
}
/// Fetches a single merge request by its project-scoped `iid`.
pub async fn get_mr_by_iid(&self, project_id: i64, iid: i64) -> Result<GitLabMergeRequest> {
    self.request(&format!(
        "/api/v4/projects/{project_id}/merge_requests/{iid}"
    ))
    .await
}
// Maximum retries after an HTTP 429, i.e. up to MAX_RETRIES + 1 attempts.
const MAX_RETRIES: u32 = 3;
/// Rate-limited GET of `path` (relative to `base_url`), decoding the JSON
/// body into `T` via `handle_response`.
///
/// Waits out the client-side rate limiter before each attempt and retries
/// on HTTP 429, honoring the `Retry-After` header (default 60s). A 429 on
/// the final attempt falls through to `handle_response` and surfaces as
/// `GitLabRateLimited`.
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
    let url = format!("{}{}", self.base_url, path);
    let mut last_response = None;
    for attempt in 0..=Self::MAX_RETRIES {
        // SAFETY: std::sync::Mutex blocks the executor thread while held. This is safe
        // because the critical section is a single Instant::now() comparison with no I/O.
        // If async work is ever added inside the lock, switch to an async-aware lock.
        let delay = {
            let mut limiter = self
                .rate_limiter
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            limiter.check_delay()
        };
        // Sleep outside the lock so other tasks can consult the limiter.
        if let Some(d) = delay {
            sleep(wall_now(), d).await;
        }
        debug!(url = %url, attempt, "GitLab request");
        let response = self
            .client
            .get(
                &url,
                &[
                    ("PRIVATE-TOKEN", self.token.as_str()),
                    ("Accept", "application/json"),
                ],
            )
            .await?;
        // Only 429 is retried; every other status breaks out of the loop
        // and is mapped by handle_response below.
        if response.status == 429 && attempt < Self::MAX_RETRIES {
            let retry_after = Self::parse_retry_after(&response);
            tracing::info!(
                path = %path,
                attempt,
                retry_after_secs = retry_after,
                status_code = 429u16,
                "Rate limited, retrying"
            );
            sleep(wall_now(), Duration::from_secs(retry_after)).await;
            continue;
        }
        last_response = Some(response);
        break;
    }
    // The loop runs at least once (0..=MAX_RETRIES is never empty), so
    // last_response is always Some here.
    self.handle_response(last_response.expect("retry loop ran at least once"), path)
}
/// Reads the `Retry-After` header as whole seconds, defaulting to 60 when
/// the header is missing or not a plain integer.
fn parse_retry_after(response: &http::Response) -> u64 {
    const DEFAULT_RETRY_AFTER_SECS: u64 = 60;
    match response.header("retry-after") {
        Some(raw) => raw.parse().unwrap_or(DEFAULT_RETRY_AFTER_SECS),
        None => DEFAULT_RETRY_AFTER_SECS,
    }
}
/// Maps an HTTP response to a typed result:
/// 401 -> `GitLabAuthFailed`; 404 -> `GitLabNotFound` tagged with `path`;
/// 429 -> `GitLabRateLimited` with the parsed `Retry-After`; other 2xx ->
/// JSON-decoded body; anything else -> generic error with status + reason.
fn handle_response<T: serde::de::DeserializeOwned>(
    &self,
    response: http::Response,
    path: &str,
) -> Result<T> {
    match response.status {
        401 => Err(LoreError::GitLabAuthFailed),
        404 => Err(LoreError::GitLabNotFound {
            resource: path.to_string(),
        }),
        429 => {
            let retry_after = Self::parse_retry_after(&response);
            Err(LoreError::GitLabRateLimited { retry_after })
        }
        // The success guard comes after the specific codes above so that a
        // (hypothetical) overlap resolves to the specific error.
        _ if response.is_success() => response.json::<T>().map_err(|e| {
            LoreError::Other(format!("Failed to decode response from {path}: {e}"))
        }),
        s => Err(LoreError::Other(format!(
            "GitLab API error: {s} {}",
            response.reason
        ))),
    }
}
/// Streams every issue in the project, page by page (100 per page), ordered
/// by `updated_at` ascending.
///
/// `updated_after` is an epoch-milliseconds cursor; it is rewound by
/// `cursor_rewind_seconds` (clamped at 0) before being sent, so items
/// updated right around the saved cursor are re-fetched rather than missed.
pub fn paginate_issues(
    &self,
    gitlab_project_id: i64,
    updated_after: Option<i64>,
    cursor_rewind_seconds: u32,
) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
    Box::pin(stream! {
        let mut page = 1u32;
        let per_page = 100u32;
        let rewound_cursor = updated_after.map(|ts| {
            let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
            (ts - rewind_ms).max(0)
        });
        loop {
            let mut params = vec![
                ("scope", "all".to_string()),
                ("state", "all".to_string()),
                ("order_by", "updated_at".to_string()),
                ("sort", "asc".to_string()),
                ("per_page", per_page.to_string()),
                ("page", page.to_string()),
            ];
            if let Some(ts_ms) = rewound_cursor
                && let Some(iso) = ms_to_iso8601(ts_ms)
            {
                params.push(("updated_after", iso));
            }
            let path = format!("/api/v4/projects/{}/issues", gitlab_project_id);
            let result = self.request_with_headers::<Vec<GitLabIssue>>(&path, &params).await;
            match result {
                Ok((issues, headers)) => {
                    let is_empty = issues.is_empty();
                    let full_page = issues.len() as u32 == per_page;
                    for issue in issues {
                        yield Ok(issue);
                    }
                    // Prefer the server's x-next-page header; fall back to
                    // "advance while pages come back full" when it is absent.
                    let next_page = header_value(&headers, "x-next-page")
                        .and_then(|s| s.parse::<u32>().ok());
                    match next_page {
                        Some(next) if next > page => {
                            page = next;
                        }
                        _ => {
                            if is_empty || !full_page {
                                break;
                            }
                            page += 1;
                        }
                    }
                }
                Err(e) => {
                    // Surface the error to the consumer and end the stream.
                    yield Err(e);
                    break;
                }
            }
        }
    })
}
/// Streams every discussion on an issue, page by page (100 per page).
///
/// NOTE(review): unlike `paginate_mr_discussions`, this follows only the
/// `x-next-page` header (no `Link` header handling) — confirm intentional.
pub fn paginate_issue_discussions(
    &self,
    gitlab_project_id: i64,
    issue_iid: i64,
) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
    Box::pin(stream! {
        let mut page = 1u32;
        let per_page = 100u32;
        loop {
            let params = vec![
                ("per_page", per_page.to_string()),
                ("page", page.to_string()),
            ];
            let path = format!(
                "/api/v4/projects/{}/issues/{}/discussions",
                gitlab_project_id, issue_iid
            );
            let result = self.request_with_headers::<Vec<GitLabDiscussion>>(&path, &params).await;
            match result {
                Ok((discussions, headers)) => {
                    let is_empty = discussions.is_empty();
                    let full_page = discussions.len() as u32 == per_page;
                    for discussion in discussions {
                        yield Ok(discussion);
                    }
                    // x-next-page wins; otherwise keep going while pages are
                    // full and stop on a short or empty page.
                    let next_page = header_value(&headers, "x-next-page")
                        .and_then(|s| s.parse::<u32>().ok());
                    match next_page {
                        Some(next) if next > page => {
                            page = next;
                        }
                        _ => {
                            if is_empty || !full_page {
                                break;
                            }
                            page += 1;
                        }
                    }
                }
                Err(e) => {
                    yield Err(e);
                    break;
                }
            }
        }
    })
}
/// Streams every merge request in the project, ordered by `updated_at`
/// ascending, delegating each page fetch (and the rewound `updated_after`
/// cursor handling) to `fetch_merge_requests_page`.
pub fn paginate_merge_requests(
    &self,
    gitlab_project_id: i64,
    updated_after: Option<i64>,
    cursor_rewind_seconds: u32,
) -> Pin<Box<dyn Stream<Item = Result<GitLabMergeRequest>> + Send + '_>> {
    Box::pin(stream! {
        let mut page = 1u32;
        let per_page = 100u32;
        loop {
            let page_result = self
                .fetch_merge_requests_page(
                    gitlab_project_id,
                    updated_after,
                    cursor_rewind_seconds,
                    page,
                    per_page,
                )
                .await;
            match page_result {
                Ok(mr_page) => {
                    for mr in mr_page.items {
                        yield Ok(mr);
                    }
                    // The page carries its own continuation decision.
                    if mr_page.is_last_page {
                        break;
                    }
                    match mr_page.next_page {
                        Some(np) => page = np,
                        None => break,
                    }
                }
                Err(e) => {
                    // Surface the error to the consumer and end the stream.
                    yield Err(e);
                    break;
                }
            }
        }
    })
}
/// Fetches one page of merge requests (ordered by `updated_at` ascending)
/// and computes the continuation decision from the response headers.
///
/// Continuation preference: a `Link: rel="next"` header, then `x-next-page`,
/// then "page was full so try page + 1"; a short page with neither header
/// ends pagination.
pub async fn fetch_merge_requests_page(
    &self,
    gitlab_project_id: i64,
    updated_after: Option<i64>,
    cursor_rewind_seconds: u32,
    page: u32,
    per_page: u32,
) -> Result<MergeRequestPage> {
    // Rewind the epoch-millis cursor (clamped at 0) so records updated right
    // around the saved cursor are re-fetched rather than missed.
    let rewound_cursor = updated_after.map(|ts| {
        let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
        (ts - rewind_ms).max(0)
    });
    let mut params = vec![
        ("scope", "all".to_string()),
        ("state", "all".to_string()),
        ("order_by", "updated_at".to_string()),
        ("sort", "asc".to_string()),
        ("per_page", per_page.to_string()),
        ("page", page.to_string()),
    ];
    if let Some(ts_ms) = rewound_cursor
        && let Some(iso) = ms_to_iso8601(ts_ms)
    {
        params.push(("updated_after", iso));
    }
    let path = format!("/api/v4/projects/{}/merge_requests", gitlab_project_id);
    let (items, headers) = self
        .request_with_headers::<Vec<GitLabMergeRequest>>(&path, &params)
        .await?;
    let link_next = parse_link_header_next(&headers);
    let x_next_page = header_value(&headers, "x-next-page").and_then(|s| s.parse::<u32>().ok());
    let full_page = items.len() as u32 == per_page;
    // NOTE(review): when a rel="next" link exists we advance to page + 1
    // rather than parsing the page number out of the link URL — confirm
    // that always matches the link's actual target.
    let (next_page, is_last_page) = match (link_next.is_some(), x_next_page, full_page) {
        (true, _, _) => (Some(page + 1), false),
        (false, Some(np), _) => (Some(np), false),
        (false, None, true) => (Some(page + 1), false),
        (false, None, false) => (None, true),
    };
    Ok(MergeRequestPage {
        items,
        next_page,
        is_last_page,
    })
}
/// Streams every discussion on a merge request, page by page (100 per page).
///
/// Continuation preference: `Link: rel="next"`, then an `x-next-page` header
/// greater than the current page, then "page was full so try page + 1". An
/// empty page always terminates, even when a next-page signal is present.
pub fn paginate_mr_discussions(
    &self,
    gitlab_project_id: i64,
    mr_iid: i64,
) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
    Box::pin(stream! {
        let mut page = 1u32;
        let per_page = 100u32;
        loop {
            let params = vec![
                ("per_page", per_page.to_string()),
                ("page", page.to_string()),
            ];
            let path = format!(
                "/api/v4/projects/{}/merge_requests/{}/discussions",
                gitlab_project_id, mr_iid
            );
            let result = self.request_with_headers::<Vec<GitLabDiscussion>>(&path, &params).await;
            match result {
                Ok((discussions, headers)) => {
                    let is_empty = discussions.is_empty();
                    let full_page = discussions.len() as u32 == per_page;
                    for discussion in discussions {
                        yield Ok(discussion);
                    }
                    let link_next = parse_link_header_next(&headers);
                    let x_next_page = header_value(&headers, "x-next-page")
                        .and_then(|s| s.parse::<u32>().ok());
                    let should_continue = match (link_next.is_some(), x_next_page, full_page) {
                        (true, _, _) => {
                            page += 1;
                            true
                        }
                        (false, Some(np), _) if np > page => {
                            page = np;
                            true
                        }
                        (false, None, true) => {
                            page += 1;
                            true
                        }
                        _ => false,
                    };
                    // An empty page ends the stream regardless of headers.
                    if !should_continue || is_empty {
                        break;
                    }
                }
                Err(e) => {
                    yield Err(e);
                    break;
                }
            }
        }
    })
}
/// Like `request`, but sends query `params` and returns the response headers
/// alongside the decoded body (pagination callers need `x-next-page`/`link`).
///
/// NOTE(review): the retry/rate-limit loop duplicates `request`; consider
/// unifying the two once the HTTP adapter offers a shared request path.
async fn request_with_headers<T: serde::de::DeserializeOwned>(
    &self,
    path: &str,
    params: &[(&str, String)],
) -> Result<(T, Vec<(String, String)>)> {
    let url = format!("{}{}", self.base_url, path);
    let mut last_response = None;
    for attempt in 0..=Self::MAX_RETRIES {
        // SAFETY: std::sync::Mutex blocks the executor thread while held. This is safe
        // because the critical section is a single Instant::now() comparison with no I/O.
        // If async work is ever added inside the lock, switch to an async-aware lock.
        let delay = {
            let mut limiter = self
                .rate_limiter
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            limiter.check_delay()
        };
        // Sleep outside the lock so other tasks can consult the limiter.
        if let Some(d) = delay {
            sleep(wall_now(), d).await;
        }
        debug!(url = %url, ?params, attempt, "GitLab paginated request");
        let response = self
            .client
            .get_with_query(
                &url,
                params,
                &[
                    ("PRIVATE-TOKEN", self.token.as_str()),
                    ("Accept", "application/json"),
                ],
            )
            .await?;
        // Only 429 is retried; anything else (including 429 on the final
        // attempt) is handed to handle_response below.
        if response.status == 429 && attempt < Self::MAX_RETRIES {
            let retry_after = Self::parse_retry_after(&response);
            tracing::info!(
                path = %path,
                attempt,
                retry_after_secs = retry_after,
                status_code = 429u16,
                "Rate limited, retrying"
            );
            sleep(wall_now(), Duration::from_secs(retry_after)).await;
            continue;
        }
        last_response = Some(response);
        break;
    }
    let response = last_response.expect("retry loop ran at least once");
    // Clone the headers before handle_response consumes the response.
    let headers = response.headers.clone();
    let body = self.handle_response(response, path)?;
    Ok((body, headers))
}
}
impl GitLabClient {
/// Collects every discussion on a merge request into a single Vec, failing
/// fast on the first page error.
pub async fn fetch_all_mr_discussions(
    &self,
    gitlab_project_id: i64,
    mr_iid: i64,
) -> Result<Vec<GitLabDiscussion>> {
    use futures::StreamExt;
    let mut pages = self.paginate_mr_discussions(gitlab_project_id, mr_iid);
    let mut collected = Vec::new();
    loop {
        match pages.next().await {
            Some(item) => collected.push(item?),
            None => break,
        }
    }
    Ok(collected)
}
/// Collects every discussion on an issue into a single Vec, failing fast on
/// the first page error.
pub async fn fetch_all_issue_discussions(
    &self,
    gitlab_project_id: i64,
    issue_iid: i64,
) -> Result<Vec<GitLabDiscussion>> {
    use futures::StreamExt;
    let mut pages = self.paginate_issue_discussions(gitlab_project_id, issue_iid);
    let mut collected = Vec::new();
    loop {
        match pages.next().await {
            Some(item) => collected.push(item?),
            None => break,
        }
    }
    Ok(collected)
}
}
impl GitLabClient {
/// Fetches every page of `path` (100 items per page) into a single Vec.
///
/// Follows the `x-next-page` header when it points past the current page;
/// otherwise keeps advancing while pages come back full and stops on a short
/// or empty page.
async fn fetch_all_pages<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<Vec<T>> {
    let mut results = Vec::new();
    let mut page = 1u32;
    let per_page = 100u32;
    loop {
        let params = vec![
            ("per_page", per_page.to_string()),
            ("page", page.to_string()),
        ];
        let (items, headers) = self.request_with_headers::<Vec<T>>(path, &params).await?;
        let is_empty = items.is_empty();
        let full_page = items.len() as u32 == per_page;
        results.extend(items);
        let next_page =
            header_value(&headers, "x-next-page").and_then(|s| s.parse::<u32>().ok());
        match next_page {
            Some(next) if next > page => page = next,
            _ => {
                if is_empty || !full_page {
                    break;
                }
                page += 1;
            }
        }
    }
    Ok(results)
}
/// Lists the issues a merge request closes (`closes_issues`), all pages.
pub async fn fetch_mr_closes_issues(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabIssueRef>> {
    let path =
        format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/closes_issues");
    self.fetch_all_pages(&path).await
}
/// Fetches a merge request's diffs; a 404 is coalesced to an empty Vec.
pub async fn fetch_mr_diffs(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabMrDiff>> {
    let path = format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/diffs");
    coalesce_not_found(self.fetch_all_pages(&path).await)
}
/// Fetches an issue's state change events, all pages.
pub async fn fetch_issue_state_events(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabStateEvent>> {
    let path =
        format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_state_events");
    self.fetch_all_pages(&path).await
}
/// Fetches an issue's label add/remove events, all pages.
pub async fn fetch_issue_label_events(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabLabelEvent>> {
    let path =
        format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_label_events");
    self.fetch_all_pages(&path).await
}
/// Fetches an issue's milestone events, all pages.
pub async fn fetch_issue_milestone_events(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabMilestoneEvent>> {
    let path =
        format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_milestone_events");
    self.fetch_all_pages(&path).await
}
/// Fetches a merge request's state change events, all pages.
pub async fn fetch_mr_state_events(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabStateEvent>> {
    let path = format!(
        "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_state_events"
    );
    self.fetch_all_pages(&path).await
}
/// Fetches a merge request's label add/remove events, all pages.
pub async fn fetch_mr_label_events(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabLabelEvent>> {
    let path = format!(
        "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_label_events"
    );
    self.fetch_all_pages(&path).await
}
/// Fetches a merge request's milestone events, all pages.
pub async fn fetch_mr_milestone_events(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<Vec<GitLabMilestoneEvent>> {
    let path = format!(
        "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_milestone_events"
    );
    self.fetch_all_pages(&path).await
}
/// Fetches state, label, and milestone resource events for one entity,
/// issuing the three requests concurrently via `futures::join!`.
///
/// `entity_type` must be `"issue"` or `"merge_request"`; anything else is an
/// error. A 404 from any endpoint is coalesced to an empty event list.
pub async fn fetch_all_resource_events(
    &self,
    gitlab_project_id: i64,
    entity_type: &str,
    iid: i64,
) -> Result<(
    Vec<GitLabStateEvent>,
    Vec<GitLabLabelEvent>,
    Vec<GitLabMilestoneEvent>,
)> {
    let (state_res, label_res, milestone_res) = match entity_type {
        "issue" => {
            futures::join!(
                self.fetch_issue_state_events(gitlab_project_id, iid),
                self.fetch_issue_label_events(gitlab_project_id, iid),
                self.fetch_issue_milestone_events(gitlab_project_id, iid),
            )
        }
        "merge_request" => {
            futures::join!(
                self.fetch_mr_state_events(gitlab_project_id, iid),
                self.fetch_mr_label_events(gitlab_project_id, iid),
                self.fetch_mr_milestone_events(gitlab_project_id, iid),
            )
        }
        _ => {
            return Err(LoreError::Other(format!(
                "Invalid entity type for resource events: {entity_type}"
            )));
        }
    };
    // 404 handling is applied after the join so one missing endpoint does
    // not discard the other two results.
    let state = coalesce_not_found(state_res)?;
    let label = coalesce_not_found(label_res)?;
    let milestone = coalesce_not_found(milestone_res)?;
    Ok((state, label, milestone))
}
}
/// One page of merge requests plus the continuation decision computed from
/// the response headers by `fetch_merge_requests_page`.
#[derive(Debug)]
pub struct MergeRequestPage {
    // The decoded merge requests on this page.
    pub items: Vec<GitLabMergeRequest>,
    // Page number to request next, when known.
    pub next_page: Option<u32>,
    // True when pagination should stop after this page.
    pub is_last_page: bool,
}
/// Returns the value of the first header whose name matches `name`
/// (ASCII case-insensitive), or `None` when absent.
fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
fn parse_link_header_next(headers: &[(String, String)]) -> Option<String> {
header_value(headers, "link").and_then(|link_str| {
for part in link_str.split(',') {
let part = part.trim();
if (part.contains("rel=\"next\"") || part.contains("rel=next"))
&& let Some(start) = part.find('<')
&& let Some(end) = part.find('>')
{
return Some(part[start + 1..end].to_string());
}
}
None
})
}
/// Treats a GitLab 404 as "no items" (empty Vec) instead of an error;
/// every other outcome passes through unchanged.
fn coalesce_not_found<T>(result: Result<Vec<T>>) -> Result<Vec<T>> {
    match result {
        Err(LoreError::GitLabNotFound { .. }) => Ok(Vec::new()),
        other => other,
    }
}
/// Converts epoch milliseconds to an ISO-8601 UTC timestamp with millisecond
/// precision (e.g. "2024-01-15T10:00:00.000Z"); `None` when `ms` is outside
/// chrono's representable range.
fn ms_to_iso8601(ms: i64) -> Option<String> {
    let datetime = DateTime::<Utc>::from_timestamp_millis(ms)?;
    Some(datetime.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string())
}
#[cfg(test)]
#[path = "client_tests.rs"]
mod client_tests;
#[cfg(test)]
mod tests {
    use super::*;
    // Timestamp formatting used for the `updated_after` query parameter.
    #[test]
    fn ms_to_iso8601_converts_correctly() {
        let result = ms_to_iso8601(1705312800000);
        assert_eq!(result, Some("2024-01-15T10:00:00.000Z".to_string()));
    }
    #[test]
    fn ms_to_iso8601_handles_zero() {
        let result = ms_to_iso8601(0);
        assert_eq!(result, Some("1970-01-01T00:00:00.000Z".to_string()));
    }
    // Cursor-rewind arithmetic mirroring the inline logic in the paginators.
    #[test]
    fn cursor_rewind_clamps_to_zero() {
        let updated_after = 1000i64;
        let cursor_rewind_seconds = 10u32;
        let rewind_ms = i64::from(cursor_rewind_seconds) * 1000;
        let rewound = (updated_after - rewind_ms).max(0);
        assert_eq!(rewound, 0);
    }
    #[test]
    fn cursor_rewind_applies_correctly() {
        let updated_after = 1705312800000i64;
        let cursor_rewind_seconds = 60u32;
        let rewind_ms = i64::from(cursor_rewind_seconds) * 1000;
        let rewound = (updated_after - rewind_ms).max(0);
        assert_eq!(rewound, 1705312740000);
    }
    // Link-header parsing used by the merge-request paginators.
    #[test]
    fn parse_link_header_extracts_next_url() {
        let headers = vec![(
            "link".to_string(),
            r#"<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2>; rel="next", <https://gitlab.example.com/api/v4/projects/1/merge_requests?page=5>; rel="last""#.to_string(),
        )];
        let result = parse_link_header_next(&headers);
        assert_eq!(
            result,
            Some("https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2".to_string())
        );
    }
    #[test]
    fn parse_link_header_handles_unquoted_rel() {
        let headers = vec![(
            "link".to_string(),
            r#"<https://example.com/next>; rel=next"#.to_string(),
        )];
        let result = parse_link_header_next(&headers);
        assert_eq!(result, Some("https://example.com/next".to_string()));
    }
    #[test]
    fn parse_link_header_returns_none_when_no_next() {
        let headers = vec![(
            "link".to_string(),
            r#"<https://example.com/last>; rel="last""#.to_string(),
        )];
        let result = parse_link_header_next(&headers);
        assert!(result.is_none());
    }
    #[test]
    fn parse_link_header_returns_none_when_missing() {
        let headers: Vec<(String, String)> = vec![];
        let result = parse_link_header_next(&headers);
        assert!(result.is_none());
    }
}