diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index b03942a..b006f83 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -44,6 +44,19 @@ pub struct IngestResult { pub resource_events_failed: usize, } +/// Outcome of ingesting a single project, used to aggregate results +/// from concurrent project processing. +enum ProjectIngestOutcome { + Issues { + path: String, + result: IngestProjectResult, + }, + Mrs { + path: String, + result: IngestMrProjectResult, + }, +} + /// Controls what interactive UI elements `run_ingest` displays. /// /// Separates progress indicators (spinners, bars) from text output (headers, @@ -53,6 +66,9 @@ pub struct IngestResult { pub struct IngestDisplay { /// Show animated spinners and progress bars. pub show_progress: bool, + /// Show the per-project spinner. When called from `sync`, the stage + /// spinner already covers this, so a second spinner causes flashing. + pub show_spinner: bool, /// Show text headers ("Ingesting...") and per-project summary lines. pub show_text: bool, } @@ -62,6 +78,7 @@ impl IngestDisplay { pub fn interactive() -> Self { Self { show_progress: true, + show_spinner: true, show_text: true, } } @@ -70,14 +87,17 @@ impl IngestDisplay { pub fn silent() -> Self { Self { show_progress: false, + show_spinner: false, show_text: false, } } - /// Progress only (used by sync in interactive mode). + /// Progress bars only, no spinner or text (used by sync which provides its + /// own stage spinner). pub fn progress_only() -> Self { Self { show_progress: true, + show_spinner: false, show_text: false, } } @@ -123,7 +143,11 @@ pub async fn run_ingest( })?; // Create GitLab client - let client = GitLabClient::new(&config.gitlab.base_url, &token, None); + let client = GitLabClient::new( + &config.gitlab.base_url, + &token, + Some(config.sync.requests_per_second), + ); // Get projects to sync let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?; @@ -188,158 +212,203 @@ pub async fn run_ingest( println!(); } - // Sync each project - for (local_project_id, gitlab_project_id, path) in &projects { - // Show spinner while fetching (only in interactive mode) - let spinner = if !display.show_progress { - ProgressBar::hidden() - } else { - let s = ProgressBar::new_spinner(); - s.set_style( - ProgressStyle::default_spinner() - .template("{spinner:.blue} {msg}") - .unwrap(), - ); - s.set_message(format!("Fetching {type_label} from {path}...")); - s.enable_steady_tick(std::time::Duration::from_millis(100)); - s - }; + // Process projects concurrently. Each project gets its own DB connection + // while sharing the rate limiter through the cloned GitLabClient. + let concurrency = config.sync.primary_concurrency as usize; + let resource_type_owned = resource_type.to_string(); - // Progress bar for discussion sync (hidden until needed, or always hidden in robot mode) - let disc_bar = if !display.show_progress { - ProgressBar::hidden() - } else { - let b = ProgressBar::new(0); - b.set_style( - ProgressStyle::default_bar() - .template( - " {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}", - ) - .unwrap() - .progress_chars("=> "), - ); - b - }; + use futures::stream::{self, StreamExt}; - // Create progress callback (no-op in robot mode) - let spinner_clone = spinner.clone(); - let disc_bar_clone = disc_bar.clone(); - let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress { - Box::new(|_| {}) - } else { - Box::new(move |event: ProgressEvent| match event { - // Issue events - ProgressEvent::DiscussionSyncStarted { total } => { - spinner_clone.finish_and_clear(); - disc_bar_clone.set_length(total as u64); - disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); - } - ProgressEvent::DiscussionSynced { current, total: _ } => { - disc_bar_clone.set_position(current as u64); - } - ProgressEvent::DiscussionSyncComplete => { - disc_bar_clone.finish_and_clear(); - } - // MR events - ProgressEvent::MrDiscussionSyncStarted { total } => { - spinner_clone.finish_and_clear(); - disc_bar_clone.set_length(total as u64); - disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); - } - ProgressEvent::MrDiscussionSynced { current, total: _ } => { - disc_bar_clone.set_position(current as u64); - } - ProgressEvent::MrDiscussionSyncComplete => { - disc_bar_clone.finish_and_clear(); - } - ProgressEvent::ResourceEventsFetchStarted { total } => { - disc_bar_clone.reset(); - disc_bar_clone.set_length(total as u64); - disc_bar_clone.set_style( + let project_results: Vec> = stream::iter(projects.iter()) + .map(|(local_project_id, gitlab_project_id, path)| { + let client = client.clone(); + let db_path = db_path.clone(); + let config = config.clone(); + let resource_type = resource_type_owned.clone(); + let path = path.clone(); + let local_project_id = *local_project_id; + let gitlab_project_id = *gitlab_project_id; + + async move { + let proj_conn = create_connection(&db_path)?; + + let multi = crate::cli::progress::multi(); + + let spinner = if !display.show_spinner { + ProgressBar::hidden() + } else { + let s = multi.add(ProgressBar::new_spinner()); + s.set_style( + ProgressStyle::default_spinner() + .template("{spinner:.blue} {msg}") + .unwrap(), + ); + s.set_message(format!("Fetching {type_label} from {path}...")); + s.enable_steady_tick(std::time::Duration::from_millis(100)); + s + }; + + let disc_bar = if !display.show_progress { + ProgressBar::hidden() + } else { + let b = multi.add(ProgressBar::new(0)); + b.set_style( ProgressStyle::default_bar() - .template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") + .template( + " {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}", + ) .unwrap() .progress_chars("=> "), ); - disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); - } - ProgressEvent::ResourceEventFetched { current, total: _ } => { - disc_bar_clone.set_position(current as u64); - } - ProgressEvent::ResourceEventsFetchComplete { .. } => { - disc_bar_clone.finish_and_clear(); - } - _ => {} - }) - }; + b + }; - if resource_type == "issues" { - let result = ingest_project_issues_with_progress( - &conn, - &client, - config, - *local_project_id, - *gitlab_project_id, - Some(progress_callback), - ) - .await?; + let spinner_clone = spinner.clone(); + let disc_bar_clone = disc_bar.clone(); + let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress { + Box::new(|_| {}) + } else { + Box::new(move |event: ProgressEvent| match event { + ProgressEvent::DiscussionSyncStarted { total } => { + spinner_clone.finish_and_clear(); + disc_bar_clone.set_length(total as u64); + disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + } + ProgressEvent::DiscussionSynced { current, total: _ } => { + disc_bar_clone.set_position(current as u64); + } + ProgressEvent::DiscussionSyncComplete => { + disc_bar_clone.finish_and_clear(); + } + ProgressEvent::MrDiscussionSyncStarted { total } => { + spinner_clone.finish_and_clear(); + disc_bar_clone.set_length(total as u64); + disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + } + ProgressEvent::MrDiscussionSynced { current, total: _ } => { + disc_bar_clone.set_position(current as u64); + } + ProgressEvent::MrDiscussionSyncComplete => { + disc_bar_clone.finish_and_clear(); + } + ProgressEvent::ResourceEventsFetchStarted { total } => { + disc_bar_clone.reset(); + disc_bar_clone.set_length(total as u64); + disc_bar_clone.set_style( + ProgressStyle::default_bar() + .template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") + .unwrap() + .progress_chars("=> "), + ); + disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + } + ProgressEvent::ResourceEventFetched { current, total: _ } => { + disc_bar_clone.set_position(current as u64); + } + ProgressEvent::ResourceEventsFetchComplete { .. } => { + disc_bar_clone.finish_and_clear(); + } + _ => {} + }) + }; - spinner.finish_and_clear(); - disc_bar.finish_and_clear(); + let outcome = if resource_type == "issues" { + let result = ingest_project_issues_with_progress( + &proj_conn, + &client, + &config, + local_project_id, + gitlab_project_id, + Some(progress_callback), + ) + .await?; - // Print per-project summary (only in interactive mode) - if display.show_text { - print_issue_project_summary(path, &result); + spinner.finish_and_clear(); + disc_bar.finish_and_clear(); + + ProjectIngestOutcome::Issues { path, result } + } else { + let result = ingest_project_merge_requests_with_progress( + &proj_conn, + &client, + &config, + local_project_id, + gitlab_project_id, + full, + Some(progress_callback), + ) + .await?; + + spinner.finish_and_clear(); + disc_bar.finish_and_clear(); + + ProjectIngestOutcome::Mrs { path, result } + }; + + Ok(outcome) } + }) + .buffer_unordered(concurrency) + .collect() + .await; - // Aggregate totals - total.projects_synced += 1; - total.issues_fetched += result.issues_fetched; - total.issues_upserted += result.issues_upserted; - total.labels_created += result.labels_created; - total.discussions_fetched += result.discussions_fetched; - total.notes_upserted += result.notes_upserted; - total.issues_synced_discussions += result.issues_synced_discussions; - total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync; - total.resource_events_fetched += result.resource_events_fetched; - total.resource_events_failed += result.resource_events_failed; - } else { - let result = ingest_project_merge_requests_with_progress( - &conn, - &client, - config, - *local_project_id, - *gitlab_project_id, - full, - Some(progress_callback), - ) - .await?; - - spinner.finish_and_clear(); - disc_bar.finish_and_clear(); - - // Print per-project summary (only in interactive mode) - if display.show_text { - print_mr_project_summary(path, &result); + // Aggregate results and print per-project summaries. + // Process all successes first, then return the first error (if any) + // so that successful project summaries are always printed. + let mut first_error: Option = None; + for project_result in project_results { + match project_result { + Err(e) => { + if first_error.is_none() { + first_error = Some(e); + } + } + Ok(ProjectIngestOutcome::Issues { + ref path, + ref result, + }) => { + if display.show_text { + print_issue_project_summary(path, result); + } + total.projects_synced += 1; + total.issues_fetched += result.issues_fetched; + total.issues_upserted += result.issues_upserted; + total.labels_created += result.labels_created; + total.discussions_fetched += result.discussions_fetched; + total.notes_upserted += result.notes_upserted; + total.issues_synced_discussions += result.issues_synced_discussions; + total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync; + total.resource_events_fetched += result.resource_events_fetched; + total.resource_events_failed += result.resource_events_failed; + } + Ok(ProjectIngestOutcome::Mrs { + ref path, + ref result, + }) => { + if display.show_text { + print_mr_project_summary(path, result); + } + total.projects_synced += 1; + total.mrs_fetched += result.mrs_fetched; + total.mrs_upserted += result.mrs_upserted; + total.labels_created += result.labels_created; + total.assignees_linked += result.assignees_linked; + total.reviewers_linked += result.reviewers_linked; + total.discussions_fetched += result.discussions_fetched; + total.notes_upserted += result.notes_upserted; + total.diffnotes_count += result.diffnotes_count; + total.mrs_synced_discussions += result.mrs_synced_discussions; + total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync; + total.resource_events_fetched += result.resource_events_fetched; + total.resource_events_failed += result.resource_events_failed; } - - // Aggregate totals - total.projects_synced += 1; - total.mrs_fetched += result.mrs_fetched; - total.mrs_upserted += result.mrs_upserted; - total.labels_created += result.labels_created; - total.assignees_linked += result.assignees_linked; - total.reviewers_linked += result.reviewers_linked; - total.discussions_fetched += result.discussions_fetched; - total.notes_upserted += result.notes_upserted; - total.diffnotes_count += result.diffnotes_count; - total.mrs_synced_discussions += result.mrs_synced_discussions; - total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync; - total.resource_events_fetched += result.resource_events_fetched; - total.resource_events_failed += result.resource_events_failed; } } + if let Some(e) = first_error { + return Err(e); + } + // Lock is released on drop Ok(total) } diff --git a/src/core/config.rs b/src/core/config.rs index 3d16532..57d1e60 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -51,6 +51,9 @@ pub struct SyncConfig { #[serde(rename = "dependentConcurrency")] pub dependent_concurrency: u32, + #[serde(rename = "requestsPerSecond")] + pub requests_per_second: f64, + #[serde(rename = "fetchResourceEvents", default = "default_true")] pub fetch_resource_events: bool, } @@ -67,7 +70,8 @@ impl Default for SyncConfig { heartbeat_interval_seconds: 30, cursor_rewind_seconds: 2, primary_concurrency: 4, - dependent_concurrency: 2, + dependent_concurrency: 8, + requests_per_second: 30.0, fetch_resource_events: true, } } diff --git a/src/gitlab/client.rs b/src/gitlab/client.rs index 117e7af..e9e0833 100644 --- a/src/gitlab/client.rs +++ b/src/gitlab/client.rs @@ -26,9 +26,11 @@ struct RateLimiter { impl RateLimiter { fn new(requests_per_second: f64) -> Self { + // Floor at 0.1 rps to prevent division-by-zero panic in Duration::from_secs_f64 + let rps = requests_per_second.max(0.1); Self { last_request: Instant::now() - Duration::from_secs(1), // Allow immediate first request - min_interval: Duration::from_secs_f64(1.0 / requests_per_second), + min_interval: Duration::from_secs_f64(1.0 / rps), } } @@ -67,6 +69,10 @@ fn rand_jitter() -> u64 { } /// GitLab API client with rate limiting. +/// +/// Cloning shares the underlying HTTP client and rate limiter, +/// making it cheap and safe for concurrent use across projects. +#[derive(Clone)] pub struct GitLabClient { client: Client, base_url: String, @@ -112,28 +118,58 @@ impl GitLabClient { self.request("/api/v4/version").await } - /// Make an authenticated API request. + /// Maximum number of retries on 429 Too Many Requests. + const MAX_RETRIES: u32 = 3; + + /// Make an authenticated API request with automatic 429 retry. async fn request(&self, path: &str) -> Result { - let delay = self.rate_limiter.lock().await.check_delay(); - if let Some(d) = delay { - sleep(d).await; + let url = format!("{}{}", self.base_url, path); + + for attempt in 0..=Self::MAX_RETRIES { + let delay = self.rate_limiter.lock().await.check_delay(); + if let Some(d) = delay { + sleep(d).await; + } + + debug!(url = %url, attempt, "GitLab request"); + + let response = self + .client + .get(&url) + .header("PRIVATE-TOKEN", &self.token) + .send() + .await + .map_err(|e| LoreError::GitLabNetworkError { + base_url: self.base_url.clone(), + source: Some(e), + })?; + + if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { + let retry_after = Self::parse_retry_after(&response); + tracing::warn!( + retry_after_secs = retry_after, + attempt, + path, + "Rate limited by GitLab, retrying" + ); + sleep(Duration::from_secs(retry_after)).await; + continue; + } + + return self.handle_response(response, path).await; } - let url = format!("{}{}", self.base_url, path); - debug!(url = %url, "GitLab request"); + unreachable!("loop always returns") + } - let response = self - .client - .get(&url) - .header("PRIVATE-TOKEN", &self.token) - .send() - .await - .map_err(|e| LoreError::GitLabNetworkError { - base_url: self.base_url.clone(), - source: Some(e), - })?; - - self.handle_response(response, path).await + /// Parse retry-after header from a 429 response, defaulting to 60s. + fn parse_retry_after(response: &Response) -> u64 { + response + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) + .unwrap_or(60) } /// Handle API response, converting errors appropriately. @@ -150,19 +186,22 @@ impl GitLabClient { }), StatusCode::TOO_MANY_REQUESTS => { - let retry_after = response - .headers() - .get("retry-after") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse().ok()) - .unwrap_or(60); - + let retry_after = Self::parse_retry_after(&response); Err(LoreError::GitLabRateLimited { retry_after }) } status if status.is_success() => { - let body = response.json().await?; - Ok(body) + let text = response.text().await?; + serde_json::from_str(&text).map_err(|e| { + let preview = if text.len() > 500 { + &text[..500] + } else { + &text + }; + LoreError::Other(format!( + "Failed to decode response from {path}: {e}\nResponse preview: {preview}" + )) + }) } status => Err(LoreError::Other(format!( @@ -498,35 +537,52 @@ impl GitLabClient { } /// Make an authenticated API request with query parameters, returning headers. + /// Automatically retries on 429 Too Many Requests. async fn request_with_headers( &self, path: &str, params: &[(&str, String)], ) -> Result<(T, HeaderMap)> { - let delay = self.rate_limiter.lock().await.check_delay(); - if let Some(d) = delay { - sleep(d).await; + let url = format!("{}{}", self.base_url, path); + + for attempt in 0..=Self::MAX_RETRIES { + let delay = self.rate_limiter.lock().await.check_delay(); + if let Some(d) = delay { + sleep(d).await; + } + + debug!(url = %url, ?params, attempt, "GitLab paginated request"); + + let response = self + .client + .get(&url) + .query(params) + .header("PRIVATE-TOKEN", &self.token) + .send() + .await + .map_err(|e| LoreError::GitLabNetworkError { + base_url: self.base_url.clone(), + source: Some(e), + })?; + + if response.status() == StatusCode::TOO_MANY_REQUESTS && attempt < Self::MAX_RETRIES { + let retry_after = Self::parse_retry_after(&response); + tracing::warn!( + retry_after_secs = retry_after, + attempt, + path, + "Rate limited by GitLab, retrying" + ); + sleep(Duration::from_secs(retry_after)).await; + continue; + } + + let headers = response.headers().clone(); + let body = self.handle_response(response, path).await?; + return Ok((body, headers)); } - let url = format!("{}{}", self.base_url, path); - debug!(url = %url, ?params, "GitLab paginated request"); - - let response = self - .client - .get(&url) - .query(params) - .header("PRIVATE-TOKEN", &self.token) - .send() - .await - .map_err(|e| LoreError::GitLabNetworkError { - base_url: self.base_url.clone(), - source: Some(e), - })?; - - let headers = response.headers().clone(); - let body = self.handle_response(response, path).await?; - - Ok((body, headers)) + unreachable!("loop always returns") } }