use futures::future::join_all; use rusqlite::Connection; use tracing::{debug, instrument, warn}; use crate::Config; use crate::core::dependent_queue::{ claim_jobs, complete_job_tx, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, }; use crate::core::error::Result; use crate::core::references::{ EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path, }; use crate::core::shutdown::ShutdownSignal; use crate::gitlab::GitLabClient; use super::discussions::ingest_issue_discussions; use super::issues::{IssueForDiscussionSync, ingest_issues}; use super::merge_requests::{ MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests, }; use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions}; pub type ProgressCallback = Box; #[derive(Debug, Clone)] pub enum ProgressEvent { IssuesFetchStarted, IssueFetched { count: usize }, IssuesFetchComplete { total: usize }, DiscussionSyncStarted { total: usize }, DiscussionSynced { current: usize, total: usize }, DiscussionSyncComplete, MrsFetchStarted, MrFetched { count: usize }, MrsFetchComplete { total: usize }, MrDiscussionSyncStarted { total: usize }, MrDiscussionSynced { current: usize, total: usize }, MrDiscussionSyncComplete, ResourceEventsFetchStarted { total: usize }, ResourceEventFetched { current: usize, total: usize }, ResourceEventsFetchComplete { fetched: usize, failed: usize }, ClosesIssuesFetchStarted { total: usize }, ClosesIssueFetched { current: usize, total: usize }, ClosesIssuesFetchComplete { fetched: usize, failed: usize }, MrDiffsFetchStarted { total: usize }, MrDiffFetched { current: usize, total: usize }, MrDiffsFetchComplete { fetched: usize, failed: usize }, StatusEnrichmentStarted { total: usize }, StatusEnrichmentPageFetched { items_so_far: usize }, StatusEnrichmentWriting { total: usize }, StatusEnrichmentComplete { enriched: usize, cleared: usize }, StatusEnrichmentSkipped, } #[derive(Debug, Default)] pub struct 
IngestProjectResult { pub issues_fetched: usize, pub issues_upserted: usize, pub labels_created: usize, pub discussions_fetched: usize, pub discussions_upserted: usize, pub notes_upserted: usize, pub issues_synced_discussions: usize, pub issues_skipped_discussion_sync: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, pub statuses_enriched: usize, pub statuses_cleared: usize, pub statuses_seen: usize, pub statuses_without_widget: usize, pub partial_error_count: usize, pub first_partial_error: Option, pub status_enrichment_error: Option, pub status_enrichment_mode: String, pub status_unsupported_reason: Option, } #[derive(Debug, Default)] pub struct IngestMrProjectResult { pub mrs_fetched: usize, pub mrs_upserted: usize, pub labels_created: usize, pub assignees_linked: usize, pub reviewers_linked: usize, pub discussions_fetched: usize, pub discussions_upserted: usize, pub notes_upserted: usize, pub notes_skipped_bad_timestamp: usize, pub diffnotes_count: usize, pub mrs_synced_discussions: usize, pub mrs_skipped_discussion_sync: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, pub closes_issues_fetched: usize, pub closes_issues_failed: usize, pub mr_diffs_fetched: usize, pub mr_diffs_failed: usize, } pub async fn ingest_project_issues( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, ) -> Result { let signal = ShutdownSignal::new(); ingest_project_issues_with_progress( conn, client, config, project_id, gitlab_project_id, None, &signal, ) .await } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) )] pub async fn ingest_project_issues_with_progress( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: Option, signal: &ShutdownSignal, ) -> Result { let mut result = IngestProjectResult::default(); let emit = |event: 
ProgressEvent| { if let Some(ref cb) = progress { cb(event); } }; emit(ProgressEvent::IssuesFetchStarted); let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id, signal).await?; result.issues_fetched = issue_result.fetched; result.issues_upserted = issue_result.upserted; result.labels_created = issue_result.labels_created; emit(ProgressEvent::IssuesFetchComplete { total: result.issues_fetched, }); // ── Phase 1.5: Status enrichment via GraphQL ────────────────────── if config.sync.fetch_work_item_status && !signal.is_cancelled() { use rusqlite::OptionalExtension; let issue_count: i64 = conn .query_row( "SELECT COUNT(*) FROM issues WHERE project_id = ?1", [project_id], |r| r.get(0), ) .unwrap_or(0); emit(ProgressEvent::StatusEnrichmentStarted { total: issue_count as usize, }); let project_path: Option = conn .query_row( "SELECT path_with_namespace FROM projects WHERE id = ?1", [project_id], |r| r.get(0), ) .optional()?; match project_path { None => { warn!("Cannot enrich statuses: project path not found for project_id={project_id}"); result.status_enrichment_error = Some("project_path_missing".into()); result.status_enrichment_mode = "fetched".into(); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, }); } Some(path) => { let graphql_client = client.graphql_client(); let page_cb = |items_so_far: usize| { emit(ProgressEvent::StatusEnrichmentPageFetched { items_so_far }); }; match crate::gitlab::graphql::fetch_issue_statuses_with_progress( &graphql_client, &path, Some(&page_cb), ) .await { Ok(fetch_result) => { if let Some(ref reason) = fetch_result.unsupported_reason { result.status_enrichment_mode = "unsupported".into(); result.status_unsupported_reason = Some(match reason { crate::gitlab::graphql::UnsupportedReason::GraphqlEndpointMissing => { "graphql_endpoint_missing".into() } crate::gitlab::graphql::UnsupportedReason::AuthForbidden => { "auth_forbidden".into() } }); } else { result.status_enrichment_mode = 
"fetched".into(); } result.statuses_seen = fetch_result.all_fetched_iids.len(); result.partial_error_count = fetch_result.partial_error_count; if fetch_result.first_partial_error.is_some() { result.first_partial_error = fetch_result.first_partial_error.clone(); } if signal.is_cancelled() { debug!("Shutdown requested after status fetch, skipping DB write"); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, }); } else { emit(ProgressEvent::StatusEnrichmentWriting { total: fetch_result.all_fetched_iids.len(), }); match enrich_issue_statuses_txn( conn, project_id, &fetch_result.statuses, &fetch_result.all_fetched_iids, ) { Ok((enriched, cleared)) => { result.statuses_enriched = enriched; result.statuses_cleared = cleared; result.statuses_without_widget = result .statuses_seen .saturating_sub(fetch_result.statuses.len()); debug!( seen = result.statuses_seen, enriched, cleared, without_widget = result.statuses_without_widget, "Status enrichment complete" ); } Err(e) => { warn!("Status enrichment DB write failed: {e}"); result.status_enrichment_error = Some(format!("db_write_failed: {e}")); } } emit(ProgressEvent::StatusEnrichmentComplete { enriched: result.statuses_enriched, cleared: result.statuses_cleared, }); } } Err(e) => { warn!("Status enrichment fetch failed: {e}"); result.status_enrichment_error = Some(e.to_string()); result.status_enrichment_mode = "fetched".into(); emit(ProgressEvent::StatusEnrichmentComplete { enriched: 0, cleared: 0, }); } } } } } else if !config.sync.fetch_work_item_status { result.status_enrichment_mode = "skipped".into(); emit(ProgressEvent::StatusEnrichmentSkipped); } let issues_needing_sync = issue_result.issues_needing_discussion_sync; let total_issues: i64 = conn.query_row( "SELECT COUNT(*) FROM issues WHERE project_id = ?", [project_id], |row| row.get(0), )?; let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); if 
signal.is_cancelled() { debug!("Shutdown requested, returning partial issue results"); return Ok(result); } if issues_needing_sync.is_empty() { debug!("No issues need discussion sync"); } else { debug!( count = issues_needing_sync.len(), "Starting discussion sync for issues" ); emit(ProgressEvent::DiscussionSyncStarted { total: issues_needing_sync.len(), }); let discussion_results = sync_discussions_sequential( conn, client, config, gitlab_project_id, project_id, &issues_needing_sync, &progress, signal, ) .await?; emit(ProgressEvent::DiscussionSyncComplete); for disc_result in discussion_results { result.discussions_fetched += disc_result.discussions_fetched; result.discussions_upserted += disc_result.discussions_upserted; result.notes_upserted += disc_result.notes_upserted; result.issues_synced_discussions += 1; } } if signal.is_cancelled() { debug!("Shutdown requested, returning partial issue results"); return Ok(result); } if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?; if enqueued > 0 { debug!(enqueued, "Enqueued resource events jobs for issues"); } let drain_result = drain_resource_events( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; let refs_inserted = crate::core::references::extract_refs_from_state_events(conn, project_id)?; if refs_inserted > 0 { debug!( refs_inserted, "Extracted cross-references from state events" ); } } debug!( summary = crate::ingestion::nonzero_summary(&[ ("fetched", result.issues_fetched), ("upserted", result.issues_upserted), ("labels", result.labels_created), ("discussions", result.discussions_fetched), ("notes", result.notes_upserted), ("synced", result.issues_synced_discussions), ("skipped", result.issues_skipped_discussion_sync), ("events", result.resource_events_fetched), ("event errors", 
result.resource_events_failed), ]), "Project complete" ); tracing::Span::current().record("items_processed", result.issues_upserted); tracing::Span::current().record("items_skipped", result.issues_skipped_discussion_sync); tracing::Span::current().record("errors", result.resource_events_failed); Ok(result) } fn enrich_issue_statuses_txn( conn: &Connection, project_id: i64, statuses: &std::collections::HashMap, all_fetched_iids: &std::collections::HashSet, ) -> std::result::Result<(usize, usize), rusqlite::Error> { let now_ms = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as i64; let tx = conn.unchecked_transaction()?; let mut cleared = 0usize; let mut enriched = 0usize; // Phase 1: Clear stale statuses (fetched but no status widget) { let mut clear_stmt = tx.prepare_cached( "UPDATE issues SET status_name = NULL, status_category = NULL, status_color = NULL, status_icon_name = NULL, status_synced_at = ?3 WHERE project_id = ?1 AND iid = ?2 AND status_name IS NOT NULL", )?; for iid in all_fetched_iids { if !statuses.contains_key(iid) { let rows = clear_stmt.execute(rusqlite::params![project_id, iid, now_ms])?; if rows > 0 { cleared += 1; } } } } // Phase 2: Apply new/updated statuses (only write when values actually differ) { let mut update_stmt = tx.prepare_cached( "UPDATE issues SET status_name = ?1, status_category = ?2, status_color = ?3, status_icon_name = ?4, status_synced_at = ?5 WHERE project_id = ?6 AND iid = ?7 AND (status_name IS NOT ?1 OR status_category IS NOT ?2 OR status_color IS NOT ?3 OR status_icon_name IS NOT ?4)", )?; for (iid, status) in statuses { let rows = update_stmt.execute(rusqlite::params![ &status.name, &status.category, &status.color, &status.icon_name, now_ms, project_id, iid, ])?; if rows > 0 { enriched += 1; } } // Update synced_at timestamp for unchanged rows too let mut touch_stmt = tx.prepare_cached( "UPDATE issues SET status_synced_at = ?1 WHERE project_id = ?2 AND iid = ?3 
AND status_synced_at IS NOT ?1", )?; for iid in statuses.keys() { touch_stmt.execute(rusqlite::params![now_ms, project_id, iid])?; } } tx.commit()?; Ok((enriched, cleared)) } #[allow(clippy::too_many_arguments)] async fn sync_discussions_sequential( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, issues: &[IssueForDiscussionSync], progress: &Option, signal: &ShutdownSignal, ) -> Result> { let batch_size = config.sync.dependent_concurrency as usize; let total = issues.len(); let mut results = Vec::with_capacity(issues.len()); for chunk in issues.chunks(batch_size) { if signal.is_cancelled() { debug!("Shutdown requested during discussion sync, returning partial results"); break; } for issue in chunk { let disc_result = ingest_issue_discussions( conn, client, config, gitlab_project_id, local_project_id, std::slice::from_ref(issue), ) .await?; results.push(disc_result); if let Some(cb) = progress { cb(ProgressEvent::DiscussionSynced { current: results.len(), total, }); } } } Ok(results) } pub async fn ingest_project_merge_requests( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, full_sync: bool, ) -> Result { let signal = ShutdownSignal::new(); ingest_project_merge_requests_with_progress( conn, client, config, project_id, gitlab_project_id, full_sync, None, &signal, ) .await } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) )] #[allow(clippy::too_many_arguments)] pub async fn ingest_project_merge_requests_with_progress( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, full_sync: bool, progress: Option, signal: &ShutdownSignal, ) -> Result { let mut result = IngestMrProjectResult::default(); let emit = |event: ProgressEvent| { if let Some(ref cb) = progress { cb(event); } }; emit(ProgressEvent::MrsFetchStarted); let 
mr_result = ingest_merge_requests( conn, client, config, project_id, gitlab_project_id, full_sync, signal, ) .await?; result.mrs_fetched = mr_result.fetched; result.mrs_upserted = mr_result.upserted; result.labels_created = mr_result.labels_created; result.assignees_linked = mr_result.assignees_linked; result.reviewers_linked = mr_result.reviewers_linked; emit(ProgressEvent::MrsFetchComplete { total: result.mrs_fetched, }); let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?; let total_mrs: i64 = conn.query_row( "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", [project_id], |row| row.get(0), )?; let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); if signal.is_cancelled() { debug!("Shutdown requested, returning partial MR results"); return Ok(result); } if mrs_needing_sync.is_empty() { debug!("No MRs need discussion sync"); } else { debug!( count = mrs_needing_sync.len(), "Starting discussion sync for MRs" ); emit(ProgressEvent::MrDiscussionSyncStarted { total: mrs_needing_sync.len(), }); let discussion_results = sync_mr_discussions_sequential( conn, client, config, gitlab_project_id, project_id, &mrs_needing_sync, &progress, signal, ) .await?; emit(ProgressEvent::MrDiscussionSyncComplete); for disc_result in discussion_results { result.discussions_fetched += disc_result.discussions_fetched; result.discussions_upserted += disc_result.discussions_upserted; result.notes_upserted += disc_result.notes_upserted; result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp; result.diffnotes_count += disc_result.diffnotes_count; if disc_result.pagination_succeeded { result.mrs_synced_discussions += 1; } } } if signal.is_cancelled() { debug!("Shutdown requested, returning partial MR results"); return Ok(result); } if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?; if enqueued 
> 0 { debug!(enqueued, "Enqueued resource events jobs for MRs"); } let drain_result = drain_resource_events( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.resource_events_fetched = drain_result.fetched; result.resource_events_failed = drain_result.failed; let refs_inserted = crate::core::references::extract_refs_from_state_events(conn, project_id)?; if refs_inserted > 0 { debug!( refs_inserted, "Extracted cross-references from state events" ); } } if signal.is_cancelled() { debug!("Shutdown requested, returning partial MR results"); return Ok(result); } let note_refs = crate::core::note_parser::extract_refs_from_system_notes(conn, project_id)?; if note_refs.inserted > 0 || note_refs.skipped_unresolvable > 0 { debug!( inserted = note_refs.inserted, unresolvable = note_refs.skipped_unresolvable, parse_failures = note_refs.parse_failures, "Extracted cross-references from system notes (MRs)" ); } let desc_refs = crate::core::note_parser::extract_refs_from_descriptions(conn, project_id)?; if desc_refs.inserted > 0 || desc_refs.skipped_unresolvable > 0 { debug!( inserted = desc_refs.inserted, unresolvable = desc_refs.skipped_unresolvable, "Extracted cross-references from descriptions" ); } let user_note_refs = crate::core::note_parser::extract_refs_from_user_notes(conn, project_id)?; if user_note_refs.inserted > 0 || user_note_refs.skipped_unresolvable > 0 { debug!( inserted = user_note_refs.inserted, unresolvable = user_note_refs.skipped_unresolvable, "Extracted cross-references from user notes" ); } { let enqueued = enqueue_mr_closes_issues_jobs(conn, project_id)?; if enqueued > 0 { debug!(enqueued, "Enqueued mr_closes_issues jobs"); } let closes_result = drain_mr_closes_issues( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.closes_issues_fetched = closes_result.fetched; result.closes_issues_failed = closes_result.failed; } if signal.is_cancelled() { debug!("Shutdown requested, 
returning partial MR results"); return Ok(result); } if config.sync.fetch_mr_file_changes { let enqueued = enqueue_mr_diffs_jobs(conn, project_id)?; if enqueued > 0 { debug!(enqueued, "Enqueued mr_diffs jobs"); } let diffs_result = drain_mr_diffs( conn, client, config, project_id, gitlab_project_id, &progress, signal, ) .await?; result.mr_diffs_fetched = diffs_result.fetched; result.mr_diffs_failed = diffs_result.failed; } debug!( summary = crate::ingestion::nonzero_summary(&[ ("fetched", result.mrs_fetched), ("upserted", result.mrs_upserted), ("labels", result.labels_created), ("discussions", result.discussions_fetched), ("notes", result.notes_upserted), ("diffnotes", result.diffnotes_count), ("synced", result.mrs_synced_discussions), ("skipped", result.mrs_skipped_discussion_sync), ("events", result.resource_events_fetched), ("event errors", result.resource_events_failed), ("closes", result.closes_issues_fetched), ("close errors", result.closes_issues_failed), ("diffs", result.mr_diffs_fetched), ("diff errors", result.mr_diffs_failed), ]), "MR project complete" ); tracing::Span::current().record("items_processed", result.mrs_upserted); tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync); tracing::Span::current().record( "errors", result.resource_events_failed + result.closes_issues_failed + result.mr_diffs_failed, ); Ok(result) } #[allow(clippy::too_many_arguments)] async fn sync_mr_discussions_sequential( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, mrs: &[MrForDiscussionSync], progress: &Option, signal: &ShutdownSignal, ) -> Result> { let batch_size = config.sync.dependent_concurrency as usize; let total = mrs.len(); let mut results = Vec::with_capacity(mrs.len()); let mut processed = 0; for chunk in mrs.chunks(batch_size) { if signal.is_cancelled() { debug!("Shutdown requested during MR discussion sync, returning partial results"); break; } let prefetch_futures = 
chunk.iter().map(|mr| { prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone()) }); let prefetched_batch = join_all(prefetch_futures).await; for prefetched in prefetched_batch { let disc_result = write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?; results.push(disc_result); processed += 1; if let Some(cb) = progress { cb(ProgressEvent::MrDiscussionSynced { current: processed, total, }); } } } Ok(results) } #[derive(Debug, Default)] pub struct DrainResult { pub fetched: usize, pub failed: usize, pub skipped_not_found: usize, } fn enqueue_resource_events_for_entity_type( conn: &Connection, project_id: i64, entity_type: &str, ) -> Result { match entity_type { "issue" => { conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'issue' AND job_type = 'resource_events' \ AND entity_local_id IN ( \ SELECT id FROM issues \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ )", [project_id], )?; } "merge_request" => { conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'resource_events' \ AND entity_local_id IN ( \ SELECT id FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \ )", [project_id], )?; } other => { warn!( entity_type = other, "Unknown entity_type in enqueue_resource_events, skipping stale job cleanup" ); } } let entities: Vec<(i64, i64)> = match entity_type { "issue" => { let mut stmt = conn.prepare_cached( "SELECT id, iid FROM issues \ WHERE project_id = ?1 \ AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? 
} "merge_request" => { let mut stmt = conn.prepare_cached( "SELECT id, iid FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)", )?; stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()? } _ => return Ok(0), }; let mut enqueued = 0; for (local_id, iid) in &entities { if enqueue_job( conn, project_id, entity_type, *iid, *local_id, "resource_events", None, )? { enqueued += 1; } } Ok(enqueued) } /// Result of a concurrent HTTP prefetch for resource events. #[allow(clippy::type_complexity)] struct PrefetchedResourceEvents { job_id: i64, project_id: i64, entity_type: String, entity_iid: i64, entity_local_id: i64, result: std::result::Result< ( Vec, Vec, Vec, ), crate::core::error::LoreError, >, } async fn prefetch_resource_events( client: &GitLabClient, gitlab_project_id: i64, job_id: i64, project_id: i64, entity_type: String, entity_iid: i64, entity_local_id: i64, ) -> PrefetchedResourceEvents { let result = client .fetch_all_resource_events(gitlab_project_id, &entity_type, entity_iid) .await; PrefetchedResourceEvents { job_id, project_id, entity_type, entity_iid, entity_local_id, result, } } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_resource_events( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: &Option, signal: &ShutdownSignal, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; if reclaimed > 0 { debug!(reclaimed, "Reclaimed stale resource event locks"); } let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts .get("resource_events") .copied() .unwrap_or(0); if total_pending == 0 { return Ok(result); } let emit = |event: 
ProgressEvent| { if let Some(cb) = progress { cb(event); } }; emit(ProgressEvent::ResourceEventsFetchStarted { total: total_pending, }); let mut processed = 0; let mut seen_job_ids = std::collections::HashSet::new(); loop { if signal.is_cancelled() { debug!("Shutdown requested during resource events drain, returning partial results"); break; } let jobs = claim_jobs(conn, "resource_events", project_id, batch_size)?; if jobs.is_empty() { break; } // Phase 1: Concurrent HTTP fetches let futures: Vec<_> = jobs .iter() .filter(|j| seen_job_ids.insert(j.id)) .map(|j| { prefetch_resource_events( client, gitlab_project_id, j.id, j.project_id, j.entity_type.clone(), j.entity_iid, j.entity_local_id, ) }) .collect(); if futures.is_empty() { warn!("All claimed jobs were already processed, breaking drain loop"); break; } let prefetched = join_all(futures).await; // Phase 2: Serial DB writes for p in prefetched { match p.result { Ok((state_events, label_events, milestone_events)) => { let tx = conn.unchecked_transaction()?; let store_result = store_resource_events( &tx, p.project_id, &p.entity_type, p.entity_local_id, &state_events, &label_events, &milestone_events, ); match store_result { Ok(()) => { complete_job_tx(&tx, p.job_id)?; update_resource_event_watermark_tx( &tx, &p.entity_type, p.entity_local_id, )?; tx.commit()?; result.fetched += 1; } Err(e) => { drop(tx); warn!( entity_type = %p.entity_type, entity_iid = p.entity_iid, error = %e, "Failed to store resource events" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } Err(e) => { if e.is_permanent_api_error() { debug!( entity_type = %p.entity_type, entity_iid = p.entity_iid, error = %e, "Permanent API error for resource events, marking complete" ); let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_resource_event_watermark_tx(&tx, &p.entity_type, p.entity_local_id)?; tx.commit()?; result.skipped_not_found += 1; } else { warn!( entity_type = %p.entity_type, entity_iid = 
p.entity_iid, error = %e, "Failed to fetch resource events from GitLab" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } processed += 1; emit(ProgressEvent::ResourceEventFetched { current: processed, total: total_pending, }); } } emit(ProgressEvent::ResourceEventsFetchComplete { fetched: result.fetched, failed: result.failed, }); if result.fetched > 0 || result.failed > 0 { debug!( fetched = result.fetched, failed = result.failed, "Resource events drain complete" ); } tracing::Span::current().record("items_processed", result.fetched); tracing::Span::current().record("errors", result.failed); Ok(result) } /// Store resource events using the provided connection (caller manages the transaction). pub(crate) fn store_resource_events( conn: &Connection, project_id: i64, entity_type: &str, entity_local_id: i64, state_events: &[crate::gitlab::types::GitLabStateEvent], label_events: &[crate::gitlab::types::GitLabLabelEvent], milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], ) -> Result<()> { if !state_events.is_empty() { crate::core::events_db::upsert_state_events( conn, project_id, entity_type, entity_local_id, state_events, )?; } if !label_events.is_empty() { crate::core::events_db::upsert_label_events( conn, project_id, entity_type, entity_local_id, label_events, )?; } if !milestone_events.is_empty() { crate::core::events_db::upsert_milestone_events( conn, project_id, entity_type, entity_local_id, milestone_events, )?; } Ok(()) } fn update_closes_issues_watermark_tx( tx: &rusqlite::Transaction<'_>, mr_local_id: i64, ) -> Result<()> { tx.execute( "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?", [mr_local_id], )?; Ok(()) } fn update_resource_event_watermark_tx( tx: &rusqlite::Transaction<'_>, entity_type: &str, entity_local_id: i64, ) -> Result<()> { match entity_type { "issue" => { tx.execute( "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", 
[entity_local_id], )?; } "merge_request" => { tx.execute( "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", [entity_local_id], )?; } other => { warn!( entity_type = other, "Unknown entity_type in watermark update, skipping" ); } } Ok(()) } fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result { // Remove stale jobs for MRs that haven't changed since their last closes_issues sync conn.execute( "DELETE FROM pending_dependent_fetches \ WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'mr_closes_issues' \ AND entity_local_id IN ( \ SELECT id FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at <= COALESCE(closes_issues_synced_for_updated_at, 0) \ )", [project_id], )?; let mut stmt = conn.prepare_cached( "SELECT id, iid FROM merge_requests \ WHERE project_id = ?1 \ AND updated_at > COALESCE(closes_issues_synced_for_updated_at, 0)", )?; let entities: Vec<(i64, i64)> = stmt .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? .collect::, _>>()?; let mut enqueued = 0; for (local_id, iid) in &entities { if enqueue_job( conn, project_id, "merge_request", *iid, *local_id, "mr_closes_issues", None, )? { enqueued += 1; } } Ok(enqueued) } /// Result of a concurrent HTTP prefetch for closes-issues references. 
struct PrefetchedClosesIssues { job_id: i64, entity_iid: i64, entity_local_id: i64, result: std::result::Result< Vec, crate::core::error::LoreError, >, } async fn prefetch_closes_issues( client: &GitLabClient, gitlab_project_id: i64, job_id: i64, entity_iid: i64, entity_local_id: i64, ) -> PrefetchedClosesIssues { let result = client .fetch_mr_closes_issues(gitlab_project_id, entity_iid) .await; PrefetchedClosesIssues { job_id, entity_iid, entity_local_id, result, } } #[instrument( skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_mr_closes_issues( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, progress: &Option, signal: &ShutdownSignal, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; if reclaimed > 0 { debug!(reclaimed, "Reclaimed stale mr_closes_issues locks"); } let claimable_counts = count_claimable_jobs(conn, project_id)?; let total_pending = claimable_counts .get("mr_closes_issues") .copied() .unwrap_or(0); if total_pending == 0 { return Ok(result); } let emit = |event: ProgressEvent| { if let Some(cb) = progress { cb(event); } }; emit(ProgressEvent::ClosesIssuesFetchStarted { total: total_pending, }); let mut processed = 0; let mut seen_job_ids = std::collections::HashSet::new(); loop { if signal.is_cancelled() { debug!("Shutdown requested during closes_issues drain, returning partial results"); break; } let jobs = claim_jobs(conn, "mr_closes_issues", project_id, batch_size)?; if jobs.is_empty() { break; } // Phase 1: Concurrent HTTP fetches let futures: Vec<_> = jobs .iter() .filter(|j| seen_job_ids.insert(j.id)) .map(|j| { prefetch_closes_issues( client, gitlab_project_id, j.id, j.entity_iid, j.entity_local_id, ) }) .collect(); if futures.is_empty() { warn!("All claimed mr_closes_issues 
jobs were already processed, breaking drain loop"); break; } let prefetched = join_all(futures).await; // Phase 2: Serial DB writes for p in prefetched { match p.result { Ok(closes_issues) => { let tx = conn.unchecked_transaction()?; let store_result = store_closes_issues_refs( &tx, project_id, p.entity_local_id, &closes_issues, ); match store_result { Ok(()) => { complete_job_tx(&tx, p.job_id)?; update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { drop(tx); warn!( entity_iid = p.entity_iid, error = %e, "Failed to store closes_issues references" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } Err(e) => { if e.is_permanent_api_error() { debug!( entity_iid = p.entity_iid, error = %e, "Permanent API error for closes_issues, marking complete" ); let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.skipped_not_found += 1; } else { warn!( entity_iid = p.entity_iid, error = %e, "Failed to fetch closes_issues from GitLab" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } processed += 1; emit(ProgressEvent::ClosesIssueFetched { current: processed, total: total_pending, }); } } emit(ProgressEvent::ClosesIssuesFetchComplete { fetched: result.fetched, failed: result.failed, }); if result.fetched > 0 || result.failed > 0 { debug!( fetched = result.fetched, failed = result.failed, "mr_closes_issues drain complete" ); } tracing::Span::current().record("items_processed", result.fetched); tracing::Span::current().record("errors", result.failed); Ok(result) } pub(crate) fn store_closes_issues_refs( conn: &Connection, project_id: i64, mr_local_id: i64, closes_issues: &[crate::gitlab::types::GitLabIssueRef], ) -> Result<()> { for issue_ref in closes_issues { let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; let (target_id, target_path, target_iid) = if let 
Some(local_id) = target_local_id {
            // Issue already ingested locally: link by local row id.
            (Some(local_id), None, None)
        } else {
            // Not known locally: fall back to a path + iid reference. When even the
            // project path is unknown, synthesize a "gitlab_project:<id>" placeholder.
            let path = resolve_project_path(conn, issue_ref.project_id)?;
            let fallback =
                path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id));
            (None, Some(fallback), Some(issue_ref.iid))
        };
        let ref_ = EntityReference {
            project_id,
            source_entity_type: "merge_request",
            source_entity_id: mr_local_id,
            target_entity_type: "issue",
            target_entity_id: target_id,
            target_project_path: target_path.as_deref(),
            target_entity_iid: target_iid,
            reference_type: "closes",
            source_method: "api",
        };
        insert_entity_reference(conn, &ref_)?;
    }
    Ok(())
}

// ─── MR Diffs (file changes) ────────────────────────────────────────────────

/// Enqueue an `mr_diffs` job for every MR whose `updated_at` is newer than its
/// diffs watermark; returns how many jobs were newly enqueued.
// NOTE(review): the return type reads `-> Result {` — a generic parameter (likely
// `<usize>`, matching enqueue_mr_closes_issues_jobs) appears to have been stripped
// from the text; confirm against VCS history.
fn enqueue_mr_diffs_jobs(conn: &Connection, project_id: i64) -> Result {
    // Remove stale jobs for MRs that haven't changed since their last diffs sync
    conn.execute(
        "DELETE FROM pending_dependent_fetches \
         WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'mr_diffs' \
         AND entity_local_id IN ( \
             SELECT id FROM merge_requests \
             WHERE project_id = ?1 \
             AND updated_at <= COALESCE(diffs_synced_for_updated_at, 0) \
         )",
        [project_id],
    )?;
    let mut stmt = conn.prepare_cached(
        "SELECT id, iid FROM merge_requests \
         WHERE project_id = ?1 \
         AND updated_at > COALESCE(diffs_synced_for_updated_at, 0)",
    )?;
    // (local_id, iid) for every MR still needing a diffs sync.
    // NOTE(review): turbofish is garbled — presumably `.collect::<Result<Vec<_>, _>>()?`.
    let entities: Vec<(i64, i64)> = stmt
        .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::, _>>()?;
    let mut enqueued = 0;
    for (local_id, iid) in &entities {
        if enqueue_job(
            conn,
            project_id,
            "merge_request",
            *iid,
            *local_id,
            "mr_diffs",
            None,
        )? {
            enqueued += 1;
        }
    }
    Ok(enqueued)
}

/// One claimed `mr_diffs` job paired with the outcome of its HTTP fetch.
struct PrefetchedMrDiffs {
    job_id: i64,
    entity_iid: i64,
    entity_local_id: i64,
    // NOTE(review): a `Vec<...>` element type was stripped from the text here; it is
    // whatever `GitLabClient::fetch_mr_diffs` yields on success — confirm vs VCS.
    result: std::result::Result, crate::core::error::LoreError>,
}

/// Fetch the file diffs for one MR, carrying the job bookkeeping ids through so the
/// serial write phase can complete or fail the job.
async fn prefetch_mr_diffs(
    client: &GitLabClient,
    gitlab_project_id: i64,
    job_id: i64,
    entity_iid: i64,
    entity_local_id: i64,
) -> PrefetchedMrDiffs {
    let result = client.fetch_mr_diffs(gitlab_project_id, entity_iid).await;
    PrefetchedMrDiffs {
        job_id,
        entity_iid,
        entity_local_id,
        result,
    }
}

/// Drain pending `mr_diffs` jobs: reclaim stale locks, claim in batches, prefetch
/// concurrently, write serially. Structurally mirrors `drain_mr_closes_issues`.
// NOTE(review): `progress: &Option,` and `-> Result {` are garbled — presumably
// `&Option<ProgressCallback>` and `Result<DrainResult>` as in the sibling drain.
#[instrument(
    skip(conn, client, config, progress, signal),
    fields(project_id, gitlab_project_id, items_processed, errors)
)]
async fn drain_mr_diffs(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: &Option,
    signal: &ShutdownSignal,
) -> Result {
    let mut result = DrainResult::default();
    let batch_size = config.sync.dependent_concurrency as usize;
    // Jobs locked by a run that died mid-drain become claimable again.
    let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
    if reclaimed > 0 {
        debug!(reclaimed, "Reclaimed stale mr_diffs locks");
    }
    let claimable_counts = count_claimable_jobs(conn, project_id)?;
    let total_pending = claimable_counts.get("mr_diffs").copied().unwrap_or(0);
    if total_pending == 0 {
        return Ok(result);
    }
    let emit = |event: ProgressEvent| {
        if let Some(cb) = progress {
            cb(event);
        }
    };
    emit(ProgressEvent::MrDiffsFetchStarted {
        total: total_pending,
    });
    let mut processed = 0;
    // Guards against re-processing a job the queue hands back while still locked.
    let mut seen_job_ids = std::collections::HashSet::new();
    loop {
        if signal.is_cancelled() {
            debug!("Shutdown requested during mr_diffs drain, returning partial results");
            break;
        }
        let jobs = claim_jobs(conn, "mr_diffs", project_id, batch_size)?;
        if jobs.is_empty() {
            break;
        }
        // Phase 1: Concurrent HTTP fetches
        let futures: Vec<_> = jobs
            .iter()
            .filter(|j| seen_job_ids.insert(j.id))
            .map(|j| {
                prefetch_mr_diffs(
                    client,
                    gitlab_project_id,
                    j.id,
                    j.entity_iid,
                    j.entity_local_id,
                )
            })
            .collect();
        if futures.is_empty() {
            warn!("All claimed mr_diffs jobs were already processed, breaking drain loop");
            break;
} let prefetched = join_all(futures).await; // Phase 2: Serial DB writes for p in prefetched { match p.result { Ok(diffs) => { let tx = conn.unchecked_transaction()?; let store_result = super::mr_diffs::upsert_mr_file_changes( &tx, p.entity_local_id, project_id, &diffs, ); match store_result { Ok(_) => { complete_job_tx(&tx, p.job_id)?; update_diffs_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.fetched += 1; } Err(e) => { drop(tx); warn!( entity_iid = p.entity_iid, error = %e, "Failed to store MR file changes" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } Err(e) => { if e.is_permanent_api_error() { debug!( entity_iid = p.entity_iid, error = %e, "Permanent API error for mr_diffs, marking complete" ); let tx = conn.unchecked_transaction()?; complete_job_tx(&tx, p.job_id)?; update_diffs_watermark_tx(&tx, p.entity_local_id)?; tx.commit()?; result.skipped_not_found += 1; } else { warn!( entity_iid = p.entity_iid, error = %e, "Failed to fetch MR diffs from GitLab" ); fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } } processed += 1; emit(ProgressEvent::MrDiffFetched { current: processed, total: total_pending, }); } } emit(ProgressEvent::MrDiffsFetchComplete { fetched: result.fetched, failed: result.failed, }); if result.fetched > 0 || result.failed > 0 { debug!( fetched = result.fetched, failed = result.failed, "mr_diffs drain complete" ); } tracing::Span::current().record("items_processed", result.fetched); tracing::Span::current().record("errors", result.failed); Ok(result) } fn update_diffs_watermark_tx(tx: &rusqlite::Transaction<'_>, mr_local_id: i64) -> Result<()> { tx.execute( "UPDATE merge_requests SET diffs_synced_for_updated_at = updated_at WHERE id = ?", [mr_local_id], )?; Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn result_default_has_zero_counts() { let result = IngestProjectResult::default(); assert_eq!(result.issues_fetched, 0); assert_eq!(result.issues_upserted, 0); 
assert_eq!(result.labels_created, 0);
        assert_eq!(result.discussions_fetched, 0);
        // Was previously missing: discussions_upserted is a field of
        // IngestProjectResult and should be covered like its siblings.
        assert_eq!(result.discussions_upserted, 0);
        assert_eq!(result.notes_upserted, 0);
        assert_eq!(result.issues_synced_discussions, 0);
        assert_eq!(result.issues_skipped_discussion_sync, 0);
        assert_eq!(result.resource_events_fetched, 0);
        assert_eq!(result.resource_events_failed, 0);
        assert_eq!(result.statuses_enriched, 0);
        assert_eq!(result.statuses_cleared, 0);
        assert_eq!(result.statuses_seen, 0);
        assert_eq!(result.statuses_without_widget, 0);
        assert_eq!(result.partial_error_count, 0);
        assert!(result.first_partial_error.is_none());
        assert!(result.status_enrichment_error.is_none());
        assert!(result.status_enrichment_mode.is_empty());
        assert!(result.status_unsupported_reason.is_none());
    }

    /// `IngestMrProjectResult::default()` must start with every counter at zero.
    #[test]
    fn mr_result_default_has_zero_counts() {
        let result = IngestMrProjectResult::default();
        assert_eq!(result.mrs_fetched, 0);
        assert_eq!(result.mrs_upserted, 0);
        assert_eq!(result.labels_created, 0);
        assert_eq!(result.assignees_linked, 0);
        assert_eq!(result.reviewers_linked, 0);
        assert_eq!(result.discussions_fetched, 0);
        assert_eq!(result.discussions_upserted, 0);
        assert_eq!(result.notes_upserted, 0);
        assert_eq!(result.notes_skipped_bad_timestamp, 0);
        assert_eq!(result.diffnotes_count, 0);
        assert_eq!(result.mrs_synced_discussions, 0);
        assert_eq!(result.mrs_skipped_discussion_sync, 0);
        assert_eq!(result.resource_events_fetched, 0);
        assert_eq!(result.resource_events_failed, 0);
        // Previously uncovered newer counters, declared on IngestMrProjectResult.
        assert_eq!(result.closes_issues_fetched, 0);
        assert_eq!(result.closes_issues_failed, 0);
        assert_eq!(result.mr_diffs_fetched, 0);
        assert_eq!(result.mr_diffs_failed, 0);
    }

    /// `DrainResult::default()` must start with every counter at zero.
    #[test]
    fn drain_result_default_has_zero_counts() {
        let result = DrainResult::default();
        assert_eq!(result.fetched, 0);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped_not_found, 0);
    }

    /// Smoke check that the resource-event / status-enrichment progress variants
    /// exist with the expected field names (fails to compile if renamed).
    #[test]
    fn progress_event_resource_variants_exist() {
        let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 };
        let _progress = ProgressEvent::ResourceEventFetched {
            current: 5,
            total: 10,
        };
        let _complete = ProgressEvent::ResourceEventsFetchComplete {
            fetched: 8,
            failed: 2,
        };
        let _status_complete = ProgressEvent::StatusEnrichmentComplete {
            enriched: 5,
            cleared: 1,
        };
        let _status_skipped = ProgressEvent::StatusEnrichmentSkipped;
    }
}