diff --git a/migrations/027_surgical_sync_runs.sql b/migrations/027_surgical_sync_runs.sql new file mode 100644 index 0000000..005f980 --- /dev/null +++ b/migrations/027_surgical_sync_runs.sql @@ -0,0 +1,20 @@ +-- Migration 027: Extend sync_runs for surgical sync observability +-- Adds mode/phase tracking and surgical-specific counters. + +ALTER TABLE sync_runs ADD COLUMN mode TEXT; +ALTER TABLE sync_runs ADD COLUMN phase TEXT; +ALTER TABLE sync_runs ADD COLUMN surgical_iids_json TEXT; +ALTER TABLE sync_runs ADD COLUMN issues_fetched INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN mrs_fetched INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN issues_ingested INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN mrs_ingested INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN skipped_stale INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN docs_regenerated INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN docs_embedded INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN warnings_count INTEGER NOT NULL DEFAULT 0; +ALTER TABLE sync_runs ADD COLUMN cancelled_at INTEGER; + +CREATE INDEX IF NOT EXISTS idx_sync_runs_mode_started + ON sync_runs(mode, started_at DESC); +CREATE INDEX IF NOT EXISTS idx_sync_runs_status_phase_started + ON sync_runs(status, phase, started_at DESC); diff --git a/src/cli/autocorrect.rs b/src/cli/autocorrect.rs index 1f19af5..39fcef3 100644 --- a/src/cli/autocorrect.rs +++ b/src/cli/autocorrect.rs @@ -130,6 +130,10 @@ const COMMAND_FLAGS: &[(&str, &[&str])] = &[ "--no-dry-run", "--timings", "--lock", + "--issue", + "--mr", + "--project", + "--preflight-only", ], ), ( diff --git a/src/cli/commands/mod.rs b/src/cli/commands/mod.rs index 40e683e..d359bb6 100644 --- a/src/cli/commands/mod.rs +++ b/src/cli/commands/mod.rs @@ -15,6 +15,7 @@ pub mod show; pub mod stats; pub mod sync; pub mod sync_status; +pub mod sync_surgical; pub mod timeline; pub mod trace; pub mod who; @@ 
-39,7 +40,7 @@ pub use ingest::{ DryRunPreview, IngestDisplay, print_dry_run_preview, print_dry_run_preview_json, print_ingest_summary, print_ingest_summary_json, run_ingest, run_ingest_dry_run, }; -pub use init::{InitInputs, InitOptions, InitResult, run_init}; +pub use init::{InitInputs, InitOptions, InitResult, run_init, run_token_set, run_token_show}; pub use list::{ ListFilters, MrListFilters, NoteListFilters, open_issue_in_browser, open_mr_in_browser, print_list_issues, print_list_issues_json, print_list_mrs, print_list_mrs_json, @@ -55,6 +56,7 @@ pub use show::{ pub use stats::{print_stats, print_stats_json, run_stats}; pub use sync::{SyncOptions, SyncResult, print_sync, print_sync_json, run_sync}; pub use sync_status::{print_sync_status, print_sync_status_json, run_sync_status}; +pub use sync_surgical::run_sync_surgical; pub use timeline::{TimelineParams, print_timeline, print_timeline_json_with_meta, run_timeline}; pub use trace::{parse_trace_path, print_trace, print_trace_json}; pub use who::{WhoRun, print_who_human, print_who_json, run_who}; diff --git a/src/cli/commands/sync.rs b/src/cli/commands/sync.rs index 65d2a6e..f6f1b20 100644 --- a/src/cli/commands/sync.rs +++ b/src/cli/commands/sync.rs @@ -16,6 +16,7 @@ use super::ingest::{ DryRunPreview, IngestDisplay, ProjectStatusEnrichment, ProjectSummary, run_ingest, run_ingest_dry_run, }; +use super::sync_surgical::run_sync_surgical; #[derive(Debug, Default)] pub struct SyncOptions { @@ -26,6 +27,35 @@ pub struct SyncOptions { pub no_events: bool, pub robot_mode: bool, pub dry_run: bool, + pub issue_iids: Vec, + pub mr_iids: Vec, + pub project: Option, + pub preflight_only: bool, +} + +impl SyncOptions { + pub const MAX_SURGICAL_TARGETS: usize = 100; + + pub fn is_surgical(&self) -> bool { + !self.issue_iids.is_empty() || !self.mr_iids.is_empty() + } +} + +#[derive(Debug, Default, Serialize)] +pub struct SurgicalIids { + pub issues: Vec, + pub merge_requests: Vec, +} + +#[derive(Debug, Serialize)] +pub 
struct EntitySyncResult { + pub entity_type: String, + pub iid: u64, + pub outcome: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub toctou_reason: Option, } #[derive(Debug, Default, Serialize)] @@ -45,19 +75,23 @@ pub struct SyncResult { pub embedding_failed: usize, pub status_enrichment_errors: usize, pub statuses_enriched: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub surgical_mode: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub surgical_iids: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub entity_results: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub preflight_only: Option, #[serde(skip)] pub issue_projects: Vec, #[serde(skip)] pub mr_projects: Vec, } -/// Apply semantic color to a stage-completion icon glyph. +/// Alias for [`Theme::color_icon`] to keep call sites concise. fn color_icon(icon: &str, has_errors: bool) -> String { - if has_errors { - Theme::warning().render(icon) - } else { - Theme::success().render(icon) - } + Theme::color_icon(icon, has_errors) } pub async fn run_sync( @@ -66,6 +100,11 @@ pub async fn run_sync( run_id: Option<&str>, signal: &ShutdownSignal, ) -> Result { + // Surgical dispatch: if any IIDs specified, route to surgical pipeline + if options.is_surgical() { + return run_sync_surgical(config, options, run_id, signal).await; + } + let generated_id; let run_id = match run_id { Some(id) => id, @@ -893,6 +932,22 @@ pub fn print_sync_dry_run_json(result: &SyncDryRunResult) { mod tests { use super::*; + fn default_options() -> SyncOptions { + SyncOptions { + full: false, + force: false, + no_embed: false, + no_docs: false, + no_events: false, + robot_mode: false, + dry_run: false, + issue_iids: vec![], + mr_iids: vec![], + project: None, + preflight_only: false, + } + } + #[test] fn append_failures_skips_zeroes() { let mut summary = "base".to_string(); @@ -1035,4 +1090,112 
@@ mod tests { assert!(rows[0].contains("0 statuses updated")); assert!(rows[0].contains("skipped (disabled)")); } + + #[test] + fn is_surgical_with_issues() { + let opts = SyncOptions { + issue_iids: vec![1], + ..default_options() + }; + assert!(opts.is_surgical()); + } + + #[test] + fn is_surgical_with_mrs() { + let opts = SyncOptions { + mr_iids: vec![10], + ..default_options() + }; + assert!(opts.is_surgical()); + } + + #[test] + fn is_surgical_empty() { + let opts = default_options(); + assert!(!opts.is_surgical()); + } + + #[test] + fn max_surgical_targets_is_100() { + assert_eq!(SyncOptions::MAX_SURGICAL_TARGETS, 100); + } + + #[test] + fn sync_result_default_omits_surgical_fields() { + let result = SyncResult::default(); + let json = serde_json::to_value(&result).unwrap(); + assert!(json.get("surgical_mode").is_none()); + assert!(json.get("surgical_iids").is_none()); + assert!(json.get("entity_results").is_none()); + assert!(json.get("preflight_only").is_none()); + } + + #[test] + fn sync_result_with_surgical_fields_serializes_correctly() { + let result = SyncResult { + surgical_mode: Some(true), + surgical_iids: Some(SurgicalIids { + issues: vec![7, 42], + merge_requests: vec![10], + }), + entity_results: Some(vec![ + EntitySyncResult { + entity_type: "issue".to_string(), + iid: 7, + outcome: "synced".to_string(), + error: None, + toctou_reason: None, + }, + EntitySyncResult { + entity_type: "issue".to_string(), + iid: 42, + outcome: "skipped_toctou".to_string(), + error: None, + toctou_reason: Some("updated_at changed".to_string()), + }, + ]), + preflight_only: Some(false), + ..SyncResult::default() + }; + let json = serde_json::to_value(&result).unwrap(); + assert_eq!(json["surgical_mode"], true); + assert_eq!(json["surgical_iids"]["issues"], serde_json::json!([7, 42])); + assert_eq!(json["entity_results"].as_array().unwrap().len(), 2); + assert_eq!(json["entity_results"][1]["outcome"], "skipped_toctou"); + assert_eq!(json["preflight_only"], false); + } 
+ + #[test] + fn entity_sync_result_omits_none_fields() { + let entity = EntitySyncResult { + entity_type: "merge_request".to_string(), + iid: 10, + outcome: "synced".to_string(), + error: None, + toctou_reason: None, + }; + let json = serde_json::to_value(&entity).unwrap(); + assert!(json.get("error").is_none()); + assert!(json.get("toctou_reason").is_none()); + assert!(json.get("entity_type").is_some()); + } + + #[test] + fn is_surgical_with_both_issues_and_mrs() { + let opts = SyncOptions { + issue_iids: vec![1, 2], + mr_iids: vec![10], + ..default_options() + }; + assert!(opts.is_surgical()); + } + + #[test] + fn is_not_surgical_with_only_project() { + let opts = SyncOptions { + project: Some("group/repo".to_string()), + ..default_options() + }; + assert!(!opts.is_surgical()); + } } diff --git a/src/cli/commands/sync_surgical.rs b/src/cli/commands/sync_surgical.rs new file mode 100644 index 0000000..a4952bd --- /dev/null +++ b/src/cli/commands/sync_surgical.rs @@ -0,0 +1,711 @@ +use std::time::Instant; + +use tracing::{Instrument, debug, info, warn}; + +use crate::Config; +use crate::cli::commands::sync::{EntitySyncResult, SurgicalIids, SyncOptions, SyncResult}; +use crate::cli::progress::{format_stage_line, stage_spinner_v2}; +use crate::cli::render::{Icons, Theme}; +use crate::core::db::{LATEST_SCHEMA_VERSION, create_connection, get_schema_version}; +use crate::core::error::{LoreError, Result}; +use crate::core::lock::{AppLock, LockOptions}; +use crate::core::paths::get_db_path; +use crate::core::project::resolve_project; +use crate::core::shutdown::ShutdownSignal; +use crate::core::sync_run::SyncRunRecorder; +use crate::documents::{SourceType, regenerate_dirty_documents_for_sources}; +use crate::embedding::ollama::{OllamaClient, OllamaConfig}; +use crate::embedding::pipeline::{DEFAULT_EMBED_CONCURRENCY, embed_documents_by_ids}; +use crate::gitlab::GitLabClient; +use crate::ingestion::surgical::{ + fetch_dependents_for_issue, fetch_dependents_for_mr, 
ingest_issue_by_iid, ingest_mr_by_iid, + preflight_fetch, +}; + +pub async fn run_sync_surgical( + config: &Config, + options: SyncOptions, + run_id: Option<&str>, + signal: &ShutdownSignal, +) -> Result { + // ── Generate run_id ── + let generated_id; + let run_id = match run_id { + Some(id) => id, + None => { + generated_id = uuid::Uuid::new_v4().simple().to_string(); + &generated_id[..8] + } + }; + let span = tracing::info_span!("surgical_sync", %run_id); + + async move { + let pipeline_start = Instant::now(); + let mut result = SyncResult { + run_id: run_id.to_string(), + surgical_mode: Some(true), + surgical_iids: Some(SurgicalIids { + issues: options.issue_iids.clone(), + merge_requests: options.mr_iids.clone(), + }), + ..SyncResult::default() + }; + let mut entity_results: Vec = Vec::new(); + + // ── Resolve project ── + let project_str = options.project.as_deref().ok_or_else(|| { + LoreError::Other( + "Surgical sync requires --project. Specify the project path.".to_string(), + ) + })?; + + let db_path = get_db_path(config.storage.db_path.as_deref()); + let conn = create_connection(&db_path)?; + + let schema_version = get_schema_version(&conn); + if schema_version < LATEST_SCHEMA_VERSION { + return Err(LoreError::MigrationFailed { + version: schema_version, + message: format!( + "Database is at schema version {schema_version} but {LATEST_SCHEMA_VERSION} is required. \ + Run 'lore sync' first to apply migrations." 
+ ), + source: None, + }); + } + + let project_id = resolve_project(&conn, project_str)?; + + let gitlab_project_id: i64 = conn.query_row( + "SELECT gitlab_project_id FROM projects WHERE id = ?1", + [project_id], + |row| row.get(0), + )?; + + debug!( + project_str, + project_id, + gitlab_project_id, + "Resolved project for surgical sync" + ); + + // ── Start recorder ── + let recorder_conn = create_connection(&db_path)?; + let recorder = SyncRunRecorder::start(&recorder_conn, "surgical-sync", run_id)?; + + let iids_json = serde_json::to_string(&SurgicalIids { + issues: options.issue_iids.clone(), + merge_requests: options.mr_iids.clone(), + }) + .unwrap_or_else(|_| "{}".to_string()); + + recorder.set_surgical_metadata(&recorder_conn, "surgical", "preflight", &iids_json)?; + + // Wrap recorder in Option for consuming terminal methods + let mut recorder = Some(recorder); + + // ── Build GitLab client ── + let token = config.gitlab.resolve_token()?; + let client = GitLabClient::new( + &config.gitlab.base_url, + &token, + Some(config.sync.requests_per_second), + ); + + // ── Build targets list ── + let mut targets: Vec<(String, i64)> = Vec::new(); + for iid in &options.issue_iids { + targets.push(("issue".to_string(), *iid as i64)); + } + for iid in &options.mr_iids { + targets.push(("merge_request".to_string(), *iid as i64)); + } + + // ── Stage: Preflight ── + let stage_start = Instant::now(); + let spinner = + stage_spinner_v2(Icons::sync(), "Preflight", "fetching...", options.robot_mode); + + info!(targets = targets.len(), "Preflight: fetching entities from GitLab"); + let preflight = preflight_fetch(&client, gitlab_project_id, &targets).await; + + // Record preflight failures + for failure in &preflight.failures { + let is_not_found = matches!(&failure.error, LoreError::GitLabNotFound { .. 
}); + entity_results.push(EntitySyncResult { + entity_type: failure.entity_type.clone(), + iid: failure.iid as u64, + outcome: if is_not_found { + "not_found".to_string() + } else { + "preflight_failed".to_string() + }, + error: Some(failure.error.to_string()), + toctou_reason: None, + }); + if let Some(ref rec) = recorder { + let _ = rec.record_entity_result(&recorder_conn, &failure.entity_type, "warning"); + } + } + + let preflight_summary = format!( + "{} issues, {} MRs fetched ({} failed)", + preflight.issues.len(), + preflight.merge_requests.len(), + preflight.failures.len() + ); + let preflight_icon = color_icon( + if preflight.failures.is_empty() { + Icons::success() + } else { + Icons::warning() + }, + !preflight.failures.is_empty(), + ); + emit_stage_line( + &spinner, + &preflight_icon, + "Preflight", + &preflight_summary, + stage_start.elapsed(), + options.robot_mode, + ); + + // ── Preflight-only early return ── + if options.preflight_only { + result.preflight_only = Some(true); + result.entity_results = Some(entity_results); + if let Some(rec) = recorder.take() { + rec.succeed(&recorder_conn, &[], 0, preflight.failures.len())?; + } + return Ok(result); + } + + // ── Check cancellation ── + if signal.is_cancelled() { + if let Some(rec) = recorder.take() { + rec.cancel(&recorder_conn, "cancelled before ingest")?; + } + result.entity_results = Some(entity_results); + return Ok(result); + } + + // ── Acquire lock ── + let lock_conn = create_connection(&db_path)?; + let mut lock = AppLock::new( + lock_conn, + LockOptions { + name: "sync".to_string(), + stale_lock_minutes: config.sync.stale_lock_minutes, + heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds, + }, + ); + lock.acquire(options.force)?; + + // Wrap the rest in a closure-like block to ensure lock release on error + let pipeline_result = run_pipeline_stages( + &conn, + &recorder_conn, + config, + &client, + &options, + &preflight, + project_id, + gitlab_project_id, + &mut 
entity_results, + &mut result, + recorder.as_ref(), + signal, + ) + .await; + + match pipeline_result { + Ok(()) => { + // ── Finalize: succeed ── + if let Some(ref rec) = recorder { + let _ = rec.update_phase(&recorder_conn, "finalize"); + } + let total_items = result.issues_updated + + result.mrs_updated + + result.documents_regenerated + + result.documents_embedded; + let total_errors = result.documents_errored + + result.embedding_failed + + entity_results + .iter() + .filter(|e| e.outcome != "synced" && e.outcome != "skipped_stale") + .count(); + if let Some(rec) = recorder.take() { + rec.succeed(&recorder_conn, &[], total_items, total_errors)?; + } + } + Err(ref e) => { + if let Some(rec) = recorder.take() { + let _ = rec.fail(&recorder_conn, &e.to_string(), None); + } + } + } + + lock.release(); + + // Propagate error after cleanup + pipeline_result?; + + result.entity_results = Some(entity_results); + + let elapsed = pipeline_start.elapsed(); + debug!( + elapsed_ms = elapsed.as_millis(), + issues = result.issues_updated, + mrs = result.mrs_updated, + docs = result.documents_regenerated, + embedded = result.documents_embedded, + "Surgical sync pipeline complete" + ); + + Ok(result) + } + .instrument(span) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn run_pipeline_stages( + conn: &rusqlite::Connection, + recorder_conn: &rusqlite::Connection, + config: &Config, + client: &GitLabClient, + options: &SyncOptions, + preflight: &crate::ingestion::surgical::PreflightResult, + project_id: i64, + gitlab_project_id: i64, + entity_results: &mut Vec, + result: &mut SyncResult, + recorder: Option<&SyncRunRecorder>, + signal: &ShutdownSignal, +) -> Result<()> { + let mut all_dirty_source_keys: Vec<(SourceType, i64)> = Vec::new(); + + // ── Stage: Ingest ── + if let Some(rec) = recorder { + rec.update_phase(recorder_conn, "ingest")?; + } + + let stage_start = Instant::now(); + let spinner = stage_spinner_v2(Icons::sync(), "Ingest", "processing...", 
options.robot_mode); + + // Ingest issues + for issue in &preflight.issues { + match ingest_issue_by_iid(conn, config, project_id, issue) { + Ok(ingest_result) => { + if ingest_result.skipped_stale { + entity_results.push(EntitySyncResult { + entity_type: "issue".to_string(), + iid: issue.iid as u64, + outcome: "skipped_stale".to_string(), + error: None, + toctou_reason: Some("updated_at not newer than DB".to_string()), + }); + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "issue", "skipped_stale"); + } + } else { + result.issues_updated += 1; + all_dirty_source_keys.extend(ingest_result.dirty_source_keys); + entity_results.push(EntitySyncResult { + entity_type: "issue".to_string(), + iid: issue.iid as u64, + outcome: "synced".to_string(), + error: None, + toctou_reason: None, + }); + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "issue", "ingested"); + } + } + } + Err(e) => { + warn!(iid = issue.iid, error = %e, "Failed to ingest issue"); + entity_results.push(EntitySyncResult { + entity_type: "issue".to_string(), + iid: issue.iid as u64, + outcome: "error".to_string(), + error: Some(e.to_string()), + toctou_reason: None, + }); + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "issue", "warning"); + } + } + } + } + + // Ingest MRs + for mr in &preflight.merge_requests { + match ingest_mr_by_iid(conn, config, project_id, mr) { + Ok(ingest_result) => { + if ingest_result.skipped_stale { + entity_results.push(EntitySyncResult { + entity_type: "merge_request".to_string(), + iid: mr.iid as u64, + outcome: "skipped_stale".to_string(), + error: None, + toctou_reason: Some("updated_at not newer than DB".to_string()), + }); + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "mr", "skipped_stale"); + } + } else { + result.mrs_updated += 1; + all_dirty_source_keys.extend(ingest_result.dirty_source_keys); + entity_results.push(EntitySyncResult { + 
entity_type: "merge_request".to_string(), + iid: mr.iid as u64, + outcome: "synced".to_string(), + error: None, + toctou_reason: None, + }); + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "mr", "ingested"); + } + } + } + Err(e) => { + warn!(iid = mr.iid, error = %e, "Failed to ingest MR"); + entity_results.push(EntitySyncResult { + entity_type: "merge_request".to_string(), + iid: mr.iid as u64, + outcome: "error".to_string(), + error: Some(e.to_string()), + toctou_reason: None, + }); + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "mr", "warning"); + } + } + } + } + + let ingest_summary = format!( + "{} issues, {} MRs ingested", + result.issues_updated, result.mrs_updated + ); + let ingest_icon = color_icon(Icons::success(), false); + emit_stage_line( + &spinner, + &ingest_icon, + "Ingest", + &ingest_summary, + stage_start.elapsed(), + options.robot_mode, + ); + + // ── Check cancellation ── + if signal.is_cancelled() { + debug!("Shutdown requested after ingest stage"); + return Ok(()); + } + + // ── Stage: Dependents ── + if let Some(rec) = recorder { + rec.update_phase(recorder_conn, "dependents")?; + } + + let stage_start = Instant::now(); + let spinner = stage_spinner_v2( + Icons::sync(), + "Dependents", + "fetching...", + options.robot_mode, + ); + + let mut total_discussions: usize = 0; + let mut total_events: usize = 0; + + // Fetch dependents for successfully ingested issues + for issue in &preflight.issues { + // Only fetch dependents for entities that were actually ingested + let was_ingested = entity_results.iter().any(|e| { + e.entity_type == "issue" && e.iid == issue.iid as u64 && e.outcome == "synced" + }); + if !was_ingested { + continue; + } + + let local_id: i64 = match conn.query_row( + "SELECT id FROM issues WHERE project_id = ?1 AND iid = ?2", + (project_id, issue.iid), + |row| row.get(0), + ) { + Ok(id) => id, + Err(e) => { + warn!(iid = issue.iid, error = %e, "Could not find 
local issue ID for dependents"); + continue; + } + }; + + match fetch_dependents_for_issue( + client, + conn, + project_id, + gitlab_project_id, + issue.iid, + local_id, + config, + ) + .await + { + Ok(dep_result) => { + total_discussions += dep_result.discussions_fetched; + total_events += dep_result.resource_events_fetched; + result.discussions_fetched += dep_result.discussions_fetched; + result.resource_events_fetched += dep_result.resource_events_fetched; + } + Err(e) => { + warn!(iid = issue.iid, error = %e, "Failed to fetch dependents for issue"); + } + } + } + + // Fetch dependents for successfully ingested MRs + for mr in &preflight.merge_requests { + let was_ingested = entity_results.iter().any(|e| { + e.entity_type == "merge_request" && e.iid == mr.iid as u64 && e.outcome == "synced" + }); + if !was_ingested { + continue; + } + + let local_id: i64 = match conn.query_row( + "SELECT id FROM merge_requests WHERE project_id = ?1 AND iid = ?2", + (project_id, mr.iid), + |row| row.get(0), + ) { + Ok(id) => id, + Err(e) => { + warn!(iid = mr.iid, error = %e, "Could not find local MR ID for dependents"); + continue; + } + }; + + match fetch_dependents_for_mr( + client, + conn, + project_id, + gitlab_project_id, + mr.iid, + local_id, + config, + ) + .await + { + Ok(dep_result) => { + total_discussions += dep_result.discussions_fetched; + total_events += dep_result.resource_events_fetched; + result.discussions_fetched += dep_result.discussions_fetched; + result.resource_events_fetched += dep_result.resource_events_fetched; + result.mr_diffs_fetched += dep_result.file_changes_stored; + } + Err(e) => { + warn!(iid = mr.iid, error = %e, "Failed to fetch dependents for MR"); + } + } + } + + let dep_summary = format!("{} discussions, {} events", total_discussions, total_events); + let dep_icon = color_icon(Icons::success(), false); + emit_stage_line( + &spinner, + &dep_icon, + "Dependents", + &dep_summary, + stage_start.elapsed(), + options.robot_mode, + ); + + // ── 
Check cancellation ── + if signal.is_cancelled() { + debug!("Shutdown requested after dependents stage"); + return Ok(()); + } + + // ── Stage: Docs ── + if !options.no_docs && !all_dirty_source_keys.is_empty() { + if let Some(rec) = recorder { + rec.update_phase(recorder_conn, "docs")?; + } + + let stage_start = Instant::now(); + let spinner = + stage_spinner_v2(Icons::sync(), "Docs", "regenerating...", options.robot_mode); + + let docs_result = regenerate_dirty_documents_for_sources(conn, &all_dirty_source_keys)?; + result.documents_regenerated = docs_result.regenerated; + result.documents_errored = docs_result.errored; + + for _ in 0..docs_result.regenerated { + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "doc", "regenerated"); + } + } + + let docs_summary = format!("{} documents regenerated", result.documents_regenerated); + let docs_icon = color_icon( + if docs_result.errored > 0 { + Icons::warning() + } else { + Icons::success() + }, + docs_result.errored > 0, + ); + emit_stage_line( + &spinner, + &docs_icon, + "Docs", + &docs_summary, + stage_start.elapsed(), + options.robot_mode, + ); + + // ── Check cancellation ── + if signal.is_cancelled() { + debug!("Shutdown requested after docs stage"); + return Ok(()); + } + + // ── Stage: Embed ── + if !options.no_embed && !docs_result.document_ids.is_empty() { + if let Some(rec) = recorder { + rec.update_phase(recorder_conn, "embed")?; + } + + let stage_start = Instant::now(); + let spinner = + stage_spinner_v2(Icons::sync(), "Embed", "embedding...", options.robot_mode); + + let ollama_config = OllamaConfig { + base_url: config.embedding.base_url.clone(), + model: config.embedding.model.clone(), + ..OllamaConfig::default() + }; + let ollama_client = OllamaClient::new(ollama_config); + + let model_name = &config.embedding.model; + let concurrency = if config.embedding.concurrency > 0 { + config.embedding.concurrency as usize + } else { + DEFAULT_EMBED_CONCURRENCY + }; + + match 
embed_documents_by_ids( + conn, + &ollama_client, + model_name, + concurrency, + &docs_result.document_ids, + signal, + ) + .await + { + Ok(embed_result) => { + result.documents_embedded = embed_result.docs_embedded; + result.embedding_failed = embed_result.failed; + + for _ in 0..embed_result.docs_embedded { + if let Some(rec) = recorder { + let _ = rec.record_entity_result(recorder_conn, "doc", "embedded"); + } + } + + let embed_summary = format!("{} chunks embedded", embed_result.chunks_embedded); + let embed_icon = color_icon( + if embed_result.failed > 0 { + Icons::warning() + } else { + Icons::success() + }, + embed_result.failed > 0, + ); + emit_stage_line( + &spinner, + &embed_icon, + "Embed", + &embed_summary, + stage_start.elapsed(), + options.robot_mode, + ); + } + Err(e) => { + let warn_summary = format!("skipped ({})", e); + let warn_icon = color_icon(Icons::warning(), true); + emit_stage_line( + &spinner, + &warn_icon, + "Embed", + &warn_summary, + stage_start.elapsed(), + options.robot_mode, + ); + warn!(error = %e, "Embedding stage failed (Ollama may be unavailable), continuing"); + } + } + } + } + + Ok(()) +} + +/// Alias for [`Theme::color_icon`] to keep call sites concise. 
+fn color_icon(icon: &str, has_errors: bool) -> String { + Theme::color_icon(icon, has_errors) +} + +fn emit_stage_line( + pb: &indicatif::ProgressBar, + icon: &str, + label: &str, + summary: &str, + elapsed: std::time::Duration, + robot_mode: bool, +) { + pb.finish_and_clear(); + if !robot_mode { + crate::cli::progress::multi().suspend(|| { + println!("{}", format_stage_line(icon, label, summary, elapsed)); + }); + } +} + +#[cfg(test)] +mod tests { + use crate::cli::commands::sync::SyncOptions; + + #[test] + fn sync_options_is_surgical_required() { + let opts = SyncOptions { + issue_iids: vec![1], + project: Some("group/repo".to_string()), + ..SyncOptions::default() + }; + assert!(opts.is_surgical()); + } + + #[test] + fn sync_options_surgical_with_mrs() { + let opts = SyncOptions { + mr_iids: vec![10, 20], + project: Some("group/repo".to_string()), + ..SyncOptions::default() + }; + assert!(opts.is_surgical()); + } + + #[test] + fn sync_options_not_surgical_without_iids() { + let opts = SyncOptions { + project: Some("group/repo".to_string()), + ..SyncOptions::default() + }; + assert!(!opts.is_surgical()); + } +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 1ec545e..5f0171d 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -4,7 +4,7 @@ pub mod progress; pub mod render; pub mod robot; -use clap::{Parser, Subcommand}; +use clap::{Args, Parser, Subcommand}; use std::io::IsTerminal; #[derive(Parser)] @@ -298,6 +298,15 @@ pub enum Commands { lore cron uninstall # Remove cron job")] Cron(CronArgs), + /// Manage stored GitLab token + #[command(after_help = "\x1b[1mExamples:\x1b[0m + lore token set # Interactive token entry + validation + lore token set --token glpat-xxx # Non-interactive token storage + echo glpat-xxx | lore token set # Pipe token from stdin + lore token show # Show token (masked) + lore token show --unmask # Show full token")] + Token(TokenArgs), + #[command(hide = true)] List { #[arg(value_parser = ["issues", "mrs"])] @@ -798,7 +807,9 @@ pub 
struct GenerateDocsArgs { lore sync --no-embed # Skip embedding step lore sync --no-status # Skip work-item status enrichment lore sync --full --force # Full re-sync, override stale lock - lore sync --dry-run # Preview what would change")] + lore sync --dry-run # Preview what would change + lore sync --issue 42 -p group/repo # Surgically sync one issue + lore sync --mr 10 --mr 20 -p g/r # Surgically sync two MRs")] pub struct SyncArgs { /// Reset cursors, fetch everything #[arg(long, overrides_with = "no_full")] @@ -848,6 +859,22 @@ pub struct SyncArgs { /// Acquire file lock before syncing (skip if another sync is running) #[arg(long)] pub lock: bool, + + /// Surgically sync specific issues by IID (repeatable, must be positive) + #[arg(long, value_parser = clap::value_parser!(u64).range(1..), action = clap::ArgAction::Append)] + pub issue: Vec, + + /// Surgically sync specific merge requests by IID (repeatable, must be positive) + #[arg(long, value_parser = clap::value_parser!(u64).range(1..), action = clap::ArgAction::Append)] + pub mr: Vec, + + /// Scope to a single project (required when --issue or --mr is used) + #[arg(short = 'p', long)] + pub project: Option, + + /// Validate remote entities exist without DB writes (preflight only) + #[arg(long)] + pub preflight_only: bool, } #[derive(Parser)] @@ -973,15 +1000,14 @@ pub struct WhoArgs { #[arg(short = 'p', long, help_heading = "Filters")] pub project: Option, - /// Maximum results per section (1..=500, bounded for output safety) + /// Maximum results per section (1..=500); omit for unlimited #[arg( short = 'n', long = "limit", - default_value = "20", value_parser = clap::value_parser!(u16).range(1..=500), help_heading = "Output" )] - pub limit: u16, + pub limit: Option, /// Select output fields (comma-separated, or 'minimal' preset; varies by mode) #[arg(long, help_heading = "Output", value_delimiter = ',')] @@ -1128,3 +1154,26 @@ pub enum CronAction { /// Show current cron configuration Status, } + 
+#[derive(Args)] +pub struct TokenArgs { + #[command(subcommand)] + pub action: TokenAction, +} + +#[derive(Subcommand)] +pub enum TokenAction { + /// Store a GitLab token in the config file + Set { + /// Token value (reads from stdin if omitted in non-interactive mode) + #[arg(long)] + token: Option, + }, + + /// Show the current token (masked by default) + Show { + /// Show the full unmasked token + #[arg(long)] + unmask: bool, + }, +} diff --git a/src/core/db.rs b/src/core/db.rs index 78af367..c4da226 100644 --- a/src/core/db.rs +++ b/src/core/db.rs @@ -89,6 +89,10 @@ const MIGRATIONS: &[(&str, &str)] = &[ "026", include_str!("../../migrations/026_scoring_indexes.sql"), ), + ( + "027", + include_str!("../../migrations/027_surgical_sync_runs.sql"), + ), ]; pub fn create_connection(db_path: &Path) -> Result { diff --git a/src/core/error.rs b/src/core/error.rs index 84ccad7..b9c5f2f 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -21,6 +21,7 @@ pub enum ErrorCode { EmbeddingFailed, NotFound, Ambiguous, + SurgicalPreflightFailed, } impl std::fmt::Display for ErrorCode { @@ -44,6 +45,7 @@ impl std::fmt::Display for ErrorCode { Self::EmbeddingFailed => "EMBEDDING_FAILED", Self::NotFound => "NOT_FOUND", Self::Ambiguous => "AMBIGUOUS", + Self::SurgicalPreflightFailed => "SURGICAL_PREFLIGHT_FAILED", }; write!(f, "{code}") } @@ -70,6 +72,9 @@ impl ErrorCode { Self::EmbeddingFailed => 16, Self::NotFound => 17, Self::Ambiguous => 18, + // Shares exit code 6 with GitLabNotFound — same semantic category (resource not found). + // Robot consumers distinguish via ErrorCode string, not exit code. + Self::SurgicalPreflightFailed => 6, } } } @@ -111,7 +116,7 @@ pub enum LoreError { source: Option, }, - #[error("GitLab token not set. Export {env_var} environment variable.")] + #[error("GitLab token not set. 
Run 'lore token set' or export {env_var}.")] TokenNotSet { env_var: String }, #[error("Database error: {0}")] @@ -153,6 +158,14 @@ pub enum LoreError { #[error("No embeddings found. Run: lore embed")] EmbeddingsNotBuilt, + + #[error("Surgical preflight failed for {entity_type} !{iid} in {project}: {reason}")] + SurgicalPreflightFailed { + entity_type: String, + iid: u64, + project: String, + reason: String, + }, } impl LoreError { @@ -179,6 +192,7 @@ impl LoreError { Self::OllamaModelNotFound { .. } => ErrorCode::OllamaModelNotFound, Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed, Self::EmbeddingsNotBuilt => ErrorCode::EmbeddingFailed, + Self::SurgicalPreflightFailed { .. } => ErrorCode::SurgicalPreflightFailed, } } @@ -207,7 +221,7 @@ impl LoreError { "Check database file permissions or reset with 'lore reset'.\n\n Example:\n lore migrate\n lore reset --yes", ), Self::TokenNotSet { .. } => Some( - "Export the token to your shell:\n\n export GITLAB_TOKEN=glpat-xxxxxxxxxxxx\n\n Your token needs the read_api scope.", + "Set your token:\n\n lore token set\n\n Or export to your shell:\n\n export GITLAB_TOKEN=glpat-xxxxxxxxxxxx\n\n Your token needs the read_api scope.", ), Self::Database(_) => Some( "Check database file permissions or reset with 'lore reset'.\n\n Example:\n lore doctor\n lore reset --yes", @@ -227,6 +241,9 @@ impl LoreError { Some("Check Ollama logs or retry with 'lore embed --retry-failed'") } Self::EmbeddingsNotBuilt => Some("Generate embeddings first: lore embed"), + Self::SurgicalPreflightFailed { .. } => Some( + "Verify the IID exists in the project and you have access.\n\n Example:\n lore issues -p \n lore mrs -p ", + ), Self::Json(_) | Self::Io(_) | Self::Transform(_) | Self::Other(_) => None, } } @@ -246,7 +263,7 @@ impl LoreError { Self::GitLabAuthFailed => { vec!["export GITLAB_TOKEN=glpat-xxx", "lore auth"] } - Self::TokenNotSet { .. } => vec!["export GITLAB_TOKEN=glpat-xxx"], + Self::TokenNotSet { .. 
} => vec!["lore token set", "export GITLAB_TOKEN=glpat-xxx"], Self::OllamaUnavailable { .. } => vec!["ollama serve"], Self::OllamaModelNotFound { .. } => vec!["ollama pull nomic-embed-text"], Self::DatabaseLocked { .. } => vec!["lore ingest --force"], @@ -254,6 +271,9 @@ impl LoreError { Self::EmbeddingFailed { .. } => vec!["lore embed --retry-failed"], Self::MigrationFailed { .. } => vec!["lore migrate"], Self::GitLabNetworkError { .. } => vec!["lore doctor"], + Self::SurgicalPreflightFailed { .. } => { + vec!["lore issues -p ", "lore mrs -p "] + } _ => vec![], } } @@ -293,3 +313,40 @@ impl From<&LoreError> for RobotErrorOutput { } pub type Result = std::result::Result; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn surgical_preflight_failed_display() { + let err = LoreError::SurgicalPreflightFailed { + entity_type: "issue".to_string(), + iid: 42, + project: "group/repo".to_string(), + reason: "not found on GitLab".to_string(), + }; + let msg = err.to_string(); + assert!(msg.contains("issue"), "missing entity_type: {msg}"); + assert!(msg.contains("42"), "missing iid: {msg}"); + assert!(msg.contains("group/repo"), "missing project: {msg}"); + assert!(msg.contains("not found on GitLab"), "missing reason: {msg}"); + } + + #[test] + fn surgical_preflight_failed_error_code() { + let code = ErrorCode::SurgicalPreflightFailed; + assert_eq!(code.exit_code(), 6); + } + + #[test] + fn surgical_preflight_failed_code_mapping() { + let err = LoreError::SurgicalPreflightFailed { + entity_type: "merge_request".to_string(), + iid: 99, + project: "ns/proj".to_string(), + reason: "404".to_string(), + }; + assert_eq!(err.code(), ErrorCode::SurgicalPreflightFailed); + } +} diff --git a/src/core/sync_run.rs b/src/core/sync_run.rs index a07b250..ab135df 100644 --- a/src/core/sync_run.rs +++ b/src/core/sync_run.rs @@ -20,6 +20,75 @@ impl SyncRunRecorder { Ok(Self { row_id }) } + /// Returns the database row ID of this sync run. 
+ pub fn row_id(&self) -> i64 { + self.row_id + } + + /// Sets surgical-mode metadata on the run (mode, phase, IID manifest). + pub fn set_surgical_metadata( + &self, + conn: &Connection, + mode: &str, + phase: &str, + surgical_iids_json: &str, + ) -> Result<()> { + conn.execute( + "UPDATE sync_runs + SET mode = ?1, phase = ?2, surgical_iids_json = ?3 + WHERE id = ?4", + rusqlite::params![mode, phase, surgical_iids_json, self.row_id], + )?; + Ok(()) + } + + /// Updates the current phase and refreshes the heartbeat timestamp. + pub fn update_phase(&self, conn: &Connection, phase: &str) -> Result<()> { + let now = now_ms(); + conn.execute( + "UPDATE sync_runs SET phase = ?1, heartbeat_at = ?2 WHERE id = ?3", + rusqlite::params![phase, now, self.row_id], + )?; + Ok(()) + } + + /// Increments a counter column by 1 based on entity type and stage. + /// Unknown (entity_type, stage) combinations are silently ignored. + pub fn record_entity_result( + &self, + conn: &Connection, + entity_type: &str, + stage: &str, + ) -> Result<()> { + let column = match (entity_type, stage) { + ("issue", "fetched") => "issues_fetched", + ("issue", "ingested") => "issues_ingested", + ("mr", "fetched") => "mrs_fetched", + ("mr", "ingested") => "mrs_ingested", + ("issue" | "mr", "skipped_stale") => "skipped_stale", + ("doc", "regenerated") => "docs_regenerated", + ("doc", "embedded") => "docs_embedded", + (_, "warning") => "warnings_count", + _ => return Ok(()), + }; + // Column name is from a hardcoded match, not user input — safe to interpolate. + let sql = format!("UPDATE sync_runs SET {column} = {column} + 1 WHERE id = ?1"); + conn.execute(&sql, rusqlite::params![self.row_id])?; + Ok(()) + } + + /// Marks the run as cancelled with a reason. Consumes self (terminal state). 
+ pub fn cancel(self, conn: &Connection, reason: &str) -> Result<()> { + let now = now_ms(); + conn.execute( + "UPDATE sync_runs + SET status = 'cancelled', error = ?1, cancelled_at = ?2, finished_at = ?3 + WHERE id = ?4", + rusqlite::params![reason, now, now, self.row_id], + )?; + Ok(()) + } + pub fn succeed( self, conn: &Connection, diff --git a/src/core/sync_run_tests.rs b/src/core/sync_run_tests.rs index b17c816..af2eeab 100644 --- a/src/core/sync_run_tests.rs +++ b/src/core/sync_run_tests.rs @@ -146,3 +146,239 @@ fn test_sync_run_recorder_fail_with_partial_metrics() { assert_eq!(parsed.len(), 1); assert_eq!(parsed[0].name, "ingest_issues"); } + +#[test] +fn sync_run_surgical_columns_exist() { + let conn = setup_test_db(); + conn.execute( + "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, mode, phase, surgical_iids_json) + VALUES (1000, 1000, 'running', 'sync', 'surgical', 'preflight', '{\"issues\":[7],\"mrs\":[]}')", + [], + ) + .unwrap(); + let (mode, phase, iids_json): (String, String, String) = conn + .query_row( + "SELECT mode, phase, surgical_iids_json FROM sync_runs WHERE mode = 'surgical'", + [], + |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)), + ) + .unwrap(); + assert_eq!(mode, "surgical"); + assert_eq!(phase, "preflight"); + assert!(iids_json.contains("7")); +} + +#[test] +fn sync_run_counter_defaults_are_zero() { + let conn = setup_test_db(); + conn.execute( + "INSERT INTO sync_runs (started_at, heartbeat_at, status, command) + VALUES (2000, 2000, 'running', 'sync')", + [], + ) + .unwrap(); + let row_id = conn.last_insert_rowid(); + let (issues_fetched, mrs_fetched, docs_regenerated, warnings_count): (i64, i64, i64, i64) = + conn.query_row( + "SELECT issues_fetched, mrs_fetched, docs_regenerated, warnings_count FROM sync_runs WHERE id = ?1", + [row_id], + |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)), + ) + .unwrap(); + assert_eq!(issues_fetched, 0); + assert_eq!(mrs_fetched, 0); + assert_eq!(docs_regenerated, 0); + 
assert_eq!(warnings_count, 0); +} + +#[test] +fn sync_run_nullable_columns_default_to_null() { + let conn = setup_test_db(); + conn.execute( + "INSERT INTO sync_runs (started_at, heartbeat_at, status, command) + VALUES (3000, 3000, 'running', 'sync')", + [], + ) + .unwrap(); + let row_id = conn.last_insert_rowid(); + let (mode, phase, cancelled_at): (Option, Option, Option) = conn + .query_row( + "SELECT mode, phase, cancelled_at FROM sync_runs WHERE id = ?1", + [row_id], + |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)), + ) + .unwrap(); + assert!(mode.is_none()); + assert!(phase.is_none()); + assert!(cancelled_at.is_none()); +} + +#[test] +fn sync_run_counter_round_trip() { + let conn = setup_test_db(); + conn.execute( + "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, mode, issues_fetched, mrs_ingested, docs_embedded) + VALUES (4000, 4000, 'succeeded', 'sync', 'surgical', 3, 2, 5)", + [], + ) + .unwrap(); + let row_id = conn.last_insert_rowid(); + let (issues_fetched, mrs_ingested, docs_embedded): (i64, i64, i64) = conn + .query_row( + "SELECT issues_fetched, mrs_ingested, docs_embedded FROM sync_runs WHERE id = ?1", + [row_id], + |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)), + ) + .unwrap(); + assert_eq!(issues_fetched, 3); + assert_eq!(mrs_ingested, 2); + assert_eq!(docs_embedded, 5); +} + +#[test] +fn surgical_lifecycle_start_metadata_succeed() { + let conn = setup_test_db(); + let recorder = SyncRunRecorder::start(&conn, "sync", "surg001").unwrap(); + let row_id = recorder.row_id(); + + recorder + .set_surgical_metadata( + &conn, + "surgical", + "preflight", + r#"{"issues":[7,8],"mrs":[101]}"#, + ) + .unwrap(); + + recorder.update_phase(&conn, "ingest").unwrap(); + recorder + .record_entity_result(&conn, "issue", "fetched") + .unwrap(); + recorder + .record_entity_result(&conn, "issue", "fetched") + .unwrap(); + recorder + .record_entity_result(&conn, "issue", "ingested") + .unwrap(); + recorder + .record_entity_result(&conn, "mr", "fetched") + 
.unwrap(); + recorder + .record_entity_result(&conn, "mr", "ingested") + .unwrap(); + + recorder.succeed(&conn, &[], 3, 0).unwrap(); + + #[allow(clippy::type_complexity)] + let (mode, phase, iids, issues_fetched, mrs_fetched, issues_ingested, mrs_ingested, status): ( + String, + String, + String, + i64, + i64, + i64, + i64, + String, + ) = conn + .query_row( + "SELECT mode, phase, surgical_iids_json, issues_fetched, mrs_fetched, \ + issues_ingested, mrs_ingested, status \ + FROM sync_runs WHERE id = ?1", + [row_id], + |r| { + Ok(( + r.get(0)?, + r.get(1)?, + r.get(2)?, + r.get(3)?, + r.get(4)?, + r.get(5)?, + r.get(6)?, + r.get(7)?, + )) + }, + ) + .unwrap(); + + assert_eq!(mode, "surgical"); + assert_eq!(phase, "ingest"); + assert!(iids.contains("101")); + assert_eq!(issues_fetched, 2); + assert_eq!(mrs_fetched, 1); + assert_eq!(issues_ingested, 1); + assert_eq!(mrs_ingested, 1); + assert_eq!(status, "succeeded"); +} + +#[test] +fn surgical_lifecycle_cancel() { + let conn = setup_test_db(); + let recorder = SyncRunRecorder::start(&conn, "sync", "cancel01").unwrap(); + let row_id = recorder.row_id(); + + recorder + .set_surgical_metadata(&conn, "surgical", "preflight", "{}") + .unwrap(); + recorder + .cancel(&conn, "User requested cancellation") + .unwrap(); + + let (status, error, cancelled_at, finished_at): ( + String, + Option, + Option, + Option, + ) = conn + .query_row( + "SELECT status, error, cancelled_at, finished_at FROM sync_runs WHERE id = ?1", + [row_id], + |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)), + ) + .unwrap(); + + assert_eq!(status, "cancelled"); + assert_eq!(error.as_deref(), Some("User requested cancellation")); + assert!(cancelled_at.is_some()); + assert!(finished_at.is_some()); +} + +#[test] +fn record_entity_result_ignores_unknown() { + let conn = setup_test_db(); + let recorder = SyncRunRecorder::start(&conn, "sync", "unk001").unwrap(); + recorder + .record_entity_result(&conn, "widget", "exploded") + .unwrap(); +} + +#[test] +fn 
record_entity_result_doc_counters() { + let conn = setup_test_db(); + let recorder = SyncRunRecorder::start(&conn, "sync", "cnt001").unwrap(); + let row_id = recorder.row_id(); + + recorder + .record_entity_result(&conn, "doc", "regenerated") + .unwrap(); + recorder + .record_entity_result(&conn, "doc", "regenerated") + .unwrap(); + recorder + .record_entity_result(&conn, "doc", "embedded") + .unwrap(); + recorder + .record_entity_result(&conn, "issue", "skipped_stale") + .unwrap(); + + let (docs_regen, docs_embed, skipped): (i64, i64, i64) = conn + .query_row( + "SELECT docs_regenerated, docs_embedded, skipped_stale FROM sync_runs WHERE id = ?1", + [row_id], + |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)), + ) + .unwrap(); + + assert_eq!(docs_regen, 2); + assert_eq!(docs_embed, 1); + assert_eq!(skipped, 1); +} diff --git a/src/documents/mod.rs b/src/documents/mod.rs index 3681cb8..7ae02ac 100644 --- a/src/documents/mod.rs +++ b/src/documents/mod.rs @@ -7,7 +7,10 @@ pub use extractor::{ extract_discussion_document, extract_issue_document, extract_mr_document, extract_note_document, extract_note_document_cached, }; -pub use regenerator::{RegenerateResult, regenerate_dirty_documents}; +pub use regenerator::{ + RegenerateForSourcesResult, RegenerateResult, regenerate_dirty_documents, + regenerate_dirty_documents_for_sources, +}; pub use truncation::{ MAX_DISCUSSION_BYTES, MAX_DOCUMENT_BYTES_HARD, NoteContent, TruncationReason, TruncationResult, truncate_discussion, truncate_hard_cap, truncate_utf8, diff --git a/src/documents/regenerator.rs b/src/documents/regenerator.rs index baaadb3..f43e001 100644 --- a/src/documents/regenerator.rs +++ b/src/documents/regenerator.rs @@ -84,6 +84,60 @@ pub fn regenerate_dirty_documents( Ok(result) } +#[derive(Debug, Default)] +pub struct RegenerateForSourcesResult { + pub regenerated: usize, + pub unchanged: usize, + pub errored: usize, + pub document_ids: Vec, +} + +pub fn regenerate_dirty_documents_for_sources( + conn: &Connection, + 
source_keys: &[(SourceType, i64)], +) -> Result { + let mut result = RegenerateForSourcesResult::default(); + let mut cache = ParentMetadataCache::new(); + + for &(source_type, source_id) in source_keys { + match regenerate_one(conn, source_type, source_id, &mut cache) { + Ok(changed) => { + if changed { + result.regenerated += 1; + } else { + result.unchanged += 1; + } + clear_dirty(conn, source_type, source_id)?; + + // Try to collect the document_id if a document exists + if let Ok(doc_id) = get_document_id(conn, source_type, source_id) { + result.document_ids.push(doc_id); + } + } + Err(e) => { + warn!( + source_type = %source_type, + source_id, + error = %e, + "Failed to regenerate document for source" + ); + record_dirty_error(conn, source_type, source_id, &e.to_string())?; + result.errored += 1; + } + } + } + + debug!( + regenerated = result.regenerated, + unchanged = result.unchanged, + errored = result.errored, + document_ids = result.document_ids.len(), + "Scoped document regeneration complete" + ); + + Ok(result) +} + fn regenerate_one( conn: &Connection, source_type: SourceType, diff --git a/src/documents/regenerator_tests.rs b/src/documents/regenerator_tests.rs index 04bde8c..2b5d156 100644 --- a/src/documents/regenerator_tests.rs +++ b/src/documents/regenerator_tests.rs @@ -518,3 +518,88 @@ fn test_note_regeneration_cache_invalidates_across_parents() { assert!(beta_content.contains("parent_iid: 99")); assert!(beta_content.contains("parent_title: Issue Beta")); } + +#[test] +fn test_scoped_regen_only_processes_specified_sources() { + let conn = setup_db(); + // Insert two issues + conn.execute( + "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'First Issue', 'opened', 1000, 2000, 3000)", + [], + ).unwrap(); + conn.execute( + "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (2, 20, 1, 43, 'Second Issue', 'opened', 
1000, 2000, 3000)", + [], + ).unwrap(); + + // Mark both dirty + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + mark_dirty(&conn, SourceType::Issue, 2).unwrap(); + + // Regenerate only issue 1 + let result = regenerate_dirty_documents_for_sources(&conn, &[(SourceType::Issue, 1)]).unwrap(); + + assert_eq!(result.regenerated, 1); + assert_eq!(result.errored, 0); + + // Issue 1 should be regenerated and cleared from dirty + let doc_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM documents WHERE source_type = 'issue' AND source_id = 1", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(doc_count, 1); + + // Issue 2 should still be dirty + let dirty_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM dirty_sources WHERE source_type = 'issue' AND source_id = 2", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(dirty_count, 1); +} + +#[test] +fn test_scoped_regen_returns_document_ids() { + let conn = setup_db(); + conn.execute( + "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test Issue', 'opened', 1000, 2000, 3000)", + [], + ).unwrap(); + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + + let result = regenerate_dirty_documents_for_sources(&conn, &[(SourceType::Issue, 1)]).unwrap(); + + assert_eq!(result.document_ids.len(), 1); + + // Verify returned ID matches the actual document + let actual_id: i64 = conn + .query_row( + "SELECT id FROM documents WHERE source_type = 'issue' AND source_id = 1", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(result.document_ids[0], actual_id); +} + +#[test] +fn test_scoped_regen_handles_missing_source() { + let conn = setup_db(); + // Don't insert any issues — source_id 999 doesn't exist + // But mark it dirty so the function tries to process it + mark_dirty(&conn, SourceType::Issue, 999).unwrap(); + + let result = + regenerate_dirty_documents_for_sources(&conn, &[(SourceType::Issue, 999)]).unwrap(); + + // Source doesn't 
exist, so regenerate_one returns Ok(true) deleting the doc. + // No document_id to collect since there's nothing in the documents table. + assert_eq!(result.regenerated, 1); + assert_eq!(result.errored, 0); + assert!(result.document_ids.is_empty()); +} diff --git a/src/embedding/mod.rs b/src/embedding/mod.rs index 0e4458c..fd3ac9f 100644 --- a/src/embedding/mod.rs +++ b/src/embedding/mod.rs @@ -7,5 +7,5 @@ pub mod similarity; pub use change_detector::{PendingDocument, count_pending_documents, find_pending_documents}; pub use chunking::{CHUNK_MAX_BYTES, CHUNK_OVERLAP_CHARS, split_into_chunks}; -pub use pipeline::{EmbedResult, embed_documents}; +pub use pipeline::{EmbedForIdsResult, EmbedResult, embed_documents, embed_documents_by_ids}; pub use similarity::cosine_similarity; diff --git a/src/embedding/pipeline.rs b/src/embedding/pipeline.rs index cc84b36..ba37d2d 100644 --- a/src/embedding/pipeline.rs +++ b/src/embedding/pipeline.rs @@ -578,3 +578,207 @@ fn sha256_hash(input: &str) -> String { hasher.update(input.as_bytes()); format!("{:x}", hasher.finalize()) } + +#[derive(Debug, Default)] +pub struct EmbedForIdsResult { + pub chunks_embedded: usize, + pub docs_embedded: usize, + pub failed: usize, + pub skipped: usize, +} + +/// Embed only the documents with the given IDs, skipping any that are +/// already embedded with matching config (model, dims, chunk size, hash). 
+pub async fn embed_documents_by_ids( + conn: &Connection, + client: &OllamaClient, + model_name: &str, + concurrency: usize, + document_ids: &[i64], + signal: &ShutdownSignal, +) -> Result { + let mut result = EmbedForIdsResult::default(); + + if document_ids.is_empty() { + return Ok(result); + } + + if signal.is_cancelled() { + return Ok(result); + } + + // Load documents for the specified IDs, filtering out already-embedded + let pending = find_documents_by_ids(conn, document_ids, model_name)?; + + if pending.is_empty() { + result.skipped = document_ids.len(); + return Ok(result); + } + + let skipped_count = document_ids.len() - pending.len(); + result.skipped = skipped_count; + + info!( + requested = document_ids.len(), + pending = pending.len(), + skipped = skipped_count, + "Scoped embedding: processing documents by ID" + ); + + // Use the same SAVEPOINT + embed_page pattern as the main pipeline + let mut last_id: i64 = 0; + let mut processed: usize = 0; + let total = pending.len(); + let mut page_stats = EmbedResult::default(); + + conn.execute_batch("SAVEPOINT embed_by_ids")?; + let page_result = embed_page( + conn, + client, + model_name, + concurrency, + &pending, + &mut page_stats, + &mut last_id, + &mut processed, + total, + &None, + signal, + ) + .await; + + match page_result { + Ok(()) if signal.is_cancelled() => { + let _ = conn.execute_batch("ROLLBACK TO embed_by_ids; RELEASE embed_by_ids"); + info!("Rolled back scoped embed page due to cancellation"); + } + Ok(()) => { + conn.execute_batch("RELEASE embed_by_ids")?; + + // Count actual results from DB + let (chunks, docs) = count_embedded_results(conn, &pending)?; + result.chunks_embedded = chunks; + result.docs_embedded = docs; + result.failed = page_stats.failed; + } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK TO embed_by_ids; RELEASE embed_by_ids"); + return Err(e); + } + } + + info!( + chunks_embedded = result.chunks_embedded, + docs_embedded = result.docs_embedded, + failed = 
result.failed, + skipped = result.skipped, + "Scoped embedding complete" + ); + + Ok(result) +} + +/// Load documents by specific IDs, filtering out those already embedded +/// with matching config (same logic as `find_pending_documents` but scoped). +fn find_documents_by_ids( + conn: &Connection, + document_ids: &[i64], + model_name: &str, +) -> Result> { + use crate::embedding::chunking::{CHUNK_MAX_BYTES, EXPECTED_DIMS}; + + if document_ids.is_empty() { + return Ok(Vec::new()); + } + + // Build IN clause with placeholders + let placeholders: Vec = (0..document_ids.len()) + .map(|i| format!("?{}", i + 1)) + .collect(); + let in_clause = placeholders.join(", "); + + let sql = format!( + r#" + SELECT d.id, d.content_text, d.content_hash + FROM documents d + LEFT JOIN embedding_metadata em + ON em.document_id = d.id AND em.chunk_index = 0 + WHERE d.id IN ({in_clause}) + AND ( + em.document_id IS NULL + OR em.document_hash != d.content_hash + OR em.chunk_max_bytes IS NULL + OR em.chunk_max_bytes != ?{chunk_bytes_idx} + OR em.model != ?{model_idx} + OR em.dims != ?{dims_idx} + ) + ORDER BY d.id + "#, + in_clause = in_clause, + chunk_bytes_idx = document_ids.len() + 1, + model_idx = document_ids.len() + 2, + dims_idx = document_ids.len() + 3, + ); + + let mut stmt = conn.prepare(&sql)?; + + // Build params: document_ids... then chunk_max_bytes, model, dims + let mut params: Vec> = Vec::new(); + for id in document_ids { + params.push(Box::new(*id)); + } + params.push(Box::new(CHUNK_MAX_BYTES as i64)); + params.push(Box::new(model_name.to_string())); + params.push(Box::new(EXPECTED_DIMS as i64)); + + let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect(); + + let rows = stmt + .query_map(param_refs.as_slice(), |row| { + Ok(crate::embedding::change_detector::PendingDocument { + document_id: row.get(0)?, + content_text: row.get(1)?, + content_hash: row.get(2)?, + }) + })? 
+ .collect::, _>>()?; + + Ok(rows) +} + +/// Count how many chunks and complete docs were embedded for the given pending docs. +fn count_embedded_results( + conn: &Connection, + pending: &[crate::embedding::change_detector::PendingDocument], +) -> Result<(usize, usize)> { + let mut total_chunks: usize = 0; + let mut total_docs: usize = 0; + + for doc in pending { + let chunk_count: i64 = conn.query_row( + "SELECT COUNT(*) FROM embedding_metadata WHERE document_id = ?1 AND last_error IS NULL", + [doc.document_id], + |row| row.get(0), + )?; + if chunk_count > 0 { + total_chunks += chunk_count as usize; + // Check if all expected chunks are present (chunk_count metadata on chunk_index=0) + let expected: Option = conn.query_row( + "SELECT chunk_count FROM embedding_metadata WHERE document_id = ?1 AND chunk_index = 0", + [doc.document_id], + |row| row.get(0), + )?; + if let Some(expected_count) = expected + && chunk_count >= expected_count + { + total_docs += 1; + } + } + } + + Ok((total_chunks, total_docs)) +} + +#[cfg(test)] +#[path = "pipeline_tests.rs"] +mod tests; diff --git a/src/embedding/pipeline_tests.rs b/src/embedding/pipeline_tests.rs new file mode 100644 index 0000000..08e272c --- /dev/null +++ b/src/embedding/pipeline_tests.rs @@ -0,0 +1,184 @@ +use std::path::Path; + +use rusqlite::Connection; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +use crate::core::db::{create_connection, run_migrations}; +use crate::core::shutdown::ShutdownSignal; +use crate::embedding::chunking::EXPECTED_DIMS; +use crate::embedding::ollama::{OllamaClient, OllamaConfig}; +use crate::embedding::pipeline::embed_documents_by_ids; + +const MODEL: &str = "nomic-embed-text"; + +fn setup_db() -> Connection { + let conn = create_connection(Path::new(":memory:")).unwrap(); + run_migrations(&conn).unwrap(); + conn +} + +fn insert_test_project(conn: &Connection) -> i64 { + conn.execute( + "INSERT INTO projects (gitlab_project_id, 
path_with_namespace, web_url) + VALUES (1, 'group/test', 'https://gitlab.example.com/group/test')", + [], + ) + .unwrap(); + conn.last_insert_rowid() +} + +fn insert_test_document( + conn: &Connection, + project_id: i64, + source_id: i64, + content: &str, + hash: &str, +) -> i64 { + conn.execute( + "INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash) + VALUES ('issue', ?1, ?2, ?3, ?4)", + rusqlite::params![source_id, project_id, content, hash], + ) + .unwrap(); + conn.last_insert_rowid() +} + +fn make_fake_embedding() -> Vec { + vec![0.1_f32; EXPECTED_DIMS] +} + +fn make_ollama_response(count: usize) -> serde_json::Value { + let embedding = make_fake_embedding(); + let embeddings: Vec<_> = (0..count).map(|_| embedding.clone()).collect(); + serde_json::json!({ + "model": MODEL, + "embeddings": embeddings + }) +} + +fn count_embeddings_for_doc(conn: &Connection, doc_id: i64) -> i64 { + conn.query_row( + "SELECT COUNT(*) FROM embedding_metadata WHERE document_id = ?1", + [doc_id], + |row| row.get(0), + ) + .unwrap() +} + +fn make_client(base_url: &str) -> OllamaClient { + OllamaClient::new(OllamaConfig { + base_url: base_url.to_string(), + model: MODEL.to_string(), + timeout_secs: 10, + }) +} + +#[tokio::test] +async fn test_embed_by_ids_only_embeds_specified_docs() { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/api/embed")) + .respond_with(ResponseTemplate::new(200).set_body_json(make_ollama_response(1))) + .mount(&mock_server) + .await; + + let conn = setup_db(); + let proj_id = insert_test_project(&conn); + let doc1 = insert_test_document(&conn, proj_id, 1, "Hello world content for doc 1", "hash_a"); + let doc2 = insert_test_document(&conn, proj_id, 2, "Hello world content for doc 2", "hash_b"); + + let signal = ShutdownSignal::new(); + let client = make_client(&mock_server.uri()); + + // Only embed doc1 + let result = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc1], 
&signal) + .await + .unwrap(); + + assert_eq!(result.docs_embedded, 1, "Should embed exactly 1 doc"); + assert!(result.chunks_embedded > 0, "Should have embedded chunks"); + + // doc1 should have embeddings + assert!( + count_embeddings_for_doc(&conn, doc1) > 0, + "doc1 should have embeddings" + ); + + // doc2 should have NO embeddings + assert_eq!( + count_embeddings_for_doc(&conn, doc2), + 0, + "doc2 should have no embeddings" + ); +} + +#[tokio::test] +async fn test_embed_by_ids_skips_already_embedded() { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/api/embed")) + .respond_with(ResponseTemplate::new(200).set_body_json(make_ollama_response(1))) + .expect(1) // Should only be called once + .mount(&mock_server) + .await; + + let conn = setup_db(); + let proj_id = insert_test_project(&conn); + let doc1 = insert_test_document(&conn, proj_id, 1, "Hello world content for doc 1", "hash_a"); + + let signal = ShutdownSignal::new(); + let client = make_client(&mock_server.uri()); + + // First embed + let result1 = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc1], &signal) + .await + .unwrap(); + assert_eq!(result1.docs_embedded, 1); + + // Second embed with same doc — should skip + let result2 = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc1], &signal) + .await + .unwrap(); + assert_eq!(result2.docs_embedded, 0, "Should embed 0 on second call"); + assert_eq!(result2.skipped, 1, "Should report 1 skipped"); + assert_eq!(result2.chunks_embedded, 0, "No new chunks"); +} + +#[tokio::test] +async fn test_embed_by_ids_empty_input() { + let conn = setup_db(); + let signal = ShutdownSignal::new(); + // Client URL doesn't matter — should never be called + let client = make_client("http://localhost:99999"); + + let result = embed_documents_by_ids(&conn, &client, MODEL, 1, &[], &signal) + .await + .unwrap(); + + assert_eq!(result.docs_embedded, 0); + assert_eq!(result.chunks_embedded, 0); + assert_eq!(result.failed, 0); 
+ assert_eq!(result.skipped, 0); +} + +#[tokio::test] +async fn test_embed_by_ids_respects_cancellation() { + let conn = setup_db(); + let proj_id = insert_test_project(&conn); + let doc1 = insert_test_document(&conn, proj_id, 1, "Hello world content for doc 1", "hash_a"); + + let signal = ShutdownSignal::new(); + signal.cancel(); // Pre-cancel + + let client = make_client("http://localhost:99999"); + + let result = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc1], &signal) + .await + .unwrap(); + + assert_eq!(result.docs_embedded, 0, "Should embed 0 when cancelled"); + assert_eq!(result.chunks_embedded, 0, "No chunks when cancelled"); +} diff --git a/src/gitlab/client.rs b/src/gitlab/client.rs index 2c69f97..edee077 100644 --- a/src/gitlab/client.rs +++ b/src/gitlab/client.rs @@ -112,6 +112,18 @@ impl GitLabClient { self.request("/api/v4/version").await } + pub async fn get_issue_by_iid(&self, project_id: i64, iid: i64) -> Result { + self.request(&format!("/api/v4/projects/{project_id}/issues/{iid}")) + .await + } + + pub async fn get_mr_by_iid(&self, project_id: i64, iid: i64) -> Result { + self.request(&format!( + "/api/v4/projects/{project_id}/merge_requests/{iid}" + )) + .await + } + const MAX_RETRIES: u32 = 3; async fn request(&self, path: &str) -> Result { @@ -763,6 +775,10 @@ fn ms_to_iso8601(ms: i64) -> Option { .map(|dt| dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()) } +#[cfg(test)] +#[path = "client_tests.rs"] +mod client_tests; + #[cfg(test)] mod tests { use super::*; diff --git a/src/gitlab/client_tests.rs b/src/gitlab/client_tests.rs new file mode 100644 index 0000000..209eee4 --- /dev/null +++ b/src/gitlab/client_tests.rs @@ -0,0 +1,113 @@ +use super::*; +use crate::core::error::LoreError; +use wiremock::matchers::{header, method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +#[tokio::test] +async fn get_issue_by_iid_success() { + let server = MockServer::start().await; + let issue_json = serde_json::json!({ + "id": 1001, + 
"iid": 42, + "project_id": 5, + "title": "Fix login bug", + "state": "opened", + "created_at": "2026-01-15T10:00:00Z", + "updated_at": "2026-02-01T14:30:00Z", + "author": { "id": 1, "username": "dev1", "name": "Developer One" }, + "web_url": "https://gitlab.example.com/group/repo/-/issues/42", + "labels": [], + "milestone": null, + "assignees": [], + "closed_at": null, + "description": "Login fails on mobile" + }); + + Mock::given(method("GET")) + .and(path("/api/v4/projects/5/issues/42")) + .and(header("PRIVATE-TOKEN", "test-token")) + .respond_with(ResponseTemplate::new(200).set_body_json(&issue_json)) + .mount(&server) + .await; + + let client = GitLabClient::new(&server.uri(), "test-token", Some(100.0)); + let issue = client.get_issue_by_iid(5, 42).await.unwrap(); + assert_eq!(issue.iid, 42); + assert_eq!(issue.title, "Fix login bug"); +} + +#[tokio::test] +async fn get_issue_by_iid_not_found() { + let server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/api/v4/projects/5/issues/999")) + .respond_with( + ResponseTemplate::new(404) + .set_body_json(serde_json::json!({"message": "404 Not Found"})), + ) + .mount(&server) + .await; + + let client = GitLabClient::new(&server.uri(), "test-token", Some(100.0)); + let err = client.get_issue_by_iid(5, 999).await.unwrap_err(); + assert!(matches!(err, LoreError::GitLabNotFound { .. 
})); +} + +#[tokio::test] +async fn get_mr_by_iid_success() { + let server = MockServer::start().await; + let mr_json = serde_json::json!({ + "id": 2001, + "iid": 101, + "project_id": 5, + "title": "Add caching layer", + "state": "merged", + "created_at": "2026-01-20T09:00:00Z", + "updated_at": "2026-02-10T16:00:00Z", + "author": { "id": 2, "username": "dev2", "name": "Developer Two" }, + "web_url": "https://gitlab.example.com/group/repo/-/merge_requests/101", + "source_branch": "feature/caching", + "target_branch": "main", + "draft": false, + "labels": [], + "milestone": null, + "assignees": [], + "reviewers": [], + "merged_by": null, + "merged_at": null, + "closed_at": null, + "description": "Adds Redis caching" + }); + + Mock::given(method("GET")) + .and(path("/api/v4/projects/5/merge_requests/101")) + .and(header("PRIVATE-TOKEN", "test-token")) + .respond_with(ResponseTemplate::new(200).set_body_json(&mr_json)) + .mount(&server) + .await; + + let client = GitLabClient::new(&server.uri(), "test-token", Some(100.0)); + let mr = client.get_mr_by_iid(5, 101).await.unwrap(); + assert_eq!(mr.iid, 101); + assert_eq!(mr.title, "Add caching layer"); + assert_eq!(mr.source_branch, "feature/caching"); +} + +#[tokio::test] +async fn get_mr_by_iid_not_found() { + let server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/api/v4/projects/5/merge_requests/999")) + .respond_with( + ResponseTemplate::new(404) + .set_body_json(serde_json::json!({"message": "404 Not Found"})), + ) + .mount(&server) + .await; + + let client = GitLabClient::new(&server.uri(), "test-token", Some(100.0)); + let err = client.get_mr_by_iid(5, 999).await.unwrap_err(); + assert!(matches!(err, LoreError::GitLabNotFound { .. 
})); +} diff --git a/src/ingestion/issues.rs b/src/ingestion/issues.rs index cd912c9..48da301 100644 --- a/src/ingestion/issues.rs +++ b/src/ingestion/issues.rs @@ -140,7 +140,7 @@ fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCurs true } -fn process_single_issue( +pub(crate) fn process_single_issue( conn: &Connection, config: &Config, project_id: i64, diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs index 0d1453e..baa5763 100644 --- a/src/ingestion/merge_requests.rs +++ b/src/ingestion/merge_requests.rs @@ -135,13 +135,13 @@ pub async fn ingest_merge_requests( Ok(result) } -struct ProcessMrResult { - labels_created: usize, - assignees_linked: usize, - reviewers_linked: usize, +pub(crate) struct ProcessMrResult { + pub(crate) labels_created: usize, + pub(crate) assignees_linked: usize, + pub(crate) reviewers_linked: usize, } -fn process_single_mr( +pub(crate) fn process_single_mr( conn: &Connection, config: &Config, project_id: i64, diff --git a/src/ingestion/mod.rs b/src/ingestion/mod.rs index aa64675..8d5f3cb 100644 --- a/src/ingestion/mod.rs +++ b/src/ingestion/mod.rs @@ -6,6 +6,7 @@ pub mod merge_requests; pub mod mr_diffs; pub mod mr_discussions; pub mod orchestrator; +pub(crate) mod surgical; pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions}; pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues}; diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index cab8650..113c5d7 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -1097,7 +1097,7 @@ async fn drain_resource_events( } /// Store resource events using the provided connection (caller manages the transaction). 
-fn store_resource_events( +pub(crate) fn store_resource_events( conn: &Connection, project_id: i64, entity_type: &str, @@ -1406,7 +1406,7 @@ async fn drain_mr_closes_issues( Ok(result) } -fn store_closes_issues_refs( +pub(crate) fn store_closes_issues_refs( conn: &Connection, project_id: i64, mr_local_id: i64, diff --git a/src/ingestion/surgical.rs b/src/ingestion/surgical.rs new file mode 100644 index 0000000..46161e8 --- /dev/null +++ b/src/ingestion/surgical.rs @@ -0,0 +1,469 @@ +use futures::stream::StreamExt; +use rusqlite::Connection; +use rusqlite::OptionalExtension; +use tracing::{debug, warn}; + +use crate::Config; +use crate::core::error::{LoreError, Result}; +use crate::documents::SourceType; +use crate::gitlab::GitLabClient; +use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest}; +use crate::ingestion::dirty_tracker; +use crate::ingestion::discussions::ingest_issue_discussions; +use crate::ingestion::issues::{IssueForDiscussionSync, process_single_issue}; +use crate::ingestion::merge_requests::{MrForDiscussionSync, process_single_mr}; +use crate::ingestion::mr_diffs::upsert_mr_file_changes; +use crate::ingestion::mr_discussions::ingest_mr_discussions; +use crate::ingestion::orchestrator::{store_closes_issues_refs, store_resource_events}; + +// --------------------------------------------------------------------------- +// Result types +// --------------------------------------------------------------------------- + +#[derive(Debug)] +pub(crate) struct IngestIssueResult { + pub skipped_stale: bool, + pub dirty_source_keys: Vec<(SourceType, i64)>, +} + +#[derive(Debug)] +pub(crate) struct IngestMrResult { + pub skipped_stale: bool, + pub dirty_source_keys: Vec<(SourceType, i64)>, +} + +#[derive(Debug)] +pub(crate) struct PreflightResult { + pub issues: Vec, + pub merge_requests: Vec, + pub failures: Vec, +} + +#[derive(Debug)] +pub(crate) struct PreflightFailure { + pub entity_type: String, + pub iid: i64, + pub error: LoreError, +} + +// 
--------------------------------------------------------------------------- +// TOCTOU guard +// --------------------------------------------------------------------------- + +/// Returns `true` if the payload is stale (same age or older than what the DB +/// already has). Returns `false` when the entity is new (no DB row) or when +/// the payload is strictly newer. +pub(crate) fn is_stale(payload_updated_at: &str, db_updated_at_ms: Option) -> Result { + let Some(db_ms) = db_updated_at_ms else { + return Ok(false); + }; + + let payload_ms = chrono::DateTime::parse_from_rfc3339(payload_updated_at) + .map(|dt| dt.timestamp_millis()) + .map_err(|e| { + LoreError::Other(format!( + "Failed to parse timestamp '{}': {}", + payload_updated_at, e + )) + })?; + + Ok(payload_ms <= db_ms) +} + +// --------------------------------------------------------------------------- +// Ingestion wrappers +// --------------------------------------------------------------------------- + +/// Ingest a single issue by IID with TOCTOU guard and dirty marking. +pub(crate) fn ingest_issue_by_iid( + conn: &Connection, + config: &Config, + project_id: i64, + issue: &GitLabIssue, +) -> Result { + let db_updated_at = get_db_updated_at(conn, "issues", issue.iid, project_id)?; + + if is_stale(&issue.updated_at, db_updated_at)? { + debug!(iid = issue.iid, "Skipping stale issue (TOCTOU guard)"); + return Ok(IngestIssueResult { + skipped_stale: true, + dirty_source_keys: vec![], + }); + } + + process_single_issue(conn, config, project_id, issue)?; + + let local_id: i64 = conn.query_row( + "SELECT id FROM issues WHERE project_id = ? AND iid = ?", + (project_id, issue.iid), + |row| row.get(0), + )?; + + dirty_tracker::mark_dirty(conn, SourceType::Issue, local_id)?; + + Ok(IngestIssueResult { + skipped_stale: false, + dirty_source_keys: vec![(SourceType::Issue, local_id)], + }) +} + +/// Ingest a single merge request by IID with TOCTOU guard and dirty marking. 
+pub(crate) fn ingest_mr_by_iid( + conn: &Connection, + config: &Config, + project_id: i64, + mr: &GitLabMergeRequest, +) -> Result { + let db_updated_at = get_db_updated_at(conn, "merge_requests", mr.iid, project_id)?; + + if is_stale(&mr.updated_at, db_updated_at)? { + debug!(iid = mr.iid, "Skipping stale MR (TOCTOU guard)"); + return Ok(IngestMrResult { + skipped_stale: true, + dirty_source_keys: vec![], + }); + } + + process_single_mr(conn, config, project_id, mr)?; + + let local_id: i64 = conn.query_row( + "SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?", + (project_id, mr.iid), + |row| row.get(0), + )?; + + dirty_tracker::mark_dirty(conn, SourceType::MergeRequest, local_id)?; + + Ok(IngestMrResult { + skipped_stale: false, + dirty_source_keys: vec![(SourceType::MergeRequest, local_id)], + }) +} + +// --------------------------------------------------------------------------- +// Preflight fetch +// --------------------------------------------------------------------------- + +/// Fetch specific issues and MRs by IID from GitLab. Collects successes and +/// failures without aborting on individual 404s. +/// +/// Requests are dispatched concurrently (up to 10 in-flight at once) to avoid +/// sequential round-trip latency when syncing many IIDs. +pub(crate) async fn preflight_fetch( + client: &GitLabClient, + gitlab_project_id: i64, + targets: &[(String, i64)], +) -> PreflightResult { + /// Max concurrent HTTP requests during preflight. 
+ const PREFLIGHT_CONCURRENCY: usize = 10; + + #[allow(clippy::large_enum_variant)] + enum FetchOutcome { + Issue(std::result::Result), + MergeRequest(std::result::Result), + UnknownType(String, i64), + } + + let mut result = PreflightResult { + issues: Vec::new(), + merge_requests: Vec::new(), + failures: Vec::new(), + }; + + let mut stream = futures::stream::iter(targets.iter().map(|(entity_type, iid)| { + let entity_type = entity_type.clone(); + let iid = *iid; + async move { + match entity_type.as_str() { + "issue" => FetchOutcome::Issue( + client + .get_issue_by_iid(gitlab_project_id, iid) + .await + .map_err(|e| (entity_type, iid, e)), + ), + "merge_request" => FetchOutcome::MergeRequest( + client + .get_mr_by_iid(gitlab_project_id, iid) + .await + .map_err(|e| (entity_type, iid, e)), + ), + _ => FetchOutcome::UnknownType(entity_type, iid), + } + } + })) + .buffer_unordered(PREFLIGHT_CONCURRENCY); + + while let Some(outcome) = stream.next().await { + match outcome { + FetchOutcome::Issue(Ok(issue)) => result.issues.push(issue), + FetchOutcome::Issue(Err((et, iid, e))) => { + result.failures.push(PreflightFailure { + entity_type: et, + iid, + error: e, + }); + } + FetchOutcome::MergeRequest(Ok(mr)) => result.merge_requests.push(mr), + FetchOutcome::MergeRequest(Err((et, iid, e))) => { + result.failures.push(PreflightFailure { + entity_type: et, + iid, + error: e, + }); + } + FetchOutcome::UnknownType(et, iid) => { + result.failures.push(PreflightFailure { + entity_type: et.clone(), + iid, + error: LoreError::Other(format!("Unknown entity type: {et}")), + }); + } + } + } + + result +} + +// --------------------------------------------------------------------------- +// Dependent fetch helpers (surgical mode) +// --------------------------------------------------------------------------- + +/// Counts returned from fetching dependents for a single entity. 
+#[derive(Debug, Default)] +pub(crate) struct DependentFetchResult { + pub resource_events_fetched: usize, + pub discussions_fetched: usize, + pub closes_issues_stored: usize, + pub file_changes_stored: usize, +} + +/// Fetch and store all dependents for a single issue: +/// resource events (state, label, milestone) and discussions. +pub(crate) async fn fetch_dependents_for_issue( + client: &GitLabClient, + conn: &Connection, + project_id: i64, + gitlab_project_id: i64, + iid: i64, + local_id: i64, + config: &Config, +) -> Result { + let mut result = DependentFetchResult::default(); + + // --- Resource events --- + match client + .fetch_all_resource_events(gitlab_project_id, "issue", iid) + .await + { + Ok((state_events, label_events, milestone_events)) => { + let count = state_events.len() + label_events.len() + milestone_events.len(); + let tx = conn.unchecked_transaction()?; + store_resource_events( + &tx, + project_id, + "issue", + local_id, + &state_events, + &label_events, + &milestone_events, + )?; + tx.execute( + "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", + [local_id], + )?; + tx.commit()?; + result.resource_events_fetched = count; + } + Err(e) => { + warn!( + iid, + error = %e, + "Failed to fetch resource events for issue, continuing" + ); + } + } + + // --- Discussions --- + let sync_item = IssueForDiscussionSync { + local_issue_id: local_id, + iid, + updated_at: 0, // not used for filtering in surgical mode + }; + match ingest_issue_discussions( + conn, + client, + config, + gitlab_project_id, + project_id, + &[sync_item], + ) + .await + { + Ok(disc_result) => { + result.discussions_fetched = disc_result.discussions_fetched; + } + Err(e) => { + warn!( + iid, + error = %e, + "Failed to ingest discussions for issue, continuing" + ); + } + } + + Ok(result) +} + +/// Fetch and store all dependents for a single merge request: +/// resource events, discussions, closes-issues references, and file changes (diffs). 
+pub(crate) async fn fetch_dependents_for_mr( + client: &GitLabClient, + conn: &Connection, + project_id: i64, + gitlab_project_id: i64, + iid: i64, + local_id: i64, + config: &Config, +) -> Result { + let mut result = DependentFetchResult::default(); + + // --- Resource events --- + match client + .fetch_all_resource_events(gitlab_project_id, "merge_request", iid) + .await + { + Ok((state_events, label_events, milestone_events)) => { + let count = state_events.len() + label_events.len() + milestone_events.len(); + let tx = conn.unchecked_transaction()?; + store_resource_events( + &tx, + project_id, + "merge_request", + local_id, + &state_events, + &label_events, + &milestone_events, + )?; + tx.execute( + "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?", + [local_id], + )?; + tx.commit()?; + result.resource_events_fetched = count; + } + Err(e) => { + warn!( + iid, + error = %e, + "Failed to fetch resource events for MR, continuing" + ); + } + } + + // --- Discussions --- + let sync_item = MrForDiscussionSync { + local_mr_id: local_id, + iid, + updated_at: 0, + }; + match ingest_mr_discussions( + conn, + client, + config, + gitlab_project_id, + project_id, + &[sync_item], + ) + .await + { + Ok(disc_result) => { + result.discussions_fetched = disc_result.discussions_fetched; + } + Err(e) => { + warn!( + iid, + error = %e, + "Failed to ingest discussions for MR, continuing" + ); + } + } + + // --- Closes issues --- + match client.fetch_mr_closes_issues(gitlab_project_id, iid).await { + Ok(closes_issues) => { + let count = closes_issues.len(); + let tx = conn.unchecked_transaction()?; + store_closes_issues_refs(&tx, project_id, local_id, &closes_issues)?; + tx.execute( + "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?", + [local_id], + )?; + tx.commit()?; + result.closes_issues_stored = count; + } + Err(e) => { + warn!( + iid, + error = %e, + "Failed to fetch closes_issues for MR, 
continuing" + ); + } + } + + // --- File changes (diffs) --- + match client.fetch_mr_diffs(gitlab_project_id, iid).await { + Ok(diffs) => { + let tx = conn.unchecked_transaction()?; + let stored = upsert_mr_file_changes(&tx, local_id, project_id, &diffs)?; + tx.execute( + "UPDATE merge_requests SET diffs_synced_for_updated_at = updated_at WHERE id = ?", + [local_id], + )?; + tx.commit()?; + result.file_changes_stored = stored; + } + Err(e) => { + warn!( + iid, + error = %e, + "Failed to fetch diffs for MR, continuing" + ); + } + } + + Ok(result) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn get_db_updated_at( + conn: &Connection, + table: &str, + iid: i64, + project_id: i64, +) -> Result> { + // Using a match on known table names avoids SQL injection from the table parameter. + let sql = match table { + "issues" => "SELECT updated_at FROM issues WHERE project_id = ?1 AND iid = ?2", + "merge_requests" => { + "SELECT updated_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2" + } + _ => { + return Err(LoreError::Other(format!( + "Unknown table for updated_at lookup: {table}" + ))); + } + }; + + let result: Option = conn + .query_row(sql, (project_id, iid), |row| row.get(0)) + .optional()?; + + Ok(result) +} + +#[cfg(test)] +#[path = "surgical_tests.rs"] +mod tests; diff --git a/src/ingestion/surgical_tests.rs b/src/ingestion/surgical_tests.rs new file mode 100644 index 0000000..65887fa --- /dev/null +++ b/src/ingestion/surgical_tests.rs @@ -0,0 +1,645 @@ +use std::path::Path; + +use super::*; +use crate::core::config::{ + Config, EmbeddingConfig, GitLabConfig, LoggingConfig, ProjectConfig, ScoringConfig, + StorageConfig, SyncConfig, +}; +use crate::core::db::{create_connection, run_migrations}; +use crate::gitlab::types::{GitLabAuthor, GitLabMergeRequest}; + +// 
--------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +fn setup_db() -> rusqlite::Connection { + let conn = create_connection(Path::new(":memory:")).expect("in-memory DB"); + run_migrations(&conn).expect("migrations"); + conn.execute( + "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) + VALUES (100, 'group/repo', 'https://example.com/group/repo')", + [], + ) + .expect("insert project"); + conn +} + +fn test_config() -> Config { + Config { + gitlab: GitLabConfig { + base_url: "https://gitlab.example.com".to_string(), + token_env_var: "GITLAB_TOKEN".to_string(), + token: None, + }, + projects: vec![ProjectConfig { + path: "group/repo".to_string(), + }], + default_project: None, + sync: SyncConfig::default(), + storage: StorageConfig::default(), + embedding: EmbeddingConfig::default(), + logging: LoggingConfig::default(), + scoring: ScoringConfig::default(), + } +} + +fn make_test_issue(iid: i64, updated_at: &str) -> GitLabIssue { + GitLabIssue { + id: iid * 1000, // unique gitlab_id + iid, + project_id: 100, + title: format!("Test issue {iid}"), + description: Some("Description".to_string()), + state: "opened".to_string(), + created_at: "2026-01-01T00:00:00.000+00:00".to_string(), + updated_at: updated_at.to_string(), + closed_at: None, + author: GitLabAuthor { + id: 1, + username: "testuser".to_string(), + name: "Test User".to_string(), + }, + assignees: vec![], + labels: vec![], + milestone: None, + due_date: None, + web_url: format!("https://example.com/group/repo/-/issues/{iid}"), + } +} + +fn make_test_mr(iid: i64, updated_at: &str) -> GitLabMergeRequest { + GitLabMergeRequest { + id: iid * 1000, + iid, + project_id: 100, + title: format!("Test MR {iid}"), + description: Some("MR description".to_string()), + state: "opened".to_string(), + draft: false, + work_in_progress: false, + source_branch: "feature".to_string(), + 
target_branch: "main".to_string(), + sha: Some("abc123".to_string()), + references: None, + detailed_merge_status: None, + merge_status_legacy: None, + created_at: "2026-01-01T00:00:00.000+00:00".to_string(), + updated_at: updated_at.to_string(), + merged_at: None, + closed_at: None, + author: GitLabAuthor { + id: 1, + username: "testuser".to_string(), + name: "Test User".to_string(), + }, + merge_user: None, + merged_by: None, + labels: vec![], + assignees: vec![], + reviewers: vec![], + web_url: format!("https://example.com/group/repo/-/merge_requests/{iid}"), + merge_commit_sha: None, + squash_commit_sha: None, + } +} + +fn get_db_updated_at_helper(conn: &rusqlite::Connection, table: &str, iid: i64) -> Option { + let sql = match table { + "issues" => "SELECT updated_at FROM issues WHERE project_id = 1 AND iid = ?1", + "merge_requests" => { + "SELECT updated_at FROM merge_requests WHERE project_id = 1 AND iid = ?1" + } + _ => return None, + }; + conn.query_row(sql, [iid], |row| row.get(0)).ok() +} + +fn get_dirty_keys(conn: &rusqlite::Connection) -> Vec<(String, i64)> { + let mut stmt = conn + .prepare("SELECT source_type, source_id FROM dirty_sources ORDER BY source_type, source_id") + .expect("prepare dirty_sources query"); + stmt.query_map([], |row| { + let st: String = row.get(0)?; + let id: i64 = row.get(1)?; + Ok((st, id)) + }) + .expect("query dirty_sources") + .collect::, _>>() + .expect("collect dirty_sources") +} + +// --------------------------------------------------------------------------- +// is_stale unit tests +// --------------------------------------------------------------------------- + +#[test] +fn test_is_stale_parses_iso8601() { + // 2026-02-17T12:00:00.000+00:00 -> 1771329600000 ms + let result = is_stale("2026-02-17T12:00:00.000+00:00", Some(1_771_329_600_000)); + assert!(result.is_ok()); + // Same timestamp => stale + assert!(result.unwrap()); +} + +#[test] +fn test_is_stale_handles_none_db_value() { + let result = 
is_stale("2026-02-17T12:00:00.000+00:00", None); + assert!(result.is_ok()); + assert!(!result.unwrap(), "no DB row means not stale"); +} + +#[test] +fn test_is_stale_with_z_suffix() { + let result = is_stale("2026-02-17T12:00:00Z", Some(1_771_329_600_000)); + assert!(result.is_ok()); + assert!(result.unwrap(), "Z suffix should parse same as +00:00"); +} + +// --------------------------------------------------------------------------- +// Issue ingestion tests +// --------------------------------------------------------------------------- + +#[test] +fn test_ingest_issue_by_iid_upserts_and_marks_dirty() { + let conn = setup_db(); + let config = test_config(); + let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + + let result = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap(); + + assert!(!result.skipped_stale); + assert!(!result.skipped_stale); + assert!(!result.dirty_source_keys.is_empty()); + + // Verify DB row exists + let db_ts = get_db_updated_at_helper(&conn, "issues", 42); + assert!(db_ts.is_some(), "issue should exist in DB"); + + // Verify dirty marking + let dirty = get_dirty_keys(&conn); + assert!( + dirty.iter().any(|(t, _)| t == "issue"), + "dirty_sources should contain an issue entry" + ); +} + +#[test] +fn test_toctou_skips_stale_issue() { + let conn = setup_db(); + let config = test_config(); + let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + + // First ingest succeeds + let r1 = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap(); + assert!(!r1.skipped_stale); + + // Clear dirty to check second ingest doesn't re-mark + conn.execute("DELETE FROM dirty_sources", []).unwrap(); + + // Second ingest with same timestamp should be skipped + let r2 = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap(); + assert!(r2.skipped_stale); + assert!(r2.skipped_stale); + assert!(r2.dirty_source_keys.is_empty()); + + // No new dirty mark + let dirty = get_dirty_keys(&conn); + assert!(dirty.is_empty(), "stale skip 
should not create dirty marks"); +} + +#[test] +fn test_toctou_allows_newer_issue() { + let conn = setup_db(); + let config = test_config(); + + // Ingest at T1 + let issue_t1 = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + ingest_issue_by_iid(&conn, &config, 1, &issue_t1).unwrap(); + + conn.execute("DELETE FROM dirty_sources", []).unwrap(); + + // Ingest at T2 (newer) — should succeed + let issue_t2 = make_test_issue(42, "2026-02-17T13:00:00.000+00:00"); + let result = ingest_issue_by_iid(&conn, &config, 1, &issue_t2).unwrap(); + + assert!(!result.skipped_stale); + assert!(!result.skipped_stale); +} + +#[test] +fn test_ingest_issue_returns_dirty_source_keys() { + let conn = setup_db(); + let config = test_config(); + let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + + let result = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap(); + + assert_eq!(result.dirty_source_keys.len(), 1); + let (source_type, local_id) = &result.dirty_source_keys[0]; + assert_eq!(source_type.as_str(), "issue"); + assert!(*local_id > 0, "local_id should be positive"); +} + +#[test] +fn test_ingest_issue_updates_existing() { + let conn = setup_db(); + let config = test_config(); + + let issue_v1 = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + ingest_issue_by_iid(&conn, &config, 1, &issue_v1).unwrap(); + + let ts1 = get_db_updated_at_helper(&conn, "issues", 42).unwrap(); + + // Newer version + let issue_v2 = make_test_issue(42, "2026-02-17T14:00:00.000+00:00"); + let result = ingest_issue_by_iid(&conn, &config, 1, &issue_v2).unwrap(); + + assert!(!result.skipped_stale); + let ts2 = get_db_updated_at_helper(&conn, "issues", 42).unwrap(); + assert!(ts2 > ts1, "DB timestamp should increase after update"); +} + +// --------------------------------------------------------------------------- +// MR ingestion tests +// --------------------------------------------------------------------------- + +#[test] +fn test_ingest_mr_by_iid_upserts_and_marks_dirty() 
{ + let conn = setup_db(); + let config = test_config(); + let mr = make_test_mr(101, "2026-02-17T12:00:00.000+00:00"); + + let result = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap(); + + assert!(!result.skipped_stale); + assert!(!result.skipped_stale); + assert!(!result.dirty_source_keys.is_empty()); + + let db_ts = get_db_updated_at_helper(&conn, "merge_requests", 101); + assert!(db_ts.is_some(), "MR should exist in DB"); + + let dirty = get_dirty_keys(&conn); + assert!( + dirty.iter().any(|(t, _)| t == "merge_request"), + "dirty_sources should contain a merge_request entry" + ); +} + +#[test] +fn test_toctou_skips_stale_mr() { + let conn = setup_db(); + let config = test_config(); + let mr = make_test_mr(101, "2026-02-17T12:00:00.000+00:00"); + + let r1 = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap(); + assert!(!r1.skipped_stale); + + conn.execute("DELETE FROM dirty_sources", []).unwrap(); + + let r2 = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap(); + assert!(r2.skipped_stale); + assert!(r2.skipped_stale); + assert!(r2.dirty_source_keys.is_empty()); +} + +#[test] +fn test_toctou_allows_newer_mr() { + let conn = setup_db(); + let config = test_config(); + + let mr_t1 = make_test_mr(101, "2026-02-17T12:00:00.000+00:00"); + ingest_mr_by_iid(&conn, &config, 1, &mr_t1).unwrap(); + + conn.execute("DELETE FROM dirty_sources", []).unwrap(); + + let mr_t2 = make_test_mr(101, "2026-02-17T13:00:00.000+00:00"); + let result = ingest_mr_by_iid(&conn, &config, 1, &mr_t2).unwrap(); + + assert!(!result.skipped_stale); + assert!(!result.skipped_stale); +} + +#[test] +fn test_ingest_mr_returns_dirty_source_keys() { + let conn = setup_db(); + let config = test_config(); + let mr = make_test_mr(101, "2026-02-17T12:00:00.000+00:00"); + + let result = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap(); + + assert_eq!(result.dirty_source_keys.len(), 1); + let (source_type, local_id) = &result.dirty_source_keys[0]; + assert_eq!(source_type.as_str(), "merge_request"); + 
assert!(*local_id > 0); +} + +#[test] +fn test_ingest_mr_updates_existing() { + let conn = setup_db(); + let config = test_config(); + + let mr_v1 = make_test_mr(101, "2026-02-17T12:00:00.000+00:00"); + ingest_mr_by_iid(&conn, &config, 1, &mr_v1).unwrap(); + + let ts1 = get_db_updated_at_helper(&conn, "merge_requests", 101).unwrap(); + + let mr_v2 = make_test_mr(101, "2026-02-17T14:00:00.000+00:00"); + let result = ingest_mr_by_iid(&conn, &config, 1, &mr_v2).unwrap(); + + assert!(!result.skipped_stale); + let ts2 = get_db_updated_at_helper(&conn, "merge_requests", 101).unwrap(); + assert!(ts2 > ts1, "DB timestamp should increase after update"); +} + +// --------------------------------------------------------------------------- +// Preflight fetch test (wiremock) +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_preflight_fetch_returns_issues_and_mrs() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let mock_server = MockServer::start().await; + + // Issue fixture + let issue_json = serde_json::json!({ + "id": 42000, + "iid": 42, + "project_id": 100, + "title": "Test issue 42", + "description": "desc", + "state": "opened", + "created_at": "2026-01-01T00:00:00.000+00:00", + "updated_at": "2026-02-17T12:00:00.000+00:00", + "author": {"id": 1, "username": "testuser", "name": "Test User"}, + "assignees": [], + "labels": [], + "web_url": "https://example.com/group/repo/-/issues/42" + }); + + // MR fixture + let mr_json = serde_json::json!({ + "id": 101000, + "iid": 101, + "project_id": 100, + "title": "Test MR 101", + "description": "mr desc", + "state": "opened", + "draft": false, + "work_in_progress": false, + "source_branch": "feature", + "target_branch": "main", + "sha": "abc123", + "created_at": "2026-01-01T00:00:00.000+00:00", + "updated_at": "2026-02-17T12:00:00.000+00:00", + "author": {"id": 1, "username": "testuser", "name": "Test User"}, + 
"labels": [], + "assignees": [], + "reviewers": [], + "web_url": "https://example.com/group/repo/-/merge_requests/101" + }); + + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/issues/42")) + .respond_with(ResponseTemplate::new(200).set_body_json(&issue_json)) + .mount(&mock_server) + .await; + + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/merge_requests/101")) + .respond_with(ResponseTemplate::new(200).set_body_json(&mr_json)) + .mount(&mock_server) + .await; + + let client = GitLabClient::new(&mock_server.uri(), "test-token", None); + let targets = vec![ + ("issue".to_string(), 42i64), + ("merge_request".to_string(), 101i64), + ]; + + let result = preflight_fetch(&client, 100, &targets).await; + + assert_eq!(result.issues.len(), 1); + assert_eq!(result.issues[0].iid, 42); + assert_eq!(result.merge_requests.len(), 1); + assert_eq!(result.merge_requests[0].iid, 101); + assert!(result.failures.is_empty()); +} + +// --------------------------------------------------------------------------- +// Dependent helper tests (bd-kanh) +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_fetch_dependents_for_issue_empty_events() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let mock_server = MockServer::start().await; + let conn = setup_db(); + let config = test_config(); + + // Insert an issue so we have a local_id + let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap(); + let local_id: i64 = conn + .query_row( + "SELECT id FROM issues WHERE project_id = 1 AND iid = 42", + [], + |row| row.get(0), + ) + .unwrap(); + + // Mock empty resource event endpoints + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/issues/42/resource_state_events")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; 
+ Mock::given(method("GET")) + .and(path("/api/v4/projects/100/issues/42/resource_label_events")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + Mock::given(method("GET")) + .and(path( + "/api/v4/projects/100/issues/42/resource_milestone_events", + )) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + // Mock empty discussions endpoint + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/issues/42/discussions")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + let client = GitLabClient::new(&mock_server.uri(), "test-token", None); + + let result = fetch_dependents_for_issue(&client, &conn, 1, 100, 42, local_id, &config) + .await + .unwrap(); + + assert_eq!(result.resource_events_fetched, 0); + assert_eq!(result.discussions_fetched, 0); +} + +#[tokio::test] +async fn test_fetch_dependents_for_mr_empty_events() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let mock_server = MockServer::start().await; + let conn = setup_db(); + let config = test_config(); + + // Insert an MR so we have a local_id + let mr = make_test_mr(101, "2026-02-17T12:00:00.000+00:00"); + ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap(); + let local_id: i64 = conn + .query_row( + "SELECT id FROM merge_requests WHERE project_id = 1 AND iid = 101", + [], + |row| row.get(0), + ) + .unwrap(); + + // Mock empty resource event endpoints for MR + Mock::given(method("GET")) + .and(path( + "/api/v4/projects/100/merge_requests/101/resource_state_events", + )) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + Mock::given(method("GET")) + .and(path( + "/api/v4/projects/100/merge_requests/101/resource_label_events", + )) + 
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + Mock::given(method("GET")) + .and(path( + "/api/v4/projects/100/merge_requests/101/resource_milestone_events", + )) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + // Mock empty discussions endpoint for MR + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/merge_requests/101/discussions")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + // Mock empty closes_issues endpoint + Mock::given(method("GET")) + .and(path( + "/api/v4/projects/100/merge_requests/101/closes_issues", + )) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + // Mock empty diffs endpoint + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/merge_requests/101/diffs")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + let client = GitLabClient::new(&mock_server.uri(), "test-token", None); + + let result = fetch_dependents_for_mr(&client, &conn, 1, 100, 101, local_id, &config) + .await + .unwrap(); + + assert_eq!(result.resource_events_fetched, 0); + assert_eq!(result.discussions_fetched, 0); + assert_eq!(result.closes_issues_stored, 0); + assert_eq!(result.file_changes_stored, 0); +} + +#[tokio::test] +async fn test_fetch_dependents_for_mr_with_closes_issues() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let mock_server = MockServer::start().await; + let conn = setup_db(); + let config = test_config(); + + // Insert issue and MR so references can resolve + let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00"); + ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap(); + + let mr = make_test_mr(101, 
"2026-02-17T12:00:00.000+00:00"); + ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap(); + let mr_local_id: i64 = conn + .query_row( + "SELECT id FROM merge_requests WHERE project_id = 1 AND iid = 101", + [], + |row| row.get(0), + ) + .unwrap(); + + // Mock empty resource events + for endpoint in [ + "resource_state_events", + "resource_label_events", + "resource_milestone_events", + ] { + Mock::given(method("GET")) + .and(path(format!( + "/api/v4/projects/100/merge_requests/101/{endpoint}" + ))) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + } + + // Mock empty discussions + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/merge_requests/101/discussions")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + // Mock closes_issues with one reference + Mock::given(method("GET")) + .and(path( + "/api/v4/projects/100/merge_requests/101/closes_issues", + )) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([ + { + "id": 42000, + "iid": 42, + "project_id": 100, + "title": "Test issue 42", + "state": "opened", + "web_url": "https://example.com/group/repo/-/issues/42" + } + ]))) + .mount(&mock_server) + .await; + + // Mock empty diffs + Mock::given(method("GET")) + .and(path("/api/v4/projects/100/merge_requests/101/diffs")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([]))) + .mount(&mock_server) + .await; + + let client = GitLabClient::new(&mock_server.uri(), "test-token", None); + + let result = fetch_dependents_for_mr(&client, &conn, 1, 100, 101, mr_local_id, &config) + .await + .unwrap(); + + assert_eq!(result.closes_issues_stored, 1); +} diff --git a/src/main.rs b/src/main.rs index 628b1ad..75b7b5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,14 +26,14 @@ use lore::cli::commands::{ run_cron_status, run_cron_uninstall, run_doctor, run_drift, run_embed, 
run_file_history, run_generate_docs, run_ingest, run_ingest_dry_run, run_init, run_list_issues, run_list_mrs, run_search, run_show_issue, run_show_mr, run_stats, run_sync, run_sync_status, run_timeline, - run_who, + run_token_set, run_token_show, run_who, }; use lore::cli::render::{ColorMode, GlyphMode, Icons, LoreRenderer, Theme}; use lore::cli::robot::{RobotMeta, strip_schemas}; use lore::cli::{ Cli, Commands, CountArgs, CronAction, CronArgs, EmbedArgs, FileHistoryArgs, GenerateDocsArgs, IngestArgs, IssuesArgs, MrsArgs, NotesArgs, SearchArgs, StatsArgs, SyncArgs, TimelineArgs, - TraceArgs, WhoArgs, + TokenAction, TokenArgs, TraceArgs, WhoArgs, }; use lore::core::db::{ LATEST_SCHEMA_VERSION, create_connection, get_schema_version, run_migrations, @@ -207,6 +207,7 @@ async fn main() { } Some(Commands::Trace(args)) => handle_trace(cli.config.as_deref(), args, robot_mode), Some(Commands::Cron(args)) => handle_cron(cli.config.as_deref(), args, robot_mode), + Some(Commands::Token(args)) => handle_token(cli.config.as_deref(), args, robot_mode).await, Some(Commands::Drift { entity_type, iid, @@ -2154,6 +2155,14 @@ async fn handle_sync_cmd( ) -> Result<(), Box> { let dry_run = args.dry_run && !args.no_dry_run; + // Dedup and sort IIDs + let mut issue_iids = args.issue; + let mut mr_iids = args.mr; + issue_iids.sort_unstable(); + issue_iids.dedup(); + mr_iids.sort_unstable(); + mr_iids.dedup(); + let mut config = Config::load(config_override)?; if args.no_events { config.sync.fetch_resource_events = false; @@ -2172,10 +2181,56 @@ async fn handle_sync_cmd( no_events: args.no_events, robot_mode, dry_run, + issue_iids, + mr_iids, + project: args.project, + preflight_only: args.preflight_only, }; - // For dry run, skip recording and just show the preview - if dry_run { + // Validation: preflight_only requires surgical mode + if options.preflight_only && !options.is_surgical() { + return Err("--preflight-only requires --issue or --mr".into()); + } + + // Validation: full + 
surgical are incompatible + if options.full && options.is_surgical() { + return Err("--full and --issue/--mr are incompatible".into()); + } + + // Validation: surgical mode requires a project (via -p or config defaultProject) + if options.is_surgical() + && config + .effective_project(options.project.as_deref()) + .is_none() + { + return Err("--issue/--mr requires -p/--project (or set defaultProject in config)".into()); + } + + // Validation: hard cap on total surgical targets + let total_targets = options.issue_iids.len() + options.mr_iids.len(); + if total_targets > SyncOptions::MAX_SURGICAL_TARGETS { + return Err(format!( + "Too many surgical targets ({total_targets}); maximum is {}", + SyncOptions::MAX_SURGICAL_TARGETS + ) + .into()); + } + + // Surgical + dry-run → treat as preflight-only + let mut options = options; + if dry_run && options.is_surgical() { + options.preflight_only = true; + } + + // Resolve effective project for surgical mode: when -p is not passed but + // defaultProject is set in config, populate options.project so the surgical + // orchestrator receives the resolved project path. + if options.is_surgical() && options.project.is_none() { + options.project = config.default_project.clone(); + } + + // For non-surgical dry run, skip recording and just show the preview + if dry_run && !options.is_surgical() { let signal = ShutdownSignal::new(); run_sync(&config, options, None, &signal).await?; return Ok(()); @@ -2199,6 +2254,34 @@ async fn handle_sync_cmd( None }; + // Surgical mode: run_sync_surgical manages its own recorder, signal, and recording. + // Skip the normal recorder setup and let the dispatch handle everything. + if options.is_surgical() { + let signal = ShutdownSignal::new(); + let signal_for_handler = signal.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + eprintln!("\nInterrupted, finishing current batch... 
(Ctrl+C again to force quit)"); + signal_for_handler.cancel(); + let _ = tokio::signal::ctrl_c().await; + std::process::exit(130); + }); + + let start = std::time::Instant::now(); + match run_sync(&config, options, None, &signal).await { + Ok(result) => { + let elapsed = start.elapsed(); + if robot_mode { + print_sync_json(&result, elapsed.as_millis() as u64, Some(metrics)); + } else { + print_sync(&result, elapsed, Some(metrics), args.timings); + } + return Ok(()); + } + Err(e) => return Err(e.into()), + } + } + let db_path = get_db_path(config.storage.db_path.as_deref()); let recorder_conn = create_connection(&db_path)?; let run_id = uuid::Uuid::new_v4().simple().to_string(); @@ -2287,6 +2370,29 @@ fn handle_cron( } else { print_cron_install(&result); } + // Warn if no stored token — cron runs in a minimal shell with no env vars + if let Ok(config) = Config::load(config_override) + && config + .gitlab + .token + .as_ref() + .is_none_or(|t| t.trim().is_empty()) + { + if robot_mode { + eprintln!( + "{{\"warning\":\"No stored token found. Cron sync requires a stored token. Run: lore token set\"}}" + ); + } else { + eprintln!(); + eprintln!( + " {} No stored token found. 
Cron sync requires a stored token.", + lore::cli::render::Theme::warning() + .render(lore::cli::render::Icons::warning()), + ); + eprintln!(" Run: lore token set"); + eprintln!(); + } + } } CronAction::Uninstall => { let result = run_cron_uninstall()?; @@ -2312,6 +2418,74 @@ fn handle_cron( Ok(()) } +async fn handle_token( + config_override: Option<&str>, + args: TokenArgs, + robot_mode: bool, +) -> Result<(), Box> { + let start = std::time::Instant::now(); + + match args.action { + TokenAction::Set { token } => { + let result = run_token_set(config_override, token).await?; + let elapsed_ms = start.elapsed().as_millis() as u64; + if robot_mode { + let output = serde_json::json!({ + "ok": true, + "data": { + "action": "set", + "username": result.username, + "config_path": result.config_path, + }, + "meta": { "elapsed_ms": elapsed_ms }, + }); + println!("{}", serde_json::to_string(&output)?); + } else { + println!( + " {} Token stored and validated (authenticated as @{})", + lore::cli::render::Theme::success().render(lore::cli::render::Icons::success()), + result.username + ); + println!( + " {} {}", + lore::cli::render::Theme::dim().render("config:"), + result.config_path + ); + println!(); + } + } + TokenAction::Show { unmask } => { + let result = run_token_show(config_override, unmask)?; + let elapsed_ms = start.elapsed().as_millis() as u64; + if robot_mode { + let output = serde_json::json!({ + "ok": true, + "data": { + "token": result.token, + "source": result.source, + }, + "meta": { "elapsed_ms": elapsed_ms }, + }); + println!("{}", serde_json::to_string(&output)?); + } else { + println!( + " {} {}", + lore::cli::render::Theme::dim().render("token:"), + result.token + ); + println!( + " {} {}", + lore::cli::render::Theme::dim().render("source:"), + result.source + ); + println!(); + } + } + } + + Ok(()) +} + #[derive(Serialize)] struct HealthOutput { ok: bool, @@ -2513,13 +2687,31 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box 
generate-docs -> embed", - "flags": ["--full", "--no-full", "--force", "--no-force", "--no-embed", "--no-docs", "--no-events", "--no-file-changes", "--no-status", "--dry-run", "--no-dry-run"], + "description": "Full sync pipeline: ingest -> generate-docs -> embed. Supports surgical per-IID mode.", + "flags": ["--full", "--no-full", "--force", "--no-force", "--no-embed", "--no-docs", "--no-events", "--no-file-changes", "--no-status", "--dry-run", "--no-dry-run", "-t/--timings", "--lock", "--issue ", "--mr ", "-p/--project ", "--preflight-only"], "example": "lore --robot sync", + "surgical_mode": { + "description": "Sync specific issues or MRs by IID. Runs a scoped pipeline: preflight -> TOCTOU check -> ingest -> dependents -> docs -> embed.", + "flags": ["--issue (repeatable)", "--mr (repeatable)", "-p/--project (required)", "--preflight-only"], + "examples": [ + "lore --robot sync --issue 7 -p group/project", + "lore --robot sync --issue 7 --issue 42 --mr 10 -p group/project", + "lore --robot sync --issue 7 -p group/project --preflight-only" + ], + "constraints": ["--issue/--mr requires -p/--project (or defaultProject in config)", "--full and --issue/--mr are incompatible", "--preflight-only requires --issue or --mr", "Max 100 total targets"], + "entity_result_outcomes": ["synced", "skipped_stale", "not_found", "preflight_failed", "error"] + }, "response_schema": { - "ok": "bool", - "data": {"issues_updated": "int", "mrs_updated": "int", "documents_regenerated": "int", "documents_embedded": "int", "resource_events_synced": "int", "resource_events_failed": "int"}, - "meta": {"elapsed_ms": "int", "stages?": "[{name:string, elapsed_ms:int, items_processed:int}]"} + "normal": { + "ok": "bool", + "data": {"issues_updated": "int", "mrs_updated": "int", "documents_regenerated": "int", "documents_embedded": "int", "resource_events_synced": "int", "resource_events_failed": "int"}, + "meta": {"elapsed_ms": "int", "stages?": "[{name:string, elapsed_ms:int, 
items_processed:int}]"} + }, + "surgical": { + "ok": "bool", + "data": {"surgical_mode": "true", "surgical_iids": "{issues:[int], merge_requests:[int]}", "entity_results": "[{entity_type:string, iid:int, outcome:string, error?:string, toctou_reason?:string}]", "preflight_only?": "bool", "issues_updated": "int", "mrs_updated": "int", "documents_regenerated": "int", "documents_embedded": "int", "discussions_fetched": "int"}, + "meta": {"elapsed_ms": "int"} + } } }, "issues": { @@ -2668,7 +2860,7 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box", "--path ", "--active", "--overlap ", "--reviews", "--since ", "-p/--project", "-n/--limit", "--fields ", "--detail", "--no-detail", "--as-of ", "--explain-score", "--include-bots", "--all-history"], + "flags": ["", "--path ", "--active", "--overlap ", "--reviews", "--since ", "-p/--project", "-n/--limit", "--fields ", "--detail", "--no-detail", "--as-of ", "--explain-score", "--include-bots", "--include-closed", "--all-history"], "modes": { "expert": "lore who -- Who knows about this area? 
(also: --path for root files)", "workload": "lore who -- What is someone working on?", @@ -2726,7 +2918,7 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box", "--author/-a ", "--note-type ", "--contains ", "--for-issue ", "--for-mr ", "-p/--project ", "--since ", "--until ", "--path ", "--resolution ", "--sort ", "--asc", "--include-system", "--note-id ", "--gitlab-note-id ", "--discussion-id ", "--format ", "--fields ", "--open"], + "flags": ["--limit/-n ", "--author/-a ", "--note-type ", "--contains ", "--for-issue ", "--for-mr ", "-p/--project ", "--since ", "--until ", "--path ", "--resolution ", "--sort ", "--asc", "--include-system", "--note-id ", "--gitlab-note-id ", "--discussion-id ", "--fields ", "--open"], "robot_flags": ["--format json", "--fields minimal"], "example": "lore --robot notes --author jdefting --since 1y --format json --fields minimal", "response_schema": { @@ -2735,6 +2927,33 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box"], "default_interval": 8}, + "uninstall": {"flags": []}, + "status": {"flags": []} + }, + "example": "lore --robot cron status", + "response_schema": { + "ok": "bool", + "data": {"action": "string (install|uninstall|status)", "installed?": "bool", "interval_minutes?": "int", "entry?": "string", "log_path?": "string", "replaced?": "bool", "was_installed?": "bool", "last_run_iso?": "string"}, + "meta": {"elapsed_ms": "int"} + } + }, + "token": { + "description": "Manage stored GitLab token", + "subcommands": { + "set": {"flags": ["--token "], "note": "Reads from stdin if --token omitted in non-interactive mode"}, + "show": {"flags": ["--unmask"]} + }, + "example": "lore --robot token show", + "response_schema": { + "ok": "bool", + "data": {"action": "string (set|show)", "token_masked?": "string", "token?": "string", "valid?": "bool", "username?": "string"}, + "meta": {"elapsed_ms": "int"} + } + }, "robot-docs": { "description": "This command (agent self-discovery manifest)", 
"flags": ["--brief"], @@ -2756,10 +2975,14 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box MR -> issue -> discussion)", + "file-history: MR history per file with rename resolution", "notes: Rich note listing with author, type, resolution, path, and discussion filters", "stats: Database statistics with document/note/discussion counts", "count: Entity counts with state breakdowns", - "embed: Generate vector embeddings for semantic search via Ollama" + "embed: Generate vector embeddings for semantic search via Ollama", + "cron: Automated sync scheduling (Unix)", + "token: Secure token management with masked display" ], "read_write_split": "lore = ALL reads (issues, MRs, search, who, timeline, intelligence). glab = ALL writes (create, update, approve, merge, CI/CD)." }); @@ -2821,6 +3044,11 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box