perf: Configurable rate limit, 429 auto-retry, concurrent project ingestion

The sync pipeline was bottlenecked at 10 req/s (hardcoded) with
sequential project processing and no retry on rate limiting. These
changes target 3-5x throughput improvement.

Rate limit configuration:
- Add requestsPerSecond to SyncConfig (default 30.0, was hardcoded 10)
- Pass configured rate through to GitLabClient::new from ingest
- Floor rate at 0.1 rps in RateLimiter::new: a configured rate of 0.0
  would make 1.0 / rate evaluate to f64::INFINITY, and
  Duration::from_secs_f64 panics on non-finite input — now reachable
  via user config

429 auto-retry:
- Both request() and request_with_headers() retry up to 3 times on
  HTTP 429, respecting the retry-after header (default 60s)
- Extract parse_retry_after helper, reused by handle_response fallback
- After exhausting retries, the 429 error propagates as before
- Improved JSON decode errors now include a response body preview

Concurrent project ingestion:
- Derive Clone on GitLabClient (cheap: shares Arc<Mutex<RateLimiter>>
  and reqwest::Client which is already Arc-backed)
- Restructure project loop as a stream processed with
  futures::StreamExt::buffer_unordered, bounded by primary_concurrency
  (default 4)
- Each project gets its own SQLite connection (WAL mode + busy_timeout
  handles concurrent writes)
- Add show_spinner field to IngestDisplay to separate the per-project
  spinner from the sync-level stage spinner
- Error aggregation defers failures: all successful projects get their
  summaries printed and results counted before returning the first error
- Bump dependentConcurrency default from 2 to 8 for discussion prefetch

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-03 17:37:06 -05:00
parent 4ee99c1677
commit f5b4a765b7
3 changed files with 319 additions and 190 deletions

View File

@@ -44,6 +44,19 @@ pub struct IngestResult {
pub resource_events_failed: usize, pub resource_events_failed: usize,
} }
/// Outcome of ingesting a single project, used to aggregate results
/// from concurrent project processing.
///
/// Returned by the per-project future in the `buffer_unordered` pipeline so
/// the aggregation loop can match on the resource type, print the right
/// summary, and add the right counters to the running totals.
enum ProjectIngestOutcome {
    /// An issues ingest finished for one project.
    Issues {
        // Human-readable project path (used in the per-project summary line).
        path: String,
        // Per-project counters folded into the `IngestResult` totals.
        result: IngestProjectResult,
    },
    /// A merge-request ingest finished for one project.
    Mrs {
        // Human-readable project path (used in the per-project summary line).
        path: String,
        // Per-project counters folded into the `IngestResult` totals.
        result: IngestMrProjectResult,
    },
}
/// Controls what interactive UI elements `run_ingest` displays. /// Controls what interactive UI elements `run_ingest` displays.
/// ///
/// Separates progress indicators (spinners, bars) from text output (headers, /// Separates progress indicators (spinners, bars) from text output (headers,
@@ -53,6 +66,9 @@ pub struct IngestResult {
pub struct IngestDisplay { pub struct IngestDisplay {
/// Show animated spinners and progress bars. /// Show animated spinners and progress bars.
pub show_progress: bool, pub show_progress: bool,
/// Show the per-project spinner. When called from `sync`, the stage
/// spinner already covers this, so a second spinner causes flashing.
pub show_spinner: bool,
/// Show text headers ("Ingesting...") and per-project summary lines. /// Show text headers ("Ingesting...") and per-project summary lines.
pub show_text: bool, pub show_text: bool,
} }
@@ -62,6 +78,7 @@ impl IngestDisplay {
pub fn interactive() -> Self { pub fn interactive() -> Self {
Self { Self {
show_progress: true, show_progress: true,
show_spinner: true,
show_text: true, show_text: true,
} }
} }
@@ -70,14 +87,17 @@ impl IngestDisplay {
pub fn silent() -> Self { pub fn silent() -> Self {
Self { Self {
show_progress: false, show_progress: false,
show_spinner: false,
show_text: false, show_text: false,
} }
} }
/// Progress only (used by sync in interactive mode). /// Progress bars only, no spinner or text (used by sync which provides its
/// own stage spinner).
pub fn progress_only() -> Self { pub fn progress_only() -> Self {
Self { Self {
show_progress: true, show_progress: true,
show_spinner: false,
show_text: false, show_text: false,
} }
} }
@@ -123,7 +143,11 @@ pub async fn run_ingest(
})?; })?;
// Create GitLab client // Create GitLab client
let client = GitLabClient::new(&config.gitlab.base_url, &token, None); let client = GitLabClient::new(
&config.gitlab.base_url,
&token,
Some(config.sync.requests_per_second),
);
// Get projects to sync // Get projects to sync
let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?; let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?;
@@ -188,158 +212,203 @@ pub async fn run_ingest(
println!(); println!();
} }
// Sync each project // Process projects concurrently. Each project gets its own DB connection
for (local_project_id, gitlab_project_id, path) in &projects { // while sharing the rate limiter through the cloned GitLabClient.
// Show spinner while fetching (only in interactive mode) let concurrency = config.sync.primary_concurrency as usize;
let spinner = if !display.show_progress { let resource_type_owned = resource_type.to_string();
ProgressBar::hidden()
} else {
let s = ProgressBar::new_spinner();
s.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.blue} {msg}")
.unwrap(),
);
s.set_message(format!("Fetching {type_label} from {path}..."));
s.enable_steady_tick(std::time::Duration::from_millis(100));
s
};
// Progress bar for discussion sync (hidden until needed, or always hidden in robot mode) use futures::stream::{self, StreamExt};
let disc_bar = if !display.show_progress {
ProgressBar::hidden()
} else {
let b = ProgressBar::new(0);
b.set_style(
ProgressStyle::default_bar()
.template(
" {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}",
)
.unwrap()
.progress_chars("=> "),
);
b
};
// Create progress callback (no-op in robot mode) let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
let spinner_clone = spinner.clone(); .map(|(local_project_id, gitlab_project_id, path)| {
let disc_bar_clone = disc_bar.clone(); let client = client.clone();
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress { let db_path = db_path.clone();
Box::new(|_| {}) let config = config.clone();
} else { let resource_type = resource_type_owned.clone();
Box::new(move |event: ProgressEvent| match event { let path = path.clone();
// Issue events let local_project_id = *local_project_id;
ProgressEvent::DiscussionSyncStarted { total } => { let gitlab_project_id = *gitlab_project_id;
spinner_clone.finish_and_clear();
disc_bar_clone.set_length(total as u64); async move {
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); let proj_conn = create_connection(&db_path)?;
}
ProgressEvent::DiscussionSynced { current, total: _ } => { let multi = crate::cli::progress::multi();
disc_bar_clone.set_position(current as u64);
} let spinner = if !display.show_spinner {
ProgressEvent::DiscussionSyncComplete => { ProgressBar::hidden()
disc_bar_clone.finish_and_clear(); } else {
} let s = multi.add(ProgressBar::new_spinner());
// MR events s.set_style(
ProgressEvent::MrDiscussionSyncStarted { total } => { ProgressStyle::default_spinner()
spinner_clone.finish_and_clear(); .template("{spinner:.blue} {msg}")
disc_bar_clone.set_length(total as u64); .unwrap(),
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); );
} s.set_message(format!("Fetching {type_label} from {path}..."));
ProgressEvent::MrDiscussionSynced { current, total: _ } => { s.enable_steady_tick(std::time::Duration::from_millis(100));
disc_bar_clone.set_position(current as u64); s
} };
ProgressEvent::MrDiscussionSyncComplete => {
disc_bar_clone.finish_and_clear(); let disc_bar = if !display.show_progress {
} ProgressBar::hidden()
ProgressEvent::ResourceEventsFetchStarted { total } => { } else {
disc_bar_clone.reset(); let b = multi.add(ProgressBar::new(0));
disc_bar_clone.set_length(total as u64); b.set_style(
disc_bar_clone.set_style(
ProgressStyle::default_bar() ProgressStyle::default_bar()
.template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") .template(
" {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}",
)
.unwrap() .unwrap()
.progress_chars("=> "), .progress_chars("=> "),
); );
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); b
} };
ProgressEvent::ResourceEventFetched { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
}
ProgressEvent::ResourceEventsFetchComplete { .. } => {
disc_bar_clone.finish_and_clear();
}
_ => {}
})
};
if resource_type == "issues" { let spinner_clone = spinner.clone();
let result = ingest_project_issues_with_progress( let disc_bar_clone = disc_bar.clone();
&conn, let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
&client, Box::new(|_| {})
config, } else {
*local_project_id, Box::new(move |event: ProgressEvent| match event {
*gitlab_project_id, ProgressEvent::DiscussionSyncStarted { total } => {
Some(progress_callback), spinner_clone.finish_and_clear();
) disc_bar_clone.set_length(total as u64);
.await?; disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
}
ProgressEvent::DiscussionSynced { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
}
ProgressEvent::DiscussionSyncComplete => {
disc_bar_clone.finish_and_clear();
}
ProgressEvent::MrDiscussionSyncStarted { total } => {
spinner_clone.finish_and_clear();
disc_bar_clone.set_length(total as u64);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
}
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
}
ProgressEvent::MrDiscussionSyncComplete => {
disc_bar_clone.finish_and_clear();
}
ProgressEvent::ResourceEventsFetchStarted { total } => {
disc_bar_clone.reset();
disc_bar_clone.set_length(total as u64);
disc_bar_clone.set_style(
ProgressStyle::default_bar()
.template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}")
.unwrap()
.progress_chars("=> "),
);
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100));
}
ProgressEvent::ResourceEventFetched { current, total: _ } => {
disc_bar_clone.set_position(current as u64);
}
ProgressEvent::ResourceEventsFetchComplete { .. } => {
disc_bar_clone.finish_and_clear();
}
_ => {}
})
};
spinner.finish_and_clear(); let outcome = if resource_type == "issues" {
disc_bar.finish_and_clear(); let result = ingest_project_issues_with_progress(
&proj_conn,
&client,
&config,
local_project_id,
gitlab_project_id,
Some(progress_callback),
)
.await?;
// Print per-project summary (only in interactive mode) spinner.finish_and_clear();
if display.show_text { disc_bar.finish_and_clear();
print_issue_project_summary(path, &result);
ProjectIngestOutcome::Issues { path, result }
} else {
let result = ingest_project_merge_requests_with_progress(
&proj_conn,
&client,
&config,
local_project_id,
gitlab_project_id,
full,
Some(progress_callback),
)
.await?;
spinner.finish_and_clear();
disc_bar.finish_and_clear();
ProjectIngestOutcome::Mrs { path, result }
};
Ok(outcome)
} }
})
.buffer_unordered(concurrency)
.collect()
.await;
// Aggregate totals // Aggregate results and print per-project summaries.
total.projects_synced += 1; // Process all successes first, then return the first error (if any)
total.issues_fetched += result.issues_fetched; // so that successful project summaries are always printed.
total.issues_upserted += result.issues_upserted; let mut first_error: Option<LoreError> = None;
total.labels_created += result.labels_created; for project_result in project_results {
total.discussions_fetched += result.discussions_fetched; match project_result {
total.notes_upserted += result.notes_upserted; Err(e) => {
total.issues_synced_discussions += result.issues_synced_discussions; if first_error.is_none() {
total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync; first_error = Some(e);
total.resource_events_fetched += result.resource_events_fetched; }
total.resource_events_failed += result.resource_events_failed; }
} else { Ok(ProjectIngestOutcome::Issues {
let result = ingest_project_merge_requests_with_progress( ref path,
&conn, ref result,
&client, }) => {
config, if display.show_text {
*local_project_id, print_issue_project_summary(path, result);
*gitlab_project_id, }
full, total.projects_synced += 1;
Some(progress_callback), total.issues_fetched += result.issues_fetched;
) total.issues_upserted += result.issues_upserted;
.await?; total.labels_created += result.labels_created;
total.discussions_fetched += result.discussions_fetched;
spinner.finish_and_clear(); total.notes_upserted += result.notes_upserted;
disc_bar.finish_and_clear(); total.issues_synced_discussions += result.issues_synced_discussions;
total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync;
// Print per-project summary (only in interactive mode) total.resource_events_fetched += result.resource_events_fetched;
if display.show_text { total.resource_events_failed += result.resource_events_failed;
print_mr_project_summary(path, &result); }
Ok(ProjectIngestOutcome::Mrs {
ref path,
ref result,
}) => {
if display.show_text {
print_mr_project_summary(path, result);
}
total.projects_synced += 1;
total.mrs_fetched += result.mrs_fetched;
total.mrs_upserted += result.mrs_upserted;
total.labels_created += result.labels_created;
total.assignees_linked += result.assignees_linked;
total.reviewers_linked += result.reviewers_linked;
total.discussions_fetched += result.discussions_fetched;
total.notes_upserted += result.notes_upserted;
total.diffnotes_count += result.diffnotes_count;
total.mrs_synced_discussions += result.mrs_synced_discussions;
total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync;
total.resource_events_fetched += result.resource_events_fetched;
total.resource_events_failed += result.resource_events_failed;
} }
// Aggregate totals
total.projects_synced += 1;
total.mrs_fetched += result.mrs_fetched;
total.mrs_upserted += result.mrs_upserted;
total.labels_created += result.labels_created;
total.assignees_linked += result.assignees_linked;
total.reviewers_linked += result.reviewers_linked;
total.discussions_fetched += result.discussions_fetched;
total.notes_upserted += result.notes_upserted;
total.diffnotes_count += result.diffnotes_count;
total.mrs_synced_discussions += result.mrs_synced_discussions;
total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync;
total.resource_events_fetched += result.resource_events_fetched;
total.resource_events_failed += result.resource_events_failed;
} }
} }
if let Some(e) = first_error {
return Err(e);
}
// Lock is released on drop // Lock is released on drop
Ok(total) Ok(total)
} }

View File

@@ -51,6 +51,9 @@ pub struct SyncConfig {
#[serde(rename = "dependentConcurrency")] #[serde(rename = "dependentConcurrency")]
pub dependent_concurrency: u32, pub dependent_concurrency: u32,
#[serde(rename = "requestsPerSecond")]
pub requests_per_second: f64,
#[serde(rename = "fetchResourceEvents", default = "default_true")] #[serde(rename = "fetchResourceEvents", default = "default_true")]
pub fetch_resource_events: bool, pub fetch_resource_events: bool,
} }
@@ -67,7 +70,8 @@ impl Default for SyncConfig {
heartbeat_interval_seconds: 30, heartbeat_interval_seconds: 30,
cursor_rewind_seconds: 2, cursor_rewind_seconds: 2,
primary_concurrency: 4, primary_concurrency: 4,
dependent_concurrency: 2, dependent_concurrency: 8,
requests_per_second: 30.0,
fetch_resource_events: true, fetch_resource_events: true,
} }
} }

View File

@@ -26,9 +26,11 @@ struct RateLimiter {
impl RateLimiter { impl RateLimiter {
fn new(requests_per_second: f64) -> Self { fn new(requests_per_second: f64) -> Self {
// Floor at 0.1 rps to prevent division-by-zero panic in Duration::from_secs_f64
let rps = requests_per_second.max(0.1);
Self { Self {
last_request: Instant::now() - Duration::from_secs(1), // Allow immediate first request last_request: Instant::now() - Duration::from_secs(1), // Allow immediate first request
min_interval: Duration::from_secs_f64(1.0 / requests_per_second), min_interval: Duration::from_secs_f64(1.0 / rps),
} }
} }
@@ -67,6 +69,10 @@ fn rand_jitter() -> u64 {
} }
/// GitLab API client with rate limiting. /// GitLab API client with rate limiting.
///
/// Cloning shares the underlying HTTP client and rate limiter,
/// making it cheap and safe for concurrent use across projects.
#[derive(Clone)]
pub struct GitLabClient { pub struct GitLabClient {
client: Client, client: Client,
base_url: String, base_url: String,
@@ -112,28 +118,58 @@ impl GitLabClient {
self.request("/api/v4/version").await self.request("/api/v4/version").await
} }
/// Make an authenticated API request. /// Maximum number of retries on 429 Too Many Requests.
const MAX_RETRIES: u32 = 3;
/// Make an authenticated API request with automatic 429 retry.
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> { async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
let delay = self.rate_limiter.lock().await.check_delay(); let url = format!("{}{}", self.base_url, path);
if let Some(d) = delay {
sleep(d).await; for attempt in 0..=Self::MAX_RETRIES {
let delay = self.rate_limiter.lock().await.check_delay();
if let Some(d) = delay {
sleep(d).await;
}
debug!(url = %url, attempt, "GitLab request");
let response = self
.client
.get(&url)
.header("PRIVATE-TOKEN", &self.token)
.send()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
source: Some(e),
})?;
if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES {
let retry_after = Self::parse_retry_after(&response);
tracing::warn!(
retry_after_secs = retry_after,
attempt,
path,
"Rate limited by GitLab, retrying"
);
sleep(Duration::from_secs(retry_after)).await;
continue;
}
return self.handle_response(response, path).await;
} }
let url = format!("{}{}", self.base_url, path); unreachable!("loop always returns")
debug!(url = %url, "GitLab request"); }
let response = self /// Parse retry-after header from a 429 response, defaulting to 60s.
.client fn parse_retry_after(response: &Response) -> u64 {
.get(&url) response
.header("PRIVATE-TOKEN", &self.token) .headers()
.send() .get("retry-after")
.await .and_then(|v| v.to_str().ok())
.map_err(|e| LoreError::GitLabNetworkError { .and_then(|s| s.parse().ok())
base_url: self.base_url.clone(), .unwrap_or(60)
source: Some(e),
})?;
self.handle_response(response, path).await
} }
/// Handle API response, converting errors appropriately. /// Handle API response, converting errors appropriately.
@@ -150,19 +186,22 @@ impl GitLabClient {
}), }),
StatusCode::TOO_MANY_REQUESTS => { StatusCode::TOO_MANY_REQUESTS => {
let retry_after = response let retry_after = Self::parse_retry_after(&response);
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
.unwrap_or(60);
Err(LoreError::GitLabRateLimited { retry_after }) Err(LoreError::GitLabRateLimited { retry_after })
} }
status if status.is_success() => { status if status.is_success() => {
let body = response.json().await?; let text = response.text().await?;
Ok(body) serde_json::from_str(&text).map_err(|e| {
let preview = if text.len() > 500 {
&text[..500]
} else {
&text
};
LoreError::Other(format!(
"Failed to decode response from {path}: {e}\nResponse preview: {preview}"
))
})
} }
status => Err(LoreError::Other(format!( status => Err(LoreError::Other(format!(
@@ -498,35 +537,52 @@ impl GitLabClient {
} }
/// Make an authenticated API request with query parameters, returning headers. /// Make an authenticated API request with query parameters, returning headers.
/// Automatically retries on 429 Too Many Requests.
async fn request_with_headers<T: serde::de::DeserializeOwned>( async fn request_with_headers<T: serde::de::DeserializeOwned>(
&self, &self,
path: &str, path: &str,
params: &[(&str, String)], params: &[(&str, String)],
) -> Result<(T, HeaderMap)> { ) -> Result<(T, HeaderMap)> {
let delay = self.rate_limiter.lock().await.check_delay(); let url = format!("{}{}", self.base_url, path);
if let Some(d) = delay {
sleep(d).await; for attempt in 0..=Self::MAX_RETRIES {
let delay = self.rate_limiter.lock().await.check_delay();
if let Some(d) = delay {
sleep(d).await;
}
debug!(url = %url, ?params, attempt, "GitLab paginated request");
let response = self
.client
.get(&url)
.query(params)
.header("PRIVATE-TOKEN", &self.token)
.send()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
source: Some(e),
})?;
if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES {
let retry_after = Self::parse_retry_after(&response);
tracing::warn!(
retry_after_secs = retry_after,
attempt,
path,
"Rate limited by GitLab, retrying"
);
sleep(Duration::from_secs(retry_after)).await;
continue;
}
let headers = response.headers().clone();
let body = self.handle_response(response, path).await?;
return Ok((body, headers));
} }
let url = format!("{}{}", self.base_url, path); unreachable!("loop always returns")
debug!(url = %url, ?params, "GitLab paginated request");
let response = self
.client
.get(&url)
.query(params)
.header("PRIVATE-TOKEN", &self.token)
.send()
.await
.map_err(|e| LoreError::GitLabNetworkError {
base_url: self.base_url.clone(),
source: Some(e),
})?;
let headers = response.headers().clone();
let body = self.handle_response(response, path).await?;
Ok((body, headers))
} }
} }