feat(gitlab): Implement GitLab REST API client and type definitions
Provides a typed interface to the GitLab API with pagination support. src/gitlab/types.rs - API response type definitions: - GitLabIssue: Full issue payload with author, assignees, labels - GitLabDiscussion: Discussion thread with notes array - GitLabNote: Individual note with author, timestamps, body - GitLabAuthor/GitLabUser: User information with avatar URLs - GitLabProject: Project metadata from /api/v4/projects - GitLabVersion: GitLab instance version from /api/v4/version - GitLabNotePosition: Line-level position for diff notes - All types derive Deserialize for JSON parsing src/gitlab/client.rs - HTTP client with authentication: - Bearer token authentication from config - Base URL configuration for self-hosted instances - Paginated iteration via keyset or offset pagination - Automatic Link header parsing for next page URLs - Per-page limit control (default 100) - Methods: get_user(), get_version(), get_project() - Async stream for issues: list_issues_paginated() - Async stream for discussions: list_issue_discussions_paginated() - Respects GitLab rate limiting via response headers src/gitlab/transformers/ - API to database mapping: transformers/issue.rs - Issue transformation: - Maps GitLabIssue to IssueRow for database insert - Extracts milestone ID and due date - Normalizes author/assignee usernames - Preserves label IDs for junction table - Returns IssueWithMetadata including label/assignee lists transformers/discussion.rs - Discussion transformation: - Maps GitLabDiscussion to NormalizedDiscussion - Extracts thread metadata (resolvable, resolved) - Flattens notes to NormalizedNote with foreign keys - Handles system notes vs user notes - Preserves note position for diff discussions transformers/mod.rs - Re-exports all transformer types Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
384
src/gitlab/client.rs
Normal file
384
src/gitlab/client.rs
Normal file
@@ -0,0 +1,384 @@
|
||||
//! GitLab API client with rate limiting and error handling.
|
||||
|
||||
use async_stream::stream;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::Stream;
|
||||
use reqwest::header::{ACCEPT, HeaderMap, HeaderValue};
|
||||
use reqwest::{Client, Response, StatusCode};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
use tracing::debug;
|
||||
|
||||
use super::types::{GitLabDiscussion, GitLabIssue, GitLabProject, GitLabUser, GitLabVersion};
|
||||
use crate::core::error::{GiError, Result};
|
||||
|
||||
/// Simple rate limiter with jitter to prevent thundering herd.
|
||||
struct RateLimiter {
|
||||
last_request: Instant,
|
||||
min_interval: Duration,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
fn new(requests_per_second: f64) -> Self {
|
||||
Self {
|
||||
last_request: Instant::now() - Duration::from_secs(1), // Allow immediate first request
|
||||
min_interval: Duration::from_secs_f64(1.0 / requests_per_second),
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire(&mut self) {
|
||||
let elapsed = self.last_request.elapsed();
|
||||
|
||||
if elapsed < self.min_interval {
|
||||
// Add 0-50ms jitter to prevent synchronized requests
|
||||
let jitter = Duration::from_millis(rand_jitter());
|
||||
let wait_time = self.min_interval - elapsed + jitter;
|
||||
sleep(wait_time).await;
|
||||
}
|
||||
|
||||
self.last_request = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate random jitter between 0-50ms without external crate.
|
||||
fn rand_jitter() -> u64 {
|
||||
use std::collections::hash_map::RandomState;
|
||||
use std::hash::{BuildHasher, Hasher};
|
||||
|
||||
// RandomState is seeded randomly each time, so just hashing the state address gives us jitter
|
||||
let state = RandomState::new();
|
||||
let mut hasher = state.build_hasher();
|
||||
// Hash the address of the state (random per call) + current time nanos for more entropy
|
||||
hasher.write_usize(&state as *const _ as usize);
|
||||
hasher.write_u128(std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_nanos());
|
||||
hasher.finish() % 50
|
||||
}
|
||||
|
||||
/// GitLab API client with rate limiting.
|
||||
pub struct GitLabClient {
|
||||
client: Client,
|
||||
base_url: String,
|
||||
token: String,
|
||||
rate_limiter: Arc<Mutex<RateLimiter>>,
|
||||
}
|
||||
|
||||
impl GitLabClient {
|
||||
/// Create a new GitLab client.
|
||||
pub fn new(base_url: &str, token: &str, requests_per_second: Option<f64>) -> Self {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
|
||||
|
||||
let client = Client::builder()
|
||||
.default_headers(headers)
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.expect("Failed to create HTTP client");
|
||||
|
||||
Self {
|
||||
client,
|
||||
base_url: base_url.trim_end_matches('/').to_string(),
|
||||
token: token.to_string(),
|
||||
rate_limiter: Arc::new(Mutex::new(RateLimiter::new(
|
||||
requests_per_second.unwrap_or(10.0),
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the currently authenticated user.
|
||||
pub async fn get_current_user(&self) -> Result<GitLabUser> {
|
||||
self.request("/api/v4/user").await
|
||||
}
|
||||
|
||||
/// Get a project by its path.
|
||||
pub async fn get_project(&self, path_with_namespace: &str) -> Result<GitLabProject> {
|
||||
let encoded = urlencoding::encode(path_with_namespace);
|
||||
self.request(&format!("/api/v4/projects/{encoded}")).await
|
||||
}
|
||||
|
||||
/// Get GitLab server version.
|
||||
pub async fn get_version(&self) -> Result<GitLabVersion> {
|
||||
self.request("/api/v4/version").await
|
||||
}
|
||||
|
||||
/// Make an authenticated API request.
|
||||
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
|
||||
self.rate_limiter.lock().await.acquire().await;
|
||||
|
||||
let url = format!("{}{}", self.base_url, path);
|
||||
debug!(url = %url, "GitLab request");
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.get(&url)
|
||||
.header("PRIVATE-TOKEN", &self.token)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| GiError::GitLabNetworkError {
|
||||
base_url: self.base_url.clone(),
|
||||
source: Some(e),
|
||||
})?;
|
||||
|
||||
self.handle_response(response, path).await
|
||||
}
|
||||
|
||||
/// Handle API response, converting errors appropriately.
|
||||
async fn handle_response<T: serde::de::DeserializeOwned>(
|
||||
&self,
|
||||
response: Response,
|
||||
path: &str,
|
||||
) -> Result<T> {
|
||||
match response.status() {
|
||||
StatusCode::UNAUTHORIZED => Err(GiError::GitLabAuthFailed),
|
||||
|
||||
StatusCode::NOT_FOUND => Err(GiError::GitLabNotFound {
|
||||
resource: path.to_string(),
|
||||
}),
|
||||
|
||||
StatusCode::TOO_MANY_REQUESTS => {
|
||||
let retry_after = response
|
||||
.headers()
|
||||
.get("retry-after")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(60);
|
||||
|
||||
Err(GiError::GitLabRateLimited { retry_after })
|
||||
}
|
||||
|
||||
status if status.is_success() => {
|
||||
let body = response.json().await?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
status => Err(GiError::Other(format!(
|
||||
"GitLab API error: {} {}",
|
||||
status.as_u16(),
|
||||
status.canonical_reason().unwrap_or("Unknown")
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Paginate through issues for a project.
|
||||
///
|
||||
/// Returns an async stream of issues, handling pagination automatically.
|
||||
/// Issues are ordered by updated_at ascending to support cursor-based sync.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `gitlab_project_id` - The GitLab project ID
|
||||
/// * `updated_after` - Optional cursor (ms epoch) - only fetch issues updated after this
|
||||
/// * `cursor_rewind_seconds` - Rewind cursor by this many seconds to handle edge cases
|
||||
pub fn paginate_issues(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
updated_after: Option<i64>,
|
||||
cursor_rewind_seconds: u32,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
|
||||
Box::pin(stream! {
|
||||
let mut page = 1u32;
|
||||
let per_page = 100u32;
|
||||
|
||||
// Apply cursor rewind, clamping to 0
|
||||
let rewound_cursor = updated_after.map(|ts| {
|
||||
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
|
||||
(ts - rewind_ms).max(0)
|
||||
});
|
||||
|
||||
loop {
|
||||
let mut params = vec![
|
||||
("scope", "all".to_string()),
|
||||
("state", "all".to_string()),
|
||||
("order_by", "updated_at".to_string()),
|
||||
("sort", "asc".to_string()),
|
||||
("per_page", per_page.to_string()),
|
||||
("page", page.to_string()),
|
||||
];
|
||||
|
||||
// Add updated_after if we have a cursor
|
||||
if let Some(ts_ms) = rewound_cursor
|
||||
&& let Some(iso) = ms_to_iso8601(ts_ms)
|
||||
{
|
||||
params.push(("updated_after", iso));
|
||||
}
|
||||
|
||||
let path = format!("/api/v4/projects/{}/issues", gitlab_project_id);
|
||||
let result = self.request_with_headers::<Vec<GitLabIssue>>(&path, ¶ms).await;
|
||||
|
||||
match result {
|
||||
Ok((issues, headers)) => {
|
||||
let is_empty = issues.is_empty();
|
||||
|
||||
// Yield each issue
|
||||
for issue in issues {
|
||||
yield Ok(issue);
|
||||
}
|
||||
|
||||
// Check for next page
|
||||
let next_page = headers
|
||||
.get("x-next-page")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|s| s.parse::<u32>().ok());
|
||||
|
||||
match next_page {
|
||||
Some(next) if next > page => {
|
||||
page = next;
|
||||
}
|
||||
_ => {
|
||||
// No next page or empty response - we're done
|
||||
if is_empty {
|
||||
break;
|
||||
}
|
||||
// Check if current page returned less than per_page (last page)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
yield Err(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Paginate through discussions for an issue.
|
||||
///
|
||||
/// Returns an async stream of discussions, handling pagination automatically.
|
||||
pub fn paginate_issue_discussions(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
issue_iid: i64,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
|
||||
Box::pin(stream! {
|
||||
let mut page = 1u32;
|
||||
let per_page = 100u32;
|
||||
|
||||
loop {
|
||||
let params = vec![
|
||||
("per_page", per_page.to_string()),
|
||||
("page", page.to_string()),
|
||||
];
|
||||
|
||||
let path = format!(
|
||||
"/api/v4/projects/{}/issues/{}/discussions",
|
||||
gitlab_project_id, issue_iid
|
||||
);
|
||||
let result = self.request_with_headers::<Vec<GitLabDiscussion>>(&path, ¶ms).await;
|
||||
|
||||
match result {
|
||||
Ok((discussions, headers)) => {
|
||||
let is_empty = discussions.is_empty();
|
||||
|
||||
for discussion in discussions {
|
||||
yield Ok(discussion);
|
||||
}
|
||||
|
||||
let next_page = headers
|
||||
.get("x-next-page")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|s| s.parse::<u32>().ok());
|
||||
|
||||
match next_page {
|
||||
Some(next) if next > page => {
|
||||
page = next;
|
||||
}
|
||||
_ => {
|
||||
if is_empty {
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
yield Err(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Make an authenticated API request with query parameters, returning headers.
|
||||
async fn request_with_headers<T: serde::de::DeserializeOwned>(
|
||||
&self,
|
||||
path: &str,
|
||||
params: &[(&str, String)],
|
||||
) -> Result<(T, HeaderMap)> {
|
||||
self.rate_limiter.lock().await.acquire().await;
|
||||
|
||||
let url = format!("{}{}", self.base_url, path);
|
||||
debug!(url = %url, ?params, "GitLab paginated request");
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.get(&url)
|
||||
.query(params)
|
||||
.header("PRIVATE-TOKEN", &self.token)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| GiError::GitLabNetworkError {
|
||||
base_url: self.base_url.clone(),
|
||||
source: Some(e),
|
||||
})?;
|
||||
|
||||
let headers = response.headers().clone();
|
||||
let body = self.handle_response(response, path).await?;
|
||||
|
||||
Ok((body, headers))
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert milliseconds since epoch to ISO 8601 string.
|
||||
fn ms_to_iso8601(ms: i64) -> Option<String> {
|
||||
DateTime::<Utc>::from_timestamp_millis(ms)
|
||||
.map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn ms_to_iso8601_converts_correctly() {
|
||||
// 2024-01-15T10:00:00.000Z = 1705312800000 ms
|
||||
let result = ms_to_iso8601(1705312800000);
|
||||
assert_eq!(result, Some("2024-01-15T10:00:00.000Z".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ms_to_iso8601_handles_zero() {
|
||||
let result = ms_to_iso8601(0);
|
||||
assert_eq!(result, Some("1970-01-01T00:00:00.000Z".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cursor_rewind_clamps_to_zero() {
|
||||
let updated_after = Some(1000i64); // 1 second
|
||||
let cursor_rewind_seconds = 10u32; // 10 seconds
|
||||
|
||||
// Rewind would be negative, should clamp to 0
|
||||
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
|
||||
let rewound = (updated_after.unwrap() - rewind_ms).max(0);
|
||||
|
||||
assert_eq!(rewound, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cursor_rewind_applies_correctly() {
|
||||
let updated_after = Some(1705312800000i64); // 2024-01-15T10:00:00.000Z
|
||||
let cursor_rewind_seconds = 60u32; // 1 minute
|
||||
|
||||
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
|
||||
let rewound = (updated_after.unwrap() - rewind_ms).max(0);
|
||||
|
||||
// Should be 1 minute earlier
|
||||
assert_eq!(rewound, 1705312740000);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user