diff --git a/migrations/021_work_item_status.sql b/migrations/021_work_item_status.sql new file mode 100644 index 0000000..7a58377 --- /dev/null +++ b/migrations/021_work_item_status.sql @@ -0,0 +1,9 @@ +ALTER TABLE issues ADD COLUMN status_name TEXT; +ALTER TABLE issues ADD COLUMN status_category TEXT; +ALTER TABLE issues ADD COLUMN status_color TEXT; +ALTER TABLE issues ADD COLUMN status_icon_name TEXT; +ALTER TABLE issues ADD COLUMN status_synced_at INTEGER; +CREATE INDEX IF NOT EXISTS idx_issues_project_status_name ON issues(project_id, status_name); + +INSERT INTO schema_version (version, applied_at, description) +VALUES (21, strftime('%s', 'now') * 1000, 'Work item status columns for issues'); diff --git a/src/cli/commands/ingest.rs b/src/cli/commands/ingest.rs index fd27a1a..2c31e84 100644 --- a/src/cli/commands/ingest.rs +++ b/src/cli/commands/ingest.rs @@ -44,6 +44,21 @@ pub struct IngestResult { pub resource_events_failed: usize, pub mr_diffs_fetched: usize, pub mr_diffs_failed: usize, + pub status_enrichment_errors: usize, + pub status_enrichment_projects: Vec<ProjectStatusEnrichment>, +} + +/// Per-project status enrichment result, collected during ingestion. +pub struct ProjectStatusEnrichment { + pub mode: String, + pub reason: Option<String>, + pub seen: usize, + pub enriched: usize, + pub cleared: usize, + pub without_widget: usize, + pub partial_errors: usize, + pub first_partial_error: Option<String>, + pub error: Option<String>, } #[derive(Debug, Default, Clone, Serialize)] @@ -517,6 +532,14 @@ async fn run_ingest_inner( ProgressEvent::MrDiffsFetchComplete { ..
} => { disc_bar_clone.finish_and_clear(); } + ProgressEvent::StatusEnrichmentComplete { enriched, cleared } => { + if enriched > 0 || cleared > 0 { + stage_bar_clone.set_message(format!( + "Status enrichment: {enriched} enriched, {cleared} cleared" + )); + } + } + ProgressEvent::StatusEnrichmentSkipped => {} }) }; @@ -587,6 +610,22 @@ async fn run_ingest_inner( total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync; total.resource_events_fetched += result.resource_events_fetched; total.resource_events_failed += result.resource_events_failed; + if result.status_enrichment_error.is_some() { + total.status_enrichment_errors += 1; + } + total + .status_enrichment_projects + .push(ProjectStatusEnrichment { + mode: result.status_enrichment_mode.clone(), + reason: result.status_unsupported_reason.clone(), + seen: result.statuses_seen, + enriched: result.statuses_enriched, + cleared: result.statuses_cleared, + without_widget: result.statuses_without_widget, + partial_errors: result.partial_error_count, + first_partial_error: result.first_partial_error.clone(), + error: result.status_enrichment_error.clone(), + }); } Ok(ProjectIngestOutcome::Mrs { ref path, @@ -767,6 +806,25 @@ struct IngestJsonData { notes_upserted: usize, resource_events_fetched: usize, resource_events_failed: usize, + #[serde(skip_serializing_if = "Vec::is_empty")] + status_enrichment: Vec<StatusEnrichmentJson>, + status_enrichment_errors: usize, +} + +#[derive(Serialize)] +struct StatusEnrichmentJson { + mode: String, + #[serde(skip_serializing_if = "Option::is_none")] + reason: Option<String>, + seen: usize, + enriched: usize, + cleared: usize, + without_widget: usize, + partial_errors: usize, + #[serde(skip_serializing_if = "Option::is_none")] + first_partial_error: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option<String>, } #[derive(Serialize)] @@ -814,6 +872,22 @@ pub fn print_ingest_summary_json(result: &IngestResult, elapsed_ms: u64) { ) }; + let status_enrichment: Vec<StatusEnrichmentJson> = result + 
.status_enrichment_projects + .iter() + .map(|p| StatusEnrichmentJson { + mode: p.mode.clone(), + reason: p.reason.clone(), + seen: p.seen, + enriched: p.enriched, + cleared: p.cleared, + without_widget: p.without_widget, + partial_errors: p.partial_errors, + first_partial_error: p.first_partial_error.clone(), + error: p.error.clone(), + }) + .collect(); + let output = IngestJsonOutput { ok: true, data: IngestJsonData { @@ -826,6 +900,8 @@ pub fn print_ingest_summary_json(result: &IngestResult, elapsed_ms: u64) { notes_upserted: result.notes_upserted, resource_events_fetched: result.resource_events_fetched, resource_events_failed: result.resource_events_failed, + status_enrichment, + status_enrichment_errors: result.status_enrichment_errors, }, meta: RobotMeta { elapsed_ms }, }; diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index c3d747e..af43c53 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -39,6 +39,7 @@ pub struct SyncResult { pub mr_diffs_failed: usize, pub documents_regenerated: usize, pub documents_embedded: usize, + pub status_enrichment_errors: usize, } fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar { @@ -123,6 +124,7 @@ pub async fn run_sync( result.discussions_fetched += issues_result.discussions_fetched; result.resource_events_fetched += issues_result.resource_events_fetched; result.resource_events_failed += issues_result.resource_events_failed; + result.status_enrichment_errors += issues_result.status_enrichment_errors; spinner.finish_and_clear(); if signal.is_cancelled() { diff --git a/src/core/config.rs b/src/core/config.rs index 153fcf3..080dfb0 100644 --- a/src/core/config.rs +++ b/src/core/config.rs @@ -52,6 +52,9 @@ pub struct SyncConfig { #[serde(rename = "fetchMrFileChanges", default = "default_true")] pub fetch_mr_file_changes: bool, + + #[serde(rename = "fetchWorkItemStatus", default = "default_true")] + pub fetch_work_item_status: bool, } fn default_true() -> 
bool { @@ -70,6 +73,7 @@ impl Default for SyncConfig { requests_per_second: 30.0, fetch_resource_events: true, fetch_mr_file_changes: true, + fetch_work_item_status: true, } } } @@ -348,6 +352,22 @@ mod tests { ); } + #[test] + fn test_config_fetch_work_item_status_default_true() { + let config = SyncConfig::default(); + assert!(config.fetch_work_item_status); + } + + #[test] + fn test_config_deserialize_without_key() { + let json = r#"{}"#; + let config: SyncConfig = serde_json::from_str(json).unwrap(); + assert!( + config.fetch_work_item_status, + "Missing key should default to true" + ); + } + #[test] fn test_load_rejects_negative_note_bonus() { let dir = TempDir::new().unwrap(); diff --git a/src/core/db.rs b/src/core/db.rs index 145ed9e..1a0083d 100644 --- a/src/core/db.rs +++ b/src/core/db.rs @@ -65,6 +65,10 @@ const MIGRATIONS: &[(&str, &str)] = &[ "020", include_str!("../../migrations/020_mr_diffs_watermark.sql"), ), + ( + "021", + include_str!("../../migrations/021_work_item_status.sql"), + ), ]; pub fn create_connection(db_path: &Path) -> Result<Connection> { diff --git a/src/core/error.rs b/src/core/error.rs index 9f501df..84ccad7 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -232,7 +232,7 @@ impl LoreError { } pub fn is_permanent_api_error(&self) -> bool { - matches!(self, Self::GitLabNotFound { .. }) + matches!(self, Self::GitLabNotFound { ..
} | Self::GitLabAuthFailed) } pub fn exit_code(&self) -> i32 { diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index d177f92..51dcf87 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -45,6 +45,8 @@ pub enum ProgressEvent { MrDiffsFetchStarted { total: usize }, MrDiffFetched { current: usize, total: usize }, MrDiffsFetchComplete { fetched: usize, failed: usize }, + StatusEnrichmentComplete { enriched: usize, cleared: usize }, + StatusEnrichmentSkipped, } #[derive(Debug, Default)] @@ -59,6 +61,15 @@ pub struct IngestProjectResult { pub issues_skipped_discussion_sync: usize, pub resource_events_fetched: usize, pub resource_events_failed: usize, + pub statuses_enriched: usize, + pub statuses_cleared: usize, + pub statuses_seen: usize, + pub statuses_without_widget: usize, + pub partial_error_count: usize, + pub first_partial_error: Option<String>, + pub status_enrichment_error: Option<String>, + pub status_enrichment_mode: String, + pub status_unsupported_reason: Option<String>, } #[derive(Debug, Default)] @@ -135,6 +146,107 @@ pub async fn ingest_project_issues_with_progress( total: result.issues_fetched, }); + // ── Phase 1.5: Status enrichment via GraphQL ────────────────────── + if config.sync.fetch_work_item_status && !signal.is_cancelled() { + use rusqlite::OptionalExtension; + + let project_path: Option<String> = conn + .query_row( + "SELECT path_with_namespace FROM projects WHERE id = ?1", + [project_id], + |r| r.get(0), + ) + .optional()?; + + match project_path { + None => { + warn!("Cannot enrich statuses: project path not found for project_id={project_id}"); + result.status_enrichment_error = Some("project_path_missing".into()); + result.status_enrichment_mode = "fetched".into(); + emit(ProgressEvent::StatusEnrichmentComplete { + enriched: 0, + cleared: 0, + }); + } + Some(path) => { + let graphql_client = client.graphql_client(); + match crate::gitlab::graphql::fetch_issue_statuses(&graphql_client, &path).await { + 
Ok(fetch_result) => { + if let Some(ref reason) = fetch_result.unsupported_reason { + result.status_enrichment_mode = "unsupported".into(); + result.status_unsupported_reason = Some(match reason { + crate::gitlab::graphql::UnsupportedReason::GraphqlEndpointMissing => { + "graphql_endpoint_missing".into() + } + crate::gitlab::graphql::UnsupportedReason::AuthForbidden => { + "auth_forbidden".into() + } + }); + } else { + result.status_enrichment_mode = "fetched".into(); + } + + result.statuses_seen = fetch_result.all_fetched_iids.len(); + result.partial_error_count = fetch_result.partial_error_count; + if fetch_result.first_partial_error.is_some() { + result.first_partial_error = fetch_result.first_partial_error.clone(); + } + + if signal.is_cancelled() { + info!("Shutdown requested after status fetch, skipping DB write"); + emit(ProgressEvent::StatusEnrichmentComplete { + enriched: 0, + cleared: 0, + }); + } else { + match enrich_issue_statuses_txn( + conn, + project_id, + &fetch_result.statuses, + &fetch_result.all_fetched_iids, + ) { + Ok((enriched, cleared)) => { + result.statuses_enriched = enriched; + result.statuses_cleared = cleared; + result.statuses_without_widget = + result.statuses_seen.saturating_sub(enriched); + info!( + seen = result.statuses_seen, + enriched, + cleared, + without_widget = result.statuses_without_widget, + "Status enrichment complete" + ); + } + Err(e) => { + warn!("Status enrichment DB write failed: {e}"); + result.status_enrichment_error = + Some(format!("db_write_failed: {e}")); + } + } + emit(ProgressEvent::StatusEnrichmentComplete { + enriched: result.statuses_enriched, + cleared: result.statuses_cleared, + }); + } + } + Err(e) => { + warn!("Status enrichment fetch failed: {e}"); + result.status_enrichment_error = Some(e.to_string()); + result.status_enrichment_mode = "fetched".into(); + emit(ProgressEvent::StatusEnrichmentComplete { + enriched: 0, + cleared: 0, + }); + } + } + } + } + } else if !config.sync.fetch_work_item_status 
{ + result.status_enrichment_mode = "skipped".into(); + emit(ProgressEvent::StatusEnrichmentSkipped); + } + let issues_needing_sync = issue_result.issues_needing_discussion_sync; let total_issues: i64 = conn.query_row( @@ -238,6 +350,66 @@ pub async fn ingest_project_issues_with_progress( Ok(result) } +fn enrich_issue_statuses_txn( + conn: &Connection, + project_id: i64, + statuses: &std::collections::HashMap<i64, crate::gitlab::graphql::WorkItemStatus>, + all_fetched_iids: &std::collections::HashSet<i64>, +) -> std::result::Result<(usize, usize), rusqlite::Error> { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + + let tx = conn.unchecked_transaction()?; + + let mut cleared = 0usize; + let mut enriched = 0usize; + + // Phase 1: Clear stale statuses (fetched but no status widget) + { + let mut clear_stmt = tx.prepare_cached( + "UPDATE issues SET status_name = NULL, status_category = NULL, status_color = NULL, + status_icon_name = NULL, status_synced_at = ?3 + WHERE project_id = ?1 AND iid = ?2 AND status_name IS NOT NULL", + )?; + for iid in all_fetched_iids { + if !statuses.contains_key(iid) { + let rows = clear_stmt.execute(rusqlite::params![project_id, iid, now_ms])?; + if rows > 0 { + cleared += 1; + } + } + } + } + + // Phase 2: Apply new/updated statuses + { + let mut update_stmt = tx.prepare_cached( + "UPDATE issues SET status_name = ?1, status_category = ?2, status_color = ?3, + status_icon_name = ?4, status_synced_at = ?5 + WHERE project_id = ?6 AND iid = ?7", + )?; + for (iid, status) in statuses { + let rows = update_stmt.execute(rusqlite::params![ + &status.name, + &status.category, + &status.color, + &status.icon_name, + now_ms, + project_id, + iid, + ])?; + if rows > 0 { + enriched += 1; + } + } + } + + tx.commit()?; + Ok((enriched, cleared)) +} + #[allow(clippy::too_many_arguments)] async fn sync_discussions_sequential( conn: &Connection, @@ -1181,46 +1353,33 @@ fn store_closes_issues_refs( mr_local_id: i64, 
closes_issues: &[crate::gitlab::types::GitLabIssueRef], ) -> Result<()> { - conn.execute_batch("SAVEPOINT store_closes_refs")?; - let inner = || -> Result<()> { - for issue_ref in closes_issues { - let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; + for issue_ref in closes_issues { + let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?; - let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { - (Some(local_id), None, None) - } else { - let path = resolve_project_path(conn, issue_ref.project_id)?; - let fallback = - path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); - (None, Some(fallback), Some(issue_ref.iid)) - }; + let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id { + (Some(local_id), None, None) + } else { + let path = resolve_project_path(conn, issue_ref.project_id)?; + let fallback = + path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id)); + (None, Some(fallback), Some(issue_ref.iid)) + }; - let ref_ = EntityReference { - project_id, - source_entity_type: "merge_request", - source_entity_id: mr_local_id, - target_entity_type: "issue", - target_entity_id: target_id, - target_project_path: target_path.as_deref(), - target_entity_iid: target_iid, - reference_type: "closes", - source_method: "api", - }; + let ref_ = EntityReference { + project_id, + source_entity_type: "merge_request", + source_entity_id: mr_local_id, + target_entity_type: "issue", + target_entity_id: target_id, + target_project_path: target_path.as_deref(), + target_entity_iid: target_iid, + reference_type: "closes", + source_method: "api", + }; - insert_entity_reference(conn, &ref_)?; - } - Ok(()) - }; - match inner() { - Ok(()) => { - conn.execute_batch("RELEASE store_closes_refs")?; - Ok(()) - } - Err(e) => { - let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs"); - Err(e) - } + 
insert_entity_reference(conn, &ref_)?; } + Ok(()) } // ─── MR Diffs (file changes) ──────────────────────────────────────────────── @@ -1469,6 +1628,15 @@ mod tests { assert_eq!(result.issues_skipped_discussion_sync, 0); assert_eq!(result.resource_events_fetched, 0); assert_eq!(result.resource_events_failed, 0); + assert_eq!(result.statuses_enriched, 0); + assert_eq!(result.statuses_cleared, 0); + assert_eq!(result.statuses_seen, 0); + assert_eq!(result.statuses_without_widget, 0); + assert_eq!(result.partial_error_count, 0); + assert!(result.first_partial_error.is_none()); + assert!(result.status_enrichment_error.is_none()); + assert!(result.status_enrichment_mode.is_empty()); + assert!(result.status_unsupported_reason.is_none()); } #[test] @@ -1509,5 +1677,10 @@ mod tests { fetched: 8, failed: 2, }; + let _status_complete = ProgressEvent::StatusEnrichmentComplete { + enriched: 5, + cleared: 1, + }; + let _status_skipped = ProgressEvent::StatusEnrichmentSkipped; } }