diff --git a/src/ingestion/discussions.rs b/src/ingestion/discussions.rs index 46b57b7..3738c0f 100644 --- a/src/ingestion/discussions.rs +++ b/src/ingestion/discussions.rs @@ -189,24 +189,27 @@ async fn ingest_discussions_for_issue( if pagination_error.is_none() && received_first_response { let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; result.stale_discussions_removed = removed; - + // Update discussions_synced_for_updated_at on the issue update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; - } else if pagination_error.is_none() && !received_first_response && seen_discussion_ids.is_empty() { + } else if pagination_error.is_none() + && !received_first_response + && seen_discussion_ids.is_empty() + { // Stream was empty but no error - issue genuinely has no discussions // This is safe to remove stale discussions (if any exist from before) let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; result.stale_discussions_removed = removed; - + update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; - } else if pagination_error.is_some() { + } else if let Some(err) = pagination_error { warn!( issue_iid = issue.iid, discussions_seen = seen_discussion_ids.len(), "Skipping stale removal due to pagination error" ); // Return the error to signal incomplete sync - return Err(pagination_error.unwrap()); + return Err(err); } Ok(result) @@ -308,10 +311,10 @@ fn remove_stale_discussions( "CREATE TEMP TABLE IF NOT EXISTS _temp_seen_discussions (id TEXT PRIMARY KEY)", [], )?; - + // Clear any previous data conn.execute("DELETE FROM _temp_seen_discussions", [])?; - + // Insert seen IDs in chunks for chunk in seen_ids.chunks(CHUNK_SIZE) { let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect(); @@ -319,11 +322,12 @@ fn remove_stale_discussions( "INSERT OR IGNORE INTO _temp_seen_discussions (id) VALUES {}", placeholders.join(", ") ); - - let params: 
Vec<&dyn rusqlite::ToSql> = chunk.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); + + let params: Vec<&dyn rusqlite::ToSql> = + chunk.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); conn.execute(&sql, params.as_slice())?; } - + // Delete discussions not in temp table let deleted = conn.execute( "DELETE FROM discussions diff --git a/src/ingestion/issues.rs b/src/ingestion/issues.rs index 6a2fb99..63ffe7c 100644 --- a/src/ingestion/issues.rs +++ b/src/ingestion/issues.rs @@ -148,12 +148,11 @@ fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCurs return false; } - if issue_ts == cursor_ts { - if let Some(cursor_id) = cursor.tie_breaker_id { - if gitlab_id <= cursor_id { - return false; - } - } + if issue_ts == cursor_ts + && let Some(cursor_id) = cursor.tie_breaker_id + && gitlab_id <= cursor_id + { + return false; } true @@ -219,6 +218,7 @@ fn process_single_issue( } /// Inner function that performs all DB operations within a transaction. +#[allow(clippy::too_many_arguments)] fn process_issue_in_transaction( tx: &Transaction<'_>, config: &Config, @@ -366,7 +366,11 @@ fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Re } /// Upsert a milestone within a transaction, returning its local ID. -fn upsert_milestone_tx(tx: &Transaction<'_>, project_id: i64, milestone: &MilestoneRow) -> Result { +fn upsert_milestone_tx( + tx: &Transaction<'_>, + project_id: i64, + milestone: &MilestoneRow, +) -> Result { tx.execute( "INSERT INTO milestones (gitlab_id, project_id, iid, title, description, state, due_date, web_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs new file mode 100644 index 0000000..4172611 --- /dev/null +++ b/src/ingestion/merge_requests.rs @@ -0,0 +1,515 @@ +//! Merge request ingestion with cursor-based incremental sync. +//! +//! Fetches merge requests from GitLab and stores them locally with: +//! 
- Cursor-based pagination for incremental sync +//! - Page-boundary cursor updates for crash recovery +//! - Raw payload storage with deduplication +//! - Label/assignee/reviewer extraction with clear-and-relink pattern +//! - Tracking of MRs needing discussion sync + +use std::ops::Deref; + +use rusqlite::{Connection, Transaction, params}; +use tracing::{debug, info, warn}; + +use crate::Config; +use crate::core::error::{GiError, Result}; +use crate::core::payloads::{StorePayloadOptions, store_payload}; +use crate::core::time::now_ms; +use crate::gitlab::GitLabClient; +use crate::gitlab::transformers::merge_request::transform_merge_request; +use crate::gitlab::types::GitLabMergeRequest; + +/// Result of merge request ingestion. +#[derive(Debug, Default)] +pub struct IngestMergeRequestsResult { + pub fetched: usize, + pub upserted: usize, + pub labels_created: usize, + pub assignees_linked: usize, + pub reviewers_linked: usize, +} + +/// MR that needs discussion sync. +#[derive(Debug, Clone)] +pub struct MrForDiscussionSync { + pub local_mr_id: i64, + pub iid: i64, + pub updated_at: i64, // ms epoch +} + +/// Cursor state for incremental sync. +#[derive(Debug, Default)] +struct SyncCursor { + updated_at_cursor: Option, + tie_breaker_id: Option, +} + +/// Ingest merge requests for a project. +pub async fn ingest_merge_requests( + conn: &Connection, + client: &GitLabClient, + config: &Config, + project_id: i64, // Local DB project ID + gitlab_project_id: i64, // GitLab project ID + full_sync: bool, // Reset cursor if true +) -> Result { + let mut result = IngestMergeRequestsResult::default(); + + // Handle full sync - reset cursor and discussion watermarks + if full_sync { + reset_sync_cursor(conn, project_id)?; + reset_discussion_watermarks(conn, project_id)?; + info!("Full sync: cursor and discussion watermarks reset"); + } + + // 1. 
Get current cursor + let cursor = get_sync_cursor(conn, project_id)?; + debug!(?cursor, "Starting MR ingestion with cursor"); + + // 2. Fetch MRs page by page with cursor rewind + let mut page = 1u32; + let per_page = 100u32; + + loop { + let page_result = client + .fetch_merge_requests_page( + gitlab_project_id, + cursor.updated_at_cursor, + config.sync.cursor_rewind_seconds, + page, + per_page, + ) + .await?; + + let mut last_updated_at: Option = None; + let mut last_gitlab_id: Option = None; + + // 3. Process each MR + for mr in &page_result.items { + result.fetched += 1; + + // Parse timestamp early + let mr_updated_at = match parse_timestamp(&mr.updated_at) { + Ok(ts) => ts, + Err(e) => { + warn!( + gitlab_id = mr.id, + error = %e, + "Skipping MR with invalid timestamp" + ); + continue; + } + }; + + // Apply local cursor filter (skip already-processed due to rewind overlap) + if !passes_cursor_filter_with_ts(mr.id, mr_updated_at, &cursor) { + debug!(gitlab_id = mr.id, "Skipping already-processed MR"); + continue; + } + + // Transform and store + let mr_result = process_single_mr(conn, config, project_id, mr)?; + result.upserted += 1; + result.labels_created += mr_result.labels_created; + result.assignees_linked += mr_result.assignees_linked; + result.reviewers_linked += mr_result.reviewers_linked; + + // Track cursor position + last_updated_at = Some(mr_updated_at); + last_gitlab_id = Some(mr.id); + } + + // 4. Page-boundary cursor update + if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { + update_sync_cursor(conn, project_id, ts, id)?; + debug!(page, "Page-boundary cursor update"); + } + + // 5. 
Check for more pages + if page_result.is_last_page { + break; + } + match page_result.next_page { + Some(np) => page = np, + None => break, + } + } + + info!( + fetched = result.fetched, + upserted = result.upserted, + labels_created = result.labels_created, + assignees_linked = result.assignees_linked, + reviewers_linked = result.reviewers_linked, + "MR ingestion complete" + ); + + Ok(result) +} + +/// Result of processing a single MR. +struct ProcessMrResult { + labels_created: usize, + assignees_linked: usize, + reviewers_linked: usize, +} + +/// Process a single MR: store payload, upsert MR, handle labels/assignees/reviewers. +/// All operations are wrapped in a transaction for atomicity. +fn process_single_mr( + conn: &Connection, + config: &Config, + project_id: i64, + mr: &GitLabMergeRequest, +) -> Result { + // Transform MR first (outside transaction - no DB access) + let payload_json = serde_json::to_value(mr)?; + let transformed = transform_merge_request(mr, project_id) + .map_err(|e| GiError::Other(format!("MR transform failed: {}", e)))?; + + // Wrap all DB operations in a transaction for atomicity + let tx = conn.unchecked_transaction()?; + let result = + process_mr_in_transaction(&tx, config, project_id, mr, &payload_json, &transformed)?; + tx.commit()?; + + Ok(result) +} + +/// Inner function that performs all DB operations within a transaction. 
+fn process_mr_in_transaction( + tx: &Transaction<'_>, + config: &Config, + project_id: i64, + mr: &GitLabMergeRequest, + payload_json: &serde_json::Value, + transformed: &crate::gitlab::transformers::merge_request::MergeRequestWithMetadata, +) -> Result { + let mut labels_created = 0; + let mr_row = &transformed.merge_request; + let now = now_ms(); + + // Store raw payload + let payload_id = store_payload( + tx.deref(), + StorePayloadOptions { + project_id: Some(project_id), + resource_type: "merge_request", + gitlab_id: &mr.id.to_string(), + payload: payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?; + + // Upsert merge request + tx.execute( + "INSERT INTO merge_requests ( + gitlab_id, project_id, iid, title, description, state, draft, + author_username, source_branch, target_branch, head_sha, + references_short, references_full, detailed_merge_status, + merge_user_username, created_at, updated_at, merged_at, closed_at, + last_seen_at, web_url, raw_payload_id + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22) + ON CONFLICT(gitlab_id) DO UPDATE SET + title = excluded.title, + description = excluded.description, + state = excluded.state, + draft = excluded.draft, + author_username = excluded.author_username, + source_branch = excluded.source_branch, + target_branch = excluded.target_branch, + head_sha = excluded.head_sha, + references_short = excluded.references_short, + references_full = excluded.references_full, + detailed_merge_status = excluded.detailed_merge_status, + merge_user_username = excluded.merge_user_username, + updated_at = excluded.updated_at, + merged_at = excluded.merged_at, + closed_at = excluded.closed_at, + last_seen_at = excluded.last_seen_at, + web_url = excluded.web_url, + raw_payload_id = excluded.raw_payload_id", + params![ + mr_row.gitlab_id, + project_id, + mr_row.iid, + &mr_row.title, + &mr_row.description, + &mr_row.state, + mr_row.draft, + 
&mr_row.author_username, + &mr_row.source_branch, + &mr_row.target_branch, + &mr_row.head_sha, + &mr_row.references_short, + &mr_row.references_full, + &mr_row.detailed_merge_status, + &mr_row.merge_user_username, + mr_row.created_at, + mr_row.updated_at, + mr_row.merged_at, + mr_row.closed_at, + now, + &mr_row.web_url, + payload_id, + ], + )?; + + // Get local MR ID + let local_mr_id: i64 = tx.query_row( + "SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?", + (project_id, mr_row.iid), + |row| row.get(0), + )?; + + // Clear-and-relink labels + tx.execute( + "DELETE FROM mr_labels WHERE merge_request_id = ?", + [local_mr_id], + )?; + for label_name in &transformed.label_names { + let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; + tx.execute( + "INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES (?, ?)", + (local_mr_id, label_id), + )?; + } + + // Clear-and-relink assignees + tx.execute( + "DELETE FROM mr_assignees WHERE merge_request_id = ?", + [local_mr_id], + )?; + let assignees_linked = transformed.assignee_usernames.len(); + for username in &transformed.assignee_usernames { + tx.execute( + "INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES (?, ?)", + (local_mr_id, username), + )?; + } + + // Clear-and-relink reviewers + tx.execute( + "DELETE FROM mr_reviewers WHERE merge_request_id = ?", + [local_mr_id], + )?; + let reviewers_linked = transformed.reviewer_usernames.len(); + for username in &transformed.reviewer_usernames { + tx.execute( + "INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES (?, ?)", + (local_mr_id, username), + )?; + } + + Ok(ProcessMrResult { + labels_created, + assignees_linked, + reviewers_linked, + }) +} + +/// Upsert a label within a transaction, returning its ID. 
+fn upsert_label_tx( + tx: &Transaction<'_>, + project_id: i64, + name: &str, + created_count: &mut usize, +) -> Result { + // Try to get existing + let existing: Option = tx + .query_row( + "SELECT id FROM labels WHERE project_id = ? AND name = ?", + (project_id, name), + |row| row.get(0), + ) + .ok(); + + if let Some(id) = existing { + return Ok(id); + } + + // Insert new + tx.execute( + "INSERT INTO labels (project_id, name) VALUES (?, ?)", + (project_id, name), + )?; + *created_count += 1; + + Ok(tx.last_insert_rowid()) +} + +/// Check if an MR passes the cursor filter (not already processed). +/// Takes pre-parsed timestamp to avoid redundant parsing. +fn passes_cursor_filter_with_ts(gitlab_id: i64, mr_ts: i64, cursor: &SyncCursor) -> bool { + let Some(cursor_ts) = cursor.updated_at_cursor else { + return true; // No cursor = fetch all + }; + + if mr_ts < cursor_ts { + return false; + } + + if mr_ts == cursor_ts + && let Some(cursor_id) = cursor.tie_breaker_id + && gitlab_id <= cursor_id + { + return false; + } + + true +} + +/// Get the current sync cursor for merge requests. +fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result { + let row: Option<(Option, Option)> = conn + .query_row( + "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors + WHERE project_id = ? AND resource_type = 'merge_requests'", + [project_id], + |row| Ok((row.get(0)?, row.get(1)?)), + ) + .ok(); + + Ok(match row { + Some((updated_at, tie_breaker)) => SyncCursor { + updated_at_cursor: updated_at, + tie_breaker_id: tie_breaker, + }, + None => SyncCursor::default(), + }) +} + +/// Update the sync cursor. 
+fn update_sync_cursor( + conn: &Connection, + project_id: i64, + updated_at: i64, + gitlab_id: i64, +) -> Result<()> { + conn.execute( + "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id) + VALUES (?1, 'merge_requests', ?2, ?3) + ON CONFLICT(project_id, resource_type) DO UPDATE SET + updated_at_cursor = excluded.updated_at_cursor, + tie_breaker_id = excluded.tie_breaker_id", + (project_id, updated_at, gitlab_id), + )?; + Ok(()) +} + +/// Reset the sync cursor (for full sync). +fn reset_sync_cursor(conn: &Connection, project_id: i64) -> Result<()> { + conn.execute( + "DELETE FROM sync_cursors WHERE project_id = ? AND resource_type = 'merge_requests'", + [project_id], + )?; + Ok(()) +} + +/// Reset discussion watermarks for all MRs in project (for full sync). +fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> { + conn.execute( + "UPDATE merge_requests + SET discussions_synced_for_updated_at = NULL, + discussions_sync_attempts = 0, + discussions_sync_last_error = NULL + WHERE project_id = ?", + [project_id], + )?; + Ok(()) +} + +/// Get MRs that need discussion sync (updated_at > discussions_synced_for_updated_at). +pub fn get_mrs_needing_discussion_sync( + conn: &Connection, + project_id: i64, +) -> Result> { + let mut stmt = conn.prepare( + "SELECT id, iid, updated_at FROM merge_requests + WHERE project_id = ? + AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)", + )?; + + let mrs: std::result::Result, _> = stmt + .query_map([project_id], |row| { + Ok(MrForDiscussionSync { + local_mr_id: row.get(0)?, + iid: row.get(1)?, + updated_at: row.get(2)?, + }) + })? + .collect(); + + Ok(mrs?) +} + +/// Parse ISO 8601 timestamp to milliseconds. 
+fn parse_timestamp(ts: &str) -> Result { + chrono::DateTime::parse_from_rfc3339(ts) + .map(|dt| dt.timestamp_millis()) + .map_err(|e| GiError::Other(format!("Failed to parse timestamp '{}': {}", ts, e))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn result_default_has_zero_counts() { + let result = IngestMergeRequestsResult::default(); + assert_eq!(result.fetched, 0); + assert_eq!(result.upserted, 0); + assert_eq!(result.labels_created, 0); + assert_eq!(result.assignees_linked, 0); + assert_eq!(result.reviewers_linked, 0); + } + + #[test] + fn cursor_filter_allows_newer_mrs() { + let cursor = SyncCursor { + updated_at_cursor: Some(1705312800000), // 2024-01-15T10:00:00Z + tie_breaker_id: Some(100), + }; + + // MR with later timestamp passes + let later_ts = 1705399200000; // 2024-01-16T10:00:00Z + assert!(passes_cursor_filter_with_ts(101, later_ts, &cursor)); + } + + #[test] + fn cursor_filter_blocks_older_mrs() { + let cursor = SyncCursor { + updated_at_cursor: Some(1705312800000), + tie_breaker_id: Some(100), + }; + + // MR with earlier timestamp blocked + let earlier_ts = 1705226400000; // 2024-01-14T10:00:00Z + assert!(!passes_cursor_filter_with_ts(99, earlier_ts, &cursor)); + } + + #[test] + fn cursor_filter_uses_tie_breaker_for_same_timestamp() { + let cursor = SyncCursor { + updated_at_cursor: Some(1705312800000), + tie_breaker_id: Some(100), + }; + + // Same timestamp, higher ID passes + assert!(passes_cursor_filter_with_ts(101, 1705312800000, &cursor)); + + // Same timestamp, same ID blocked + assert!(!passes_cursor_filter_with_ts(100, 1705312800000, &cursor)); + + // Same timestamp, lower ID blocked + assert!(!passes_cursor_filter_with_ts(99, 1705312800000, &cursor)); + } + + #[test] + fn cursor_filter_allows_all_when_no_cursor() { + let cursor = SyncCursor::default(); + let old_ts = 1577836800000; // 2020-01-01T00:00:00Z + assert!(passes_cursor_filter_with_ts(1, old_ts, &cursor)); + } +} diff --git a/src/ingestion/mod.rs 
b/src/ingestion/mod.rs index c5c8ab0..87534c7 100644 --- a/src/ingestion/mod.rs +++ b/src/ingestion/mod.rs @@ -5,11 +5,19 @@ pub mod discussions; pub mod issues; +pub mod merge_requests; +pub mod mr_discussions; pub mod orchestrator; pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions}; pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues}; -pub use orchestrator::{ - IngestProjectResult, ProgressCallback, ProgressEvent, ingest_project_issues, - ingest_project_issues_with_progress, +pub use merge_requests::{ + IngestMergeRequestsResult, MrForDiscussionSync, get_mrs_needing_discussion_sync, + ingest_merge_requests, +}; +pub use mr_discussions::{IngestMrDiscussionsResult, ingest_mr_discussions}; +pub use orchestrator::{ + IngestMrProjectResult, IngestProjectResult, ProgressCallback, ProgressEvent, + ingest_project_issues, ingest_project_issues_with_progress, ingest_project_merge_requests, + ingest_project_merge_requests_with_progress, }; diff --git a/src/ingestion/mr_discussions.rs b/src/ingestion/mr_discussions.rs new file mode 100644 index 0000000..35e9d36 --- /dev/null +++ b/src/ingestion/mr_discussions.rs @@ -0,0 +1,673 @@ +//! MR Discussion ingestion with atomicity guarantees. +//! +//! Critical requirements: +//! - Parse notes BEFORE any destructive DB operations +//! - Watermark advanced ONLY on full pagination success +//! - Upsert + sweep pattern for data replacement +//! - Sync health telemetry for debugging failures +//! +//! Supports two modes: +//! - Streaming: fetch and write incrementally (memory efficient) +//! 
- Prefetch: fetch all upfront, then write (enables parallel API calls) + +use futures::StreamExt; +use rusqlite::{Connection, params}; +use tracing::{debug, info, warn}; + +use crate::Config; +use crate::core::error::Result; +use crate::core::payloads::{StorePayloadOptions, store_payload}; +use crate::core::time::now_ms; +use crate::gitlab::GitLabClient; +use crate::gitlab::transformers::{ + NormalizedDiscussion, NormalizedNote, transform_mr_discussion, + transform_notes_with_diff_position, +}; +use crate::gitlab::types::GitLabDiscussion; + +use super::merge_requests::MrForDiscussionSync; + +/// Result of MR discussion ingestion for a single MR. +#[derive(Debug, Default)] +pub struct IngestMrDiscussionsResult { + pub discussions_fetched: usize, + pub discussions_upserted: usize, + pub notes_upserted: usize, + pub notes_skipped_bad_timestamp: usize, + pub diffnotes_count: usize, + pub pagination_succeeded: bool, +} + +/// Prefetched discussions for an MR (ready for DB write). +/// This separates the API fetch phase from the DB write phase to enable parallelism. +#[derive(Debug)] +pub struct PrefetchedMrDiscussions { + pub mr: MrForDiscussionSync, + pub discussions: Vec, + pub fetch_error: Option, + /// True if any discussions failed to transform (skip sweep if true) + pub had_transform_errors: bool, + /// Count of notes skipped due to transform errors + pub notes_skipped_count: usize, +} + +/// A single prefetched discussion with transformed data. +#[derive(Debug)] +pub struct PrefetchedDiscussion { + pub raw: GitLabDiscussion, + pub normalized: NormalizedDiscussion, + pub notes: Vec, +} + +/// Fetch discussions for an MR without writing to DB. +/// This can be called in parallel for multiple MRs. 
+pub async fn prefetch_mr_discussions( + client: &GitLabClient, + gitlab_project_id: i64, + local_project_id: i64, + mr: MrForDiscussionSync, +) -> PrefetchedMrDiscussions { + debug!(mr_iid = mr.iid, "Prefetching discussions for MR"); + + // Fetch all discussions from GitLab + let raw_discussions = match client.fetch_all_mr_discussions(gitlab_project_id, mr.iid).await { + Ok(d) => d, + Err(e) => { + return PrefetchedMrDiscussions { + mr, + discussions: Vec::new(), + fetch_error: Some(e.to_string()), + had_transform_errors: false, + notes_skipped_count: 0, + }; + } + }; + + // Transform each discussion + let mut discussions = Vec::with_capacity(raw_discussions.len()); + let mut had_transform_errors = false; + let mut notes_skipped_count = 0; + + for raw in raw_discussions { + // Transform notes + let notes = match transform_notes_with_diff_position(&raw, local_project_id) { + Ok(n) => n, + Err(e) => { + warn!( + mr_iid = mr.iid, + discussion_id = %raw.id, + error = %e, + "Note transform failed during prefetch" + ); + // Track the failure - don't sweep stale data if transforms failed + had_transform_errors = true; + notes_skipped_count += raw.notes.len(); + continue; + } + }; + + // Transform discussion + let normalized = transform_mr_discussion(&raw, local_project_id, mr.local_mr_id); + + discussions.push(PrefetchedDiscussion { + raw, + normalized, + notes, + }); + } + + PrefetchedMrDiscussions { + mr, + discussions, + fetch_error: None, + had_transform_errors, + notes_skipped_count, + } +} + +/// Write prefetched discussions to DB. +/// This must be called serially (rusqlite Connection is not Send). 
+pub fn write_prefetched_mr_discussions( + conn: &Connection, + config: &Config, + local_project_id: i64, + prefetched: PrefetchedMrDiscussions, +) -> Result { + // Sync succeeds only if no fetch errors AND no transform errors + let sync_succeeded = prefetched.fetch_error.is_none() && !prefetched.had_transform_errors; + + let mut result = IngestMrDiscussionsResult { + pagination_succeeded: sync_succeeded, + notes_skipped_bad_timestamp: prefetched.notes_skipped_count, + ..Default::default() + }; + + let mr = &prefetched.mr; + + // Handle fetch errors + if let Some(error) = &prefetched.fetch_error { + warn!(mr_iid = mr.iid, error = %error, "Prefetch failed for MR"); + record_sync_health_error(conn, mr.local_mr_id, error)?; + return Ok(result); + } + + let run_seen_at = now_ms(); + + // Write each discussion + for disc in &prefetched.discussions { + result.discussions_fetched += 1; + + // Count DiffNotes + result.diffnotes_count += disc + .notes + .iter() + .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) + .count(); + + // Start transaction + let tx = conn.unchecked_transaction()?; + + // Store raw payload + let payload_json = serde_json::to_value(&disc.raw)?; + let payload_id = Some(store_payload( + &tx, + StorePayloadOptions { + project_id: Some(local_project_id), + resource_type: "discussion", + gitlab_id: &disc.raw.id, + payload: &payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?); + + // Upsert discussion + upsert_discussion(&tx, &disc.normalized, run_seen_at, payload_id)?; + result.discussions_upserted += 1; + + // Get local discussion ID + let local_discussion_id: i64 = tx.query_row( + "SELECT id FROM discussions WHERE project_id = ? 
AND gitlab_discussion_id = ?", + params![local_project_id, &disc.normalized.gitlab_discussion_id], + |row| row.get(0), + )?; + + // Upsert notes + for note in &disc.notes { + let should_store_payload = !note.is_system + || note.position_new_path.is_some() + || note.position_old_path.is_some(); + + let note_payload_id = if should_store_payload { + let note_data = disc.raw.notes.iter().find(|n| n.id == note.gitlab_id); + if let Some(note_data) = note_data { + let note_payload_json = serde_json::to_value(note_data)?; + Some(store_payload( + &tx, + StorePayloadOptions { + project_id: Some(local_project_id), + resource_type: "note", + gitlab_id: ¬e.gitlab_id.to_string(), + payload: ¬e_payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?) + } else { + None + } + } else { + None + }; + + upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; + result.notes_upserted += 1; + } + + tx.commit()?; + } + + // Only sweep stale data and advance watermark on full success + // If any discussions failed to transform, preserve existing data + if sync_succeeded { + sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?; + sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?; + mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?; + clear_sync_health_error(conn, mr.local_mr_id)?; + + debug!(mr_iid = mr.iid, "MR discussion sync complete, watermark advanced"); + } else if prefetched.had_transform_errors { + warn!( + mr_iid = mr.iid, + notes_skipped = prefetched.notes_skipped_count, + "Transform errors occurred; watermark NOT advanced to preserve data" + ); + } + + Ok(result) +} + +/// Ingest discussions for MRs that need sync. 
+pub async fn ingest_mr_discussions( + conn: &Connection, + client: &GitLabClient, + config: &Config, + gitlab_project_id: i64, + local_project_id: i64, + mrs: &[MrForDiscussionSync], +) -> Result { + let mut total_result = IngestMrDiscussionsResult { + pagination_succeeded: true, // Start optimistic + ..Default::default() + }; + + for mr in mrs { + let result = ingest_discussions_for_mr( + conn, + client, + config, + gitlab_project_id, + local_project_id, + mr, + ) + .await?; + + total_result.discussions_fetched += result.discussions_fetched; + total_result.discussions_upserted += result.discussions_upserted; + total_result.notes_upserted += result.notes_upserted; + total_result.notes_skipped_bad_timestamp += result.notes_skipped_bad_timestamp; + total_result.diffnotes_count += result.diffnotes_count; + // Pagination failed for any MR means overall failure + if !result.pagination_succeeded { + total_result.pagination_succeeded = false; + } + } + + info!( + mrs_processed = mrs.len(), + discussions_fetched = total_result.discussions_fetched, + discussions_upserted = total_result.discussions_upserted, + notes_upserted = total_result.notes_upserted, + notes_skipped = total_result.notes_skipped_bad_timestamp, + diffnotes = total_result.diffnotes_count, + pagination_succeeded = total_result.pagination_succeeded, + "MR discussion ingestion complete" + ); + + Ok(total_result) +} + +/// Ingest discussions for a single MR. 
+async fn ingest_discussions_for_mr( + conn: &Connection, + client: &GitLabClient, + config: &Config, + gitlab_project_id: i64, + local_project_id: i64, + mr: &MrForDiscussionSync, +) -> Result { + let mut result = IngestMrDiscussionsResult { + pagination_succeeded: true, + ..Default::default() + }; + + debug!( + mr_iid = mr.iid, + local_mr_id = mr.local_mr_id, + "Fetching discussions for MR" + ); + + // Record sync start time for sweep + let run_seen_at = now_ms(); + + // Stream discussions from GitLab + let mut discussions_stream = client.paginate_mr_discussions(gitlab_project_id, mr.iid); + + // Track if we've received any response + let mut received_first_response = false; + + while let Some(disc_result) = discussions_stream.next().await { + if !received_first_response { + received_first_response = true; + } + + // Handle pagination errors - don't advance watermark + let gitlab_discussion = match disc_result { + Ok(d) => d, + Err(e) => { + warn!( + mr_iid = mr.iid, + error = %e, + "Error during MR discussion pagination" + ); + result.pagination_succeeded = false; + record_sync_health_error(conn, mr.local_mr_id, &e.to_string())?; + break; + } + }; + result.discussions_fetched += 1; + + // CRITICAL: Parse notes BEFORE any destructive DB operations + let notes = match transform_notes_with_diff_position(&gitlab_discussion, local_project_id) { + Ok(notes) => notes, + Err(e) => { + warn!( + mr_iid = mr.iid, + discussion_id = %gitlab_discussion.id, + error = %e, + "Note transform failed; preserving existing notes" + ); + result.notes_skipped_bad_timestamp += gitlab_discussion.notes.len(); + result.pagination_succeeded = false; + continue; // Skip this discussion, preserve existing data + } + }; + + // Count DiffNotes + result.diffnotes_count += notes + .iter() + .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) + .count(); + + // Transform discussion + let normalized_discussion = + transform_mr_discussion(&gitlab_discussion, local_project_id, 
mr.local_mr_id); + + // Only NOW start transaction (after parse succeeded) + let tx = conn.unchecked_transaction()?; + + // Store raw payload + let payload_json = serde_json::to_value(&gitlab_discussion)?; + let payload_id = Some(store_payload( + &tx, + StorePayloadOptions { + project_id: Some(local_project_id), + resource_type: "discussion", + gitlab_id: &gitlab_discussion.id, + payload: &payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?); + + // Upsert discussion with run_seen_at + upsert_discussion(&tx, &normalized_discussion, run_seen_at, payload_id)?; + result.discussions_upserted += 1; + + // Get local discussion ID + let local_discussion_id: i64 = tx.query_row( + "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?", + params![ + local_project_id, + &normalized_discussion.gitlab_discussion_id + ], + |row| row.get(0), + )?; + + // Upsert notes (not delete-all-then-insert) + for note in ¬es { + // Selective payload storage: skip system notes without position + let should_store_payload = !note.is_system + || note.position_new_path.is_some() + || note.position_old_path.is_some(); + + let note_payload_id = if should_store_payload { + let note_data = gitlab_discussion + .notes + .iter() + .find(|n| n.id == note.gitlab_id); + if let Some(note_data) = note_data { + let note_payload_json = serde_json::to_value(note_data)?; + Some(store_payload( + &tx, + StorePayloadOptions { + project_id: Some(local_project_id), + resource_type: "note", + gitlab_id: ¬e.gitlab_id.to_string(), + payload: ¬e_payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?) 
+ } else { + None + } + } else { + None + }; + + upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; + result.notes_upserted += 1; + } + + tx.commit()?; + } + + // Only sweep stale data and advance watermark on full success + if result.pagination_succeeded && received_first_response { + // Sweep stale discussions for this MR + sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?; + + // Sweep stale notes for this MR + sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?; + + // Advance watermark + mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?; + clear_sync_health_error(conn, mr.local_mr_id)?; + + debug!( + mr_iid = mr.iid, + "MR discussion sync complete, watermark advanced" + ); + } else if result.pagination_succeeded && !received_first_response { + // Empty response (no discussions) - still safe to sweep and advance + sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?; + sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?; + mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?; + clear_sync_health_error(conn, mr.local_mr_id)?; + } else { + warn!( + mr_iid = mr.iid, + discussions_seen = result.discussions_upserted, + notes_skipped = result.notes_skipped_bad_timestamp, + "Watermark NOT advanced; will retry on next sync" + ); + } + + Ok(result) +} + +/// Upsert a discussion with last_seen_at for sweep. 
+///
+/// `payload_id` is the local raw-payload row id, if one was stored. On
+/// conflict (same `project_id` + `gitlab_discussion_id`) the row is updated
+/// in place; `COALESCE` keeps the previously stored payload id when no new
+/// payload was stored this run.
+fn upsert_discussion(
+    conn: &Connection,
+    discussion: &crate::gitlab::transformers::NormalizedDiscussion,
+    last_seen_at: i64,
+    payload_id: Option<i64>,
+) -> Result<()> {
+    conn.execute(
+        "INSERT INTO discussions (
+            gitlab_discussion_id, project_id, issue_id, merge_request_id, noteable_type,
+            individual_note, first_note_at, last_note_at, last_seen_at,
+            resolvable, resolved, raw_payload_id
+        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
+        ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
+            first_note_at = excluded.first_note_at,
+            last_note_at = excluded.last_note_at,
+            last_seen_at = excluded.last_seen_at,
+            resolvable = excluded.resolvable,
+            resolved = excluded.resolved,
+            raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)",
+        params![
+            &discussion.gitlab_discussion_id,
+            discussion.project_id,
+            discussion.issue_id,
+            discussion.merge_request_id,
+            &discussion.noteable_type,
+            discussion.individual_note,
+            discussion.first_note_at,
+            discussion.last_note_at,
+            last_seen_at,
+            discussion.resolvable,
+            discussion.resolved,
+            payload_id,
+        ],
+    )?;
+    Ok(())
+}
+
+/// Upsert a note with last_seen_at for sweep.
+///
+/// Conflict target is the note's GitLab id; position fields are refreshed on
+/// update and the previously stored payload id is kept when none was stored
+/// this run (COALESCE).
+fn upsert_note(
+    conn: &Connection,
+    discussion_id: i64,
+    note: &NormalizedNote,
+    last_seen_at: i64,
+    payload_id: Option<i64>,
+) -> Result<()> {
+    conn.execute(
+        // The 26 positional params below must stay in the same order as the
+        // column list.
+        "INSERT INTO notes (
+            gitlab_id, discussion_id, project_id, note_type, is_system,
+            author_username, body, created_at, updated_at, last_seen_at,
+            position, resolvable, resolved, resolved_by, resolved_at,
+            position_old_path, position_new_path, position_old_line, position_new_line,
+            position_type, position_line_range_start, position_line_range_end,
+            position_base_sha, position_start_sha, position_head_sha,
+            raw_payload_id
+        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26)
+        ON CONFLICT(gitlab_id) DO UPDATE SET
+            note_type = excluded.note_type,
+            body = excluded.body,
+            updated_at = excluded.updated_at,
+            last_seen_at = excluded.last_seen_at,
+            resolvable = excluded.resolvable,
+            resolved = excluded.resolved,
+            resolved_by = excluded.resolved_by,
+            resolved_at = excluded.resolved_at,
+            position_old_path = excluded.position_old_path,
+            position_new_path = excluded.position_new_path,
+            position_old_line = excluded.position_old_line,
+            position_new_line = excluded.position_new_line,
+            position_type = excluded.position_type,
+            position_line_range_start = excluded.position_line_range_start,
+            position_line_range_end = excluded.position_line_range_end,
+            position_base_sha = excluded.position_base_sha,
+            position_start_sha = excluded.position_start_sha,
+            position_head_sha = excluded.position_head_sha,
+            raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)",
+        params![
+            note.gitlab_id,
+            discussion_id,
+            note.project_id,
+            &note.note_type,
+            note.is_system,
+            &note.author_username,
+            &note.body,
+            note.created_at,
+            note.updated_at,
+            last_seen_at,
+            note.position,
+            note.resolvable,
+            note.resolved,
+            &note.resolved_by,
+            note.resolved_at,
+            &note.position_old_path,
+            &note.position_new_path,
+            note.position_old_line,
+            note.position_new_line,
+            &note.position_type,
+            note.position_line_range_start,
+            note.position_line_range_end,
+            &note.position_base_sha,
+            &note.position_start_sha,
+            &note.position_head_sha,
+            payload_id,
+        ],
+    )?;
+    Ok(())
+}
+
+/// Sweep stale discussions (not seen in this run).
+/// Returns the number of rows deleted.
+fn sweep_stale_discussions(conn: &Connection, local_mr_id: i64, run_seen_at: i64) -> Result<usize> {
+    let deleted = conn.execute(
+        "DELETE FROM discussions
+         WHERE merge_request_id = ? AND last_seen_at < ?",
+        params![local_mr_id, run_seen_at],
+    )?;
+    if deleted > 0 {
+        debug!(local_mr_id, deleted, "Swept stale discussions");
+    }
+    Ok(deleted)
+}
+
+/// Sweep stale notes for discussions belonging to this MR.
+/// Notes are matched through their parent discussion rows; returns the number
+/// of rows deleted.
+fn sweep_stale_notes(
+    conn: &Connection,
+    local_project_id: i64,
+    local_mr_id: i64,
+    run_seen_at: i64,
+) -> Result<usize> {
+    let deleted = conn.execute(
+        "DELETE FROM notes
+         WHERE project_id = ?
+           AND discussion_id IN (
+               SELECT id FROM discussions WHERE merge_request_id = ?
+           )
+           AND last_seen_at < ?",
+        params![local_project_id, local_mr_id, run_seen_at],
+    )?;
+    if deleted > 0 {
+        debug!(local_mr_id, deleted, "Swept stale notes");
+    }
+    Ok(deleted)
+}
+
+/// Mark MR discussions as synced (advance watermark).
+fn mark_discussions_synced(conn: &Connection, local_mr_id: i64, updated_at: i64) -> Result<()> {
+    conn.execute(
+        "UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?",
+        params![updated_at, local_mr_id],
+    )?;
+    Ok(())
+}
+
+/// Record sync health error for debugging.
+/// Increments the attempts counter on every failed attempt and stores the
+/// latest error message alongside the attempt timestamp.
+fn record_sync_health_error(conn: &Connection, local_mr_id: i64, error: &str) -> Result<()> {
+    conn.execute(
+        "UPDATE merge_requests SET
+            discussions_sync_last_attempt_at = ?,
+            discussions_sync_attempts = discussions_sync_attempts + 1,
+            discussions_sync_last_error = ?
+         WHERE id = ?",
+        params![now_ms(), error, local_mr_id],
+    )?;
+    Ok(())
+}
+
+/// Clear sync health error on success.
+fn clear_sync_health_error(conn: &Connection, local_mr_id: i64) -> Result<()> {
+    // Keeps discussions_sync_attempts intact; only refreshes the attempt
+    // timestamp and nulls the stored error message.
+    conn.execute(
+        "UPDATE merge_requests SET
+            discussions_sync_last_attempt_at = ?,
+            discussions_sync_last_error = NULL
+         WHERE id = ?",
+        params![now_ms(), local_mr_id],
+    )?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Sanity check: Default must yield all-zero counters and a false
+    // pagination flag, since the orchestrator relies on these defaults.
+    #[test]
+    fn result_default_has_zero_counts() {
+        let result = IngestMrDiscussionsResult::default();
+        assert_eq!(result.discussions_fetched, 0);
+        assert_eq!(result.discussions_upserted, 0);
+        assert_eq!(result.notes_upserted, 0);
+        assert_eq!(result.notes_skipped_bad_timestamp, 0);
+        assert_eq!(result.diffnotes_count, 0);
+        assert!(!result.pagination_succeeded);
+    }
+
+    #[test]
+    fn result_pagination_succeeded_false_by_default() {
+        let result = IngestMrDiscussionsResult::default();
+        assert!(!result.pagination_succeeded);
+    }
+}
diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs
index 35c7ffd..2f5deeb 100644
--- a/src/ingestion/orchestrator.rs
+++ b/src/ingestion/orchestrator.rs
@@ -1,10 +1,11 @@
-//! Ingestion orchestrator: coordinates issue and discussion sync.
+//! Ingestion orchestrator: coordinates issue/MR and discussion sync.
 //!
-//! Implements the CP1 canonical pattern:
-//! 1. Fetch issues with cursor-based sync
-//! 2. Identify issues needing discussion sync
-//! 3. Execute discussion sync sequentially (rusqlite Connection is not Send)
+//! Implements the canonical pattern:
+//! 1. Fetch resources (issues or MRs) with cursor-based sync
+//! 2. Identify resources needing discussion sync
+//! 3. 
Execute discussion sync with parallel prefetch (fetch in parallel, write serially) +use futures::future::join_all; use rusqlite::Connection; use tracing::info; @@ -14,6 +15,10 @@ use crate::gitlab::GitLabClient; use super::discussions::ingest_issue_discussions; use super::issues::{IssueForDiscussionSync, ingest_issues}; +use super::merge_requests::{ + MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests, +}; +use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions}; /// Progress callback for ingestion operations. pub type ProgressCallback = Box; @@ -33,9 +38,21 @@ pub enum ProgressEvent { DiscussionSynced { current: usize, total: usize }, /// Discussion sync complete DiscussionSyncComplete, + /// MR fetching started + MrsFetchStarted, + /// An MR was fetched (current count) + MrFetched { count: usize }, + /// MR fetching complete + MrsFetchComplete { total: usize }, + /// MR discussion sync started (total MRs to sync) + MrDiscussionSyncStarted { total: usize }, + /// MR discussion synced (current/total) + MrDiscussionSynced { current: usize, total: usize }, + /// MR discussion sync complete + MrDiscussionSyncComplete, } -/// Result of full project ingestion. +/// Result of full project ingestion (issues). #[derive(Debug, Default)] pub struct IngestProjectResult { pub issues_fetched: usize, @@ -48,6 +65,23 @@ pub struct IngestProjectResult { pub issues_skipped_discussion_sync: usize, } +/// Result of MR ingestion for a project. 
+#[derive(Debug, Default)]
+pub struct IngestMrProjectResult {
+    pub mrs_fetched: usize,
+    pub mrs_upserted: usize,
+    pub labels_created: usize,
+    pub assignees_linked: usize,
+    pub reviewers_linked: usize,
+    pub discussions_fetched: usize,
+    pub discussions_upserted: usize,
+    pub notes_upserted: usize,
+    pub notes_skipped_bad_timestamp: usize,
+    pub diffnotes_count: usize,
+    /// MRs whose discussion pagination completed without error this run.
+    pub mrs_synced_discussions: usize,
+    /// MRs that did not need discussion sync (total minus candidates).
+    pub mrs_skipped_discussion_sync: usize,
+}
+
 /// Ingest all issues and their discussions for a project.
 pub async fn ingest_project_issues(
     conn: &Connection,
@@ -194,6 +228,183 @@ async fn sync_discussions_sequential(
     Ok(results)
 }
 
+/// Ingest all merge requests and their discussions for a project.
+///
+/// Convenience wrapper around the `_with_progress` variant with progress
+/// reporting disabled.
+pub async fn ingest_project_merge_requests(
+    conn: &Connection,
+    client: &GitLabClient,
+    config: &Config,
+    project_id: i64,
+    gitlab_project_id: i64,
+    full_sync: bool,
+) -> Result<IngestMrProjectResult> {
+    ingest_project_merge_requests_with_progress(
+        conn,
+        client,
+        config,
+        project_id,
+        gitlab_project_id,
+        full_sync,
+        None,
+    )
+    .await
+}
+
+/// Ingest all merge requests and their discussions for a project with progress reporting.
+///
+/// `progress` is invoked synchronously for each `ProgressEvent`; pass `None`
+/// to disable reporting.
+pub async fn ingest_project_merge_requests_with_progress(
+    conn: &Connection,
+    client: &GitLabClient,
+    config: &Config,
+    project_id: i64,
+    gitlab_project_id: i64,
+    full_sync: bool,
+    progress: Option<ProgressCallback>,
+) -> Result<IngestMrProjectResult> {
+    let mut result = IngestMrProjectResult::default();
+    let emit = |event: ProgressEvent| {
+        if let Some(ref cb) = progress {
+            cb(event);
+        }
+    };
+
+    // Step 1: Ingest MRs
+    emit(ProgressEvent::MrsFetchStarted);
+    let mr_result = ingest_merge_requests(
+        conn,
+        client,
+        config,
+        project_id,
+        gitlab_project_id,
+        full_sync,
+    )
+    .await?;
+
+    result.mrs_fetched = mr_result.fetched;
+    result.mrs_upserted = mr_result.upserted;
+    result.labels_created = mr_result.labels_created;
+    result.assignees_linked = mr_result.assignees_linked;
+    result.reviewers_linked = mr_result.reviewers_linked;
+
+    emit(ProgressEvent::MrsFetchComplete {
+        total: result.mrs_fetched,
+    });
+
+    // Step 2: Query DB for MRs needing discussion sync
+    // CRITICAL: Query AFTER ingestion to avoid memory growth during large ingests
+    let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
+
+    // Query total MRs for accurate skip count.
+    // Best-effort: a COUNT(*) failure is treated as 0, which only affects the
+    // skipped-count statistic, not the sync itself.
+    let total_mrs: i64 = conn
+        .query_row(
+            "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
+            [project_id],
+            |row| row.get(0),
+        )
+        .unwrap_or(0);
+    let total_mrs = total_mrs as usize;
+    result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
+
+    if mrs_needing_sync.is_empty() {
+        info!("No MRs need discussion sync");
+        return Ok(result);
+    }
+
+    info!(
+        count = mrs_needing_sync.len(),
+        "Starting discussion sync for MRs"
+    );
+
+    emit(ProgressEvent::MrDiscussionSyncStarted {
+        total: mrs_needing_sync.len(),
+    });
+
+    // Step 3: Execute sequential MR discussion sync
+    let discussion_results = sync_mr_discussions_sequential(
+        conn,
+        client,
+        config,
+        gitlab_project_id,
+        project_id,
+        &mrs_needing_sync,
+        &progress,
+    )
+    .await?;
+
+    emit(ProgressEvent::MrDiscussionSyncComplete);
+
+    // Aggregate discussion results
+    for disc_result in discussion_results {
+        result.discussions_fetched += disc_result.discussions_fetched;
+        result.discussions_upserted += disc_result.discussions_upserted;
+        result.notes_upserted += disc_result.notes_upserted;
+        result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp;
+        result.diffnotes_count += disc_result.diffnotes_count;
+        if disc_result.pagination_succeeded {
+            result.mrs_synced_discussions += 1;
+        }
+    }
+
+    info!(
+        mrs_fetched = result.mrs_fetched,
+        mrs_upserted = result.mrs_upserted,
+        labels_created = result.labels_created,
+        discussions_fetched = result.discussions_fetched,
+        notes_upserted = result.notes_upserted,
+        diffnotes = result.diffnotes_count,
+        mrs_synced = result.mrs_synced_discussions,
+        mrs_skipped = result.mrs_skipped_discussion_sync,
+        "MR project ingestion complete"
+    );
+
+    Ok(result)
+}
+
+/// Sync discussions for MRs with parallel API prefetching.
+///
+/// Pattern: Fetch discussions for multiple MRs in parallel, then write serially.
+/// This overlaps network I/O while respecting rusqlite's single-connection constraint.
+///
+/// Batch size comes from `config.sync.dependent_concurrency`, clamped to a
+/// minimum of 1.
+async fn sync_mr_discussions_sequential(
+    conn: &Connection,
+    client: &GitLabClient,
+    config: &Config,
+    gitlab_project_id: i64,
+    local_project_id: i64,
+    mrs: &[MrForDiscussionSync],
+    progress: &Option<ProgressCallback>,
+) -> Result<Vec<IngestMrDiscussionsResult>> {
+    // Guard against a misconfigured concurrency of 0: slice::chunks panics
+    // when the chunk size is zero.
+    let batch_size = (config.sync.dependent_concurrency as usize).max(1);
+    let total = mrs.len();
+
+    let mut results = Vec::with_capacity(mrs.len());
+    let mut processed = 0;
+
+    // Process in batches: parallel API fetch, serial DB write
+    for chunk in mrs.chunks(batch_size) {
+        // Step 1: Prefetch discussions for all MRs in this batch in parallel
+        let prefetch_futures = chunk.iter().map(|mr| {
+            prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone())
+        });
+        let prefetched_batch = join_all(prefetch_futures).await;
+
+        // Step 2: Write each prefetched result serially
+        for prefetched in prefetched_batch {
+            let disc_result =
+                write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?;
+            results.push(disc_result);
+            processed += 1;
+
+            // Emit progress
+            if let Some(cb) = progress {
+                cb(ProgressEvent::MrDiscussionSynced {
+                    current: processed,
+                    total,
+                });
+            }
+        }
+    }
+
+    Ok(results)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -209,4 +420,21 @@
         assert_eq!(result.issues_synced_discussions, 0);
         assert_eq!(result.issues_skipped_discussion_sync, 0);
     }
+
+    #[test]
+    fn mr_result_default_has_zero_counts() {
+        let result = IngestMrProjectResult::default();
+        assert_eq!(result.mrs_fetched, 0);
+        assert_eq!(result.mrs_upserted, 0);
+        assert_eq!(result.labels_created, 0);
+        assert_eq!(result.assignees_linked, 0);
+        assert_eq!(result.reviewers_linked, 0);
+        assert_eq!(result.discussions_fetched, 0);
+        assert_eq!(result.discussions_upserted, 0);
+        assert_eq!(result.notes_upserted, 0);
+        assert_eq!(result.notes_skipped_bad_timestamp, 0);
+        assert_eq!(result.diffnotes_count, 0);
+        assert_eq!(result.mrs_synced_discussions, 0);
+        assert_eq!(result.mrs_skipped_discussion_sync, 0);
+    }
 }