refactor(ingestion): compact log summaries and quieter shutdown messages

Migrate all ingestion completion logs to use nonzero_summary() for compact,
zero-suppressed output. Before: 8-14 individual key=value structured fields
per completion message. After: a single summary field like
'42 fetched · 3 labels · 12 notes' that only shows non-zero counters.

Also downgrade all 'Shutdown requested...' messages from info! to debug!.
These are emitted on every Ctrl+C and add noise to the partial results
output that immediately follows. They remain visible at -vv for debugging
graceful shutdown behavior.

Affected modules:
- issues.rs: issue ingestion completion
- merge_requests.rs: MR ingestion completion, full-sync cursor reset
- mr_discussions.rs: discussion ingestion completion
- orchestrator.rs: project-level issue and MR completion summaries,
  all shutdown-requested checkpoints across discussion sync, resource
  events drain, closes-issues drain, and MR diffs drain

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-13 22:31:57 -05:00
parent a7f86b26e4
commit c6a5461d41
4 changed files with 67 additions and 59 deletions

View File

@@ -109,11 +109,13 @@ pub async fn ingest_issues(
result.issues_needing_discussion_sync = get_issues_needing_discussion_sync(conn, project_id)?; result.issues_needing_discussion_sync = get_issues_needing_discussion_sync(conn, project_id)?;
info!( info!(
fetched = result.fetched, summary = crate::ingestion::nonzero_summary(&[
upserted = result.upserted, ("fetched", result.fetched),
labels_created = result.labels_created, ("upserted", result.upserted),
needing_sync = result.issues_needing_discussion_sync.len(), ("labels", result.labels_created),
"Issue ingestion complete" ("needing sync", result.issues_needing_discussion_sync.len()),
]),
"Issue ingestion"
); );
Ok(result) Ok(result)

View File

@@ -50,7 +50,7 @@ pub async fn ingest_merge_requests(
if full_sync { if full_sync {
reset_sync_cursor(conn, project_id)?; reset_sync_cursor(conn, project_id)?;
reset_discussion_watermarks(conn, project_id)?; reset_discussion_watermarks(conn, project_id)?;
info!("Full sync: cursor and discussion watermarks reset"); debug!("Full sync: cursor and discussion watermarks reset");
} }
let cursor = get_sync_cursor(conn, project_id)?; let cursor = get_sync_cursor(conn, project_id)?;
@@ -122,12 +122,14 @@ pub async fn ingest_merge_requests(
} }
info!( info!(
fetched = result.fetched, summary = crate::ingestion::nonzero_summary(&[
upserted = result.upserted, ("fetched", result.fetched),
labels_created = result.labels_created, ("upserted", result.upserted),
assignees_linked = result.assignees_linked, ("labels", result.labels_created),
reviewers_linked = result.reviewers_linked, ("assignees", result.assignees_linked),
"MR ingestion complete" ("reviewers", result.reviewers_linked),
]),
"MR ingestion"
); );
Ok(result) Ok(result)

View File

@@ -269,14 +269,14 @@ pub async fn ingest_mr_discussions(
} }
info!( info!(
mrs_processed = mrs.len(), summary = crate::ingestion::nonzero_summary(&[
discussions_fetched = total_result.discussions_fetched, ("MRs", mrs.len()),
discussions_upserted = total_result.discussions_upserted, ("discussions", total_result.discussions_fetched),
notes_upserted = total_result.notes_upserted, ("notes", total_result.notes_upserted),
notes_skipped = total_result.notes_skipped_bad_timestamp, ("skipped", total_result.notes_skipped_bad_timestamp),
diffnotes = total_result.diffnotes_count, ("diffnotes", total_result.diffnotes_count),
pagination_succeeded = total_result.pagination_succeeded, ]),
"MR discussion ingestion complete" "MR discussion ingestion"
); );
Ok(total_result) Ok(total_result)

View File

@@ -207,7 +207,7 @@ pub async fn ingest_project_issues_with_progress(
} }
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested after status fetch, skipping DB write"); debug!("Shutdown requested after status fetch, skipping DB write");
emit(ProgressEvent::StatusEnrichmentComplete { emit(ProgressEvent::StatusEnrichmentComplete {
enriched: 0, enriched: 0,
cleared: 0, cleared: 0,
@@ -275,12 +275,12 @@ pub async fn ingest_project_issues_with_progress(
result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len()); result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested, returning partial issue results"); debug!("Shutdown requested, returning partial issue results");
return Ok(result); return Ok(result);
} }
if issues_needing_sync.is_empty() { if issues_needing_sync.is_empty() {
info!("No issues need discussion sync"); debug!("No issues need discussion sync");
} else { } else {
info!( info!(
count = issues_needing_sync.len(), count = issues_needing_sync.len(),
@@ -314,7 +314,7 @@ pub async fn ingest_project_issues_with_progress(
} }
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested, returning partial issue results"); debug!("Shutdown requested, returning partial issue results");
return Ok(result); return Ok(result);
} }
@@ -348,16 +348,18 @@ pub async fn ingest_project_issues_with_progress(
} }
info!( info!(
issues_fetched = result.issues_fetched, summary = crate::ingestion::nonzero_summary(&[
issues_upserted = result.issues_upserted, ("fetched", result.issues_fetched),
labels_created = result.labels_created, ("upserted", result.issues_upserted),
discussions_fetched = result.discussions_fetched, ("labels", result.labels_created),
notes_upserted = result.notes_upserted, ("discussions", result.discussions_fetched),
issues_synced = result.issues_synced_discussions, ("notes", result.notes_upserted),
issues_skipped = result.issues_skipped_discussion_sync, ("synced", result.issues_synced_discussions),
resource_events_fetched = result.resource_events_fetched, ("skipped", result.issues_skipped_discussion_sync),
resource_events_failed = result.resource_events_failed, ("events", result.resource_events_fetched),
"Project ingestion complete" ("event errors", result.resource_events_failed),
]),
"Project complete"
); );
tracing::Span::current().record("items_processed", result.issues_upserted); tracing::Span::current().record("items_processed", result.issues_upserted);
@@ -445,7 +447,7 @@ async fn sync_discussions_sequential(
for chunk in issues.chunks(batch_size) { for chunk in issues.chunks(batch_size) {
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested during discussion sync, returning partial results"); debug!("Shutdown requested during discussion sync, returning partial results");
break; break;
} }
for issue in chunk { for issue in chunk {
@@ -549,12 +551,12 @@ pub async fn ingest_project_merge_requests_with_progress(
result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len()); result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested, returning partial MR results"); debug!("Shutdown requested, returning partial MR results");
return Ok(result); return Ok(result);
} }
if mrs_needing_sync.is_empty() { if mrs_needing_sync.is_empty() {
info!("No MRs need discussion sync"); debug!("No MRs need discussion sync");
} else { } else {
info!( info!(
count = mrs_needing_sync.len(), count = mrs_needing_sync.len(),
@@ -592,7 +594,7 @@ pub async fn ingest_project_merge_requests_with_progress(
} }
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested, returning partial MR results"); debug!("Shutdown requested, returning partial MR results");
return Ok(result); return Ok(result);
} }
@@ -626,7 +628,7 @@ pub async fn ingest_project_merge_requests_with_progress(
} }
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested, returning partial MR results"); debug!("Shutdown requested, returning partial MR results");
return Ok(result); return Ok(result);
} }
@@ -679,7 +681,7 @@ pub async fn ingest_project_merge_requests_with_progress(
} }
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested, returning partial MR results"); debug!("Shutdown requested, returning partial MR results");
return Ok(result); return Ok(result);
} }
@@ -704,21 +706,23 @@ pub async fn ingest_project_merge_requests_with_progress(
} }
info!( info!(
mrs_fetched = result.mrs_fetched, summary = crate::ingestion::nonzero_summary(&[
mrs_upserted = result.mrs_upserted, ("fetched", result.mrs_fetched),
labels_created = result.labels_created, ("upserted", result.mrs_upserted),
discussions_fetched = result.discussions_fetched, ("labels", result.labels_created),
notes_upserted = result.notes_upserted, ("discussions", result.discussions_fetched),
diffnotes = result.diffnotes_count, ("notes", result.notes_upserted),
mrs_synced = result.mrs_synced_discussions, ("diffnotes", result.diffnotes_count),
mrs_skipped = result.mrs_skipped_discussion_sync, ("synced", result.mrs_synced_discussions),
resource_events_fetched = result.resource_events_fetched, ("skipped", result.mrs_skipped_discussion_sync),
resource_events_failed = result.resource_events_failed, ("events", result.resource_events_fetched),
closes_issues_fetched = result.closes_issues_fetched, ("event errors", result.resource_events_failed),
closes_issues_failed = result.closes_issues_failed, ("closes", result.closes_issues_fetched),
mr_diffs_fetched = result.mr_diffs_fetched, ("close errors", result.closes_issues_failed),
mr_diffs_failed = result.mr_diffs_failed, ("diffs", result.mr_diffs_fetched),
"MR project ingestion complete" ("diff errors", result.mr_diffs_failed),
]),
"MR project complete"
); );
tracing::Span::current().record("items_processed", result.mrs_upserted); tracing::Span::current().record("items_processed", result.mrs_upserted);
@@ -750,7 +754,7 @@ async fn sync_mr_discussions_sequential(
for chunk in mrs.chunks(batch_size) { for chunk in mrs.chunks(batch_size) {
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested during MR discussion sync, returning partial results"); debug!("Shutdown requested during MR discussion sync, returning partial results");
break; break;
} }
let prefetch_futures = chunk.iter().map(|mr| { let prefetch_futures = chunk.iter().map(|mr| {
@@ -947,7 +951,7 @@ async fn drain_resource_events(
loop { loop {
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested during resource events drain, returning partial results"); debug!("Shutdown requested during resource events drain, returning partial results");
break; break;
} }
@@ -1269,7 +1273,7 @@ async fn drain_mr_closes_issues(
loop { loop {
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested during closes_issues drain, returning partial results"); debug!("Shutdown requested during closes_issues drain, returning partial results");
break; break;
} }
@@ -1526,7 +1530,7 @@ async fn drain_mr_diffs(
loop { loop {
if signal.is_cancelled() { if signal.is_cancelled() {
info!("Shutdown requested during mr_diffs drain, returning partial results"); debug!("Shutdown requested during mr_diffs drain, returning partial results");
break; break;
} }