diff --git a/src/gitlab/client.rs b/src/gitlab/client.rs
index edee077..46c7632 100644
--- a/src/gitlab/client.rs
+++ b/src/gitlab/client.rs
@@ -576,6 +576,23 @@ impl GitLabClient {
         Ok(discussions)
     }
+
+    pub async fn fetch_all_issue_discussions(
+        &self,
+        gitlab_project_id: i64,
+        issue_iid: i64,
+    ) -> Result<Vec<GitLabDiscussion>> {
+        use futures::StreamExt;
+
+        let mut discussions = Vec::new();
+        let mut stream = self.paginate_issue_discussions(gitlab_project_id, issue_iid);
+
+        while let Some(result) = stream.next().await {
+            discussions.push(result?);
+        }
+
+        Ok(discussions)
+    }
 }
 
 impl GitLabClient {
diff --git a/src/ingestion/discussions.rs b/src/ingestion/discussions.rs
index 40d13dd..c0f99f6 100644
--- a/src/ingestion/discussions.rs
+++ b/src/ingestion/discussions.rs
@@ -1,4 +1,3 @@
-use futures::StreamExt;
 use rusqlite::{Connection, params};
 use tracing::{debug, warn};
 
@@ -9,8 +8,9 @@ use crate::core::time::now_ms;
 use crate::documents::SourceType;
 use crate::gitlab::GitLabClient;
 use crate::gitlab::transformers::{
-    NormalizedNote, NoteableRef, transform_discussion, transform_notes,
+    NormalizedDiscussion, NormalizedNote, NoteableRef, transform_discussion, transform_notes,
 };
+use crate::gitlab::types::GitLabDiscussion;
 use crate::ingestion::dirty_tracker;
 
 use super::issues::IssueForDiscussionSync;
@@ -29,109 +29,113 @@ pub struct IngestDiscussionsResult {
     pub stale_discussions_removed: usize,
 }
 
-pub async fn ingest_issue_discussions(
-    conn: &Connection,
-    client: &GitLabClient,
-    config: &Config,
-    gitlab_project_id: i64,
-    local_project_id: i64,
-    issues: &[IssueForDiscussionSync],
-) -> Result<IngestDiscussionsResult> {
-    let mut total_result = IngestDiscussionsResult::default();
+// ═══════════════════════════════════════════════════════════════════════
+// Prefetch pattern — concurrent HTTP fetch, sequential DB write
+// ═══════════════════════════════════════════════════════════════════════
 
-    for issue in issues {
-        let result = ingest_discussions_for_issue(
-            conn,
-            client,
-            config,
-            gitlab_project_id,
-            local_project_id,
-            issue,
-        )
-        .await?;
-
-        total_result.discussions_fetched += result.discussions_fetched;
-        total_result.discussions_upserted += result.discussions_upserted;
-        total_result.notes_upserted += result.notes_upserted;
-        total_result.stale_discussions_removed += result.stale_discussions_removed;
-    }
-
-    debug!(
-        issues_processed = issues.len(),
-        discussions_fetched = total_result.discussions_fetched,
-        discussions_upserted = total_result.discussions_upserted,
-        notes_upserted = total_result.notes_upserted,
-        stale_removed = total_result.stale_discussions_removed,
-        "Discussion ingestion complete"
-    );
-
-    Ok(total_result)
+#[derive(Debug)]
+pub struct PrefetchedIssueDiscussions {
+    pub issue: IssueForDiscussionSync,
+    pub discussions: Vec<PrefetchedDiscussion>,
+    pub fetch_error: Option<String>,
 }
 
-async fn ingest_discussions_for_issue(
-    conn: &Connection,
+#[derive(Debug)]
+pub struct PrefetchedDiscussion {
+    pub raw: GitLabDiscussion,
+    pub normalized: NormalizedDiscussion,
+    pub notes: Vec<NormalizedNote>,
+}
+
+/// Prefetch all discussions for an issue (HTTP only, no DB writes).
+/// This function is designed to be called concurrently via `join_all`.
+pub async fn prefetch_issue_discussions(
     client: &GitLabClient,
-    config: &Config,
     gitlab_project_id: i64,
     local_project_id: i64,
-    issue: &IssueForDiscussionSync,
-) -> Result<IngestDiscussionsResult> {
-    let mut result = IngestDiscussionsResult::default();
+    issue: IssueForDiscussionSync,
+) -> PrefetchedIssueDiscussions {
+    debug!(issue_iid = issue.iid, "Prefetching discussions for issue");
 
-    debug!(
-        issue_iid = issue.iid,
-        local_issue_id = issue.local_issue_id,
-        "Fetching discussions for issue"
-    );
+    let raw_discussions = match client
+        .fetch_all_issue_discussions(gitlab_project_id, issue.iid)
+        .await
+    {
+        Ok(d) => d,
+        Err(e) => {
+            return PrefetchedIssueDiscussions {
+                issue,
+                discussions: Vec::new(),
+                fetch_error: Some(e.to_string()),
+            };
+        }
+    };
 
-    let mut discussions_stream = client.paginate_issue_discussions(gitlab_project_id, issue.iid);
-
-    let mut seen_discussion_ids: Vec<String> = Vec::new();
-    let mut pagination_error: Option = None;
-
-    let run_seen_at = now_ms();
-
-    while let Some(disc_result) = discussions_stream.next().await {
-        let gitlab_discussion = match disc_result {
-            Ok(d) => d,
-            Err(e) => {
-                warn!(
-                    issue_iid = issue.iid,
-                    error = %e,
-                    "Error during discussion pagination, skipping stale removal"
-                );
-                pagination_error = Some(e);
-                break;
-            }
-        };
-        result.discussions_fetched += 1;
-
-        let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
+    let mut discussions = Vec::with_capacity(raw_discussions.len());
+    for raw in raw_discussions {
         let normalized = transform_discussion(
-            &gitlab_discussion,
+            &raw,
             local_project_id,
             NoteableRef::Issue(issue.local_issue_id),
         );
+        let notes = transform_notes(&raw, local_project_id);
+
+        discussions.push(PrefetchedDiscussion {
+            raw,
+            normalized,
+            notes,
+        });
+    }
+
+    PrefetchedIssueDiscussions {
+        issue,
+        discussions,
+        fetch_error: None,
+    }
+}
+
+/// Write prefetched discussions to the database (sequential DB writes).
+pub fn write_prefetched_issue_discussions(
+    conn: &Connection,
+    config: &Config,
+    local_project_id: i64,
+    prefetched: PrefetchedIssueDiscussions,
+) -> Result<IngestDiscussionsResult> {
+    let mut result = IngestDiscussionsResult::default();
+    let issue = &prefetched.issue;
+
+    if let Some(error) = &prefetched.fetch_error {
+        warn!(issue_iid = issue.iid, error = %error, "Prefetch failed for issue");
+        return Ok(result);
+    }
+
+    let run_seen_at = now_ms();
+    let mut seen_discussion_ids: Vec<String> = Vec::with_capacity(prefetched.discussions.len());
+
+    for disc in &prefetched.discussions {
+        result.discussions_fetched += 1;
+        let notes_count = disc.notes.len();
 
         let tx = conn.unchecked_transaction()?;
 
+        let payload_bytes = serde_json::to_vec(&disc.raw)?;
         let payload_id = store_payload(
             &tx,
             StorePayloadOptions {
                 project_id: Some(local_project_id),
                 resource_type: "discussion",
-                gitlab_id: &gitlab_discussion.id,
+                gitlab_id: &disc.raw.id,
                 json_bytes: &payload_bytes,
                 compress: config.storage.compress_raw_payloads,
             },
         )?;
 
-        upsert_discussion(&tx, &normalized, payload_id)?;
+        upsert_discussion(&tx, &disc.normalized, payload_id)?;
 
         let local_discussion_id: i64 = tx.query_row(
             "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
-            (local_project_id, &normalized.gitlab_discussion_id),
+            (local_project_id, &disc.normalized.gitlab_discussion_id),
             |row| row.get(0),
         )?;
 
@@ -147,12 +151,8 @@ async fn ingest_discussions_for_issue(
             params![now_ms(), local_discussion_id],
         )?;
 
-        let notes = transform_notes(&gitlab_discussion, local_project_id);
-        let notes_count = notes.len();
-
-        for note in notes {
-            let outcome =
-                upsert_note_for_issue(&tx, local_discussion_id, &note, run_seen_at, None)?;
+        for note in &disc.notes {
+            let outcome = upsert_note_for_issue(&tx, local_discussion_id, note, run_seen_at, None)?;
             if !note.is_system && outcome.changed_semantics {
                 dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?;
             }
@@ -164,26 +164,22 @@ async fn ingest_discussions_for_issue(
         result.discussions_upserted += 1;
         result.notes_upserted += notes_count;
 
-        seen_discussion_ids.push(normalized.gitlab_discussion_id.clone());
+        seen_discussion_ids.push(disc.normalized.gitlab_discussion_id.clone());
     }
 
-    if pagination_error.is_none() {
-        let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
-        result.stale_discussions_removed = removed;
+    // Stale removal is safe here: a failed prefetch already returned early above
+    let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
+    result.stale_discussions_removed = removed;
 
-        update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?;
-    } else if let Some(err) = pagination_error {
-        warn!(
-            issue_iid = issue.iid,
-            discussions_seen = seen_discussion_ids.len(),
-            "Skipping stale removal due to pagination error"
-        );
-        return Err(err);
-    }
+    update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?;
 
     Ok(result)
 }
 
+// ═══════════════════════════════════════════════════════════════════════
+// Database helpers
+// ═══════════════════════════════════════════════════════════════════════
+
 fn upsert_discussion(
     conn: &Connection,
     discussion: &crate::gitlab::transformers::NormalizedDiscussion,
diff --git a/src/ingestion/mod.rs b/src/ingestion/mod.rs
index 8d5f3cb..228defc 100644
--- a/src/ingestion/mod.rs
+++ b/src/ingestion/mod.rs
@@ -8,7 +8,9 @@ pub mod mr_discussions;
 pub mod orchestrator;
 pub(crate) mod surgical;
 
-pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions};
+pub use discussions::{
+    IngestDiscussionsResult, prefetch_issue_discussions, write_prefetched_issue_discussions,
+};
 pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues};
 pub use merge_requests::{
     IngestMergeRequestsResult, MrForDiscussionSync, get_mrs_needing_discussion_sync,
diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs
index c04b770..7ddd116 100644
--- a/src/ingestion/orchestrator.rs
+++ b/src/ingestion/orchestrator.rs
@@ -13,7 +13,7 @@ use crate::core::references::{
 use crate::core::shutdown::ShutdownSignal;
 use crate::gitlab::GitLabClient;
 
-use super::discussions::ingest_issue_discussions;
+use super::discussions::{prefetch_issue_discussions, write_prefetched_issue_discussions};
 use super::issues::{IssueForDiscussionSync, ingest_issues};
 use super::merge_requests::{
     MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests,
@@ -471,27 +471,30 @@ async fn sync_discussions_sequential(
     let total = issues.len();
     let mut results = Vec::with_capacity(issues.len());
+    let mut processed = 0;
 
     for chunk in issues.chunks(batch_size) {
         if signal.is_cancelled() {
            debug!("Shutdown requested during discussion sync, returning partial results");
             break;
         }
-        for issue in chunk {
-            let disc_result = ingest_issue_discussions(
-                conn,
-                client,
-                config,
-                gitlab_project_id,
-                local_project_id,
-                std::slice::from_ref(issue),
-            )
-            .await?;
+
+        // Concurrent HTTP prefetch for all issues in this batch
+        let prefetch_futures = chunk.iter().map(|issue| {
+            prefetch_issue_discussions(client, gitlab_project_id, local_project_id, issue.clone())
+        });
+        let prefetched_batch = join_all(prefetch_futures).await;
+
+        // Sequential DB writes
+        for prefetched in prefetched_batch {
+            let disc_result =
+                write_prefetched_issue_discussions(conn, config, local_project_id, prefetched)?;
             results.push(disc_result);
+            processed += 1;
 
             if let Some(cb) = progress {
                 cb(ProgressEvent::DiscussionSynced {
-                    current: results.len(),
+                    current: processed,
                     total,
                 });
             }
diff --git a/src/ingestion/surgical.rs b/src/ingestion/surgical.rs
index 46161e8..9d5c1ca 100644
--- a/src/ingestion/surgical.rs
+++ b/src/ingestion/surgical.rs
@@ -9,7 +9,9 @@ use crate::documents::SourceType;
 use crate::gitlab::GitLabClient;
 use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
 use crate::ingestion::dirty_tracker;
-use crate::ingestion::discussions::ingest_issue_discussions;
+use crate::ingestion::discussions::{
+    prefetch_issue_discussions, write_prefetched_issue_discussions,
+};
 use crate::ingestion::issues::{IssueForDiscussionSync, process_single_issue};
 use crate::ingestion::merge_requests::{MrForDiscussionSync, process_single_mr};
 use crate::ingestion::mr_diffs::upsert_mr_file_changes;
@@ -289,16 +291,9 @@ pub(crate) async fn fetch_dependents_for_issue(
         iid,
         updated_at: 0, // not used for filtering in surgical mode
     };
-    match ingest_issue_discussions(
-        conn,
-        client,
-        config,
-        gitlab_project_id,
-        project_id,
-        &[sync_item],
-    )
-    .await
-    {
+    let prefetched =
+        prefetch_issue_discussions(client, gitlab_project_id, project_id, sync_item).await;
+    match write_prefetched_issue_discussions(conn, config, project_id, prefetched) {
        Ok(disc_result) => {
             result.discussions_fetched = disc_result.discussions_fetched;
        }