perf(ingestion): implement prefetch pattern for issue discussions

Issue discussion sync was ~10x slower than MR discussion sync because it
used a fully sequential pattern: fetch one issue's discussions, write to
DB, repeat. MR sync already used a prefetch pattern with concurrent HTTP
requests followed by sequential DB writes.

This commit brings issue discussion sync to parity with MRs:

Architecture (prefetch pattern):
  1. HTTP phase: Concurrent fetches via `join_all()` with batch size
     controlled by `dependent_concurrency` config (default 8)
  2. Transform phase: Normalize discussions and notes during prefetch
  3. DB phase: Sequential writes with proper transaction boundaries

Changes:
  - gitlab/client.rs: Add `fetch_all_issue_discussions()` to mirror
    the existing MR pattern for API consistency
  - discussions.rs: Replace `ingest_issue_discussions()` with:
    * `prefetch_issue_discussions()` - async HTTP fetch + transform
    * `write_prefetched_issue_discussions()` - sync DB writes
    * New structs: `PrefetchedIssueDiscussions`, `PrefetchedDiscussion`
  - orchestrator.rs: Update `sync_discussions_sequential()` to use
    concurrent prefetch for each batch instead of sequential calls
  - surgical.rs: Update single-issue surgical sync to use new functions
  - mod.rs: Update public exports

Expected improvement: 5-10x speedup on issue discussion sync (from ~50s
to ~5-10s for large projects) due to concurrent HTTP round-trips.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
teernisse
2026-03-02 14:13:51 -05:00
parent b67bb8754c
commit 5fd1ce6905
5 changed files with 132 additions and 119 deletions

View File

@@ -576,6 +576,23 @@ impl GitLabClient {
Ok(discussions) Ok(discussions)
} }
/// Fetch every discussion on an issue by draining the pagination stream
/// into a single `Vec`.
///
/// Added to mirror the existing MR fetch-all helper so both noteable
/// types share one API shape; the prefetch path awaits many of these
/// concurrently, one per issue.
///
/// # Errors
///
/// Returns the first pagination error encountered; any discussions
/// collected before the failure are discarded.
pub async fn fetch_all_issue_discussions(
    &self,
    gitlab_project_id: i64,
    issue_iid: i64,
) -> Result<Vec<GitLabDiscussion>> {
    use futures::StreamExt;

    let mut pages = self.paginate_issue_discussions(gitlab_project_id, issue_iid);
    let mut collected: Vec<GitLabDiscussion> = Vec::new();
    while let Some(item) = pages.next().await {
        // `?` propagates the first failed page; partial results are dropped.
        collected.push(item?);
    }
    Ok(collected)
}
} }
impl GitLabClient { impl GitLabClient {

View File

@@ -1,4 +1,3 @@
use futures::StreamExt;
use rusqlite::{Connection, params}; use rusqlite::{Connection, params};
use tracing::{debug, warn}; use tracing::{debug, warn};
@@ -9,8 +8,9 @@ use crate::core::time::now_ms;
use crate::documents::SourceType; use crate::documents::SourceType;
use crate::gitlab::GitLabClient; use crate::gitlab::GitLabClient;
use crate::gitlab::transformers::{ use crate::gitlab::transformers::{
NormalizedNote, NoteableRef, transform_discussion, transform_notes, NormalizedDiscussion, NormalizedNote, NoteableRef, transform_discussion, transform_notes,
}; };
use crate::gitlab::types::GitLabDiscussion;
use crate::ingestion::dirty_tracker; use crate::ingestion::dirty_tracker;
use super::issues::IssueForDiscussionSync; use super::issues::IssueForDiscussionSync;
@@ -29,109 +29,113 @@ pub struct IngestDiscussionsResult {
pub stale_discussions_removed: usize, pub stale_discussions_removed: usize,
} }
pub async fn ingest_issue_discussions( // ═══════════════════════════════════════════════════════════════════════
conn: &Connection, // Prefetch pattern — concurrent HTTP fetch, sequential DB write
client: &GitLabClient, // ═══════════════════════════════════════════════════════════════════════
config: &Config,
gitlab_project_id: i64,
local_project_id: i64,
issues: &[IssueForDiscussionSync],
) -> Result<IngestDiscussionsResult> {
let mut total_result = IngestDiscussionsResult::default();
for issue in issues { #[derive(Debug)]
let result = ingest_discussions_for_issue( pub struct PrefetchedIssueDiscussions {
conn, pub issue: IssueForDiscussionSync,
client, pub discussions: Vec<PrefetchedDiscussion>,
config, pub fetch_error: Option<String>,
gitlab_project_id,
local_project_id,
issue,
)
.await?;
total_result.discussions_fetched += result.discussions_fetched;
total_result.discussions_upserted += result.discussions_upserted;
total_result.notes_upserted += result.notes_upserted;
total_result.stale_discussions_removed += result.stale_discussions_removed;
} }
debug!( #[derive(Debug)]
issues_processed = issues.len(), pub struct PrefetchedDiscussion {
discussions_fetched = total_result.discussions_fetched, pub raw: GitLabDiscussion,
discussions_upserted = total_result.discussions_upserted, pub normalized: NormalizedDiscussion,
notes_upserted = total_result.notes_upserted, pub notes: Vec<NormalizedNote>,
stale_removed = total_result.stale_discussions_removed,
"Discussion ingestion complete"
);
Ok(total_result)
} }
async fn ingest_discussions_for_issue( /// Prefetch all discussions for an issue (HTTP only, no DB writes).
conn: &Connection, /// This function is designed to be called concurrently via `join_all`.
pub async fn prefetch_issue_discussions(
client: &GitLabClient, client: &GitLabClient,
config: &Config,
gitlab_project_id: i64, gitlab_project_id: i64,
local_project_id: i64, local_project_id: i64,
issue: &IssueForDiscussionSync, issue: IssueForDiscussionSync,
) -> Result<IngestDiscussionsResult> { ) -> PrefetchedIssueDiscussions {
let mut result = IngestDiscussionsResult::default(); debug!(issue_iid = issue.iid, "Prefetching discussions for issue");
debug!( let raw_discussions = match client
issue_iid = issue.iid, .fetch_all_issue_discussions(gitlab_project_id, issue.iid)
local_issue_id = issue.local_issue_id, .await
"Fetching discussions for issue" {
);
let mut discussions_stream = client.paginate_issue_discussions(gitlab_project_id, issue.iid);
let mut seen_discussion_ids: Vec<String> = Vec::new();
let mut pagination_error: Option<crate::core::error::LoreError> = None;
let run_seen_at = now_ms();
while let Some(disc_result) = discussions_stream.next().await {
let gitlab_discussion = match disc_result {
Ok(d) => d, Ok(d) => d,
Err(e) => { Err(e) => {
warn!( return PrefetchedIssueDiscussions {
issue_iid = issue.iid, issue,
error = %e, discussions: Vec::new(),
"Error during discussion pagination, skipping stale removal" fetch_error: Some(e.to_string()),
); };
pagination_error = Some(e);
break;
} }
}; };
result.discussions_fetched += 1;
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?; let mut discussions = Vec::with_capacity(raw_discussions.len());
for raw in raw_discussions {
let normalized = transform_discussion( let normalized = transform_discussion(
&gitlab_discussion, &raw,
local_project_id, local_project_id,
NoteableRef::Issue(issue.local_issue_id), NoteableRef::Issue(issue.local_issue_id),
); );
let notes = transform_notes(&raw, local_project_id);
discussions.push(PrefetchedDiscussion {
raw,
normalized,
notes,
});
}
PrefetchedIssueDiscussions {
issue,
discussions,
fetch_error: None,
}
}
/// Write prefetched discussions to the database (sequential DB writes).
pub fn write_prefetched_issue_discussions(
conn: &Connection,
config: &Config,
local_project_id: i64,
prefetched: PrefetchedIssueDiscussions,
) -> Result<IngestDiscussionsResult> {
let mut result = IngestDiscussionsResult::default();
let issue = &prefetched.issue;
if let Some(error) = &prefetched.fetch_error {
warn!(issue_iid = issue.iid, error = %error, "Prefetch failed for issue");
return Ok(result);
}
let run_seen_at = now_ms();
let mut seen_discussion_ids: Vec<String> = Vec::with_capacity(prefetched.discussions.len());
for disc in &prefetched.discussions {
result.discussions_fetched += 1;
let notes_count = disc.notes.len();
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
let payload_bytes = serde_json::to_vec(&disc.raw)?;
let payload_id = store_payload( let payload_id = store_payload(
&tx, &tx,
StorePayloadOptions { StorePayloadOptions {
project_id: Some(local_project_id), project_id: Some(local_project_id),
resource_type: "discussion", resource_type: "discussion",
gitlab_id: &gitlab_discussion.id, gitlab_id: &disc.raw.id,
json_bytes: &payload_bytes, json_bytes: &payload_bytes,
compress: config.storage.compress_raw_payloads, compress: config.storage.compress_raw_payloads,
}, },
)?; )?;
upsert_discussion(&tx, &normalized, payload_id)?; upsert_discussion(&tx, &disc.normalized, payload_id)?;
let local_discussion_id: i64 = tx.query_row( let local_discussion_id: i64 = tx.query_row(
"SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?", "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
(local_project_id, &normalized.gitlab_discussion_id), (local_project_id, &disc.normalized.gitlab_discussion_id),
|row| row.get(0), |row| row.get(0),
)?; )?;
@@ -147,12 +151,8 @@ async fn ingest_discussions_for_issue(
params![now_ms(), local_discussion_id], params![now_ms(), local_discussion_id],
)?; )?;
let notes = transform_notes(&gitlab_discussion, local_project_id); for note in &disc.notes {
let notes_count = notes.len(); let outcome = upsert_note_for_issue(&tx, local_discussion_id, note, run_seen_at, None)?;
for note in notes {
let outcome =
upsert_note_for_issue(&tx, local_discussion_id, &note, run_seen_at, None)?;
if !note.is_system && outcome.changed_semantics { if !note.is_system && outcome.changed_semantics {
dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?; dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?;
} }
@@ -164,26 +164,22 @@ async fn ingest_discussions_for_issue(
result.discussions_upserted += 1; result.discussions_upserted += 1;
result.notes_upserted += notes_count; result.notes_upserted += notes_count;
seen_discussion_ids.push(normalized.gitlab_discussion_id.clone()); seen_discussion_ids.push(disc.normalized.gitlab_discussion_id.clone());
} }
if pagination_error.is_none() { // Only do stale removal if fetch succeeded
let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?;
result.stale_discussions_removed = removed; result.stale_discussions_removed = removed;
update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?;
} else if let Some(err) = pagination_error {
warn!(
issue_iid = issue.iid,
discussions_seen = seen_discussion_ids.len(),
"Skipping stale removal due to pagination error"
);
return Err(err);
}
Ok(result) Ok(result)
} }
// ═══════════════════════════════════════════════════════════════════════
// Database helpers
// ═══════════════════════════════════════════════════════════════════════
fn upsert_discussion( fn upsert_discussion(
conn: &Connection, conn: &Connection,
discussion: &crate::gitlab::transformers::NormalizedDiscussion, discussion: &crate::gitlab::transformers::NormalizedDiscussion,

View File

@@ -8,7 +8,9 @@ pub mod mr_discussions;
pub mod orchestrator; pub mod orchestrator;
pub(crate) mod surgical; pub(crate) mod surgical;
pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions}; pub use discussions::{
IngestDiscussionsResult, prefetch_issue_discussions, write_prefetched_issue_discussions,
};
pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues}; pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues};
pub use merge_requests::{ pub use merge_requests::{
IngestMergeRequestsResult, MrForDiscussionSync, get_mrs_needing_discussion_sync, IngestMergeRequestsResult, MrForDiscussionSync, get_mrs_needing_discussion_sync,

View File

@@ -13,7 +13,7 @@ use crate::core::references::{
use crate::core::shutdown::ShutdownSignal; use crate::core::shutdown::ShutdownSignal;
use crate::gitlab::GitLabClient; use crate::gitlab::GitLabClient;
use super::discussions::ingest_issue_discussions; use super::discussions::{prefetch_issue_discussions, write_prefetched_issue_discussions};
use super::issues::{IssueForDiscussionSync, ingest_issues}; use super::issues::{IssueForDiscussionSync, ingest_issues};
use super::merge_requests::{ use super::merge_requests::{
MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests, MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests,
@@ -471,27 +471,30 @@ async fn sync_discussions_sequential(
let total = issues.len(); let total = issues.len();
let mut results = Vec::with_capacity(issues.len()); let mut results = Vec::with_capacity(issues.len());
let mut processed = 0;
for chunk in issues.chunks(batch_size) { for chunk in issues.chunks(batch_size) {
if signal.is_cancelled() { if signal.is_cancelled() {
debug!("Shutdown requested during discussion sync, returning partial results"); debug!("Shutdown requested during discussion sync, returning partial results");
break; break;
} }
for issue in chunk {
let disc_result = ingest_issue_discussions( // Concurrent HTTP prefetch for all issues in this batch
conn, let prefetch_futures = chunk.iter().map(|issue| {
client, prefetch_issue_discussions(client, gitlab_project_id, local_project_id, issue.clone())
config, });
gitlab_project_id, let prefetched_batch = join_all(prefetch_futures).await;
local_project_id,
std::slice::from_ref(issue), // Sequential DB writes
) for prefetched in prefetched_batch {
.await?; let disc_result =
write_prefetched_issue_discussions(conn, config, local_project_id, prefetched)?;
results.push(disc_result); results.push(disc_result);
processed += 1;
if let Some(cb) = progress { if let Some(cb) = progress {
cb(ProgressEvent::DiscussionSynced { cb(ProgressEvent::DiscussionSynced {
current: results.len(), current: processed,
total, total,
}); });
} }

View File

@@ -9,7 +9,9 @@ use crate::documents::SourceType;
use crate::gitlab::GitLabClient; use crate::gitlab::GitLabClient;
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest}; use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
use crate::ingestion::dirty_tracker; use crate::ingestion::dirty_tracker;
use crate::ingestion::discussions::ingest_issue_discussions; use crate::ingestion::discussions::{
prefetch_issue_discussions, write_prefetched_issue_discussions,
};
use crate::ingestion::issues::{IssueForDiscussionSync, process_single_issue}; use crate::ingestion::issues::{IssueForDiscussionSync, process_single_issue};
use crate::ingestion::merge_requests::{MrForDiscussionSync, process_single_mr}; use crate::ingestion::merge_requests::{MrForDiscussionSync, process_single_mr};
use crate::ingestion::mr_diffs::upsert_mr_file_changes; use crate::ingestion::mr_diffs::upsert_mr_file_changes;
@@ -289,16 +291,9 @@ pub(crate) async fn fetch_dependents_for_issue(
iid, iid,
updated_at: 0, // not used for filtering in surgical mode updated_at: 0, // not used for filtering in surgical mode
}; };
match ingest_issue_discussions( let prefetched =
conn, prefetch_issue_discussions(client, gitlab_project_id, project_id, sync_item).await;
client, match write_prefetched_issue_discussions(conn, config, project_id, prefetched) {
config,
gitlab_project_id,
project_id,
&[sync_item],
)
.await
{
Ok(disc_result) => { Ok(disc_result) => {
result.discussions_fetched = disc_result.discussions_fetched; result.discussions_fetched = disc_result.discussions_fetched;
} }