diff --git a/src/gitlab/client.rs b/src/gitlab/client.rs
index 80995f4..a56b37c 100644
--- a/src/gitlab/client.rs
+++ b/src/gitlab/client.rs
@@ -12,7 +12,9 @@ use tokio::sync::Mutex;
 use tokio::time::sleep;
 use tracing::debug;
 
-use super::types::{GitLabDiscussion, GitLabIssue, GitLabProject, GitLabUser, GitLabVersion};
+use super::types::{
+    GitLabDiscussion, GitLabIssue, GitLabMergeRequest, GitLabProject, GitLabUser, GitLabVersion,
+};
 use crate::core::error::{GiError, Result};
 
 /// Simple rate limiter with jitter to prevent thundering herd.
@@ -53,10 +55,12 @@ fn rand_jitter() -> u64 {
     let mut hasher = state.build_hasher();
     // Hash the address of the state (random per call) + current time nanos for more entropy
     hasher.write_usize(&state as *const _ as usize);
-    hasher.write_u128(std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .unwrap_or_default()
-        .as_nanos());
+    hasher.write_u128(
+        std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_nanos(),
+    );
     hasher.finish() % 50
 }
 
@@ -305,6 +309,182 @@ impl GitLabClient {
         })
     }
 
+    /// Paginate through merge requests for a project.
+    ///
+    /// Returns an async stream of merge requests, handling pagination automatically.
+    /// MRs are ordered by updated_at ascending to support cursor-based sync.
+    ///
+    /// # Arguments
+    /// * `gitlab_project_id` - The GitLab project ID
+    /// * `updated_after` - Optional cursor (ms epoch) - only fetch MRs updated after this
+    /// * `cursor_rewind_seconds` - Rewind cursor by this many seconds to handle edge cases
+    pub fn paginate_merge_requests(
+        &self,
+        gitlab_project_id: i64,
+        updated_after: Option<i64>,
+        cursor_rewind_seconds: u32,
+    ) -> Pin<Box<dyn Stream<Item = Result<GitLabMergeRequest>> + Send + '_>> {
+        Box::pin(stream! {
+            let mut page = 1u32;
+            let per_page = 100u32;
+
+            loop {
+                let page_result = self
+                    .fetch_merge_requests_page(
+                        gitlab_project_id,
+                        updated_after,
+                        cursor_rewind_seconds,
+                        page,
+                        per_page,
+                    )
+                    .await;
+
+                match page_result {
+                    Ok(mr_page) => {
+                        for mr in mr_page.items {
+                            yield Ok(mr);
+                        }
+
+                        if mr_page.is_last_page {
+                            break;
+                        }
+
+                        match mr_page.next_page {
+                            Some(np) => page = np,
+                            None => break,
+                        }
+                    }
+                    Err(e) => {
+                        yield Err(e);
+                        break;
+                    }
+                }
+            }
+        })
+    }
+
+    /// Fetch a single page of merge requests with pagination metadata.
+    pub async fn fetch_merge_requests_page(
+        &self,
+        gitlab_project_id: i64,
+        updated_after: Option<i64>,
+        cursor_rewind_seconds: u32,
+        page: u32,
+        per_page: u32,
+    ) -> Result<MergeRequestPage> {
+        // Apply cursor rewind, clamping to 0
+        let rewound_cursor = updated_after.map(|ts| {
+            let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
+            (ts - rewind_ms).max(0)
+        });
+
+        let mut params = vec![
+            ("scope", "all".to_string()),
+            ("state", "all".to_string()),
+            ("order_by", "updated_at".to_string()),
+            ("sort", "asc".to_string()),
+            ("per_page", per_page.to_string()),
+            ("page", page.to_string()),
+        ];
+
+        // Add updated_after if we have a cursor
+        if let Some(ts_ms) = rewound_cursor
+            && let Some(iso) = ms_to_iso8601(ts_ms)
+        {
+            params.push(("updated_after", iso));
+        }
+
+        let path = format!("/api/v4/projects/{}/merge_requests", gitlab_project_id);
+        let (items, headers) = self
+            .request_with_headers::<Vec<GitLabMergeRequest>>(&path, &params)
+            .await?;
+
+        // Pagination fallback chain: Link header > x-next-page > full-page heuristic
+        let link_next = parse_link_header_next(&headers);
+        let x_next_page = headers
+            .get("x-next-page")
+            .and_then(|v| v.to_str().ok())
+            .and_then(|s| s.parse::<u32>().ok());
+        let full_page = items.len() as u32 == per_page;
+
+        let (next_page, is_last_page) = match (link_next.is_some(), x_next_page, full_page) {
+            (true, _, _) => (Some(page + 1), false), // Link header present: continue
+            (false, Some(np), _) => (Some(np), false), // x-next-page present: use it
+            (false, None, true) => (Some(page + 1), false), // Full page, no headers: try next
+            (false, None, false) => (None, true), // Partial page: we're done
+        };
+
+        Ok(MergeRequestPage {
+            items,
+            next_page,
+            is_last_page,
+        })
+    }
+
+    /// Paginate through discussions for a merge request.
+    ///
+    /// Returns an async stream of discussions, handling pagination automatically.
+    pub fn paginate_mr_discussions(
+        &self,
+        gitlab_project_id: i64,
+        mr_iid: i64,
+    ) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
+        Box::pin(stream! {
+            let mut page = 1u32;
+            let per_page = 100u32;
+
+            loop {
+                let params = vec![
+                    ("per_page", per_page.to_string()),
+                    ("page", page.to_string()),
+                ];
+
+                let path = format!(
+                    "/api/v4/projects/{}/merge_requests/{}/discussions",
+                    gitlab_project_id, mr_iid
+                );
+                let result = self.request_with_headers::<Vec<GitLabDiscussion>>(&path, &params).await;
+
+                match result {
+                    Ok((discussions, headers)) => {
+                        let is_empty = discussions.is_empty();
+                        let full_page = discussions.len() as u32 == per_page;
+
+                        for discussion in discussions {
+                            yield Ok(discussion);
+                        }
+
+                        // Pagination fallback chain: Link header > x-next-page > full-page heuristic
+                        let link_next = parse_link_header_next(&headers);
+                        let x_next_page = headers
+                            .get("x-next-page")
+                            .and_then(|v| v.to_str().ok())
+                            .and_then(|s| s.parse::<u32>().ok());
+
+                        let should_continue = match (link_next.is_some(), x_next_page, full_page) {
+                            (true, _, _) => true, // Link header present: continue
+                            (false, Some(np), _) if np > page => {
+                                page = np - 1; // `page += 1` below lands on np; `page = np` skipped a page
+                                true
+                            }
+                            (false, None, true) => true, // Full page, no headers: try next
+                            _ => false, // Otherwise we're done
+                        };
+
+                        if !should_continue || is_empty {
+                            break;
+                        }
+                        page += 1;
+                    }
+                    Err(e) => {
+                        yield Err(e);
+                        break;
+                    }
+                }
+            }
+        })
+    }
+
     /// Make an authenticated API request with query parameters, returning headers.
     async fn request_with_headers<T>(
         &self,
@@ -335,6 +515,55 @@ impl GitLabClient {
     }
 }
 
+/// Fetch all discussions for an MR (collects paginated results).
+/// This is useful for parallel prefetching where we want all data upfront.
+impl GitLabClient {
+    pub async fn fetch_all_mr_discussions(
+        &self,
+        gitlab_project_id: i64,
+        mr_iid: i64,
+    ) -> Result<Vec<GitLabDiscussion>> {
+        use futures::StreamExt;
+
+        let mut discussions = Vec::new();
+        let mut stream = self.paginate_mr_discussions(gitlab_project_id, mr_iid);
+
+        while let Some(result) = stream.next().await {
+            discussions.push(result?);
+        }
+
+        Ok(discussions)
+    }
+}
+
+/// Page result for merge request pagination.
+#[derive(Debug)]
+pub struct MergeRequestPage {
+    pub items: Vec<GitLabMergeRequest>,
+    pub next_page: Option<u32>,
+    pub is_last_page: bool,
+}
+
+/// Parse Link header to extract rel="next" URL (RFC 8288).
+fn parse_link_header_next(headers: &HeaderMap) -> Option<String> {
+    headers
+        .get("link")
+        .and_then(|v| v.to_str().ok())
+        .and_then(|link_str| {
+            // Format: <url>; rel="next", <url>; rel="last"
+            for part in link_str.split(',') {
+                let part = part.trim();
+                if (part.contains("rel=\"next\"") || part.contains("rel=next"))
+                    && let Some(start) = part.find('<')
+                    && let Some(end) = part.find('>')
+                {
+                    return Some(part[start + 1..end].to_string());
+                }
+            }
+            None
+        })
+}
+
 /// Convert milliseconds since epoch to ISO 8601 string.
 fn ms_to_iso8601(ms: i64) -> Option<String> {
     DateTime::<Utc>::from_timestamp_millis(ms)
@@ -381,4 +610,52 @@ mod tests {
         // Should be 1 minute earlier
         assert_eq!(rewound, 1705312740000);
     }
+
+    #[test]
+    fn parse_link_header_extracts_next_url() {
+        let mut headers = HeaderMap::new();
+        headers.insert(
+            "link",
+            HeaderValue::from_static(
+                r#"<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2>; rel="next", <https://gitlab.example.com/api/v4/projects/1/merge_requests?page=5>; rel="last""#,
+            ),
+        );
+
+        let result = parse_link_header_next(&headers);
+        assert_eq!(
+            result,
+            Some("https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2".to_string())
+        );
+    }
+
+    #[test]
+    fn parse_link_header_handles_unquoted_rel() {
+        let mut headers = HeaderMap::new();
+        headers.insert(
+            "link",
+            HeaderValue::from_static(r#"<https://example.com/next>; rel=next"#),
+        );
+
+        let result = parse_link_header_next(&headers);
+        assert_eq!(result, Some("https://example.com/next".to_string()));
+    }
+
+    #[test]
+    fn parse_link_header_returns_none_when_no_next() {
+        let mut headers = HeaderMap::new();
+        headers.insert(
+            "link",
+            HeaderValue::from_static(r#"<https://example.com/last>; rel="last""#),
+        );
+
+        let result = parse_link_header_next(&headers);
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn parse_link_header_returns_none_when_missing() {
+        let headers = HeaderMap::new();
+        let result = parse_link_header_next(&headers);
+        assert!(result.is_none());
+    }
 }