diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index d34cd52..8dc53a8 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -1,10 +1,3 @@ -//! Ingestion orchestrator: coordinates issue/MR and discussion sync. -//! -//! Implements the canonical pattern: -//! 1. Fetch resources (issues or MRs) with cursor-based sync -//! 2. Identify resources needing discussion sync -//! 3. Execute discussion sync with parallel prefetch (fetch in parallel, write serially) - use futures::future::join_all; use rusqlite::Connection; use tracing::{debug, info, instrument, warn}; @@ -14,6 +7,9 @@ use crate::core::dependent_queue::{ claim_jobs, complete_job, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, }; use crate::core::error::Result; +use crate::core::references::{ + EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path, +}; use crate::gitlab::GitLabClient; use super::discussions::ingest_issue_discussions; @@ -23,45 +19,30 @@ use super::merge_requests::{ }; use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions}; -/// Progress callback for ingestion operations. pub type ProgressCallback = Box; -/// Progress events emitted during ingestion. #[derive(Debug, Clone)] pub enum ProgressEvent { - /// Issue fetching started IssuesFetchStarted, - /// An issue was fetched (current count) IssueFetched { count: usize }, - /// Issue fetching complete IssuesFetchComplete { total: usize }, - /// Discussion sync started (total issues to sync) DiscussionSyncStarted { total: usize }, - /// Discussion synced for an issue (current/total) DiscussionSynced { current: usize, total: usize }, - /// Discussion sync complete DiscussionSyncComplete, - /// MR fetching started MrsFetchStarted, - /// An MR was fetched (current count) MrFetched { count: usize }, - /// MR fetching complete MrsFetchComplete { total: usize }, - /// MR discussion sync started (total MRs to sync) MrDiscussionSyncStarted { total: usize }, - /// MR discussion synced (current/total) MrDiscussionSynced { current: usize, total: usize }, - /// MR discussion sync complete MrDiscussionSyncComplete, - /// Resource event fetching started (total jobs) ResourceEventsFetchStarted { total: usize }, - /// Resource event fetched for an entity (current/total) ResourceEventFetched { current: usize, total: usize }, - /// Resource event fetching complete ResourceEventsFetchComplete { fetched: usize, failed: usize }, + ClosesIssuesFetchStarted { total: usize }, + ClosesIssueFetched { current: usize, total: usize }, + ClosesIssuesFetchComplete { fetched: usize, failed: usize }, } -/// Result of full project ingestion (issues). #[derive(Debug, Default)] pub struct IngestProjectResult { pub issues_fetched: usize, @@ -76,7 +57,6 @@ pub struct IngestProjectResult { pub resource_events_failed: usize, } -/// Result of MR ingestion for a project. #[derive(Debug, Default)] pub struct IngestMrProjectResult { pub mrs_fetched: usize, @@ -93,9 +73,10 @@ pub struct IngestMrProjectResult { pub mrs_skipped_discussion_sync: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, + pub closes_issues_fetched: usize, + pub closes_issues_failed: usize, } -/// Ingest all issues and their discussions for a project. pub async fn ingest_project_issues( conn: &Connection, client: &GitLabClient, @@ -107,7 +88,6 @@ pub async fn ingest_project_issues( .await } -/// Ingest all issues and their discussions for a project with progress reporting. #[instrument( skip(conn, client, config, progress), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) @@ -127,7 +107,6 @@ pub async fn ingest_project_issues_with_progress( } }; - // Step 1: Ingest issues emit(ProgressEvent::IssuesFetchStarted); let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?; @@ -139,10 +118,8 @@ pub async fn ingest_project_issues_with_progress( total: result.issues_fetched, }); - // Step 2: Sync discussions for issues that need it let issues_needing_sync = issue_result.issues_needing_discussion_sync; - // Query actual total issues for accurate skip count (issues_upserted only counts this run) let total_issues: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = ?", @@ -153,7 +130,6 @@ pub async fn ingest_project_issues_with_progress( let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); - // Step 3: Sync discussions for issues that need it if issues_needing_sync.is_empty() { info!("No issues need discussion sync"); } else { @@ -166,7 +142,6 @@ pub async fn ingest_project_issues_with_progress( total: issues_needing_sync.len(), }); - // Execute sequential discussion sync (see function doc for why not concurrent) let discussion_results = sync_discussions_sequential( conn, client, @@ -180,7 +155,6 @@ pub async fn ingest_project_issues_with_progress( emit(ProgressEvent::DiscussionSyncComplete); - // Aggregate discussion results for disc_result in discussion_results { result.discussions_fetched += disc_result.discussions_fetched; result.discussions_upserted += disc_result.discussions_upserted; @@ -189,15 +163,12 @@ pub async fn ingest_project_issues_with_progress( } } - // Step 4: Enqueue and drain resource events (if enabled) if config.sync.fetch_resource_events { - // Enqueue resource_events jobs for all issues in this project let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?; if enqueued > 0 { debug!(enqueued, "Enqueued resource events jobs for issues"); } - // Drain the queue let drain_result = drain_resource_events( conn, client, @@ -209,6 +180,15 @@ pub async fn ingest_project_issues_with_progress( .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; + + let refs_inserted = + crate::core::references::extract_refs_from_state_events(conn, project_id)?; + if refs_inserted > 0 { + debug!( + refs_inserted, + "Extracted cross-references from state events" + ); + } } info!( @@ -231,12 +211,6 @@ pub async fn ingest_project_issues_with_progress( Ok(result) } -/// Sync discussions sequentially for each issue. -/// -/// NOTE: Despite the config having `dependent_concurrency`, we process sequentially -/// because rusqlite's `Connection` is not `Send` and cannot be shared across tasks. -/// True concurrency would require connection pooling (r2d2, deadpool, etc.). -/// The batch_size from config is used for progress logging granularity. async fn sync_discussions_sequential( conn: &Connection, client: &GitLabClient, @@ -251,7 +225,6 @@ async fn sync_discussions_sequential( let mut results = Vec::with_capacity(issues.len()); - // Process in batches for progress feedback (actual processing is sequential) for chunk in issues.chunks(batch_size) { for issue in chunk { let disc_result = ingest_issue_discussions( @@ -265,7 +238,6 @@ async fn sync_discussions_sequential( .await?; results.push(disc_result); - // Emit progress if let Some(cb) = progress { cb(ProgressEvent::DiscussionSynced { current: results.len(), @@ -278,7 +250,6 @@ async fn sync_discussions_sequential( Ok(results) } -/// Ingest all merge requests and their discussions for a project. pub async fn ingest_project_merge_requests( conn: &Connection, client: &GitLabClient, @@ -299,7 +270,6 @@ pub async fn ingest_project_merge_requests( .await } -/// Ingest all merge requests and their discussions for a project with progress reporting. #[instrument( skip(conn, client, config, progress), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) @@ -320,7 +290,6 @@ pub async fn ingest_project_merge_requests_with_progress( } }; - // Step 1: Ingest MRs emit(ProgressEvent::MrsFetchStarted); let mr_result = ingest_merge_requests( conn, @@ -342,11 +311,8 @@ pub async fn ingest_project_merge_requests_with_progress( total: result.mrs_fetched, }); - // Step 2: Query DB for MRs needing discussion sync - // CRITICAL: Query AFTER ingestion to avoid memory growth during large ingests let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?; - // Query total MRs for accurate skip count let total_mrs: i64 = conn .query_row( "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", @@ -357,7 +323,6 @@ pub async fn ingest_project_merge_requests_with_progress( let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); - // Step 3: Sync discussions for MRs that need it if mrs_needing_sync.is_empty() { info!("No MRs need discussion sync"); } else { @@ -370,7 +335,6 @@ pub async fn ingest_project_merge_requests_with_progress( total: mrs_needing_sync.len(), }); - // Execute sequential MR discussion sync let discussion_results = sync_mr_discussions_sequential( conn, client, @@ -384,7 +348,6 @@ pub async fn ingest_project_merge_requests_with_progress( emit(ProgressEvent::MrDiscussionSyncComplete); - // Aggregate discussion results for disc_result in discussion_results { result.discussions_fetched += disc_result.discussions_fetched; result.discussions_upserted += disc_result.discussions_upserted; @@ -397,7 +360,6 @@ pub async fn ingest_project_merge_requests_with_progress( } } - // Step 4: Enqueue and drain resource events (if enabled) if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?; if enqueued > 0 { @@ -415,6 +377,44 @@ pub async fn ingest_project_merge_requests_with_progress( .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; + + let refs_inserted = + crate::core::references::extract_refs_from_state_events(conn, project_id)?; + if refs_inserted > 0 { + debug!( + refs_inserted, + "Extracted cross-references from state events" + ); + } + } + + let note_refs = crate::core::note_parser::extract_refs_from_system_notes(conn, project_id)?; + if note_refs.inserted > 0 || note_refs.skipped_unresolvable > 0 { + debug!( + inserted = note_refs.inserted, + unresolvable = note_refs.skipped_unresolvable, + parse_failures = note_refs.parse_failures, + "Extracted cross-references from system notes (MRs)" + ); + } + + { + let enqueued = enqueue_mr_closes_issues_jobs(conn, project_id)?; + if enqueued > 0 { + debug!(enqueued, "Enqueued mr_closes_issues jobs"); + } + + let closes_result = drain_mr_closes_issues( + conn, + client, + config, + project_id, + gitlab_project_id, + &progress, + ) + .await?; + result.closes_issues_fetched = closes_result.fetched; + result.closes_issues_failed = closes_result.failed; } info!( @@ -428,6 +428,8 @@ pub async fn ingest_project_merge_requests_with_progress( mrs_skipped = result.mrs_skipped_discussion_sync, resource_events_fetched = result.resource_events_fetched, resource_events_failed = result.resource_events_failed, + closes_issues_fetched = result.closes_issues_fetched, + closes_issues_failed = result.closes_issues_failed, "MR project ingestion complete" ); @@ -438,10 +440,6 @@ pub async fn ingest_project_merge_requests_with_progress( Ok(result) } -/// Sync discussions for MRs with parallel API prefetching. -/// -/// Pattern: Fetch discussions for multiple MRs in parallel, then write serially. -/// This overlaps network I/O while respecting rusqlite's single-connection constraint. async fn sync_mr_discussions_sequential( conn: &Connection, client: &GitLabClient, @@ -457,22 +455,18 @@ async fn sync_mr_discussions_sequential( let mut results = Vec::with_capacity(mrs.len()); let mut processed = 0; - // Process in batches: parallel API fetch, serial DB write for chunk in mrs.chunks(batch_size) { - // Step 1: Prefetch discussions for all MRs in this batch in parallel let prefetch_futures = chunk.iter().map(|mr| { prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone()) }); let prefetched_batch = join_all(prefetch_futures).await; - // Step 2: Write each prefetched result serially for prefetched in prefetched_batch { let disc_result = write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?; results.push(disc_result); processed += 1; - // Emit progress if let Some(cb) = progress { cb(ProgressEvent::MrDiscussionSynced { current: processed, @@ -485,7 +479,6 @@ async fn sync_mr_discussions_sequential( Ok(results) } -/// Result of draining the resource events queue. #[derive(Debug, Default)] pub struct DrainResult { pub fetched: usize, @@ -493,21 +486,11 @@ pub struct DrainResult { pub skipped_not_found: usize, } -/// Enqueue resource_events jobs for all entities of a given type in a project. -/// -/// Uses the pending_dependent_fetches queue. Jobs are deduplicated by the UNIQUE -/// constraint, so re-enqueueing the same entity is a no-op. fn enqueue_resource_events_for_entity_type( conn: &Connection, project_id: i64, entity_type: &str, ) -> Result { - // Clean up obsolete jobs: remove resource_events jobs for entities whose - // watermark is already current (updated_at <= resource_events_synced_for_updated_at). - // These are leftover from prior runs that failed after watermark-stamping but - // before job deletion, or from entities that no longer need syncing. - // We intentionally keep jobs for entities that still need syncing (including - // in-progress or failed-with-backoff jobs) to preserve retry state. match entity_type { "issue" => { conn.execute( @@ -536,10 +519,6 @@ fn enqueue_resource_events_for_entity_type( _ => {} } - // Enqueue resource_events jobs only for entities whose updated_at exceeds - // their last resource event sync watermark. - // - // Use separate hardcoded queries per entity type to avoid format!-based SQL. let entities: Vec<(i64, i64)> = match entity_type { "issue" => { let mut stmt = conn.prepare_cached( @@ -580,10 +559,6 @@ fn enqueue_resource_events_for_entity_type( Ok(enqueued) } -/// Drain pending resource_events jobs: claim, fetch from GitLab, store, complete/fail. -/// -/// Processes jobs sequentially since `rusqlite::Connection` is not `Send`. -/// Uses exponential backoff on failure via `fail_job`. #[instrument( skip(conn, client, config, progress), fields(project_id, gitlab_project_id, items_processed, errors) @@ -599,16 +574,11 @@ async fn drain_resource_events( let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; - // Reclaim stale locks from crashed processes let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; if reclaimed > 0 { info!(reclaimed, "Reclaimed stale resource event locks"); } - // Count only claimable jobs (unlocked, past retry backoff) for accurate progress. - // Using count_pending_jobs here would inflate the total with locked/backing-off - // jobs that can't be claimed in this drain run, causing the progress bar to - // never reach 100%. let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts .get("resource_events") @@ -638,14 +608,9 @@ async fn drain_resource_events( break; } - // Track whether any job in this batch was actually new. If every - // claimed job was already seen, break to avoid an infinite loop - // (can happen with clock skew or zero-backoff edge cases). let mut any_new_in_batch = false; for job in &jobs { - // Guard against re-processing a job that was failed and re-claimed - // within the same drain run. if !seen_job_ids.insert(job.id) { warn!( job_id = job.id, @@ -693,10 +658,6 @@ async fn drain_resource_events( } } Err(e) => { - // Only 404 (not found) is truly permanent -- the resource - // events endpoint doesn't exist for this entity. Stamp the - // watermark so we skip it next run. All other errors - // (403, auth, network) get backoff retry. if e.is_permanent_api_error() { debug!( entity_type = %job.entity_type, @@ -731,7 +692,6 @@ async fn drain_resource_events( }); } - // If every job in this batch was already seen, stop to prevent spinning. if !any_new_in_batch { warn!("All claimed jobs were already processed, breaking drain loop"); break; @@ -757,9 +717,6 @@ async fn drain_resource_events( Ok(result) } -/// Store fetched resource events in the database. -/// -/// Wraps all three event types in a single transaction for atomicity. fn store_resource_events( conn: &Connection, project_id: i64, @@ -805,10 +762,6 @@ fn store_resource_events( Ok(()) } -/// Update the resource event watermark for an entity after successful event fetch. -/// -/// Sets `resource_events_synced_for_updated_at = updated_at` so the entity -/// won't be re-enqueued until its `updated_at` advances again. fn update_resource_event_watermark( conn: &Connection, entity_type: &str, @@ -832,6 +785,209 @@ fn update_resource_event_watermark( Ok(()) } +fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result { + let mut stmt = + conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?; + let entities: Vec<(i64, i64)> = stmt + .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()?; + + let mut enqueued = 0; + for (local_id, iid) in &entities { + if enqueue_job( + conn, + project_id, + "merge_request", + *iid, + *local_id, + "mr_closes_issues", + None, + )? { + enqueued += 1; + } + } + + Ok(enqueued) +} + +#[instrument( + skip(conn, client, config, progress), + fields(project_id, gitlab_project_id, items_processed, errors) +)] +async fn drain_mr_closes_issues( + conn: &Connection, + client: &GitLabClient, + config: &Config, + project_id: i64, + gitlab_project_id: i64, + progress: &Option, +) -> Result { + let mut result = DrainResult::default(); + let batch_size = config.sync.dependent_concurrency as usize; + + let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; + if reclaimed > 0 { + info!(reclaimed, "Reclaimed stale mr_closes_issues locks"); + } + + let claimable_counts = count_claimable_jobs(conn, project_id)?; + let total_pending = claimable_counts + .get("mr_closes_issues") + .copied() + .unwrap_or(0); + + if total_pending == 0 { + return Ok(result); + } + + let emit = |event: ProgressEvent| { + if let Some(cb) = progress { + cb(event); + } + }; + + emit(ProgressEvent::ClosesIssuesFetchStarted { + total: total_pending, + }); + + let mut processed = 0; + let mut seen_job_ids = std::collections::HashSet::new(); + + loop { + let jobs = claim_jobs(conn, "mr_closes_issues", project_id, batch_size)?; + if jobs.is_empty() { + break; + } + + let mut any_new_in_batch = false; + + for job in &jobs { + if !seen_job_ids.insert(job.id) { + warn!( + job_id = job.id, + "Skipping already-processed mr_closes_issues job" + ); + continue; + } + any_new_in_batch = true; + + match client + .fetch_mr_closes_issues(gitlab_project_id, job.entity_iid) + .await + { + Ok(closes_issues) => { + let store_result = store_closes_issues_refs( + conn, + project_id, + job.entity_local_id, + &closes_issues, + ); + + match store_result { + Ok(()) => { + complete_job(conn, job.id)?; + result.fetched += 1; + } + Err(e) => { + warn!( + entity_iid = job.entity_iid, + error = %e, + "Failed to store closes_issues references" + ); + fail_job(conn, job.id, &e.to_string())?; + result.failed += 1; + } + } + } + Err(e) => { + if e.is_permanent_api_error() { + debug!( + entity_iid = job.entity_iid, + error = %e, + "Permanent API error for closes_issues, marking complete" + ); + complete_job(conn, job.id)?; + result.skipped_not_found += 1; + } else { + warn!( + entity_iid = job.entity_iid, + error = %e, + "Failed to fetch closes_issues from GitLab" + ); + fail_job(conn, job.id, &e.to_string())?; + result.failed += 1; + } + } + } + + processed += 1; + emit(ProgressEvent::ClosesIssueFetched { + current: processed, + total: total_pending, + }); + } + + if !any_new_in_batch { + warn!("All claimed mr_closes_issues jobs were already processed, breaking drain loop"); + break; + } + } + + emit(ProgressEvent::ClosesIssuesFetchComplete { + fetched: result.fetched, + failed: result.failed, + }); + + if result.fetched > 0 || result.failed > 0 { + info!( + fetched = result.fetched, + failed = result.failed, + "mr_closes_issues drain complete" + ); + } + + tracing::Span::current().record("items_processed", result.fetched); + tracing::Span::current().record("errors", result.failed); + + Ok(result) +} + +fn store_closes_issues_refs( + conn: &Connection, + project_id: i64, + mr_local_id: i64, + closes_issues: &[crate::gitlab::types::GitLabIssueRef], +) -> Result<()> { + for issue_ref in closes_issues { + let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; + + let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { + (Some(local_id), None, None) + } else { + let path = resolve_project_path(conn, issue_ref.project_id)?; + let fallback = + path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); + (None, Some(fallback), Some(issue_ref.iid)) + }; + + let ref_ = EntityReference { + project_id, + source_entity_type: "merge_request", + source_entity_id: mr_local_id, + target_entity_type: "issue", + target_entity_id: target_id, + target_project_path: target_path.as_deref(), + target_entity_iid: target_iid, + reference_type: "closes", + source_method: "api", + }; + + insert_entity_reference(conn, &ref_)?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -879,7 +1035,6 @@ mod tests { #[test] fn progress_event_resource_variants_exist() { - // Verify the new progress event variants are constructible let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 }; let _progress = ProgressEvent::ResourceEventFetched { current: 5,