use async_stream::stream; use chrono::{DateTime, Utc}; use futures::Stream; use reqwest::header::{ACCEPT, HeaderMap, HeaderValue}; use reqwest::{Client, Response, StatusCode}; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Mutex; use tokio::time::sleep; use tracing::{debug, warn}; use super::types::{ GitLabDiscussion, GitLabIssue, GitLabIssueRef, GitLabLabelEvent, GitLabMergeRequest, GitLabMilestoneEvent, GitLabMrDiff, GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion, }; use crate::core::error::{LoreError, Result}; struct RateLimiter { last_request: Instant, min_interval: Duration, } impl RateLimiter { fn new(requests_per_second: f64) -> Self { let rps = requests_per_second.max(0.1); Self { last_request: Instant::now() - Duration::from_secs(1), min_interval: Duration::from_secs_f64(1.0 / rps), } } fn check_delay(&mut self) -> Option { let elapsed = self.last_request.elapsed(); if elapsed < self.min_interval { let jitter = Duration::from_millis(rand_jitter()); let delay = self.min_interval - elapsed + jitter; self.last_request = Instant::now() + delay; Some(delay) } else { self.last_request = Instant::now(); None } } } fn rand_jitter() -> u64 { use std::sync::atomic::{AtomicU64, Ordering}; static COUNTER: AtomicU64 = AtomicU64::new(0); let n = COUNTER.fetch_add(1, Ordering::Relaxed); let nanos = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .subsec_nanos() as u64; (n ^ nanos) % 50 } #[derive(Clone)] pub struct GitLabClient { client: Client, base_url: String, token: String, rate_limiter: Arc>, } impl GitLabClient { pub fn new(base_url: &str, token: &str, requests_per_second: Option) -> Self { let mut headers = HeaderMap::new(); headers.insert(ACCEPT, HeaderValue::from_static("application/json")); let client = Client::builder() .default_headers(headers.clone()) .timeout(Duration::from_secs(30)) .build() .unwrap_or_else(|e| { warn!( error = %e, "Failed to build configured HTTP client; falling back to default client with timeout" ); Client::builder() .default_headers(headers) .timeout(Duration::from_secs(30)) .build() .unwrap_or_else(|_| Client::new()) }); Self { client, base_url: base_url.trim_end_matches('/').to_string(), token: token.to_string(), rate_limiter: Arc::new(Mutex::new(RateLimiter::new( requests_per_second.unwrap_or(10.0), ))), } } pub fn graphql_client(&self) -> crate::gitlab::graphql::GraphqlClient { crate::gitlab::graphql::GraphqlClient::new(&self.base_url, &self.token) } pub async fn get_current_user(&self) -> Result { self.request("/api/v4/user").await } pub async fn get_project(&self, path_with_namespace: &str) -> Result { let encoded = urlencoding::encode(path_with_namespace); self.request(&format!("/api/v4/projects/{encoded}")).await } pub async fn get_version(&self) -> Result { self.request("/api/v4/version").await } pub async fn get_issue_by_iid(&self, project_id: i64, iid: i64) -> Result { self.request(&format!("/api/v4/projects/{project_id}/issues/{iid}")) .await } pub async fn get_mr_by_iid(&self, project_id: i64, iid: i64) -> Result { self.request(&format!( "/api/v4/projects/{project_id}/merge_requests/{iid}" )) .await } const MAX_RETRIES: u32 = 3; async fn request(&self, path: &str) -> Result { let url = format!("{}{}", self.base_url, path); let mut last_response = None; for attempt in 0..=Self::MAX_RETRIES { let delay = self.rate_limiter.lock().await.check_delay(); if let Some(d) = delay { sleep(d).await; } debug!(url = %url, attempt, "GitLab request"); let response = self .client .get(&url) .header("PRIVATE-TOKEN", &self.token) .send() .await .map_err(|e| LoreError::GitLabNetworkError { base_url: self.base_url.clone(), source: Some(e), })?; if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { let retry_after = Self::parse_retry_after(&response); tracing::info!( path = %path, attempt, retry_after_secs = retry_after, status_code = 429u16, "Rate limited, retrying" ); sleep(Duration::from_secs(retry_after)).await; continue; } last_response = Some(response); break; } self.handle_response(last_response.expect("retry loop ran at least once"), path) .await } fn parse_retry_after(response: &Response) -> u64 { response .headers() .get("retry-after") .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse().ok()) .unwrap_or(60) } async fn handle_response( &self, response: Response, path: &str, ) -> Result { match response.status() { StatusCode::UNAUTHORIZED => Err(LoreError::GitLabAuthFailed), StatusCode::NOT_FOUND => Err(LoreError::GitLabNotFound { resource: path.to_string(), }), StatusCode::TOO_MANY_REQUESTS => { let retry_after = Self::parse_retry_after(&response); Err(LoreError::GitLabRateLimited { retry_after }) } status if status.is_success() => { let text = response.text().await?; serde_json::from_str(&text).map_err(|e| { let preview = if text.len() > 500 { &text[..text.floor_char_boundary(500)] } else { &text }; LoreError::Other(format!( "Failed to decode response from {path}: {e}\nResponse preview: {preview}" )) }) } status => Err(LoreError::Other(format!( "GitLab API error: {} {}", status.as_u16(), status.canonical_reason().unwrap_or("Unknown") ))), } } pub fn paginate_issues( &self, gitlab_project_id: i64, updated_after: Option, cursor_rewind_seconds: u32, ) -> Pin> + Send + '_>> { Box::pin(stream! { let mut page = 1u32; let per_page = 100u32; let rewound_cursor = updated_after.map(|ts| { let rewind_ms = (cursor_rewind_seconds as i64) * 1000; (ts - rewind_ms).max(0) }); loop { let mut params = vec![ ("scope", "all".to_string()), ("state", "all".to_string()), ("order_by", "updated_at".to_string()), ("sort", "asc".to_string()), ("per_page", per_page.to_string()), ("page", page.to_string()), ]; if let Some(ts_ms) = rewound_cursor && let Some(iso) = ms_to_iso8601(ts_ms) { params.push(("updated_after", iso)); } let path = format!("/api/v4/projects/{}/issues", gitlab_project_id); let result = self.request_with_headers::>(&path, ¶ms).await; match result { Ok((issues, headers)) => { let is_empty = issues.is_empty(); let full_page = issues.len() as u32 == per_page; for issue in issues { yield Ok(issue); } let next_page = headers .get("x-next-page") .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()); match next_page { Some(next) if next > page => { page = next; } _ => { if is_empty || !full_page { break; } page += 1; } } } Err(e) => { yield Err(e); break; } } } }) } pub fn paginate_issue_discussions( &self, gitlab_project_id: i64, issue_iid: i64, ) -> Pin> + Send + '_>> { Box::pin(stream! { let mut page = 1u32; let per_page = 100u32; loop { let params = vec![ ("per_page", per_page.to_string()), ("page", page.to_string()), ]; let path = format!( "/api/v4/projects/{}/issues/{}/discussions", gitlab_project_id, issue_iid ); let result = self.request_with_headers::>(&path, ¶ms).await; match result { Ok((discussions, headers)) => { let is_empty = discussions.is_empty(); let full_page = discussions.len() as u32 == per_page; for discussion in discussions { yield Ok(discussion); } let next_page = headers .get("x-next-page") .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()); match next_page { Some(next) if next > page => { page = next; } _ => { if is_empty || !full_page { break; } page += 1; } } } Err(e) => { yield Err(e); break; } } } }) } pub fn paginate_merge_requests( &self, gitlab_project_id: i64, updated_after: Option, cursor_rewind_seconds: u32, ) -> Pin> + Send + '_>> { Box::pin(stream! { let mut page = 1u32; let per_page = 100u32; loop { let page_result = self .fetch_merge_requests_page( gitlab_project_id, updated_after, cursor_rewind_seconds, page, per_page, ) .await; match page_result { Ok(mr_page) => { for mr in mr_page.items { yield Ok(mr); } if mr_page.is_last_page { break; } match mr_page.next_page { Some(np) => page = np, None => break, } } Err(e) => { yield Err(e); break; } } } }) } pub async fn fetch_merge_requests_page( &self, gitlab_project_id: i64, updated_after: Option, cursor_rewind_seconds: u32, page: u32, per_page: u32, ) -> Result { let rewound_cursor = updated_after.map(|ts| { let rewind_ms = (cursor_rewind_seconds as i64) * 1000; (ts - rewind_ms).max(0) }); let mut params = vec![ ("scope", "all".to_string()), ("state", "all".to_string()), ("order_by", "updated_at".to_string()), ("sort", "asc".to_string()), ("per_page", per_page.to_string()), ("page", page.to_string()), ]; if let Some(ts_ms) = rewound_cursor && let Some(iso) = ms_to_iso8601(ts_ms) { params.push(("updated_after", iso)); } let path = format!("/api/v4/projects/{}/merge_requests", gitlab_project_id); let (items, headers) = self .request_with_headers::>(&path, ¶ms) .await?; let link_next = parse_link_header_next(&headers); let x_next_page = headers .get("x-next-page") .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()); let full_page = items.len() as u32 == per_page; let (next_page, is_last_page) = match (link_next.is_some(), x_next_page, full_page) { (true, _, _) => (Some(page + 1), false), (false, Some(np), _) => (Some(np), false), (false, None, true) => (Some(page + 1), false), (false, None, false) => (None, true), }; Ok(MergeRequestPage { items, next_page, is_last_page, }) } pub fn paginate_mr_discussions( &self, gitlab_project_id: i64, mr_iid: i64, ) -> Pin> + Send + '_>> { Box::pin(stream! { let mut page = 1u32; let per_page = 100u32; loop { let params = vec![ ("per_page", per_page.to_string()), ("page", page.to_string()), ]; let path = format!( "/api/v4/projects/{}/merge_requests/{}/discussions", gitlab_project_id, mr_iid ); let result = self.request_with_headers::>(&path, ¶ms).await; match result { Ok((discussions, headers)) => { let is_empty = discussions.is_empty(); let full_page = discussions.len() as u32 == per_page; for discussion in discussions { yield Ok(discussion); } let link_next = parse_link_header_next(&headers); let x_next_page = headers .get("x-next-page") .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()); let should_continue = match (link_next.is_some(), x_next_page, full_page) { (true, _, _) => { page += 1; true } (false, Some(np), _) if np > page => { page = np; true } (false, None, true) => { page += 1; true } _ => false, }; if !should_continue || is_empty { break; } } Err(e) => { yield Err(e); break; } } } }) } async fn request_with_headers( &self, path: &str, params: &[(&str, String)], ) -> Result<(T, HeaderMap)> { let url = format!("{}{}", self.base_url, path); let mut last_response = None; for attempt in 0..=Self::MAX_RETRIES { let delay = self.rate_limiter.lock().await.check_delay(); if let Some(d) = delay { sleep(d).await; } debug!(url = %url, ?params, attempt, "GitLab paginated request"); let response = self .client .get(&url) .query(params) .header("PRIVATE-TOKEN", &self.token) .send() .await .map_err(|e| LoreError::GitLabNetworkError { base_url: self.base_url.clone(), source: Some(e), })?; if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { let retry_after = Self::parse_retry_after(&response); tracing::info!( path = %path, attempt, retry_after_secs = retry_after, status_code = 429u16, "Rate limited, retrying" ); sleep(Duration::from_secs(retry_after)).await; continue; } last_response = Some(response); break; } let response = last_response.expect("retry loop ran at least once"); let headers = response.headers().clone(); let body = self.handle_response(response, path).await?; Ok((body, headers)) } } impl GitLabClient { pub async fn fetch_all_mr_discussions( &self, gitlab_project_id: i64, mr_iid: i64, ) -> Result> { use futures::StreamExt; let mut discussions = Vec::new(); let mut stream = self.paginate_mr_discussions(gitlab_project_id, mr_iid); while let Some(result) = stream.next().await { discussions.push(result?); } Ok(discussions) } } impl GitLabClient { async fn fetch_all_pages(&self, path: &str) -> Result> { let mut results = Vec::new(); let mut page = 1u32; let per_page = 100u32; loop { let params = vec![ ("per_page", per_page.to_string()), ("page", page.to_string()), ]; let (items, headers) = self.request_with_headers::>(path, ¶ms).await?; let is_empty = items.is_empty(); let full_page = items.len() as u32 == per_page; results.extend(items); let next_page = headers .get("x-next-page") .and_then(|v| v.to_str().ok()) .and_then(|s| s.parse::().ok()); match next_page { Some(next) if next > page => page = next, _ => { if is_empty || !full_page { break; } page += 1; } } } Ok(results) } pub async fn fetch_mr_closes_issues( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/closes_issues"); self.fetch_all_pages(&path).await } pub async fn fetch_mr_diffs( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/diffs"); coalesce_not_found(self.fetch_all_pages(&path).await) } pub async fn fetch_issue_state_events( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_state_events"); self.fetch_all_pages(&path).await } pub async fn fetch_issue_label_events( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_label_events"); self.fetch_all_pages(&path).await } pub async fn fetch_issue_milestone_events( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_milestone_events"); self.fetch_all_pages(&path).await } pub async fn fetch_mr_state_events( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!( "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_state_events" ); self.fetch_all_pages(&path).await } pub async fn fetch_mr_label_events( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!( "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_label_events" ); self.fetch_all_pages(&path).await } pub async fn fetch_mr_milestone_events( &self, gitlab_project_id: i64, iid: i64, ) -> Result> { let path = format!( "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_milestone_events" ); self.fetch_all_pages(&path).await } pub async fn fetch_all_resource_events( &self, gitlab_project_id: i64, entity_type: &str, iid: i64, ) -> Result<( Vec, Vec, Vec, )> { let (state_res, label_res, milestone_res) = match entity_type { "issue" => { tokio::join!( self.fetch_issue_state_events(gitlab_project_id, iid), self.fetch_issue_label_events(gitlab_project_id, iid), self.fetch_issue_milestone_events(gitlab_project_id, iid), ) } "merge_request" => { tokio::join!( self.fetch_mr_state_events(gitlab_project_id, iid), self.fetch_mr_label_events(gitlab_project_id, iid), self.fetch_mr_milestone_events(gitlab_project_id, iid), ) } _ => { return Err(LoreError::Other(format!( "Invalid entity type for resource events: {entity_type}" ))); } }; let state = coalesce_not_found(state_res)?; let label = coalesce_not_found(label_res)?; let milestone = coalesce_not_found(milestone_res)?; Ok((state, label, milestone)) } } #[derive(Debug)] pub struct MergeRequestPage { pub items: Vec, pub next_page: Option, pub is_last_page: bool, } fn parse_link_header_next(headers: &HeaderMap) -> Option { headers .get("link") .and_then(|v| v.to_str().ok()) .and_then(|link_str| { for part in link_str.split(',') { let part = part.trim(); if (part.contains("rel=\"next\"") || part.contains("rel=next")) && let Some(start) = part.find('<') && let Some(end) = part.find('>') { return Some(part[start + 1..end].to_string()); } } None }) } fn coalesce_not_found(result: Result>) -> Result> { match result { Ok(v) => Ok(v), Err(LoreError::GitLabNotFound { .. }) => Ok(Vec::new()), Err(e) => Err(e), } } fn ms_to_iso8601(ms: i64) -> Option { DateTime::::from_timestamp_millis(ms) .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()) } #[cfg(test)] #[path = "client_tests.rs"] mod client_tests; #[cfg(test)] mod tests { use super::*; #[test] fn ms_to_iso8601_converts_correctly() { let result = ms_to_iso8601(1705312800000); assert_eq!(result, Some("2024-01-15T10:00:00.000Z".to_string())); } #[test] fn ms_to_iso8601_handles_zero() { let result = ms_to_iso8601(0); assert_eq!(result, Some("1970-01-01T00:00:00.000Z".to_string())); } #[test] fn cursor_rewind_clamps_to_zero() { let updated_after = 1000i64; let cursor_rewind_seconds = 10u32; let rewind_ms = i64::from(cursor_rewind_seconds) * 1000; let rewound = (updated_after - rewind_ms).max(0); assert_eq!(rewound, 0); } #[test] fn cursor_rewind_applies_correctly() { let updated_after = 1705312800000i64; let cursor_rewind_seconds = 60u32; let rewind_ms = i64::from(cursor_rewind_seconds) * 1000; let rewound = (updated_after - rewind_ms).max(0); assert_eq!(rewound, 1705312740000); } #[test] fn parse_link_header_extracts_next_url() { let mut headers = HeaderMap::new(); headers.insert( "link", HeaderValue::from_static( r#"; rel="next", ; rel="last""#, ), ); let result = parse_link_header_next(&headers); assert_eq!( result, Some("https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2".to_string()) ); } #[test] fn parse_link_header_handles_unquoted_rel() { let mut headers = HeaderMap::new(); headers.insert( "link", HeaderValue::from_static(r#"; rel=next"#), ); let result = parse_link_header_next(&headers); assert_eq!(result, Some("https://example.com/next".to_string())); } #[test] fn parse_link_header_returns_none_when_no_next() { let mut headers = HeaderMap::new(); headers.insert( "link", HeaderValue::from_static(r#"; rel="last""#), ); let result = parse_link_header_next(&headers); assert!(result.is_none()); } #[test] fn parse_link_header_returns_none_when_missing() { let headers = HeaderMap::new(); let result = parse_link_header_next(&headers); assert!(result.is_none()); } }