use futures::stream::StreamExt; use rusqlite::Connection; use rusqlite::OptionalExtension; use tracing::{debug, warn}; use crate::Config; use crate::core::error::{LoreError, Result}; use crate::documents::SourceType; use crate::gitlab::GitLabClient; use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest}; use crate::ingestion::dirty_tracker; use crate::ingestion::discussions::ingest_issue_discussions; use crate::ingestion::issues::{IssueForDiscussionSync, process_single_issue}; use crate::ingestion::merge_requests::{MrForDiscussionSync, process_single_mr}; use crate::ingestion::mr_diffs::upsert_mr_file_changes; use crate::ingestion::mr_discussions::ingest_mr_discussions; use crate::ingestion::orchestrator::{store_closes_issues_refs, store_resource_events}; // --------------------------------------------------------------------------- // Result types // --------------------------------------------------------------------------- #[derive(Debug)] pub(crate) struct IngestIssueResult { pub skipped_stale: bool, pub dirty_source_keys: Vec<(SourceType, i64)>, } #[derive(Debug)] pub(crate) struct IngestMrResult { pub skipped_stale: bool, pub dirty_source_keys: Vec<(SourceType, i64)>, } #[derive(Debug)] pub(crate) struct PreflightResult { pub issues: Vec, pub merge_requests: Vec, pub failures: Vec, } #[derive(Debug)] pub(crate) struct PreflightFailure { pub entity_type: String, pub iid: i64, pub error: LoreError, } // --------------------------------------------------------------------------- // TOCTOU guard // --------------------------------------------------------------------------- /// Returns `true` if the payload is stale (same age or older than what the DB /// already has). Returns `false` when the entity is new (no DB row) or when /// the payload is strictly newer. 
pub(crate) fn is_stale(payload_updated_at: &str, db_updated_at_ms: Option) -> Result { let Some(db_ms) = db_updated_at_ms else { return Ok(false); }; let payload_ms = chrono::DateTime::parse_from_rfc3339(payload_updated_at) .map(|dt| dt.timestamp_millis()) .map_err(|e| { LoreError::Other(format!( "Failed to parse timestamp '{}': {}", payload_updated_at, e )) })?; Ok(payload_ms <= db_ms) } // --------------------------------------------------------------------------- // Ingestion wrappers // --------------------------------------------------------------------------- /// Ingest a single issue by IID with TOCTOU guard and dirty marking. pub(crate) fn ingest_issue_by_iid( conn: &Connection, config: &Config, project_id: i64, issue: &GitLabIssue, ) -> Result { let db_updated_at = get_db_updated_at(conn, "issues", issue.iid, project_id)?; if is_stale(&issue.updated_at, db_updated_at)? { debug!(iid = issue.iid, "Skipping stale issue (TOCTOU guard)"); return Ok(IngestIssueResult { skipped_stale: true, dirty_source_keys: vec![], }); } process_single_issue(conn, config, project_id, issue)?; let local_id: i64 = conn.query_row( "SELECT id FROM issues WHERE project_id = ? AND iid = ?", (project_id, issue.iid), |row| row.get(0), )?; dirty_tracker::mark_dirty(conn, SourceType::Issue, local_id)?; Ok(IngestIssueResult { skipped_stale: false, dirty_source_keys: vec![(SourceType::Issue, local_id)], }) } /// Ingest a single merge request by IID with TOCTOU guard and dirty marking. pub(crate) fn ingest_mr_by_iid( conn: &Connection, config: &Config, project_id: i64, mr: &GitLabMergeRequest, ) -> Result { let db_updated_at = get_db_updated_at(conn, "merge_requests", mr.iid, project_id)?; if is_stale(&mr.updated_at, db_updated_at)? 
{ debug!(iid = mr.iid, "Skipping stale MR (TOCTOU guard)"); return Ok(IngestMrResult { skipped_stale: true, dirty_source_keys: vec![], }); } process_single_mr(conn, config, project_id, mr)?; let local_id: i64 = conn.query_row( "SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?", (project_id, mr.iid), |row| row.get(0), )?; dirty_tracker::mark_dirty(conn, SourceType::MergeRequest, local_id)?; Ok(IngestMrResult { skipped_stale: false, dirty_source_keys: vec![(SourceType::MergeRequest, local_id)], }) } // --------------------------------------------------------------------------- // Preflight fetch // --------------------------------------------------------------------------- /// Fetch specific issues and MRs by IID from GitLab. Collects successes and /// failures without aborting on individual 404s. /// /// Requests are dispatched concurrently (up to 10 in-flight at once) to avoid /// sequential round-trip latency when syncing many IIDs. pub(crate) async fn preflight_fetch( client: &GitLabClient, gitlab_project_id: i64, targets: &[(String, i64)], ) -> PreflightResult { /// Max concurrent HTTP requests during preflight. 
const PREFLIGHT_CONCURRENCY: usize = 10; #[allow(clippy::large_enum_variant)] enum FetchOutcome { Issue(std::result::Result), MergeRequest(std::result::Result), UnknownType(String, i64), } let mut result = PreflightResult { issues: Vec::new(), merge_requests: Vec::new(), failures: Vec::new(), }; let mut stream = futures::stream::iter(targets.iter().map(|(entity_type, iid)| { let entity_type = entity_type.clone(); let iid = *iid; async move { match entity_type.as_str() { "issue" => FetchOutcome::Issue( client .get_issue_by_iid(gitlab_project_id, iid) .await .map_err(|e| (entity_type, iid, e)), ), "merge_request" => FetchOutcome::MergeRequest( client .get_mr_by_iid(gitlab_project_id, iid) .await .map_err(|e| (entity_type, iid, e)), ), _ => FetchOutcome::UnknownType(entity_type, iid), } } })) .buffer_unordered(PREFLIGHT_CONCURRENCY); while let Some(outcome) = stream.next().await { match outcome { FetchOutcome::Issue(Ok(issue)) => result.issues.push(issue), FetchOutcome::Issue(Err((et, iid, e))) => { result.failures.push(PreflightFailure { entity_type: et, iid, error: e, }); } FetchOutcome::MergeRequest(Ok(mr)) => result.merge_requests.push(mr), FetchOutcome::MergeRequest(Err((et, iid, e))) => { result.failures.push(PreflightFailure { entity_type: et, iid, error: e, }); } FetchOutcome::UnknownType(et, iid) => { result.failures.push(PreflightFailure { entity_type: et.clone(), iid, error: LoreError::Other(format!("Unknown entity type: {et}")), }); } } } result } // --------------------------------------------------------------------------- // Dependent fetch helpers (surgical mode) // --------------------------------------------------------------------------- /// Counts returned from fetching dependents for a single entity. 
#[derive(Debug, Default)]
pub(crate) struct DependentFetchResult {
    /// Total state + label + milestone events stored.
    pub resource_events_fetched: usize,
    /// Discussions ingested for the entity.
    pub discussions_fetched: usize,
    /// Closes-issues references stored (MRs only; 0 for issues).
    pub closes_issues_stored: usize,
    /// File changes (diffs) stored (MRs only; 0 for issues).
    pub file_changes_stored: usize,
}

/// Fetch and store all dependents for a single issue:
/// resource events (state, label, milestone) and discussions.
///
/// Each dependent kind is best-effort: a failure is logged with `warn!` and
/// the remaining kinds are still attempted, so the returned counts reflect
/// only what actually succeeded.
pub(crate) async fn fetch_dependents_for_issue(
    client: &GitLabClient,
    conn: &Connection,
    project_id: i64,
    gitlab_project_id: i64,
    iid: i64,
    local_id: i64,
    config: &Config,
) -> Result<DependentFetchResult> {
    let mut result = DependentFetchResult::default();

    // --- Resource events ---
    match client
        .fetch_all_resource_events(gitlab_project_id, "issue", iid)
        .await
    {
        Ok((state_events, label_events, milestone_events)) => {
            let count = state_events.len() + label_events.len() + milestone_events.len();
            // Store events and advance the sync watermark atomically.
            let tx = conn.unchecked_transaction()?;
            store_resource_events(
                &tx,
                project_id,
                "issue",
                local_id,
                &state_events,
                &label_events,
                &milestone_events,
            )?;
            tx.execute(
                "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?",
                [local_id],
            )?;
            tx.commit()?;
            result.resource_events_fetched = count;
        }
        Err(e) => {
            warn!(
                iid,
                error = %e,
                "Failed to fetch resource events for issue, continuing"
            );
        }
    }

    // --- Discussions ---
    let sync_item = IssueForDiscussionSync {
        local_issue_id: local_id,
        iid,
        updated_at: 0, // not used for filtering in surgical mode
    };
    match ingest_issue_discussions(
        conn,
        client,
        config,
        gitlab_project_id,
        project_id,
        &[sync_item],
    )
    .await
    {
        Ok(disc_result) => {
            result.discussions_fetched = disc_result.discussions_fetched;
        }
        Err(e) => {
            warn!(
                iid,
                error = %e,
                "Failed to ingest discussions for issue, continuing"
            );
        }
    }

    Ok(result)
}

/// Fetch and store all dependents for a single merge request:
/// resource events, discussions, closes-issues references, and file changes (diffs).
pub(crate) async fn fetch_dependents_for_mr( client: &GitLabClient, conn: &Connection, project_id: i64, gitlab_project_id: i64, iid: i64, local_id: i64, config: &Config, ) -> Result { let mut result = DependentFetchResult::default(); // --- Resource events --- match client .fetch_all_resource_events(gitlab_project_id, "merge_request", iid) .await { Ok((state_events, label_events, milestone_events)) => { let count = state_events.len() + label_events.len() + milestone_events.len(); let tx = conn.unchecked_transaction()?; store_resource_events( &tx, project_id, "merge_request", local_id, &state_events, &label_events, &milestone_events, )?; tx.execute( "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", [local_id], )?; tx.commit()?; result.resource_events_fetched = count; } Err(e) => { warn!( iid, error = %e, "Failed to fetch resource events for MR, continuing" ); } } // --- Discussions --- let sync_item = MrForDiscussionSync { local_mr_id: local_id, iid, updated_at: 0, }; match ingest_mr_discussions( conn, client, config, gitlab_project_id, project_id, &[sync_item], ) .await { Ok(disc_result) => { result.discussions_fetched = disc_result.discussions_fetched; } Err(e) => { warn!( iid, error = %e, "Failed to ingest discussions for MR, continuing" ); } } // --- Closes issues --- match client.fetch_mr_closes_issues(gitlab_project_id, iid).await { Ok(closes_issues) => { let count = closes_issues.len(); let tx = conn.unchecked_transaction()?; store_closes_issues_refs(&tx, project_id, local_id, &closes_issues)?; tx.execute( "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?", [local_id], )?; tx.commit()?; result.closes_issues_stored = count; } Err(e) => { warn!( iid, error = %e, "Failed to fetch closes_issues for MR, continuing" ); } } // --- File changes (diffs) --- match client.fetch_mr_diffs(gitlab_project_id, iid).await { Ok(diffs) => { let tx = conn.unchecked_transaction()?; let stored = 
upsert_mr_file_changes(&tx, local_id, project_id, &diffs)?; tx.execute( "UPDATE merge_requests SET diffs_synced_for_updated_at = updated_at WHERE id = ?", [local_id], )?; tx.commit()?; result.file_changes_stored = stored; } Err(e) => { warn!( iid, error = %e, "Failed to fetch diffs for MR, continuing" ); } } Ok(result) } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- fn get_db_updated_at( conn: &Connection, table: &str, iid: i64, project_id: i64, ) -> Result> { // Using a match on known table names avoids SQL injection from the table parameter. let sql = match table { "issues" => "SELECT updated_at FROM issues WHERE project_id = ?1 AND iid = ?2", "merge_requests" => { "SELECT updated_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2" } _ => { return Err(LoreError::Other(format!( "Unknown table for updated_at lookup: {table}" ))); } }; let result: Option = conn .query_row(sql, (project_id, iid), |row| row.get(0)) .optional()?; Ok(result) } #[cfg(test)] #[path = "surgical_tests.rs"] mod tests;