/// Totals accumulated across every project processed by one ingest run.
///
/// `run_ingest_inner` fills this in by summing the per-project results; the
/// `issues_*` fields are only touched for issue runs and the `mrs_*`,
/// `assignees_linked`, `reviewers_linked`, `diffnotes_count` and `mr_diffs_*`
/// fields only for merge-request runs.
#[derive(Default)]
pub struct IngestResult {
    /// The resource type that was requested for this run.
    pub resource_type: String,
    /// Number of projects whose ingest completed successfully.
    pub projects_synced: usize,
    pub issues_fetched: usize,
    pub issues_upserted: usize,
    pub issues_synced_discussions: usize,
    pub issues_skipped_discussion_sync: usize,
    pub mrs_fetched: usize,
    pub mrs_upserted: usize,
    pub mrs_synced_discussions: usize,
    pub mrs_skipped_discussion_sync: usize,
    pub assignees_linked: usize,
    pub reviewers_linked: usize,
    pub diffnotes_count: usize,
    pub labels_created: usize,
    pub discussions_fetched: usize,
    pub notes_upserted: usize,
    pub resource_events_fetched: usize,
    pub resource_events_failed: usize,
    pub mr_diffs_fetched: usize,
    pub mr_diffs_failed: usize,
    /// Number of projects whose status enrichment reported an error.
    pub status_enrichment_errors: usize,
    /// One entry per successfully ingested issue project.
    pub status_enrichment_projects: Vec<ProjectStatusEnrichment>,
    /// One entry per successfully ingested project (issues or merge requests).
    pub project_summaries: Vec<ProjectSummary>,
}

/// Per-project summary for display in stage completion sub-rows.
#[derive(Debug, Default)]
pub struct ProjectSummary {
    pub path: String,
    pub items_upserted: usize,
    pub discussions_synced: usize,
    pub events_fetched: usize,
    pub events_failed: usize,
    pub statuses_enriched: usize,
    pub statuses_seen: usize,
    pub status_errors: usize,
    pub mr_diffs_fetched: usize,
    pub mr_diffs_failed: usize,
}

/// Per-project status enrichment result, collected during ingestion.
pub struct ProjectStatusEnrichment { pub path: String, pub mode: String, pub reason: Option, pub seen: usize, pub enriched: usize, pub cleared: usize, pub without_widget: usize, pub partial_errors: usize, pub first_partial_error: Option, pub error: Option, } #[derive(Debug, Default, Clone, Serialize)] pub struct DryRunPreview { pub resource_type: String, pub projects: Vec, pub sync_mode: String, } #[derive(Debug, Default, Clone, Serialize)] pub struct DryRunProjectPreview { pub path: String, pub local_id: i64, pub gitlab_id: i64, pub has_cursor: bool, pub last_synced: Option, pub existing_count: i64, } enum ProjectIngestOutcome { Issues { path: String, result: IngestProjectResult, }, Mrs { path: String, result: IngestMrProjectResult, }, } #[derive(Debug, Clone, Copy)] pub struct IngestDisplay { pub show_progress: bool, pub show_spinner: bool, pub show_text: bool, } impl IngestDisplay { pub fn interactive() -> Self { Self { show_progress: true, show_spinner: true, show_text: true, } } pub fn silent() -> Self { Self { show_progress: false, show_spinner: false, show_text: false, } } pub fn progress_only() -> Self { Self { show_progress: true, show_spinner: false, show_text: false, } } } #[allow(clippy::too_many_arguments)] pub async fn run_ingest( config: &Config, resource_type: &str, project_filter: Option<&str>, force: bool, full: bool, dry_run: bool, display: IngestDisplay, stage_bar: Option, signal: &ShutdownSignal, ) -> Result { let run_id = uuid::Uuid::new_v4().simple().to_string(); let run_id = &run_id[..8]; let span = tracing::info_span!("ingest", %run_id, %resource_type); run_ingest_inner( config, resource_type, project_filter, force, full, dry_run, display, stage_bar, signal, ) .instrument(span) .await } pub fn run_ingest_dry_run( config: &Config, resource_type: &str, project_filter: Option<&str>, full: bool, ) -> Result { if resource_type != "issues" && resource_type != "mrs" { return Err(LoreError::Other(format!( "Invalid resource type '{}'. 
Valid types: issues, mrs", resource_type ))); } let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?; if projects.is_empty() { if let Some(filter) = project_filter { return Err(LoreError::Other(format!( "Project '{}' not found in configuration", filter ))); } return Err(LoreError::Other( "No projects configured. Run 'lore init' first.".to_string(), )); } let mut preview = DryRunPreview { resource_type: resource_type.to_string(), projects: Vec::new(), sync_mode: if full { "full".to_string() } else { "incremental".to_string() }, }; for (local_project_id, gitlab_project_id, path) in &projects { let cursor_exists: bool = conn .query_row( "SELECT EXISTS(SELECT 1 FROM sync_cursors WHERE project_id = ? AND resource_type = ?)", (*local_project_id, resource_type), |row| row.get(0), ) .unwrap_or(false); let last_synced: Option = conn .query_row( "SELECT updated_at FROM sync_cursors WHERE project_id = ? 
AND resource_type = ?", (*local_project_id, resource_type), |row| row.get(0), ) .ok(); let existing_count: i64 = if resource_type == "issues" { conn.query_row( "SELECT COUNT(*) FROM issues WHERE project_id = ?", [*local_project_id], |row| row.get(0), ) .unwrap_or(0) } else { conn.query_row( "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?", [*local_project_id], |row| row.get(0), ) .unwrap_or(0) }; preview.projects.push(DryRunProjectPreview { path: path.clone(), local_id: *local_project_id, gitlab_id: *gitlab_project_id, has_cursor: cursor_exists && !full, last_synced: if full { None } else { last_synced }, existing_count, }); } Ok(preview) } #[allow(clippy::too_many_arguments)] async fn run_ingest_inner( config: &Config, resource_type: &str, project_filter: Option<&str>, force: bool, full: bool, dry_run: bool, display: IngestDisplay, stage_bar: Option, signal: &ShutdownSignal, ) -> Result { // In dry_run mode, we don't actually ingest - use run_ingest_dry_run instead // This flag is passed through for consistency but the actual dry-run logic // is handled at the caller level let _ = dry_run; if resource_type != "issues" && resource_type != "mrs" { return Err(LoreError::Other(format!( "Invalid resource type '{}'. 
Valid types: issues, mrs", resource_type ))); } let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; let lock_conn = create_connection(&db_path)?; let mut lock = AppLock::new( lock_conn, LockOptions { name: "sync".to_string(), stale_lock_minutes: config.sync.stale_lock_minutes, heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds, }, ); lock.acquire(force)?; let token = config.gitlab.resolve_token()?; let client = Arc::new(GitLabClient::new( &config.gitlab.base_url, &token, Some(config.sync.requests_per_second), )); let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?; if full { if display.show_text { println!( "{}", Theme::warning().render("Full sync: resetting cursors to fetch all data...") ); } for (local_project_id, _, path) in &projects { if resource_type == "issues" { conn.execute( "UPDATE issues SET discussions_synced_for_updated_at = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?", [*local_project_id], )?; } else if resource_type == "mrs" { conn.execute( "UPDATE merge_requests SET discussions_synced_for_updated_at = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?", [*local_project_id], )?; } conn.execute( "DELETE FROM sync_cursors WHERE project_id = ? AND resource_type = ?", (*local_project_id, resource_type), )?; tracing::info!(project = %path, resource_type, "Reset sync cursor and discussion watermarks for full re-fetch"); } } if projects.is_empty() { if let Some(filter) = project_filter { return Err(LoreError::Other(format!( "Project '{}' not found in configuration", filter ))); } return Err(LoreError::Other( "No projects configured. 
Run 'lore init' first.".to_string(), )); } let mut total = IngestResult { resource_type: resource_type.to_string(), ..Default::default() }; let type_label = if resource_type == "issues" { "issues" } else { "merge requests" }; if display.show_text { println!( "{}", Theme::info().render(&format!("Ingesting {type_label}...")) ); println!(); } let concurrency = config.sync.primary_concurrency as usize; let resource_type_owned = resource_type.to_string(); let agg_fetched = Arc::new(AtomicUsize::new(0)); let agg_discussions = Arc::new(AtomicUsize::new(0)); let agg_disc_total = Arc::new(AtomicUsize::new(0)); let agg_events = Arc::new(AtomicUsize::new(0)); let agg_events_total = Arc::new(AtomicUsize::new(0)); let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden); use futures::stream::{self, StreamExt}; let project_results: Vec> = stream::iter(projects.iter()) .map(|(local_project_id, gitlab_project_id, path)| { let client = Arc::clone(&client); let db_path = db_path.clone(); let config = config.clone(); let resource_type = resource_type_owned.clone(); let path = path.clone(); let local_project_id = *local_project_id; let gitlab_project_id = *gitlab_project_id; let stage_bar = stage_bar.clone(); let agg_fetched = Arc::clone(&agg_fetched); let agg_discussions = Arc::clone(&agg_discussions); let agg_disc_total = Arc::clone(&agg_disc_total); let agg_events = Arc::clone(&agg_events); let agg_events_total = Arc::clone(&agg_events_total); let signal = signal.clone(); async move { let proj_conn = create_connection(&db_path)?; let multi = crate::cli::progress::multi(); let spinner = if !display.show_spinner { ProgressBar::hidden() } else { let s = multi.add(ProgressBar::new_spinner()); s.set_style( ProgressStyle::default_spinner() .template("{spinner:.cyan} {msg}") .unwrap(), ); s.set_message(format!("Fetching {type_label} from {path}...")); s.enable_steady_tick(std::time::Duration::from_millis(60)); s }; let disc_bar = if !display.show_progress { ProgressBar::hidden() } 
else { let b = multi.add(ProgressBar::new(0)); b.set_style( ProgressStyle::default_bar() .template( " {spinner:.dim} {prefix:.cyan} Syncing discussions [{bar:30.cyan/dark_gray}] {pos}/{len} {per_sec:.dim} {eta:.dim}", ) .unwrap() .progress_chars(crate::cli::render::Icons::progress_chars()), ); b.set_prefix(path.clone()); b.enable_steady_tick(std::time::Duration::from_millis(60)); b }; let spinner_clone = spinner.clone(); let disc_bar_clone = disc_bar.clone(); let stage_bar_clone = stage_bar.clone(); let agg_fetched_clone = Arc::clone(&agg_fetched); let agg_discussions_clone = Arc::clone(&agg_discussions); let agg_disc_total_clone = Arc::clone(&agg_disc_total); let agg_events_clone = Arc::clone(&agg_events); let agg_events_total_clone = Arc::clone(&agg_events_total); let path_for_cb = path.clone(); let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress { Box::new(|_| {}) } else { Box::new(move |event: ProgressEvent| match event { ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => { } ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => { let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total; spinner_clone.set_message(format!( "{path_for_cb}: {total} {type_label} fetched" )); stage_bar_clone.set_message(format!( "Fetching {type_label}... ({agg} fetched across projects)" )); } ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => { spinner_clone.set_message(format!( "{path_for_cb}: {count} fetched so far..." )); } ProgressEvent::DiscussionSyncStarted { total } => { spinner_clone.finish_and_clear(); let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total; disc_bar_clone.set_length(total as u64); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60)); stage_bar_clone.set_message(format!( "Syncing discussions... 
(0/{agg_total})" )); } ProgressEvent::DiscussionSynced { current, total: _ } => { disc_bar_clone.set_position(current as u64); let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1; let agg_total = agg_disc_total_clone.load(Ordering::Relaxed); stage_bar_clone.set_message(format!( "Syncing discussions... ({agg}/{agg_total})" )); } ProgressEvent::DiscussionSyncComplete => { disc_bar_clone.finish_and_clear(); } ProgressEvent::MrDiscussionSyncStarted { total } => { spinner_clone.finish_and_clear(); let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total; disc_bar_clone.set_length(total as u64); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60)); stage_bar_clone.set_message(format!( "Syncing discussions... (0/{agg_total})" )); } ProgressEvent::MrDiscussionSynced { current, total: _ } => { disc_bar_clone.set_position(current as u64); let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1; let agg_total = agg_disc_total_clone.load(Ordering::Relaxed); stage_bar_clone.set_message(format!( "Syncing discussions... 
({agg}/{agg_total})" )); } ProgressEvent::MrDiscussionSyncComplete => { disc_bar_clone.finish_and_clear(); } ProgressEvent::ResourceEventsFetchStarted { total } => { disc_bar_clone.reset(); disc_bar_clone.set_length(total as u64); disc_bar_clone.set_style( ProgressStyle::default_bar() .template(" {spinner:.dim} {prefix:.cyan} Fetching resource events [{bar:30.cyan/dark_gray}] {pos}/{len} {per_sec:.dim} {eta:.dim}") .unwrap() .progress_chars(crate::cli::render::Icons::progress_chars()), ); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60)); agg_events_total_clone.fetch_add(total, Ordering::Relaxed); stage_bar_clone.set_message( "Fetching resource events...".to_string() ); } ProgressEvent::ResourceEventFetched { current, total: _ } => { disc_bar_clone.set_position(current as u64); let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1; let agg_total = agg_events_total_clone.load(Ordering::Relaxed); stage_bar_clone.set_message(format!( "Fetching resource events... ({agg}/{agg_total})" )); } ProgressEvent::ResourceEventsFetchComplete { .. } => { disc_bar_clone.finish_and_clear(); } ProgressEvent::ClosesIssuesFetchStarted { total } => { disc_bar_clone.reset(); disc_bar_clone.set_length(total as u64); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60)); stage_bar_clone.set_message( "Fetching closes-issues references...".to_string() ); } ProgressEvent::ClosesIssueFetched { current, total: _ } => { disc_bar_clone.set_position(current as u64); } ProgressEvent::ClosesIssuesFetchComplete { .. 
} => { disc_bar_clone.finish_and_clear(); } ProgressEvent::MrDiffsFetchStarted { total } => { disc_bar_clone.reset(); disc_bar_clone.set_length(total as u64); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60)); stage_bar_clone.set_message( "Fetching MR file changes...".to_string() ); } ProgressEvent::MrDiffFetched { current, total: _ } => { disc_bar_clone.set_position(current as u64); } ProgressEvent::MrDiffsFetchComplete { .. } => { disc_bar_clone.finish_and_clear(); } ProgressEvent::StatusEnrichmentStarted { total } => { spinner_clone.finish_and_clear(); disc_bar_clone.reset(); disc_bar_clone.set_length(total as u64); disc_bar_clone.set_style( ProgressStyle::default_bar() .template(" {spinner:.dim} {prefix:.cyan} Statuses [{bar:30.cyan/dark_gray}] {pos}/{len} {per_sec:.dim} {eta:.dim}") .unwrap() .progress_chars(crate::cli::render::Icons::progress_chars()), ); disc_bar_clone.set_prefix(path_for_cb.clone()); disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60)); stage_bar_clone.set_message( "Enriching work item statuses...".to_string() ); } ProgressEvent::StatusEnrichmentPageFetched { items_so_far } => { disc_bar_clone.set_position(items_so_far as u64); stage_bar_clone.set_message(format!( "Enriching work item statuses... ({items_so_far} fetched)" )); } ProgressEvent::StatusEnrichmentWriting { total } => { disc_bar_clone.set_message(format!("Writing {total} statuses...")); stage_bar_clone.set_message(format!( "Writing {total} work item statuses..." 
)); } ProgressEvent::StatusEnrichmentComplete { enriched, cleared } => { disc_bar_clone.finish_and_clear(); if enriched > 0 || cleared > 0 { stage_bar_clone.set_message(format!( "Status enrichment: {enriched} enriched, {cleared} cleared" )); } } ProgressEvent::StatusEnrichmentSkipped => {} }) }; let outcome = if resource_type == "issues" { let result = ingest_project_issues_with_progress( &proj_conn, &client, &config, local_project_id, gitlab_project_id, Some(progress_callback), &signal, ) .await?; spinner.finish_and_clear(); disc_bar.finish_and_clear(); ProjectIngestOutcome::Issues { path, result } } else { let result = ingest_project_merge_requests_with_progress( &proj_conn, &client, &config, local_project_id, gitlab_project_id, full, Some(progress_callback), &signal, ) .await?; spinner.finish_and_clear(); disc_bar.finish_and_clear(); ProjectIngestOutcome::Mrs { path, result } }; Ok(outcome) } }) .buffer_unordered(concurrency) .collect() .await; let mut first_error: Option = None; for project_result in project_results { match project_result { Err(e) => { if first_error.is_none() { first_error = Some(e); } } Ok(ProjectIngestOutcome::Issues { ref path, ref result, }) => { if display.show_text { print_issue_project_summary(path, result); } total.projects_synced += 1; total.issues_fetched += result.issues_fetched; total.issues_upserted += result.issues_upserted; total.labels_created += result.labels_created; total.discussions_fetched += result.discussions_fetched; total.notes_upserted += result.notes_upserted; total.issues_synced_discussions += result.issues_synced_discussions; total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync; total.resource_events_fetched += result.resource_events_fetched; total.resource_events_failed += result.resource_events_failed; if result.status_enrichment_error.is_some() { total.status_enrichment_errors += 1; } total .status_enrichment_projects .push(ProjectStatusEnrichment { path: path.clone(), mode: 
result.status_enrichment_mode.clone(), reason: result.status_unsupported_reason.clone(), seen: result.statuses_seen, enriched: result.statuses_enriched, cleared: result.statuses_cleared, without_widget: result.statuses_without_widget, partial_errors: result.partial_error_count, first_partial_error: result.first_partial_error.clone(), error: result.status_enrichment_error.clone(), }); total.project_summaries.push(ProjectSummary { path: path.clone(), items_upserted: result.issues_upserted, discussions_synced: result.discussions_fetched, events_fetched: result.resource_events_fetched, events_failed: result.resource_events_failed, statuses_enriched: result.statuses_enriched, statuses_seen: result.statuses_seen, status_errors: result.partial_error_count + usize::from(result.status_enrichment_error.is_some()), mr_diffs_fetched: 0, mr_diffs_failed: 0, }); } Ok(ProjectIngestOutcome::Mrs { ref path, ref result, }) => { if display.show_text { print_mr_project_summary(path, result); } total.projects_synced += 1; total.mrs_fetched += result.mrs_fetched; total.mrs_upserted += result.mrs_upserted; total.labels_created += result.labels_created; total.assignees_linked += result.assignees_linked; total.reviewers_linked += result.reviewers_linked; total.discussions_fetched += result.discussions_fetched; total.notes_upserted += result.notes_upserted; total.diffnotes_count += result.diffnotes_count; total.mrs_synced_discussions += result.mrs_synced_discussions; total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync; total.resource_events_fetched += result.resource_events_fetched; total.resource_events_failed += result.resource_events_failed; total.mr_diffs_fetched += result.mr_diffs_fetched; total.mr_diffs_failed += result.mr_diffs_failed; total.project_summaries.push(ProjectSummary { path: path.clone(), items_upserted: result.mrs_upserted, discussions_synced: result.discussions_fetched, events_fetched: result.resource_events_fetched, events_failed: 
result.resource_events_failed, statuses_enriched: 0, statuses_seen: 0, status_errors: 0, mr_diffs_fetched: result.mr_diffs_fetched, mr_diffs_failed: result.mr_diffs_failed, }); } } } if let Some(e) = first_error { return Err(e); } Ok(total) } fn get_projects_to_sync( conn: &Connection, configured_projects: &[crate::core::config::ProjectConfig], filter: Option<&str>, ) -> Result> { if let Some(filter_str) = filter { let project_id = resolve_project(conn, filter_str)?; let row: Option<(i64, String)> = conn .query_row( "SELECT gitlab_project_id, path_with_namespace FROM projects WHERE id = ?1", [project_id], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); if let Some((gitlab_id, path)) = row { if configured_projects.iter().any(|p| p.path == path) { return Ok(vec![(project_id, gitlab_id, path)]); } return Err(LoreError::Other(format!( "Project '{}' exists in database but is not in configuration", path ))); } return Err(LoreError::Other(format!( "Project '{}' not found in database", filter_str ))); } let mut projects = Vec::new(); for project_config in configured_projects { let result: Option<(i64, i64)> = conn .query_row( "SELECT id, gitlab_project_id FROM projects WHERE path_with_namespace = ?", [&project_config.path], |row| Ok((row.get(0)?, row.get(1)?)), ) .ok(); if let Some((local_id, gitlab_id)) = result { projects.push((local_id, gitlab_id, project_config.path.clone())); } } Ok(projects) }