diff --git a/src/ingestion/discussions.rs b/src/ingestion/discussions.rs new file mode 100644 index 0000000..62ab3d8 --- /dev/null +++ b/src/ingestion/discussions.rs @@ -0,0 +1,374 @@ +//! Discussion ingestion with full-refresh strategy. +//! +//! Fetches discussions for an issue and stores them locally with: +//! - Raw payload storage with deduplication +//! - Full discussion and note replacement per issue +//! - Sync timestamp tracking per issue +//! - Safe stale removal only after successful pagination + +use futures::StreamExt; +use rusqlite::Connection; +use tracing::{debug, info, warn}; + +use crate::Config; +use crate::core::error::Result; +use crate::core::payloads::{StorePayloadOptions, store_payload}; +use crate::gitlab::GitLabClient; +use crate::gitlab::transformers::{transform_discussion, transform_notes}; + +use super::issues::IssueForDiscussionSync; + +/// Result of discussion ingestion for a single issue. +#[derive(Debug, Default)] +pub struct IngestDiscussionsResult { + pub discussions_fetched: usize, + pub discussions_upserted: usize, + pub notes_upserted: usize, + pub stale_discussions_removed: usize, +} + +/// Ingest discussions for a list of issues that need sync. +pub async fn ingest_issue_discussions( + conn: &Connection, + client: &GitLabClient, + config: &Config, + gitlab_project_id: i64, + local_project_id: i64, + issues: &[IssueForDiscussionSync], +) -> Result { + let mut total_result = IngestDiscussionsResult::default(); + + for issue in issues { + let result = ingest_discussions_for_issue( + conn, + client, + config, + gitlab_project_id, + local_project_id, + issue, + ) + .await?; + + total_result.discussions_fetched += result.discussions_fetched; + total_result.discussions_upserted += result.discussions_upserted; + total_result.notes_upserted += result.notes_upserted; + total_result.stale_discussions_removed += result.stale_discussions_removed; + } + + info!( + issues_processed = issues.len(), + discussions_fetched = total_result.discussions_fetched, + discussions_upserted = total_result.discussions_upserted, + notes_upserted = total_result.notes_upserted, + stale_removed = total_result.stale_discussions_removed, + "Discussion ingestion complete" + ); + + Ok(total_result) +} + +/// Ingest discussions for a single issue. +async fn ingest_discussions_for_issue( + conn: &Connection, + client: &GitLabClient, + config: &Config, + gitlab_project_id: i64, + local_project_id: i64, + issue: &IssueForDiscussionSync, +) -> Result { + let mut result = IngestDiscussionsResult::default(); + + debug!( + issue_iid = issue.iid, + local_issue_id = issue.local_issue_id, + "Fetching discussions for issue" + ); + + // Stream discussions from GitLab + let mut discussions_stream = client.paginate_issue_discussions(gitlab_project_id, issue.iid); + + // Track discussions we've seen for stale removal + let mut seen_discussion_ids: Vec = Vec::new(); + // Track if we've started receiving data (to distinguish empty result from failure) + let mut received_first_response = false; + // Track if any error occurred during pagination + let mut pagination_error: Option = None; + + while let Some(disc_result) = discussions_stream.next().await { + // Mark that we've received at least one response from the API + if !received_first_response { + received_first_response = true; + } + + // Handle errors - record but don't delete stale data + let gitlab_discussion = match disc_result { + Ok(d) => d, + Err(e) => { + warn!( + issue_iid = issue.iid, + error = %e, + "Error during discussion pagination, skipping stale removal" + ); + pagination_error = Some(e); + break; + } + }; + result.discussions_fetched += 1; + + // Store raw payload + let payload_json = serde_json::to_value(&gitlab_discussion)?; + let payload_id = store_payload( + conn, + StorePayloadOptions { + project_id: Some(local_project_id), + resource_type: "discussion", + gitlab_id: &gitlab_discussion.id, + payload: &payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?; + + // Transform and store discussion + let normalized = + transform_discussion(&gitlab_discussion, local_project_id, issue.local_issue_id); + + // Wrap all discussion+notes operations in a transaction for atomicity + let tx = conn.unchecked_transaction()?; + + upsert_discussion(&tx, &normalized, payload_id)?; + result.discussions_upserted += 1; + seen_discussion_ids.push(normalized.gitlab_discussion_id.clone()); + + // Get local discussion ID + let local_discussion_id: i64 = tx.query_row( + "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?", + (local_project_id, &normalized.gitlab_discussion_id), + |row| row.get(0), + )?; + + // Transform and store notes + let notes = transform_notes(&gitlab_discussion, local_project_id); + + // Delete existing notes for this discussion (full refresh) + tx.execute( + "DELETE FROM notes WHERE discussion_id = ?", + [local_discussion_id], + )?; + + for note in notes { + // Store raw note payload + let note_payload_json = serde_json::to_value( + gitlab_discussion + .notes + .iter() + .find(|n| n.id == note.gitlab_id), + )?; + let note_payload_id = store_payload( + &tx, + StorePayloadOptions { + project_id: Some(local_project_id), + resource_type: "note", + gitlab_id: ¬e.gitlab_id.to_string(), + payload: ¬e_payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?; + + insert_note(&tx, local_discussion_id, ¬e, note_payload_id)?; + result.notes_upserted += 1; + } + + tx.commit()?; + } + + // Only remove stale discussions if pagination completed without errors + // AND we actually received a response (empty or not) + if pagination_error.is_none() && received_first_response { + let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; + result.stale_discussions_removed = removed; + + // Update discussions_synced_for_updated_at on the issue + update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; + } else if pagination_error.is_none() && !received_first_response && seen_discussion_ids.is_empty() { + // Stream was empty but no error - issue genuinely has no discussions + // This is safe to remove stale discussions (if any exist from before) + let removed = remove_stale_discussions(conn, issue.local_issue_id, &seen_discussion_ids)?; + result.stale_discussions_removed = removed; + + update_issue_sync_timestamp(conn, issue.local_issue_id, issue.updated_at)?; + } else if pagination_error.is_some() { + warn!( + issue_iid = issue.iid, + discussions_seen = seen_discussion_ids.len(), + "Skipping stale removal due to pagination error" + ); + // Return the error to signal incomplete sync + return Err(pagination_error.unwrap()); + } + + Ok(result) +} + +/// Upsert a discussion. +fn upsert_discussion( + conn: &Connection, + discussion: &crate::gitlab::transformers::NormalizedDiscussion, + payload_id: i64, +) -> Result<()> { + conn.execute( + "INSERT INTO discussions ( + gitlab_discussion_id, project_id, issue_id, noteable_type, + individual_note, first_note_at, last_note_at, last_seen_at, + resolvable, resolved, raw_payload_id + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) + ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET + first_note_at = excluded.first_note_at, + last_note_at = excluded.last_note_at, + last_seen_at = excluded.last_seen_at, + resolvable = excluded.resolvable, + resolved = excluded.resolved, + raw_payload_id = excluded.raw_payload_id", + ( + &discussion.gitlab_discussion_id, + discussion.project_id, + discussion.issue_id, + &discussion.noteable_type, + discussion.individual_note, + discussion.first_note_at, + discussion.last_note_at, + discussion.last_seen_at, + discussion.resolvable, + discussion.resolved, + payload_id, + ), + )?; + Ok(()) +} + +/// Insert a note. +fn insert_note( + conn: &Connection, + discussion_id: i64, + note: &crate::gitlab::transformers::NormalizedNote, + payload_id: i64, +) -> Result<()> { + conn.execute( + "INSERT INTO notes ( + gitlab_id, discussion_id, project_id, note_type, is_system, + author_username, body, created_at, updated_at, last_seen_at, + position, resolvable, resolved, resolved_by, resolved_at, raw_payload_id + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)", + ( + note.gitlab_id, + discussion_id, + note.project_id, + ¬e.note_type, + note.is_system, + ¬e.author_username, + ¬e.body, + note.created_at, + note.updated_at, + note.last_seen_at, + note.position, + note.resolvable, + note.resolved, + ¬e.resolved_by, + note.resolved_at, + payload_id, + ), + )?; + Ok(()) +} + +/// Remove discussions that were not seen in this fetch (stale removal). +/// Chunks large sets to avoid SQL query size limits. +fn remove_stale_discussions( + conn: &Connection, + issue_id: i64, + seen_ids: &[String], +) -> Result { + if seen_ids.is_empty() { + // No discussions seen - remove all for this issue + let deleted = conn.execute("DELETE FROM discussions WHERE issue_id = ?", [issue_id])?; + return Ok(deleted); + } + + // SQLite has a limit of 999 variables per query by default + // Chunk the seen_ids to stay well under this limit + const CHUNK_SIZE: usize = 500; + + // For safety, use a temp table approach for large sets + let total_deleted = if seen_ids.len() > CHUNK_SIZE { + // Create temp table for seen IDs + conn.execute( + "CREATE TEMP TABLE IF NOT EXISTS _temp_seen_discussions (id TEXT PRIMARY KEY)", + [], + )?; + + // Clear any previous data + conn.execute("DELETE FROM _temp_seen_discussions", [])?; + + // Insert seen IDs in chunks + for chunk in seen_ids.chunks(CHUNK_SIZE) { + let placeholders: Vec<&str> = chunk.iter().map(|_| "(?)").collect(); + let sql = format!( + "INSERT OR IGNORE INTO _temp_seen_discussions (id) VALUES {}", + placeholders.join(", ") + ); + + let params: Vec<&dyn rusqlite::ToSql> = chunk.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); + conn.execute(&sql, params.as_slice())?; + } + + // Delete discussions not in temp table + let deleted = conn.execute( + "DELETE FROM discussions + WHERE issue_id = ?1 + AND gitlab_discussion_id NOT IN (SELECT id FROM _temp_seen_discussions)", + [issue_id], + )?; + + // Clean up temp table + conn.execute("DROP TABLE IF EXISTS _temp_seen_discussions", [])?; + deleted + } else { + // Small set - use simple IN clause + let placeholders: Vec<&str> = seen_ids.iter().map(|_| "?").collect(); + let sql = format!( + "DELETE FROM discussions WHERE issue_id = ?1 AND gitlab_discussion_id NOT IN ({})", + placeholders.join(", ") + ); + + let mut params: Vec> = vec![Box::new(issue_id)]; + for id in seen_ids { + params.push(Box::new(id.clone())); + } + + let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect(); + conn.execute(&sql, param_refs.as_slice())? + }; + + Ok(total_deleted) +} + +/// Update the discussions_synced_for_updated_at timestamp on an issue. +fn update_issue_sync_timestamp(conn: &Connection, issue_id: i64, updated_at: i64) -> Result<()> { + conn.execute( + "UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?", + (updated_at, issue_id), + )?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn result_default_has_zero_counts() { + let result = IngestDiscussionsResult::default(); + assert_eq!(result.discussions_fetched, 0); + assert_eq!(result.discussions_upserted, 0); + assert_eq!(result.notes_upserted, 0); + } +} diff --git a/src/ingestion/issues.rs b/src/ingestion/issues.rs new file mode 100644 index 0000000..6a2fb99 --- /dev/null +++ b/src/ingestion/issues.rs @@ -0,0 +1,552 @@ +//! Issue ingestion with cursor-based incremental sync. +//! +//! Fetches issues from GitLab and stores them locally with: +//! - Cursor-based pagination for incremental sync +//! - Raw payload storage with deduplication +//! - Label extraction and stale-link removal +//! - Milestone normalization with dedicated table +//! - Tracking of issues needing discussion sync + +use std::ops::Deref; + +use futures::StreamExt; +use rusqlite::{Connection, Transaction}; +use tracing::{debug, info, warn}; + +use crate::Config; +use crate::core::error::{GiError, Result}; +use crate::core::payloads::{StorePayloadOptions, store_payload}; +use crate::core::time::now_ms; +use crate::gitlab::GitLabClient; +use crate::gitlab::transformers::{MilestoneRow, transform_issue}; +use crate::gitlab::types::GitLabIssue; + +/// Result of issue ingestion. +#[derive(Debug, Default)] +pub struct IngestIssuesResult { + pub fetched: usize, + pub upserted: usize, + pub labels_created: usize, + pub issues_needing_discussion_sync: Vec, +} + +/// Issue that needs discussion sync. +#[derive(Debug, Clone)] +pub struct IssueForDiscussionSync { + pub local_issue_id: i64, + pub iid: i64, + pub updated_at: i64, // ms epoch +} + +/// Cursor state for incremental sync. +#[derive(Debug, Default)] +struct SyncCursor { + updated_at_cursor: Option, + tie_breaker_id: Option, +} + +/// Ingest issues for a project. +pub async fn ingest_issues( + conn: &Connection, + client: &GitLabClient, + config: &Config, + project_id: i64, // Local DB project ID + gitlab_project_id: i64, // GitLab project ID +) -> Result { + let mut result = IngestIssuesResult::default(); + + // 1. Get current cursor + let cursor = get_sync_cursor(conn, project_id)?; + debug!(?cursor, "Starting issue ingestion with cursor"); + + // 2. Stream issues with cursor rewind + let mut issues_stream = client.paginate_issues( + gitlab_project_id, + cursor.updated_at_cursor, + config.sync.cursor_rewind_seconds, + ); + + let mut batch_count = 0; + let mut last_updated_at: Option = None; + let mut last_gitlab_id: Option = None; + + // 3. Process each issue + while let Some(issue_result) = issues_stream.next().await { + let issue = issue_result?; + result.fetched += 1; + + // Parse timestamp early - skip issues with invalid timestamps + let issue_updated_at = match parse_timestamp(&issue.updated_at) { + Ok(ts) => ts, + Err(e) => { + warn!( + gitlab_id = issue.id, + error = %e, + "Skipping issue with invalid timestamp" + ); + continue; + } + }; + + // Apply local cursor filter (skip already-processed due to rewind overlap) + if !passes_cursor_filter_with_ts(issue.id, issue_updated_at, &cursor) { + debug!(gitlab_id = issue.id, "Skipping already-processed issue"); + continue; + } + + // Transform and store + let labels_created = process_single_issue(conn, config, project_id, &issue)?; + result.upserted += 1; + result.labels_created += labels_created; + + // Track cursor position (use already-parsed timestamp) + last_updated_at = Some(issue_updated_at); + last_gitlab_id = Some(issue.id); + batch_count += 1; + + // Incremental cursor update every 100 issues + if batch_count % 100 == 0 + && let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) + { + update_sync_cursor(conn, project_id, ts, id)?; + debug!(batch_count, "Incremental cursor update"); + } + } + + // 4. Final cursor update + if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { + update_sync_cursor(conn, project_id, ts, id)?; + } else if result.fetched == 0 && cursor.updated_at_cursor.is_some() { + // No new issues returned, but we have an existing cursor. + // Update sync_attempted_at to track that we checked (useful for monitoring) + // The cursor itself stays the same since there's nothing newer to advance to. + debug!("No new issues found, cursor unchanged"); + } + + // 5. Find issues needing discussion sync + result.issues_needing_discussion_sync = get_issues_needing_discussion_sync(conn, project_id)?; + + info!( + fetched = result.fetched, + upserted = result.upserted, + labels_created = result.labels_created, + needing_sync = result.issues_needing_discussion_sync.len(), + "Issue ingestion complete" + ); + + Ok(result) +} + +/// Check if an issue passes the cursor filter (not already processed). +/// Takes pre-parsed timestamp to avoid redundant parsing. +fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCursor) -> bool { + let Some(cursor_ts) = cursor.updated_at_cursor else { + return true; // No cursor = fetch all + }; + + if issue_ts < cursor_ts { + return false; + } + + if issue_ts == cursor_ts { + if let Some(cursor_id) = cursor.tie_breaker_id { + if gitlab_id <= cursor_id { + return false; + } + } + } + + true +} + +// Keep the original function for backward compatibility with tests +/// Check if an issue passes the cursor filter (not already processed). +#[cfg(test)] +fn passes_cursor_filter(issue: &GitLabIssue, cursor: &SyncCursor) -> Result { + let Some(cursor_ts) = cursor.updated_at_cursor else { + return Ok(true); // No cursor = fetch all + }; + + let issue_ts = parse_timestamp(&issue.updated_at)?; + + if issue_ts < cursor_ts { + return Ok(false); + } + + if issue_ts == cursor_ts { + if let Some(cursor_id) = cursor.tie_breaker_id { + if issue.id <= cursor_id { + return Ok(false); + } + } + } + + Ok(true) +} + +/// Process a single issue: store payload, upsert issue, handle labels. +/// All operations are wrapped in a transaction for atomicity. +fn process_single_issue( + conn: &Connection, + config: &Config, + project_id: i64, + issue: &GitLabIssue, +) -> Result { + let now = now_ms(); + + // Transform issue first (outside transaction - no DB access) + let payload_json = serde_json::to_value(issue)?; + let transformed = transform_issue(issue.clone())?; + let issue_row = &transformed.issue; + + // Wrap all DB operations in a transaction for atomicity + let tx = conn.unchecked_transaction()?; + let labels_created = process_issue_in_transaction( + &tx, + config, + project_id, + issue, + &payload_json, + issue_row, + &transformed.label_names, + &transformed.assignee_usernames, + transformed.milestone.as_ref(), + now, + )?; + tx.commit()?; + + Ok(labels_created) +} + +/// Inner function that performs all DB operations within a transaction. +fn process_issue_in_transaction( + tx: &Transaction<'_>, + config: &Config, + project_id: i64, + issue: &GitLabIssue, + payload_json: &serde_json::Value, + issue_row: &crate::gitlab::transformers::IssueRow, + label_names: &[String], + assignee_usernames: &[String], + milestone: Option<&MilestoneRow>, + now: i64, +) -> Result { + let mut labels_created = 0; + + // Store raw payload (deref Transaction to Connection for store_payload) + let payload_id = store_payload( + tx.deref(), + StorePayloadOptions { + project_id: Some(project_id), + resource_type: "issue", + gitlab_id: &issue.id.to_string(), + payload: payload_json, + compress: config.storage.compress_raw_payloads, + }, + )?; + + // Upsert milestone if present, get local ID + let milestone_id: Option = if let Some(m) = milestone { + Some(upsert_milestone_tx(tx, project_id, m)?) + } else { + None + }; + + // Upsert issue (including new fields: due_date, milestone_id, milestone_title) + tx.execute( + "INSERT INTO issues ( + gitlab_id, project_id, iid, title, description, state, + author_username, created_at, updated_at, last_seen_at, web_url, + due_date, milestone_id, milestone_title, raw_payload_id + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) + ON CONFLICT(gitlab_id) DO UPDATE SET + title = excluded.title, + description = excluded.description, + state = excluded.state, + author_username = excluded.author_username, + updated_at = excluded.updated_at, + last_seen_at = excluded.last_seen_at, + web_url = excluded.web_url, + due_date = excluded.due_date, + milestone_id = excluded.milestone_id, + milestone_title = excluded.milestone_title, + raw_payload_id = excluded.raw_payload_id", + ( + issue_row.gitlab_id, + project_id, + issue_row.iid, + &issue_row.title, + &issue_row.description, + &issue_row.state, + &issue_row.author_username, + issue_row.created_at, + issue_row.updated_at, + now, + &issue_row.web_url, + &issue_row.due_date, + milestone_id, + &issue_row.milestone_title, + payload_id, + ), + )?; + + // Get local issue ID + let local_issue_id: i64 = tx.query_row( + "SELECT id FROM issues WHERE project_id = ? AND iid = ?", + (project_id, issue_row.iid), + |row| row.get(0), + )?; + + // Clear existing label links (stale removal) + tx.execute( + "DELETE FROM issue_labels WHERE issue_id = ?", + [local_issue_id], + )?; + + // Upsert labels and create links + for label_name in label_names { + let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; + link_issue_label_tx(tx, local_issue_id, label_id)?; + } + + // Clear existing assignee links (stale removal) + tx.execute( + "DELETE FROM issue_assignees WHERE issue_id = ?", + [local_issue_id], + )?; + + // Insert assignees + for username in assignee_usernames { + tx.execute( + "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES (?, ?)", + (local_issue_id, username), + )?; + } + + Ok(labels_created) +} + +/// Upsert a label within a transaction, returning its ID. +fn upsert_label_tx( + tx: &Transaction<'_>, + project_id: i64, + name: &str, + created_count: &mut usize, +) -> Result { + // Try to get existing + let existing: Option = tx + .query_row( + "SELECT id FROM labels WHERE project_id = ? AND name = ?", + (project_id, name), + |row| row.get(0), + ) + .ok(); + + if let Some(id) = existing { + return Ok(id); + } + + // Insert new + tx.execute( + "INSERT INTO labels (project_id, name) VALUES (?, ?)", + (project_id, name), + )?; + *created_count += 1; + + Ok(tx.last_insert_rowid()) +} + +/// Link an issue to a label within a transaction. +fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Result<()> { + tx.execute( + "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)", + (issue_id, label_id), + )?; + Ok(()) +} + +/// Upsert a milestone within a transaction, returning its local ID. +fn upsert_milestone_tx(tx: &Transaction<'_>, project_id: i64, milestone: &MilestoneRow) -> Result { + tx.execute( + "INSERT INTO milestones (gitlab_id, project_id, iid, title, description, state, due_date, web_url) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) + ON CONFLICT(project_id, gitlab_id) DO UPDATE SET + iid = excluded.iid, + title = excluded.title, + description = excluded.description, + state = excluded.state, + due_date = excluded.due_date, + web_url = excluded.web_url", + ( + milestone.gitlab_id, + project_id, + milestone.iid, + &milestone.title, + &milestone.description, + &milestone.state, + &milestone.due_date, + &milestone.web_url, + ), + )?; + + // Get the local ID (whether inserted or updated) + let local_id: i64 = tx.query_row( + "SELECT id FROM milestones WHERE project_id = ? AND gitlab_id = ?", + (project_id, milestone.gitlab_id), + |row| row.get(0), + )?; + + Ok(local_id) +} + +/// Get the current sync cursor for issues. +fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result { + let row: Option<(Option, Option)> = conn + .query_row( + "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors + WHERE project_id = ? AND resource_type = 'issues'", + [project_id], + |row| Ok((row.get(0)?, row.get(1)?)), + ) + .ok(); + + Ok(match row { + Some((updated_at, tie_breaker)) => SyncCursor { + updated_at_cursor: updated_at, + tie_breaker_id: tie_breaker, + }, + None => SyncCursor::default(), + }) +} + +/// Update the sync cursor. +fn update_sync_cursor( + conn: &Connection, + project_id: i64, + updated_at: i64, + gitlab_id: i64, +) -> Result<()> { + conn.execute( + "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id) + VALUES (?1, 'issues', ?2, ?3) + ON CONFLICT(project_id, resource_type) DO UPDATE SET + updated_at_cursor = excluded.updated_at_cursor, + tie_breaker_id = excluded.tie_breaker_id", + (project_id, updated_at, gitlab_id), + )?; + Ok(()) +} + +/// Get issues that need discussion sync (updated_at > discussions_synced_for_updated_at). +fn get_issues_needing_discussion_sync( + conn: &Connection, + project_id: i64, +) -> Result> { + let mut stmt = conn.prepare( + "SELECT id, iid, updated_at FROM issues + WHERE project_id = ? + AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)", + )?; + + let issues: std::result::Result, _> = stmt + .query_map([project_id], |row| { + Ok(IssueForDiscussionSync { + local_issue_id: row.get(0)?, + iid: row.get(1)?, + updated_at: row.get(2)?, + }) + })? + .collect(); + + Ok(issues?) +} + +/// Parse ISO 8601 timestamp to milliseconds. +/// Returns an error if parsing fails instead of silently returning 0. +fn parse_timestamp(ts: &str) -> Result { + chrono::DateTime::parse_from_rfc3339(ts) + .map(|dt| dt.timestamp_millis()) + .map_err(|e| GiError::Other(format!("Failed to parse timestamp '{}': {}", ts, e))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::gitlab::types::GitLabAuthor; + + fn make_test_issue(id: i64, updated_at: &str) -> GitLabIssue { + GitLabIssue { + id, + iid: id, + project_id: 100, + title: format!("Issue {}", id), + description: None, + state: "opened".to_string(), + created_at: "2024-01-01T00:00:00.000Z".to_string(), + updated_at: updated_at.to_string(), + closed_at: None, + author: GitLabAuthor { + id: 1, + username: "test".to_string(), + name: "Test".to_string(), + }, + assignees: vec![], + labels: vec![], + milestone: None, + due_date: None, + web_url: "https://example.com".to_string(), + } + } + + #[test] + fn cursor_filter_allows_newer_issues() { + let cursor = SyncCursor { + updated_at_cursor: Some(1705312800000), // 2024-01-15T10:00:00Z + tie_breaker_id: Some(100), + }; + + // Issue with later timestamp passes + let issue = make_test_issue(101, "2024-01-16T10:00:00.000Z"); + assert!(passes_cursor_filter(&issue, &cursor).unwrap_or(false)); + } + + #[test] + fn cursor_filter_blocks_older_issues() { + let cursor = SyncCursor { + updated_at_cursor: Some(1705312800000), + tie_breaker_id: Some(100), + }; + + // Issue with earlier timestamp blocked + let issue = make_test_issue(99, "2024-01-14T10:00:00.000Z"); + assert!(!passes_cursor_filter(&issue, &cursor).unwrap_or(true)); + } + + #[test] + fn cursor_filter_uses_tie_breaker_for_same_timestamp() { + let cursor = SyncCursor { + updated_at_cursor: Some(1705312800000), + tie_breaker_id: Some(100), + }; + + // Same timestamp, higher ID passes + let issue1 = make_test_issue(101, "2024-01-15T10:00:00.000Z"); + assert!(passes_cursor_filter(&issue1, &cursor).unwrap_or(false)); + + // Same timestamp, same ID blocked + let issue2 = make_test_issue(100, "2024-01-15T10:00:00.000Z"); + assert!(!passes_cursor_filter(&issue2, &cursor).unwrap_or(true)); + + // Same timestamp, lower ID blocked + let issue3 = make_test_issue(99, "2024-01-15T10:00:00.000Z"); + assert!(!passes_cursor_filter(&issue3, &cursor).unwrap_or(true)); + } + + #[test] + fn cursor_filter_allows_all_when_no_cursor() { + let cursor = SyncCursor::default(); + + let issue = make_test_issue(1, "2020-01-01T00:00:00.000Z"); + assert!(passes_cursor_filter(&issue, &cursor).unwrap_or(false)); + } +} diff --git a/src/ingestion/mod.rs b/src/ingestion/mod.rs new file mode 100644 index 0000000..c5c8ab0 --- /dev/null +++ b/src/ingestion/mod.rs @@ -0,0 +1,15 @@ +//! Data ingestion modules for GitLab resources. +//! +//! This module handles fetching and storing issues, discussions, and notes +//! from GitLab with cursor-based incremental sync. + +pub mod discussions; +pub mod issues; +pub mod orchestrator; + +pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions}; +pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues}; +pub use orchestrator::{ + IngestProjectResult, ProgressCallback, ProgressEvent, ingest_project_issues, + ingest_project_issues_with_progress, +}; diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs new file mode 100644 index 0000000..35c7ffd --- /dev/null +++ b/src/ingestion/orchestrator.rs @@ -0,0 +1,212 @@ +//! Ingestion orchestrator: coordinates issue and discussion sync. +//! +//! Implements the CP1 canonical pattern: +//! 1. Fetch issues with cursor-based sync +//! 2. Identify issues needing discussion sync +//! 3. Execute discussion sync sequentially (rusqlite Connection is not Send) + +use rusqlite::Connection; +use tracing::info; + +use crate::Config; +use crate::core::error::Result; +use crate::gitlab::GitLabClient; + +use super::discussions::ingest_issue_discussions; +use super::issues::{IssueForDiscussionSync, ingest_issues}; + +/// Progress callback for ingestion operations. +pub type ProgressCallback = Box; + +/// Progress events emitted during ingestion. +#[derive(Debug, Clone)] +pub enum ProgressEvent { + /// Issue fetching started + IssuesFetchStarted, + /// An issue was fetched (current count) + IssueFetched { count: usize }, + /// Issue fetching complete + IssuesFetchComplete { total: usize }, + /// Discussion sync started (total issues to sync) + DiscussionSyncStarted { total: usize }, + /// Discussion synced for an issue (current/total) + DiscussionSynced { current: usize, total: usize }, + /// Discussion sync complete + DiscussionSyncComplete, +} + +/// Result of full project ingestion. +#[derive(Debug, Default)] +pub struct IngestProjectResult { + pub issues_fetched: usize, + pub issues_upserted: usize, + pub labels_created: usize, + pub discussions_fetched: usize, + pub discussions_upserted: usize, + pub notes_upserted: usize, + pub issues_synced_discussions: usize, + pub issues_skipped_discussion_sync: usize, +} + +/// Ingest all issues and their discussions for a project. +pub async fn ingest_project_issues( + conn: &Connection, + client: &GitLabClient, + config: &Config, + project_id: i64, + gitlab_project_id: i64, +) -> Result { + ingest_project_issues_with_progress(conn, client, config, project_id, gitlab_project_id, None) + .await +} + +/// Ingest all issues and their discussions for a project with progress reporting. +pub async fn ingest_project_issues_with_progress( + conn: &Connection, + client: &GitLabClient, + config: &Config, + project_id: i64, + gitlab_project_id: i64, + progress: Option, +) -> Result { + let mut result = IngestProjectResult::default(); + let emit = |event: ProgressEvent| { + if let Some(ref cb) = progress { + cb(event); + } + }; + + // Step 1: Ingest issues + let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?; + + result.issues_fetched = issue_result.fetched; + result.issues_upserted = issue_result.upserted; + result.labels_created = issue_result.labels_created; + + // Step 2: Sync discussions for issues that need it + let issues_needing_sync = issue_result.issues_needing_discussion_sync; + + // Query actual total issues for accurate skip count (issues_upserted only counts this run) + let total_issues: i64 = conn + .query_row( + "SELECT COUNT(*) FROM issues WHERE project_id = ?", + [project_id], + |row| row.get(0), + ) + .unwrap_or(0); + let total_issues = total_issues as usize; + result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); + + if issues_needing_sync.is_empty() { + info!("No issues need discussion sync"); + return Ok(result); + } + + info!( + count = issues_needing_sync.len(), + "Starting discussion sync for issues" + ); + + emit(ProgressEvent::DiscussionSyncStarted { + total: issues_needing_sync.len(), + }); + + // Step 3: Execute sequential discussion sync (see function doc for why not concurrent) + let discussion_results = sync_discussions_sequential( + conn, + client, + config, + gitlab_project_id, + project_id, + &issues_needing_sync, + &progress, + ) + .await?; + + emit(ProgressEvent::DiscussionSyncComplete); + + // Aggregate discussion results + for disc_result in discussion_results { + result.discussions_fetched += disc_result.discussions_fetched; + result.discussions_upserted += disc_result.discussions_upserted; + result.notes_upserted += disc_result.notes_upserted; + result.issues_synced_discussions += 1; + } + + info!( + issues_fetched = result.issues_fetched, + issues_upserted = result.issues_upserted, + labels_created = result.labels_created, + discussions_fetched = result.discussions_fetched, + notes_upserted = result.notes_upserted, + issues_synced = result.issues_synced_discussions, + issues_skipped = result.issues_skipped_discussion_sync, + "Project ingestion complete" + ); + + Ok(result) +} + +/// Sync discussions sequentially for each issue. +/// +/// NOTE: Despite the config having `dependent_concurrency`, we process sequentially +/// because rusqlite's `Connection` is not `Send` and cannot be shared across tasks. +/// True concurrency would require connection pooling (r2d2, deadpool, etc.). +/// The batch_size from config is used for progress logging granularity. +async fn sync_discussions_sequential( + conn: &Connection, + client: &GitLabClient, + config: &Config, + gitlab_project_id: i64, + local_project_id: i64, + issues: &[IssueForDiscussionSync], + progress: &Option, +) -> Result> { + let batch_size = config.sync.dependent_concurrency as usize; + let total = issues.len(); + + let mut results = Vec::with_capacity(issues.len()); + + // Process in batches for progress feedback (actual processing is sequential) + for chunk in issues.chunks(batch_size) { + for issue in chunk { + let disc_result = ingest_issue_discussions( + conn, + client, + config, + gitlab_project_id, + local_project_id, + std::slice::from_ref(issue), + ) + .await?; + results.push(disc_result); + + // Emit progress + if let Some(cb) = progress { + cb(ProgressEvent::DiscussionSynced { + current: results.len(), + total, + }); + } + } + } + + Ok(results) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn result_default_has_zero_counts() { + let result = IngestProjectResult::default(); + assert_eq!(result.issues_fetched, 0); + assert_eq!(result.issues_upserted, 0); + assert_eq!(result.labels_created, 0); + assert_eq!(result.discussions_fetched, 0); + assert_eq!(result.notes_upserted, 0); + assert_eq!(result.issues_synced_discussions, 0); + assert_eq!(result.issues_skipped_discussion_sync, 0); + } +}