use std::ops::Deref; use futures::StreamExt; use rusqlite::{Connection, Transaction}; use tracing::{debug, warn}; use crate::Config; use crate::core::error::{LoreError, Result}; use crate::core::payloads::{StorePayloadOptions, store_payload}; use crate::core::shutdown::ShutdownSignal; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::gitlab::GitLabClient; use crate::gitlab::transformers::{MilestoneRow, transform_issue}; use crate::gitlab::types::GitLabIssue; use crate::ingestion::dirty_tracker; #[derive(Debug, Default)] pub struct IngestIssuesResult { pub fetched: usize, pub upserted: usize, pub labels_created: usize, pub issues_needing_discussion_sync: Vec, } #[derive(Debug, Clone)] pub struct IssueForDiscussionSync { pub local_issue_id: i64, pub iid: i64, pub updated_at: i64, } #[derive(Debug, Default)] struct SyncCursor { updated_at_cursor: Option, tie_breaker_id: Option, } pub async fn ingest_issues( conn: &Connection, client: &GitLabClient, config: &Config, project_id: i64, gitlab_project_id: i64, signal: &ShutdownSignal, ) -> Result { let mut result = IngestIssuesResult::default(); let cursor = get_sync_cursor(conn, project_id)?; debug!(?cursor, "Starting issue ingestion with cursor"); let mut issues_stream = client.paginate_issues( gitlab_project_id, cursor.updated_at_cursor, config.sync.cursor_rewind_seconds, ); let mut batch_count = 0; let mut last_updated_at: Option = None; let mut last_gitlab_id: Option = None; while let Some(issue_result) = issues_stream.next().await { if signal.is_cancelled() { debug!("Issue ingestion interrupted by shutdown signal"); break; } let issue = issue_result?; result.fetched += 1; let issue_updated_at = match parse_timestamp(&issue.updated_at) { Ok(ts) => ts, Err(e) => { warn!( gitlab_id = issue.id, error = %e, "Skipping issue with invalid timestamp" ); continue; } }; if !passes_cursor_filter_with_ts(issue.id, issue_updated_at, &cursor) { debug!(gitlab_id = issue.id, "Skipping already-processed issue"); continue; } let labels_created = process_single_issue(conn, config, project_id, &issue)?; result.upserted += 1; result.labels_created += labels_created; last_updated_at = Some(issue_updated_at); last_gitlab_id = Some(issue.id); batch_count += 1; if batch_count % 100 == 0 && let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { update_sync_cursor(conn, project_id, ts, id)?; debug!(batch_count, "Incremental cursor update"); } } if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) { update_sync_cursor(conn, project_id, ts, id)?; } else if result.fetched == 0 && cursor.updated_at_cursor.is_some() { debug!("No new issues found, cursor unchanged"); } result.issues_needing_discussion_sync = get_issues_needing_discussion_sync(conn, project_id)?; debug!( summary = crate::ingestion::nonzero_summary(&[ ("fetched", result.fetched), ("upserted", result.upserted), ("labels", result.labels_created), ("needing sync", result.issues_needing_discussion_sync.len()), ]), "Issue ingestion" ); Ok(result) } fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCursor) -> bool { let Some(cursor_ts) = cursor.updated_at_cursor else { return true; }; if issue_ts < cursor_ts { return false; } if issue_ts == cursor_ts && let Some(cursor_id) = cursor.tie_breaker_id && gitlab_id <= cursor_id { return false; } true } pub(crate) fn process_single_issue( conn: &Connection, config: &Config, project_id: i64, issue: &GitLabIssue, ) -> Result { let now = now_ms(); let payload_bytes = serde_json::to_vec(issue)?; let transformed = transform_issue(issue)?; let issue_row = &transformed.issue; let tx = conn.unchecked_transaction()?; let labels_created = process_issue_in_transaction( &tx, config, project_id, issue, &payload_bytes, issue_row, &transformed.label_names, &transformed.assignee_usernames, transformed.milestone.as_ref(), now, )?; tx.commit()?; Ok(labels_created) } #[allow(clippy::too_many_arguments)] fn process_issue_in_transaction( tx: &Transaction<'_>, config: &Config, project_id: i64, issue: &GitLabIssue, payload_bytes: &[u8], issue_row: &crate::gitlab::transformers::IssueRow, label_names: &[String], assignee_usernames: &[String], milestone: Option<&MilestoneRow>, now: i64, ) -> Result { let mut labels_created = 0; let payload_id = store_payload( tx.deref(), StorePayloadOptions { project_id: Some(project_id), resource_type: "issue", gitlab_id: &issue.id.to_string(), json_bytes: payload_bytes, compress: config.storage.compress_raw_payloads, }, )?; let milestone_id: Option = if let Some(m) = milestone { Some(upsert_milestone_tx(tx, project_id, m)?) } else { None }; tx.execute( "INSERT INTO issues ( gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at, web_url, due_date, milestone_id, milestone_title, raw_payload_id ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) ON CONFLICT(gitlab_id) DO UPDATE SET title = excluded.title, description = excluded.description, state = excluded.state, author_username = excluded.author_username, updated_at = excluded.updated_at, last_seen_at = excluded.last_seen_at, web_url = excluded.web_url, due_date = excluded.due_date, milestone_id = excluded.milestone_id, milestone_title = excluded.milestone_title, raw_payload_id = excluded.raw_payload_id", ( issue_row.gitlab_id, project_id, issue_row.iid, &issue_row.title, &issue_row.description, &issue_row.state, &issue_row.author_username, issue_row.created_at, issue_row.updated_at, now, &issue_row.web_url, &issue_row.due_date, milestone_id, &issue_row.milestone_title, payload_id, ), )?; let local_issue_id: i64 = tx.query_row( "SELECT id FROM issues WHERE project_id = ? AND iid = ?", (project_id, issue_row.iid), |row| row.get(0), )?; dirty_tracker::mark_dirty_tx(tx, SourceType::Issue, local_issue_id)?; tx.execute( "DELETE FROM issue_labels WHERE issue_id = ?", [local_issue_id], )?; for label_name in label_names { let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; link_issue_label_tx(tx, local_issue_id, label_id)?; } tx.execute( "DELETE FROM issue_assignees WHERE issue_id = ?", [local_issue_id], )?; for username in assignee_usernames { tx.execute( "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES (?, ?)", (local_issue_id, username), )?; } Ok(labels_created) } fn upsert_label_tx( tx: &Transaction<'_>, project_id: i64, name: &str, created_count: &mut usize, ) -> Result { tx.execute( "INSERT OR IGNORE INTO labels (project_id, name) VALUES (?1, ?2)", (project_id, name), )?; if tx.changes() > 0 { *created_count += 1; } let id: i64 = tx.query_row( "SELECT id FROM labels WHERE project_id = ?1 AND name = ?2", (project_id, name), |row| row.get(0), )?; Ok(id) } fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Result<()> { tx.execute( "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)", (issue_id, label_id), )?; Ok(()) } fn upsert_milestone_tx( tx: &Transaction<'_>, project_id: i64, milestone: &MilestoneRow, ) -> Result { let local_id: i64 = tx.query_row( "INSERT INTO milestones (gitlab_id, project_id, iid, title, description, state, due_date, web_url) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) ON CONFLICT(project_id, gitlab_id) DO UPDATE SET iid = excluded.iid, title = excluded.title, description = excluded.description, state = excluded.state, due_date = excluded.due_date, web_url = excluded.web_url RETURNING id", ( milestone.gitlab_id, project_id, milestone.iid, &milestone.title, &milestone.description, &milestone.state, &milestone.due_date, &milestone.web_url, ), |row| row.get(0), )?; Ok(local_id) } fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result { let row: Option<(Option, Option)> = conn .query_row( "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors WHERE project_id = ? AND resource_type = 'issues'", [project_id], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); Ok(match row { Some((updated_at, tie_breaker)) => SyncCursor { updated_at_cursor: updated_at, tie_breaker_id: tie_breaker, }, None => SyncCursor::default(), }) } fn update_sync_cursor( conn: &Connection, project_id: i64, updated_at: i64, gitlab_id: i64, ) -> Result<()> { conn.execute( "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id) VALUES (?1, 'issues', ?2, ?3) ON CONFLICT(project_id, resource_type) DO UPDATE SET updated_at_cursor = excluded.updated_at_cursor, tie_breaker_id = excluded.tie_breaker_id", (project_id, updated_at, gitlab_id), )?; Ok(()) } fn get_issues_needing_discussion_sync( conn: &Connection, project_id: i64, ) -> Result> { let mut stmt = conn.prepare( "SELECT id, iid, updated_at FROM issues WHERE project_id = ? AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)", )?; let issues: std::result::Result, _> = stmt .query_map([project_id], |row| { Ok(IssueForDiscussionSync { local_issue_id: row.get(0)?, iid: row.get(1)?, updated_at: row.get(2)?, }) })? .collect(); Ok(issues?) } fn parse_timestamp(ts: &str) -> Result { chrono::DateTime::parse_from_rfc3339(ts) .map(|dt| dt.timestamp_millis()) .map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e))) } #[cfg(test)] #[path = "issues_tests.rs"] mod tests;