feat(ingestion): enrich issues with work item status from GraphQL API
Add a "Phase 1.5" status enrichment step to the issue ingestion pipeline that fetches work item statuses via the GitLab GraphQL API after the standard REST API ingestion completes. Schema changes (migration 021): - Add status_name, status_category, status_color, status_icon_name, and status_synced_at columns to the issues table (all nullable) Ingestion pipeline changes: - New `enrich_issue_statuses_txn()` function that applies fetched statuses in a single transaction with two phases: clear stale statuses for issues that no longer have a status widget, then apply new/updated statuses from the GraphQL response - ProgressEvent variants for status enrichment (complete/skipped) - IngestProjectResult tracks enrichment metrics (seen, enriched, cleared, without_widget, partial_error_count, enrichment_mode, errors) - Robot mode JSON output includes per-project status enrichment details Configuration: - New `sync.fetchWorkItemStatus` config option (defaults true) to disable GraphQL status enrichment on instances without Premium/Ultimate - `LoreError::GitLabAuthFailed` now treated as permanent API error so status enrichment auth failures don't trigger retries Also removes the unnecessary nested SAVEPOINT in store_closes_issues_refs (already runs within the orchestrator's transaction context). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -44,6 +44,21 @@ pub struct IngestResult {
|
||||
pub resource_events_failed: usize,
|
||||
pub mr_diffs_fetched: usize,
|
||||
pub mr_diffs_failed: usize,
|
||||
pub status_enrichment_errors: usize,
|
||||
pub status_enrichment_projects: Vec<ProjectStatusEnrichment>,
|
||||
}
|
||||
|
||||
/// Per-project status enrichment result, collected during ingestion.
|
||||
pub struct ProjectStatusEnrichment {
|
||||
pub mode: String,
|
||||
pub reason: Option<String>,
|
||||
pub seen: usize,
|
||||
pub enriched: usize,
|
||||
pub cleared: usize,
|
||||
pub without_widget: usize,
|
||||
pub partial_errors: usize,
|
||||
pub first_partial_error: Option<String>,
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize)]
|
||||
@@ -517,6 +532,14 @@ async fn run_ingest_inner(
|
||||
ProgressEvent::MrDiffsFetchComplete { .. } => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentComplete { enriched, cleared } => {
|
||||
if enriched > 0 || cleared > 0 {
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Status enrichment: {enriched} enriched, {cleared} cleared"
|
||||
));
|
||||
}
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentSkipped => {}
|
||||
})
|
||||
};
|
||||
|
||||
@@ -587,6 +610,22 @@ async fn run_ingest_inner(
|
||||
total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync;
|
||||
total.resource_events_fetched += result.resource_events_fetched;
|
||||
total.resource_events_failed += result.resource_events_failed;
|
||||
if result.status_enrichment_error.is_some() {
|
||||
total.status_enrichment_errors += 1;
|
||||
}
|
||||
total
|
||||
.status_enrichment_projects
|
||||
.push(ProjectStatusEnrichment {
|
||||
mode: result.status_enrichment_mode.clone(),
|
||||
reason: result.status_unsupported_reason.clone(),
|
||||
seen: result.statuses_seen,
|
||||
enriched: result.statuses_enriched,
|
||||
cleared: result.statuses_cleared,
|
||||
without_widget: result.statuses_without_widget,
|
||||
partial_errors: result.partial_error_count,
|
||||
first_partial_error: result.first_partial_error.clone(),
|
||||
error: result.status_enrichment_error.clone(),
|
||||
});
|
||||
}
|
||||
Ok(ProjectIngestOutcome::Mrs {
|
||||
ref path,
|
||||
@@ -767,6 +806,25 @@ struct IngestJsonData {
|
||||
notes_upserted: usize,
|
||||
resource_events_fetched: usize,
|
||||
resource_events_failed: usize,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty")]
|
||||
status_enrichment: Vec<StatusEnrichmentJson>,
|
||||
status_enrichment_errors: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StatusEnrichmentJson {
|
||||
mode: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
reason: Option<String>,
|
||||
seen: usize,
|
||||
enriched: usize,
|
||||
cleared: usize,
|
||||
without_widget: usize,
|
||||
partial_errors: usize,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
first_partial_error: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -814,6 +872,22 @@ pub fn print_ingest_summary_json(result: &IngestResult, elapsed_ms: u64) {
|
||||
)
|
||||
};
|
||||
|
||||
let status_enrichment: Vec<StatusEnrichmentJson> = result
|
||||
.status_enrichment_projects
|
||||
.iter()
|
||||
.map(|p| StatusEnrichmentJson {
|
||||
mode: p.mode.clone(),
|
||||
reason: p.reason.clone(),
|
||||
seen: p.seen,
|
||||
enriched: p.enriched,
|
||||
cleared: p.cleared,
|
||||
without_widget: p.without_widget,
|
||||
partial_errors: p.partial_errors,
|
||||
first_partial_error: p.first_partial_error.clone(),
|
||||
error: p.error.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
let output = IngestJsonOutput {
|
||||
ok: true,
|
||||
data: IngestJsonData {
|
||||
@@ -826,6 +900,8 @@ pub fn print_ingest_summary_json(result: &IngestResult, elapsed_ms: u64) {
|
||||
notes_upserted: result.notes_upserted,
|
||||
resource_events_fetched: result.resource_events_fetched,
|
||||
resource_events_failed: result.resource_events_failed,
|
||||
status_enrichment,
|
||||
status_enrichment_errors: result.status_enrichment_errors,
|
||||
},
|
||||
meta: RobotMeta { elapsed_ms },
|
||||
};
|
||||
|
||||
@@ -39,6 +39,7 @@ pub struct SyncResult {
|
||||
pub mr_diffs_failed: usize,
|
||||
pub documents_regenerated: usize,
|
||||
pub documents_embedded: usize,
|
||||
pub status_enrichment_errors: usize,
|
||||
}
|
||||
|
||||
fn stage_spinner(stage: u8, total: u8, msg: &str, robot_mode: bool) -> ProgressBar {
|
||||
@@ -123,6 +124,7 @@ pub async fn run_sync(
|
||||
result.discussions_fetched += issues_result.discussions_fetched;
|
||||
result.resource_events_fetched += issues_result.resource_events_fetched;
|
||||
result.resource_events_failed += issues_result.resource_events_failed;
|
||||
result.status_enrichment_errors += issues_result.status_enrichment_errors;
|
||||
spinner.finish_and_clear();
|
||||
|
||||
if signal.is_cancelled() {
|
||||
|
||||
@@ -52,6 +52,9 @@ pub struct SyncConfig {
|
||||
|
||||
#[serde(rename = "fetchMrFileChanges", default = "default_true")]
|
||||
pub fetch_mr_file_changes: bool,
|
||||
|
||||
#[serde(rename = "fetchWorkItemStatus", default = "default_true")]
|
||||
pub fetch_work_item_status: bool,
|
||||
}
|
||||
|
||||
fn default_true() -> bool {
|
||||
@@ -70,6 +73,7 @@ impl Default for SyncConfig {
|
||||
requests_per_second: 30.0,
|
||||
fetch_resource_events: true,
|
||||
fetch_mr_file_changes: true,
|
||||
fetch_work_item_status: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -348,6 +352,22 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_config_fetch_work_item_status_default_true() {
|
||||
let config = SyncConfig::default();
|
||||
assert!(config.fetch_work_item_status);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_config_deserialize_without_key() {
|
||||
let json = r#"{}"#;
|
||||
let config: SyncConfig = serde_json::from_str(json).unwrap();
|
||||
assert!(
|
||||
config.fetch_work_item_status,
|
||||
"Missing key should default to true"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_load_rejects_negative_note_bonus() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
|
||||
@@ -65,6 +65,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
|
||||
"020",
|
||||
include_str!("../../migrations/020_mr_diffs_watermark.sql"),
|
||||
),
|
||||
(
|
||||
"021",
|
||||
include_str!("../../migrations/021_work_item_status.sql"),
|
||||
),
|
||||
];
|
||||
|
||||
pub fn create_connection(db_path: &Path) -> Result<Connection> {
|
||||
|
||||
@@ -232,7 +232,7 @@ impl LoreError {
|
||||
}
|
||||
|
||||
pub fn is_permanent_api_error(&self) -> bool {
|
||||
matches!(self, Self::GitLabNotFound { .. })
|
||||
matches!(self, Self::GitLabNotFound { .. } | Self::GitLabAuthFailed)
|
||||
}
|
||||
|
||||
pub fn exit_code(&self) -> i32 {
|
||||
|
||||
@@ -45,6 +45,8 @@ pub enum ProgressEvent {
|
||||
MrDiffsFetchStarted { total: usize },
|
||||
MrDiffFetched { current: usize, total: usize },
|
||||
MrDiffsFetchComplete { fetched: usize, failed: usize },
|
||||
StatusEnrichmentComplete { enriched: usize, cleared: usize },
|
||||
StatusEnrichmentSkipped,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -59,6 +61,15 @@ pub struct IngestProjectResult {
|
||||
pub issues_skipped_discussion_sync: usize,
|
||||
pub resource_events_fetched: usize,
|
||||
pub resource_events_failed: usize,
|
||||
pub statuses_enriched: usize,
|
||||
pub statuses_cleared: usize,
|
||||
pub statuses_seen: usize,
|
||||
pub statuses_without_widget: usize,
|
||||
pub partial_error_count: usize,
|
||||
pub first_partial_error: Option<String>,
|
||||
pub status_enrichment_error: Option<String>,
|
||||
pub status_enrichment_mode: String,
|
||||
pub status_unsupported_reason: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -135,6 +146,107 @@ pub async fn ingest_project_issues_with_progress(
|
||||
total: result.issues_fetched,
|
||||
});
|
||||
|
||||
// ── Phase 1.5: Status enrichment via GraphQL ──────────────────────
|
||||
if config.sync.fetch_work_item_status && !signal.is_cancelled() {
|
||||
use rusqlite::OptionalExtension;
|
||||
|
||||
let project_path: Option<String> = conn
|
||||
.query_row(
|
||||
"SELECT path_with_namespace FROM projects WHERE id = ?1",
|
||||
[project_id],
|
||||
|r| r.get(0),
|
||||
)
|
||||
.optional()?;
|
||||
|
||||
match project_path {
|
||||
None => {
|
||||
warn!("Cannot enrich statuses: project path not found for project_id={project_id}");
|
||||
result.status_enrichment_error = Some("project_path_missing".into());
|
||||
result.status_enrichment_mode = "fetched".into();
|
||||
emit(ProgressEvent::StatusEnrichmentComplete {
|
||||
enriched: 0,
|
||||
cleared: 0,
|
||||
});
|
||||
}
|
||||
Some(path) => {
|
||||
let graphql_client = client.graphql_client();
|
||||
match crate::gitlab::graphql::fetch_issue_statuses(&graphql_client, &path).await {
|
||||
Ok(fetch_result) => {
|
||||
if let Some(ref reason) = fetch_result.unsupported_reason {
|
||||
result.status_enrichment_mode = "unsupported".into();
|
||||
result.status_unsupported_reason = Some(match reason {
|
||||
crate::gitlab::graphql::UnsupportedReason::GraphqlEndpointMissing => {
|
||||
"graphql_endpoint_missing".into()
|
||||
}
|
||||
crate::gitlab::graphql::UnsupportedReason::AuthForbidden => {
|
||||
"auth_forbidden".into()
|
||||
}
|
||||
});
|
||||
} else {
|
||||
result.status_enrichment_mode = "fetched".into();
|
||||
}
|
||||
|
||||
result.statuses_seen = fetch_result.all_fetched_iids.len();
|
||||
result.partial_error_count = fetch_result.partial_error_count;
|
||||
if fetch_result.first_partial_error.is_some() {
|
||||
result.first_partial_error = fetch_result.first_partial_error.clone();
|
||||
}
|
||||
|
||||
if signal.is_cancelled() {
|
||||
info!("Shutdown requested after status fetch, skipping DB write");
|
||||
emit(ProgressEvent::StatusEnrichmentComplete {
|
||||
enriched: 0,
|
||||
cleared: 0,
|
||||
});
|
||||
} else {
|
||||
match enrich_issue_statuses_txn(
|
||||
conn,
|
||||
project_id,
|
||||
&fetch_result.statuses,
|
||||
&fetch_result.all_fetched_iids,
|
||||
) {
|
||||
Ok((enriched, cleared)) => {
|
||||
result.statuses_enriched = enriched;
|
||||
result.statuses_cleared = cleared;
|
||||
result.statuses_without_widget =
|
||||
result.statuses_seen.saturating_sub(enriched);
|
||||
info!(
|
||||
seen = result.statuses_seen,
|
||||
enriched,
|
||||
cleared,
|
||||
without_widget = result.statuses_without_widget,
|
||||
"Status enrichment complete"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Status enrichment DB write failed: {e}");
|
||||
result.status_enrichment_error =
|
||||
Some(format!("db_write_failed: {e}"));
|
||||
}
|
||||
}
|
||||
emit(ProgressEvent::StatusEnrichmentComplete {
|
||||
enriched: result.statuses_enriched,
|
||||
cleared: result.statuses_cleared,
|
||||
});
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Status enrichment fetch failed: {e}");
|
||||
result.status_enrichment_error = Some(e.to_string());
|
||||
result.status_enrichment_mode = "fetched".into();
|
||||
emit(ProgressEvent::StatusEnrichmentComplete {
|
||||
enriched: 0,
|
||||
cleared: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if !config.sync.fetch_work_item_status {
|
||||
result.status_enrichment_mode = "skipped".into();
|
||||
emit(ProgressEvent::StatusEnrichmentSkipped);
|
||||
}
|
||||
|
||||
let issues_needing_sync = issue_result.issues_needing_discussion_sync;
|
||||
|
||||
let total_issues: i64 = conn.query_row(
|
||||
@@ -238,6 +350,66 @@ pub async fn ingest_project_issues_with_progress(
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn enrich_issue_statuses_txn(
|
||||
conn: &Connection,
|
||||
project_id: i64,
|
||||
statuses: &std::collections::HashMap<i64, crate::gitlab::types::WorkItemStatus>,
|
||||
all_fetched_iids: &std::collections::HashSet<i64>,
|
||||
) -> std::result::Result<(usize, usize), rusqlite::Error> {
|
||||
let now_ms = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64;
|
||||
|
||||
let tx = conn.unchecked_transaction()?;
|
||||
|
||||
let mut cleared = 0usize;
|
||||
let mut enriched = 0usize;
|
||||
|
||||
// Phase 1: Clear stale statuses (fetched but no status widget)
|
||||
{
|
||||
let mut clear_stmt = tx.prepare_cached(
|
||||
"UPDATE issues SET status_name = NULL, status_category = NULL, status_color = NULL,
|
||||
status_icon_name = NULL, status_synced_at = ?3
|
||||
WHERE project_id = ?1 AND iid = ?2 AND status_name IS NOT NULL",
|
||||
)?;
|
||||
for iid in all_fetched_iids {
|
||||
if !statuses.contains_key(iid) {
|
||||
let rows = clear_stmt.execute(rusqlite::params![project_id, iid, now_ms])?;
|
||||
if rows > 0 {
|
||||
cleared += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: Apply new/updated statuses
|
||||
{
|
||||
let mut update_stmt = tx.prepare_cached(
|
||||
"UPDATE issues SET status_name = ?1, status_category = ?2, status_color = ?3,
|
||||
status_icon_name = ?4, status_synced_at = ?5
|
||||
WHERE project_id = ?6 AND iid = ?7",
|
||||
)?;
|
||||
for (iid, status) in statuses {
|
||||
let rows = update_stmt.execute(rusqlite::params![
|
||||
&status.name,
|
||||
&status.category,
|
||||
&status.color,
|
||||
&status.icon_name,
|
||||
now_ms,
|
||||
project_id,
|
||||
iid,
|
||||
])?;
|
||||
if rows > 0 {
|
||||
enriched += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tx.commit()?;
|
||||
Ok((enriched, cleared))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn sync_discussions_sequential(
|
||||
conn: &Connection,
|
||||
@@ -1181,46 +1353,33 @@ fn store_closes_issues_refs(
|
||||
mr_local_id: i64,
|
||||
closes_issues: &[crate::gitlab::types::GitLabIssueRef],
|
||||
) -> Result<()> {
|
||||
conn.execute_batch("SAVEPOINT store_closes_refs")?;
|
||||
let inner = || -> Result<()> {
|
||||
for issue_ref in closes_issues {
|
||||
let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?;
|
||||
for issue_ref in closes_issues {
|
||||
let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?;
|
||||
|
||||
let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id {
|
||||
(Some(local_id), None, None)
|
||||
} else {
|
||||
let path = resolve_project_path(conn, issue_ref.project_id)?;
|
||||
let fallback =
|
||||
path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id));
|
||||
(None, Some(fallback), Some(issue_ref.iid))
|
||||
};
|
||||
let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id {
|
||||
(Some(local_id), None, None)
|
||||
} else {
|
||||
let path = resolve_project_path(conn, issue_ref.project_id)?;
|
||||
let fallback =
|
||||
path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id));
|
||||
(None, Some(fallback), Some(issue_ref.iid))
|
||||
};
|
||||
|
||||
let ref_ = EntityReference {
|
||||
project_id,
|
||||
source_entity_type: "merge_request",
|
||||
source_entity_id: mr_local_id,
|
||||
target_entity_type: "issue",
|
||||
target_entity_id: target_id,
|
||||
target_project_path: target_path.as_deref(),
|
||||
target_entity_iid: target_iid,
|
||||
reference_type: "closes",
|
||||
source_method: "api",
|
||||
};
|
||||
let ref_ = EntityReference {
|
||||
project_id,
|
||||
source_entity_type: "merge_request",
|
||||
source_entity_id: mr_local_id,
|
||||
target_entity_type: "issue",
|
||||
target_entity_id: target_id,
|
||||
target_project_path: target_path.as_deref(),
|
||||
target_entity_iid: target_iid,
|
||||
reference_type: "closes",
|
||||
source_method: "api",
|
||||
};
|
||||
|
||||
insert_entity_reference(conn, &ref_)?;
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
match inner() {
|
||||
Ok(()) => {
|
||||
conn.execute_batch("RELEASE store_closes_refs")?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs");
|
||||
Err(e)
|
||||
}
|
||||
insert_entity_reference(conn, &ref_)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ─── MR Diffs (file changes) ────────────────────────────────────────────────
|
||||
@@ -1469,6 +1628,15 @@ mod tests {
|
||||
assert_eq!(result.issues_skipped_discussion_sync, 0);
|
||||
assert_eq!(result.resource_events_fetched, 0);
|
||||
assert_eq!(result.resource_events_failed, 0);
|
||||
assert_eq!(result.statuses_enriched, 0);
|
||||
assert_eq!(result.statuses_cleared, 0);
|
||||
assert_eq!(result.statuses_seen, 0);
|
||||
assert_eq!(result.statuses_without_widget, 0);
|
||||
assert_eq!(result.partial_error_count, 0);
|
||||
assert!(result.first_partial_error.is_none());
|
||||
assert!(result.status_enrichment_error.is_none());
|
||||
assert!(result.status_enrichment_mode.is_empty());
|
||||
assert!(result.status_unsupported_reason.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1509,5 +1677,10 @@ mod tests {
|
||||
fetched: 8,
|
||||
failed: 2,
|
||||
};
|
||||
let _status_complete = ProgressEvent::StatusEnrichmentComplete {
|
||||
enriched: 5,
|
||||
cleared: 1,
|
||||
};
|
||||
let _status_skipped = ProgressEvent::StatusEnrichmentSkipped;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user