diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index 1c9cd0f..02a0296 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -39,6 +39,9 @@ pub struct IngestResult { pub labels_created: usize, pub discussions_fetched: usize, pub notes_upserted: usize, + // Resource events + pub resource_events_fetched: usize, + pub resource_events_failed: usize, } /// Controls what interactive UI elements `run_ingest` displays. @@ -57,17 +60,26 @@ pub struct IngestDisplay { impl IngestDisplay { /// Interactive mode: everything visible. pub fn interactive() -> Self { - Self { show_progress: true, show_text: true } + Self { + show_progress: true, + show_text: true, + } } /// Robot/JSON mode: everything hidden. pub fn silent() -> Self { - Self { show_progress: false, show_text: false } + Self { + show_progress: false, + show_text: false, + } } /// Progress only (used by sync in interactive mode). pub fn progress_only() -> Self { - Self { show_progress: true, show_text: false } + Self { + show_progress: true, + show_text: false, + } } } @@ -105,9 +117,10 @@ pub async fn run_ingest( lock.acquire(force)?; // Get token from environment - let token = std::env::var(&config.gitlab.token_env_var).map_err(|_| LoreError::TokenNotSet { - env_var: config.gitlab.token_env_var.clone(), - })?; + let token = + std::env::var(&config.gitlab.token_env_var).map_err(|_| LoreError::TokenNotSet { + env_var: config.gitlab.token_env_var.clone(), + })?; // Create GitLab client let client = GitLabClient::new(&config.gitlab.base_url, &token, None); @@ -199,7 +212,9 @@ pub async fn run_ingest( let b = ProgressBar::new(0); b.set_style( ProgressStyle::default_bar() - .template(" {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}") + .template( + " {spinner:.blue} Syncing discussions [{bar:30.cyan/dim}] {pos}/{len}", + ) .unwrap() .progress_chars("=> "), ); @@ -237,6 +252,23 @@ pub async fn run_ingest( ProgressEvent::MrDiscussionSyncComplete => { 
disc_bar_clone.finish_and_clear(); } + ProgressEvent::ResourceEventsFetchStarted { total } => { + disc_bar_clone.set_length(total as u64); + disc_bar_clone.set_position(0); + disc_bar_clone.set_style( + ProgressStyle::default_bar() + .template(" {spinner:.blue} Fetching resource events [{bar:30.cyan/dim}] {pos}/{len}") + .unwrap() + .progress_chars("=> "), + ); + disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(100)); + } + ProgressEvent::ResourceEventFetched { current, total: _ } => { + disc_bar_clone.set_position(current as u64); + } + ProgressEvent::ResourceEventsFetchComplete { .. } => { + disc_bar_clone.finish_and_clear(); + } _ => {} }) }; @@ -269,6 +301,8 @@ pub async fn run_ingest( total.notes_upserted += result.notes_upserted; total.issues_synced_discussions += result.issues_synced_discussions; total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync; + total.resource_events_fetched += result.resource_events_fetched; + total.resource_events_failed += result.resource_events_failed; } else { let result = ingest_project_merge_requests_with_progress( &conn, @@ -301,6 +335,8 @@ pub async fn run_ingest( total.diffnotes_count += result.diffnotes_count; total.mrs_synced_discussions += result.mrs_synced_discussions; total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync; + total.resource_events_fetched += result.resource_events_fetched; + total.resource_events_failed += result.resource_events_failed; } } diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index 1bd0a85..d42bcf3 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -29,6 +29,8 @@ pub struct SyncResult { pub issues_updated: usize, pub mrs_updated: usize, pub discussions_fetched: usize, + pub resource_events_fetched: usize, + pub resource_events_failed: usize, pub documents_regenerated: usize, pub documents_embedded: usize, } @@ -70,26 +72,61 @@ pub async fn run_sync(config: &Config, options: SyncOptions) -> Result 
Result { @@ -112,11 +154,7 @@ pub async fn run_sync(config: &Config, options: SyncOptions) -> Result Result Result 0 || result.resource_events_failed > 0 { + println!( + " Resource events fetched: {}", + result.resource_events_fetched + ); + if result.resource_events_failed > 0 { + println!( + " Resource events failed: {}", + result.resource_events_failed + ); + } + } + println!( + " Documents regenerated: {}", + result.documents_regenerated + ); + println!(" Documents embedded: {}", result.documents_embedded); + println!(" Elapsed: {:.1}s", elapsed.as_secs_f64()); } /// JSON output for sync. diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 2f5deeb..74186c7 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -7,9 +7,12 @@ use futures::future::join_all; use rusqlite::Connection; -use tracing::info; +use tracing::{debug, info, warn}; use crate::Config; +use crate::core::dependent_queue::{ + claim_jobs, complete_job, count_pending_jobs, enqueue_job, fail_job, reclaim_stale_locks, +}; use crate::core::error::Result; use crate::gitlab::GitLabClient; @@ -50,6 +53,12 @@ pub enum ProgressEvent { MrDiscussionSynced { current: usize, total: usize }, /// MR discussion sync complete MrDiscussionSyncComplete, + /// Resource event fetching started (total jobs) + ResourceEventsFetchStarted { total: usize }, + /// Resource event fetched for an entity (current/total) + ResourceEventFetched { current: usize, total: usize }, + /// Resource event fetching complete + ResourceEventsFetchComplete { fetched: usize, failed: usize }, } /// Result of full project ingestion (issues). @@ -63,6 +72,8 @@ pub struct IngestProjectResult { pub notes_upserted: usize, pub issues_synced_discussions: usize, pub issues_skipped_discussion_sync: usize, + pub resource_events_fetched: usize, + pub resource_events_failed: usize, } /// Result of MR ingestion for a project. 
@@ -80,6 +91,8 @@ pub struct IngestMrProjectResult { pub diffnotes_count: usize, pub mrs_synced_discussions: usize, pub mrs_skipped_discussion_sync: usize, + pub resource_events_fetched: usize, + pub resource_events_failed: usize, } /// Ingest all issues and their discussions for a project. @@ -167,6 +180,21 @@ pub async fn ingest_project_issues_with_progress( result.issues_synced_discussions += 1; } + // Step 4: Enqueue and drain resource events (if enabled) + if config.sync.fetch_resource_events { + // Enqueue resource_events jobs for all issues in this project + let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?; + if enqueued > 0 { + debug!(enqueued, "Enqueued resource events jobs for issues"); + } + + // Drain the queue + let drain_result = + drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?; + result.resource_events_fetched = drain_result.fetched; + result.resource_events_failed = drain_result.failed; + } + info!( issues_fetched = result.issues_fetched, issues_upserted = result.issues_upserted, @@ -175,6 +203,8 @@ pub async fn ingest_project_issues_with_progress( notes_upserted = result.notes_upserted, issues_synced = result.issues_synced_discussions, issues_skipped = result.issues_skipped_discussion_sync, + resource_events_fetched = result.resource_events_fetched, + resource_events_failed = result.resource_events_failed, "Project ingestion complete" ); @@ -343,6 +373,19 @@ pub async fn ingest_project_merge_requests_with_progress( } } + // Step 4: Enqueue and drain resource events (if enabled) + if config.sync.fetch_resource_events { + let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?; + if enqueued > 0 { + debug!(enqueued, "Enqueued resource events jobs for MRs"); + } + + let drain_result = + drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?; + result.resource_events_fetched = drain_result.fetched; + 
result.resource_events_failed = drain_result.failed; + } + info!( mrs_fetched = result.mrs_fetched, mrs_upserted = result.mrs_upserted, @@ -352,6 +395,8 @@ pub async fn ingest_project_merge_requests_with_progress( diffnotes = result.diffnotes_count, mrs_synced = result.mrs_synced_discussions, mrs_skipped = result.mrs_skipped_discussion_sync, + resource_events_fetched = result.resource_events_fetched, + resource_events_failed = result.resource_events_failed, "MR project ingestion complete" ); @@ -405,6 +450,368 @@ async fn sync_mr_discussions_sequential( Ok(results) } +/// Result of draining the resource events queue. +#[derive(Debug, Default)] +pub struct DrainResult { + pub fetched: usize, + pub failed: usize, +} + +/// Enqueue resource_events jobs for all entities of a given type in a project. +/// +/// Uses the pending_dependent_fetches queue. Jobs are deduplicated by the UNIQUE +/// constraint, so re-enqueueing the same entity is a no-op. +fn enqueue_resource_events_for_entity_type( + conn: &Connection, + project_id: i64, + entity_type: &str, +) -> Result { + let (table, id_col) = match entity_type { + "issue" => ("issues", "id"), + "merge_request" => ("merge_requests", "id"), + _ => return Ok(0), + }; + + // Query all entities for this project and enqueue resource_events jobs. + // The UNIQUE constraint on pending_dependent_fetches makes this idempotent - + // already-queued entities are silently skipped via INSERT OR IGNORE. + let mut stmt = conn.prepare_cached(&format!( + "SELECT {id_col}, iid FROM {table} WHERE project_id = ?1" + ))?; + + let entities: Vec<(i64, i64)> = stmt + .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::, _>>()?; + + let mut enqueued = 0; + for (local_id, iid) in &entities { + if enqueue_job( + conn, + project_id, + entity_type, + *iid, + *local_id, + "resource_events", + None, + )? 
{ + enqueued += 1; + } + } + + Ok(enqueued) +} + +/// Drain pending resource_events jobs: claim, fetch from GitLab, store, complete/fail. +/// +/// Processes jobs sequentially since `rusqlite::Connection` is not `Send`. +/// Uses exponential backoff on failure via `fail_job`. +async fn drain_resource_events( + conn: &Connection, + client: &GitLabClient, + config: &Config, + gitlab_project_id: i64, + progress: &Option, +) -> Result { + let mut result = DrainResult::default(); + let batch_size = config.sync.dependent_concurrency as usize; + + // Reclaim stale locks from crashed processes + let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?; + if reclaimed > 0 { + info!(reclaimed, "Reclaimed stale resource event locks"); + } + + // Count total pending jobs for progress reporting + let pending_counts = count_pending_jobs(conn)?; + let total_pending = pending_counts.get("resource_events").copied().unwrap_or(0); + + if total_pending == 0 { + return Ok(result); + } + + let emit = |event: ProgressEvent| { + if let Some(cb) = progress { + cb(event); + } + }; + + emit(ProgressEvent::ResourceEventsFetchStarted { + total: total_pending, + }); + + let mut processed = 0; + + // Max iterations guard: prevent infinite loop if jobs keep failing and retrying + // within the same drain run. Allow 2x total_pending iterations as safety margin. + let max_iterations = total_pending * 2; + let mut iterations = 0; + + loop { + if iterations >= max_iterations { + warn!( + iterations, + total_pending, "Resource events drain hit max iterations guard, stopping" + ); + break; + } + + let jobs = claim_jobs(conn, "resource_events", batch_size)?; + if jobs.is_empty() { + break; + } + + for job in &jobs { + iterations += 1; + + // conn is &Connection, but the crate's existing upsert helpers take + // &mut Connection (for savepoints), so they cannot be called here. + // Fabricating a &mut from a shared reference via unsafe would be UB.
+ // Instead, we'll use a savepoint approach via the Connection directly. + match client + .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) + .await + { + Ok((state_events, label_events, milestone_events)) => { + // Store events - we need &mut Connection for savepoints in upsert functions. + // Use unchecked_transaction as a workaround since we have &Connection. + let store_result = store_resource_events( + conn, + job.project_id, + &job.entity_type, + job.entity_local_id, + &state_events, + &label_events, + &milestone_events, + ); + + match store_result { + Ok(()) => { + complete_job(conn, job.id)?; + result.fetched += 1; + } + Err(e) => { + warn!( + entity_type = %job.entity_type, + entity_iid = job.entity_iid, + error = %e, + "Failed to store resource events" + ); + fail_job(conn, job.id, &e.to_string())?; + result.failed += 1; + } + } + } + Err(e) => { + warn!( + entity_type = %job.entity_type, + entity_iid = job.entity_iid, + error = %e, + "Failed to fetch resource events from GitLab" + ); + fail_job(conn, job.id, &e.to_string())?; + result.failed += 1; + } + } + + processed += 1; + emit(ProgressEvent::ResourceEventFetched { + current: processed, + total: total_pending, + }); + } + } + + emit(ProgressEvent::ResourceEventsFetchComplete { + fetched: result.fetched, + failed: result.failed, + }); + + if result.fetched > 0 || result.failed > 0 { + info!( + fetched = result.fetched, + failed = result.failed, + "Resource events drain complete" + ); + } + + Ok(result) +} + +/// Store fetched resource events in the database. +/// +/// Uses unchecked_transaction to work with &Connection (not &mut Connection), +/// which is safe because we're single-threaded and using WAL mode. 
+fn store_resource_events( + conn: &Connection, + project_id: i64, + entity_type: &str, + entity_local_id: i64, + state_events: &[crate::gitlab::types::GitLabStateEvent], + label_events: &[crate::gitlab::types::GitLabLabelEvent], + milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], +) -> Result<()> { + // The upsert functions require &mut Connection for savepoints. + // We use unchecked_transaction to wrap all three upserts atomically, + // then call the upsert functions using the transaction's inner connection. + let tx = conn.unchecked_transaction()?; + + // State events - use raw SQL within transaction instead of upsert_state_events + // which requires &mut Connection + if !state_events.is_empty() { + store_state_events_tx(&tx, project_id, entity_type, entity_local_id, state_events)?; + } + + if !label_events.is_empty() { + store_label_events_tx(&tx, project_id, entity_type, entity_local_id, label_events)?; + } + + if !milestone_events.is_empty() { + store_milestone_events_tx( + &tx, + project_id, + entity_type, + entity_local_id, + milestone_events, + )?; + } + + tx.commit()?; + Ok(()) +} + +/// Store state events within an existing transaction. 
+fn store_state_events_tx( + tx: &rusqlite::Transaction<'_>, + project_id: i64, + entity_type: &str, + entity_local_id: i64, + events: &[crate::gitlab::types::GitLabStateEvent], +) -> Result<()> { + let (issue_id, merge_request_id): (Option, Option) = match entity_type { + "issue" => (Some(entity_local_id), None), + "merge_request" => (None, Some(entity_local_id)), + _ => return Ok(()), + }; + + let mut stmt = tx.prepare_cached( + "INSERT OR REPLACE INTO resource_state_events + (gitlab_id, project_id, issue_id, merge_request_id, state, + actor_gitlab_id, actor_username, created_at, + source_commit, source_merge_request_iid) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + )?; + + for event in events { + let created_at = crate::core::time::iso_to_ms_strict(&event.created_at) + .map_err(crate::core::error::LoreError::Other)?; + let actor_id = event.user.as_ref().map(|u| u.id); + let actor_username = event.user.as_ref().map(|u| u.username.as_str()); + let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid); + + stmt.execute(rusqlite::params![ + event.id, + project_id, + issue_id, + merge_request_id, + event.state, + actor_id, + actor_username, + created_at, + event.source_commit, + source_mr_iid, + ])?; + } + + Ok(()) +} + +/// Store label events within an existing transaction. 
+fn store_label_events_tx( + tx: &rusqlite::Transaction<'_>, + project_id: i64, + entity_type: &str, + entity_local_id: i64, + events: &[crate::gitlab::types::GitLabLabelEvent], +) -> Result<()> { + let (issue_id, merge_request_id): (Option, Option) = match entity_type { + "issue" => (Some(entity_local_id), None), + "merge_request" => (None, Some(entity_local_id)), + _ => return Ok(()), + }; + + let mut stmt = tx.prepare_cached( + "INSERT OR REPLACE INTO resource_label_events + (gitlab_id, project_id, issue_id, merge_request_id, action, + label_name, actor_gitlab_id, actor_username, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + )?; + + for event in events { + let created_at = crate::core::time::iso_to_ms_strict(&event.created_at) + .map_err(crate::core::error::LoreError::Other)?; + let actor_id = event.user.as_ref().map(|u| u.id); + let actor_username = event.user.as_ref().map(|u| u.username.as_str()); + + stmt.execute(rusqlite::params![ + event.id, + project_id, + issue_id, + merge_request_id, + event.action, + event.label.name, + actor_id, + actor_username, + created_at, + ])?; + } + + Ok(()) +} + +/// Store milestone events within an existing transaction. 
+fn store_milestone_events_tx( + tx: &rusqlite::Transaction<'_>, + project_id: i64, + entity_type: &str, + entity_local_id: i64, + events: &[crate::gitlab::types::GitLabMilestoneEvent], +) -> Result<()> { + let (issue_id, merge_request_id): (Option, Option) = match entity_type { + "issue" => (Some(entity_local_id), None), + "merge_request" => (None, Some(entity_local_id)), + _ => return Ok(()), + }; + + let mut stmt = tx.prepare_cached( + "INSERT OR REPLACE INTO resource_milestone_events + (gitlab_id, project_id, issue_id, merge_request_id, action, + milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + )?; + + for event in events { + let created_at = crate::core::time::iso_to_ms_strict(&event.created_at) + .map_err(crate::core::error::LoreError::Other)?; + let actor_id = event.user.as_ref().map(|u| u.id); + let actor_username = event.user.as_ref().map(|u| u.username.as_str()); + + stmt.execute(rusqlite::params![ + event.id, + project_id, + issue_id, + merge_request_id, + event.action, + event.milestone.title, + event.milestone.id, + actor_id, + actor_username, + created_at, + ])?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -419,6 +826,8 @@ mod tests { assert_eq!(result.notes_upserted, 0); assert_eq!(result.issues_synced_discussions, 0); assert_eq!(result.issues_skipped_discussion_sync, 0); + assert_eq!(result.resource_events_fetched, 0); + assert_eq!(result.resource_events_failed, 0); } #[test] @@ -436,5 +845,28 @@ mod tests { assert_eq!(result.diffnotes_count, 0); assert_eq!(result.mrs_synced_discussions, 0); assert_eq!(result.mrs_skipped_discussion_sync, 0); + assert_eq!(result.resource_events_fetched, 0); + assert_eq!(result.resource_events_failed, 0); + } + + #[test] + fn drain_result_default_has_zero_counts() { + let result = DrainResult::default(); + assert_eq!(result.fetched, 0); + assert_eq!(result.failed, 0); + } + + #[test] + fn 
progress_event_resource_variants_exist() { + // Verify the new progress event variants are constructible + let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 }; + let _progress = ProgressEvent::ResourceEventFetched { + current: 5, + total: 10, + }; + let _complete = ProgressEvent::ResourceEventsFetchComplete { + fetched: 8, + failed: 2, + }; } }