refactor(structure): reorganize codebase into domain-focused modules
This commit is contained in:
761
src/cli/commands/ingest/run.rs
Normal file
761
src/cli/commands/ingest/run.rs
Normal file
@@ -0,0 +1,761 @@
|
||||
#[derive(Default)]
|
||||
pub struct IngestResult {
|
||||
pub resource_type: String,
|
||||
pub projects_synced: usize,
|
||||
pub issues_fetched: usize,
|
||||
pub issues_upserted: usize,
|
||||
pub issues_synced_discussions: usize,
|
||||
pub issues_skipped_discussion_sync: usize,
|
||||
pub mrs_fetched: usize,
|
||||
pub mrs_upserted: usize,
|
||||
pub mrs_synced_discussions: usize,
|
||||
pub mrs_skipped_discussion_sync: usize,
|
||||
pub assignees_linked: usize,
|
||||
pub reviewers_linked: usize,
|
||||
pub diffnotes_count: usize,
|
||||
pub labels_created: usize,
|
||||
pub discussions_fetched: usize,
|
||||
pub notes_upserted: usize,
|
||||
pub resource_events_fetched: usize,
|
||||
pub resource_events_failed: usize,
|
||||
pub mr_diffs_fetched: usize,
|
||||
pub mr_diffs_failed: usize,
|
||||
pub status_enrichment_errors: usize,
|
||||
pub status_enrichment_projects: Vec<ProjectStatusEnrichment>,
|
||||
pub project_summaries: Vec<ProjectSummary>,
|
||||
}
|
||||
|
||||
/// Per-project summary for display in stage completion sub-rows.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ProjectSummary {
|
||||
pub path: String,
|
||||
pub items_upserted: usize,
|
||||
pub discussions_synced: usize,
|
||||
pub events_fetched: usize,
|
||||
pub events_failed: usize,
|
||||
pub statuses_enriched: usize,
|
||||
pub statuses_seen: usize,
|
||||
pub status_errors: usize,
|
||||
pub mr_diffs_fetched: usize,
|
||||
pub mr_diffs_failed: usize,
|
||||
}
|
||||
|
||||
/// Per-project status enrichment result, collected during ingestion.
|
||||
pub struct ProjectStatusEnrichment {
|
||||
pub path: String,
|
||||
pub mode: String,
|
||||
pub reason: Option<String>,
|
||||
pub seen: usize,
|
||||
pub enriched: usize,
|
||||
pub cleared: usize,
|
||||
pub without_widget: usize,
|
||||
pub partial_errors: usize,
|
||||
pub first_partial_error: Option<String>,
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize)]
|
||||
pub struct DryRunPreview {
|
||||
pub resource_type: String,
|
||||
pub projects: Vec<DryRunProjectPreview>,
|
||||
pub sync_mode: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize)]
|
||||
pub struct DryRunProjectPreview {
|
||||
pub path: String,
|
||||
pub local_id: i64,
|
||||
pub gitlab_id: i64,
|
||||
pub has_cursor: bool,
|
||||
pub last_synced: Option<String>,
|
||||
pub existing_count: i64,
|
||||
}
|
||||
|
||||
enum ProjectIngestOutcome {
|
||||
Issues {
|
||||
path: String,
|
||||
result: IngestProjectResult,
|
||||
},
|
||||
Mrs {
|
||||
path: String,
|
||||
result: IngestMrProjectResult,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct IngestDisplay {
|
||||
pub show_progress: bool,
|
||||
pub show_spinner: bool,
|
||||
pub show_text: bool,
|
||||
}
|
||||
|
||||
impl IngestDisplay {
|
||||
pub fn interactive() -> Self {
|
||||
Self {
|
||||
show_progress: true,
|
||||
show_spinner: true,
|
||||
show_text: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn silent() -> Self {
|
||||
Self {
|
||||
show_progress: false,
|
||||
show_spinner: false,
|
||||
show_text: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn progress_only() -> Self {
|
||||
Self {
|
||||
show_progress: true,
|
||||
show_spinner: false,
|
||||
show_text: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn run_ingest(
|
||||
config: &Config,
|
||||
resource_type: &str,
|
||||
project_filter: Option<&str>,
|
||||
force: bool,
|
||||
full: bool,
|
||||
dry_run: bool,
|
||||
display: IngestDisplay,
|
||||
stage_bar: Option<ProgressBar>,
|
||||
signal: &ShutdownSignal,
|
||||
) -> Result<IngestResult> {
|
||||
let run_id = uuid::Uuid::new_v4().simple().to_string();
|
||||
let run_id = &run_id[..8];
|
||||
let span = tracing::info_span!("ingest", %run_id, %resource_type);
|
||||
|
||||
run_ingest_inner(
|
||||
config,
|
||||
resource_type,
|
||||
project_filter,
|
||||
force,
|
||||
full,
|
||||
dry_run,
|
||||
display,
|
||||
stage_bar,
|
||||
signal,
|
||||
)
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn run_ingest_dry_run(
|
||||
config: &Config,
|
||||
resource_type: &str,
|
||||
project_filter: Option<&str>,
|
||||
full: bool,
|
||||
) -> Result<DryRunPreview> {
|
||||
if resource_type != "issues" && resource_type != "mrs" {
|
||||
return Err(LoreError::Other(format!(
|
||||
"Invalid resource type '{}'. Valid types: issues, mrs",
|
||||
resource_type
|
||||
)));
|
||||
}
|
||||
|
||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||
let conn = create_connection(&db_path)?;
|
||||
|
||||
let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?;
|
||||
|
||||
if projects.is_empty() {
|
||||
if let Some(filter) = project_filter {
|
||||
return Err(LoreError::Other(format!(
|
||||
"Project '{}' not found in configuration",
|
||||
filter
|
||||
)));
|
||||
}
|
||||
return Err(LoreError::Other(
|
||||
"No projects configured. Run 'lore init' first.".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut preview = DryRunPreview {
|
||||
resource_type: resource_type.to_string(),
|
||||
projects: Vec::new(),
|
||||
sync_mode: if full {
|
||||
"full".to_string()
|
||||
} else {
|
||||
"incremental".to_string()
|
||||
},
|
||||
};
|
||||
|
||||
for (local_project_id, gitlab_project_id, path) in &projects {
|
||||
let cursor_exists: bool = conn
|
||||
.query_row(
|
||||
"SELECT EXISTS(SELECT 1 FROM sync_cursors WHERE project_id = ? AND resource_type = ?)",
|
||||
(*local_project_id, resource_type),
|
||||
|row| row.get(0),
|
||||
)
|
||||
.unwrap_or(false);
|
||||
|
||||
let last_synced: Option<String> = conn
|
||||
.query_row(
|
||||
"SELECT updated_at FROM sync_cursors WHERE project_id = ? AND resource_type = ?",
|
||||
(*local_project_id, resource_type),
|
||||
|row| row.get(0),
|
||||
)
|
||||
.ok();
|
||||
|
||||
let existing_count: i64 = if resource_type == "issues" {
|
||||
conn.query_row(
|
||||
"SELECT COUNT(*) FROM issues WHERE project_id = ?",
|
||||
[*local_project_id],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.unwrap_or(0)
|
||||
} else {
|
||||
conn.query_row(
|
||||
"SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
|
||||
[*local_project_id],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.unwrap_or(0)
|
||||
};
|
||||
|
||||
preview.projects.push(DryRunProjectPreview {
|
||||
path: path.clone(),
|
||||
local_id: *local_project_id,
|
||||
gitlab_id: *gitlab_project_id,
|
||||
has_cursor: cursor_exists && !full,
|
||||
last_synced: if full { None } else { last_synced },
|
||||
existing_count,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(preview)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn run_ingest_inner(
|
||||
config: &Config,
|
||||
resource_type: &str,
|
||||
project_filter: Option<&str>,
|
||||
force: bool,
|
||||
full: bool,
|
||||
dry_run: bool,
|
||||
display: IngestDisplay,
|
||||
stage_bar: Option<ProgressBar>,
|
||||
signal: &ShutdownSignal,
|
||||
) -> Result<IngestResult> {
|
||||
// In dry_run mode, we don't actually ingest - use run_ingest_dry_run instead
|
||||
// This flag is passed through for consistency but the actual dry-run logic
|
||||
// is handled at the caller level
|
||||
let _ = dry_run;
|
||||
if resource_type != "issues" && resource_type != "mrs" {
|
||||
return Err(LoreError::Other(format!(
|
||||
"Invalid resource type '{}'. Valid types: issues, mrs",
|
||||
resource_type
|
||||
)));
|
||||
}
|
||||
|
||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||
let conn = create_connection(&db_path)?;
|
||||
|
||||
let lock_conn = create_connection(&db_path)?;
|
||||
let mut lock = AppLock::new(
|
||||
lock_conn,
|
||||
LockOptions {
|
||||
name: "sync".to_string(),
|
||||
stale_lock_minutes: config.sync.stale_lock_minutes,
|
||||
heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds,
|
||||
},
|
||||
);
|
||||
lock.acquire(force)?;
|
||||
|
||||
let token = config.gitlab.resolve_token()?;
|
||||
|
||||
let client = GitLabClient::new(
|
||||
&config.gitlab.base_url,
|
||||
&token,
|
||||
Some(config.sync.requests_per_second),
|
||||
);
|
||||
|
||||
let projects = get_projects_to_sync(&conn, &config.projects, project_filter)?;
|
||||
|
||||
if full {
|
||||
if display.show_text {
|
||||
println!(
|
||||
"{}",
|
||||
Theme::warning().render("Full sync: resetting cursors to fetch all data...")
|
||||
);
|
||||
}
|
||||
for (local_project_id, _, path) in &projects {
|
||||
if resource_type == "issues" {
|
||||
conn.execute(
|
||||
"UPDATE issues SET discussions_synced_for_updated_at = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?",
|
||||
[*local_project_id],
|
||||
)?;
|
||||
} else if resource_type == "mrs" {
|
||||
conn.execute(
|
||||
"UPDATE merge_requests SET discussions_synced_for_updated_at = NULL, resource_events_synced_for_updated_at = NULL WHERE project_id = ?",
|
||||
[*local_project_id],
|
||||
)?;
|
||||
}
|
||||
|
||||
conn.execute(
|
||||
"DELETE FROM sync_cursors WHERE project_id = ? AND resource_type = ?",
|
||||
(*local_project_id, resource_type),
|
||||
)?;
|
||||
|
||||
tracing::info!(project = %path, resource_type, "Reset sync cursor and discussion watermarks for full re-fetch");
|
||||
}
|
||||
}
|
||||
|
||||
if projects.is_empty() {
|
||||
if let Some(filter) = project_filter {
|
||||
return Err(LoreError::Other(format!(
|
||||
"Project '{}' not found in configuration",
|
||||
filter
|
||||
)));
|
||||
}
|
||||
return Err(LoreError::Other(
|
||||
"No projects configured. Run 'lore init' first.".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut total = IngestResult {
|
||||
resource_type: resource_type.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let type_label = if resource_type == "issues" {
|
||||
"issues"
|
||||
} else {
|
||||
"merge requests"
|
||||
};
|
||||
if display.show_text {
|
||||
println!(
|
||||
"{}",
|
||||
Theme::info().render(&format!("Ingesting {type_label}..."))
|
||||
);
|
||||
println!();
|
||||
}
|
||||
|
||||
let concurrency = config.sync.primary_concurrency as usize;
|
||||
let resource_type_owned = resource_type.to_string();
|
||||
|
||||
let agg_fetched = Arc::new(AtomicUsize::new(0));
|
||||
let agg_discussions = Arc::new(AtomicUsize::new(0));
|
||||
let agg_disc_total = Arc::new(AtomicUsize::new(0));
|
||||
let agg_events = Arc::new(AtomicUsize::new(0));
|
||||
let agg_events_total = Arc::new(AtomicUsize::new(0));
|
||||
let stage_bar = stage_bar.unwrap_or_else(ProgressBar::hidden);
|
||||
|
||||
use futures::stream::{self, StreamExt};
|
||||
|
||||
let project_results: Vec<Result<ProjectIngestOutcome>> = stream::iter(projects.iter())
|
||||
.map(|(local_project_id, gitlab_project_id, path)| {
|
||||
let client = client.clone();
|
||||
let db_path = db_path.clone();
|
||||
let config = config.clone();
|
||||
let resource_type = resource_type_owned.clone();
|
||||
let path = path.clone();
|
||||
let local_project_id = *local_project_id;
|
||||
let gitlab_project_id = *gitlab_project_id;
|
||||
let stage_bar = stage_bar.clone();
|
||||
let agg_fetched = Arc::clone(&agg_fetched);
|
||||
let agg_discussions = Arc::clone(&agg_discussions);
|
||||
let agg_disc_total = Arc::clone(&agg_disc_total);
|
||||
let agg_events = Arc::clone(&agg_events);
|
||||
let agg_events_total = Arc::clone(&agg_events_total);
|
||||
let signal = signal.clone();
|
||||
|
||||
async move {
|
||||
let proj_conn = create_connection(&db_path)?;
|
||||
|
||||
let multi = crate::cli::progress::multi();
|
||||
|
||||
let spinner = if !display.show_spinner {
|
||||
ProgressBar::hidden()
|
||||
} else {
|
||||
let s = multi.add(ProgressBar::new_spinner());
|
||||
s.set_style(
|
||||
ProgressStyle::default_spinner()
|
||||
.template("{spinner:.cyan} {msg}")
|
||||
.unwrap(),
|
||||
);
|
||||
s.set_message(format!("Fetching {type_label} from {path}..."));
|
||||
s.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
s
|
||||
};
|
||||
|
||||
let disc_bar = if !display.show_progress {
|
||||
ProgressBar::hidden()
|
||||
} else {
|
||||
let b = multi.add(ProgressBar::new(0));
|
||||
b.set_style(
|
||||
ProgressStyle::default_bar()
|
||||
.template(
|
||||
" {spinner:.dim} {prefix:.cyan} Syncing discussions [{bar:30.cyan/dark_gray}] {pos}/{len} {per_sec:.dim} {eta:.dim}",
|
||||
)
|
||||
.unwrap()
|
||||
.progress_chars(crate::cli::render::Icons::progress_chars()),
|
||||
);
|
||||
b.set_prefix(path.clone());
|
||||
b.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
b
|
||||
};
|
||||
|
||||
let spinner_clone = spinner.clone();
|
||||
let disc_bar_clone = disc_bar.clone();
|
||||
let stage_bar_clone = stage_bar.clone();
|
||||
let agg_fetched_clone = Arc::clone(&agg_fetched);
|
||||
let agg_discussions_clone = Arc::clone(&agg_discussions);
|
||||
let agg_disc_total_clone = Arc::clone(&agg_disc_total);
|
||||
let agg_events_clone = Arc::clone(&agg_events);
|
||||
let agg_events_total_clone = Arc::clone(&agg_events_total);
|
||||
let path_for_cb = path.clone();
|
||||
let progress_callback: crate::ingestion::ProgressCallback = if !display.show_progress {
|
||||
Box::new(|_| {})
|
||||
} else {
|
||||
Box::new(move |event: ProgressEvent| match event {
|
||||
ProgressEvent::IssuesFetchStarted | ProgressEvent::MrsFetchStarted => {
|
||||
}
|
||||
ProgressEvent::IssuesFetchComplete { total } | ProgressEvent::MrsFetchComplete { total } => {
|
||||
let agg = agg_fetched_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||
spinner_clone.set_message(format!(
|
||||
"{path_for_cb}: {total} {type_label} fetched"
|
||||
));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Fetching {type_label}... ({agg} fetched across projects)"
|
||||
));
|
||||
}
|
||||
ProgressEvent::IssueFetched { count } | ProgressEvent::MrFetched { count } => {
|
||||
spinner_clone.set_message(format!(
|
||||
"{path_for_cb}: {count} fetched so far..."
|
||||
));
|
||||
}
|
||||
ProgressEvent::DiscussionSyncStarted { total } => {
|
||||
spinner_clone.finish_and_clear();
|
||||
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... (0/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::DiscussionSynced { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... ({agg}/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::DiscussionSyncComplete => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::MrDiscussionSyncStarted { total } => {
|
||||
spinner_clone.finish_and_clear();
|
||||
let agg_total = agg_disc_total_clone.fetch_add(total, Ordering::Relaxed) + total;
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... (0/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::MrDiscussionSynced { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
let agg = agg_discussions_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let agg_total = agg_disc_total_clone.load(Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Syncing discussions... ({agg}/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::MrDiscussionSyncComplete => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::ResourceEventsFetchStarted { total } => {
|
||||
disc_bar_clone.reset();
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.set_style(
|
||||
ProgressStyle::default_bar()
|
||||
.template(" {spinner:.dim} {prefix:.cyan} Fetching resource events [{bar:30.cyan/dark_gray}] {pos}/{len} {per_sec:.dim} {eta:.dim}")
|
||||
.unwrap()
|
||||
.progress_chars(crate::cli::render::Icons::progress_chars()),
|
||||
);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
agg_events_total_clone.fetch_add(total, Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(
|
||||
"Fetching resource events...".to_string()
|
||||
);
|
||||
}
|
||||
ProgressEvent::ResourceEventFetched { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
let agg = agg_events_clone.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let agg_total = agg_events_total_clone.load(Ordering::Relaxed);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Fetching resource events... ({agg}/{agg_total})"
|
||||
));
|
||||
}
|
||||
ProgressEvent::ResourceEventsFetchComplete { .. } => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::ClosesIssuesFetchStarted { total } => {
|
||||
disc_bar_clone.reset();
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
stage_bar_clone.set_message(
|
||||
"Fetching closes-issues references...".to_string()
|
||||
);
|
||||
}
|
||||
ProgressEvent::ClosesIssueFetched { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
}
|
||||
ProgressEvent::ClosesIssuesFetchComplete { .. } => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::MrDiffsFetchStarted { total } => {
|
||||
disc_bar_clone.reset();
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
stage_bar_clone.set_message(
|
||||
"Fetching MR file changes...".to_string()
|
||||
);
|
||||
}
|
||||
ProgressEvent::MrDiffFetched { current, total: _ } => {
|
||||
disc_bar_clone.set_position(current as u64);
|
||||
}
|
||||
ProgressEvent::MrDiffsFetchComplete { .. } => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentStarted { total } => {
|
||||
spinner_clone.finish_and_clear();
|
||||
disc_bar_clone.reset();
|
||||
disc_bar_clone.set_length(total as u64);
|
||||
disc_bar_clone.set_style(
|
||||
ProgressStyle::default_bar()
|
||||
.template(" {spinner:.dim} {prefix:.cyan} Statuses [{bar:30.cyan/dark_gray}] {pos}/{len} {per_sec:.dim} {eta:.dim}")
|
||||
.unwrap()
|
||||
.progress_chars(crate::cli::render::Icons::progress_chars()),
|
||||
);
|
||||
disc_bar_clone.set_prefix(path_for_cb.clone());
|
||||
disc_bar_clone.enable_steady_tick(std::time::Duration::from_millis(60));
|
||||
stage_bar_clone.set_message(
|
||||
"Enriching work item statuses...".to_string()
|
||||
);
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentPageFetched { items_so_far } => {
|
||||
disc_bar_clone.set_position(items_so_far as u64);
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Enriching work item statuses... ({items_so_far} fetched)"
|
||||
));
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentWriting { total } => {
|
||||
disc_bar_clone.set_message(format!("Writing {total} statuses..."));
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Writing {total} work item statuses..."
|
||||
));
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentComplete { enriched, cleared } => {
|
||||
disc_bar_clone.finish_and_clear();
|
||||
if enriched > 0 || cleared > 0 {
|
||||
stage_bar_clone.set_message(format!(
|
||||
"Status enrichment: {enriched} enriched, {cleared} cleared"
|
||||
));
|
||||
}
|
||||
}
|
||||
ProgressEvent::StatusEnrichmentSkipped => {}
|
||||
})
|
||||
};
|
||||
|
||||
let outcome = if resource_type == "issues" {
|
||||
let result = ingest_project_issues_with_progress(
|
||||
&proj_conn,
|
||||
&client,
|
||||
&config,
|
||||
local_project_id,
|
||||
gitlab_project_id,
|
||||
Some(progress_callback),
|
||||
&signal,
|
||||
)
|
||||
.await?;
|
||||
|
||||
spinner.finish_and_clear();
|
||||
disc_bar.finish_and_clear();
|
||||
|
||||
ProjectIngestOutcome::Issues { path, result }
|
||||
} else {
|
||||
let result = ingest_project_merge_requests_with_progress(
|
||||
&proj_conn,
|
||||
&client,
|
||||
&config,
|
||||
local_project_id,
|
||||
gitlab_project_id,
|
||||
full,
|
||||
Some(progress_callback),
|
||||
&signal,
|
||||
)
|
||||
.await?;
|
||||
|
||||
spinner.finish_and_clear();
|
||||
disc_bar.finish_and_clear();
|
||||
|
||||
ProjectIngestOutcome::Mrs { path, result }
|
||||
};
|
||||
|
||||
Ok(outcome)
|
||||
}
|
||||
})
|
||||
.buffer_unordered(concurrency)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let mut first_error: Option<LoreError> = None;
|
||||
for project_result in project_results {
|
||||
match project_result {
|
||||
Err(e) => {
|
||||
if first_error.is_none() {
|
||||
first_error = Some(e);
|
||||
}
|
||||
}
|
||||
Ok(ProjectIngestOutcome::Issues {
|
||||
ref path,
|
||||
ref result,
|
||||
}) => {
|
||||
if display.show_text {
|
||||
print_issue_project_summary(path, result);
|
||||
}
|
||||
total.projects_synced += 1;
|
||||
total.issues_fetched += result.issues_fetched;
|
||||
total.issues_upserted += result.issues_upserted;
|
||||
total.labels_created += result.labels_created;
|
||||
total.discussions_fetched += result.discussions_fetched;
|
||||
total.notes_upserted += result.notes_upserted;
|
||||
total.issues_synced_discussions += result.issues_synced_discussions;
|
||||
total.issues_skipped_discussion_sync += result.issues_skipped_discussion_sync;
|
||||
total.resource_events_fetched += result.resource_events_fetched;
|
||||
total.resource_events_failed += result.resource_events_failed;
|
||||
if result.status_enrichment_error.is_some() {
|
||||
total.status_enrichment_errors += 1;
|
||||
}
|
||||
total
|
||||
.status_enrichment_projects
|
||||
.push(ProjectStatusEnrichment {
|
||||
path: path.clone(),
|
||||
mode: result.status_enrichment_mode.clone(),
|
||||
reason: result.status_unsupported_reason.clone(),
|
||||
seen: result.statuses_seen,
|
||||
enriched: result.statuses_enriched,
|
||||
cleared: result.statuses_cleared,
|
||||
without_widget: result.statuses_without_widget,
|
||||
partial_errors: result.partial_error_count,
|
||||
first_partial_error: result.first_partial_error.clone(),
|
||||
error: result.status_enrichment_error.clone(),
|
||||
});
|
||||
total.project_summaries.push(ProjectSummary {
|
||||
path: path.clone(),
|
||||
items_upserted: result.issues_upserted,
|
||||
discussions_synced: result.discussions_fetched,
|
||||
events_fetched: result.resource_events_fetched,
|
||||
events_failed: result.resource_events_failed,
|
||||
statuses_enriched: result.statuses_enriched,
|
||||
statuses_seen: result.statuses_seen,
|
||||
status_errors: result.partial_error_count
|
||||
+ usize::from(result.status_enrichment_error.is_some()),
|
||||
mr_diffs_fetched: 0,
|
||||
mr_diffs_failed: 0,
|
||||
});
|
||||
}
|
||||
Ok(ProjectIngestOutcome::Mrs {
|
||||
ref path,
|
||||
ref result,
|
||||
}) => {
|
||||
if display.show_text {
|
||||
print_mr_project_summary(path, result);
|
||||
}
|
||||
total.projects_synced += 1;
|
||||
total.mrs_fetched += result.mrs_fetched;
|
||||
total.mrs_upserted += result.mrs_upserted;
|
||||
total.labels_created += result.labels_created;
|
||||
total.assignees_linked += result.assignees_linked;
|
||||
total.reviewers_linked += result.reviewers_linked;
|
||||
total.discussions_fetched += result.discussions_fetched;
|
||||
total.notes_upserted += result.notes_upserted;
|
||||
total.diffnotes_count += result.diffnotes_count;
|
||||
total.mrs_synced_discussions += result.mrs_synced_discussions;
|
||||
total.mrs_skipped_discussion_sync += result.mrs_skipped_discussion_sync;
|
||||
total.resource_events_fetched += result.resource_events_fetched;
|
||||
total.resource_events_failed += result.resource_events_failed;
|
||||
total.mr_diffs_fetched += result.mr_diffs_fetched;
|
||||
total.mr_diffs_failed += result.mr_diffs_failed;
|
||||
total.project_summaries.push(ProjectSummary {
|
||||
path: path.clone(),
|
||||
items_upserted: result.mrs_upserted,
|
||||
discussions_synced: result.discussions_fetched,
|
||||
events_fetched: result.resource_events_fetched,
|
||||
events_failed: result.resource_events_failed,
|
||||
statuses_enriched: 0,
|
||||
statuses_seen: 0,
|
||||
status_errors: 0,
|
||||
mr_diffs_fetched: result.mr_diffs_fetched,
|
||||
mr_diffs_failed: result.mr_diffs_failed,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(e) = first_error {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
fn get_projects_to_sync(
|
||||
conn: &Connection,
|
||||
configured_projects: &[crate::core::config::ProjectConfig],
|
||||
filter: Option<&str>,
|
||||
) -> Result<Vec<(i64, i64, String)>> {
|
||||
if let Some(filter_str) = filter {
|
||||
let project_id = resolve_project(conn, filter_str)?;
|
||||
|
||||
let row: Option<(i64, String)> = conn
|
||||
.query_row(
|
||||
"SELECT gitlab_project_id, path_with_namespace FROM projects WHERE id = ?1",
|
||||
[project_id],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
)
|
||||
.ok();
|
||||
|
||||
if let Some((gitlab_id, path)) = row {
|
||||
if configured_projects.iter().any(|p| p.path == path) {
|
||||
return Ok(vec![(project_id, gitlab_id, path)]);
|
||||
}
|
||||
return Err(LoreError::Other(format!(
|
||||
"Project '{}' exists in database but is not in configuration",
|
||||
path
|
||||
)));
|
||||
}
|
||||
|
||||
return Err(LoreError::Other(format!(
|
||||
"Project '{}' not found in database",
|
||||
filter_str
|
||||
)));
|
||||
}
|
||||
|
||||
let mut projects = Vec::new();
|
||||
for project_config in configured_projects {
|
||||
let result: Option<(i64, i64)> = conn
|
||||
.query_row(
|
||||
"SELECT id, gitlab_project_id FROM projects WHERE path_with_namespace = ?",
|
||||
[&project_config.path],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
)
|
||||
.ok();
|
||||
|
||||
if let Some((local_id, gitlab_id)) = result {
|
||||
projects.push((local_id, gitlab_id, project_config.path.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(projects)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user