//! Ingestion orchestrator: coordinates issue/MR and discussion sync. //! //! Implements the canonical pattern: //! 1. Fetch resources (issues or MRs) with cursor-based sync //! 2. Identify resources needing discussion sync //! 3. Execute discussion sync with parallel prefetch (fetch in parallel, write serially) use futures::future::join_all; use rusqlite::Connection; use tracing::{debug, info, instrument, warn}; use crate::Config; use crate::core::dependent_queue::{ claim_jobs, complete_job, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, }; use crate::core::error::Result; use crate::gitlab::GitLabClient; use super::discussions::ingest_issue_discussions; use super::issues::{IssueForDiscussionSync, ingest_issues}; use super::merge_requests::{ MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests, }; use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions}; /// Progress callback for ingestion operations. pub type ProgressCallback = Box; /// Progress events emitted during ingestion. 
#[derive(Debug, Clone)]
pub enum ProgressEvent {
    /// Issue fetching started
    IssuesFetchStarted,
    /// An issue was fetched (current count)
    IssueFetched { count: usize },
    /// Issue fetching complete
    IssuesFetchComplete { total: usize },
    /// Discussion sync started (total issues to sync)
    DiscussionSyncStarted { total: usize },
    /// Discussion synced for an issue (current/total)
    DiscussionSynced { current: usize, total: usize },
    /// Discussion sync complete
    DiscussionSyncComplete,
    /// MR fetching started
    MrsFetchStarted,
    /// An MR was fetched (current count)
    MrFetched { count: usize },
    /// MR fetching complete
    MrsFetchComplete { total: usize },
    /// MR discussion sync started (total MRs to sync)
    MrDiscussionSyncStarted { total: usize },
    /// MR discussion synced (current/total)
    MrDiscussionSynced { current: usize, total: usize },
    /// MR discussion sync complete
    MrDiscussionSyncComplete,
    /// Resource event fetching started (total jobs)
    ResourceEventsFetchStarted { total: usize },
    /// Resource event fetched for an entity (current/total)
    ResourceEventFetched { current: usize, total: usize },
    /// Resource event fetching complete
    ResourceEventsFetchComplete { fetched: usize, failed: usize },
}

/// Result of full project ingestion (issues).
///
/// All counters cover a single ingestion run; `Default` yields all zeros.
#[derive(Debug, Default)]
pub struct IngestProjectResult {
    // Issue-fetch phase counters.
    pub issues_fetched: usize,
    pub issues_upserted: usize,
    pub labels_created: usize,
    // Discussion-sync phase counters.
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub issues_synced_discussions: usize,
    pub issues_skipped_discussion_sync: usize,
    // Resource-events drain phase counters.
    pub resource_events_fetched: usize,
    pub resource_events_failed: usize,
}

/// Result of MR ingestion for a project.
#[derive(Debug, Default)]
pub struct IngestMrProjectResult {
    // MR-fetch phase counters.
    pub mrs_fetched: usize,
    pub mrs_upserted: usize,
    pub labels_created: usize,
    pub assignees_linked: usize,
    pub reviewers_linked: usize,
    // Discussion-sync phase counters.
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub notes_skipped_bad_timestamp: usize,
    pub diffnotes_count: usize,
    pub mrs_synced_discussions: usize,
    pub mrs_skipped_discussion_sync: usize,
    // Resource-events drain phase counters.
    pub resource_events_fetched: usize,
    pub resource_events_failed: usize,
}

/// Ingest all issues and their discussions for a project.
///
/// Thin wrapper over [`ingest_project_issues_with_progress`] with no progress
/// callback.
// NOTE(review): the return type reads `-> Result {` — the generic argument
// (presumably `Result<IngestProjectResult>`) was stripped by whatever
// extraction produced this file. Restore before compiling.
pub async fn ingest_project_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
) -> Result {
    ingest_project_issues_with_progress(conn, client, config, project_id, gitlab_project_id, None)
        .await
}

/// Ingest all issues and their discussions for a project with progress reporting.
///
/// Pipeline:
/// 1. cursor-synced issue fetch,
/// 2. discussion sync for the issues the fetch step flagged,
/// 3. resource-event enqueue + drain (only when enabled in config).
// NOTE(review): `progress: Option,` and `-> Result {` lost their generic
// arguments (presumably `Option<ProgressCallback>` and
// `Result<IngestProjectResult>`) during extraction.
#[instrument(
    skip(conn, client, config, progress),
    fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
)]
pub async fn ingest_project_issues_with_progress(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: Option,
) -> Result {
    let mut result = IngestProjectResult::default();
    // Forward events only when a callback was supplied.
    let emit = |event: ProgressEvent| {
        if let Some(ref cb) = progress {
            cb(event);
        }
    };
    // Step 1: Ingest issues
    emit(ProgressEvent::IssuesFetchStarted);
    let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
    result.issues_fetched = issue_result.fetched;
    result.issues_upserted = issue_result.upserted;
    result.labels_created = issue_result.labels_created;
    emit(ProgressEvent::IssuesFetchComplete {
        total: result.issues_fetched,
    });
    // Step 2: Sync discussions for issues that need it
    let issues_needing_sync = issue_result.issues_needing_discussion_sync;
    // Query actual total issues for accurate skip count (issues_upserted only counts this run)
    let total_issues: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = ?",
            [project_id],
            |row| row.get(0),
        )
        .unwrap_or(0);
    let total_issues = total_issues as usize;
    // saturating_sub guards against a needing-sync count larger than the table count.
    result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
    // Step 3: Sync discussions for issues that need it
    if issues_needing_sync.is_empty() {
        info!("No issues need discussion sync");
    } else {
        info!(
            count = issues_needing_sync.len(),
            "Starting discussion sync for issues"
        );
        emit(ProgressEvent::DiscussionSyncStarted {
            total: issues_needing_sync.len(),
        });
        // Execute sequential discussion sync (see function doc for why not concurrent)
        let discussion_results = sync_discussions_sequential(
            conn,
            client,
            config,
            gitlab_project_id,
            project_id,
            &issues_needing_sync,
            &progress,
        )
        .await?;
        emit(ProgressEvent::DiscussionSyncComplete);
        // Aggregate discussion results
        for disc_result in discussion_results {
            result.discussions_fetched += disc_result.discussions_fetched;
            result.discussions_upserted += disc_result.discussions_upserted;
            result.notes_upserted += disc_result.notes_upserted;
            result.issues_synced_discussions += 1;
        }
    }
    // Step 4: Enqueue and drain resource events (if enabled)
    if config.sync.fetch_resource_events {
        // Enqueue resource_events jobs for all issues in this project
        let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued resource events jobs for issues");
        }
        // Drain the queue
        let drain_result = drain_resource_events(
            conn,
            client,
            config,
            project_id,
            gitlab_project_id,
            &progress,
        )
        .await?;
        result.resource_events_fetched = drain_result.fetched;
        result.resource_events_failed = drain_result.failed;
    }
    info!(
        issues_fetched = result.issues_fetched,
        issues_upserted = result.issues_upserted,
        labels_created = result.labels_created,
        discussions_fetched = result.discussions_fetched,
        notes_upserted = result.notes_upserted,
        issues_synced = result.issues_synced_discussions,
        issues_skipped = result.issues_skipped_discussion_sync,
        resource_events_fetched = result.resource_events_fetched,
        resource_events_failed = result.resource_events_failed,
        "Project ingestion complete"
    );
    // Record the span fields declared in the #[instrument] attribute.
    tracing::Span::current().record("items_processed", result.issues_upserted);
    tracing::Span::current().record("items_skipped", result.issues_skipped_discussion_sync);
    tracing::Span::current().record("errors", result.resource_events_failed);
    Ok(result)
}

/// Sync discussions sequentially for each issue.
///
/// NOTE: Despite the config having `dependent_concurrency`, we process sequentially
/// because rusqlite's `Connection` is not `Send` and cannot be shared across tasks.
/// True concurrency would require connection pooling (r2d2, deadpool, etc.).
/// The batch_size from config is used for progress logging granularity.
// NOTE(review): `progress: &Option,` / `-> Result> {` lost their generics
// during extraction; the element type of the returned Vec is whatever
// `ingest_issue_discussions` returns — not recoverable from this chunk.
async fn sync_discussions_sequential(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    local_project_id: i64,
    issues: &[IssueForDiscussionSync],
    progress: &Option,
) -> Result> {
    let batch_size = config.sync.dependent_concurrency as usize;
    let total = issues.len();
    let mut results = Vec::with_capacity(issues.len());
    // Process in batches for progress feedback (actual processing is sequential)
    for chunk in issues.chunks(batch_size) {
        for issue in chunk {
            // One-element slice keeps the helper's batch-oriented signature happy.
            let disc_result = ingest_issue_discussions(
                conn,
                client,
                config,
                gitlab_project_id,
                local_project_id,
                std::slice::from_ref(issue),
            )
            .await?;
            results.push(disc_result);
            // Emit progress
            if let Some(cb) = progress {
                cb(ProgressEvent::DiscussionSynced {
                    current: results.len(),
                    total,
                });
            }
        }
    }
    Ok(results)
}

/// Ingest all merge requests and their discussions for a project.
// NOTE(review): `-> Result {` lost its generic argument during extraction
// (presumably `Result<IngestMrProjectResult>`); same for the other stripped
// generics flagged below.
pub async fn ingest_project_merge_requests(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    full_sync: bool,
) -> Result {
    // Delegates to the progress-reporting variant with no callback.
    ingest_project_merge_requests_with_progress(
        conn,
        client,
        config,
        project_id,
        gitlab_project_id,
        full_sync,
        None,
    )
    .await
}

/// Ingest all merge requests and their discussions for a project with progress reporting.
///
/// Pipeline: MR fetch -> discussion sync for flagged MRs -> resource-event
/// enqueue + drain (only when enabled in config).
// NOTE(review): `progress: Option,` was presumably `Option<ProgressCallback>`.
#[instrument(
    skip(conn, client, config, progress),
    fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
)]
pub async fn ingest_project_merge_requests_with_progress(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    full_sync: bool,
    progress: Option,
) -> Result {
    let mut result = IngestMrProjectResult::default();
    // Forward events only when a callback was supplied.
    let emit = |event: ProgressEvent| {
        if let Some(ref cb) = progress {
            cb(event);
        }
    };
    // Step 1: Ingest MRs
    emit(ProgressEvent::MrsFetchStarted);
    let mr_result = ingest_merge_requests(
        conn,
        client,
        config,
        project_id,
        gitlab_project_id,
        full_sync,
    )
    .await?;
    result.mrs_fetched = mr_result.fetched;
    result.mrs_upserted = mr_result.upserted;
    result.labels_created = mr_result.labels_created;
    result.assignees_linked = mr_result.assignees_linked;
    result.reviewers_linked = mr_result.reviewers_linked;
    emit(ProgressEvent::MrsFetchComplete {
        total: result.mrs_fetched,
    });
    // Step 2: Query DB for MRs needing discussion sync
    // CRITICAL: Query AFTER ingestion to avoid memory growth during large ingests
    let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
    // Query total MRs for accurate skip count
    let total_mrs: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
            [project_id],
            |row| row.get(0),
        )
        .unwrap_or(0);
    let total_mrs = total_mrs as usize;
    result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
    // Step 3: Sync discussions for MRs that need it
    if mrs_needing_sync.is_empty() {
        info!("No MRs need discussion sync");
    } else {
        info!(
            count = mrs_needing_sync.len(),
            "Starting discussion sync for MRs"
        );
        emit(ProgressEvent::MrDiscussionSyncStarted {
            total: mrs_needing_sync.len(),
        });
        // Execute sequential MR discussion sync
        let discussion_results = sync_mr_discussions_sequential(
            conn,
            client,
            config,
            gitlab_project_id,
            project_id,
            &mrs_needing_sync,
            &progress,
        )
        .await?;
        emit(ProgressEvent::MrDiscussionSyncComplete);
        // Aggregate discussion results
        for disc_result in discussion_results {
            result.discussions_fetched += disc_result.discussions_fetched;
            result.discussions_upserted += disc_result.discussions_upserted;
            result.notes_upserted += disc_result.notes_upserted;
            result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp;
            result.diffnotes_count += disc_result.diffnotes_count;
            // Only count an MR as synced when its pagination completed.
            if disc_result.pagination_succeeded {
                result.mrs_synced_discussions += 1;
            }
        }
    }
    // Step 4: Enqueue and drain resource events (if enabled)
    if config.sync.fetch_resource_events {
        let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued resource events jobs for MRs");
        }
        let drain_result = drain_resource_events(
            conn,
            client,
            config,
            project_id,
            gitlab_project_id,
            &progress,
        )
        .await?;
        result.resource_events_fetched = drain_result.fetched;
        result.resource_events_failed = drain_result.failed;
    }
    info!(
        mrs_fetched = result.mrs_fetched,
        mrs_upserted = result.mrs_upserted,
        labels_created = result.labels_created,
        discussions_fetched = result.discussions_fetched,
        notes_upserted = result.notes_upserted,
        diffnotes = result.diffnotes_count,
        mrs_synced = result.mrs_synced_discussions,
        mrs_skipped = result.mrs_skipped_discussion_sync,
        resource_events_fetched = result.resource_events_fetched,
        resource_events_failed = result.resource_events_failed,
        "MR project ingestion complete"
    );
    // Record the span fields declared in the #[instrument] attribute.
    tracing::Span::current().record("items_processed", result.mrs_upserted);
    tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync);
    tracing::Span::current().record("errors", result.resource_events_failed);
    Ok(result)
}

/// Sync discussions for MRs with parallel API prefetching.
///
/// Pattern: Fetch discussions for multiple MRs in parallel, then write serially.
/// This overlaps network I/O while respecting rusqlite's single-connection constraint.
// NOTE(review): `progress: &Option,` / `-> Result> {` lost their generics
// during extraction; the element type of the returned Vec is whatever
// `write_prefetched_mr_discussions` returns — not recoverable from this chunk.
async fn sync_mr_discussions_sequential(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    local_project_id: i64,
    mrs: &[MrForDiscussionSync],
    progress: &Option,
) -> Result> {
    let batch_size = config.sync.dependent_concurrency as usize;
    let total = mrs.len();
    let mut results = Vec::with_capacity(mrs.len());
    let mut processed = 0;
    // Process in batches: parallel API fetch, serial DB write
    for chunk in mrs.chunks(batch_size) {
        // Step 1: Prefetch discussions for all MRs in this batch in parallel
        let prefetch_futures = chunk.iter().map(|mr| {
            prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone())
        });
        let prefetched_batch = join_all(prefetch_futures).await;
        // Step 2: Write each prefetched result serially
        for prefetched in prefetched_batch {
            let disc_result =
                write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?;
            results.push(disc_result);
            processed += 1;
            // Emit progress
            if let Some(cb) = progress {
                cb(ProgressEvent::MrDiscussionSynced {
                    current: processed,
                    total,
                });
            }
        }
    }
    Ok(results)
}

/// Result of draining the resource events queue.
#[derive(Debug, Default)]
pub struct DrainResult {
    /// Entities whose resource events were fetched and stored successfully.
    pub fetched: usize,
    /// Jobs that failed (fetch or store) and were scheduled for backoff retry.
    pub failed: usize,
    /// Jobs skipped because the API reported a permanent error (e.g. 404).
    pub skipped_not_found: usize,
}

/// Enqueue resource_events jobs for all entities of a given type in a project.
///
/// Uses the pending_dependent_fetches queue. Jobs are deduplicated by the UNIQUE
/// constraint, so re-enqueueing the same entity is a no-op.
fn enqueue_resource_events_for_entity_type( conn: &Connection, project_id: i64, entity_type: &str, ) -> Result { // Clean up obsolete jobs: remove resource_events jobs for entities whose // watermark is already current (updated_at <= resource_events_synced_for_updated_at). // These are leftover from prior runs that failed after watermark-stamping but // before job deletion, or from entities that no longer need syncing. // We intentionally keep jobs for entities that still need syncing (including // in-progress or failed-with-backoff jobs) to preserve retry state. match entity_type { "issue" => { conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'issue' AND job_type = 'resource_events' \ AND entity_local_id IN ( \ SELECT id FROM issues \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ )", [project_id], )?; } "merge_request" => { conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'resource_events' \ AND entity_local_id IN ( \ SELECT id FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ )", [project_id], )?; } _ => {} } // Enqueue resource_events jobs only for entities whose updated_at exceeds // their last resource event sync watermark. // // Use separate hardcoded queries per entity type to avoid format!-based SQL. let entities: Vec<(i64, i64)> = match entity_type { "issue" => { let mut stmt = conn.prepare_cached( "SELECT id, iid FROM issues \ WHERE project_id = ?1 \ AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? 
} "merge_request" => { let mut stmt = conn.prepare_cached( "SELECT id, iid FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? } _ => return Ok(0), }; let mut enqueued = 0; for (local_id, iid) in &entities { if enqueue_job( conn, project_id, entity_type, *iid, *local_id, "resource_events", None, )? { enqueued += 1; } } Ok(enqueued) } /// Drain pending resource_events jobs: claim, fetch from GitLab, store, complete/fail. /// /// Processes jobs sequentially since `rusqlite::Connection` is not `Send`. /// Uses exponential backoff on failure via `fail_job`. #[instrument( skip(conn, client, config, progress), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_resource_events( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: &Option, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; // Reclaim stale locks from crashed processes let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; if reclaimed > 0 { info!(reclaimed, "Reclaimed stale resource event locks"); } // Count only claimable jobs (unlocked, past retry backoff) for accurate progress. // Using count_pending_jobs here would inflate the total with locked/backing-off // jobs that can't be claimed in this drain run, causing the progress bar to // never reach 100%. 
let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts .get("resource_events") .copied() .unwrap_or(0); if total_pending == 0 { return Ok(result); } let emit = |event: ProgressEvent| { if let Some(cb) = progress { cb(event); } }; emit(ProgressEvent::ResourceEventsFetchStarted { total: total_pending, }); let mut processed = 0; let mut seen_job_ids = std::collections::HashSet::new(); loop { let jobs = claim_jobs(conn, "resource_events", project_id, batch_size)?; if jobs.is_empty() { break; } // Track whether any job in this batch was actually new. If every // claimed job was already seen, break to avoid an infinite loop // (can happen with clock skew or zero-backoff edge cases). let mut any_new_in_batch = false; for job in &jobs { // Guard against re-processing a job that was failed and re-claimed // within the same drain run. if !seen_job_ids.insert(job.id) { warn!( job_id = job.id, "Skipping already-processed job in same drain run" ); continue; } any_new_in_batch = true; match client .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) .await { Ok((state_events, label_events, milestone_events)) => { let store_result = store_resource_events( conn, job.project_id, &job.entity_type, job.entity_local_id, &state_events, &label_events, &milestone_events, ); match store_result { Ok(()) => { complete_job(conn, job.id)?; update_resource_event_watermark( conn, &job.entity_type, job.entity_local_id, )?; result.fetched += 1; } Err(e) => { warn!( entity_type = %job.entity_type, entity_iid = job.entity_iid, error = %e, "Failed to store resource events" ); fail_job(conn, job.id, &e.to_string())?; result.failed += 1; } } } Err(e) => { // Only 404 (not found) is truly permanent -- the resource // events endpoint doesn't exist for this entity. Stamp the // watermark so we skip it next run. All other errors // (403, auth, network) get backoff retry. 
if e.is_permanent_api_error() { debug!( entity_type = %job.entity_type, entity_iid = job.entity_iid, error = %e, "Permanent API error for resource events, marking complete" ); complete_job(conn, job.id)?; update_resource_event_watermark( conn, &job.entity_type, job.entity_local_id, )?; result.skipped_not_found += 1; } else { warn!( entity_type = %job.entity_type, entity_iid = job.entity_iid, error = %e, "Failed to fetch resource events from GitLab" ); fail_job(conn, job.id, &e.to_string())?; result.failed += 1; } } } processed += 1; emit(ProgressEvent::ResourceEventFetched { current: processed, total: total_pending, }); } // If every job in this batch was already seen, stop to prevent spinning. if !any_new_in_batch { warn!("All claimed jobs were already processed, breaking drain loop"); break; } } emit(ProgressEvent::ResourceEventsFetchComplete { fetched: result.fetched, failed: result.failed, }); if result.fetched > 0 || result.failed > 0 { info!( fetched = result.fetched, failed = result.failed, "Resource events drain complete" ); } tracing::Span::current().record("items_processed", result.fetched); tracing::Span::current().record("errors", result.failed); Ok(result) } /// Store fetched resource events in the database. /// /// Wraps all three event types in a single transaction for atomicity. 
/// Persist one entity's fetched state/label/milestone events.
///
/// All three upserts share a single transaction so a failure leaves the
/// database unchanged; empty slices are skipped to avoid needless work.
///
/// # Errors
/// Returns any error from opening the transaction, the upsert helpers, or
/// the final commit.
fn store_resource_events(
    conn: &Connection,
    project_id: i64,
    entity_type: &str,
    entity_local_id: i64,
    state_events: &[crate::gitlab::types::GitLabStateEvent],
    label_events: &[crate::gitlab::types::GitLabLabelEvent],
    milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
    use crate::core::events_db::{upsert_label_events, upsert_milestone_events, upsert_state_events};

    // unchecked_transaction: the connection is used single-threaded here.
    let txn = conn.unchecked_transaction()?;
    if !state_events.is_empty() {
        upsert_state_events(&txn, project_id, entity_type, entity_local_id, state_events)?;
    }
    if !label_events.is_empty() {
        upsert_label_events(&txn, project_id, entity_type, entity_local_id, label_events)?;
    }
    if !milestone_events.is_empty() {
        upsert_milestone_events(
            &txn,
            project_id,
            entity_type,
            entity_local_id,
            milestone_events,
        )?;
    }
    txn.commit()?;
    Ok(())
}

/// Update the resource event watermark for an entity after successful event fetch.
///
/// Sets `resource_events_synced_for_updated_at = updated_at` so the entity
/// won't be re-enqueued until its `updated_at` advances again.
fn update_resource_event_watermark(
    conn: &Connection,
    entity_type: &str,
    entity_local_id: i64,
) -> Result<()> {
    // Resolve the per-table UPDATE statement up front; unknown entity types
    // are a deliberate no-op, matching the queue helpers in this module.
    let sql = match entity_type {
        "issue" => {
            "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?"
        }
        "merge_request" => {
            "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?"
        }
        _ => return Ok(()),
    };
    conn.execute(sql, [entity_local_id])?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly defaulted issue-ingestion summary carries only zeros.
    #[test]
    fn result_default_has_zero_counts() {
        let r = IngestProjectResult::default();
        assert_eq!(r.issues_fetched, 0);
        assert_eq!(r.issues_upserted, 0);
        assert_eq!(r.labels_created, 0);
        assert_eq!(r.discussions_fetched, 0);
        assert_eq!(r.notes_upserted, 0);
        assert_eq!(r.issues_synced_discussions, 0);
        assert_eq!(r.issues_skipped_discussion_sync, 0);
        assert_eq!(r.resource_events_fetched, 0);
        assert_eq!(r.resource_events_failed, 0);
    }

    /// A freshly defaulted MR-ingestion summary carries only zeros.
    #[test]
    fn mr_result_default_has_zero_counts() {
        let r = IngestMrProjectResult::default();
        assert_eq!(r.mrs_fetched, 0);
        assert_eq!(r.mrs_upserted, 0);
        assert_eq!(r.labels_created, 0);
        assert_eq!(r.assignees_linked, 0);
        assert_eq!(r.reviewers_linked, 0);
        assert_eq!(r.discussions_fetched, 0);
        assert_eq!(r.discussions_upserted, 0);
        assert_eq!(r.notes_upserted, 0);
        assert_eq!(r.notes_skipped_bad_timestamp, 0);
        assert_eq!(r.diffnotes_count, 0);
        assert_eq!(r.mrs_synced_discussions, 0);
        assert_eq!(r.mrs_skipped_discussion_sync, 0);
        assert_eq!(r.resource_events_fetched, 0);
        assert_eq!(r.resource_events_failed, 0);
    }

    /// A freshly defaulted drain summary carries only zeros.
    #[test]
    fn drain_result_default_has_zero_counts() {
        let r = DrainResult::default();
        assert_eq!(r.fetched, 0);
        assert_eq!(r.failed, 0);
        assert_eq!(r.skipped_not_found, 0);
    }

    /// The resource-event progress variants must remain constructible.
    #[test]
    fn progress_event_resource_variants_exist() {
        let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 };
        let _progress = ProgressEvent::ResourceEventFetched {
            current: 5,
            total: 10,
        };
        let _complete = ProgressEvent::ResourceEventsFetchComplete {
            fetched: 8,
            failed: 2,
        };
    }
}