use futures::future::join_all; use rusqlite::Connection; use tracing::{debug, info, instrument, warn}; use crate::Config; use crate::core::dependent_queue::{ claim_jobs, complete_job_tx, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, }; use crate::core::error::Result; use crate::core::references::{ EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path, }; use crate::core::shutdown::ShutdownSignal; use crate::gitlab::GitLabClient; use super::discussions::ingest_issue_discussions; use super::issues::{IssueForDiscussionSync, ingest_issues}; use super::merge_requests::{ MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests, }; use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions}; pub type ProgressCallback = Box; #[derive(Debug, Clone)] pub enum ProgressEvent { IssuesFetchStarted, IssueFetched { count: usize }, IssuesFetchComplete { total: usize }, DiscussionSyncStarted { total: usize }, DiscussionSynced { current: usize, total: usize }, DiscussionSyncComplete, MrsFetchStarted, MrFetched { count: usize }, MrsFetchComplete { total: usize }, MrDiscussionSyncStarted { total: usize }, MrDiscussionSynced { current: usize, total: usize }, MrDiscussionSyncComplete, ResourceEventsFetchStarted { total: usize }, ResourceEventFetched { current: usize, total: usize }, ResourceEventsFetchComplete { fetched: usize, failed: usize }, ClosesIssuesFetchStarted { total: usize }, ClosesIssueFetched { current: usize, total: usize }, ClosesIssuesFetchComplete { fetched: usize, failed: usize }, } #[derive(Debug, Default)] pub struct IngestProjectResult { pub issues_fetched: usize, pub issues_upserted: usize, pub labels_created: usize, pub discussions_fetched: usize, pub discussions_upserted: usize, pub notes_upserted: usize, pub issues_synced_discussions: usize, pub issues_skipped_discussion_sync: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, } #[derive(Debug, Default)] pub struct IngestMrProjectResult { pub mrs_fetched: usize, pub mrs_upserted: usize, pub labels_created: usize, pub assignees_linked: usize, pub reviewers_linked: usize, pub discussions_fetched: usize, pub discussions_upserted: usize, pub notes_upserted: usize, pub notes_skipped_bad_timestamp: usize, pub diffnotes_count: usize, pub mrs_synced_discussions: usize, pub mrs_skipped_discussion_sync: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, pub closes_issues_fetched: usize, pub closes_issues_failed: usize, } pub async fn ingest_project_issues( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, ) -> Result { let signal = ShutdownSignal::new(); ingest_project_issues_with_progress( conn, client, config, project_id, gitlab_project_id, None, &signal, ) .await } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) )] pub async fn ingest_project_issues_with_progress( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: Option, signal: &ShutdownSignal, ) -> Result { let mut result = IngestProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { cb(event); } }; emit(ProgressEvent::IssuesFetchStarted); let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?; result.issues_fetched = issue_result.fetched; result.issues_upserted = issue_result.upserted; result.labels_created = issue_result.labels_created; emit(ProgressEvent::IssuesFetchComplete { total: result.issues_fetched, }); let issues_needing_sync = issue_result.issues_needing_discussion_sync; let total_issues: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = ?", [project_id], |row| row.get(0), ) .unwrap_or(0); let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); if signal.is_cancelled() { info!("Shutdown requested, returning partial issue results"); return Ok(result); } if issues_needing_sync.is_empty() { info!("No issues need discussion sync"); } else { info!( count = issues_needing_sync.len(), "Starting discussion sync for issues" ); emit(ProgressEvent::DiscussionSyncStarted { total: issues_needing_sync.len(), }); let discussion_results = sync_discussions_sequential( conn, client, config, gitlab_project_id, project_id, &issues_needing_sync, &progress, signal, ) .await?; emit(ProgressEvent::DiscussionSyncComplete); for disc_result in discussion_results { result.discussions_fetched += disc_result.discussions_fetched; result.discussions_upserted += disc_result.discussions_upserted; result.notes_upserted += disc_result.notes_upserted; result.issues_synced_discussions += 1; } } if signal.is_cancelled() { info!("Shutdown requested, returning partial issue results"); return Ok(result); } if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?; if enqueued > 0 { debug!(enqueued, "Enqueued resource events jobs for issues"); } let drain_result = drain_resource_events( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; let refs_inserted = crate::core::references::extract_refs_from_state_events(conn, project_id)?; if refs_inserted > 0 { debug!( refs_inserted, "Extracted cross-references from state events" ); } } info!( issues_fetched = result.issues_fetched, issues_upserted = result.issues_upserted, labels_created = result.labels_created, discussions_fetched = result.discussions_fetched, notes_upserted = result.notes_upserted, issues_synced = result.issues_synced_discussions, issues_skipped = result.issues_skipped_discussion_sync, resource_events_fetched = result.resource_events_fetched, resource_events_failed = result.resource_events_failed, "Project ingestion complete" ); tracing::Span::current().record("items_processed", result.issues_upserted); tracing::Span::current().record("items_skipped", result.issues_skipped_discussion_sync); tracing::Span::current().record("errors", result.resource_events_failed); Ok(result) } #[allow(clippy::too_many_arguments)] async fn sync_discussions_sequential( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, issues: &[IssueForDiscussionSync], progress: &Option, signal: &ShutdownSignal, ) -> Result> { let batch_size = config.sync.dependent_concurrency as usize; let total = issues.len(); let mut results = Vec::with_capacity(issues.len()); for chunk in issues.chunks(batch_size) { if signal.is_cancelled() { info!("Shutdown requested during discussion sync, returning partial results"); break; } for issue in chunk { let disc_result = ingest_issue_discussions( conn, client, config, gitlab_project_id, local_project_id, std::slice::from_ref(issue), ) .await?; results.push(disc_result); if let Some(cb) = progress { cb(ProgressEvent::DiscussionSynced { current: results.len(), total, }); } } } Ok(results) } pub async fn ingest_project_merge_requests( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, full_sync: bool, ) -> Result { let signal = ShutdownSignal::new(); ingest_project_merge_requests_with_progress( conn, client, config, project_id, gitlab_project_id, full_sync, None, &signal, ) .await } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) )] #[allow(clippy::too_many_arguments)] pub async fn ingest_project_merge_requests_with_progress( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, full_sync: bool, progress: Option, signal: &ShutdownSignal, ) -> Result { let mut result = IngestMrProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { cb(event); } }; emit(ProgressEvent::MrsFetchStarted); let mr_result = ingest_merge_requests( conn, client, config, project_id, gitlab_project_id, full_sync, ) .await?; result.mrs_fetched = mr_result.fetched; result.mrs_upserted = mr_result.upserted; result.labels_created = mr_result.labels_created; result.assignees_linked = mr_result.assignees_linked; result.reviewers_linked = mr_result.reviewers_linked; emit(ProgressEvent::MrsFetchComplete { total: result.mrs_fetched, }); let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?; let total_mrs: i64 = conn .query_row( "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", [project_id], |row| row.get(0), ) .unwrap_or(0); let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); if signal.is_cancelled() { info!("Shutdown requested, returning partial MR results"); return Ok(result); } if mrs_needing_sync.is_empty() { info!("No MRs need discussion sync"); } else { info!( count = mrs_needing_sync.len(), "Starting discussion sync for MRs" ); emit(ProgressEvent::MrDiscussionSyncStarted { total: mrs_needing_sync.len(), }); let discussion_results = sync_mr_discussions_sequential( conn, client, config, gitlab_project_id, project_id, &mrs_needing_sync, &progress, signal, ) .await?; emit(ProgressEvent::MrDiscussionSyncComplete); for disc_result in discussion_results { result.discussions_fetched += disc_result.discussions_fetched; result.discussions_upserted += disc_result.discussions_upserted; result.notes_upserted += disc_result.notes_upserted; result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp; result.diffnotes_count += disc_result.diffnotes_count; if disc_result.pagination_succeeded { result.mrs_synced_discussions += 1; } } } if signal.is_cancelled() { info!("Shutdown requested, returning partial MR results"); return Ok(result); } if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?; if enqueued > 0 { debug!(enqueued, "Enqueued resource events jobs for MRs"); } let drain_result = drain_resource_events( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; let refs_inserted = crate::core::references::extract_refs_from_state_events(conn, project_id)?; if refs_inserted > 0 { debug!( refs_inserted, "Extracted cross-references from state events" ); } } if signal.is_cancelled() { info!("Shutdown requested, returning partial MR results"); return Ok(result); } let note_refs = crate::core::note_parser::extract_refs_from_system_notes(conn, project_id)?; if note_refs.inserted > 0 || note_refs.skipped_unresolvable > 0 { debug!( inserted = note_refs.inserted, unresolvable = note_refs.skipped_unresolvable, parse_failures = note_refs.parse_failures, "Extracted cross-references from system notes (MRs)" ); } { let enqueued = enqueue_mr_closes_issues_jobs(conn, project_id)?; if enqueued > 0 { debug!(enqueued, "Enqueued mr_closes_issues jobs"); } let closes_result = drain_mr_closes_issues( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.closes_issues_fetched = closes_result.fetched; result.closes_issues_failed = closes_result.failed; } info!( mrs_fetched = result.mrs_fetched, mrs_upserted = result.mrs_upserted, labels_created = result.labels_created, discussions_fetched = result.discussions_fetched, notes_upserted = result.notes_upserted, diffnotes = result.diffnotes_count, mrs_synced = result.mrs_synced_discussions, mrs_skipped = result.mrs_skipped_discussion_sync, resource_events_fetched = result.resource_events_fetched, resource_events_failed = result.resource_events_failed, closes_issues_fetched = result.closes_issues_fetched, closes_issues_failed = result.closes_issues_failed, "MR project ingestion complete" ); tracing::Span::current().record("items_processed", result.mrs_upserted); tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync); tracing::Span::current().record("errors", result.resource_events_failed); Ok(result) } #[allow(clippy::too_many_arguments)] async fn sync_mr_discussions_sequential( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, mrs: &[MrForDiscussionSync], progress: &Option, signal: &ShutdownSignal, ) -> Result> { let batch_size = config.sync.dependent_concurrency as usize; let total = mrs.len(); let mut results = Vec::with_capacity(mrs.len()); let mut processed = 0; for chunk in mrs.chunks(batch_size) { if signal.is_cancelled() { info!("Shutdown requested during MR discussion sync, returning partial results"); break; } let prefetch_futures = chunk.iter().map(|mr| { prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone()) }); let prefetched_batch = join_all(prefetch_futures).await; for prefetched in prefetched_batch { let disc_result = write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?; results.push(disc_result); processed += 1; if let Some(cb) = progress { cb(ProgressEvent::MrDiscussionSynced { current: processed, total, }); } } } Ok(results) } #[derive(Debug, Default)] pub struct DrainResult { pub fetched: usize, pub failed: usize, pub skipped_not_found: usize, } fn enqueue_resource_events_for_entity_type( conn: &Connection, project_id: i64, entity_type: &str, ) -> Result { match entity_type { "issue" => { conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'issue' AND job_type = 'resource_events' \ AND entity_local_id IN ( \ SELECT id FROM issues \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ )", [project_id], )?; } "merge_request" => { conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'resource_events' \ AND entity_local_id IN ( \ SELECT id FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ )", [project_id], )?; } _ => {} } let entities: Vec<(i64, i64)> = match entity_type { "issue" => { let mut stmt = conn.prepare_cached( "SELECT id, iid FROM issues \ WHERE project_id = ?1 \ AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? } "merge_request" => { let mut stmt = conn.prepare_cached( "SELECT id, iid FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? } _ => return Ok(0), }; let mut enqueued = 0; for (local_id, iid) in &entities { if enqueue_job( conn, project_id, entity_type, *iid, *local_id, "resource_events", None, )? { enqueued += 1; } } Ok(enqueued) } /// Result of a concurrent HTTP prefetch for resource events. #[allow(clippy::type_complexity)] struct PrefetchedResourceEvents { job_id: i64, project_id: i64, entity_type: String, entity_iid: i64, entity_local_id: i64, result: std::result::Result< ( Vec, Vec, Vec, ), crate::core::error::LoreError, >, } async fn prefetch_resource_events( client: &GitLabClient, gitlab_project_id: i64, job_id: i64, project_id: i64, entity_type: String, entity_iid: i64, entity_local_id: i64, ) -> PrefetchedResourceEvents { let result = client .fetch_all_resource_events(gitlab_project_id, &entity_type, entity_iid) .await; PrefetchedResourceEvents { job_id, project_id, entity_type, entity_iid, entity_local_id, result, } } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_resource_events( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: &Option, signal: &ShutdownSignal, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; if reclaimed > 0 { info!(reclaimed, "Reclaimed stale resource event locks"); } let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts .get("resource_events") .copied() .unwrap_or(0); if total_pending == 0 { return Ok(result); } let emit = |event: ProgressEvent| { if let Some(cb) = progress { cb(event); } }; emit(ProgressEvent::ResourceEventsFetchStarted { total: total_pending, }); let mut processed = 0; let mut seen_job_ids = std::collections::HashSet::new(); loop { if signal.is_cancelled() { info!("Shutdown requested during resource events drain, returning partial results"); break; } let jobs = claim_jobs(conn, "resource_events", project_id, batch_size)?; if jobs.is_empty() { break; } // Phase 1: Concurrent HTTP fetches let futures: Vec<_> = jobs .iter() .filter(|j| seen_job_ids.insert(j.id)) .map(|j| { prefetch_resource_events( client, gitlab_project_id, j.id, j.project_id, j.entity_type.clone(), j.entity_iid, j.entity_local_id, ) }) .collect(); if futures.is_empty() { warn!("All claimed jobs were already processed, breaking drain loop"); break; } let prefetched = join_all(futures).await; // Phase 2: Serial DB writes for p in prefetched { match p.result { Ok((state_events, label_events, milestone_events)) => { let store_result = store_resource_events( conn, p.project_id, &p.entity_type, p.entity_local_id, &state_events, &label_events, &milestone_events, ); match store_result { Ok(()) => { let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_resource_event_watermark_tx( &tx, &p.entity_type, p.entity_local_id, )?; tx.commit()?; result.fetched += 1; } Err(e) => { warn!( entity_type = %p.entity_type, entity_iid = p.entity_iid, error = %e, "Failed to store resource events" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } Err(e) => { if e.is_permanent_api_error() { debug!( entity_type = %p.entity_type, entity_iid = p.entity_iid, error = %e, "Permanent API error for resource events, marking complete" ); let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_resource_event_watermark_tx(&tx, &p.entity_type, p.entity_local_id)?; tx.commit()?; result.skipped_not_found += 1; } else { warn!( entity_type = %p.entity_type, entity_iid = p.entity_iid, error = %e, "Failed to fetch resource events from GitLab" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } processed += 1; emit(ProgressEvent::ResourceEventFetched { current: processed, total: total_pending, }); } } emit(ProgressEvent::ResourceEventsFetchComplete { fetched: result.fetched, failed: result.failed, }); if result.fetched > 0 || result.failed > 0 { info!( fetched = result.fetched, failed = result.failed, "Resource events drain complete" ); } tracing::Span::current().record("items_processed", result.fetched); tracing::Span::current().record("errors", result.failed); Ok(result) } fn store_resource_events( conn: &Connection, project_id: i64, entity_type: &str, entity_local_id: i64, state_events: &[crate::gitlab::types::GitLabStateEvent], label_events: &[crate::gitlab::types::GitLabLabelEvent], milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], ) -> Result<()> { let tx = conn.unchecked_transaction()?; if !state_events.is_empty() { crate::core::events_db::upsert_state_events( &tx, project_id, entity_type, entity_local_id, state_events, )?; } if !label_events.is_empty() { crate::core::events_db::upsert_label_events( &tx, project_id, entity_type, entity_local_id, label_events, )?; } if !milestone_events.is_empty() { crate::core::events_db::upsert_milestone_events( &tx, project_id, entity_type, entity_local_id, milestone_events, )?; } tx.commit()?; Ok(()) } fn update_closes_issues_watermark_tx( tx: &rusqlite::Transaction<'_>, mr_local_id: i64, ) -> Result<()> { tx.execute( "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?", [mr_local_id], )?; Ok(()) } fn update_resource_event_watermark_tx( tx: &rusqlite::Transaction<'_>, entity_type: &str, entity_local_id: i64, ) -> Result<()> { match entity_type { "issue" => { tx.execute( "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", [entity_local_id], )?; } "merge_request" => { tx.execute( "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", [entity_local_id], )?; } _ => {} } Ok(()) } fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result { // Remove stale jobs for MRs that haven't changed since their last closes_issues sync conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'mr_closes_issues' \ AND entity_local_id IN ( \ SELECT id FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(closes_issues_synced_for_updated_at, 0) \ )", [project_id], )?; let mut stmt = conn.prepare_cached( "SELECT id, iid FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at > COALESCE(closes_issues_synced_for_updated_at, 0)", )?; let entities: Vec<(i64, i64)> = stmt .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()?; let mut enqueued = 0; for (local_id, iid) in &entities { if enqueue_job( conn, project_id, "merge_request", *iid, *local_id, "mr_closes_issues", None, )? { enqueued += 1; } } Ok(enqueued) } /// Result of a concurrent HTTP prefetch for closes-issues references. struct PrefetchedClosesIssues { job_id: i64, entity_iid: i64, entity_local_id: i64, result: std::result::Result< Vec, crate::core::error::LoreError, >, } async fn prefetch_closes_issues( client: &GitLabClient, gitlab_project_id: i64, job_id: i64, entity_iid: i64, entity_local_id: i64, ) -> PrefetchedClosesIssues { let result = client .fetch_mr_closes_issues(gitlab_project_id, entity_iid) .await; PrefetchedClosesIssues { job_id, entity_iid, entity_local_id, result, } } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_mr_closes_issues( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: &Option, signal: &ShutdownSignal, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; if reclaimed > 0 { info!(reclaimed, "Reclaimed stale mr_closes_issues locks"); } let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts .get("mr_closes_issues") .copied() .unwrap_or(0); if total_pending == 0 { return Ok(result); } let emit = |event: ProgressEvent| { if let Some(cb) = progress { cb(event); } }; emit(ProgressEvent::ClosesIssuesFetchStarted { total: total_pending, }); let mut processed = 0; let mut seen_job_ids = std::collections::HashSet::new(); loop { if signal.is_cancelled() { info!("Shutdown requested during closes_issues drain, returning partial results"); break; } let jobs = claim_jobs(conn, "mr_closes_issues", project_id, batch_size)?; if jobs.is_empty() { break; } // Phase 1: Concurrent HTTP fetches let futures: Vec<_> = jobs .iter() .filter(|j| seen_job_ids.insert(j.id)) .map(|j| { prefetch_closes_issues( client, gitlab_project_id, j.id, j.entity_iid, j.entity_local_id, ) }) .collect(); if futures.is_empty() { warn!("All claimed mr_closes_issues jobs were already processed, breaking drain loop"); break; } let prefetched = join_all(futures).await; // Phase 2: Serial DB writes for p in prefetched { match p.result { Ok(closes_issues) => { let store_result = store_closes_issues_refs( conn, project_id, p.entity_local_id, &closes_issues, ); match store_result { Ok(()) => { let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { warn!( entity_iid = p.entity_iid, error = %e, "Failed to store closes_issues references" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } Err(e) => { if e.is_permanent_api_error() { debug!( entity_iid = p.entity_iid, error = %e, "Permanent API error for closes_issues, marking complete" ); let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.skipped_not_found += 1; } else { warn!( entity_iid = p.entity_iid, error = %e, "Failed to fetch closes_issues from GitLab" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } processed += 1; emit(ProgressEvent::ClosesIssueFetched { current: processed, total: total_pending, }); } } emit(ProgressEvent::ClosesIssuesFetchComplete { fetched: result.fetched, failed: result.failed, }); if result.fetched > 0 || result.failed > 0 { info!( fetched = result.fetched, failed = result.failed, "mr_closes_issues drain complete" ); } tracing::Span::current().record("items_processed", result.fetched); tracing::Span::current().record("errors", result.failed); Ok(result) } fn store_closes_issues_refs( conn: &Connection, project_id: i64, mr_local_id: i64, closes_issues: &[crate::gitlab::types::GitLabIssueRef], ) -> Result<()> { for issue_ref in closes_issues { let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { (Some(local_id), None, None) } else { let path = resolve_project_path(conn, issue_ref.project_id)?; let fallback = path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); (None, Some(fallback), Some(issue_ref.iid)) }; let ref_ = EntityReference { project_id, source_entity_type: "merge_request", source_entity_id: mr_local_id, target_entity_type: "issue", target_entity_id: target_id, target_project_path: target_path.as_deref(), target_entity_iid: target_iid, reference_type: "closes", source_method: "api", }; insert_entity_reference(conn, &ref_)?; } Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn result_default_has_zero_counts() { let result = IngestProjectResult::default(); assert_eq!(result.issues_fetched, 0); assert_eq!(result.issues_upserted, 0); assert_eq!(result.labels_created, 0); assert_eq!(result.discussions_fetched, 0); assert_eq!(result.notes_upserted, 0); assert_eq!(result.issues_synced_discussions, 0); assert_eq!(result.issues_skipped_discussion_sync, 0); assert_eq!(result.resource_events_fetched, 0); assert_eq!(result.resource_events_failed, 0); } #[test] fn mr_result_default_has_zero_counts() { let result = IngestMrProjectResult::default(); assert_eq!(result.mrs_fetched, 0); assert_eq!(result.mrs_upserted, 0); assert_eq!(result.labels_created, 0); assert_eq!(result.assignees_linked, 0); assert_eq!(result.reviewers_linked, 0); assert_eq!(result.discussions_fetched, 0); assert_eq!(result.discussions_upserted, 0); assert_eq!(result.notes_upserted, 0); assert_eq!(result.notes_skipped_bad_timestamp, 0); assert_eq!(result.diffnotes_count, 0); assert_eq!(result.mrs_synced_discussions, 0); assert_eq!(result.mrs_skipped_discussion_sync, 0); assert_eq!(result.resource_events_fetched, 0); assert_eq!(result.resource_events_failed, 0); } #[test] fn drain_result_default_has_zero_counts() { let result = DrainResult::default(); assert_eq!(result.fetched, 0); assert_eq!(result.failed, 0); assert_eq!(result.skipped_not_found, 0); } #[test] fn progress_event_resource_variants_exist() { let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 }; let _progress = ProgressEvent::ResourceEventFetched { current: 5, total: 10, }; let _complete = ProgressEvent::ResourceEventsFetchComplete { fetched: 8, failed: 2, }; } }