use std::time::Instant; use tracing::{Instrument, debug, info, warn}; use crate::Config; use crate::cli::commands::sync::{EntitySyncResult, SurgicalIids, SyncOptions, SyncResult}; use crate::cli::progress::{format_stage_line, stage_spinner_v2}; use crate::cli::render::{Icons, Theme}; use crate::core::db::{LATEST_SCHEMA_VERSION, create_connection, get_schema_version}; use crate::core::error::{LoreError, Result}; use crate::core::lock::{AppLock, LockOptions}; use crate::core::paths::get_db_path; use crate::core::project::resolve_project; use crate::core::shutdown::ShutdownSignal; use crate::core::sync_run::SyncRunRecorder; use crate::documents::{SourceType, regenerate_dirty_documents_for_sources}; use crate::embedding::ollama::{OllamaClient, OllamaConfig}; use crate::embedding::pipeline::{DEFAULT_EMBED_CONCURRENCY, embed_documents_by_ids}; use crate::gitlab::GitLabClient; use crate::ingestion::surgical::{ fetch_dependents_for_issue, fetch_dependents_for_mr, ingest_issue_by_iid, ingest_mr_by_iid, preflight_fetch, }; pub async fn run_sync_surgical( config: &Config, options: SyncOptions, run_id: Option<&str>, signal: &ShutdownSignal, ) -> Result { // ── Generate run_id ── let generated_id; let run_id = match run_id { Some(id) => id, None => { generated_id = uuid::Uuid::new_v4().simple().to_string(); &generated_id[..8] } }; let span = tracing::info_span!("surgical_sync", %run_id); async move { let pipeline_start = Instant::now(); let mut result = SyncResult { run_id: run_id.to_string(), surgical_mode: Some(true), surgical_iids: Some(SurgicalIids { issues: options.issue_iids.clone(), merge_requests: options.mr_iids.clone(), }), ..SyncResult::default() }; let mut entity_results: Vec = Vec::new(); // ── Resolve project ── let project_str = options.project.as_deref().ok_or_else(|| { LoreError::Other( "Surgical sync requires --project. Specify the project path.".to_string(), ) })?; let db_path = get_db_path(config.storage.db_path.as_deref()); let conn = create_connection(&db_path)?; let schema_version = get_schema_version(&conn); if schema_version < LATEST_SCHEMA_VERSION { return Err(LoreError::MigrationFailed { version: schema_version, message: format!( "Database is at schema version {schema_version} but {LATEST_SCHEMA_VERSION} is required. \ Run 'lore sync' first to apply migrations." ), source: None, }); } let project_id = resolve_project(&conn, project_str)?; let gitlab_project_id: i64 = conn.query_row( "SELECT gitlab_project_id FROM projects WHERE id = ?1", [project_id], |row| row.get(0), )?; debug!( project_str, project_id, gitlab_project_id, "Resolved project for surgical sync" ); // ── Start recorder ── let recorder_conn = create_connection(&db_path)?; let recorder = SyncRunRecorder::start(&recorder_conn, "surgical-sync", run_id)?; let iids_json = serde_json::to_string(&SurgicalIids { issues: options.issue_iids.clone(), merge_requests: options.mr_iids.clone(), }) .unwrap_or_else(|_| "{}".to_string()); recorder.set_surgical_metadata(&recorder_conn, "surgical", "preflight", &iids_json)?; // Wrap recorder in Option for consuming terminal methods let mut recorder = Some(recorder); // ── Build GitLab client ── let token = config.gitlab.resolve_token()?; let client = GitLabClient::new( &config.gitlab.base_url, &token, Some(config.sync.requests_per_second), ); // ── Build targets list ── let mut targets: Vec<(String, i64)> = Vec::new(); for iid in &options.issue_iids { targets.push(("issue".to_string(), *iid as i64)); } for iid in &options.mr_iids { targets.push(("merge_request".to_string(), *iid as i64)); } // ── Stage: Preflight ── let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Preflight", "fetching...", options.robot_mode); info!(targets = targets.len(), "Preflight: fetching entities from GitLab"); let preflight = preflight_fetch(&client, gitlab_project_id, &targets).await; // Record preflight failures for failure in &preflight.failures { let is_not_found = matches!(&failure.error, LoreError::GitLabNotFound { .. }); entity_results.push(EntitySyncResult { entity_type: failure.entity_type.clone(), iid: failure.iid as u64, outcome: if is_not_found { "not_found".to_string() } else { "preflight_failed".to_string() }, error: Some(failure.error.to_string()), toctou_reason: None, }); if let Some(ref rec) = recorder { let _ = rec.record_entity_result(&recorder_conn, &failure.entity_type, "warning"); } } let preflight_summary = format!( "{} issues, {} MRs fetched ({} failed)", preflight.issues.len(), preflight.merge_requests.len(), preflight.failures.len() ); let preflight_icon = color_icon( if preflight.failures.is_empty() { Icons::success() } else { Icons::warning() }, !preflight.failures.is_empty(), ); emit_stage_line( &spinner, &preflight_icon, "Preflight", &preflight_summary, stage_start.elapsed(), options.robot_mode, ); // ── Preflight-only early return ── if options.preflight_only { result.preflight_only = Some(true); result.entity_results = Some(entity_results); if let Some(rec) = recorder.take() { rec.succeed(&recorder_conn, &[], 0, preflight.failures.len())?; } return Ok(result); } // ── Check cancellation ── if signal.is_cancelled() { if let Some(rec) = recorder.take() { rec.cancel(&recorder_conn, "cancelled before ingest")?; } result.entity_results = Some(entity_results); return Ok(result); } // ── Acquire lock ── let lock_conn = create_connection(&db_path)?; let mut lock = AppLock::new( lock_conn, LockOptions { name: "sync".to_string(), stale_lock_minutes: config.sync.stale_lock_minutes, heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds, }, ); lock.acquire(options.force)?; // Wrap the rest in a closure-like block to ensure lock release on error let pipeline_result = run_pipeline_stages( &conn, &recorder_conn, config, &client, &options, &preflight, project_id, gitlab_project_id, &mut entity_results, &mut result, recorder.as_ref(), signal, ) .await; match pipeline_result { Ok(()) => { // ── Finalize: succeed ── if let Some(ref rec) = recorder { let _ = rec.update_phase(&recorder_conn, "finalize"); } let total_items = result.issues_updated + result.mrs_updated + result.documents_regenerated + result.documents_embedded; let total_errors = result.documents_errored + result.embedding_failed + entity_results .iter() .filter(|e| e.outcome != "synced" && e.outcome != "skipped_stale") .count(); if let Some(rec) = recorder.take() { rec.succeed(&recorder_conn, &[], total_items, total_errors)?; } } Err(ref e) => { if let Some(rec) = recorder.take() { let _ = rec.fail(&recorder_conn, &e.to_string(), None); } } } lock.release(); // Propagate error after cleanup pipeline_result?; result.entity_results = Some(entity_results); let elapsed = pipeline_start.elapsed(); debug!( elapsed_ms = elapsed.as_millis(), issues = result.issues_updated, mrs = result.mrs_updated, docs = result.documents_regenerated, embedded = result.documents_embedded, "Surgical sync pipeline complete" ); Ok(result) } .instrument(span) .await } #[allow(clippy::too_many_arguments)] async fn run_pipeline_stages( conn: &rusqlite::Connection, recorder_conn: &rusqlite::Connection, config: &Config, client: &GitLabClient, options: &SyncOptions, preflight: &crate::ingestion::surgical::PreflightResult, project_id: i64, gitlab_project_id: i64, entity_results: &mut Vec, result: &mut SyncResult, recorder: Option<&SyncRunRecorder>, signal: &ShutdownSignal, ) -> Result<()> { let mut all_dirty_source_keys: Vec<(SourceType, i64)> = Vec::new(); // ── Stage: Ingest ── if let Some(rec) = recorder { rec.update_phase(recorder_conn, "ingest")?; } let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Ingest", "processing...", options.robot_mode); // Ingest issues for issue in &preflight.issues { match ingest_issue_by_iid(conn, config, project_id, issue) { Ok(ingest_result) => { if ingest_result.skipped_stale { entity_results.push(EntitySyncResult { entity_type: "issue".to_string(), iid: issue.iid as u64, outcome: "skipped_stale".to_string(), error: None, toctou_reason: Some("updated_at not newer than DB".to_string()), }); if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "issue", "skipped_stale"); } } else { result.issues_updated += 1; all_dirty_source_keys.extend(ingest_result.dirty_source_keys); entity_results.push(EntitySyncResult { entity_type: "issue".to_string(), iid: issue.iid as u64, outcome: "synced".to_string(), error: None, toctou_reason: None, }); if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "issue", "ingested"); } } } Err(e) => { warn!(iid = issue.iid, error = %e, "Failed to ingest issue"); entity_results.push(EntitySyncResult { entity_type: "issue".to_string(), iid: issue.iid as u64, outcome: "error".to_string(), error: Some(e.to_string()), toctou_reason: None, }); if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "issue", "warning"); } } } } // Ingest MRs for mr in &preflight.merge_requests { match ingest_mr_by_iid(conn, config, project_id, mr) { Ok(ingest_result) => { if ingest_result.skipped_stale { entity_results.push(EntitySyncResult { entity_type: "merge_request".to_string(), iid: mr.iid as u64, outcome: "skipped_stale".to_string(), error: None, toctou_reason: Some("updated_at not newer than DB".to_string()), }); if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "mr", "skipped_stale"); } } else { result.mrs_updated += 1; all_dirty_source_keys.extend(ingest_result.dirty_source_keys); entity_results.push(EntitySyncResult { entity_type: "merge_request".to_string(), iid: mr.iid as u64, outcome: "synced".to_string(), error: None, toctou_reason: None, }); if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "mr", "ingested"); } } } Err(e) => { warn!(iid = mr.iid, error = %e, "Failed to ingest MR"); entity_results.push(EntitySyncResult { entity_type: "merge_request".to_string(), iid: mr.iid as u64, outcome: "error".to_string(), error: Some(e.to_string()), toctou_reason: None, }); if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "mr", "warning"); } } } } let ingest_summary = format!( "{} issues, {} MRs ingested", result.issues_updated, result.mrs_updated ); let ingest_icon = color_icon(Icons::success(), false); emit_stage_line( &spinner, &ingest_icon, "Ingest", &ingest_summary, stage_start.elapsed(), options.robot_mode, ); // ── Check cancellation ── if signal.is_cancelled() { debug!("Shutdown requested after ingest stage"); return Ok(()); } // ── Stage: Dependents ── if let Some(rec) = recorder { rec.update_phase(recorder_conn, "dependents")?; } let stage_start = Instant::now(); let spinner = stage_spinner_v2( Icons::sync(), "Dependents", "fetching...", options.robot_mode, ); let mut total_discussions: usize = 0; let mut total_events: usize = 0; // Fetch dependents for successfully ingested issues for issue in &preflight.issues { // Only fetch dependents for entities that were actually ingested let was_ingested = entity_results.iter().any(|e| { e.entity_type == "issue" && e.iid == issue.iid as u64 && e.outcome == "synced" }); if !was_ingested { continue; } let local_id: i64 = match conn.query_row( "SELECT id FROM issues WHERE project_id = ?1 AND iid = ?2", (project_id, issue.iid), |row| row.get(0), ) { Ok(id) => id, Err(e) => { warn!(iid = issue.iid, error = %e, "Could not find local issue ID for dependents"); continue; } }; match fetch_dependents_for_issue( client, conn, project_id, gitlab_project_id, issue.iid, local_id, config, ) .await { Ok(dep_result) => { total_discussions += dep_result.discussions_fetched; total_events += dep_result.resource_events_fetched; result.discussions_fetched += dep_result.discussions_fetched; result.resource_events_fetched += dep_result.resource_events_fetched; } Err(e) => { warn!(iid = issue.iid, error = %e, "Failed to fetch dependents for issue"); } } } // Fetch dependents for successfully ingested MRs for mr in &preflight.merge_requests { let was_ingested = entity_results.iter().any(|e| { e.entity_type == "merge_request" && e.iid == mr.iid as u64 && e.outcome == "synced" }); if !was_ingested { continue; } let local_id: i64 = match conn.query_row( "SELECT id FROM merge_requests WHERE project_id = ?1 AND iid = ?2", (project_id, mr.iid), |row| row.get(0), ) { Ok(id) => id, Err(e) => { warn!(iid = mr.iid, error = %e, "Could not find local MR ID for dependents"); continue; } }; match fetch_dependents_for_mr( client, conn, project_id, gitlab_project_id, mr.iid, local_id, config, ) .await { Ok(dep_result) => { total_discussions += dep_result.discussions_fetched; total_events += dep_result.resource_events_fetched; result.discussions_fetched += dep_result.discussions_fetched; result.resource_events_fetched += dep_result.resource_events_fetched; result.mr_diffs_fetched += dep_result.file_changes_stored; } Err(e) => { warn!(iid = mr.iid, error = %e, "Failed to fetch dependents for MR"); } } } let dep_summary = format!("{} discussions, {} events", total_discussions, total_events); let dep_icon = color_icon(Icons::success(), false); emit_stage_line( &spinner, &dep_icon, "Dependents", &dep_summary, stage_start.elapsed(), options.robot_mode, ); // ── Check cancellation ── if signal.is_cancelled() { debug!("Shutdown requested after dependents stage"); return Ok(()); } // ── Stage: Docs ── if !options.no_docs && !all_dirty_source_keys.is_empty() { if let Some(rec) = recorder { rec.update_phase(recorder_conn, "docs")?; } let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Docs", "regenerating...", options.robot_mode); let docs_result = regenerate_dirty_documents_for_sources(conn, &all_dirty_source_keys)?; result.documents_regenerated = docs_result.regenerated; result.documents_errored = docs_result.errored; for _ in 0..docs_result.regenerated { if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "doc", "regenerated"); } } let docs_summary = format!("{} documents regenerated", result.documents_regenerated); let docs_icon = color_icon( if docs_result.errored > 0 { Icons::warning() } else { Icons::success() }, docs_result.errored > 0, ); emit_stage_line( &spinner, &docs_icon, "Docs", &docs_summary, stage_start.elapsed(), options.robot_mode, ); // ── Check cancellation ── if signal.is_cancelled() { debug!("Shutdown requested after docs stage"); return Ok(()); } // ── Stage: Embed ── if !options.no_embed && !docs_result.document_ids.is_empty() { if let Some(rec) = recorder { rec.update_phase(recorder_conn, "embed")?; } let stage_start = Instant::now(); let spinner = stage_spinner_v2(Icons::sync(), "Embed", "embedding...", options.robot_mode); let ollama_config = OllamaConfig { base_url: config.embedding.base_url.clone(), model: config.embedding.model.clone(), ..OllamaConfig::default() }; let ollama_client = OllamaClient::new(ollama_config); let model_name = &config.embedding.model; let concurrency = if config.embedding.concurrency > 0 { config.embedding.concurrency as usize } else { DEFAULT_EMBED_CONCURRENCY }; match embed_documents_by_ids( conn, &ollama_client, model_name, concurrency, &docs_result.document_ids, signal, ) .await { Ok(embed_result) => { result.documents_embedded = embed_result.docs_embedded; result.embedding_failed = embed_result.failed; for _ in 0..embed_result.docs_embedded { if let Some(rec) = recorder { let _ = rec.record_entity_result(recorder_conn, "doc", "embedded"); } } let embed_summary = format!("{} chunks embedded", embed_result.chunks_embedded); let embed_icon = color_icon( if embed_result.failed > 0 { Icons::warning() } else { Icons::success() }, embed_result.failed > 0, ); emit_stage_line( &spinner, &embed_icon, "Embed", &embed_summary, stage_start.elapsed(), options.robot_mode, ); } Err(e) => { let warn_summary = format!("skipped ({})", e); let warn_icon = color_icon(Icons::warning(), true); emit_stage_line( &spinner, &warn_icon, "Embed", &warn_summary, stage_start.elapsed(), options.robot_mode, ); warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing"); } } } } Ok(()) } /// Apply semantic color to a stage-completion icon glyph. fn color_icon(icon: &str, has_errors: bool) -> String { if has_errors { Theme::warning().render(icon) } else { Theme::success().render(icon) } } fn emit_stage_line( pb: &indicatif::ProgressBar, icon: &str, label: &str, summary: &str, elapsed: std::time::Duration, robot_mode: bool, ) { pb.finish_and_clear(); if !robot_mode { crate::cli::progress::multi().suspend(|| { println!("{}", format_stage_line(icon, label, summary, elapsed)); }); } } #[cfg(test)] mod tests { use crate::cli::commands::sync::SyncOptions; #[test] fn sync_options_is_surgical_required() { let opts = SyncOptions { issue_iids: vec![1], project: Some("group/repo".to_string()), ..SyncOptions::default() }; assert!(opts.is_surgical()); } #[test] fn sync_options_surgical_with_mrs() { let opts = SyncOptions { mr_iids: vec![10, 20], project: Some("group/repo".to_string()), ..SyncOptions::default() }; assert!(opts.is_surgical()); } #[test] fn sync_options_not_surgical_without_iids() { let opts = SyncOptions { project: Some("group/repo".to_string()), ..SyncOptions::default() }; assert!(!opts.is_surgical()); } }