use futures::StreamExt; use rusqlite::{Connection, params}; use tracing::{debug, warn}; use crate::Config; use crate::core::error::Result; use crate::core::payloads::{StorePayloadOptions, store_payload}; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::gitlab::GitLabClient; use crate::gitlab::transformers::{ NormalizedNote, NoteableRef, transform_discussion, transform_notes, }; use crate::ingestion::dirty_tracker; use super::issues::IssueForDiscussionSync; #[derive(Debug)] pub struct NoteUpsertOutcome { pub local_note_id: i64, pub changed_semantics: bool, } #[derive(Debug, Default)] pub struct IngestDiscussionsResult { pub discussions_fetched: usize, pub discussions_upserted: usize, pub notes_upserted: usize, pub stale_discussions_removed: usize, } pub async fn ingest_issue_discussions( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, issues: &[IssueForDiscussionSync], ) -> Result { let mut total_result = IngestDiscussionsResult::default(); for issue in issues { let result = ingest_discussions_for_issue( conn, client, config, gitlab_project_id, local_project_id, issue, ) .await?; total_result.discussions_fetched += result.discussions_fetched; total_result.discussions_upserted += result.discussions_upserted; total_result.notes_upserted += result.notes_upserted; total_result.stale_discussions_removed += result.stale_discussions_removed; } debug!( issues_processed = issues.len(), discussions_fetched = total_result.discussions_fetched, discussions_upserted = total_result.discussions_upserted, notes_upserted = total_result.notes_upserted, stale_removed = total_result.stale_discussions_removed, "Discussion ingestion complete" ); Ok(total_result) } async fn ingest_discussions_for_issue( conn: &Connection, client: &GitLabClient, config: &Config, gitlab_project_id: i64, local_project_id: i64, issue: &IssueForDiscussionSync, ) -> Result { let mut result = IngestDiscussionsResult::default(); debug!( issue_iid = issue.iid, local_issue_id = issue.local_issue_id, "Fetching discussions for issue" ); let mut discussions_stream = client.paginate_issue_discussions(gitlab_project_id, issue.iid); let mut seen_discussion_ids: Vec = Vec::new(); let mut pagination_error: Option = None; let run_seen_at = now_ms(); while let Some(disc_result) = discussions_stream.next().await { let gitlab_discussion = match disc_result { Ok(d) => d, Err(e) => { warn!( issue_iid = issue.iid, error = %e, "Error during discussion pagination, skipping stale removal" ); pagination_error = Some(e); break; } }; result.discussions_fetched += 1; let payload_bytes = serde_json::to_vec(&gitlab_discussion)?; let normalized = transform_discussion( &gitlab_discussion, local_project_id, NoteableRef::Issue(issue.local_issue_id), ); let tx = conn.unchecked_transaction()?; let payload_id = store_payload( &tx, StorePayloadOptions { project_id: Some(local_project_id), resource_type: "discussion", gitlab_id: &gitlab_discussion.id, json_bytes: &payload_bytes, compress: config.storage.compress_raw_payloads, }, )?; upsert_discussion(&tx, &normalized, payload_id)?; let local_discussion_id: i64 = tx.query_row( "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?", (local_project_id, &normalized.gitlab_discussion_id), |row| row.get(0), )?; dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?; // Mark child note documents dirty (they inherit parent metadata) tx.execute( "INSERT INTO dirty_sources (source_type, source_id, queued_at) SELECT 'note', n.id, ?1 FROM notes n WHERE n.discussion_id = ?2 AND n.is_system = 0 ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0", params![now_ms(), local_discussion_id], )?; let notes = transform_notes(&gitlab_discussion, local_project_id); let notes_count = notes.len(); for note in notes { let outcome = upsert_note_for_issue(&tx, local_discussion_id, ¬e, run_seen_at, None)?; if !note.is_system && outcome.changed_semantics { dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?; } } sweep_stale_issue_notes(&tx, local_discussion_id, run_seen_at)?; tx.commit()?; result.discussions_upserted += 1; result.notes_upserted += notes_count; seen_discussion_ids.push(normalized.gitlab_discussion_id.clone()); } if pagination_error.is_none() { let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; result.stale_discussions_removed = removed; update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; } else if let Some(err) = pagination_error { warn!( issue_iid = issue.iid, discussions_seen = seen_discussion_ids.len(), "Skipping stale removal due to pagination error" ); return Err(err); } Ok(result) } fn upsert_discussion( conn: &Connection, discussion: &crate::gitlab::transformers::NormalizedDiscussion, payload_id: i64, ) -> Result<()> { conn.execute( "INSERT INTO discussions ( gitlab_discussion_id, project_id, issue_id, merge_request_id, noteable_type, individual_note, first_note_at, last_note_at, last_seen_at, resolvable, resolved, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET first_note_at = excluded.first_note_at, last_note_at = excluded.last_note_at, last_seen_at = excluded.last_seen_at, resolvable = excluded.resolvable, resolved = excluded.resolved, raw_payload_id = excluded.raw_payload_id", ( &discussion.gitlab_discussion_id, discussion.project_id, discussion.issue_id, discussion.merge_request_id, &discussion.noteable_type, discussion.individual_note, discussion.first_note_at, discussion.last_note_at, discussion.last_seen_at, discussion.resolvable, discussion.resolved, payload_id, ), )?; Ok(()) } fn upsert_note_for_issue( conn: &Connection, discussion_id: i64, note: &NormalizedNote, last_seen_at: i64, payload_id: Option, ) -> Result { // Pre-read for semantic change detection let existing = conn .query_row( "SELECT id, body, note_type, resolved, resolved_by, position_old_path, position_new_path, position_old_line, position_new_line, position_type, position_line_range_start, position_line_range_end, position_base_sha, position_start_sha, position_head_sha FROM notes WHERE gitlab_id = ?", params![note.gitlab_id], |row| { Ok(( row.get::<_, i64>(0)?, row.get::<_, String>(1)?, row.get::<_, Option>(2)?, row.get::<_, bool>(3)?, row.get::<_, Option>(4)?, row.get::<_, Option>(5)?, row.get::<_, Option>(6)?, row.get::<_, Option>(7)?, row.get::<_, Option>(8)?, row.get::<_, Option>(9)?, row.get::<_, Option>(10)?, row.get::<_, Option>(11)?, row.get::<_, Option>(12)?, row.get::<_, Option>(13)?, row.get::<_, Option>(14)?, )) }, ) .ok(); let changed_semantics = match &existing { Some(( _id, body, note_type, resolved, resolved_by, pos_old_path, pos_new_path, pos_old_line, pos_new_line, pos_type, pos_range_start, pos_range_end, pos_base_sha, pos_start_sha, pos_head_sha, )) => { *body != note.body || *note_type != note.note_type || *resolved != note.resolved || *resolved_by != note.resolved_by || *pos_old_path != note.position_old_path || *pos_new_path != note.position_new_path || *pos_old_line != note.position_old_line || *pos_new_line != note.position_new_line || *pos_type != note.position_type || *pos_range_start != note.position_line_range_start || *pos_range_end != note.position_line_range_end || *pos_base_sha != note.position_base_sha || *pos_start_sha != note.position_start_sha || *pos_head_sha != note.position_head_sha } None => true, }; conn.execute( "INSERT INTO notes ( gitlab_id, discussion_id, project_id, note_type, is_system, author_id, author_username, body, created_at, updated_at, last_seen_at, position, resolvable, resolved, resolved_by, resolved_at, position_old_path, position_new_path, position_old_line, position_new_line, position_type, position_line_range_start, position_line_range_end, position_base_sha, position_start_sha, position_head_sha, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27) ON CONFLICT(gitlab_id) DO UPDATE SET body = excluded.body, note_type = excluded.note_type, author_id = excluded.author_id, updated_at = excluded.updated_at, last_seen_at = excluded.last_seen_at, resolvable = excluded.resolvable, resolved = excluded.resolved, resolved_by = excluded.resolved_by, resolved_at = excluded.resolved_at, position_old_path = excluded.position_old_path, position_new_path = excluded.position_new_path, position_old_line = excluded.position_old_line, position_new_line = excluded.position_new_line, position_type = excluded.position_type, position_line_range_start = excluded.position_line_range_start, position_line_range_end = excluded.position_line_range_end, position_base_sha = excluded.position_base_sha, position_start_sha = excluded.position_start_sha, position_head_sha = excluded.position_head_sha, raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)", params![ note.gitlab_id, discussion_id, note.project_id, ¬e.note_type, note.is_system, note.author_id, ¬e.author_username, ¬e.body, note.created_at, note.updated_at, last_seen_at, note.position, note.resolvable, note.resolved, ¬e.resolved_by, note.resolved_at, ¬e.position_old_path, ¬e.position_new_path, note.position_old_line, note.position_new_line, ¬e.position_type, note.position_line_range_start, note.position_line_range_end, ¬e.position_base_sha, ¬e.position_start_sha, ¬e.position_head_sha, payload_id, ], )?; let local_note_id: i64 = conn.query_row( "SELECT id FROM notes WHERE gitlab_id = ?", params![note.gitlab_id], |row| row.get(0), )?; Ok(NoteUpsertOutcome { local_note_id, changed_semantics, }) } fn sweep_stale_issue_notes( conn: &Connection, discussion_id: i64, last_seen_at: i64, ) -> Result { // Step 1: Delete note documents for stale notes conn.execute( "DELETE FROM documents WHERE source_type = 'note' AND source_id IN (SELECT id FROM notes WHERE discussion_id = ?1 AND last_seen_at < ?2 AND is_system = 0)", params![discussion_id, last_seen_at], )?; // Step 2: Delete dirty_sources entries for stale notes conn.execute( "DELETE FROM dirty_sources WHERE source_type = 'note' AND source_id IN (SELECT id FROM notes WHERE discussion_id = ?1 AND last_seen_at < ?2 AND is_system = 0)", params![discussion_id, last_seen_at], )?; // Step 3: Delete the stale notes themselves let deleted = conn.execute( "DELETE FROM notes WHERE discussion_id = ?1 AND last_seen_at < ?2", params![discussion_id, last_seen_at], )?; if deleted > 0 { debug!(discussion_id, deleted, "Swept stale issue notes"); } Ok(deleted) } fn remove_stale_discussions( conn: &Connection, issue_id: i64, seen_ids: &[String], ) -> Result { if seen_ids.is_empty() { let deleted = conn.execute("DELETE FROM discussions WHERE issue_id = ?", [issue_id])?; return Ok(deleted); } const CHUNK_SIZE: usize = 500; let total_deleted = if seen_ids.len() > CHUNK_SIZE { conn.execute( "CREATE TEMP TABLE IF NOT EXISTS _temp_seen_discussions (id TEXT PRIMARY KEY)", [], )?; conn.execute("DELETE FROM _temp_seen_discussions", [])?; for chunk in seen_ids.chunks(CHUNK_SIZE) { let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect(); let sql = format!( "INSERT OR IGNORE INTO _temp_seen_discussions (id) VALUES {}", placeholders.join(", ") ); let params: Vec<&dyn rusqlite::ToSql> = chunk.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); conn.execute(&sql, params.as_slice())?; } let deleted = conn.execute( "DELETE FROM discussions WHERE issue_id = ?1 AND gitlab_discussion_id NOT IN (SELECT id FROM _temp_seen_discussions)", [issue_id], )?; conn.execute("DROP TABLE IF EXISTS _temp_seen_discussions", [])?; deleted } else { let placeholders: Vec<&str> = seen_ids.iter().map(|_| "?").collect(); let sql = format!( "DELETE FROM discussions WHERE issue_id = ?1 AND gitlab_discussion_id NOT IN ({})", placeholders.join(", ") ); let mut params: Vec> = vec![Box::new(issue_id)]; for id in seen_ids { params.push(Box::new(id.clone())); } let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect(); conn.execute(&sql, param_refs.as_slice())? }; Ok(total_deleted) } fn update_issue_sync_timestamp(conn: &Connection, issue_id: i64, updated_at: i64) -> Result<()> { conn.execute( "UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?", (updated_at, issue_id), )?; Ok(()) } #[cfg(test)] #[path = "discussions_tests.rs"] mod tests;