feat(gitlab): Add MR closes_issues API endpoint and GitLabIssueRef type
Extends the GitLab client to fetch the list of issues that an MR will close when merged, using the /projects/:id/merge_requests/:iid/closes_issues endpoint.

New type:
- GitLabIssueRef: Lightweight issue reference with id, iid, project_id, title, state, and web_url. Used for the closes_issues response, which returns a list of issue summaries rather than full GitLabIssue objects.

New client method:
- fetch_mr_closes_issues(gitlab_project_id, iid): Returns Vec<GitLabIssueRef> for all issues that the MR's description/commits indicate will be closed.

This enables building the entity_references table from API data in addition to parsing system notes, providing more reliable cross-reference discovery.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,3 @@
|
||||
//! GitLab API client with rate limiting and error handling.
|
||||
|
||||
use async_stream::stream;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::Stream;
|
||||
@@ -13,12 +11,11 @@ use tokio::time::sleep;
|
||||
use tracing::debug;
|
||||
|
||||
use super::types::{
|
||||
GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabMergeRequest, GitLabMilestoneEvent,
|
||||
GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
|
||||
GitLabDiscussion, GitLabIssue, GitLabIssueRef, GitLabLabelEvent, GitLabMergeRequest,
|
||||
GitLabMilestoneEvent, GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
|
||||
};
|
||||
use crate::core::error::{LoreError, Result};
|
||||
|
||||
/// Simple rate limiter with jitter to prevent thundering herd.
|
||||
struct RateLimiter {
|
||||
last_request: Instant,
|
||||
min_interval: Duration,
|
||||
@@ -26,35 +23,28 @@ struct RateLimiter {
|
||||
|
||||
impl RateLimiter {
|
||||
fn new(requests_per_second: f64) -> Self {
|
||||
// Floor at 0.1 rps: a zero or negative rate would make 1.0 / rps infinite or negative, and Duration::from_secs_f64 panics on such values.
|
||||
let rps = requests_per_second.max(0.1);
|
||||
Self {
|
||||
last_request: Instant::now() - Duration::from_secs(1), // Allow immediate first request
|
||||
last_request: Instant::now() - Duration::from_secs(1),
|
||||
min_interval: Duration::from_secs_f64(1.0 / rps),
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute how long to wait and update last_request to the expected
|
||||
/// request time (now, or now + delay). The caller sleeps *after*
|
||||
/// releasing the mutex guard.
|
||||
fn check_delay(&mut self) -> Option<Duration> {
|
||||
let elapsed = self.last_request.elapsed();
|
||||
|
||||
if elapsed < self.min_interval {
|
||||
let jitter = Duration::from_millis(rand_jitter());
|
||||
let delay = self.min_interval - elapsed + jitter;
|
||||
// Set last_request to when the request will actually fire
|
||||
self.last_request = Instant::now() + delay;
|
||||
Some(delay)
|
||||
} else {
|
||||
// No delay needed; request fires immediately
|
||||
self.last_request = Instant::now();
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate pseudo-random jitter in the range 0-49 ms using a lightweight atomic counter.
|
||||
fn rand_jitter() -> u64 {
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
static COUNTER: AtomicU64 = AtomicU64::new(0);
|
||||
@@ -66,10 +56,6 @@ fn rand_jitter() -> u64 {
|
||||
(n ^ nanos) % 50
|
||||
}
|
||||
|
||||
/// GitLab API client with rate limiting.
|
||||
///
|
||||
/// Cloning shares the underlying HTTP client and rate limiter,
|
||||
/// making it cheap and safe for concurrent use across projects.
|
||||
#[derive(Clone)]
|
||||
pub struct GitLabClient {
|
||||
client: Client,
|
||||
@@ -79,7 +65,6 @@ pub struct GitLabClient {
|
||||
}
|
||||
|
||||
impl GitLabClient {
|
||||
/// Create a new GitLab client.
|
||||
pub fn new(base_url: &str, token: &str, requests_per_second: Option<f64>) -> Self {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
|
||||
@@ -100,26 +85,21 @@ impl GitLabClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the currently authenticated user.
|
||||
pub async fn get_current_user(&self) -> Result<GitLabUser> {
|
||||
self.request("/api/v4/user").await
|
||||
}
|
||||
|
||||
/// Get a project by its path.
|
||||
pub async fn get_project(&self, path_with_namespace: &str) -> Result<GitLabProject> {
|
||||
let encoded = urlencoding::encode(path_with_namespace);
|
||||
self.request(&format!("/api/v4/projects/{encoded}")).await
|
||||
}
|
||||
|
||||
/// Get GitLab server version.
|
||||
pub async fn get_version(&self) -> Result<GitLabVersion> {
|
||||
self.request("/api/v4/version").await
|
||||
}
|
||||
|
||||
/// Maximum number of retries on 429 Too Many Requests.
|
||||
const MAX_RETRIES: u32 = 3;
|
||||
|
||||
/// Make an authenticated API request with automatic 429 retry.
|
||||
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
|
||||
let url = format!("{}{}", self.base_url, path);
|
||||
let mut last_response = None;
|
||||
@@ -160,14 +140,10 @@ impl GitLabClient {
|
||||
break;
|
||||
}
|
||||
|
||||
// Safety: the loop always executes at least once (0..=MAX_RETRIES)
|
||||
// and either sets last_response+break, or continues (only when
|
||||
// attempt < MAX_RETRIES). The final iteration always reaches break.
|
||||
self.handle_response(last_response.expect("retry loop ran at least once"), path)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Parse retry-after header from a 429 response, defaulting to 60s.
|
||||
fn parse_retry_after(response: &Response) -> u64 {
|
||||
response
|
||||
.headers()
|
||||
@@ -177,7 +153,6 @@ impl GitLabClient {
|
||||
.unwrap_or(60)
|
||||
}
|
||||
|
||||
/// Handle API response, converting errors appropriately.
|
||||
async fn handle_response<T: serde::de::DeserializeOwned>(
|
||||
&self,
|
||||
response: Response,
|
||||
@@ -217,15 +192,6 @@ impl GitLabClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Paginate through issues for a project.
|
||||
///
|
||||
/// Returns an async stream of issues, handling pagination automatically.
|
||||
/// Issues are ordered by updated_at ascending to support cursor-based sync.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `gitlab_project_id` - The GitLab project ID
|
||||
/// * `updated_after` - Optional cursor (ms epoch) - only fetch issues updated after this
|
||||
/// * `cursor_rewind_seconds` - Rewind cursor by this many seconds to handle edge cases
|
||||
pub fn paginate_issues(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -236,7 +202,6 @@ impl GitLabClient {
|
||||
let mut page = 1u32;
|
||||
let per_page = 100u32;
|
||||
|
||||
// Apply cursor rewind, clamping to 0
|
||||
let rewound_cursor = updated_after.map(|ts| {
|
||||
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
|
||||
(ts - rewind_ms).max(0)
|
||||
@@ -252,7 +217,6 @@ impl GitLabClient {
|
||||
("page", page.to_string()),
|
||||
];
|
||||
|
||||
// Add updated_after if we have a cursor
|
||||
if let Some(ts_ms) = rewound_cursor
|
||||
&& let Some(iso) = ms_to_iso8601(ts_ms)
|
||||
{
|
||||
@@ -267,12 +231,10 @@ impl GitLabClient {
|
||||
let is_empty = issues.is_empty();
|
||||
let full_page = issues.len() as u32 == per_page;
|
||||
|
||||
// Yield each issue
|
||||
for issue in issues {
|
||||
yield Ok(issue);
|
||||
}
|
||||
|
||||
// Check for next page
|
||||
let next_page = headers
|
||||
.get("x-next-page")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
@@ -286,7 +248,6 @@ impl GitLabClient {
|
||||
if is_empty || !full_page {
|
||||
break;
|
||||
}
|
||||
// Full page but no x-next-page header: try next page heuristically
|
||||
page += 1;
|
||||
}
|
||||
}
|
||||
@@ -300,9 +261,6 @@ impl GitLabClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Paginate through discussions for an issue.
|
||||
///
|
||||
/// Returns an async stream of discussions, handling pagination automatically.
|
||||
pub fn paginate_issue_discussions(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -346,7 +304,6 @@ impl GitLabClient {
|
||||
if is_empty || !full_page {
|
||||
break;
|
||||
}
|
||||
// Full page but no x-next-page header: try next page heuristically
|
||||
page += 1;
|
||||
}
|
||||
}
|
||||
@@ -360,15 +317,6 @@ impl GitLabClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Paginate through merge requests for a project.
|
||||
///
|
||||
/// Returns an async stream of merge requests, handling pagination automatically.
|
||||
/// MRs are ordered by updated_at ascending to support cursor-based sync.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `gitlab_project_id` - The GitLab project ID
|
||||
/// * `updated_after` - Optional cursor (ms epoch) - only fetch MRs updated after this
|
||||
/// * `cursor_rewind_seconds` - Rewind cursor by this many seconds to handle edge cases
|
||||
pub fn paginate_merge_requests(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -414,7 +362,6 @@ impl GitLabClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch a single page of merge requests with pagination metadata.
|
||||
pub async fn fetch_merge_requests_page(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -423,7 +370,6 @@ impl GitLabClient {
|
||||
page: u32,
|
||||
per_page: u32,
|
||||
) -> Result<MergeRequestPage> {
|
||||
// Apply cursor rewind, clamping to 0
|
||||
let rewound_cursor = updated_after.map(|ts| {
|
||||
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
|
||||
(ts - rewind_ms).max(0)
|
||||
@@ -438,7 +384,6 @@ impl GitLabClient {
|
||||
("page", page.to_string()),
|
||||
];
|
||||
|
||||
// Add updated_after if we have a cursor
|
||||
if let Some(ts_ms) = rewound_cursor
|
||||
&& let Some(iso) = ms_to_iso8601(ts_ms)
|
||||
{
|
||||
@@ -450,7 +395,6 @@ impl GitLabClient {
|
||||
.request_with_headers::<Vec<GitLabMergeRequest>>(&path, &params)
|
||||
.await?;
|
||||
|
||||
// Pagination fallback chain: Link header > x-next-page > full-page heuristic
|
||||
let link_next = parse_link_header_next(&headers);
|
||||
let x_next_page = headers
|
||||
.get("x-next-page")
|
||||
@@ -459,10 +403,10 @@ impl GitLabClient {
|
||||
let full_page = items.len() as u32 == per_page;
|
||||
|
||||
let (next_page, is_last_page) = match (link_next.is_some(), x_next_page, full_page) {
|
||||
(true, _, _) => (Some(page + 1), false), // Link header present: continue
|
||||
(false, Some(np), _) => (Some(np), false), // x-next-page present: use it
|
||||
(false, None, true) => (Some(page + 1), false), // Full page, no headers: try next
|
||||
(false, None, false) => (None, true), // Partial page: we're done
|
||||
(true, _, _) => (Some(page + 1), false),
|
||||
(false, Some(np), _) => (Some(np), false),
|
||||
(false, None, true) => (Some(page + 1), false),
|
||||
(false, None, false) => (None, true),
|
||||
};
|
||||
|
||||
Ok(MergeRequestPage {
|
||||
@@ -472,9 +416,6 @@ impl GitLabClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Paginate through discussions for a merge request.
|
||||
///
|
||||
/// Returns an async stream of discussions, handling pagination automatically.
|
||||
pub fn paginate_mr_discussions(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -505,7 +446,6 @@ impl GitLabClient {
|
||||
yield Ok(discussion);
|
||||
}
|
||||
|
||||
// Pagination fallback chain: Link header > x-next-page > full-page heuristic
|
||||
let link_next = parse_link_header_next(&headers);
|
||||
let x_next_page = headers
|
||||
.get("x-next-page")
|
||||
@@ -514,18 +454,18 @@ impl GitLabClient {
|
||||
|
||||
let should_continue = match (link_next.is_some(), x_next_page, full_page) {
|
||||
(true, _, _) => {
|
||||
page += 1; // Link header present: continue to next
|
||||
page += 1;
|
||||
true
|
||||
}
|
||||
(false, Some(np), _) if np > page => {
|
||||
page = np; // x-next-page tells us exactly which page
|
||||
page = np;
|
||||
true
|
||||
}
|
||||
(false, None, true) => {
|
||||
page += 1; // Full page, no headers: try next
|
||||
page += 1;
|
||||
true
|
||||
}
|
||||
_ => false, // Otherwise we're done
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if !should_continue || is_empty {
|
||||
@@ -541,8 +481,6 @@ impl GitLabClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Make an authenticated API request with query parameters, returning headers.
|
||||
/// Automatically retries on 429 Too Many Requests.
|
||||
async fn request_with_headers<T: serde::de::DeserializeOwned>(
|
||||
&self,
|
||||
path: &str,
|
||||
@@ -595,8 +533,6 @@ impl GitLabClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch all discussions for an MR (collects paginated results).
|
||||
/// This is useful for parallel prefetching where we want all data upfront.
|
||||
impl GitLabClient {
|
||||
pub async fn fetch_all_mr_discussions(
|
||||
&self,
|
||||
@@ -616,12 +552,7 @@ impl GitLabClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Resource events API methods.
|
||||
///
|
||||
/// These endpoints return per-entity events (not project-wide), so they collect
|
||||
/// all pages into a Vec rather than using streaming.
|
||||
impl GitLabClient {
|
||||
/// Fetch all pages from a paginated endpoint, returning collected results.
|
||||
async fn fetch_all_pages<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<Vec<T>> {
|
||||
let mut results = Vec::new();
|
||||
let mut page = 1u32;
|
||||
@@ -658,7 +589,16 @@ impl GitLabClient {
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Fetch the issues that a merge request will close when merged.
|
||||
pub async fn fetch_mr_closes_issues(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
iid: i64,
|
||||
) -> Result<Vec<GitLabIssueRef>> {
|
||||
let path =
|
||||
format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/closes_issues");
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
pub async fn fetch_issue_state_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -669,7 +609,6 @@ impl GitLabClient {
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
/// Fetch label events for an issue.
|
||||
pub async fn fetch_issue_label_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -680,7 +619,6 @@ impl GitLabClient {
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
/// Fetch milestone events for an issue.
|
||||
pub async fn fetch_issue_milestone_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -691,7 +629,6 @@ impl GitLabClient {
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
/// Fetch state events for a merge request.
|
||||
pub async fn fetch_mr_state_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -703,7 +640,6 @@ impl GitLabClient {
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
/// Fetch label events for a merge request.
|
||||
pub async fn fetch_mr_label_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -715,7 +651,6 @@ impl GitLabClient {
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
/// Fetch milestone events for a merge request.
|
||||
pub async fn fetch_mr_milestone_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -727,12 +662,6 @@ impl GitLabClient {
|
||||
self.fetch_all_pages(&path).await
|
||||
}
|
||||
|
||||
/// Fetch all three event types for an entity concurrently.
|
||||
///
|
||||
/// Uses `tokio::join!` instead of `try_join!` so that a 404 on one event
|
||||
/// type (e.g., labels) doesn't discard successfully-fetched data from the
|
||||
/// others (e.g., state events). 404s are treated as "no events" (empty vec);
|
||||
/// all other errors (including 403) are propagated for retry.
|
||||
pub async fn fetch_all_resource_events(
|
||||
&self,
|
||||
gitlab_project_id: i64,
|
||||
@@ -765,8 +694,6 @@ impl GitLabClient {
|
||||
}
|
||||
};
|
||||
|
||||
// Treat 404 as "endpoint not available for this entity" → empty vec.
|
||||
// All other errors (403, network, etc.) propagate for retry handling.
|
||||
let state = coalesce_not_found(state_res)?;
|
||||
let label = coalesce_not_found(label_res)?;
|
||||
let milestone = coalesce_not_found(milestone_res)?;
|
||||
@@ -775,7 +702,6 @@ impl GitLabClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Page result for merge request pagination.
|
||||
#[derive(Debug)]
|
||||
pub struct MergeRequestPage {
|
||||
pub items: Vec<GitLabMergeRequest>,
|
||||
@@ -783,13 +709,11 @@ pub struct MergeRequestPage {
|
||||
pub is_last_page: bool,
|
||||
}
|
||||
|
||||
/// Parse Link header to extract rel="next" URL (RFC 8288).
|
||||
fn parse_link_header_next(headers: &HeaderMap) -> Option<String> {
|
||||
headers
|
||||
.get("link")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|link_str| {
|
||||
// Format: <url>; rel="next", <url>; rel="last"
|
||||
for part in link_str.split(',') {
|
||||
let part = part.trim();
|
||||
if (part.contains("rel=\"next\"") || part.contains("rel=next"))
|
||||
@@ -803,11 +727,6 @@ fn parse_link_header_next(headers: &HeaderMap) -> Option<String> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Convert a resource-event fetch result: 404 → empty vec, other errors propagated.
|
||||
///
|
||||
/// 404 means the endpoint doesn't exist for this entity type — truly permanent.
|
||||
/// 403 and other errors are NOT coalesced: they may be environmental (VPN, token
|
||||
/// rotation) and should be retried via the drain loop's backoff mechanism.
|
||||
fn coalesce_not_found<T>(result: Result<Vec<T>>) -> Result<Vec<T>> {
|
||||
match result {
|
||||
Ok(v) => Ok(v),
|
||||
@@ -816,7 +735,6 @@ fn coalesce_not_found<T>(result: Result<Vec<T>>) -> Result<Vec<T>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert milliseconds since epoch to ISO 8601 string.
|
||||
fn ms_to_iso8601(ms: i64) -> Option<String> {
|
||||
DateTime::<Utc>::from_timestamp_millis(ms)
|
||||
.map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string())
|
||||
@@ -828,7 +746,6 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn ms_to_iso8601_converts_correctly() {
|
||||
// 2024-01-15T10:00:00.000Z = 1705312800000 ms
|
||||
let result = ms_to_iso8601(1705312800000);
|
||||
assert_eq!(result, Some("2024-01-15T10:00:00.000Z".to_string()));
|
||||
}
|
||||
@@ -841,10 +758,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn cursor_rewind_clamps_to_zero() {
|
||||
let updated_after = 1000i64; // 1 second
|
||||
let cursor_rewind_seconds = 10u32; // 10 seconds
|
||||
let updated_after = 1000i64;
|
||||
let cursor_rewind_seconds = 10u32;
|
||||
|
||||
// Rewind would be negative, should clamp to 0
|
||||
let rewind_ms = i64::from(cursor_rewind_seconds) * 1000;
|
||||
let rewound = (updated_after - rewind_ms).max(0);
|
||||
|
||||
@@ -853,13 +769,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn cursor_rewind_applies_correctly() {
|
||||
let updated_after = 1705312800000i64; // 2024-01-15T10:00:00.000Z
|
||||
let cursor_rewind_seconds = 60u32; // 1 minute
|
||||
let updated_after = 1705312800000i64;
|
||||
let cursor_rewind_seconds = 60u32;
|
||||
|
||||
let rewind_ms = i64::from(cursor_rewind_seconds) * 1000;
|
||||
let rewound = (updated_after - rewind_ms).max(0);
|
||||
|
||||
// Should be 1 minute earlier
|
||||
assert_eq!(rewound, 1705312740000);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user