use std::ops::Deref; use rusqlite::{Connection, Transaction, params}; use tracing::{debug, info, warn}; use crate::Config; use crate::core::error::{LoreError, Result}; use crate::core::payloads::{StorePayloadOptions, store_payload}; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::gitlab::GitLabClient; use crate::gitlab::transformers::merge_request::transform_merge_request; use crate::gitlab::types::GitLabMergeRequest; use crate::ingestion::dirty_tracker; #[derive(Debug, Default)] pub struct IngestMergeRequestsResult { pub fetched: usize, pub upserted: usize, pub labels_created: usize, pub assignees_linked: usize, pub reviewers_linked: usize, } #[derive(Debug, Clone)] pub struct MrForDiscussionSync { pub local_mr_id: i64, pub iid: i64, pub updated_at: i64, } #[derive(Debug, Default)] struct SyncCursor { updated_at_cursor: Option, tie_breaker_id: Option, } pub async fn ingest_merge_requests( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, full_sync: bool, ) -> Result { let mut result = IngestMergeRequestsResult::default(); if full_sync { reset_sync_cursor(conn, project_id)?; reset_discussion_watermarks(conn, project_id)?; info!("Full sync: cursor and discussion watermarks reset"); } let cursor = get_sync_cursor(conn, project_id)?; debug!(?cursor, "Starting MR ingestion with cursor"); let mut page = 1u32; let per_page = 100u32; loop { let page_result = client .fetch_merge_requests_page( gitlab_project_id, cursor.updated_at_cursor, config.sync.cursor_rewind_seconds, page, per_page, ) .await?; let mut last_updated_at: Option = None; let mut last_gitlab_id: Option = None; for mr in &page_result.items { result.fetched += 1; let mr_updated_at = match parse_timestamp(&mr.updated_at) { Ok(ts) => ts, Err(e) => { warn!( gitlab_id = mr.id, error = %e, "Skipping MR with invalid timestamp" ); continue; } }; if !passes_cursor_filter_with_ts(mr.id, mr_updated_at, &cursor) { debug!(gitlab_id = mr.id, "Skipping already-processed MR"); continue; } let mr_result = process_single_mr(conn, config, project_id, mr)?; result.upserted += 1; result.labels_created += mr_result.labels_created; result.assignees_linked += mr_result.assignees_linked; result.reviewers_linked += mr_result.reviewers_linked; last_updated_at = Some(mr_updated_at); last_gitlab_id = Some(mr.id); } if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { update_sync_cursor(conn, project_id, ts, id)?; debug!(page, "Page-boundary cursor update"); } if page_result.is_last_page { break; } match page_result.next_page { Some(np) => page = np, None => break, } } info!( fetched = result.fetched, upserted = result.upserted, labels_created = result.labels_created, assignees_linked = result.assignees_linked, reviewers_linked = result.reviewers_linked, "MR ingestion complete" ); Ok(result) } struct ProcessMrResult { labels_created: usize, assignees_linked: usize, reviewers_linked: usize, } fn process_single_mr( conn: &Connection, config: &Config, project_id: i64, mr: &GitLabMergeRequest, ) -> Result { let payload_bytes = serde_json::to_vec(mr)?; let transformed = transform_merge_request(mr, project_id) .map_err(|e| LoreError::Other(format!("MR transform failed: {}", e)))?; let tx = conn.unchecked_transaction()?; let result = process_mr_in_transaction(&tx, config, project_id, mr, &payload_bytes, &transformed)?; tx.commit()?; Ok(result) } fn process_mr_in_transaction( tx: &Transaction<'_>, config: &Config, project_id: i64, mr: &GitLabMergeRequest, payload_bytes: &[u8], transformed: &crate::gitlab::transformers::merge_request::MergeRequestWithMetadata, ) -> Result { let mut labels_created = 0; let mr_row = &transformed.merge_request; let now = now_ms(); let payload_id = store_payload( tx.deref(), StorePayloadOptions { project_id: Some(project_id), resource_type: "merge_request", gitlab_id: &mr.id.to_string(), json_bytes: payload_bytes, compress: config.storage.compress_raw_payloads, }, )?; tx.execute( "INSERT INTO merge_requests ( gitlab_id, project_id, iid, title, description, state, draft, author_username, source_branch, target_branch, head_sha, references_short, references_full, detailed_merge_status, merge_user_username, created_at, updated_at, merged_at, closed_at, last_seen_at, web_url, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22) ON CONFLICT(gitlab_id) DO UPDATE SET title = excluded.title, description = excluded.description, state = excluded.state, draft = excluded.draft, author_username = excluded.author_username, source_branch = excluded.source_branch, target_branch = excluded.target_branch, head_sha = excluded.head_sha, references_short = excluded.references_short, references_full = excluded.references_full, detailed_merge_status = excluded.detailed_merge_status, merge_user_username = excluded.merge_user_username, updated_at = excluded.updated_at, merged_at = excluded.merged_at, closed_at = excluded.closed_at, last_seen_at = excluded.last_seen_at, web_url = excluded.web_url, raw_payload_id = excluded.raw_payload_id", params![ mr_row.gitlab_id, project_id, mr_row.iid, &mr_row.title, &mr_row.description, &mr_row.state, mr_row.draft, &mr_row.author_username, &mr_row.source_branch, &mr_row.target_branch, &mr_row.head_sha, &mr_row.references_short, &mr_row.references_full, &mr_row.detailed_merge_status, &mr_row.merge_user_username, mr_row.created_at, mr_row.updated_at, mr_row.merged_at, mr_row.closed_at, now, &mr_row.web_url, payload_id, ], )?; let local_mr_id: i64 = tx.query_row( "SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?", (project_id, mr_row.iid), |row| row.get(0), )?; dirty_tracker::mark_dirty_tx(tx, SourceType::MergeRequest, local_mr_id)?; tx.execute( "DELETE FROM mr_labels WHERE merge_request_id = ?", [local_mr_id], )?; for label_name in &transformed.label_names { let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; tx.execute( "INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES (?, ?)", (local_mr_id, label_id), )?; } tx.execute( "DELETE FROM mr_assignees WHERE merge_request_id = ?", [local_mr_id], )?; let assignees_linked = transformed.assignee_usernames.len(); for username in &transformed.assignee_usernames { tx.execute( "INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES (?, ?)", (local_mr_id, username), )?; } tx.execute( "DELETE FROM mr_reviewers WHERE merge_request_id = ?", [local_mr_id], )?; let reviewers_linked = transformed.reviewer_usernames.len(); for username in &transformed.reviewer_usernames { tx.execute( "INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES (?, ?)", (local_mr_id, username), )?; } Ok(ProcessMrResult { labels_created, assignees_linked, reviewers_linked, }) } fn upsert_label_tx( tx: &Transaction<'_>, project_id: i64, name: &str, created_count: &mut usize, ) -> Result { let id: i64 = tx.query_row( "INSERT INTO labels (project_id, name) VALUES (?1, ?2) ON CONFLICT(project_id, name) DO UPDATE SET name = excluded.name RETURNING id", (project_id, name), |row| row.get(0), )?; if tx.last_insert_rowid() == id { *created_count += 1; } Ok(id) } fn passes_cursor_filter_with_ts(gitlab_id: i64, mr_ts: i64, cursor: &SyncCursor) -> bool { let Some(cursor_ts) = cursor.updated_at_cursor else { return true; }; if mr_ts < cursor_ts { return false; } if mr_ts == cursor_ts && let Some(cursor_id) = cursor.tie_breaker_id && gitlab_id <= cursor_id { return false; } true } fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result { let row: Option<(Option, Option)> = conn .query_row( "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors WHERE project_id = ? AND resource_type = 'merge_requests'", [project_id], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); Ok(match row { Some((updated_at, tie_breaker)) => SyncCursor { updated_at_cursor: updated_at, tie_breaker_id: tie_breaker, }, None => SyncCursor::default(), }) } fn update_sync_cursor( conn: &Connection, project_id: i64, updated_at: i64, gitlab_id: i64, ) -> Result<()> { conn.execute( "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id) VALUES (?1, 'merge_requests', ?2, ?3) ON CONFLICT(project_id, resource_type) DO UPDATE SET updated_at_cursor = excluded.updated_at_cursor, tie_breaker_id = excluded.tie_breaker_id", (project_id, updated_at, gitlab_id), )?; Ok(()) } fn reset_sync_cursor(conn: &Connection, project_id: i64) -> Result<()> { conn.execute( "DELETE FROM sync_cursors WHERE project_id = ? AND resource_type = 'merge_requests'", [project_id], )?; Ok(()) } fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_synced_for_updated_at = NULL, discussions_sync_attempts = 0, discussions_sync_last_error = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?", [project_id], )?; Ok(()) } pub fn get_mrs_needing_discussion_sync( conn: &Connection, project_id: i64, ) -> Result> { let mut stmt = conn.prepare( "SELECT id, iid, updated_at FROM merge_requests WHERE project_id = ? AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)", )?; let mrs: std::result::Result, _> = stmt .query_map([project_id], |row| { Ok(MrForDiscussionSync { local_mr_id: row.get(0)?, iid: row.get(1)?, updated_at: row.get(2)?, }) })? .collect(); Ok(mrs?) } fn parse_timestamp(ts: &str) -> Result { chrono::DateTime::parse_from_rfc3339(ts) .map(|dt| dt.timestamp_millis()) .map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e))) } #[cfg(test)] mod tests { use super::*; #[test] fn result_default_has_zero_counts() { let result = IngestMergeRequestsResult::default(); assert_eq!(result.fetched, 0); assert_eq!(result.upserted, 0); assert_eq!(result.labels_created, 0); assert_eq!(result.assignees_linked, 0); assert_eq!(result.reviewers_linked, 0); } #[test] fn cursor_filter_allows_newer_mrs() { let cursor = SyncCursor { updated_at_cursor: Some(1705312800000), tie_breaker_id: Some(100), }; let later_ts = 1705399200000; assert!(passes_cursor_filter_with_ts(101, later_ts, &cursor)); } #[test] fn cursor_filter_blocks_older_mrs() { let cursor = SyncCursor { updated_at_cursor: Some(1705312800000), tie_breaker_id: Some(100), }; let earlier_ts = 1705226400000; assert!(!passes_cursor_filter_with_ts(99, earlier_ts, &cursor)); } #[test] fn cursor_filter_uses_tie_breaker_for_same_timestamp() { let cursor = SyncCursor { updated_at_cursor: Some(1705312800000), tie_breaker_id: Some(100), }; assert!(passes_cursor_filter_with_ts(101, 1705312800000, &cursor)); assert!(!passes_cursor_filter_with_ts(100, 1705312800000, &cursor)); assert!(!passes_cursor_filter_with_ts(99, 1705312800000, &cursor)); } #[test] fn cursor_filter_allows_all_when_no_cursor() { let cursor = SyncCursor::default(); let old_ts = 1577836800000; assert!(passes_cursor_filter_with_ts(1, old_ts, &cursor)); } }