diff --git a/Cargo.lock b/Cargo.lock index a8a1b3a..df77a34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1756,6 +1756,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "simd-adler32" version = "0.3.8" @@ -1968,6 +1977,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index 0efa027..fb216cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ open = "5" # HTTP reqwest = { version = "0.12", features = ["json"] } -tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "signal"] } # Async streaming for pagination async-stream = "0.3" diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index da70f0d..9897bdb 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -14,6 +14,7 @@ use crate::core::error::{LoreError, Result}; use crate::core::lock::{AppLock, LockOptions}; use crate::core::paths::get_db_path; use crate::core::project::resolve_project; +use crate::core::shutdown::ShutdownSignal; use crate::gitlab::GitLabClient; use crate::ingestion::{ IngestMrProjectResult, IngestProjectResult, ProgressEvent, ingest_project_issues_with_progress, @@ -113,6 +114,7 @@ pub async fn run_ingest( dry_run: bool, display: IngestDisplay, stage_bar: Option, + signal: &ShutdownSignal, ) -> Result { let run_id = uuid::Uuid::new_v4().simple().to_string(); let run_id = &run_id[..8]; @@ -127,6 +129,7 @@ pub async fn run_ingest( dry_run, display, stage_bar, + signal, ) .instrument(span) .await @@ -228,6 +231,7 @@ async fn run_ingest_inner( dry_run: bool, display: IngestDisplay, stage_bar: Option, + signal: &ShutdownSignal, ) -> Result { // In dry_run mode, we don't actually ingest - use run_ingest_dry_run instead // This flag is passed through for consistency but the actual dry-run logic @@ -350,6 +354,7 @@ async fn run_ingest_inner( let agg_disc_total = Arc::clone(&agg_disc_total); let agg_events = Arc::clone(&agg_events); let agg_events_total = Arc::clone(&agg_events_total); + let signal = signal.clone(); async move { let proj_conn = create_connection(&db_path)?; @@ -506,6 +511,7 @@ async fn run_ingest_inner( local_project_id, gitlab_project_id, Some(progress_callback), + &signal, ) .await?; @@ -522,6 +528,7 @@ async fn run_ingest_inner( gitlab_project_id, full, Some(progress_callback), + &signal, ) .await?; diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index 9a852e6..402aaa4 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -9,6 +9,7 @@ use tracing::{info, warn}; use crate::Config; use crate::core::error::Result; use crate::core::metrics::{MetricsLayer, StageTiming}; +use crate::core::shutdown::ShutdownSignal; use super::embed::run_embed; use super::generate_docs::run_generate_docs; @@ -58,6 +59,7 @@ pub async fn run_sync( config: &Config, options: SyncOptions, run_id: Option<&str>, + signal: &ShutdownSignal, ) -> Result { let generated_id; let run_id = match run_id { @@ -112,6 +114,7 @@ pub async fn run_sync( false, // dry_run - sync has its own dry_run handling ingest_display, Some(spinner.clone()), + signal, ) .await?; result.issues_updated = issues_result.issues_upserted; @@ -120,6 +123,11 @@ pub async fn run_sync( result.resource_events_failed += issues_result.resource_events_failed; spinner.finish_and_clear(); + if signal.is_cancelled() { + info!("Shutdown requested after issues stage, returning partial sync results"); + return Ok(result); + } + current_stage += 1; let spinner = stage_spinner( current_stage, @@ -137,6 +145,7 @@ pub async fn run_sync( false, // dry_run - sync has its own dry_run handling ingest_display, Some(spinner.clone()), + signal, ) .await?; result.mrs_updated = mrs_result.mrs_upserted; @@ -145,6 +154,11 @@ pub async fn run_sync( result.resource_events_failed += mrs_result.resource_events_failed; spinner.finish_and_clear(); + if signal.is_cancelled() { + info!("Shutdown requested after MRs stage, returning partial sync results"); + return Ok(result); + } + if !options.no_docs { current_stage += 1; let spinner = stage_spinner( diff --git a/src/core/dependent_queue.rs b/src/core/dependent_queue.rs index 0652473..be0ed82 100644 --- a/src/core/dependent_queue.rs +++ b/src/core/dependent_queue.rs @@ -103,6 +103,28 @@ pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> { Ok(()) } +/// Same DELETE as `complete_job`, but on an existing transaction so the caller +/// can bundle it atomically with a watermark update. +pub fn complete_job_tx(tx: &rusqlite::Transaction<'_>, job_id: i64) -> Result<()> { + tx.execute( + "DELETE FROM pending_dependent_fetches WHERE id = ?1", + rusqlite::params![job_id], + )?; + + Ok(()) +} + +/// Release all currently locked jobs (set `locked_at = NULL`). +/// Used during graceful shutdown so the next sync doesn't wait for stale locks. +pub fn release_all_locked_jobs(conn: &Connection) -> Result { + let changes = conn.execute( + "UPDATE pending_dependent_fetches SET locked_at = NULL WHERE locked_at IS NOT NULL", + [], + )?; + + Ok(changes) +} + pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> { let now = now_ms(); @@ -200,3 +222,109 @@ pub fn count_claimable_jobs(conn: &Connection, project_id: i64) -> Result (Connection, i64) { + let conn = create_connection(Path::new(":memory:")).unwrap(); + run_migrations(&conn).unwrap(); + + conn.execute( + "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) \ + VALUES (1, 'group/repo', 'https://gitlab.com/group/repo')", + [], + ) + .unwrap(); + + let project_id: i64 = conn + .query_row("SELECT id FROM projects LIMIT 1", [], |row| row.get(0)) + .unwrap(); + + enqueue_job(&conn, project_id, "issue", 42, 100, "resource_events", None).unwrap(); + + let job_id: i64 = conn + .query_row( + "SELECT id FROM pending_dependent_fetches LIMIT 1", + [], + |row| row.get(0), + ) + .unwrap(); + + (conn, job_id) + } + + #[test] + fn complete_job_tx_commits() { + let (conn, job_id) = setup_db_with_job(); + + let tx = conn.unchecked_transaction().unwrap(); + complete_job_tx(&tx, job_id).unwrap(); + tx.commit().unwrap(); + + let count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM pending_dependent_fetches WHERE id = ?1", + [job_id], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(count, 0, "job should be deleted after commit"); + } + + #[test] + fn complete_job_tx_rollback() { + let (conn, job_id) = setup_db_with_job(); + + { + let tx = conn.unchecked_transaction().unwrap(); + complete_job_tx(&tx, job_id).unwrap(); + // drop tx without commit = rollback + } + + let count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM pending_dependent_fetches WHERE id = ?1", + [job_id], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(count, 1, "job should survive dropped (rolled-back) tx"); + } + + #[test] + fn release_all_locked_jobs_clears_locks() { + let (conn, _job_id) = setup_db_with_job(); + + let project_id: i64 = conn + .query_row("SELECT id FROM projects LIMIT 1", [], |row| row.get(0)) + .unwrap(); + let jobs = claim_jobs(&conn, "resource_events", project_id, 10).unwrap(); + assert_eq!(jobs.len(), 1); + + let locked: bool = conn + .query_row( + "SELECT locked_at IS NOT NULL FROM pending_dependent_fetches WHERE id = ?1", + [jobs[0].id], + |row| row.get(0), + ) + .unwrap(); + assert!(locked, "job should be locked after claim"); + + let released = release_all_locked_jobs(&conn).unwrap(); + assert_eq!(released, 1); + + let locked: bool = conn + .query_row( + "SELECT locked_at IS NOT NULL FROM pending_dependent_fetches WHERE id = ?1", + [jobs[0].id], + |row| row.get(0), + ) + .unwrap(); + assert!(!locked, "job should be unlocked after release_all"); + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs index 72cf9a2..0bf1534 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -12,6 +12,7 @@ pub mod paths; pub mod payloads; pub mod project; pub mod references; +pub mod shutdown; pub mod sync_run; pub mod time; pub mod timeline; diff --git a/src/core/shutdown.rs b/src/core/shutdown.rs new file mode 100644 index 0000000..2adc77b --- /dev/null +++ b/src/core/shutdown.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// A cooperative cancellation token for graceful shutdown. +/// +/// Clone-able and cheaply checkable from any thread or async task. +/// When `cancel()` is called (typically from a Ctrl+C signal handler), +/// all clones observe the cancellation via `is_cancelled()`. +#[derive(Clone)] +pub struct ShutdownSignal { + cancelled: Arc, +} + +impl ShutdownSignal { + pub fn new() -> Self { + Self { + cancelled: Arc::new(AtomicBool::new(false)), + } + } + + pub fn cancel(&self) { + self.cancelled.store(true, Ordering::Relaxed); + } + + pub fn is_cancelled(&self) -> bool { + self.cancelled.load(Ordering::Relaxed) + } +} + +impl Default for ShutdownSignal { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn signal_starts_uncancelled() { + let signal = ShutdownSignal::new(); + assert!(!signal.is_cancelled()); + } + + #[test] + fn cancel_sets_flag() { + let signal = ShutdownSignal::new(); + signal.cancel(); + assert!(signal.is_cancelled()); + } + + #[test] + fn clone_propagates_cancellation() { + let signal = ShutdownSignal::new(); + let clone = signal.clone(); + signal.cancel(); + assert!( + clone.is_cancelled(), + "clone should see cancellation from original" + ); + } +} diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index d1ac576..33a0b9e 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -4,12 +4,13 @@ use tracing::{debug, info, instrument, warn}; use crate::Config; use crate::core::dependent_queue::{ - claim_jobs, complete_job, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, + claim_jobs, complete_job_tx, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks, }; use crate::core::error::Result; use crate::core::references::{ EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path, }; +use crate::core::shutdown::ShutdownSignal; use crate::gitlab::GitLabClient; use super::discussions::ingest_issue_discussions; @@ -84,12 +85,21 @@ pub async fn ingest_project_issues( project_id: i64, gitlab_project_id: i64, ) -> Result { - ingest_project_issues_with_progress(conn, client, config, project_id, gitlab_project_id, None) - .await + let signal = ShutdownSignal::new(); + ingest_project_issues_with_progress( + conn, + client, + config, + project_id, + gitlab_project_id, + None, + &signal, + ) + .await } #[instrument( - skip(conn, client, config, progress), + skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) )] pub async fn ingest_project_issues_with_progress( @@ -99,6 +109,7 @@ pub async fn ingest_project_issues_with_progress( project_id: i64, gitlab_project_id: i64, progress: Option, + signal: &ShutdownSignal, ) -> Result { let mut result = IngestProjectResult::default(); let emit = |event: ProgressEvent| { @@ -130,6 +141,11 @@ pub async fn ingest_project_issues_with_progress( let total_issues = total_issues as usize; result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); + if signal.is_cancelled() { + info!("Shutdown requested, returning partial issue results"); + return Ok(result); + } + if issues_needing_sync.is_empty() { info!("No issues need discussion sync"); } else { @@ -150,6 +166,7 @@ pub async fn ingest_project_issues_with_progress( project_id, &issues_needing_sync, &progress, + signal, ) .await?; @@ -163,6 +180,11 @@ pub async fn ingest_project_issues_with_progress( } } + if signal.is_cancelled() { + info!("Shutdown requested, returning partial issue results"); + return Ok(result); + } + if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?; if enqueued > 0 { @@ -176,6 +198,7 @@ pub async fn ingest_project_issues_with_progress( project_id, gitlab_project_id, &progress, + signal, ) .await?; result.resource_events_fetched = drain_result.fetched; @@ -211,6 +234,7 @@ pub async fn ingest_project_issues_with_progress( Ok(result) } +#[allow(clippy::too_many_arguments)] async fn sync_discussions_sequential( conn: &Connection, client: &GitLabClient, @@ -219,6 +243,7 @@ async fn sync_discussions_sequential( local_project_id: i64, issues: &[IssueForDiscussionSync], progress: &Option, + signal: &ShutdownSignal, ) -> Result> { let batch_size = config.sync.dependent_concurrency as usize; let total = issues.len(); @@ -226,6 +251,10 @@ async fn sync_discussions_sequential( let mut results = Vec::with_capacity(issues.len()); for chunk in issues.chunks(batch_size) { + if signal.is_cancelled() { + info!("Shutdown requested during discussion sync, returning partial results"); + break; + } for issue in chunk { let disc_result = ingest_issue_discussions( conn, @@ -258,6 +287,7 @@ pub async fn ingest_project_merge_requests( gitlab_project_id: i64, full_sync: bool, ) -> Result { + let signal = ShutdownSignal::new(); ingest_project_merge_requests_with_progress( conn, client, @@ -266,14 +296,16 @@ pub async fn ingest_project_merge_requests( gitlab_project_id, full_sync, None, + &signal, ) .await } #[instrument( - skip(conn, client, config, progress), + skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, items_skipped, errors) )] +#[allow(clippy::too_many_arguments)] pub async fn ingest_project_merge_requests_with_progress( conn: &Connection, client: &GitLabClient, @@ -282,6 +314,7 @@ pub async fn ingest_project_merge_requests_with_progress( gitlab_project_id: i64, full_sync: bool, progress: Option, + signal: &ShutdownSignal, ) -> Result { let mut result = IngestMrProjectResult::default(); let emit = |event: ProgressEvent| { @@ -323,6 +356,11 @@ pub async fn ingest_project_merge_requests_with_progress( let total_mrs = total_mrs as usize; result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); + if signal.is_cancelled() { + info!("Shutdown requested, returning partial MR results"); + return Ok(result); + } + if mrs_needing_sync.is_empty() { info!("No MRs need discussion sync"); } else { @@ -343,6 +381,7 @@ pub async fn ingest_project_merge_requests_with_progress( project_id, &mrs_needing_sync, &progress, + signal, ) .await?; @@ -360,6 +399,11 @@ pub async fn ingest_project_merge_requests_with_progress( } } + if signal.is_cancelled() { + info!("Shutdown requested, returning partial MR results"); + return Ok(result); + } + if config.sync.fetch_resource_events { let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?; if enqueued > 0 { @@ -373,6 +417,7 @@ pub async fn ingest_project_merge_requests_with_progress( project_id, gitlab_project_id, &progress, + signal, ) .await?; result.resource_events_fetched = drain_result.fetched; @@ -388,6 +433,11 @@ pub async fn ingest_project_merge_requests_with_progress( } } + if signal.is_cancelled() { + info!("Shutdown requested, returning partial MR results"); + return Ok(result); + } + let note_refs = crate::core::note_parser::extract_refs_from_system_notes(conn, project_id)?; if note_refs.inserted > 0 || note_refs.skipped_unresolvable > 0 { debug!( @@ -411,6 +461,7 @@ pub async fn ingest_project_merge_requests_with_progress( project_id, gitlab_project_id, &progress, + signal, ) .await?; result.closes_issues_fetched = closes_result.fetched; @@ -440,6 +491,7 @@ pub async fn ingest_project_merge_requests_with_progress( Ok(result) } +#[allow(clippy::too_many_arguments)] async fn sync_mr_discussions_sequential( conn: &Connection, client: &GitLabClient, @@ -448,6 +500,7 @@ async fn sync_mr_discussions_sequential( local_project_id: i64, mrs: &[MrForDiscussionSync], progress: &Option, + signal: &ShutdownSignal, ) -> Result> { let batch_size = config.sync.dependent_concurrency as usize; let total = mrs.len(); @@ -456,6 +509,10 @@ async fn sync_mr_discussions_sequential( let mut processed = 0; for chunk in mrs.chunks(batch_size) { + if signal.is_cancelled() { + info!("Shutdown requested during MR discussion sync, returning partial results"); + break; + } let prefetch_futures = chunk.iter().map(|mr| { prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone()) }); @@ -559,8 +616,48 @@ fn enqueue_resource_events_for_entity_type( Ok(enqueued) } +/// Result of a concurrent HTTP prefetch for resource events. +#[allow(clippy::type_complexity)] +struct PrefetchedResourceEvents { + job_id: i64, + project_id: i64, + entity_type: String, + entity_iid: i64, + entity_local_id: i64, + result: std::result::Result< + ( + Vec, + Vec, + Vec, + ), + crate::core::error::LoreError, + >, +} + +async fn prefetch_resource_events( + client: &GitLabClient, + gitlab_project_id: i64, + job_id: i64, + project_id: i64, + entity_type: String, + entity_iid: i64, + entity_local_id: i64, +) -> PrefetchedResourceEvents { + let result = client + .fetch_all_resource_events(gitlab_project_id, &entity_type, entity_iid) + .await; + PrefetchedResourceEvents { + job_id, + project_id, + entity_type, + entity_iid, + entity_local_id, + result, + } +} + #[instrument( - skip(conn, client, config, progress), + skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_resource_events( @@ -570,6 +667,7 @@ async fn drain_resource_events( project_id: i64, gitlab_project_id: i64, progress: &Option, + signal: &ShutdownSignal, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; @@ -603,33 +701,49 @@ async fn drain_resource_events( let mut seen_job_ids = std::collections::HashSet::new(); loop { + if signal.is_cancelled() { + info!("Shutdown requested during resource events drain, returning partial results"); + break; + } + let jobs = claim_jobs(conn, "resource_events", project_id, batch_size)?; if jobs.is_empty() { break; } - let mut any_new_in_batch = false; + // Phase 1: Concurrent HTTP fetches + let futures: Vec<_> = jobs + .iter() + .filter(|j| seen_job_ids.insert(j.id)) + .map(|j| { + prefetch_resource_events( + client, + gitlab_project_id, + j.id, + j.project_id, + j.entity_type.clone(), + j.entity_iid, + j.entity_local_id, + ) + }) + .collect(); - for job in &jobs { - if !seen_job_ids.insert(job.id) { - warn!( - job_id = job.id, - "Skipping already-processed job in same drain run" - ); - continue; - } - any_new_in_batch = true; + if futures.is_empty() { + warn!("All claimed jobs were already processed, breaking drain loop"); + break; + } - match client - .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) - .await - { + let prefetched = join_all(futures).await; + + // Phase 2: Serial DB writes + for p in prefetched { + match p.result { Ok((state_events, label_events, milestone_events)) => { let store_result = store_resource_events( conn, - job.project_id, - &job.entity_type, - job.entity_local_id, + p.project_id, + &p.entity_type, + p.entity_local_id, &state_events, &label_events, &milestone_events, @@ -637,22 +751,24 @@ async fn drain_resource_events( match store_result { Ok(()) => { - complete_job(conn, job.id)?; - update_resource_event_watermark( - conn, - &job.entity_type, - job.entity_local_id, + let tx = conn.unchecked_transaction()?; + complete_job_tx(&tx, p.job_id)?; + update_resource_event_watermark_tx( + &tx, + &p.entity_type, + p.entity_local_id, )?; + tx.commit()?; result.fetched += 1; } Err(e) => { warn!( - entity_type = %job.entity_type, - entity_iid = job.entity_iid, + entity_type = %p.entity_type, + entity_iid = p.entity_iid, error = %e, "Failed to store resource events" ); - fail_job(conn, job.id, &e.to_string())?; + fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } @@ -660,26 +776,24 @@ async fn drain_resource_events( Err(e) => { if e.is_permanent_api_error() { debug!( - entity_type = %job.entity_type, - entity_iid = job.entity_iid, + entity_type = %p.entity_type, + entity_iid = p.entity_iid, error = %e, "Permanent API error for resource events, marking complete" ); - complete_job(conn, job.id)?; - update_resource_event_watermark( - conn, - &job.entity_type, - job.entity_local_id, - )?; + let tx = conn.unchecked_transaction()?; + complete_job_tx(&tx, p.job_id)?; + update_resource_event_watermark_tx(&tx, &p.entity_type, p.entity_local_id)?; + tx.commit()?; result.skipped_not_found += 1; } else { warn!( - entity_type = %job.entity_type, - entity_iid = job.entity_iid, + entity_type = %p.entity_type, + entity_iid = p.entity_iid, error = %e, "Failed to fetch resource events from GitLab" ); - fail_job(conn, job.id, &e.to_string())?; + fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } @@ -691,11 +805,6 @@ async fn drain_resource_events( total: total_pending, }); } - - if !any_new_in_batch { - warn!("All claimed jobs were already processed, breaking drain loop"); - break; - } } emit(ProgressEvent::ResourceEventsFetchComplete { @@ -762,20 +871,31 @@ fn store_resource_events( Ok(()) } -fn update_resource_event_watermark( - conn: &Connection, +fn update_closes_issues_watermark_tx( + tx: &rusqlite::Transaction<'_>, + mr_local_id: i64, +) -> Result<()> { + tx.execute( + "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?", + [mr_local_id], + )?; + Ok(()) +} + +fn update_resource_event_watermark_tx( + tx: &rusqlite::Transaction<'_>, entity_type: &str, entity_local_id: i64, ) -> Result<()> { match entity_type { "issue" => { - conn.execute( + tx.execute( "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", [entity_local_id], )?; } "merge_request" => { - conn.execute( + tx.execute( "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", [entity_local_id], )?; @@ -785,14 +905,6 @@ fn update_resource_event_watermark( Ok(()) } -fn update_closes_issues_watermark(conn: &Connection, mr_local_id: i64) -> Result<()> { - conn.execute( - "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?", - [mr_local_id], - )?; - Ok(()) -} - fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result { // Remove stale jobs for MRs that haven't changed since their last closes_issues sync conn.execute( @@ -833,8 +945,37 @@ fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result, + crate::core::error::LoreError, + >, +} + +async fn prefetch_closes_issues( + client: &GitLabClient, + gitlab_project_id: i64, + job_id: i64, + entity_iid: i64, + entity_local_id: i64, +) -> PrefetchedClosesIssues { + let result = client + .fetch_mr_closes_issues(gitlab_project_id, entity_iid) + .await; + PrefetchedClosesIssues { + job_id, + entity_iid, + entity_local_id, + result, + } +} + #[instrument( - skip(conn, client, config, progress), + skip(conn, client, config, progress, signal), fields(project_id, gitlab_project_id, items_processed, errors) )] async fn drain_mr_closes_issues( @@ -844,6 +985,7 @@ async fn drain_mr_closes_issues( project_id: i64, gitlab_project_id: i64, progress: &Option, + signal: &ShutdownSignal, ) -> Result { let mut result = DrainResult::default(); let batch_size = config.sync.dependent_concurrency as usize; @@ -877,48 +1019,64 @@ async fn drain_mr_closes_issues( let mut seen_job_ids = std::collections::HashSet::new(); loop { + if signal.is_cancelled() { + info!("Shutdown requested during closes_issues drain, returning partial results"); + break; + } + let jobs = claim_jobs(conn, "mr_closes_issues", project_id, batch_size)?; if jobs.is_empty() { break; } - let mut any_new_in_batch = false; + // Phase 1: Concurrent HTTP fetches + let futures: Vec<_> = jobs + .iter() + .filter(|j| seen_job_ids.insert(j.id)) + .map(|j| { + prefetch_closes_issues( + client, + gitlab_project_id, + j.id, + j.entity_iid, + j.entity_local_id, + ) + }) + .collect(); - for job in &jobs { - if !seen_job_ids.insert(job.id) { - warn!( - job_id = job.id, - "Skipping already-processed mr_closes_issues job" - ); - continue; - } - any_new_in_batch = true; + if futures.is_empty() { + warn!("All claimed mr_closes_issues jobs were already processed, breaking drain loop"); + break; + } - match client - .fetch_mr_closes_issues(gitlab_project_id, job.entity_iid) - .await - { + let prefetched = join_all(futures).await; + + // Phase 2: Serial DB writes + for p in prefetched { + match p.result { Ok(closes_issues) => { let store_result = store_closes_issues_refs( conn, project_id, - job.entity_local_id, + p.entity_local_id, &closes_issues, ); match store_result { Ok(()) => { - complete_job(conn, job.id)?; - update_closes_issues_watermark(conn, job.entity_local_id)?; + let tx = conn.unchecked_transaction()?; + complete_job_tx(&tx, p.job_id)?; + update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; + tx.commit()?; result.fetched += 1; } Err(e) => { warn!( - entity_iid = job.entity_iid, + entity_iid = p.entity_iid, error = %e, "Failed to store closes_issues references" ); - fail_job(conn, job.id, &e.to_string())?; + fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } @@ -926,20 +1084,22 @@ async fn drain_mr_closes_issues( Err(e) => { if e.is_permanent_api_error() { debug!( - entity_iid = job.entity_iid, + entity_iid = p.entity_iid, error = %e, "Permanent API error for closes_issues, marking complete" ); - complete_job(conn, job.id)?; - update_closes_issues_watermark(conn, job.entity_local_id)?; + let tx = conn.unchecked_transaction()?; + complete_job_tx(&tx, p.job_id)?; + update_closes_issues_watermark_tx(&tx, p.entity_local_id)?; + tx.commit()?; result.skipped_not_found += 1; } else { warn!( - entity_iid = job.entity_iid, + entity_iid = p.entity_iid, error = %e, "Failed to fetch closes_issues from GitLab" ); - fail_job(conn, job.id, &e.to_string())?; + fail_job(conn, p.job_id, &e.to_string())?; result.failed += 1; } } @@ -951,11 +1111,6 @@ async fn drain_mr_closes_issues( total: total_pending, }); } - - if !any_new_in_batch { - warn!("All claimed mr_closes_issues jobs were already processed, breaking drain loop"); - break; - } } emit(ProgressEvent::ClosesIssuesFetchComplete { diff --git a/src/main.rs b/src/main.rs index bda60f4..8d7ed9c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,10 +30,12 @@ use lore::cli::{ use lore::core::db::{ LATEST_SCHEMA_VERSION, create_connection, get_schema_version, run_migrations, }; +use lore::core::dependent_queue::release_all_locked_jobs; use lore::core::error::{LoreError, RobotErrorOutput}; use lore::core::logging; use lore::core::metrics::MetricsLayer; use lore::core::paths::{get_config_path, get_db_path, get_log_dir}; +use lore::core::shutdown::ShutdownSignal; use lore::core::sync_run::SyncRunRecorder; #[tokio::main] @@ -658,6 +660,13 @@ async fn handle_ingest( let run_id_short = &run_id[..8]; let recorder = SyncRunRecorder::start(&recorder_conn, &command, run_id_short)?; + let signal = ShutdownSignal::new(); + let signal_for_handler = signal.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + signal_for_handler.cancel(); + }); + let ingest_result: std::result::Result<(), Box> = async { match args.entity.as_deref() { Some(resource_type) => { @@ -670,6 +679,7 @@ async fn handle_ingest( false, display, None, + &signal, ) .await?; @@ -697,6 +707,7 @@ async fn handle_ingest( false, display, None, + &signal, ) .await?; @@ -709,6 +720,7 @@ async fn handle_ingest( false, display, None, + &signal, ) .await?; @@ -725,6 +737,22 @@ async fn handle_ingest( .await; match ingest_result { + Ok(()) if signal.is_cancelled() => { + let stages = metrics.extract_timings(); + let _ = release_all_locked_jobs(&recorder_conn); + let _ = recorder.fail( + &recorder_conn, + "Interrupted by user (Ctrl+C)", + Some(&stages), + ); + if !robot_mode { + eprintln!( + "{}", + style("Interrupted by Ctrl+C. Partial data has been saved.").yellow() + ); + } + Ok(()) + } Ok(()) => { let stages = metrics.extract_timings(); let total_items: usize = stages.iter().map(|s| s.items_processed).sum(); @@ -734,6 +762,7 @@ async fn handle_ingest( } Err(e) => { let stages = metrics.extract_timings(); + let _ = release_all_locked_jobs(&recorder_conn); let _ = recorder.fail(&recorder_conn, &e.to_string(), Some(&stages)); Err(e) } @@ -1521,7 +1550,8 @@ async fn handle_sync_cmd( // For dry_run, skip recording and just show the preview if dry_run { - run_sync(&config, options, None).await?; + let signal = ShutdownSignal::new(); + run_sync(&config, options, None, &signal).await?; return Ok(()); } @@ -1531,8 +1561,43 @@ async fn handle_sync_cmd( let run_id_short = &run_id[..8]; let recorder = SyncRunRecorder::start(&recorder_conn, "sync", run_id_short)?; + let signal = ShutdownSignal::new(); + let signal_for_handler = signal.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + signal_for_handler.cancel(); + }); + let start = std::time::Instant::now(); - match run_sync(&config, options, Some(run_id_short)).await { + match run_sync(&config, options, Some(run_id_short), &signal).await { + Ok(result) if signal.is_cancelled() => { + let elapsed = start.elapsed(); + let stages = metrics.extract_timings(); + let released = release_all_locked_jobs(&recorder_conn).unwrap_or(0); + let _ = recorder.fail( + &recorder_conn, + "Interrupted by user (Ctrl+C)", + Some(&stages), + ); + + if robot_mode { + print_sync_json(&result, elapsed.as_millis() as u64, Some(metrics)); + } else { + eprintln!(); + eprintln!( + "{}", + console::style("Interrupted by Ctrl+C. Partial results:").yellow() + ); + print_sync(&result, elapsed, Some(metrics)); + if released > 0 { + eprintln!( + "{}", + console::style(format!("Released {released} locked jobs")).dim() + ); + } + } + Ok(()) + } Ok(result) => { let elapsed = start.elapsed(); let stages = metrics.extract_timings(); @@ -1552,6 +1617,7 @@ async fn handle_sync_cmd( } Err(e) => { let stages = metrics.extract_timings(); + let _ = release_all_locked_jobs(&recorder_conn); let _ = recorder.fail(&recorder_conn, &e.to_string(), Some(&stages)); Err(e.into()) }