diff --git a/src/ingestion/issues.rs b/src/ingestion/issues.rs index 374f51c..b8ae13d 100644 --- a/src/ingestion/issues.rs +++ b/src/ingestion/issues.rs @@ -7,6 +7,7 @@ use tracing::{debug, info, warn}; use crate::Config; use crate::core::error::{LoreError, Result}; use crate::core::payloads::{StorePayloadOptions, store_payload}; +use crate::core::shutdown::ShutdownSignal; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::gitlab::GitLabClient; @@ -41,6 +42,7 @@ pub async fn ingest_issues( config: &Config, project_id: i64, gitlab_project_id: i64, + signal: &ShutdownSignal, ) -> Result { let mut result = IngestIssuesResult::default(); @@ -58,6 +60,10 @@ pub async fn ingest_issues( let mut last_gitlab_id: Option = None; while let Some(issue_result) = issues_stream.next().await { + if signal.is_cancelled() { + info!("Issue ingestion interrupted by shutdown signal"); + break; + } let issue = issue_result?; result.fetched += 1; diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs index 46ed77b..136096d 100644 --- a/src/ingestion/merge_requests.rs +++ b/src/ingestion/merge_requests.rs @@ -6,6 +6,7 @@ use tracing::{debug, info, warn}; use crate::Config; use crate::core::error::{LoreError, Result}; use crate::core::payloads::{StorePayloadOptions, store_payload}; +use crate::core::shutdown::ShutdownSignal; use crate::core::time::now_ms; use crate::documents::SourceType; use crate::gitlab::GitLabClient; @@ -42,6 +43,7 @@ pub async fn ingest_merge_requests( project_id: i64, gitlab_project_id: i64, full_sync: bool, + signal: &ShutdownSignal, ) -> Result { let mut result = IngestMergeRequestsResult::default(); @@ -58,6 +60,10 @@ pub async fn ingest_merge_requests( let per_page = 100u32; loop { + if signal.is_cancelled() { + info!("MR ingestion interrupted by shutdown signal"); + break; + } let page_result = client .fetch_merge_requests_page( gitlab_project_id, diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 58c7a6a..6475000 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -119,7 +119,8 @@ pub async fn ingest_project_issues_with_progress( }; emit(ProgressEvent::IssuesFetchStarted); - let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?; + let issue_result = + ingest_issues(conn, client, config, project_id, gitlab_project_id, signal).await?; result.issues_fetched = issue_result.fetched; result.issues_upserted = issue_result.upserted; @@ -329,6 +330,7 @@ pub async fn ingest_project_merge_requests_with_progress( project_id, gitlab_project_id, full_sync, + signal, ) .await?;