use futures::StreamExt; use rusqlite::{Connection, params}; use tracing::{debug, info, warn}; use crate::Config; use crate::core::error::Result; use crate::core::payloads::{StorePayloadOptions, store_payload}; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::gitlab::GitLabClient; use crate::gitlab::transformers::{ NormalizedDiscussion, NormalizedNote, transform_mr_discussion, transform_notes_with_diff_position, }; use crate::gitlab::types::GitLabDiscussion; use crate::ingestion::dirty_tracker; use super::merge_requests::MrForDiscussionSync; #[derive(Debug, Default)] pub struct IngestMrDiscussionsResult { pub discussions_fetched: usize, pub discussions_upserted: usize, pub notes_upserted: usize, pub notes_skipped_bad_timestamp: usize, pub diffnotes_count: usize, pub pagination_succeeded: bool, } #[derive(Debug)] pub struct PrefetchedMrDiscussions { pub mr: MrForDiscussionSync, pub discussions: Vec, pub fetch_error: Option, pub had_transform_errors: bool, pub notes_skipped_count: usize, } #[derive(Debug)] pub struct PrefetchedDiscussion { pub raw: GitLabDiscussion, pub normalized: NormalizedDiscussion, pub notes: Vec, } pub async fn prefetch_mr_discussions( client: &GitLabClient, gitlab_project_id: i64, local_project_id: i64, mr: MrForDiscussionSync, ) -> PrefetchedMrDiscussions { debug!(mr_iid = mr.iid, "Prefetching discussions for MR"); let raw_discussions = match client .fetch_all_mr_discussions(gitlab_project_id, mr.iid) .await { Ok(d) => d, Err(e) => { return PrefetchedMrDiscussions { mr, discussions: Vec::new(), fetch_error: Some(e.to_string()), had_transform_errors: false, notes_skipped_count: 0, }; } }; let mut discussions = Vec::with_capacity(raw_discussions.len()); let mut had_transform_errors = false; let mut notes_skipped_count = 0; for raw in raw_discussions { let notes = match transform_notes_with_diff_position(&raw, local_project_id) { Ok(n) => n, Err(e) => { warn!( mr_iid = mr.iid, discussion_id = %raw.id, error = %e, "Note transform failed during prefetch" ); had_transform_errors = true; notes_skipped_count += raw.notes.len(); continue; } }; let normalized = transform_mr_discussion(&raw, local_project_id, mr.local_mr_id); discussions.push(PrefetchedDiscussion { raw, normalized, notes, }); } PrefetchedMrDiscussions { mr, discussions, fetch_error: None, had_transform_errors, notes_skipped_count, } } pub fn write_prefetched_mr_discussions( conn: &Connection, config: &Config, local_project_id: i64, prefetched: PrefetchedMrDiscussions, ) -> Result { let sync_succeeded = prefetched.fetch_error.is_none() && !prefetched.had_transform_errors; let mut result = IngestMrDiscussionsResult { pagination_succeeded: sync_succeeded, notes_skipped_bad_timestamp: prefetched.notes_skipped_count, ..Default::default() }; let mr = &prefetched.mr; if let Some(error) = &prefetched.fetch_error { warn!(mr_iid = mr.iid, error = %error, "Prefetch failed for MR"); record_sync_health_error(conn, mr.local_mr_id, error)?; return Ok(result); } let run_seen_at = now_ms(); for disc in &prefetched.discussions { let diffnotes_in_disc = disc .notes .iter() .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) .count(); let notes_in_disc = disc.notes.len(); let tx = conn.unchecked_transaction()?; let payload_bytes = serde_json::to_vec(&disc.raw)?; let payload_id = Some(store_payload( &tx, StorePayloadOptions { project_id: Some(local_project_id), resource_type: "discussion", gitlab_id: &disc.raw.id, json_bytes: &payload_bytes, compress: config.storage.compress_raw_payloads, }, )?); upsert_discussion(&tx, &disc.normalized, run_seen_at, payload_id)?; let local_discussion_id: i64 = tx.query_row( "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?", params![local_project_id, &disc.normalized.gitlab_discussion_id], |row| row.get(0), )?; dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?; for note in &disc.notes { let should_store_payload = !note.is_system || note.position_new_path.is_some() || note.position_old_path.is_some(); let note_payload_id = if should_store_payload { let note_data = disc.raw.notes.iter().find(|n| n.id == note.gitlab_id); if let Some(note_data) = note_data { let note_payload_bytes = serde_json::to_vec(note_data)?; Some(store_payload( &tx, StorePayloadOptions { project_id: Some(local_project_id), resource_type: "note", gitlab_id: ¬e.gitlab_id.to_string(), json_bytes: ¬e_payload_bytes, compress: config.storage.compress_raw_payloads, }, )?) } else { None } } else { None }; upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; } tx.commit()?; result.discussions_fetched += 1; result.discussions_upserted += 1; result.notes_upserted += notes_in_disc; result.diffnotes_count += diffnotes_in_disc; } if sync_succeeded { sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?; sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?; mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?; clear_sync_health_error(conn, mr.local_mr_id)?; debug!( mr_iid = mr.iid, "MR discussion sync complete, watermark advanced" ); } else if prefetched.had_transform_errors { warn!( mr_iid = mr.iid, notes_skipped = prefetched.notes_skipped_count, "Transform errors occurred; watermark NOT advanced to preserve data" ); } Ok(result) } pub async fn ingest_mr_discussions( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, mrs: &[MrForDiscussionSync], ) -> Result { let mut total_result = IngestMrDiscussionsResult { pagination_succeeded: true, ..Default::default() }; for mr in mrs { let result = ingest_discussions_for_mr( conn, client, config, gitlab_project_id, local_project_id, mr, ) .await?; total_result.discussions_fetched += result.discussions_fetched; total_result.discussions_upserted += result.discussions_upserted; total_result.notes_upserted += result.notes_upserted; total_result.notes_skipped_bad_timestamp += result.notes_skipped_bad_timestamp; total_result.diffnotes_count += result.diffnotes_count; if !result.pagination_succeeded { total_result.pagination_succeeded = false; } } info!( mrs_processed = mrs.len(), discussions_fetched = total_result.discussions_fetched, discussions_upserted = total_result.discussions_upserted, notes_upserted = total_result.notes_upserted, notes_skipped = total_result.notes_skipped_bad_timestamp, diffnotes = total_result.diffnotes_count, pagination_succeeded = total_result.pagination_succeeded, "MR discussion ingestion complete" ); Ok(total_result) } async fn ingest_discussions_for_mr( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, mr: &MrForDiscussionSync, ) -> Result { let mut result = IngestMrDiscussionsResult { pagination_succeeded: true, ..Default::default() }; debug!( mr_iid = mr.iid, local_mr_id = mr.local_mr_id, "Fetching discussions for MR" ); let run_seen_at = now_ms(); let mut discussions_stream = client.paginate_mr_discussions(gitlab_project_id, mr.iid); let mut received_first_response = false; while let Some(disc_result) = discussions_stream.next().await { if !received_first_response { received_first_response = true; } let gitlab_discussion = match disc_result { Ok(d) => d, Err(e) => { warn!( mr_iid = mr.iid, error = %e, "Error during MR discussion pagination" ); result.pagination_succeeded = false; record_sync_health_error(conn, mr.local_mr_id, &e.to_string())?; break; } }; let notes = match transform_notes_with_diff_position(&gitlab_discussion, local_project_id) { Ok(notes) => notes, Err(e) => { warn!( mr_iid = mr.iid, discussion_id = %gitlab_discussion.id, error = %e, "Note transform failed; preserving existing notes" ); result.notes_skipped_bad_timestamp += gitlab_discussion.notes.len(); result.pagination_succeeded = false; continue; } }; let diffnotes_in_disc = notes .iter() .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some()) .count(); let notes_count = notes.len(); let normalized_discussion = transform_mr_discussion(&gitlab_discussion, local_project_id, mr.local_mr_id); let tx = conn.unchecked_transaction()?; let payload_bytes = serde_json::to_vec(&gitlab_discussion)?; let payload_id = Some(store_payload( &tx, StorePayloadOptions { project_id: Some(local_project_id), resource_type: "discussion", gitlab_id: &gitlab_discussion.id, json_bytes: &payload_bytes, compress: config.storage.compress_raw_payloads, }, )?); upsert_discussion(&tx, &normalized_discussion, run_seen_at, payload_id)?; let local_discussion_id: i64 = tx.query_row( "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?", params![ local_project_id, &normalized_discussion.gitlab_discussion_id ], |row| row.get(0), )?; dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?; for note in ¬es { let should_store_payload = !note.is_system || note.position_new_path.is_some() || note.position_old_path.is_some(); let note_payload_id = if should_store_payload { let note_data = gitlab_discussion .notes .iter() .find(|n| n.id == note.gitlab_id); if let Some(note_data) = note_data { let note_payload_bytes = serde_json::to_vec(note_data)?; Some(store_payload( &tx, StorePayloadOptions { project_id: Some(local_project_id), resource_type: "note", gitlab_id: ¬e.gitlab_id.to_string(), json_bytes: ¬e_payload_bytes, compress: config.storage.compress_raw_payloads, }, )?) } else { None } } else { None }; upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?; } tx.commit()?; result.discussions_fetched += 1; result.discussions_upserted += 1; result.notes_upserted += notes_count; result.diffnotes_count += diffnotes_in_disc; } if result.pagination_succeeded && received_first_response { sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?; sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?; mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?; clear_sync_health_error(conn, mr.local_mr_id)?; debug!( mr_iid = mr.iid, "MR discussion sync complete, watermark advanced" ); } else if result.pagination_succeeded && !received_first_response { sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?; sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?; mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?; clear_sync_health_error(conn, mr.local_mr_id)?; } else { warn!( mr_iid = mr.iid, discussions_seen = result.discussions_upserted, notes_skipped = result.notes_skipped_bad_timestamp, "Watermark NOT advanced; will retry on next sync" ); } Ok(result) } fn upsert_discussion( conn: &Connection, discussion: &crate::gitlab::transformers::NormalizedDiscussion, last_seen_at: i64, payload_id: Option, ) -> Result<()> { conn.execute( "INSERT INTO discussions ( gitlab_discussion_id, project_id, issue_id, merge_request_id, noteable_type, individual_note, first_note_at, last_note_at, last_seen_at, resolvable, resolved, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET first_note_at = excluded.first_note_at, last_note_at = excluded.last_note_at, last_seen_at = excluded.last_seen_at, resolvable = excluded.resolvable, resolved = excluded.resolved, raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)", params![ &discussion.gitlab_discussion_id, discussion.project_id, discussion.issue_id, discussion.merge_request_id, &discussion.noteable_type, discussion.individual_note, discussion.first_note_at, discussion.last_note_at, last_seen_at, discussion.resolvable, discussion.resolved, payload_id, ], )?; Ok(()) } fn upsert_note( conn: &Connection, discussion_id: i64, note: &NormalizedNote, last_seen_at: i64, payload_id: Option, ) -> Result<()> { conn.execute( "INSERT INTO notes ( gitlab_id, discussion_id, project_id, note_type, is_system, author_username, body, created_at, updated_at, last_seen_at, position, resolvable, resolved, resolved_by, resolved_at, position_old_path, position_new_path, position_old_line, position_new_line, position_type, position_line_range_start, position_line_range_end, position_base_sha, position_start_sha, position_head_sha, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26) ON CONFLICT(gitlab_id) DO UPDATE SET note_type = excluded.note_type, body = excluded.body, updated_at = excluded.updated_at, last_seen_at = excluded.last_seen_at, resolvable = excluded.resolvable, resolved = excluded.resolved, resolved_by = excluded.resolved_by, resolved_at = excluded.resolved_at, position_old_path = excluded.position_old_path, position_new_path = excluded.position_new_path, position_old_line = excluded.position_old_line, position_new_line = excluded.position_new_line, position_type = excluded.position_type, position_line_range_start = excluded.position_line_range_start, position_line_range_end = excluded.position_line_range_end, position_base_sha = excluded.position_base_sha, position_start_sha = excluded.position_start_sha, position_head_sha = excluded.position_head_sha, raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)", params![ note.gitlab_id, discussion_id, note.project_id, ¬e.note_type, note.is_system, ¬e.author_username, ¬e.body, note.created_at, note.updated_at, last_seen_at, note.position, note.resolvable, note.resolved, ¬e.resolved_by, note.resolved_at, ¬e.position_old_path, ¬e.position_new_path, note.position_old_line, note.position_new_line, ¬e.position_type, note.position_line_range_start, note.position_line_range_end, ¬e.position_base_sha, ¬e.position_start_sha, ¬e.position_head_sha, payload_id, ], )?; Ok(()) } fn sweep_stale_discussions(conn: &Connection, local_mr_id: i64, run_seen_at: i64) -> Result { let deleted = conn.execute( "DELETE FROM discussions WHERE merge_request_id = ? AND last_seen_at < ?", params![local_mr_id, run_seen_at], )?; if deleted > 0 { debug!(local_mr_id, deleted, "Swept stale discussions"); } Ok(deleted) } fn sweep_stale_notes( conn: &Connection, local_project_id: i64, local_mr_id: i64, run_seen_at: i64, ) -> Result { let deleted = conn.execute( "DELETE FROM notes WHERE project_id = ? AND discussion_id IN ( SELECT id FROM discussions WHERE merge_request_id = ? ) AND last_seen_at < ?", params![local_project_id, local_mr_id, run_seen_at], )?; if deleted > 0 { debug!(local_mr_id, deleted, "Swept stale notes"); } Ok(deleted) } fn mark_discussions_synced(conn: &Connection, local_mr_id: i64, updated_at: i64) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?", params![updated_at, local_mr_id], )?; Ok(()) } fn record_sync_health_error(conn: &Connection, local_mr_id: i64, error: &str) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_sync_last_attempt_at = ?, discussions_sync_attempts = discussions_sync_attempts + 1, discussions_sync_last_error = ? WHERE id = ?", params![now_ms(), error, local_mr_id], )?; Ok(()) } fn clear_sync_health_error(conn: &Connection, local_mr_id: i64) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_sync_last_attempt_at = ?, discussions_sync_last_error = NULL WHERE id = ?", params![now_ms(), local_mr_id], )?; Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn result_default_has_zero_counts() { let result = IngestMrDiscussionsResult::default(); assert_eq!(result.discussions_fetched, 0); assert_eq!(result.discussions_upserted, 0); assert_eq!(result.notes_upserted, 0); assert_eq!(result.notes_skipped_bad_timestamp, 0); assert_eq!(result.diffnotes_count, 0); assert!(!result.pagination_succeeded); } #[test] fn result_pagination_succeeded_false_by_default() { let result = IngestMrDiscussionsResult::default(); assert!(!result.pagination_succeeded); } }