feat(ingestion): add progress reporting for status enrichment pipeline
Previously the status enrichment phase (GraphQL work item status fetch)
ran silently — users saw no feedback between "syncing issues" and the
final enrichment summary. For projects with hundreds of issues and
adaptive page-size retries, this felt like a hang.
Changes across three layers:
GraphQL (graphql.rs):
- Extract fetch_issue_statuses_with_progress() accepting an optional
on_page callback invoked after each paginated fetch with the
running count of fetched IIDs
- Original fetch_issue_statuses() preserved as a zero-cost
delegation wrapper (no callback overhead)
Orchestrator (orchestrator.rs):
- Three new ProgressEvent variants: StatusEnrichmentStarted,
StatusEnrichmentPageFetched, StatusEnrichmentWriting
- Wire the page callback through to the new _with_progress fn
CLI (ingest.rs):
- Handle all three new events in the progress callback, updating
both the per-project spinner and the stage bar with live counts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -532,8 +532,35 @@ async fn run_ingest_inner(
|
|||||||
ProgressEvent::MrDiffsFetchComplete { .. } => {
|
ProgressEvent::MrDiffsFetchComplete { .. } => {
|
||||||
disc_bar_clone.finish_and_clear();
|
disc_bar_clone.finish_and_clear();
|
||||||
}
|
}
|
||||||
|
ProgressEvent::StatusEnrichmentStarted => {
|
||||||
|
spinner_clone.set_message(format!(
|
||||||
|
"{path_for_cb}: Enriching work item statuses..."
|
||||||
|
));
|
||||||
|
stage_bar_clone.set_message(
|
||||||
|
"Enriching work item statuses...".to_string()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
ProgressEvent::StatusEnrichmentPageFetched { items_so_far } => {
|
||||||
|
spinner_clone.set_message(format!(
|
||||||
|
"{path_for_cb}: Fetching statuses... ({items_so_far} work items)"
|
||||||
|
));
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Enriching work item statuses... ({items_so_far} fetched)"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
ProgressEvent::StatusEnrichmentWriting { total } => {
|
||||||
|
spinner_clone.set_message(format!(
|
||||||
|
"{path_for_cb}: Writing {total} statuses..."
|
||||||
|
));
|
||||||
|
stage_bar_clone.set_message(format!(
|
||||||
|
"Writing {total} work item statuses..."
|
||||||
|
));
|
||||||
|
}
|
||||||
ProgressEvent::StatusEnrichmentComplete { enriched, cleared } => {
|
ProgressEvent::StatusEnrichmentComplete { enriched, cleared } => {
|
||||||
if enriched > 0 || cleared > 0 {
|
if enriched > 0 || cleared > 0 {
|
||||||
|
spinner_clone.set_message(format!(
|
||||||
|
"{path_for_cb}: {enriched} statuses enriched, {cleared} cleared"
|
||||||
|
));
|
||||||
stage_bar_clone.set_message(format!(
|
stage_bar_clone.set_message(format!(
|
||||||
"Status enrichment: {enriched} enriched, {cleared} cleared"
|
"Status enrichment: {enriched} enriched, {cleared} cleared"
|
||||||
));
|
));
|
||||||
|
|||||||
@@ -233,6 +233,14 @@ fn is_complexity_or_timeout_error(msg: &str) -> bool {
|
|||||||
pub async fn fetch_issue_statuses(
|
pub async fn fetch_issue_statuses(
|
||||||
client: &GraphqlClient,
|
client: &GraphqlClient,
|
||||||
project_path: &str,
|
project_path: &str,
|
||||||
|
) -> crate::core::error::Result<FetchStatusResult> {
|
||||||
|
fetch_issue_statuses_with_progress(client, project_path, None).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn fetch_issue_statuses_with_progress(
|
||||||
|
client: &GraphqlClient,
|
||||||
|
project_path: &str,
|
||||||
|
on_page: Option<&dyn Fn(usize)>,
|
||||||
) -> crate::core::error::Result<FetchStatusResult> {
|
) -> crate::core::error::Result<FetchStatusResult> {
|
||||||
let mut statuses = std::collections::HashMap::new();
|
let mut statuses = std::collections::HashMap::new();
|
||||||
let mut all_fetched_iids = std::collections::HashSet::new();
|
let mut all_fetched_iids = std::collections::HashSet::new();
|
||||||
@@ -327,6 +335,10 @@ pub async fn fetch_issue_statuses(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(cb) = &on_page {
|
||||||
|
cb(all_fetched_iids.len());
|
||||||
|
}
|
||||||
|
|
||||||
// Pagination
|
// Pagination
|
||||||
if !connection.page_info.has_next_page {
|
if !connection.page_info.has_next_page {
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -45,6 +45,9 @@ pub enum ProgressEvent {
|
|||||||
MrDiffsFetchStarted { total: usize },
|
MrDiffsFetchStarted { total: usize },
|
||||||
MrDiffFetched { current: usize, total: usize },
|
MrDiffFetched { current: usize, total: usize },
|
||||||
MrDiffsFetchComplete { fetched: usize, failed: usize },
|
MrDiffsFetchComplete { fetched: usize, failed: usize },
|
||||||
|
StatusEnrichmentStarted,
|
||||||
|
StatusEnrichmentPageFetched { items_so_far: usize },
|
||||||
|
StatusEnrichmentWriting { total: usize },
|
||||||
StatusEnrichmentComplete { enriched: usize, cleared: usize },
|
StatusEnrichmentComplete { enriched: usize, cleared: usize },
|
||||||
StatusEnrichmentSkipped,
|
StatusEnrichmentSkipped,
|
||||||
}
|
}
|
||||||
@@ -150,6 +153,8 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
if config.sync.fetch_work_item_status && !signal.is_cancelled() {
|
if config.sync.fetch_work_item_status && !signal.is_cancelled() {
|
||||||
use rusqlite::OptionalExtension;
|
use rusqlite::OptionalExtension;
|
||||||
|
|
||||||
|
emit(ProgressEvent::StatusEnrichmentStarted);
|
||||||
|
|
||||||
let project_path: Option<String> = conn
|
let project_path: Option<String> = conn
|
||||||
.query_row(
|
.query_row(
|
||||||
"SELECT path_with_namespace FROM projects WHERE id = ?1",
|
"SELECT path_with_namespace FROM projects WHERE id = ?1",
|
||||||
@@ -170,7 +175,16 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
}
|
}
|
||||||
Some(path) => {
|
Some(path) => {
|
||||||
let graphql_client = client.graphql_client();
|
let graphql_client = client.graphql_client();
|
||||||
match crate::gitlab::graphql::fetch_issue_statuses(&graphql_client, &path).await {
|
let page_cb = |items_so_far: usize| {
|
||||||
|
emit(ProgressEvent::StatusEnrichmentPageFetched { items_so_far });
|
||||||
|
};
|
||||||
|
match crate::gitlab::graphql::fetch_issue_statuses_with_progress(
|
||||||
|
&graphql_client,
|
||||||
|
&path,
|
||||||
|
Some(&page_cb),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(fetch_result) => {
|
Ok(fetch_result) => {
|
||||||
if let Some(ref reason) = fetch_result.unsupported_reason {
|
if let Some(ref reason) = fetch_result.unsupported_reason {
|
||||||
result.status_enrichment_mode = "unsupported".into();
|
result.status_enrichment_mode = "unsupported".into();
|
||||||
@@ -199,6 +213,9 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
cleared: 0,
|
cleared: 0,
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
emit(ProgressEvent::StatusEnrichmentWriting {
|
||||||
|
total: fetch_result.all_fetched_iids.len(),
|
||||||
|
});
|
||||||
match enrich_issue_statuses_txn(
|
match enrich_issue_statuses_txn(
|
||||||
conn,
|
conn,
|
||||||
project_id,
|
project_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user