//! Issue ingestion with cursor-based incremental sync. //! //! Fetches issues from GitLab and stores them locally with: //! - Cursor-based pagination for incremental sync //! - Raw payload storage with deduplication //! - Label extraction and stale-link removal //! - Milestone normalization with dedicated table //! - Tracking of issues needing discussion sync use std::ops::Deref; use futures::StreamExt; use rusqlite::{Connection, Transaction}; use tracing::{debug, info, warn}; use crate::Config; use crate::core::error::{LoreError, Result}; use crate::core::payloads::{StorePayloadOptions, store_payload}; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::ingestion::dirty_tracker; use crate::gitlab::GitLabClient; use crate::gitlab::transformers::{MilestoneRow, transform_issue}; use crate::gitlab::types::GitLabIssue; /// Result of issue ingestion. #[derive(Debug, Default)] pub struct IngestIssuesResult { pub fetched: usize, pub upserted: usize, pub labels_created: usize, pub issues_needing_discussion_sync: Vec, } /// Issue that needs discussion sync. #[derive(Debug, Clone)] pub struct IssueForDiscussionSync { pub local_issue_id: i64, pub iid: i64, pub updated_at: i64, // ms epoch } /// Cursor state for incremental sync. #[derive(Debug, Default)] struct SyncCursor { updated_at_cursor: Option, tie_breaker_id: Option, } /// Ingest issues for a project. pub async fn ingest_issues( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, // Local DB project ID gitlab_project_id: i64, // GitLab project ID ) -> Result { let mut result = IngestIssuesResult::default(); // 1. Get current cursor let cursor = get_sync_cursor(conn, project_id)?; debug!(?cursor, "Starting issue ingestion with cursor"); // 2. Stream issues with cursor rewind let mut issues_stream = client.paginate_issues( gitlab_project_id, cursor.updated_at_cursor, config.sync.cursor_rewind_seconds, ); let mut batch_count = 0; let mut last_updated_at: Option = None; let mut last_gitlab_id: Option = None; // 3. Process each issue while let Some(issue_result) = issues_stream.next().await { let issue = issue_result?; result.fetched += 1; // Parse timestamp early - skip issues with invalid timestamps let issue_updated_at = match parse_timestamp(&issue.updated_at) { Ok(ts) => ts, Err(e) => { warn!( gitlab_id = issue.id, error = %e, "Skipping issue with invalid timestamp" ); continue; } }; // Apply local cursor filter (skip already-processed due to rewind overlap) if !passes_cursor_filter_with_ts(issue.id, issue_updated_at, &cursor) { debug!(gitlab_id = issue.id, "Skipping already-processed issue"); continue; } // Transform and store let labels_created = process_single_issue(conn, config, project_id, &issue)?; result.upserted += 1; result.labels_created += labels_created; // Track cursor position (use already-parsed timestamp) last_updated_at = Some(issue_updated_at); last_gitlab_id = Some(issue.id); batch_count += 1; // Incremental cursor update every 100 issues if batch_count % 100 == 0 && let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { update_sync_cursor(conn, project_id, ts, id)?; debug!(batch_count, "Incremental cursor update"); } } // 4. Final cursor update if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { update_sync_cursor(conn, project_id, ts, id)?; } else if result.fetched == 0 && cursor.updated_at_cursor.is_some() { // No new issues returned, but we have an existing cursor. // Update sync_attempted_at to track that we checked (useful for monitoring) // The cursor itself stays the same since there's nothing newer to advance to. debug!("No new issues found, cursor unchanged"); } // 5. Find issues needing discussion sync result.issues_needing_discussion_sync = get_issues_needing_discussion_sync(conn, project_id)?; info!( fetched = result.fetched, upserted = result.upserted, labels_created = result.labels_created, needing_sync = result.issues_needing_discussion_sync.len(), "Issue ingestion complete" ); Ok(result) } /// Check if an issue passes the cursor filter (not already processed). /// Takes pre-parsed timestamp to avoid redundant parsing. fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCursor) -> bool { let Some(cursor_ts) = cursor.updated_at_cursor else { return true; // No cursor = fetch all }; if issue_ts < cursor_ts { return false; } if issue_ts == cursor_ts && let Some(cursor_id) = cursor.tie_breaker_id && gitlab_id <= cursor_id { return false; } true } // Keep the original function for backward compatibility with tests /// Check if an issue passes the cursor filter (not already processed). #[cfg(test)] fn passes_cursor_filter(issue: &GitLabIssue, cursor: &SyncCursor) -> Result { let Some(cursor_ts) = cursor.updated_at_cursor else { return Ok(true); // No cursor = fetch all }; let issue_ts = parse_timestamp(&issue.updated_at)?; if issue_ts < cursor_ts { return Ok(false); } if issue_ts == cursor_ts { if let Some(cursor_id) = cursor.tie_breaker_id { if issue.id <= cursor_id { return Ok(false); } } } Ok(true) } /// Process a single issue: store payload, upsert issue, handle labels. /// All operations are wrapped in a transaction for atomicity. fn process_single_issue( conn: &Connection, config: &Config, project_id: i64, issue: &GitLabIssue, ) -> Result { let now = now_ms(); // Transform issue first (outside transaction - no DB access) let payload_json = serde_json::to_value(issue)?; let transformed = transform_issue(issue.clone())?; let issue_row = &transformed.issue; // Wrap all DB operations in a transaction for atomicity let tx = conn.unchecked_transaction()?; let labels_created = process_issue_in_transaction( &tx, config, project_id, issue, &payload_json, issue_row, &transformed.label_names, &transformed.assignee_usernames, transformed.milestone.as_ref(), now, )?; tx.commit()?; Ok(labels_created) } /// Inner function that performs all DB operations within a transaction. #[allow(clippy::too_many_arguments)] fn process_issue_in_transaction( tx: &Transaction<'_>, config: &Config, project_id: i64, issue: &GitLabIssue, payload_json: &serde_json::Value, issue_row: &crate::gitlab::transformers::IssueRow, label_names: &[String], assignee_usernames: &[String], milestone: Option<&MilestoneRow>, now: i64, ) -> Result { let mut labels_created = 0; // Store raw payload (deref Transaction to Connection for store_payload) let payload_id = store_payload( tx.deref(), StorePayloadOptions { project_id: Some(project_id), resource_type: "issue", gitlab_id: &issue.id.to_string(), payload: payload_json, compress: config.storage.compress_raw_payloads, }, )?; // Upsert milestone if present, get local ID let milestone_id: Option = if let Some(m) = milestone { Some(upsert_milestone_tx(tx, project_id, m)?) } else { None }; // Upsert issue (including new fields: due_date, milestone_id, milestone_title) tx.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at, web_url, due_date, milestone_id, milestone_title, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) ON CONFLICT(gitlab_id) DO UPDATE SET title = excluded.title, description = excluded.description, state = excluded.state, author_username = excluded.author_username, updated_at = excluded.updated_at, last_seen_at = excluded.last_seen_at, web_url = excluded.web_url, due_date = excluded.due_date, milestone_id = excluded.milestone_id, milestone_title = excluded.milestone_title, raw_payload_id = excluded.raw_payload_id", ( issue_row.gitlab_id, project_id, issue_row.iid, &issue_row.title, &issue_row.description, &issue_row.state, &issue_row.author_username, issue_row.created_at, issue_row.updated_at, now, &issue_row.web_url, &issue_row.due_date, milestone_id, &issue_row.milestone_title, payload_id, ), )?; // Get local issue ID let local_issue_id: i64 = tx.query_row( "SELECT id FROM issues WHERE project_id = ? AND iid = ?", (project_id, issue_row.iid), |row| row.get(0), )?; // Mark dirty for document regeneration (inside transaction) dirty_tracker::mark_dirty_tx(tx, SourceType::Issue, local_issue_id)?; // Clear existing label links (stale removal) tx.execute( "DELETE FROM issue_labels WHERE issue_id = ?", [local_issue_id], )?; // Upsert labels and create links for label_name in label_names { let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; link_issue_label_tx(tx, local_issue_id, label_id)?; } // Clear existing assignee links (stale removal) tx.execute( "DELETE FROM issue_assignees WHERE issue_id = ?", [local_issue_id], )?; // Insert assignees for username in assignee_usernames { tx.execute( "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES (?, ?)", (local_issue_id, username), )?; } Ok(labels_created) } /// Upsert a label within a transaction, returning its ID. fn upsert_label_tx( tx: &Transaction<'_>, project_id: i64, name: &str, created_count: &mut usize, ) -> Result { // Try to get existing let existing: Option = tx .query_row( "SELECT id FROM labels WHERE project_id = ? AND name = ?", (project_id, name), |row| row.get(0), ) .ok(); if let Some(id) = existing { return Ok(id); } // Insert new tx.execute( "INSERT INTO labels (project_id, name) VALUES (?, ?)", (project_id, name), )?; *created_count += 1; Ok(tx.last_insert_rowid()) } /// Link an issue to a label within a transaction. fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Result<()> { tx.execute( "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)", (issue_id, label_id), )?; Ok(()) } /// Upsert a milestone within a transaction, returning its local ID. fn upsert_milestone_tx( tx: &Transaction<'_>, project_id: i64, milestone: &MilestoneRow, ) -> Result { tx.execute( "INSERT INTO milestones (gitlab_id, project_id, iid, title, description, state, due_date, web_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) ON CONFLICT(project_id, gitlab_id) DO UPDATE SET iid = excluded.iid, title = excluded.title, description = excluded.description, state = excluded.state, due_date = excluded.due_date, web_url = excluded.web_url", ( milestone.gitlab_id, project_id, milestone.iid, &milestone.title, &milestone.description, &milestone.state, &milestone.due_date, &milestone.web_url, ), )?; // Get the local ID (whether inserted or updated) let local_id: i64 = tx.query_row( "SELECT id FROM milestones WHERE project_id = ? AND gitlab_id = ?", (project_id, milestone.gitlab_id), |row| row.get(0), )?; Ok(local_id) } /// Get the current sync cursor for issues. fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result { let row: Option<(Option, Option)> = conn .query_row( "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors WHERE project_id = ? AND resource_type = 'issues'", [project_id], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); Ok(match row { Some((updated_at, tie_breaker)) => SyncCursor { updated_at_cursor: updated_at, tie_breaker_id: tie_breaker, }, None => SyncCursor::default(), }) } /// Update the sync cursor. fn update_sync_cursor( conn: &Connection, project_id: i64, updated_at: i64, gitlab_id: i64, ) -> Result<()> { conn.execute( "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id) VALUES (?1, 'issues', ?2, ?3) ON CONFLICT(project_id, resource_type) DO UPDATE SET updated_at_cursor = excluded.updated_at_cursor, tie_breaker_id = excluded.tie_breaker_id", (project_id, updated_at, gitlab_id), )?; Ok(()) } /// Get issues that need discussion sync (updated_at > discussions_synced_for_updated_at). fn get_issues_needing_discussion_sync( conn: &Connection, project_id: i64, ) -> Result> { let mut stmt = conn.prepare( "SELECT id, iid, updated_at FROM issues WHERE project_id = ? AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)", )?; let issues: std::result::Result, _> = stmt .query_map([project_id], |row| { Ok(IssueForDiscussionSync { local_issue_id: row.get(0)?, iid: row.get(1)?, updated_at: row.get(2)?, }) })? .collect(); Ok(issues?) } /// Parse ISO 8601 timestamp to milliseconds. /// Returns an error if parsing fails instead of silently returning 0. fn parse_timestamp(ts: &str) -> Result { chrono::DateTime::parse_from_rfc3339(ts) .map(|dt| dt.timestamp_millis()) .map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e))) } #[cfg(test)] mod tests { use super::*; use crate::gitlab::types::GitLabAuthor; fn make_test_issue(id: i64, updated_at: &str) -> GitLabIssue { GitLabIssue { id, iid: id, project_id: 100, title: format!("Issue {}", id), description: None, state: "opened".to_string(), created_at: "2024-01-01T00:00:00.000Z".to_string(), updated_at: updated_at.to_string(), closed_at: None, author: GitLabAuthor { id: 1, username: "test".to_string(), name: "Test".to_string(), }, assignees: vec![], labels: vec![], milestone: None, due_date: None, web_url: "https://example.com".to_string(), } } #[test] fn cursor_filter_allows_newer_issues() { let cursor = SyncCursor { updated_at_cursor: Some(1705312800000), // 2024-01-15T10:00:00Z tie_breaker_id: Some(100), }; // Issue with later timestamp passes let issue = make_test_issue(101, "2024-01-16T10:00:00.000Z"); assert!(passes_cursor_filter(&issue, &cursor).unwrap_or(false)); } #[test] fn cursor_filter_blocks_older_issues() { let cursor = SyncCursor { updated_at_cursor: Some(1705312800000), tie_breaker_id: Some(100), }; // Issue with earlier timestamp blocked let issue = make_test_issue(99, "2024-01-14T10:00:00.000Z"); assert!(!passes_cursor_filter(&issue, &cursor).unwrap_or(true)); } #[test] fn cursor_filter_uses_tie_breaker_for_same_timestamp() { let cursor = SyncCursor { updated_at_cursor: Some(1705312800000), tie_breaker_id: Some(100), }; // Same timestamp, higher ID passes let issue1 = make_test_issue(101, "2024-01-15T10:00:00.000Z"); assert!(passes_cursor_filter(&issue1, &cursor).unwrap_or(false)); // Same timestamp, same ID blocked let issue2 = make_test_issue(100, "2024-01-15T10:00:00.000Z"); assert!(!passes_cursor_filter(&issue2, &cursor).unwrap_or(true)); // Same timestamp, lower ID blocked let issue3 = make_test_issue(99, "2024-01-15T10:00:00.000Z"); assert!(!passes_cursor_filter(&issue3, &cursor).unwrap_or(true)); } #[test] fn cursor_filter_allows_all_when_no_cursor() { let cursor = SyncCursor::default(); let issue = make_test_issue(1, "2020-01-01T00:00:00.000Z"); assert!(passes_cursor_filter(&issue, &cursor).unwrap_or(false)); } }