feat(cli): add 'lore related' semantic similarity command (bd-8con)

Adds 'lore related' / 'lore similar' command for discovering semantically
related issues and MRs using vector embeddings.

Two modes:
- Entity mode: find entities similar to a specific issue/MR
- Query mode: embed free text and find matching entities

Includes distance-to-similarity conversion, label intersection,
human and robot output formatters, and 11 unit tests.
This commit is contained in:
teernisse
2026-02-19 08:01:55 -05:00
parent 3e96f19a11
commit c8dece8c60
27 changed files with 4066 additions and 33 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1 +1 @@
bd-2o49 bd-8con

View File

@@ -377,3 +377,120 @@ fn test_sync_completed_from_bootstrap_resets_navigation_and_state() {
assert_eq!(app.navigation.depth(), 1); assert_eq!(app.navigation.depth(), 1);
assert!(!app.state.bootstrap.sync_started); assert!(!app.state.bootstrap.sync_started);
} }
// A completed sync may have rewritten any entity in the DB, so both detail
// caches must be invalidated when Msg::SyncCompleted is handled.
#[test]
fn test_sync_completed_flushes_entity_caches() {
use crate::message::EntityKey;
use crate::state::issue_detail::{IssueDetailData, IssueMetadata};
use crate::state::mr_detail::{MrDetailData, MrMetadata};
use crate::state::{CachedIssuePayload, CachedMrPayload};
use crate::view::common::cross_ref::CrossRef;
let mut app = test_app();
// Populate caches with dummy data.
// Minimal issue fixture: only the fields the cache payload requires.
let issue_key = EntityKey::issue(1, 42);
app.state.issue_cache.put(
issue_key,
CachedIssuePayload {
data: IssueDetailData {
metadata: IssueMetadata {
iid: 42,
project_path: "g/p".into(),
title: "Test".into(),
description: String::new(),
state: "opened".into(),
author: "alice".into(),
assignees: vec![],
labels: vec![],
milestone: None,
due_date: None,
created_at: 0,
updated_at: 0,
web_url: String::new(),
discussion_count: 0,
},
cross_refs: Vec::<CrossRef>::new(),
},
discussions: vec![],
},
);
// Matching minimal MR fixture, mirroring the issue payload above.
let mr_key = EntityKey::mr(1, 99);
app.state.mr_cache.put(
mr_key,
CachedMrPayload {
data: MrDetailData {
metadata: MrMetadata {
iid: 99,
project_path: "g/p".into(),
title: "MR".into(),
description: String::new(),
state: "opened".into(),
draft: false,
author: "bob".into(),
assignees: vec![],
reviewers: vec![],
labels: vec![],
source_branch: "feat".into(),
target_branch: "main".into(),
merge_status: String::new(),
created_at: 0,
updated_at: 0,
merged_at: None,
web_url: String::new(),
discussion_count: 0,
file_change_count: 0,
},
cross_refs: Vec::<CrossRef>::new(),
file_changes: vec![],
},
discussions: vec![],
},
);
// Sanity check: both caches hold exactly the entry just inserted.
assert_eq!(app.state.issue_cache.len(), 1);
assert_eq!(app.state.mr_cache.len(), 1);
// Sync completes — caches should be flushed.
app.update(Msg::SyncCompleted { elapsed_ms: 500 });
assert!(
app.state.issue_cache.is_empty(),
"issue cache should be flushed after sync"
);
assert!(
app.state.mr_cache.is_empty(),
"MR cache should be flushed after sync"
);
}
// A sync completing while a detail view is on screen must put that view into
// the Refreshing state so updated data is reloaded without re-navigation.
#[test]
fn test_sync_completed_refreshes_current_detail_view() {
    use crate::message::EntityKey;
    use crate::state::LoadState;
    let mut app = test_app();
    // Build the detail screen once instead of re-constructing
    // `EntityKey::issue(1, 42)` at every use site (Screen is Clone — the
    // update handler itself calls `.clone()` on Screen values).
    let detail = Screen::IssueDetail(EntityKey::issue(1, 42));
    // Navigate to the issue detail screen.
    app.update(Msg::NavigateTo(detail.clone()));
    // Simulate load completion so LoadState goes to Idle.
    app.state.set_loading(detail.clone(), LoadState::Idle);
    // Sync completes while viewing the detail.
    app.update(Msg::SyncCompleted { elapsed_ms: 300 });
    // The detail screen should have been set to Refreshing.
    assert_eq!(
        *app.state.load_state.get(&detail),
        LoadState::Refreshing,
        "detail view should refresh after sync"
    );
}

View File

@@ -542,6 +542,11 @@ impl LoreApp {
Msg::SyncCompleted { elapsed_ms } => { Msg::SyncCompleted { elapsed_ms } => {
self.state.sync.complete(elapsed_ms); self.state.sync.complete(elapsed_ms);
// Flush entity caches — sync may have updated any entity's
// metadata, discussions, or cross-refs in the DB.
self.state.issue_cache.clear();
self.state.mr_cache.clear();
// If we came from Bootstrap, replace nav history with Dashboard. // If we came from Bootstrap, replace nav history with Dashboard.
if *self.navigation.current() == Screen::Bootstrap { if *self.navigation.current() == Screen::Bootstrap {
self.state.bootstrap.sync_started = false; self.state.bootstrap.sync_started = false;
@@ -557,6 +562,19 @@ impl LoreApp {
self.state.set_loading(dashboard.clone(), load_state); self.state.set_loading(dashboard.clone(), load_state);
let _handle = self.supervisor.submit(TaskKey::LoadScreen(dashboard)); let _handle = self.supervisor.submit(TaskKey::LoadScreen(dashboard));
} }
// If currently on a detail view, refresh it so the user sees
// updated data without navigating away and back.
let current = self.navigation.current().clone();
match &current {
Screen::IssueDetail(_) | Screen::MrDetail(_) => {
self.state
.set_loading(current.clone(), LoadState::Refreshing);
let _handle = self.supervisor.submit(TaskKey::LoadScreen(current));
}
_ => {}
}
Cmd::none() Cmd::none()
} }
Msg::SyncCancelled => { Msg::SyncCancelled => {

View File

@@ -0,0 +1,20 @@
-- Migration 028: Extend sync_runs for surgical sync observability
-- Adds mode/phase tracking and surgical-specific counters.
--
-- NOTE(review): ALTER TABLE ... ADD COLUMN is not idempotent in SQLite, so
-- this file must be applied exactly once by the migration runner; only the
-- index statements below are safe to re-run (IF NOT EXISTS).
ALTER TABLE sync_runs ADD COLUMN mode TEXT; -- sync mode (e.g. 'surgical'); NULL for pre-migration rows
ALTER TABLE sync_runs ADD COLUMN phase TEXT; -- last recorded pipeline phase (e.g. 'preflight')
ALTER TABLE sync_runs ADD COLUMN surgical_iids_json TEXT; -- JSON echo of the targeted issue/MR IIDs
-- Fetch/ingest counters for the surgical pipeline.
ALTER TABLE sync_runs ADD COLUMN issues_fetched INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN mrs_fetched INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN issues_ingested INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN mrs_ingested INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN skipped_stale INTEGER NOT NULL DEFAULT 0; -- entities skipped by staleness/TOCTOU checks
ALTER TABLE sync_runs ADD COLUMN docs_regenerated INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN docs_embedded INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN warnings_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN cancelled_at INTEGER; -- cancellation timestamp, NULL if never cancelled (units assumed to match started_at — confirm)
-- Query patterns: "recent runs by mode" and "recent runs by status+phase".
CREATE INDEX IF NOT EXISTS idx_sync_runs_mode_started
ON sync_runs(mode, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_sync_runs_status_phase_started
ON sync_runs(status, phase, started_at DESC);

View File

@@ -129,6 +129,10 @@ const COMMAND_FLAGS: &[(&str, &[&str])] = &[
"--no-dry-run", "--no-dry-run",
"--timings", "--timings",
"--tui", "--tui",
"--issue",
"--mr",
"--project",
"--preflight-only",
], ],
), ),
( (

View File

@@ -8,11 +8,13 @@ pub mod generate_docs;
pub mod ingest; pub mod ingest;
pub mod init; pub mod init;
pub mod list; pub mod list;
pub mod related;
pub mod search; pub mod search;
pub mod show; pub mod show;
pub mod stats; pub mod stats;
pub mod sync; pub mod sync;
pub mod sync_status; pub mod sync_status;
pub mod sync_surgical;
pub mod timeline; pub mod timeline;
pub mod trace; pub mod trace;
pub mod tui; pub mod tui;
@@ -39,6 +41,7 @@ pub use list::{
print_list_notes, print_list_notes_csv, print_list_notes_json, print_list_notes_jsonl, print_list_notes, print_list_notes_csv, print_list_notes_json, print_list_notes_jsonl,
query_issues, query_mrs, query_notes, run_list_issues, run_list_mrs, query_issues, query_mrs, query_notes, run_list_issues, run_list_mrs,
}; };
pub use related::{print_related, print_related_json, run_related};
pub use search::{ pub use search::{
SearchCliFilters, SearchResponse, print_search_results, print_search_results_json, run_search, SearchCliFilters, SearchResponse, print_search_results, print_search_results_json, run_search,
}; };
@@ -49,6 +52,7 @@ pub use show::{
pub use stats::{print_stats, print_stats_json, run_stats}; pub use stats::{print_stats, print_stats_json, run_stats};
pub use sync::{SyncOptions, SyncResult, print_sync, print_sync_json, run_sync}; pub use sync::{SyncOptions, SyncResult, print_sync, print_sync_json, run_sync};
pub use sync_status::{print_sync_status, print_sync_status_json, run_sync_status}; pub use sync_status::{print_sync_status, print_sync_status_json, run_sync_status};
pub use sync_surgical::run_sync_surgical;
pub use timeline::{TimelineParams, print_timeline, print_timeline_json_with_meta, run_timeline}; pub use timeline::{TimelineParams, print_timeline, print_timeline_json_with_meta, run_timeline};
pub use trace::{parse_trace_path, print_trace, print_trace_json}; pub use trace::{parse_trace_path, print_trace, print_trace_json};
pub use tui::{TuiArgs, find_lore_tui, run_tui}; pub use tui::{TuiArgs, find_lore_tui, run_tui};

692
src/cli/commands/related.rs Normal file
View File

@@ -0,0 +1,692 @@
use std::collections::HashSet;
use serde::Serialize;
use crate::cli::render::{Icons, Theme};
use crate::core::config::Config;
use crate::core::db::create_connection;
use crate::core::error::{LoreError, Result};
use crate::core::paths::get_db_path;
use crate::core::project::resolve_project;
use crate::embedding::ollama::{OllamaClient, OllamaConfig};
use crate::search::search_vector;
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/// What the results are related *to*: a concrete entity (issue/MR with
/// `iid` + `title`) or a free-text query (`iid`/`title` are `None`).
#[derive(Debug, Serialize)]
pub struct RelatedSource {
/// "issue", "merge_request", or "query".
pub source_type: String,
pub iid: Option<i64>,
pub title: Option<String>,
}
/// One related-entity hit, ready for human or JSON output.
#[derive(Debug, Serialize)]
pub struct RelatedResult {
pub source_type: String,
pub iid: i64,
pub title: String,
pub url: Option<String>,
/// Similarity in (0, 1], derived from L2 distance (see `distance_to_similarity`).
pub similarity_score: f64,
/// Labels shared with the source entity; always empty in query mode.
pub shared_labels: Vec<String>,
pub project_path: Option<String>,
}
/// Top-level response for `lore related`; serialized as-is in robot mode.
#[derive(Debug, Serialize)]
pub struct RelatedResponse {
pub source: RelatedSource,
/// The free-text query, present only in query mode.
pub query: Option<String>,
pub results: Vec<RelatedResult>,
/// "entity" or "query".
pub mode: String,
}
// ---------------------------------------------------------------------------
// Pure helpers (unit-testable)
// ---------------------------------------------------------------------------
/// Convert an L2 distance into a similarity score in (0, 1].
///
/// Inverse relationship: a distance of 0 maps to exactly 1.0 and the score
/// decays toward 0 as distance grows. The `1.0 +` offset keeps the
/// denominator nonzero.
fn distance_to_similarity(distance: f64) -> f64 {
    (1.0 + distance).recip()
}
/// Parse the JSON `label_names` column into a set of labels.
///
/// `None`, invalid JSON, and an empty array all yield an empty set.
fn parse_label_names(label_names_json: &Option<String>) -> HashSet<String> {
    match label_names_json.as_deref() {
        Some(raw) => serde_json::from_str::<Vec<String>>(raw)
            .map(|names| names.into_iter().collect())
            .unwrap_or_default(),
        None => HashSet::new(),
    }
}
// ---------------------------------------------------------------------------
// Internal row types
// ---------------------------------------------------------------------------
/// Row pulled from `documents` for the source entity in entity mode.
struct DocRow {
id: i64,
// Full document text; used as the fallback input when no stored
// embedding exists (see get_or_compute_embedding).
content_text: String,
// Raw JSON array of label names (decoded by parse_label_names).
label_names: Option<String>,
title: Option<String>,
}
/// A vector-search hit joined back to its source entity for display.
struct HydratedDoc {
source_type: String,
iid: i64,
title: String,
url: Option<String>,
label_names: Option<String>,
project_path: Option<String>,
}
/// (source_type, source_id, label_names, url, project_id)
type DocMetaRow = (String, i64, Option<String>, Option<String>, i64);
// ---------------------------------------------------------------------------
// Main entry point
// ---------------------------------------------------------------------------
/// Entry point for `lore related` / `lore similar`.
///
/// Dispatches to one of two modes:
/// - Entity mode: both `entity_type` and `entity_iid` are given — find
///   entities similar to that specific issue/MR.
/// - Query mode: otherwise, `query_text` is embedded and matched.
///
/// # Errors
/// - [`LoreError::EmbeddingsNotBuilt`] when no successful embeddings exist.
/// - [`LoreError::Other`] when neither a full entity spec nor a non-empty
///   query was provided.
pub async fn run_related(
config: &Config,
entity_type: Option<&str>,
entity_iid: Option<i64>,
query_text: Option<&str>,
project: Option<&str>,
limit: usize,
) -> Result<RelatedResponse> {
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
// Check that embeddings exist at all.
// NOTE: a failed count query is treated the same as zero embeddings
// (unwrap_or(0)) and surfaces as EmbeddingsNotBuilt.
let embedding_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM embedding_metadata WHERE last_error IS NULL",
[],
|row| row.get(0),
)
.unwrap_or(0);
if embedding_count == 0 {
return Err(LoreError::EmbeddingsNotBuilt);
}
match (entity_type, entity_iid) {
(Some(etype), Some(iid)) => {
run_entity_mode(config, &conn, etype, iid, project, limit).await
}
// Anything less than a full (type, iid) pair falls through to query
// mode; a missing/empty query is then an explicit usage error.
_ => {
let text = query_text.unwrap_or("");
if text.is_empty() {
return Err(LoreError::Other(
"Provide either an entity type + IID or a free-text query.".into(),
));
}
run_query_mode(config, &conn, text, project, limit).await
}
}
}
// ---------------------------------------------------------------------------
// Entity mode: find entities similar to a specific issue/MR
// ---------------------------------------------------------------------------
/// Entity mode: find entities semantically similar to a specific issue/MR.
///
/// Pipeline: normalize the entity type → optionally resolve the project →
/// look up the source document → get/compute its embedding → KNN search →
/// hydrate hits, dropping the source itself and computing shared labels.
async fn run_entity_mode(
config: &Config,
conn: &rusqlite::Connection,
entity_type: &str,
iid: i64,
project: Option<&str>,
limit: usize,
) -> Result<RelatedResponse> {
// Accept common CLI spellings for the two entity kinds.
let source_type = match entity_type {
"issues" | "issue" => "issue",
"mrs" | "mr" | "merge-requests" | "merge_request" => "merge_request",
other => {
return Err(LoreError::Other(format!(
"Unknown entity type '{other}'. Use 'issues' or 'mrs'."
)));
}
};
// Resolve project (optional but needed for multi-project setups).
let project_id = match project {
Some(p) => Some(resolve_project(conn, p)?),
None => None,
};
// Find the source document.
let doc = find_entity_document(conn, source_type, iid, project_id)?;
// Get or compute the embedding.
let embedding = get_or_compute_embedding(config, conn, &doc).await?;
// KNN search (request extra to filter self; the surplus also absorbs
// hits that hydrate_document skips, e.g. non-entity documents).
let vector_results = search_vector(conn, &embedding, limit + 5)?;
// Hydrate and filter.
let source_labels = parse_label_names(&doc.label_names);
let mut results = Vec::new();
for vr in vector_results {
// Exclude self.
if vr.document_id == doc.id {
continue;
}
if let Some(hydrated) = hydrate_document(conn, vr.document_id)? {
// Shared labels = intersection of source and candidate label sets.
let result_labels = parse_label_names(&hydrated.label_names);
let shared: Vec<String> = source_labels
.intersection(&result_labels)
.cloned()
.collect();
results.push(RelatedResult {
source_type: hydrated.source_type,
iid: hydrated.iid,
title: hydrated.title,
url: hydrated.url,
similarity_score: distance_to_similarity(vr.distance),
shared_labels: shared,
project_path: hydrated.project_path,
});
}
if results.len() >= limit {
break;
}
}
Ok(RelatedResponse {
source: RelatedSource {
source_type: source_type.to_string(),
iid: Some(iid),
title: doc.title,
},
query: None,
results,
mode: "entity".to_string(),
})
}
// ---------------------------------------------------------------------------
// Query mode: embed free text and find similar entities
// ---------------------------------------------------------------------------
/// Query mode: embed free text via Ollama and find similar entities.
///
/// Unlike entity mode there is no source entity, so `shared_labels` is
/// always empty and `source.iid`/`source.title` are `None`.
async fn run_query_mode(
    config: &Config,
    conn: &rusqlite::Connection,
    text: &str,
    project: Option<&str>,
    limit: usize,
) -> Result<RelatedResponse> {
    // Embed the free-text query.
    let ollama = OllamaClient::new(OllamaConfig {
        base_url: config.embedding.base_url.clone(),
        model: config.embedding.model.clone(),
        timeout_secs: 60,
    });
    let embeddings = ollama.embed_batch(&[text]).await?;
    let embedding = embeddings
        .into_iter()
        .next()
        .ok_or_else(|| LoreError::Other("Ollama returned empty embedding result.".to_string()))?;
    // Request extra hits: hydrate_document() drops non-entity documents
    // (discussions/notes), so fetching exactly `limit` could under-fill the
    // result list. Matches the over-fetch used by entity mode.
    let vector_results = search_vector(conn, &embedding, limit + 5)?;
    // Resolving the project validates the flag (unknown projects error out).
    // NOTE(review): the resolved id is not yet applied as a result filter —
    // TODO: filter hits by project_id, as entity lookups do.
    let _project_id = match project {
        Some(p) => Some(resolve_project(conn, p)?),
        None => None,
    };
    let mut results = Vec::new();
    for vr in vector_results {
        if let Some(hydrated) = hydrate_document(conn, vr.document_id)? {
            results.push(RelatedResult {
                source_type: hydrated.source_type,
                iid: hydrated.iid,
                title: hydrated.title,
                url: hydrated.url,
                similarity_score: distance_to_similarity(vr.distance),
                shared_labels: Vec::new(), // No source labels in query mode.
                project_path: hydrated.project_path,
            });
        }
        if results.len() >= limit {
            break;
        }
    }
    Ok(RelatedResponse {
        source: RelatedSource {
            source_type: "query".to_string(),
            iid: None,
            title: None,
        },
        query: Some(text.to_string()),
        results,
        mode: "query".to_string(),
    })
}
// ---------------------------------------------------------------------------
// DB helpers
// ---------------------------------------------------------------------------
fn find_entity_document(
conn: &rusqlite::Connection,
source_type: &str,
iid: i64,
project_id: Option<i64>,
) -> Result<DocRow> {
let (table, iid_col) = match source_type {
"issue" => ("issues", "iid"),
"merge_request" => ("merge_requests", "iid"),
_ => {
return Err(LoreError::Other(format!(
"Unknown source type: {source_type}"
)));
}
};
// We build the query dynamically because the table name differs.
let project_filter = if project_id.is_some() {
"AND e.project_id = ?3".to_string()
} else {
String::new()
};
let sql = format!(
"SELECT d.id, d.content_text, d.label_names, d.title \
FROM documents d \
JOIN {table} e ON d.source_type = ?1 AND d.source_id = e.id \
WHERE e.{iid_col} = ?2 {project_filter} \
LIMIT 1"
);
let mut stmt = conn.prepare(&sql)?;
let params: Vec<Box<dyn rusqlite::types::ToSql>> = if let Some(pid) = project_id {
vec![
Box::new(source_type.to_string()),
Box::new(iid),
Box::new(pid),
]
} else {
vec![Box::new(source_type.to_string()), Box::new(iid)]
};
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let doc = stmt
.query_row(param_refs.as_slice(), |row| {
Ok(DocRow {
id: row.get(0)?,
content_text: row.get(1)?,
label_names: row.get(2)?,
title: row.get(3)?,
})
})
.map_err(|_| {
LoreError::NotFound(format!(
"{source_type} #{iid} not found. Run 'lore sync' to fetch the latest data."
))
})?;
Ok(doc)
}
/// Get the embedding for a document from the DB, or compute it on-the-fly via Ollama.
///
/// Only chunk 0's stored vector is consulted (`encode_rowid(doc.id, 0)`);
/// if none exists or it decodes to an empty vec, the full `content_text`
/// is embedded fresh.
async fn get_or_compute_embedding(
config: &Config,
conn: &rusqlite::Connection,
doc: &DocRow,
) -> Result<Vec<f32>> {
// Try to find an existing embedding in the vec0 table.
use crate::embedding::chunk_ids::encode_rowid;
let rowid = encode_rowid(doc.id, 0);
// A lookup failure is deliberately treated as "no stored embedding".
let result: Option<Vec<u8>> = conn
.query_row(
"SELECT embedding FROM embeddings WHERE rowid = ?1",
rusqlite::params![rowid],
|row| row.get(0),
)
.ok();
if let Some(bytes) = result {
// Decode f32 vec from raw little-endian bytes. chunks_exact silently
// drops any trailing partial chunk (< 4 bytes).
let floats: Vec<f32> = bytes
.chunks_exact(4)
.map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
.collect();
if !floats.is_empty() {
return Ok(floats);
}
}
// Fallback: embed the content on-the-fly via Ollama.
let ollama = OllamaClient::new(OllamaConfig {
base_url: config.embedding.base_url.clone(),
model: config.embedding.model.clone(),
timeout_secs: 60,
});
let embeddings = ollama.embed_batch(&[&doc.content_text]).await?;
embeddings
.into_iter()
.next()
.ok_or_else(|| LoreError::Other("Ollama returned empty embedding result.".to_string()))
}
/// Hydrate a `document_id` into a displayable result by joining back to the
/// source entity (issue or merge request).
///
/// Returns `Ok(None)` when the document is missing, belongs to a non-entity
/// source (discussion/note documents), or its source row is gone — callers
/// simply skip such hits.
fn hydrate_document(conn: &rusqlite::Connection, document_id: i64) -> Result<Option<HydratedDoc>> {
    // First get the document metadata.
    let doc_row: Option<DocMetaRow> = conn
        .query_row(
            "SELECT d.source_type, d.source_id, d.label_names, d.url, d.project_id \
             FROM documents d WHERE d.id = ?1",
            rusqlite::params![document_id],
            |row| {
                Ok((
                    row.get(0)?,
                    row.get(1)?,
                    row.get(2)?,
                    row.get(3)?,
                    row.get(4)?,
                ))
            },
        )
        .ok();
    let Some((source_type, source_id, label_names, url, project_id)) = doc_row else {
        return Ok(None);
    };
    // Project path is best-effort; a missing project just yields None.
    let project_path: Option<String> = conn
        .query_row(
            "SELECT path_with_namespace FROM projects WHERE id = ?1",
            rusqlite::params![project_id],
            |row| row.get(0),
        )
        .ok();
    // The issue and MR lookups are identical except for the table name, so
    // resolve the table once instead of duplicating the query per arm.
    let table = match source_type.as_str() {
        "issue" => "issues",
        "merge_request" => "merge_requests",
        // Skip non-entity documents (discussions/notes) in results.
        _ => return Ok(None),
    };
    let row: Option<(i64, String)> = conn
        .query_row(
            &format!("SELECT iid, title FROM {table} WHERE id = ?1"),
            rusqlite::params![source_id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .ok();
    let Some((iid, title)) = row else {
        return Ok(None);
    };
    Ok(Some(HydratedDoc {
        source_type,
        iid,
        title,
        url,
        label_names,
        project_path,
    }))
}
// ---------------------------------------------------------------------------
// Human output
// ---------------------------------------------------------------------------
/// Print a human-readable listing of related entities.
///
/// Shows a header describing the source (free-text query or entity), then a
/// ranked list with similarity percentage, shared labels, and project path.
pub fn print_related(response: &RelatedResponse) {
    println!();
    // Header: query mode vs. entity mode. Match on the &str value directly
    // rather than via a reference pattern (`match &x.as_str() { &"..." }`).
    match response.source.source_type.as_str() {
        "query" => {
            println!(
                "{}",
                Theme::bold().render(&format!(
                    "Related to: \"{}\"",
                    response.query.as_deref().unwrap_or("")
                ))
            );
        }
        _ => {
            // Issues are addressed as #iid, MRs (and anything else) as !iid.
            let entity_label = if response.source.source_type == "issue" {
                format!("#{}", response.source.iid.unwrap_or(0))
            } else {
                format!("!{}", response.source.iid.unwrap_or(0))
            };
            println!(
                "{}",
                Theme::bold().render(&format!(
                    "Related to {} {} {}",
                    response.source.source_type,
                    entity_label,
                    response
                        .source
                        .title
                        .as_deref()
                        .map(|t| format!("\"{}\"", t))
                        .unwrap_or_default()
                ))
            );
        }
    }
    if response.results.is_empty() {
        println!(
            "\n {} {}",
            Icons::info(),
            Theme::dim().render("No related entities found.")
        );
        println!();
        return;
    }
    println!();
    for (i, r) in response.results.iter().enumerate() {
        let icon = if r.source_type == "issue" {
            Icons::issue_opened()
        } else {
            Icons::mr_opened()
        };
        let prefix = if r.source_type == "issue" { "#" } else { "!" };
        // similarity_score is in (0, 1], so the u8 cast cannot truncate.
        let score_pct = (r.similarity_score * 100.0) as u8;
        let score_str = format!("{score_pct}%");
        let labels_str = if r.shared_labels.is_empty() {
            String::new()
        } else {
            format!(" [{}]", r.shared_labels.join(", "))
        };
        let project_str = r
            .project_path
            .as_deref()
            .map(|p| format!(" ({})", p))
            .unwrap_or_default();
        println!(
            " {:>2}. {} {}{:<5} {} {}{}{}",
            i + 1,
            icon,
            prefix,
            r.iid,
            Theme::accent().render(&score_str),
            r.title,
            Theme::dim().render(&labels_str),
            Theme::dim().render(&project_str),
        );
    }
    println!();
}
// ---------------------------------------------------------------------------
// Robot (JSON) output
// ---------------------------------------------------------------------------
/// Emit the robot-mode (single-line JSON) representation of the response.
///
/// Envelope shape: `{ ok, data: { source, query, mode, results }, meta }`.
pub fn print_related_json(response: &RelatedResponse, elapsed_ms: u64) {
let output = serde_json::json!({
"ok": true,
"data": {
"source": response.source,
"query": response.query,
"mode": response.mode,
"results": response.results,
},
"meta": {
"elapsed_ms": elapsed_ms,
"mode": response.mode,
// NOTE(review): dims and distance metric are hardcoded here; confirm
// they match the configured embedding model and the vector index.
"embedding_dims": 768,
"distance_metric": "l2",
}
});
println!("{}", serde_json::to_string(&output).unwrap_or_default());
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
// These tests cover only the pure helpers (distance conversion and label
// parsing); the DB/network paths are not exercised here.
#[test]
fn test_distance_to_similarity_identical() {
let sim = distance_to_similarity(0.0);
assert!(
(sim - 1.0).abs() < f64::EPSILON,
"distance 0 should give similarity 1.0"
);
}
#[test]
fn test_distance_to_similarity_one() {
let sim = distance_to_similarity(1.0);
assert!(
(sim - 0.5).abs() < f64::EPSILON,
"distance 1 should give similarity 0.5"
);
}
#[test]
fn test_distance_to_similarity_large() {
let sim = distance_to_similarity(100.0);
assert!(
sim > 0.0 && sim < 0.02,
"large distance should give near-zero similarity"
);
}
// Score must always stay within [0, 1] regardless of input distance.
#[test]
fn test_distance_to_similarity_range() {
for d in [0.0, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 100.0] {
let sim = distance_to_similarity(d);
assert!(
(0.0..=1.0).contains(&sim),
"similarity {sim} out of [0, 1] range for distance {d}"
);
}
}
// Similarity must be monotonically non-increasing in distance.
#[test]
fn test_distance_to_similarity_monotonic() {
let distances = [0.0, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0];
for w in distances.windows(2) {
let s1 = distance_to_similarity(w[0]);
let s2 = distance_to_similarity(w[1]);
assert!(
s1 >= s2,
"similarity should decrease with distance: d={} s={} vs d={} s={}",
w[0],
s1,
w[1],
s2
);
}
}
#[test]
fn test_parse_label_names_valid_json() {
let json = Some(r#"["bug","frontend","urgent"]"#.to_string());
let labels = parse_label_names(&json);
assert_eq!(labels.len(), 3);
assert!(labels.contains("bug"));
assert!(labels.contains("frontend"));
assert!(labels.contains("urgent"));
}
#[test]
fn test_parse_label_names_null() {
let labels = parse_label_names(&None);
assert!(labels.is_empty());
}
// Malformed JSON degrades to an empty set rather than erroring.
#[test]
fn test_parse_label_names_invalid_json() {
let json = Some("not valid json".to_string());
let labels = parse_label_names(&json);
assert!(labels.is_empty());
}
#[test]
fn test_parse_label_names_empty_array() {
let json = Some("[]".to_string());
let labels = parse_label_names(&json);
assert!(labels.is_empty());
}
#[test]
fn test_shared_labels_intersection() {
let a = Some(r#"["bug","frontend","urgent"]"#.to_string());
let b = Some(r#"["bug","backend","urgent","perf"]"#.to_string());
let labels_a = parse_label_names(&a);
let labels_b = parse_label_names(&b);
let shared: HashSet<String> = labels_a.intersection(&labels_b).cloned().collect();
assert_eq!(shared.len(), 2);
assert!(shared.contains("bug"));
assert!(shared.contains("urgent"));
}
#[test]
fn test_shared_labels_no_overlap() {
let a = Some(r#"["bug"]"#.to_string());
let b = Some(r#"["feature"]"#.to_string());
let labels_a = parse_label_names(&a);
let labels_b = parse_label_names(&b);
let shared: HashSet<String> = labels_a.intersection(&labels_b).cloned().collect();
assert!(shared.is_empty());
}
}

View File

@@ -26,6 +26,35 @@ pub struct SyncOptions {
pub no_events: bool, pub no_events: bool,
pub robot_mode: bool, pub robot_mode: bool,
pub dry_run: bool, pub dry_run: bool,
pub issue_iids: Vec<u64>,
pub mr_iids: Vec<u64>,
pub project: Option<String>,
pub preflight_only: bool,
}
impl SyncOptions {
    /// Maximum number of IIDs a single surgical sync invocation may target.
    pub const MAX_SURGICAL_TARGETS: usize = 100;

    /// Whether this run targets specific IIDs (surgical mode) rather than a
    /// full sync.
    pub fn is_surgical(&self) -> bool {
        let targets_issues = !self.issue_iids.is_empty();
        let targets_mrs = !self.mr_iids.is_empty();
        targets_issues || targets_mrs
    }
}
/// Echo of the IIDs targeted by a surgical sync run, for output/recording.
#[derive(Debug, Default, Serialize)]
pub struct SurgicalIids {
/// Targeted issue IIDs.
pub issues: Vec<u64>,
/// Targeted merge request IIDs.
pub merge_requests: Vec<u64>,
}
/// Per-entity outcome of a surgical sync.
#[derive(Debug, Serialize)]
pub struct EntitySyncResult {
/// "issue" or "merge_request".
pub entity_type: String,
/// The entity's IID within its project.
pub iid: u64,
/// Outcome label, e.g. "synced", "not_found", "skipped_toctou".
pub outcome: String,
/// Failure detail, present only for error outcomes.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Why the TOCTOU check skipped this entity (e.g. "updated_at changed").
#[serde(skip_serializing_if = "Option::is_none")]
pub toctou_reason: Option<String>,
} }
#[derive(Debug, Default, Serialize)] #[derive(Debug, Default, Serialize)]
@@ -49,6 +78,14 @@ pub struct SyncResult {
pub issue_projects: Vec<ProjectSummary>, pub issue_projects: Vec<ProjectSummary>,
#[serde(skip)] #[serde(skip)]
pub mr_projects: Vec<ProjectSummary>, pub mr_projects: Vec<ProjectSummary>,
#[serde(skip_serializing_if = "Option::is_none")]
pub surgical_mode: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub surgical_iids: Option<SurgicalIids>,
#[serde(skip_serializing_if = "Option::is_none")]
pub entity_results: Option<Vec<EntitySyncResult>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub preflight_only: Option<bool>,
} }
/// Apply semantic color to a stage-completion icon glyph. /// Apply semantic color to a stage-completion icon glyph.
@@ -66,6 +103,11 @@ pub async fn run_sync(
run_id: Option<&str>, run_id: Option<&str>,
signal: &ShutdownSignal, signal: &ShutdownSignal,
) -> Result<SyncResult> { ) -> Result<SyncResult> {
// Surgical dispatch: if any IIDs specified, route to the surgical pipeline.
if options.is_surgical() {
return super::sync_surgical::run_sync_surgical(config, options, run_id, signal).await;
}
let generated_id; let generated_id;
let run_id = match run_id { let run_id = match run_id {
Some(id) => id, Some(id) => id,
@@ -1029,4 +1071,93 @@ mod tests {
assert!(rows[0].contains("0 statuses updated")); assert!(rows[0].contains("0 statuses updated"));
assert!(rows[0].contains("skipped (disabled)")); assert!(rows[0].contains("skipped (disabled)"));
} }
// --- Surgical-sync serialization and option tests ---
// The surgical fields are Option-al and skip-serialized so that
// non-surgical sync output is unchanged.
#[test]
fn sync_result_default_omits_surgical_fields() {
let result = SyncResult::default();
let json = serde_json::to_value(&result).unwrap();
assert!(json.get("surgical_mode").is_none());
assert!(json.get("surgical_iids").is_none());
assert!(json.get("entity_results").is_none());
assert!(json.get("preflight_only").is_none());
}
#[test]
fn sync_result_with_surgical_fields_serializes_correctly() {
let result = SyncResult {
surgical_mode: Some(true),
surgical_iids: Some(SurgicalIids {
issues: vec![7, 42],
merge_requests: vec![10],
}),
entity_results: Some(vec![
EntitySyncResult {
entity_type: "issue".to_string(),
iid: 7,
outcome: "synced".to_string(),
error: None,
toctou_reason: None,
},
EntitySyncResult {
entity_type: "issue".to_string(),
iid: 42,
outcome: "skipped_toctou".to_string(),
error: None,
toctou_reason: Some("updated_at changed".to_string()),
},
]),
preflight_only: Some(false),
..SyncResult::default()
};
let json = serde_json::to_value(&result).unwrap();
assert_eq!(json["surgical_mode"], true);
assert_eq!(json["surgical_iids"]["issues"], serde_json::json!([7, 42]));
assert_eq!(json["entity_results"].as_array().unwrap().len(), 2);
assert_eq!(json["entity_results"][1]["outcome"], "skipped_toctou");
assert_eq!(json["preflight_only"], false);
}
// None-valued optionals must be omitted (skip_serializing_if), not null.
#[test]
fn entity_sync_result_omits_none_fields() {
let entity = EntitySyncResult {
entity_type: "merge_request".to_string(),
iid: 10,
outcome: "synced".to_string(),
error: None,
toctou_reason: None,
};
let json = serde_json::to_value(&entity).unwrap();
assert!(json.get("error").is_none());
assert!(json.get("toctou_reason").is_none());
assert!(json.get("entity_type").is_some());
}
#[test]
fn is_surgical_with_issues() {
let opts = SyncOptions {
issue_iids: vec![1],
..SyncOptions::default()
};
assert!(opts.is_surgical());
}
#[test]
fn is_surgical_with_mrs() {
let opts = SyncOptions {
mr_iids: vec![10],
..SyncOptions::default()
};
assert!(opts.is_surgical());
}
#[test]
fn is_surgical_empty() {
let opts = SyncOptions::default();
assert!(!opts.is_surgical());
}
#[test]
fn max_surgical_targets_is_100() {
assert_eq!(SyncOptions::MAX_SURGICAL_TARGETS, 100);
}
} }

View File

@@ -0,0 +1,462 @@
//! Surgical (by-IID) sync orchestration.
//!
//! Coordinates the full pipeline for syncing specific issues/MRs by IID:
//! resolve project → preflight fetch → ingest with TOCTOU → enrichment →
//! scoped doc regeneration → embedding.
use std::time::Instant;
use tracing::{debug, warn};
use crate::Config;
use crate::cli::commands::embed::run_embed;
use crate::core::db::create_connection;
use crate::core::error::{LoreError, Result};
use crate::core::lock::{AppLock, LockOptions};
use crate::core::metrics::StageTiming;
use crate::core::paths::get_db_path;
use crate::core::project::resolve_project;
use crate::core::shutdown::ShutdownSignal;
use crate::core::sync_run::SyncRunRecorder;
use crate::documents::{SourceType, regenerate_documents_for_sources};
use crate::gitlab::GitLabClient;
use crate::ingestion::surgical::{
SurgicalTarget, enrich_entity_resource_events, enrich_mr_closes_issues, enrich_mr_file_changes,
ingest_issue_by_iid, ingest_mr_by_iid, preflight_fetch,
};
use super::sync::{EntitySyncResult, SurgicalIids, SyncOptions, SyncResult};
/// Build a [`StageTiming`] record for one surgical-sync pipeline stage.
///
/// Only name/elapsed/items/errors vary per stage; this helper fixes the
/// remaining counters (skips, rate-limit hits, retries, sub-stages) at
/// zero/empty.
fn timing(name: &str, elapsed_ms: u64, items: usize, errors: usize) -> StageTiming {
StageTiming {
name: name.to_string(),
project: None,
elapsed_ms,
items_processed: items,
items_skipped: 0,
errors,
rate_limit_hits: 0,
retries: 0,
sub_stages: vec![],
}
}
/// Run the surgical sync pipeline for specific IIDs within a single project.
///
/// Unlike [`super::sync::run_sync`], this targets specific issues/MRs by IID
/// rather than paginating all entities across all projects.
///
/// Stages (each appended to the recorder's stage timings):
/// 1. Preflight fetch — validate every IID against GitLab before any write.
/// 2. Ingest — upsert fetched entities; stale entities (DB already has same
///    or newer `updated_at`) are skipped.
/// 3. Enrichment — resource events, MR `closes_issues`, MR file changes.
/// 4. Scoped doc regeneration — only for sources dirtied by ingest.
/// 5. Embedding — best-effort; failure is logged but non-fatal.
///
/// Returns a default `SyncResult` immediately when `options` is not
/// surgical. Cancellation via `signal` is checked between stages; a
/// cancelled run finalizes the recorder row as cancelled and returns the
/// partial result as `Ok`.
///
/// # Errors
/// Fails when `--project` is missing, the project cannot be resolved, the
/// token env var is unset, the app lock cannot be acquired, or a
/// DB/recorder operation fails. Per-entity fetch/ingest failures do NOT
/// abort the run — they are reported through `entity_results`.
pub async fn run_sync_surgical(
    config: &Config,
    options: SyncOptions,
    run_id: Option<&str>,
    signal: &ShutdownSignal,
) -> Result<SyncResult> {
    // ── Validate inputs ──
    // No --issue/--mr IIDs were requested: nothing surgical to do.
    if !options.is_surgical() {
        return Ok(SyncResult::default());
    }
    let project_str = options.project.as_deref().ok_or_else(|| {
        LoreError::Other("Surgical sync requires --project (-p) to identify the target".into())
    })?;
    // ── Run ID ──
    // Caller-supplied ID wins; otherwise use the first 8 chars of a fresh
    // UUID. `generated_id` is declared in the outer scope so the slice
    // borrowed in the match arm outlives the match.
    let generated_id;
    let run_id = match run_id {
        Some(id) => id,
        None => {
            generated_id = uuid::Uuid::new_v4().simple().to_string();
            &generated_id[..8]
        }
    };
    // ── DB connections ──
    // Three independent connections: main ingest work, recorder
    // bookkeeping, and the app lock (kept for heartbeating).
    let db_path = get_db_path(config.storage.db_path.as_deref());
    let conn = create_connection(&db_path)?;
    let recorder_conn = create_connection(&db_path)?;
    let lock_conn = create_connection(&db_path)?;
    // ── Resolve project ──
    // Map the user-supplied project string to the local row, then fetch the
    // GitLab-side numeric ID and full path for API calls.
    let project_id = resolve_project(&conn, project_str)?;
    let (gitlab_project_id, project_path): (i64, String) = conn.query_row(
        "SELECT gitlab_project_id, path_with_namespace FROM projects WHERE id = ?1",
        [project_id],
        |row| Ok((row.get(0)?, row.get(1)?)),
    )?;
    // ── Build surgical targets ──
    let mut targets = Vec::new();
    for &iid in &options.issue_iids {
        targets.push(SurgicalTarget::Issue { iid });
    }
    for &iid in &options.mr_iids {
        targets.push(SurgicalTarget::MergeRequest { iid });
    }
    // ── Prepare result ──
    let mut result = SyncResult {
        run_id: run_id.to_string(),
        surgical_mode: Some(true),
        surgical_iids: Some(SurgicalIids {
            issues: options.issue_iids.clone(),
            merge_requests: options.mr_iids.clone(),
        }),
        ..SyncResult::default()
    };
    let mut entity_results: Vec<EntitySyncResult> = Vec::new();
    let mut stage_timings: Vec<StageTiming> = Vec::new();
    // ── Start recorder ──
    let recorder = SyncRunRecorder::start(&recorder_conn, "surgical-sync", run_id)?;
    let iids_json = serde_json::to_string(&result.surgical_iids).unwrap_or_default();
    recorder.set_surgical_metadata(&recorder_conn, "surgical", "preflight", &iids_json)?;
    // ── GitLab client ──
    let token =
        std::env::var(&config.gitlab.token_env_var).map_err(|_| LoreError::TokenNotSet {
            env_var: config.gitlab.token_env_var.clone(),
        })?;
    let client = GitLabClient::new(
        &config.gitlab.base_url,
        &token,
        Some(config.sync.requests_per_second),
    );
    // ── Stage: Preflight fetch ──
    // Fetch every target from GitLab up front; per-target failures become
    // "not_found" entity results instead of aborting.
    let preflight_start = Instant::now();
    debug!(%run_id, "Surgical sync: preflight fetch");
    recorder.update_phase(&recorder_conn, "preflight")?;
    let preflight = preflight_fetch(&client, gitlab_project_id, &project_path, &targets).await?;
    for failure in &preflight.failures {
        entity_results.push(EntitySyncResult {
            entity_type: failure.target.entity_type().to_string(),
            iid: failure.target.iid(),
            outcome: "not_found".to_string(),
            error: Some(failure.error.to_string()),
            toctou_reason: None,
        });
    }
    stage_timings.push(timing(
        "preflight",
        preflight_start.elapsed().as_millis() as u64,
        preflight.issues.len() + preflight.merge_requests.len(),
        preflight.failures.len(),
    ));
    // ── Preflight-only mode ──
    // Validation run: report outcomes and succeed without writing entities.
    if options.preflight_only {
        result.preflight_only = Some(true);
        result.entity_results = Some(entity_results);
        recorder.succeed(&recorder_conn, &stage_timings, 0, preflight.failures.len())?;
        return Ok(result);
    }
    // ── Cancellation check ──
    if signal.is_cancelled() {
        result.entity_results = Some(entity_results);
        recorder.cancel(&recorder_conn, "Cancelled before ingest")?;
        return Ok(result);
    }
    // ── Acquire lock ──
    // Serialize against other sync runs. NOTE(review): `?` returns after
    // this point skip the explicit `lock.release()`; presumably AppLock
    // releases on Drop (or the stale-lock timeout recovers) — confirm.
    let mut lock = AppLock::new(
        lock_conn,
        LockOptions {
            name: "sync".to_string(),
            stale_lock_minutes: config.sync.stale_lock_minutes,
            heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds,
        },
    );
    lock.acquire(options.force)?;
    // ── Stage: Ingest ──
    let ingest_start = Instant::now();
    debug!(%run_id, "Surgical sync: ingesting entities");
    recorder.update_phase(&recorder_conn, "ingest")?;
    // Source keys dirtied by ingest; drives scoped doc regeneration below.
    let mut dirty_sources: Vec<(SourceType, i64)> = Vec::new();
    // Ingest issues
    for issue in &preflight.issues {
        match ingest_issue_by_iid(&conn, config, project_id, issue) {
            Ok(ir) => {
                if ir.skipped_stale {
                    // TOCTOU guard: DB row is same or newer than the fetch.
                    entity_results.push(EntitySyncResult {
                        entity_type: "issue".to_string(),
                        iid: issue.iid as u64,
                        outcome: "skipped_stale".to_string(),
                        error: None,
                        toctou_reason: Some("DB has same or newer updated_at".to_string()),
                    });
                    recorder.record_entity_result(&recorder_conn, "issue", "skipped_stale")?;
                } else {
                    dirty_sources.extend(ir.dirty_source_keys);
                    result.issues_updated += 1;
                    entity_results.push(EntitySyncResult {
                        entity_type: "issue".to_string(),
                        iid: issue.iid as u64,
                        outcome: "ingested".to_string(),
                        error: None,
                        toctou_reason: None,
                    });
                    recorder.record_entity_result(&recorder_conn, "issue", "ingested")?;
                }
            }
            Err(e) => {
                // Per-entity ingest failure is recorded, not propagated.
                warn!(iid = issue.iid, error = %e, "Surgical issue ingest failed");
                entity_results.push(EntitySyncResult {
                    entity_type: "issue".to_string(),
                    iid: issue.iid as u64,
                    outcome: "error".to_string(),
                    error: Some(e.to_string()),
                    toctou_reason: None,
                });
            }
        }
    }
    // Ingest MRs
    for mr in &preflight.merge_requests {
        match ingest_mr_by_iid(&conn, config, project_id, mr) {
            Ok(mr_result) => {
                if mr_result.skipped_stale {
                    entity_results.push(EntitySyncResult {
                        entity_type: "merge_request".to_string(),
                        iid: mr.iid as u64,
                        outcome: "skipped_stale".to_string(),
                        error: None,
                        toctou_reason: Some("DB has same or newer updated_at".to_string()),
                    });
                    recorder.record_entity_result(&recorder_conn, "mr", "skipped_stale")?;
                } else {
                    dirty_sources.extend(mr_result.dirty_source_keys);
                    result.mrs_updated += 1;
                    entity_results.push(EntitySyncResult {
                        entity_type: "merge_request".to_string(),
                        iid: mr.iid as u64,
                        outcome: "ingested".to_string(),
                        error: None,
                        toctou_reason: None,
                    });
                    recorder.record_entity_result(&recorder_conn, "mr", "ingested")?;
                }
            }
            Err(e) => {
                warn!(iid = mr.iid, error = %e, "Surgical MR ingest failed");
                entity_results.push(EntitySyncResult {
                    entity_type: "merge_request".to_string(),
                    iid: mr.iid as u64,
                    outcome: "error".to_string(),
                    error: Some(e.to_string()),
                    toctou_reason: None,
                });
            }
        }
    }
    stage_timings.push(timing(
        "ingest",
        ingest_start.elapsed().as_millis() as u64,
        result.issues_updated + result.mrs_updated,
        // NOTE(review): per-entity ingest errors are surfaced via
        // entity_results rather than counted in this stage timing.
        0,
    ));
    // ── Stage: Enrichment ──
    if signal.is_cancelled() {
        result.entity_results = Some(entity_results);
        lock.release();
        recorder.cancel(&recorder_conn, "Cancelled before enrichment")?;
        return Ok(result);
    }
    let enrich_start = Instant::now();
    debug!(%run_id, "Surgical sync: enriching dependents");
    recorder.update_phase(&recorder_conn, "enrichment")?;
    // Enrich issues: resource events
    if !options.no_events {
        for issue in &preflight.issues {
            // Look up the local row ID; entities that failed ingest (or
            // were never present) simply have no row and are skipped.
            let local_id = match conn.query_row(
                "SELECT id FROM issues WHERE project_id = ? AND iid = ?",
                (project_id, issue.iid),
                |row| row.get::<_, i64>(0),
            ) {
                Ok(id) => id,
                Err(_) => continue,
            };
            if let Err(e) = enrich_entity_resource_events(
                &client,
                &conn,
                project_id,
                gitlab_project_id,
                "issue",
                issue.iid,
                local_id,
            )
            .await
            {
                warn!(iid = issue.iid, error = %e, "Failed to enrich issue resource events");
                result.resource_events_failed += 1;
            } else {
                result.resource_events_fetched += 1;
            }
        }
    }
    // Enrich MRs: resource events, closes_issues, file changes
    for mr in &preflight.merge_requests {
        let local_mr_id = match conn.query_row(
            "SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?",
            (project_id, mr.iid),
            |row| row.get::<_, i64>(0),
        ) {
            Ok(id) => id,
            Err(_) => continue,
        };
        if !options.no_events {
            if let Err(e) = enrich_entity_resource_events(
                &client,
                &conn,
                project_id,
                gitlab_project_id,
                "merge_request",
                mr.iid,
                local_mr_id,
            )
            .await
            {
                warn!(iid = mr.iid, error = %e, "Failed to enrich MR resource events");
                result.resource_events_failed += 1;
            } else {
                result.resource_events_fetched += 1;
            }
        }
        // closes_issues failures are logged only — no counter exists for them.
        if let Err(e) = enrich_mr_closes_issues(
            &client,
            &conn,
            project_id,
            gitlab_project_id,
            mr.iid,
            local_mr_id,
        )
        .await
        {
            warn!(iid = mr.iid, error = %e, "Failed to enrich MR closes_issues");
        }
        if let Err(e) = enrich_mr_file_changes(
            &client,
            &conn,
            project_id,
            gitlab_project_id,
            mr.iid,
            local_mr_id,
        )
        .await
        {
            warn!(iid = mr.iid, error = %e, "Failed to enrich MR file changes");
            result.mr_diffs_failed += 1;
        } else {
            result.mr_diffs_fetched += 1;
        }
    }
    stage_timings.push(timing(
        "enrichment",
        enrich_start.elapsed().as_millis() as u64,
        result.resource_events_fetched + result.mr_diffs_fetched,
        result.resource_events_failed + result.mr_diffs_failed,
    ));
    // ── Stage: Scoped doc regeneration ──
    // Only runs when ingest dirtied at least one source and docs are enabled.
    if !options.no_docs && !dirty_sources.is_empty() {
        if signal.is_cancelled() {
            result.entity_results = Some(entity_results);
            lock.release();
            recorder.cancel(&recorder_conn, "Cancelled before doc generation")?;
            return Ok(result);
        }
        let docs_start = Instant::now();
        debug!(%run_id, count = dirty_sources.len(), "Surgical sync: regenerating docs");
        recorder.update_phase(&recorder_conn, "docs")?;
        match regenerate_documents_for_sources(&conn, &dirty_sources) {
            Ok(docs_result) => {
                result.documents_regenerated = docs_result.regenerated;
                result.documents_errored = docs_result.errored;
            }
            Err(e) => {
                // Doc regeneration failure is logged but does not abort.
                warn!(error = %e, "Surgical doc regeneration failed");
            }
        }
        stage_timings.push(timing(
            "docs",
            docs_start.elapsed().as_millis() as u64,
            result.documents_regenerated,
            result.documents_errored,
        ));
    }
    // ── Stage: Embedding ──
    if !options.no_embed {
        if signal.is_cancelled() {
            result.entity_results = Some(entity_results);
            lock.release();
            recorder.cancel(&recorder_conn, "Cancelled before embedding")?;
            return Ok(result);
        }
        let embed_start = Instant::now();
        debug!(%run_id, "Surgical sync: embedding");
        recorder.update_phase(&recorder_conn, "embed")?;
        match run_embed(config, false, false, None, signal).await {
            Ok(embed_result) => {
                result.documents_embedded = embed_result.docs_embedded;
                result.embedding_failed = embed_result.failed;
            }
            Err(e) => {
                // Embedding failure is non-fatal (Ollama may be unavailable)
                warn!(error = %e, "Surgical embedding failed (non-fatal)");
            }
        }
        stage_timings.push(timing(
            "embed",
            embed_start.elapsed().as_millis() as u64,
            result.documents_embedded,
            result.embedding_failed,
        ));
    }
    // ── Finalize ──
    lock.release();
    result.entity_results = Some(entity_results);
    let total_items = result.issues_updated + result.mrs_updated;
    let total_errors =
        result.resource_events_failed + result.mr_diffs_failed + result.documents_errored;
    recorder.succeed(&recorder_conn, &stage_timings, total_items, total_errors)?;
    debug!(
        %run_id,
        issues = result.issues_updated,
        mrs = result.mrs_updated,
        docs = result.documents_regenerated,
        "Surgical sync complete"
    );
    Ok(result)
}
// Tests live in a sibling file (via #[path]) to keep this module readable.
#[cfg(test)]
#[path = "sync_surgical_tests.rs"]
mod tests;

View File

@@ -0,0 +1,323 @@
//! Tests for `sync_surgical.rs` — surgical sync orchestration.
use std::path::Path;
use wiremock::matchers::{method, path, path_regex};
use wiremock::{Mock, MockServer, ResponseTemplate};
use crate::cli::commands::sync::SyncOptions;
use crate::cli::commands::sync_surgical::run_sync_surgical;
use crate::core::config::{Config, GitLabConfig, ProjectConfig};
use crate::core::db::{create_connection, run_migrations};
use crate::core::shutdown::ShutdownSignal;
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/// Create a migrated temp database seeded with a single project row
/// (`id=1`, `gitlab_project_id=42`, path `group/repo`).
///
/// The returned `NamedTempFile` must stay alive for the duration of the
/// test — dropping it deletes the DB file.
fn setup_temp_db() -> (tempfile::NamedTempFile, rusqlite::Connection) {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let connection = create_connection(db_file.path()).unwrap();
    run_migrations(&connection).unwrap();
    let seed = "INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url)
         VALUES (1, 42, 'group/repo', 'https://gitlab.example.com/group/repo')";
    connection.execute(seed, []).unwrap();
    (db_file, connection)
}
/// Build a [`Config`] pointing GitLab at `base_url` (a mock server) and
/// storage at `db_path`, with rate limiting effectively disabled.
fn test_config(base_url: &str, db_path: &Path) -> Config {
    // High requests_per_second keeps the client unthrottled in tests.
    let sync = crate::core::config::SyncConfig {
        requests_per_second: 1000.0,
        stale_lock_minutes: 30,
        heartbeat_interval_seconds: 10,
        ..Default::default()
    };
    let storage = crate::core::config::StorageConfig {
        db_path: Some(db_path.to_string_lossy().to_string()),
        backup_dir: None,
        compress_raw_payloads: false,
    };
    Config {
        gitlab: GitLabConfig {
            base_url: base_url.to_owned(),
            token_env_var: "LORE_TEST_TOKEN".to_owned(),
        },
        projects: vec![ProjectConfig {
            path: "group/repo".to_owned(),
        }],
        default_project: None,
        sync,
        storage,
        embedding: Default::default(),
        logging: Default::default(),
        scoring: Default::default(),
    }
}
/// Minimal GitLab issue payload for IID `iid`, shaped like the response of
/// `GET /projects/:id/issues/:iid`.
fn issue_json(iid: i64) -> serde_json::Value {
    let title = format!("Test issue #{iid}");
    let web_url = format!("https://gitlab.example.com/group/repo/-/issues/{iid}");
    serde_json::json!({
        "id": 1000 + iid,
        "iid": iid,
        "project_id": 42,
        "title": title,
        "description": "desc",
        "state": "opened",
        "created_at": "2026-02-17T10:00:00.000+00:00",
        "updated_at": "2026-02-17T12:00:00.000+00:00",
        "closed_at": null,
        "author": { "id": 1, "username": "alice", "name": "Alice" },
        "assignees": [],
        "labels": ["bug"],
        "milestone": null,
        "due_date": null,
        "web_url": web_url
    })
}
/// Minimal GitLab merge-request payload for IID `iid`, shaped like the
/// response of `GET /projects/:id/merge_requests/:iid`.
#[allow(dead_code)] // Used by MR integration tests added later
fn mr_json(iid: i64) -> serde_json::Value {
    let title = format!("Test MR !{iid}");
    let short_ref = format!("!{iid}");
    let full_ref = format!("group/repo!{iid}");
    let web_url = format!("https://gitlab.example.com/group/repo/-/merge_requests/{iid}");
    serde_json::json!({
        "id": 2000 + iid,
        "iid": iid,
        "project_id": 42,
        "title": title,
        "description": "desc",
        "state": "opened",
        "draft": false,
        "work_in_progress": false,
        "source_branch": "feat",
        "target_branch": "main",
        "sha": "abc123",
        "references": { "short": short_ref, "full": full_ref },
        "detailed_merge_status": "mergeable",
        "created_at": "2026-02-17T10:00:00.000+00:00",
        "updated_at": "2026-02-17T12:00:00.000+00:00",
        "merged_at": null,
        "closed_at": null,
        "author": { "id": 2, "username": "bob", "name": "Bob" },
        "merge_user": null,
        "merged_by": null,
        "labels": [],
        "assignees": [],
        "reviewers": [],
        "web_url": web_url,
        "merge_commit_sha": null,
        "squash_commit_sha": null
    })
}
/// Mount all enrichment endpoint mocks (resource events, closes_issues, diffs) as empty.
async fn mount_empty_enrichment_mocks(server: &MockServer) {
// Resource events for issues
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/issues/\d+/resource_state_events",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/issues/\d+/resource_label_events",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/issues/\d+/resource_milestone_events",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
// Resource events for MRs
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/merge_requests/\d+/resource_state_events",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/merge_requests/\d+/resource_label_events",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/merge_requests/\d+/resource_milestone_events",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
// Closes issues
Mock::given(method("GET"))
.and(path_regex(
r"/api/v4/projects/\d+/merge_requests/\d+/closes_issues",
))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
// Diffs
Mock::given(method("GET"))
.and(path_regex(r"/api/v4/projects/\d+/merge_requests/\d+/diffs"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
.mount(server)
.await;
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
// Happy path: one issue preflights, ingests, and is reported as "ingested".
#[tokio::test]
async fn ingest_one_issue_updates_result() {
    let server = MockServer::start().await;
    let (tmp, _conn) = setup_temp_db();
    // Set token env var
    // SAFETY: Tests are single-threaded within each test function.
    // NOTE(review): set_var is process-global; parallel tests all write the
    // same value, so any race is benign — confirm if values ever diverge.
    unsafe { std::env::set_var("LORE_TEST_TOKEN", "test-token") };
    // Mock preflight issue fetch
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/7"))
        .respond_with(ResponseTemplate::new(200).set_body_json(issue_json(7)))
        .mount(&server)
        .await;
    mount_empty_enrichment_mocks(&server).await;
    let config = test_config(&server.uri(), tmp.path());
    let options = SyncOptions {
        robot_mode: true,
        issue_iids: vec![7],
        project: Some("group/repo".to_string()),
        no_embed: true, // skip embed (no Ollama in tests)
        ..SyncOptions::default()
    };
    let signal = ShutdownSignal::new();
    let result = run_sync_surgical(&config, options, Some("test01"), &signal)
        .await
        .unwrap();
    // Surgical mode flag set, one issue ingested, one entity result emitted.
    assert_eq!(result.surgical_mode, Some(true));
    assert_eq!(result.issues_updated, 1);
    assert!(result.entity_results.is_some());
    let entities = result.entity_results.unwrap();
    assert_eq!(entities.len(), 1);
    assert_eq!(entities[0].outcome, "ingested");
}
// --preflight-only validates against GitLab but writes nothing to the DB.
// Note enrichment mocks are deliberately NOT mounted: the run must return
// before any enrichment request is made.
#[tokio::test]
async fn preflight_only_returns_early() {
    let server = MockServer::start().await;
    let (tmp, _conn) = setup_temp_db();
    // SAFETY: Tests are single-threaded within each test function.
    unsafe { std::env::set_var("LORE_TEST_TOKEN", "test-token") };
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/7"))
        .respond_with(ResponseTemplate::new(200).set_body_json(issue_json(7)))
        .mount(&server)
        .await;
    let config = test_config(&server.uri(), tmp.path());
    let options = SyncOptions {
        robot_mode: true,
        issue_iids: vec![7],
        project: Some("group/repo".to_string()),
        preflight_only: true,
        ..SyncOptions::default()
    };
    let signal = ShutdownSignal::new();
    let result = run_sync_surgical(&config, options, Some("test02"), &signal)
        .await
        .unwrap();
    assert_eq!(result.preflight_only, Some(true));
    assert_eq!(result.issues_updated, 0); // No actual ingest
}
// A signal cancelled before the run starts lets preflight complete but
// stops the pipeline before ingest, so nothing is updated.
#[tokio::test]
async fn cancellation_before_ingest_cancels_recorder() {
    let server = MockServer::start().await;
    let (tmp, _conn) = setup_temp_db();
    // SAFETY: Tests are single-threaded within each test function.
    unsafe { std::env::set_var("LORE_TEST_TOKEN", "test-token") };
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/7"))
        .respond_with(ResponseTemplate::new(200).set_body_json(issue_json(7)))
        .mount(&server)
        .await;
    let config = test_config(&server.uri(), tmp.path());
    let options = SyncOptions {
        robot_mode: true,
        issue_iids: vec![7],
        project: Some("group/repo".to_string()),
        ..SyncOptions::default()
    };
    let signal = ShutdownSignal::new();
    signal.cancel(); // Cancel before we start
    let result = run_sync_surgical(&config, options, Some("test03"), &signal)
        .await
        .unwrap();
    // Cancelled runs return Ok with a partial (empty) result.
    assert_eq!(result.issues_updated, 0);
}
/// Minimal config for tests that must fail before reaching the network:
/// every section defaulted, base URL intentionally unreachable.
fn dummy_config() -> Config {
    let gitlab = GitLabConfig {
        base_url: "https://unused.example.com".to_string(),
        token_env_var: "LORE_TEST_TOKEN".to_string(),
    };
    Config {
        gitlab,
        projects: Vec::new(),
        default_project: None,
        sync: Default::default(),
        storage: Default::default(),
        embedding: Default::default(),
        logging: Default::default(),
        scoring: Default::default(),
    }
}
// Surgical IIDs without --project must fail fast with a message that
// names the missing flag.
#[tokio::test]
async fn missing_project_returns_error() {
    let options = SyncOptions {
        issue_iids: vec![7],
        project: None, // Missing!
        ..SyncOptions::default()
    };
    let config = dummy_config();
    let signal = ShutdownSignal::new();
    let err = run_sync_surgical(&config, options, Some("test04"), &signal)
        .await
        .unwrap_err();
    assert!(err.to_string().contains("--project"));
}
// With no --issue/--mr IIDs the run is not surgical: it returns a default
// SyncResult without touching config, DB, or network.
#[tokio::test]
async fn empty_iids_returns_default_result() {
    let config = dummy_config();
    let options = SyncOptions::default(); // No IIDs
    let signal = ShutdownSignal::new();
    let result = run_sync_surgical(&config, options, None, &signal)
        .await
        .unwrap();
    assert_eq!(result.issues_updated, 0);
    assert_eq!(result.mrs_updated, 0);
    assert!(result.surgical_mode.is_none()); // Not surgical mode
}

View File

@@ -246,6 +246,10 @@ pub enum Commands {
/// Launch the interactive TUI dashboard /// Launch the interactive TUI dashboard
Tui(TuiArgs), Tui(TuiArgs),
/// Find semantically related entities via vector similarity
#[command(visible_alias = "similar")]
Related(RelatedArgs),
/// Detect discussion divergence from original intent /// Detect discussion divergence from original intent
Drift { Drift {
/// Entity type (currently only "issues" supported) /// Entity type (currently only "issues" supported)
@@ -814,6 +818,22 @@ pub struct SyncArgs {
/// Show sync progress in interactive TUI /// Show sync progress in interactive TUI
#[arg(long)] #[arg(long)]
pub tui: bool, pub tui: bool,
/// Surgically sync specific issues by IID (repeatable)
#[arg(long, value_parser = clap::value_parser!(u64).range(1..))]
pub issue: Vec<u64>,
/// Surgically sync specific merge requests by IID (repeatable)
#[arg(long, value_parser = clap::value_parser!(u64).range(1..))]
pub mr: Vec<u64>,
/// Scope to a single project (required for surgical sync if no defaultProject)
#[arg(short = 'p', long)]
pub project: Option<String>,
/// Run preflight validation only (no DB writes). Requires --issue or --mr.
#[arg(long)]
pub preflight_only: bool,
} }
#[derive(Parser)] #[derive(Parser)]
@@ -1054,6 +1074,32 @@ pub struct TraceArgs {
pub limit: usize, pub limit: usize,
} }
// Arguments for `lore related` / `lore similar`.
// NOTE: field `///` docs double as clap help text — keep reviewer notes in
// plain `//` comments so they never leak into `--help` output.
#[derive(Parser)]
#[command(after_help = "\x1b[1mExamples:\x1b[0m
  lore related issues 42 # Find issues similar to #42
  lore related mrs 99 -p group/repo # MRs similar to !99
  lore related 'authentication timeout' # Concept search")]
pub struct RelatedArgs {
    // Overloaded positional: either an entity type ("issues"/"mrs", with
    // `iid` as the second positional) or a free-text query to embed.
    // Disambiguation presumably happens in the command handler — confirm.
    /// Entity type ('issues' or 'mrs') OR free-text query
    pub query_or_type: String,
    /// Entity IID (when first arg is entity type)
    pub iid: Option<i64>,
    /// Maximum results
    #[arg(
        short = 'n',
        long = "limit",
        default_value = "10",
        help_heading = "Output"
    )]
    pub limit: usize,
    /// Scope to project (fuzzy match)
    #[arg(short = 'p', long, help_heading = "Filters")]
    pub project: Option<String>,
}
#[derive(Parser)] #[derive(Parser)]
pub struct CountArgs { pub struct CountArgs {
/// Entity type to count (issues, mrs, discussions, notes, events) /// Entity type to count (issues, mrs, discussions, notes, events)

View File

@@ -93,6 +93,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
"027", "027",
include_str!("../../migrations/027_tui_list_indexes.sql"), include_str!("../../migrations/027_tui_list_indexes.sql"),
), ),
(
"028",
include_str!("../../migrations/028_surgical_sync_runs.sql"),
),
]; ];
pub fn create_connection(db_path: &Path) -> Result<Connection> { pub fn create_connection(db_path: &Path) -> Result<Connection> {

View File

@@ -21,6 +21,7 @@ pub enum ErrorCode {
EmbeddingFailed, EmbeddingFailed,
NotFound, NotFound,
Ambiguous, Ambiguous,
SurgicalPreflightFailed,
} }
impl std::fmt::Display for ErrorCode { impl std::fmt::Display for ErrorCode {
@@ -44,6 +45,7 @@ impl std::fmt::Display for ErrorCode {
Self::EmbeddingFailed => "EMBEDDING_FAILED", Self::EmbeddingFailed => "EMBEDDING_FAILED",
Self::NotFound => "NOT_FOUND", Self::NotFound => "NOT_FOUND",
Self::Ambiguous => "AMBIGUOUS", Self::Ambiguous => "AMBIGUOUS",
Self::SurgicalPreflightFailed => "SURGICAL_PREFLIGHT_FAILED",
}; };
write!(f, "{code}") write!(f, "{code}")
} }
@@ -70,6 +72,7 @@ impl ErrorCode {
Self::EmbeddingFailed => 16, Self::EmbeddingFailed => 16,
Self::NotFound => 17, Self::NotFound => 17,
Self::Ambiguous => 18, Self::Ambiguous => 18,
Self::SurgicalPreflightFailed => 6,
} }
} }
} }
@@ -153,6 +156,14 @@ pub enum LoreError {
#[error("No embeddings found. Run: lore embed")] #[error("No embeddings found. Run: lore embed")]
EmbeddingsNotBuilt, EmbeddingsNotBuilt,
#[error("Surgical preflight failed for {entity_type} !{iid} in {project}: {reason}")]
SurgicalPreflightFailed {
entity_type: String,
iid: u64,
project: String,
reason: String,
},
} }
impl LoreError { impl LoreError {
@@ -179,6 +190,7 @@ impl LoreError {
Self::OllamaModelNotFound { .. } => ErrorCode::OllamaModelNotFound, Self::OllamaModelNotFound { .. } => ErrorCode::OllamaModelNotFound,
Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed, Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed,
Self::EmbeddingsNotBuilt => ErrorCode::EmbeddingFailed, Self::EmbeddingsNotBuilt => ErrorCode::EmbeddingFailed,
Self::SurgicalPreflightFailed { .. } => ErrorCode::SurgicalPreflightFailed,
} }
} }
@@ -227,6 +239,9 @@ impl LoreError {
Some("Check Ollama logs or retry with 'lore embed --retry-failed'") Some("Check Ollama logs or retry with 'lore embed --retry-failed'")
} }
Self::EmbeddingsNotBuilt => Some("Generate embeddings first: lore embed"), Self::EmbeddingsNotBuilt => Some("Generate embeddings first: lore embed"),
Self::SurgicalPreflightFailed { .. } => Some(
"Verify the IID exists and you have access to the project.\n\n Example:\n lore issues -p <project>\n lore mrs -p <project>",
),
Self::Json(_) | Self::Io(_) | Self::Transform(_) | Self::Other(_) => None, Self::Json(_) | Self::Io(_) | Self::Transform(_) | Self::Other(_) => None,
} }
} }
@@ -254,6 +269,9 @@ impl LoreError {
Self::EmbeddingFailed { .. } => vec!["lore embed --retry-failed"], Self::EmbeddingFailed { .. } => vec!["lore embed --retry-failed"],
Self::MigrationFailed { .. } => vec!["lore migrate"], Self::MigrationFailed { .. } => vec!["lore migrate"],
Self::GitLabNetworkError { .. } => vec!["lore doctor"], Self::GitLabNetworkError { .. } => vec!["lore doctor"],
Self::SurgicalPreflightFailed { .. } => {
vec!["lore issues -p <project>", "lore mrs -p <project>"]
}
_ => vec![], _ => vec![],
} }
} }
@@ -293,3 +311,72 @@ impl From<&LoreError> for RobotErrorOutput {
} }
pub type Result<T> = std::result::Result<T, LoreError>; pub type Result<T> = std::result::Result<T, LoreError>;
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    // Display output must carry every field of the variant so logs are
    // self-describing.
    #[test]
    fn surgical_preflight_failed_display() {
        let err = LoreError::SurgicalPreflightFailed {
            entity_type: "issue".to_string(),
            iid: 42,
            project: "group/repo".to_string(),
            reason: "not found on GitLab".to_string(),
        };
        let msg = err.to_string();
        assert!(msg.contains("issue"), "missing entity_type: {msg}");
        assert!(msg.contains("42"), "missing iid: {msg}");
        assert!(msg.contains("group/repo"), "missing project: {msg}");
        assert!(msg.contains("not found on GitLab"), "missing reason: {msg}");
    }
    // Exit code for the new error code is pinned at 6.
    #[test]
    fn surgical_preflight_failed_error_code() {
        let code = ErrorCode::SurgicalPreflightFailed;
        assert_eq!(code.exit_code(), 6);
    }
    // The error variant maps onto its dedicated ErrorCode.
    #[test]
    fn surgical_preflight_failed_code_mapping() {
        let err = LoreError::SurgicalPreflightFailed {
            entity_type: "merge_request".to_string(),
            iid: 99,
            project: "ns/proj".to_string(),
            reason: "404".to_string(),
        };
        assert_eq!(err.code(), ErrorCode::SurgicalPreflightFailed);
    }
    // Users get a remediation suggestion for preflight failures.
    #[test]
    fn surgical_preflight_failed_has_suggestion() {
        let err = LoreError::SurgicalPreflightFailed {
            entity_type: "issue".to_string(),
            iid: 7,
            project: "g/p".to_string(),
            reason: "not found".to_string(),
        };
        assert!(err.suggestion().is_some());
    }
    // Robot mode surfaces actionable follow-up commands.
    #[test]
    fn surgical_preflight_failed_has_actions() {
        let err = LoreError::SurgicalPreflightFailed {
            entity_type: "issue".to_string(),
            iid: 7,
            project: "g/p".to_string(),
            reason: "not found".to_string(),
        };
        assert!(!err.actions().is_empty());
    }
    // Machine-readable code string used in robot output.
    #[test]
    fn surgical_preflight_failed_display_code_string() {
        let code = ErrorCode::SurgicalPreflightFailed;
        assert_eq!(code.to_string(), "SURGICAL_PREFLIGHT_FAILED");
    }
}

View File

@@ -20,6 +20,67 @@ impl SyncRunRecorder {
Ok(Self { row_id }) Ok(Self { row_id })
} }
    /// Returns the database row ID for this sync run.
    ///
    /// Useful for locating the `sync_runs` row in tests and diagnostics.
    pub fn row_id(&self) -> i64 {
        self.row_id
    }
/// Set surgical-specific metadata after `start()`.
///
/// Takes `&self` so the recorder can continue to be used for phase
/// updates and entity result recording before finalization.
pub fn set_surgical_metadata(
&self,
conn: &Connection,
mode: &str,
phase: &str,
iids_json: &str,
) -> Result<()> {
conn.execute(
"UPDATE sync_runs SET mode = ?1, phase = ?2, surgical_iids_json = ?3
WHERE id = ?4",
rusqlite::params![mode, phase, iids_json, self.row_id],
)?;
Ok(())
}
    /// Update the pipeline phase and refresh the heartbeat timestamp.
    ///
    /// The heartbeat is bumped to `now_ms()` so stale-lock/stale-run
    /// detection sees this run as alive while a phase is in progress.
    pub fn update_phase(&self, conn: &Connection, phase: &str) -> Result<()> {
        conn.execute(
            "UPDATE sync_runs SET phase = ?1, heartbeat_at = ?2 WHERE id = ?3",
            rusqlite::params![phase, now_ms(), self.row_id],
        )?;
        Ok(())
    }
/// Increment a surgical counter column for the given entity type and stage.
///
/// Unknown `(entity_type, stage)` combinations are silently ignored.
/// Column names are derived from a hardcoded match — no SQL injection risk.
pub fn record_entity_result(
&self,
conn: &Connection,
entity_type: &str,
stage: &str,
) -> Result<()> {
let column = match (entity_type, stage) {
("issue", "fetched") => "issues_fetched",
("issue", "ingested") => "issues_ingested",
("mr", "fetched") => "mrs_fetched",
("mr", "ingested") => "mrs_ingested",
("issue" | "mr", "skipped_stale") => "skipped_stale",
("doc", "regenerated") => "docs_regenerated",
("doc", "embedded") => "docs_embedded",
(_, "warning") => "warnings_count",
_ => return Ok(()),
};
conn.execute(
&format!("UPDATE sync_runs SET {column} = {column} + 1 WHERE id = ?1"),
rusqlite::params![self.row_id],
)?;
Ok(())
}
pub fn succeed( pub fn succeed(
self, self,
conn: &Connection, conn: &Connection,
@@ -63,6 +124,18 @@ impl SyncRunRecorder {
)?; )?;
Ok(()) Ok(())
} }
/// Finalize the run as cancelled. Consumes self to prevent further use.
pub fn cancel(self, conn: &Connection, reason: &str) -> Result<()> {
let now = now_ms();
conn.execute(
"UPDATE sync_runs SET finished_at = ?1, cancelled_at = ?2,
status = 'cancelled', error = ?3
WHERE id = ?4",
rusqlite::params![now, now, reason, self.row_id],
)?;
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -146,3 +146,247 @@ fn test_sync_run_recorder_fail_with_partial_metrics() {
assert_eq!(parsed.len(), 1); assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].name, "ingest_issues"); assert_eq!(parsed[0].name, "ingest_issues");
} }
// ---------------------------------------------------------------------------
// Migration 028: Surgical sync columns
// ---------------------------------------------------------------------------
// Migration 028 columns (mode, phase, surgical_iids_json) round-trip values.
#[test]
fn sync_run_surgical_columns_exist() {
    let conn = setup_test_db();
    conn.execute(
        "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, mode, phase, surgical_iids_json)
         VALUES (1000, 1000, 'running', 'sync', 'surgical', 'preflight', '{\"issues\":[7],\"mrs\":[101]}')",
        [],
    )
    .unwrap();
    let row: (String, String, String) = conn
        .query_row(
            "SELECT mode, phase, surgical_iids_json FROM sync_runs WHERE mode = 'surgical'",
            [],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    assert_eq!(row.0, "surgical");
    assert_eq!(row.1, "preflight");
    assert!(row.2.contains("7"));
}
// Counter columns added by migration 028 must default to 0, not NULL.
#[test]
fn sync_run_counter_defaults_are_zero() {
    let conn = setup_test_db();
    conn.execute(
        "INSERT INTO sync_runs (started_at, heartbeat_at, status, command)
         VALUES (2000, 2000, 'running', 'sync')",
        [],
    )
    .unwrap();
    let id = conn.last_insert_rowid();
    let counters: (i64, i64, i64, i64) = conn
        .query_row(
            "SELECT issues_fetched, mrs_fetched, docs_regenerated, warnings_count FROM sync_runs WHERE id = ?1",
            [id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .unwrap();
    assert_eq!(counters, (0, 0, 0, 0));
}
// Metadata columns (mode, phase, cancelled_at) stay NULL for ordinary runs.
#[test]
fn sync_run_nullable_columns_default_to_null() {
    let conn = setup_test_db();
    conn.execute(
        "INSERT INTO sync_runs (started_at, heartbeat_at, status, command)
         VALUES (3000, 3000, 'running', 'sync')",
        [],
    )
    .unwrap();
    let id = conn.last_insert_rowid();
    let row: (Option<String>, Option<String>, Option<i64>) = conn
        .query_row(
            "SELECT mode, phase, cancelled_at FROM sync_runs WHERE id = ?1",
            [id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    assert!(row.0.is_none());
    assert!(row.1.is_none());
    assert!(row.2.is_none());
}
// Non-zero counter values written at insert time read back unchanged.
#[test]
fn sync_run_counter_round_trip() {
    let conn = setup_test_db();
    conn.execute(
        "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, mode, issues_fetched, mrs_ingested, docs_embedded)
         VALUES (4000, 4000, 'succeeded', 'sync', 'surgical', 3, 2, 5)",
        [],
    )
    .unwrap();
    let id = conn.last_insert_rowid();
    let counters: (i64, i64, i64) = conn
        .query_row(
            "SELECT issues_fetched, mrs_ingested, docs_embedded FROM sync_runs WHERE id = ?1",
            [id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    assert_eq!(counters, (3, 2, 5));
}
// ---------------------------------------------------------------------------
// bd-arka: SyncRunRecorder surgical lifecycle methods
// ---------------------------------------------------------------------------
// Full happy-path lifecycle: start -> set metadata -> phase -> per-entity
// counters -> succeed. Verifies all counters and the final phase/status.
#[test]
fn surgical_lifecycle_start_metadata_succeed() {
    let conn = setup_test_db();
    let recorder = SyncRunRecorder::start(&conn, "sync", "surg001").unwrap();
    let row_id = recorder.row_id();
    recorder
        .set_surgical_metadata(
            &conn,
            "surgical",
            "preflight",
            r#"{"issues":[7,8],"mrs":[101]}"#,
        )
        .unwrap();
    recorder.update_phase(&conn, "ingest").unwrap();
    // Two issues fetched, one ingested; one MR fetched and ingested.
    recorder
        .record_entity_result(&conn, "issue", "fetched")
        .unwrap();
    recorder
        .record_entity_result(&conn, "issue", "fetched")
        .unwrap();
    recorder
        .record_entity_result(&conn, "issue", "ingested")
        .unwrap();
    recorder
        .record_entity_result(&conn, "mr", "fetched")
        .unwrap();
    recorder
        .record_entity_result(&conn, "mr", "ingested")
        .unwrap();
    recorder.succeed(&conn, &[], 3, 0).unwrap();
    let (mode, phase, iids, issues_fetched, mrs_fetched, issues_ingested, mrs_ingested, status): (
        String,
        String,
        String,
        i64,
        i64,
        i64,
        i64,
        String,
    ) = conn
        .query_row(
            "SELECT mode, phase, surgical_iids_json, issues_fetched, mrs_fetched,
                    issues_ingested, mrs_ingested, status
             FROM sync_runs WHERE id = ?1",
            [row_id],
            |r| {
                Ok((
                    r.get(0)?,
                    r.get(1)?,
                    r.get(2)?,
                    r.get(3)?,
                    r.get(4)?,
                    r.get(5)?,
                    r.get(6)?,
                    r.get(7)?,
                ))
            },
        )
        .unwrap();
    assert_eq!(mode, "surgical");
    assert_eq!(phase, "ingest"); // Last phase set before succeed
    assert!(iids.contains("101"));
    assert_eq!(issues_fetched, 2);
    assert_eq!(mrs_fetched, 1);
    assert_eq!(issues_ingested, 1);
    assert_eq!(mrs_ingested, 1);
    assert_eq!(status, "succeeded");
}
#[test]
fn surgical_lifecycle_cancel() {
    let conn = setup_test_db();
    let recorder = SyncRunRecorder::start(&conn, "sync", "cancel01").unwrap();
    let id = recorder.row_id();
    recorder
        .set_surgical_metadata(&conn, "surgical", "preflight", "{}")
        .unwrap();
    recorder
        .cancel(&conn, "User requested cancellation")
        .unwrap();
    // Cancellation must record the status, the error text, and both timestamps.
    let row: (String, Option<String>, Option<i64>, Option<i64>) = conn
        .query_row(
            "SELECT status, error, cancelled_at, finished_at FROM sync_runs WHERE id = ?1",
            [id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
        )
        .unwrap();
    assert_eq!(row.0, "cancelled");
    assert_eq!(row.1.as_deref(), Some("User requested cancellation"));
    assert!(row.2.is_some(), "cancelled_at should be set");
    assert!(row.3.is_some(), "finished_at should be set");
}
#[test]
fn record_entity_result_ignores_unknown() {
    let conn = setup_test_db();
    let recorder = SyncRunRecorder::start(&conn, "sync", "unk001").unwrap();
    // An unrecognized entity/outcome pair must be a silent no-op, not an error.
    let outcome = recorder.record_entity_result(&conn, "widget", "exploded");
    assert!(outcome.is_ok());
}
#[test]
fn record_entity_result_doc_counters() {
    let conn = setup_test_db();
    let recorder = SyncRunRecorder::start(&conn, "sync", "cnt001").unwrap();
    let id = recorder.row_id();
    // Two doc regenerations, one embedding, and one stale skip.
    for (entity, outcome) in [
        ("doc", "regenerated"),
        ("doc", "regenerated"),
        ("doc", "embedded"),
        ("issue", "skipped_stale"),
    ] {
        recorder
            .record_entity_result(&conn, entity, outcome)
            .unwrap();
    }
    let counters: (i64, i64, i64) = conn
        .query_row(
            "SELECT docs_regenerated, docs_embedded, skipped_stale FROM sync_runs WHERE id = ?1",
            [id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )
        .unwrap();
    assert_eq!(counters, (2, 1, 1));
}

View File

@@ -7,7 +7,10 @@ pub use extractor::{
extract_discussion_document, extract_issue_document, extract_mr_document, extract_discussion_document, extract_issue_document, extract_mr_document,
extract_note_document, extract_note_document_cached, extract_note_document, extract_note_document_cached,
}; };
pub use regenerator::{RegenerateResult, regenerate_dirty_documents}; pub use regenerator::{
RegenerateForSourcesResult, RegenerateResult, regenerate_dirty_documents,
regenerate_documents_for_sources,
};
pub use truncation::{ pub use truncation::{
MAX_DISCUSSION_BYTES, MAX_DOCUMENT_BYTES_HARD, NoteContent, TruncationReason, TruncationResult, MAX_DISCUSSION_BYTES, MAX_DOCUMENT_BYTES_HARD, NoteContent, TruncationReason, TruncationResult,
truncate_discussion, truncate_hard_cap, truncate_utf8, truncate_discussion, truncate_hard_cap, truncate_utf8,

View File

@@ -268,6 +268,75 @@ fn get_document_id(conn: &Connection, source_type: SourceType, source_id: i64) -
Ok(id) Ok(id)
} }
// ---------------------------------------------------------------------------
// Scoped regeneration for surgical sync
// ---------------------------------------------------------------------------
/// Result of regenerating documents for specific source keys.
#[derive(Debug, Default)]
pub struct RegenerateForSourcesResult {
    /// Sources whose document content changed and was rewritten.
    pub regenerated: usize,
    /// Sources whose regenerated content matched the stored document.
    pub unchanged: usize,
    /// Sources where regeneration returned an error (error recorded via the
    /// dirty tracker; see `regenerate_documents_for_sources`).
    pub errored: usize,
    /// IDs of documents that were regenerated or confirmed unchanged,
    /// for downstream scoped embedding.
    pub document_ids: Vec<i64>,
}
/// Regenerate documents for exactly the given source keys.
///
/// Unlike [`regenerate_dirty_documents`], this does NOT consult the
/// `dirty_sources` table to decide what to process: it handles precisely
/// the provided keys, clears their dirty flags on success, and returns the
/// affected document IDs so embedding can be scoped to them.
pub fn regenerate_documents_for_sources(
    conn: &Connection,
    source_keys: &[(SourceType, i64)],
) -> Result<RegenerateForSourcesResult> {
    let mut out = RegenerateForSourcesResult::default();
    let mut parent_cache = ParentMetadataCache::new();
    for &(source_type, source_id) in source_keys {
        match regenerate_one(conn, source_type, source_id, &mut parent_cache) {
            Ok(was_changed) => {
                if was_changed {
                    out.regenerated += 1;
                } else {
                    out.unchanged += 1;
                }
                clear_dirty(conn, source_type, source_id)?;
                // A missing document means the source was deleted upstream;
                // in that case there is simply no ID to hand to the embedder.
                if let Ok(doc_id) = get_document_id(conn, source_type, source_id) {
                    out.document_ids.push(doc_id);
                }
            }
            Err(err) => {
                warn!(
                    source_type = %source_type,
                    source_id,
                    error = %err,
                    "Scoped regeneration failed"
                );
                record_dirty_error(conn, source_type, source_id, &err.to_string())?;
                out.errored += 1;
            }
        }
    }
    debug!(
        regenerated = out.regenerated,
        unchanged = out.unchanged,
        errored = out.errored,
        document_ids = out.document_ids.len(),
        "Scoped document regeneration complete"
    );
    Ok(out)
}
#[cfg(test)] #[cfg(test)]
#[path = "regenerator_tests.rs"] #[path = "regenerator_tests.rs"]
mod tests; mod tests;

View File

@@ -518,3 +518,65 @@ fn test_note_regeneration_cache_invalidates_across_parents() {
assert!(beta_content.contains("parent_iid: 99")); assert!(beta_content.contains("parent_iid: 99"));
assert!(beta_content.contains("parent_title: Issue Beta")); assert!(beta_content.contains("parent_title: Issue Beta"));
} }
// ---------------------------------------------------------------------------
// Scoped regeneration (bd-hs6j)
// ---------------------------------------------------------------------------
#[test]
fn scoped_regen_only_processes_specified_sources() {
    let conn = setup_db();
    // Seed two dirty issues; only one is passed to the scoped path.
    for sql in [
        "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Issue A', 'opened', 1000, 2000, 3000)",
        "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (2, 20, 1, 43, 'Issue B', 'opened', 1000, 2000, 3000)",
    ] {
        conn.execute(sql, []).unwrap();
    }
    mark_dirty(&conn, SourceType::Issue, 1).unwrap();
    mark_dirty(&conn, SourceType::Issue, 2).unwrap();
    let result = regenerate_documents_for_sources(&conn, &[(SourceType::Issue, 1)]).unwrap();
    assert_eq!(result.regenerated, 1);
    assert_eq!(result.document_ids.len(), 1);
    // Only the requested source had its dirty flag cleared.
    let remaining = get_dirty_sources(&conn).unwrap();
    assert_eq!(remaining, vec![(SourceType::Issue, 2)]);
}
#[test]
fn scoped_regen_returns_document_ids() {
    let conn = setup_db();
    conn.execute(
        "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)",
        [],
    ).unwrap();
    mark_dirty(&conn, SourceType::Issue, 1).unwrap();
    let result = regenerate_documents_for_sources(&conn, &[(SourceType::Issue, 1)]).unwrap();
    // The returned ID must point at a real row in `documents`.
    let first_id = *result.document_ids.first().expect("expected a document id");
    let exists: bool = conn
        .query_row(
            "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
            [first_id],
            |row| row.get(0),
        )
        .unwrap();
    assert!(exists);
}
#[test]
fn scoped_regen_handles_missing_source() {
    let conn = setup_db();
    // Source id 9999 was never inserted. Deletion counts as a change, but
    // there is no surviving document whose ID could be reported.
    let result = regenerate_documents_for_sources(&conn, &[(SourceType::Issue, 9999)]).unwrap();
    assert!(result.document_ids.is_empty());
}

View File

@@ -112,6 +112,20 @@ impl GitLabClient {
self.request("/api/v4/version").await self.request("/api/v4/version").await
} }
pub async fn get_issue_by_iid(&self, gitlab_project_id: i64, iid: i64) -> Result<GitLabIssue> {
    // Single-resource endpoint: GET /projects/:id/issues/:iid
    self.request(&format!(
        "/api/v4/projects/{gitlab_project_id}/issues/{iid}"
    ))
    .await
}
pub async fn get_mr_by_iid(
    &self,
    gitlab_project_id: i64,
    iid: i64,
) -> Result<GitLabMergeRequest> {
    // Single-resource endpoint: GET /projects/:id/merge_requests/:iid
    self.request(&format!(
        "/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}"
    ))
    .await
}
const MAX_RETRIES: u32 = 3; const MAX_RETRIES: u32 = 3;
async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> { async fn request<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
@@ -848,4 +862,143 @@ mod tests {
let result = parse_link_header_next(&headers); let result = parse_link_header_next(&headers);
assert!(result.is_none()); assert!(result.is_none());
} }
// ─────────────────────────────────────────────────────────────────
// get_issue_by_iid / get_mr_by_iid
// ─────────────────────────────────────────────────────────────────
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
// Minimal GitLab issue payload matching the fields `GitLabIssue` deserializes.
fn mock_issue_json(iid: i64) -> serde_json::Value {
    serde_json::json!({
        "id": 1000 + iid,
        "iid": iid,
        "project_id": 42,
        "title": format!("Issue #{iid}"),
        "description": null,
        "state": "opened",
        "created_at": "2024-01-15T10:00:00.000Z",
        "updated_at": "2024-01-16T12:00:00.000Z",
        "closed_at": null,
        "author": { "id": 1, "username": "alice", "name": "Alice", "avatar_url": null },
        "assignees": [],
        "labels": ["bug"],
        "milestone": null,
        "due_date": null,
        "web_url": format!("https://gitlab.example.com/g/p/-/issues/{iid}")
    })
}
// Minimal GitLab merge-request payload matching `GitLabMergeRequest` fields.
fn mock_mr_json(iid: i64) -> serde_json::Value {
    serde_json::json!({
        "id": 2000 + iid,
        "iid": iid,
        "project_id": 42,
        "title": format!("MR !{iid}"),
        "description": null,
        "state": "opened",
        "draft": false,
        "work_in_progress": false,
        "source_branch": "feat",
        "target_branch": "main",
        "sha": "abc123",
        "references": { "short": format!("!{iid}"), "full": format!("g/p!{iid}") },
        "detailed_merge_status": "mergeable",
        "created_at": "2024-02-01T08:00:00.000Z",
        "updated_at": "2024-02-02T09:00:00.000Z",
        "merged_at": null,
        "closed_at": null,
        "author": { "id": 2, "username": "bob", "name": "Bob", "avatar_url": null },
        "merge_user": null,
        "merged_by": null,
        "labels": [],
        "assignees": [],
        "reviewers": [],
        "web_url": format!("https://gitlab.example.com/g/p/-/merge_requests/{iid}"),
        "merge_commit_sha": null,
        "squash_commit_sha": null
    })
}
// Client pointed at the mock server.
// NOTE(review): the third argument appears to be a rate-limit knob; 1000.0
// keeps tests unthrottled — confirm against `GitLabClient::new`.
fn test_client(base_url: &str) -> GitLabClient {
    GitLabClient::new(base_url, "test-token", Some(1000.0))
}
#[tokio::test]
async fn get_issue_by_iid_success() {
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/7"))
        .and(header("PRIVATE-TOKEN", "test-token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(mock_issue_json(7)))
        .mount(&server)
        .await;
    // A 200 payload must deserialize into the typed issue.
    let fetched = test_client(&server.uri())
        .get_issue_by_iid(42, 7)
        .await
        .unwrap();
    assert_eq!(fetched.iid, 7);
    assert_eq!(fetched.title, "Issue #7");
    assert_eq!(fetched.state, "opened");
}
#[tokio::test]
async fn get_issue_by_iid_not_found() {
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/999"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&server)
        .await;
    // A 404 must surface as the typed not-found error.
    let err = test_client(&server.uri())
        .get_issue_by_iid(42, 999)
        .await
        .unwrap_err();
    assert!(
        matches!(err, LoreError::GitLabNotFound { .. }),
        "Expected GitLabNotFound, got: {err:?}"
    );
}
#[tokio::test]
async fn get_mr_by_iid_success() {
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/merge_requests/99"))
        .and(header("PRIVATE-TOKEN", "test-token"))
        .respond_with(ResponseTemplate::new(200).set_body_json(mock_mr_json(99)))
        .mount(&server)
        .await;
    // A 200 payload must deserialize into the typed MR.
    let fetched = test_client(&server.uri())
        .get_mr_by_iid(42, 99)
        .await
        .unwrap();
    assert_eq!(fetched.iid, 99);
    assert_eq!(fetched.title, "MR !99");
    assert_eq!(fetched.source_branch, "feat");
    assert_eq!(fetched.target_branch, "main");
}
#[tokio::test]
async fn get_mr_by_iid_not_found() {
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/merge_requests/999"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&server)
        .await;
    // A 404 must surface as the typed not-found error.
    let err = test_client(&server.uri())
        .get_mr_by_iid(42, 999)
        .await
        .unwrap_err();
    assert!(
        matches!(err, LoreError::GitLabNotFound { .. }),
        "Expected GitLabNotFound, got: {err:?}"
    );
}
} }

View File

@@ -140,7 +140,7 @@ fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCurs
true true
} }
fn process_single_issue( pub(crate) fn process_single_issue(
conn: &Connection, conn: &Connection,
config: &Config, config: &Config,
project_id: i64, project_id: i64,

View File

@@ -135,13 +135,13 @@ pub async fn ingest_merge_requests(
Ok(result) Ok(result)
} }
struct ProcessMrResult { pub(crate) struct ProcessMrResult {
labels_created: usize, pub(crate) labels_created: usize,
assignees_linked: usize, pub(crate) assignees_linked: usize,
reviewers_linked: usize, pub(crate) reviewers_linked: usize,
} }
fn process_single_mr( pub(crate) fn process_single_mr(
conn: &Connection, conn: &Connection,
config: &Config, config: &Config,
project_id: i64, project_id: i64,

View File

@@ -6,6 +6,7 @@ pub mod merge_requests;
pub mod mr_diffs; pub mod mr_diffs;
pub mod mr_discussions; pub mod mr_discussions;
pub mod orchestrator; pub mod orchestrator;
pub(crate) mod surgical;
pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions}; pub use discussions::{IngestDiscussionsResult, ingest_issue_discussions};
pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues}; pub use issues::{IngestIssuesResult, IssueForDiscussionSync, ingest_issues};

View File

@@ -1097,7 +1097,7 @@ async fn drain_resource_events(
} }
/// Store resource events using the provided connection (caller manages the transaction). /// Store resource events using the provided connection (caller manages the transaction).
fn store_resource_events( pub(crate) fn store_resource_events(
conn: &Connection, conn: &Connection,
project_id: i64, project_id: i64,
entity_type: &str, entity_type: &str,
@@ -1406,7 +1406,7 @@ async fn drain_mr_closes_issues(
Ok(result) Ok(result)
} }
fn store_closes_issues_refs( pub(crate) fn store_closes_issues_refs(
conn: &Connection, conn: &Connection,
project_id: i64, project_id: i64,
mr_local_id: i64, mr_local_id: i64,

462
src/ingestion/surgical.rs Normal file
View File

@@ -0,0 +1,462 @@
//! Surgical (by-IID) sync pipeline.
//!
//! Provides targeted fetch and ingest for individual issues and merge requests,
//! as opposed to the bulk pagination paths in `issues.rs` / `merge_requests.rs`.
//!
//! Consumed by the orchestration layer (bd-1i4i) and dispatch wiring (bd-3bec).
#![allow(dead_code)] // Public API consumed by downstream beads not yet wired.
use rusqlite::Connection;
use tracing::debug;
use crate::Config;
use crate::core::error::{LoreError, Result};
use crate::documents::SourceType;
use crate::gitlab::GitLabClient;
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
use crate::ingestion::dirty_tracker;
use crate::ingestion::issues::process_single_issue;
use crate::ingestion::merge_requests::{ProcessMrResult, process_single_mr};
use crate::ingestion::mr_diffs::upsert_mr_file_changes;
use crate::ingestion::orchestrator::{store_closes_issues_refs, store_resource_events};
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/// A single entity to fetch surgically by IID.
#[derive(Debug, Clone)]
pub enum SurgicalTarget {
    /// A project issue, addressed by its project-scoped IID.
    Issue { iid: u64 },
    /// A merge request, addressed by its project-scoped IID.
    MergeRequest { iid: u64 },
}
impl SurgicalTarget {
    /// Stable lowercase label for this target's kind
    /// (`"issue"` / `"merge_request"`).
    pub fn entity_type(&self) -> &'static str {
        match self {
            Self::Issue { .. } => "issue",
            Self::MergeRequest { .. } => "merge_request",
        }
    }
    /// The project-scoped IID, regardless of variant.
    pub fn iid(&self) -> u64 {
        match self {
            Self::Issue { iid } | Self::MergeRequest { iid } => *iid,
        }
    }
}
/// Outcome of a failed preflight fetch for one target.
#[derive(Debug)]
pub struct PreflightFailure {
    /// The target that could not be fetched.
    pub target: SurgicalTarget,
    /// The error recorded for it.
    pub error: LoreError,
}
/// Collected results from preflight fetching multiple targets.
#[derive(Debug, Default)]
pub struct PreflightResult {
    /// Issues fetched successfully.
    pub issues: Vec<GitLabIssue>,
    /// Merge requests fetched successfully.
    pub merge_requests: Vec<GitLabMergeRequest>,
    /// Targets that failed preflight (e.g. not found upstream).
    pub failures: Vec<PreflightFailure>,
}
/// Result of ingesting a single issue by IID.
#[derive(Debug)]
pub struct IngestIssueResult {
    /// True when the issue row was written (not skipped as stale).
    pub upserted: bool,
    /// Count reported by `process_single_issue` for newly created labels.
    pub labels_created: usize,
    /// True when the TOCTOU guard rejected a same-or-older payload.
    pub skipped_stale: bool,
    /// Source keys marked dirty, for scoped document regeneration.
    pub dirty_source_keys: Vec<(SourceType, i64)>,
}
/// Result of ingesting a single MR by IID.
#[derive(Debug)]
pub struct IngestMrResult {
    /// True when the MR row was written (not skipped as stale).
    pub upserted: bool,
    /// Count reported by `process_single_mr` for newly created labels.
    pub labels_created: usize,
    /// Assignee links reported by `process_single_mr`.
    pub assignees_linked: usize,
    /// Reviewer links reported by `process_single_mr`.
    pub reviewers_linked: usize,
    /// True when the TOCTOU guard rejected a same-or-older payload.
    pub skipped_stale: bool,
    /// Source keys marked dirty, for scoped document regeneration.
    pub dirty_source_keys: Vec<(SourceType, i64)>,
}
// ---------------------------------------------------------------------------
// TOCTOU guard
// ---------------------------------------------------------------------------
/// Returns `true` if the payload is stale (same age or older than the DB row).
///
/// `payload_updated_at` is an ISO 8601 string from the GitLab API;
/// `db_updated_at_ms` is the ms-epoch value from the local DB, or `None`
/// when the entity has never been ingested (never stale in that case).
pub fn is_stale(payload_updated_at: &str, db_updated_at_ms: Option<i64>) -> Result<bool> {
    let db_ms = match db_updated_at_ms {
        Some(ms) => ms,
        // No local row yet — a first ingest cannot be stale.
        None => return Ok(false),
    };
    let parsed = chrono::DateTime::parse_from_rfc3339(payload_updated_at).map_err(|e| {
        LoreError::Other(format!(
            "Failed to parse timestamp '{payload_updated_at}': {e}"
        ))
    })?;
    // Equal timestamps count as stale: the payload is not strictly newer.
    Ok(parsed.timestamp_millis() <= db_ms)
}
// ---------------------------------------------------------------------------
// Preflight fetch
// ---------------------------------------------------------------------------
/// Fetch one or more entities by IID from GitLab, collecting successes and failures.
///
/// A 404 for any individual target is recorded as a [`PreflightFailure`]
/// carrying [`LoreError::SurgicalPreflightFailed`]; remaining targets still run.
/// Errors classified as permanent API errors abort the whole preflight;
/// other errors are recorded per-target.
pub async fn preflight_fetch(
    client: &GitLabClient,
    gitlab_project_id: i64,
    project_path: &str,
    targets: &[SurgicalTarget],
) -> Result<PreflightResult> {
    let mut collected = PreflightResult::default();
    // Builds the structured "not found" preflight error for one target.
    let not_found = |entity_type: &str, iid: u64| LoreError::SurgicalPreflightFailed {
        entity_type: entity_type.to_string(),
        iid,
        project: project_path.to_string(),
        reason: "not found on GitLab".to_string(),
    };
    for target in targets {
        match target {
            SurgicalTarget::Issue { iid } => {
                match client
                    .get_issue_by_iid(gitlab_project_id, *iid as i64)
                    .await
                {
                    Ok(issue) => collected.issues.push(issue),
                    Err(LoreError::GitLabNotFound { .. }) => {
                        collected.failures.push(PreflightFailure {
                            target: target.clone(),
                            error: not_found("issue", *iid),
                        });
                    }
                    // Permanent API errors affect every target — abort now.
                    Err(e) if e.is_permanent_api_error() => return Err(e),
                    // Other errors are recorded; remaining targets still run.
                    Err(e) => collected.failures.push(PreflightFailure {
                        target: target.clone(),
                        error: e,
                    }),
                }
            }
            SurgicalTarget::MergeRequest { iid } => {
                match client.get_mr_by_iid(gitlab_project_id, *iid as i64).await {
                    Ok(mr) => collected.merge_requests.push(mr),
                    Err(LoreError::GitLabNotFound { .. }) => {
                        collected.failures.push(PreflightFailure {
                            target: target.clone(),
                            error: not_found("merge_request", *iid),
                        });
                    }
                    Err(e) if e.is_permanent_api_error() => return Err(e),
                    Err(e) => collected.failures.push(PreflightFailure {
                        target: target.clone(),
                        error: e,
                    }),
                }
            }
        }
    }
    Ok(collected)
}
// ---------------------------------------------------------------------------
// Ingest single issue by IID
// ---------------------------------------------------------------------------
/// Ingest a single pre-fetched issue into the local DB.
///
/// Applies a TOCTOU guard: if the DB already holds a row with the same or
/// newer `updated_at`, the ingest is skipped and `skipped_stale` is `true`.
pub fn ingest_issue_by_iid(
    conn: &Connection,
    config: &Config,
    project_id: i64,
    issue: &GitLabIssue,
) -> Result<IngestIssueResult> {
    // TOCTOU guard: a bulk sync may have landed a same-or-newer row since preflight.
    if is_stale(&issue.updated_at, get_issue_updated_at(conn, project_id, issue.iid)?)? {
        debug!(
            iid = issue.iid,
            "Surgical issue ingest: skipping stale payload"
        );
        return Ok(IngestIssueResult {
            upserted: false,
            labels_created: 0,
            skipped_stale: true,
            dirty_source_keys: vec![],
        });
    }
    let labels_created = process_single_issue(conn, config, project_id, issue)?;
    let local_id: i64 = conn.query_row(
        "SELECT id FROM issues WHERE project_id = ? AND iid = ?",
        (project_id, issue.iid),
        |row| row.get(0),
    )?;
    // Flag the row for downstream scoped document regeneration.
    dirty_tracker::mark_dirty(conn, SourceType::Issue, local_id)?;
    debug!(
        iid = issue.iid,
        local_id,
        labels_created,
        "Surgical issue ingest: upserted"
    );
    Ok(IngestIssueResult {
        upserted: true,
        labels_created,
        skipped_stale: false,
        dirty_source_keys: vec![(SourceType::Issue, local_id)],
    })
}
// ---------------------------------------------------------------------------
// Ingest single MR by IID
// ---------------------------------------------------------------------------
/// Ingest a single pre-fetched merge request into the local DB.
///
/// Applies the same TOCTOU staleness guard as [`ingest_issue_by_iid`].
pub fn ingest_mr_by_iid(
    conn: &Connection,
    config: &Config,
    project_id: i64,
    mr: &GitLabMergeRequest,
) -> Result<IngestMrResult> {
    // TOCTOU guard: bail out if a same-or-newer row already landed locally.
    if is_stale(&mr.updated_at, get_mr_updated_at(conn, project_id, mr.iid)?)? {
        debug!(iid = mr.iid, "Surgical MR ingest: skipping stale payload");
        return Ok(IngestMrResult {
            upserted: false,
            labels_created: 0,
            assignees_linked: 0,
            reviewers_linked: 0,
            skipped_stale: true,
            dirty_source_keys: vec![],
        });
    }
    let ProcessMrResult {
        labels_created,
        assignees_linked,
        reviewers_linked,
    } = process_single_mr(conn, config, project_id, mr)?;
    let local_id: i64 = conn.query_row(
        "SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?",
        (project_id, mr.iid),
        |row| row.get(0),
    )?;
    // Flag the row for downstream scoped document regeneration.
    dirty_tracker::mark_dirty(conn, SourceType::MergeRequest, local_id)?;
    debug!(
        iid = mr.iid,
        local_id,
        labels_created,
        assignees_linked,
        reviewers_linked,
        "Surgical MR ingest: upserted"
    );
    Ok(IngestMrResult {
        upserted: true,
        labels_created,
        assignees_linked,
        reviewers_linked,
        skipped_stale: false,
        dirty_source_keys: vec![(SourceType::MergeRequest, local_id)],
    })
}
// ---------------------------------------------------------------------------
// Per-entity dependent enrichment (bd-kanh)
// ---------------------------------------------------------------------------
/// Fetch and store resource events (state, label, milestone) for a single entity.
///
/// Updates the `resource_events_synced_for_updated_at` watermark so the bulk
/// pipeline will not redundantly re-fetch these events. Unknown `entity_type`
/// values are logged and ignored (no fetch is wasted, but no watermark either).
pub async fn enrich_entity_resource_events(
    client: &GitLabClient,
    conn: &Connection,
    project_id: i64,
    gitlab_project_id: i64,
    entity_type: &str,
    iid: i64,
    local_id: i64,
) -> Result<()> {
    let (state_events, label_events, milestone_events) = client
        .fetch_all_resource_events(gitlab_project_id, entity_type, iid)
        .await?;
    store_resource_events(
        conn,
        project_id,
        entity_type,
        local_id,
        &state_events,
        &label_events,
        &milestone_events,
    )?;
    // Update watermark. The table differs per entity type; anything else is
    // a programming error upstream, so we log and return without failing.
    let sql = match entity_type {
        "issue" => {
            "UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?"
        }
        "merge_request" => {
            "UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?"
        }
        other => {
            debug!(
                entity_type = other,
                "Unknown entity type for resource events watermark"
            );
            return Ok(());
        }
    };
    conn.execute(sql, [local_id])?;
    debug!(
        entity_type,
        iid,
        local_id,
        state = state_events.len(),
        label = label_events.len(),
        milestone = milestone_events.len(),
        "Surgical: enriched resource events"
    );
    Ok(())
}
/// Fetch and store closes-issues references for a single merge request.
///
/// Advances the `closes_issues_synced_for_updated_at` watermark so the bulk
/// pipeline will not re-fetch for the same `updated_at`.
pub async fn enrich_mr_closes_issues(
    client: &GitLabClient,
    conn: &Connection,
    project_id: i64,
    gitlab_project_id: i64,
    iid: i64,
    local_mr_id: i64,
) -> Result<()> {
    let closes_refs = client
        .fetch_mr_closes_issues(gitlab_project_id, iid)
        .await?;
    store_closes_issues_refs(conn, project_id, local_mr_id, &closes_refs)?;
    conn.execute(
        "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?",
        [local_mr_id],
    )?;
    debug!(
        iid,
        local_mr_id,
        refs = closes_refs.len(),
        "Surgical: enriched closes-issues refs"
    );
    Ok(())
}
/// Fetch and store MR file-change diffs for a single merge request.
///
/// Advances the `diffs_synced_for_updated_at` watermark so the bulk
/// pipeline will not re-fetch for the same `updated_at`.
pub async fn enrich_mr_file_changes(
    client: &GitLabClient,
    conn: &Connection,
    project_id: i64,
    gitlab_project_id: i64,
    iid: i64,
    local_mr_id: i64,
) -> Result<()> {
    let file_diffs = client.fetch_mr_diffs(gitlab_project_id, iid).await?;
    upsert_mr_file_changes(conn, local_mr_id, project_id, &file_diffs)?;
    conn.execute(
        "UPDATE merge_requests SET diffs_synced_for_updated_at = updated_at WHERE id = ?",
        [local_mr_id],
    )?;
    debug!(
        iid,
        local_mr_id,
        diffs = file_diffs.len(),
        "Surgical: enriched MR file changes"
    );
    Ok(())
}
// ---------------------------------------------------------------------------
// DB helpers
// ---------------------------------------------------------------------------
// Looks up the local `updated_at` for an issue; `None` means never ingested.
fn get_issue_updated_at(conn: &Connection, project_id: i64, iid: i64) -> Result<Option<i64>> {
    match conn.query_row(
        "SELECT updated_at FROM issues WHERE project_id = ? AND iid = ?",
        (project_id, iid),
        |row| row.get(0),
    ) {
        // Absence of a row is an expected state, not an error.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        other => other.map(Some).map_err(Into::into),
    }
}
// Looks up the local `updated_at` for an MR; `None` means never ingested.
fn get_mr_updated_at(conn: &Connection, project_id: i64, iid: i64) -> Result<Option<i64>> {
    match conn.query_row(
        "SELECT updated_at FROM merge_requests WHERE project_id = ? AND iid = ?",
        (project_id, iid),
        |row| row.get(0),
    ) {
        // Absence of a row is an expected state, not an error.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        other => other.map(Some).map_err(Into::into),
    }
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
#[path = "surgical_tests.rs"]
mod tests;

View File

@@ -0,0 +1,913 @@
//! Tests for `surgical.rs` — surgical (by-IID) sync pipeline.
use std::path::Path;
use crate::core::config::{Config, GitLabConfig, ProjectConfig};
use crate::core::db::{create_connection, run_migrations};
use crate::gitlab::types::{
GitLabAuthor, GitLabIssue, GitLabMergeRequest, GitLabReferences, GitLabReviewer,
};
use crate::ingestion::surgical::{SurgicalTarget, ingest_issue_by_iid, ingest_mr_by_iid, is_stale};
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// In-memory DB with migrations applied and the test project seeded.
fn setup_db() -> rusqlite::Connection {
    let conn = create_connection(Path::new(":memory:")).unwrap();
    run_migrations(&conn).unwrap();
    seed_project(&conn);
    conn
}
// Inserts the single project (local id 1, GitLab id 42) the tests ingest into.
fn seed_project(conn: &rusqlite::Connection) {
    conn.execute(
        "INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url)
         VALUES (1, 42, 'group/repo', 'https://gitlab.example.com/group/repo')",
        [],
    )
    .unwrap();
}
// Minimal config referencing the seeded project; all other sections default.
fn test_config() -> Config {
    Config {
        gitlab: GitLabConfig {
            base_url: "https://gitlab.example.com".to_string(),
            token_env_var: "GITLAB_TOKEN".to_string(),
        },
        projects: vec![ProjectConfig {
            path: "group/repo".to_string(),
        }],
        default_project: None,
        sync: Default::default(),
        storage: Default::default(),
        embedding: Default::default(),
        scoring: Default::default(),
        logging: Default::default(),
    }
}
// Issue payload with a caller-controlled `updated_at` for TOCTOU tests.
// Carries one label ("bug") so label-creation counters can be asserted.
fn make_test_issue(iid: i64, updated_at: &str) -> GitLabIssue {
    GitLabIssue {
        id: 1000 + iid,
        iid,
        project_id: 42,
        title: format!("Test issue #{iid}"),
        description: Some("Test description".to_string()),
        state: "opened".to_string(),
        created_at: "2024-01-01T00:00:00.000+00:00".to_string(),
        updated_at: updated_at.to_string(),
        closed_at: None,
        author: GitLabAuthor {
            id: 1,
            username: "alice".to_string(),
            name: "Alice".to_string(),
        },
        assignees: vec![],
        labels: vec!["bug".to_string()],
        milestone: None,
        due_date: None,
        web_url: format!("https://gitlab.example.com/group/repo/-/issues/{iid}"),
    }
}
// MR payload with a caller-controlled `updated_at`; carries one reviewer so
// reviewer-link counters can be asserted.
fn make_test_mr(iid: i64, updated_at: &str) -> GitLabMergeRequest {
    GitLabMergeRequest {
        id: 2000 + iid,
        iid,
        project_id: 42,
        title: format!("Test MR !{iid}"),
        description: Some("MR description".to_string()),
        state: "opened".to_string(),
        draft: false,
        work_in_progress: false,
        source_branch: "feat".to_string(),
        target_branch: "main".to_string(),
        sha: Some("abc123def456".to_string()),
        references: Some(GitLabReferences {
            short: format!("!{iid}"),
            full: format!("group/repo!{iid}"),
        }),
        detailed_merge_status: Some("mergeable".to_string()),
        merge_status_legacy: None,
        created_at: "2024-01-01T00:00:00.000+00:00".to_string(),
        updated_at: updated_at.to_string(),
        merged_at: None,
        closed_at: None,
        author: GitLabAuthor {
            id: 2,
            username: "bob".to_string(),
            name: "Bob".to_string(),
        },
        merge_user: None,
        merged_by: None,
        labels: vec![],
        assignees: vec![],
        reviewers: vec![GitLabReviewer {
            id: 3,
            username: "carol".to_string(),
            name: "Carol".to_string(),
        }],
        web_url: format!("https://gitlab.example.com/group/repo/-/merge_requests/{iid}"),
        merge_commit_sha: None,
        squash_commit_sha: None,
    }
}
// Snapshot of `dirty_sources` in a deterministic order for assertions.
fn get_dirty_keys(conn: &rusqlite::Connection) -> Vec<(String, i64)> {
    let mut stmt = conn
        .prepare("SELECT source_type, source_id FROM dirty_sources ORDER BY source_type, source_id")
        .unwrap();
    stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
        .unwrap()
        .collect::<std::result::Result<Vec<_>, _>>()
        .unwrap()
}
// ---------------------------------------------------------------------------
// is_stale — TOCTOU guard
// ---------------------------------------------------------------------------
#[test]
fn test_is_stale_parses_iso8601() {
    // 2024-01-15T10:00:00.000Z → 1_705_312_800_000 ms. Equal timestamps mean
    // the payload is not strictly newer — that counts as stale.
    assert!(is_stale("2024-01-15T10:00:00.000Z", Some(1_705_312_800_000i64)).unwrap());
}
#[test]
fn test_is_stale_handles_none_db_value() {
    // No DB row yet: a first-ever ingest can never be stale.
    assert!(!is_stale("2024-01-15T10:00:00.000Z", None).unwrap());
}
#[test]
fn test_is_stale_newer_payload_is_not_stale() {
    // Payload one second newer than the DB row → not stale.
    assert!(!is_stale("2024-01-15T10:00:01.000Z", Some(1_705_312_800_000i64)).unwrap());
}
#[test]
fn test_is_stale_older_payload_is_stale() {
    // Payload one second older than the DB row → stale.
    assert!(is_stale("2024-01-15T09:59:59.000Z", Some(1_705_312_800_000i64)).unwrap());
}
#[test]
fn test_is_stale_parses_timezone_offset() {
    // GitLab sometimes emits an explicit +00:00 offset instead of Z.
    assert!(is_stale("2024-01-15T10:00:00.000+00:00", Some(1_705_312_800_000i64)).unwrap());
}
#[test]
fn test_is_stale_with_z_suffix() {
    // Z suffix without fractional seconds must also parse.
    assert!(is_stale("2024-01-15T10:00:00Z", Some(1_705_312_800_000i64)).unwrap());
}
#[test]
fn test_is_stale_invalid_timestamp_returns_error() {
    assert!(is_stale("not-a-timestamp", Some(0)).is_err());
}
// ---------------------------------------------------------------------------
// SurgicalTarget
// ---------------------------------------------------------------------------
#[test]
fn test_surgical_target_display_issue() {
    let issue_target = SurgicalTarget::Issue { iid: 42 };
    assert_eq!(
        (issue_target.entity_type(), issue_target.iid()),
        ("issue", 42)
    );
}
#[test]
fn test_surgical_target_display_mr() {
    let mr_target = SurgicalTarget::MergeRequest { iid: 99 };
    assert_eq!(
        (mr_target.entity_type(), mr_target.iid()),
        ("merge_request", 99)
    );
}
// ---------------------------------------------------------------------------
// ingest_issue_by_iid — full DB integration
// ---------------------------------------------------------------------------
#[test]
fn test_ingest_issue_by_iid_upserts_and_marks_dirty() {
    let conn = setup_db();
    let config = test_config();
    let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00");
    let outcome = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    // A brand-new issue must be written, not skipped by the TOCTOU guard.
    assert!(outcome.upserted);
    assert!(!outcome.skipped_stale);
    // make_test_issue attaches a single "bug" label.
    assert_eq!(outcome.labels_created, 1);
    // The ingest must leave a dirty-tracking entry for downstream regeneration.
    let dirty = get_dirty_keys(&conn);
    assert!(
        dirty.iter().any(|(t, _)| t == "issue"),
        "Expected dirty issue entry, got: {dirty:?}"
    );
}
#[test]
fn test_ingest_issue_returns_dirty_source_keys() {
    let conn = setup_db();
    let config = test_config();
    let issue = make_test_issue(7, "2026-02-17T12:00:00.000+00:00");
    let outcome = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    // Exactly one dirty key: the issue itself, keyed by a positive local id.
    assert_eq!(outcome.dirty_source_keys.len(), 1);
    let (kind, local_id) = &outcome.dirty_source_keys[0];
    assert_eq!(kind.to_string(), "issue");
    assert!(*local_id > 0);
}
#[test]
fn test_toctou_skips_stale_issue() {
    let conn = setup_db();
    let config = test_config();
    let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00");
    // Initial ingest writes the row.
    assert!(ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap().upserted);
    // Replaying the identical timestamp is not strictly newer → skipped as stale.
    let replay = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    assert!(replay.skipped_stale);
    assert!(!replay.upserted);
}
#[test]
fn test_toctou_allows_newer_issue() {
    let conn = setup_db();
    let config = test_config();
    // Ingest at T1, then again one minute later at T2: the newer payload
    // must pass the TOCTOU guard and be upserted.
    let older = make_test_issue(42, "2026-02-17T12:00:00.000+00:00");
    assert!(ingest_issue_by_iid(&conn, &config, 1, &older).unwrap().upserted);
    let newer = make_test_issue(42, "2026-02-17T12:01:00.000+00:00");
    let outcome = ingest_issue_by_iid(&conn, &config, 1, &newer).unwrap();
    assert!(outcome.upserted);
    assert!(!outcome.skipped_stale);
}
#[test]
fn test_ingest_issue_updates_existing() {
    let conn = setup_db();
    let config = test_config();
    let mut issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00");
    ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    // Bump the timestamp so the second ingest is not rejected as stale,
    // and change the title so the UPDATE is observable in the DB.
    issue.title = "Updated title".to_string();
    issue.updated_at = "2026-02-17T13:00:00.000+00:00".to_string();
    ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    let stored_title: String = conn
        .query_row(
            "SELECT title FROM issues WHERE project_id = 1 AND iid = 42",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_title, "Updated title");
}
// ---------------------------------------------------------------------------
// ingest_mr_by_iid — full DB integration
// ---------------------------------------------------------------------------
#[test]
fn test_ingest_mr_by_iid_upserts_and_marks_dirty() {
    let conn = setup_db();
    let config = test_config();
    let mr = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    let outcome = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    // A brand-new MR must be written, not skipped by the TOCTOU guard.
    assert!(outcome.upserted);
    assert!(!outcome.skipped_stale);
    // make_test_mr attaches one reviewer ("carol").
    assert_eq!(outcome.reviewers_linked, 1);
    // Dirty tracking must record the MR for downstream regeneration.
    let dirty = get_dirty_keys(&conn);
    assert!(
        dirty.iter().any(|(t, _)| t == "merge_request"),
        "Expected dirty MR entry, got: {dirty:?}"
    );
}
#[test]
fn test_ingest_mr_returns_dirty_source_keys() {
    let conn = setup_db();
    let config = test_config();
    let mr = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    let outcome = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    // Exactly one dirty key: the MR itself, keyed by a positive local id.
    assert_eq!(outcome.dirty_source_keys.len(), 1);
    let (kind, local_id) = &outcome.dirty_source_keys[0];
    assert_eq!(kind.to_string(), "merge_request");
    assert!(*local_id > 0);
}
#[test]
fn test_toctou_skips_stale_mr() {
    let conn = setup_db();
    let config = test_config();
    let mr = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    // Initial ingest writes the row.
    assert!(ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap().upserted);
    // Identical timestamp on replay → TOCTOU guard skips the write.
    let replay = ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    assert!(replay.skipped_stale);
    assert!(!replay.upserted);
}
#[test]
fn test_toctou_allows_newer_mr() {
    let conn = setup_db();
    let config = test_config();
    let older = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    assert!(ingest_mr_by_iid(&conn, &config, 1, &older).unwrap().upserted);
    // One minute newer → must pass the TOCTOU guard and upsert again.
    let newer = make_test_mr(99, "2026-02-17T12:01:00.000+00:00");
    let outcome = ingest_mr_by_iid(&conn, &config, 1, &newer).unwrap();
    assert!(outcome.upserted);
    assert!(!outcome.skipped_stale);
}
#[test]
fn test_ingest_mr_updates_existing() {
    let conn = setup_db();
    let config = test_config();
    let mut mr = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    // Newer timestamp defeats the staleness guard; new title proves the UPDATE.
    mr.title = "Updated MR title".to_string();
    mr.updated_at = "2026-02-17T13:00:00.000+00:00".to_string();
    ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    let stored_title: String = conn
        .query_row(
            "SELECT title FROM merge_requests WHERE project_id = 1 AND iid = 99",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(stored_title, "Updated MR title");
}
// ---------------------------------------------------------------------------
// preflight_fetch — wiremock (async)
// ---------------------------------------------------------------------------
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
use crate::gitlab::GitLabClient;
use crate::ingestion::surgical::preflight_fetch;
// Happy-path preflight: one issue and one MR are fetched from mocked GitLab
// REST endpoints and both come back with no failures recorded.
#[tokio::test]
async fn test_preflight_fetch_returns_issues_and_mrs() {
    let server = MockServer::start().await;
    // Minimal-but-complete GitLab issue payload — just the fields the
    // deserializer requires.
    let issue_json = serde_json::json!({
        "id": 1042,
        "iid": 42,
        "project_id": 100,
        "title": "Fetched issue",
        "description": null,
        "state": "opened",
        "created_at": "2026-02-17T10:00:00.000+00:00",
        "updated_at": "2026-02-17T12:00:00.000+00:00",
        "closed_at": null,
        "author": { "id": 1, "username": "alice", "name": "Alice" },
        "assignees": [],
        "labels": [],
        "milestone": null,
        "due_date": null,
        "web_url": "https://gitlab.example.com/g/p/-/issues/42"
    });
    // Minimal-but-complete GitLab merge-request payload.
    let mr_json = serde_json::json!({
        "id": 2099,
        "iid": 99,
        "project_id": 100,
        "title": "Fetched MR",
        "description": null,
        "state": "opened",
        "draft": false,
        "work_in_progress": false,
        "source_branch": "feat",
        "target_branch": "main",
        "sha": "abc",
        "references": { "short": "!99", "full": "g/p!99" },
        "detailed_merge_status": "mergeable",
        "created_at": "2026-02-17T10:00:00.000+00:00",
        "updated_at": "2026-02-17T12:00:00.000+00:00",
        "merged_at": null,
        "closed_at": null,
        "author": { "id": 2, "username": "bob", "name": "Bob" },
        "merge_user": null,
        "merged_by": null,
        "labels": [],
        "assignees": [],
        "reviewers": [],
        "web_url": "https://gitlab.example.com/g/p/-/merge_requests/99",
        "merge_commit_sha": null,
        "squash_commit_sha": null
    });
    // One mock per REST endpoint the preflight is expected to hit.
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/100/issues/42"))
        .respond_with(ResponseTemplate::new(200).set_body_json(&issue_json))
        .mount(&server)
        .await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/100/merge_requests/99"))
        .respond_with(ResponseTemplate::new(200).set_body_json(&mr_json))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    let targets = vec![
        SurgicalTarget::Issue { iid: 42 },
        SurgicalTarget::MergeRequest { iid: 99 },
    ];
    let result = preflight_fetch(&client, 100, "g/p", &targets)
        .await
        .unwrap();
    // One of each entity, matched by IID, and no recorded failures.
    assert_eq!(result.issues.len(), 1);
    assert_eq!(result.issues[0].iid, 42);
    assert_eq!(result.merge_requests.len(), 1);
    assert_eq!(result.merge_requests[0].iid, 99);
    assert!(result.failures.is_empty());
}
#[tokio::test]
async fn test_preflight_fetch_collects_failures() {
    // A 404 on the only requested issue must not error out the preflight;
    // it is recorded as a per-target failure instead.
    let server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/100/issues/999"))
        .respond_with(ResponseTemplate::new(404))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    let targets = vec![SurgicalTarget::Issue { iid: 999 }];
    let fetched = preflight_fetch(&client, 100, "g/p", &targets)
        .await
        .unwrap();
    assert!(fetched.issues.is_empty());
    assert_eq!(fetched.failures.len(), 1);
    assert_eq!(fetched.failures[0].target.iid(), 999);
}
// ---------------------------------------------------------------------------
// Per-entity dependent helpers (bd-kanh)
// ---------------------------------------------------------------------------
use crate::ingestion::surgical::{
enrich_entity_resource_events, enrich_mr_closes_issues, enrich_mr_file_changes,
};
// Enriching an ingested issue with resource events: all three event endpoints
// return empty arrays, and the sync watermark column is still set afterwards.
#[tokio::test]
async fn test_enrich_resource_events_stores_and_watermarks() {
    let conn = setup_db();
    let config = test_config();
    let issue = make_test_issue(42, "2026-02-17T12:00:00.000+00:00");
    ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    // Look up the local row id the enrich call needs.
    let local_id: i64 = conn
        .query_row(
            "SELECT id FROM issues WHERE project_id = 1 AND iid = 42",
            [],
            |r| r.get(0),
        )
        .unwrap();
    let server = MockServer::start().await;
    // Mock all 3 resource event endpoints returning empty arrays
    // (URL project id 42 is the GitLab-side id; local project_id is 1 —
    // presumably mapped by the helper; verify against make_test_issue).
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/42/resource_state_events"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
        .mount(&server)
        .await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/42/resource_label_events"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
        .mount(&server)
        .await;
    Mock::given(method("GET"))
        .and(path(
            "/api/v4/projects/42/issues/42/resource_milestone_events",
        ))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([])))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    enrich_entity_resource_events(&client, &conn, 1, 42, "issue", 42, local_id)
        .await
        .unwrap();
    // Verify watermark was set — even with zero events, the column must be
    // stamped so the entity is not re-enriched needlessly.
    let watermark: Option<i64> = conn
        .query_row(
            "SELECT resource_events_synced_for_updated_at FROM issues WHERE id = ?",
            [local_id],
            |r| r.get(0),
        )
        .unwrap();
    assert!(watermark.is_some());
}
// Enriching an MR with its "closes issues" list must create a 'closes'
// entity_reference row and stamp the per-MR sync watermark.
#[tokio::test]
async fn test_enrich_mr_closes_issues_stores_refs() {
    let conn = setup_db();
    let config = test_config();
    let mr = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    // Look up the local row id the enrich call needs.
    let local_mr_id: i64 = conn
        .query_row(
            "SELECT id FROM merge_requests WHERE project_id = 1 AND iid = 99",
            [],
            |r| r.get(0),
        )
        .unwrap();
    let server = MockServer::start().await;
    // GitLab reports one closed issue for MR !99.
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/merge_requests/99/closes_issues"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([
            {
                "id": 1042,
                "iid": 42,
                "project_id": 42,
                "title": "Closed issue",
                "state": "closed",
                "web_url": "https://gitlab.example.com/group/repo/-/issues/42"
            }
        ])))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    enrich_mr_closes_issues(&client, &conn, 1, 42, 99, local_mr_id)
        .await
        .unwrap();
    // Verify entity_reference was created
    let ref_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM entity_references
             WHERE source_entity_type = 'merge_request' AND source_entity_id = ?
             AND reference_type = 'closes'",
            [local_mr_id],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(ref_count, 1);
    // Verify watermark
    let watermark: Option<i64> = conn
        .query_row(
            "SELECT closes_issues_synced_for_updated_at FROM merge_requests WHERE id = ?",
            [local_mr_id],
            |r| r.get(0),
        )
        .unwrap();
    assert!(watermark.is_some());
}
// Enriching an MR with its diff list must persist one mr_file_changes row per
// changed file and stamp the diffs watermark.
#[tokio::test]
async fn test_enrich_mr_file_changes_stores_diffs() {
    let conn = setup_db();
    let config = test_config();
    let mr = make_test_mr(99, "2026-02-17T12:00:00.000+00:00");
    ingest_mr_by_iid(&conn, &config, 1, &mr).unwrap();
    // Look up the local row id the enrich call needs.
    let local_mr_id: i64 = conn
        .query_row(
            "SELECT id FROM merge_requests WHERE project_id = 1 AND iid = 99",
            [],
            |r| r.get(0),
        )
        .unwrap();
    let server = MockServer::start().await;
    // GitLab reports a single modified file (no add/rename/delete flags).
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/merge_requests/99/diffs"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([
            {
                "old_path": "src/main.rs",
                "new_path": "src/main.rs",
                "new_file": false,
                "renamed_file": false,
                "deleted_file": false
            }
        ])))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    enrich_mr_file_changes(&client, &conn, 1, 42, 99, local_mr_id)
        .await
        .unwrap();
    // Verify file change was stored
    let fc_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM mr_file_changes WHERE merge_request_id = ?",
            [local_mr_id],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(fc_count, 1);
    // Verify watermark
    let watermark: Option<i64> = conn
        .query_row(
            "SELECT diffs_synced_for_updated_at FROM merge_requests WHERE id = ?",
            [local_mr_id],
            |r| r.get(0),
        )
        .unwrap();
    assert!(watermark.is_some());
}
// ---------------------------------------------------------------------------
// Integration tests (bd-3jqx)
// ---------------------------------------------------------------------------
/// Preflight fetch with a mix of success and 404 — verify partial results.
///
/// NOTE(review): the function name mentions cancellation, but nothing here
/// exercises cancellation — it tests partial-failure handling. Consider
/// renaming (e.g. `test_preflight_partial_issue_failure`) in a follow-up.
#[tokio::test]
async fn test_surgical_cancellation_during_preflight() {
    // Test that preflight handles partial failures gracefully: one issue exists,
    // another returns 404. The existing issue should succeed, the missing one
    // should be recorded as a failure — not abort the entire preflight.
    let server = MockServer::start().await;
    // Issue 7 exists
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/7"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "id": 1007, "iid": 7, "project_id": 42,
            "title": "Existing issue", "description": "desc",
            "state": "opened",
            "created_at": "2026-02-17T10:00:00.000+00:00",
            "updated_at": "2026-02-17T12:00:00.000+00:00",
            "closed_at": null,
            "author": {"id": 1, "username": "alice", "name": "Alice"},
            "assignees": [], "labels": [], "milestone": null, "due_date": null,
            "web_url": "https://gitlab.example.com/group/repo/-/issues/7"
        })))
        .mount(&server)
        .await;
    // Issue 999 does not exist
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/issues/999"))
        .respond_with(ResponseTemplate::new(404).set_body_json(serde_json::json!({
            "message": "404 Not Found"
        })))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    let targets = vec![
        SurgicalTarget::Issue { iid: 7 },
        SurgicalTarget::Issue { iid: 999 },
    ];
    let result = preflight_fetch(&client, 42, "group/repo", &targets)
        .await
        .unwrap();
    assert_eq!(result.issues.len(), 1, "One issue should succeed");
    assert_eq!(result.issues[0].iid, 7);
    assert_eq!(result.failures.len(), 1, "One issue should fail");
    assert_eq!(result.failures[0].target.iid(), 999);
}
/// Preflight fetch for MRs: one succeeds, one gets 404.
///
/// NOTE(review): the function name mentions a timeout, but no timeout is
/// simulated — this is the MR analogue of the partial-failure test above.
/// Consider renaming (e.g. `test_preflight_partial_mr_failure`).
#[tokio::test]
async fn test_surgical_timeout_during_fetch() {
    // Tests mixed MR preflight: one MR found, one returns 404.
    // The found MR proceeds; the missing MR is recorded as a failure.
    let server = MockServer::start().await;
    // MR 10 exists
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/merge_requests/10"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "id": 2010, "iid": 10, "project_id": 42,
            "title": "Test MR !10", "description": "desc",
            "state": "opened", "draft": false, "work_in_progress": false,
            "source_branch": "feat", "target_branch": "main",
            "sha": "abc123",
            "references": {"short": "!10", "full": "group/repo!10"},
            "detailed_merge_status": "mergeable",
            "created_at": "2026-02-17T10:00:00.000+00:00",
            "updated_at": "2026-02-17T12:00:00.000+00:00",
            "merged_at": null, "closed_at": null,
            "author": {"id": 2, "username": "bob", "name": "Bob"},
            "merge_user": null, "merged_by": null,
            "labels": [], "assignees": [], "reviewers": [],
            "web_url": "https://gitlab.example.com/group/repo/-/merge_requests/10",
            "merge_commit_sha": null, "squash_commit_sha": null
        })))
        .mount(&server)
        .await;
    // MR 888 does not exist
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/42/merge_requests/888"))
        .respond_with(ResponseTemplate::new(404).set_body_json(serde_json::json!({
            "message": "404 Not Found"
        })))
        .mount(&server)
        .await;
    let client = GitLabClient::new(&server.uri(), "test-token", Some(1000.0));
    let targets = vec![
        SurgicalTarget::MergeRequest { iid: 10 },
        SurgicalTarget::MergeRequest { iid: 888 },
    ];
    let result = preflight_fetch(&client, 42, "group/repo", &targets)
        .await
        .unwrap();
    assert_eq!(result.merge_requests.len(), 1, "One MR should succeed");
    assert_eq!(result.merge_requests[0].iid, 10);
    assert_eq!(result.failures.len(), 1, "One MR should fail");
    assert_eq!(result.failures[0].target.iid(), 888);
}
/// Verify that only the surgically ingested entity gets dirty-tracked.
// NOTE(review): this fn contains no `.await`; the tokio runtime is unused
// and `#[test]` would suffice — confirm before simplifying.
#[tokio::test]
async fn test_surgical_embed_isolation() {
    let conn = setup_db();
    let config = test_config();
    // Seed an unrelated issue, then wipe the dirty table so only the
    // surgical ingest below can contribute entries.
    let seeded = make_test_issue(1, "2024-06-01T00:00:00.000+00:00");
    ingest_issue_by_iid(&conn, &config, 1, &seeded).unwrap();
    conn.execute("DELETE FROM dirty_sources", []).unwrap();
    // Surgically ingest issue #42.
    let target = make_test_issue(42, "2026-02-17T12:00:00.000+00:00");
    let outcome = ingest_issue_by_iid(&conn, &config, 1, &target).unwrap();
    assert!(outcome.upserted);
    assert!(!outcome.skipped_stale);
    // Dirty tracking must contain exactly the surgically ingested issue.
    let dirty = get_dirty_keys(&conn);
    assert_eq!(
        dirty.len(),
        1,
        "Only the surgically ingested issue should be dirty"
    );
    assert_eq!(dirty[0].0, "issue");
    // The dirty entry must reference issue #42's local row id.
    let local_id: i64 = conn
        .query_row(
            "SELECT id FROM issues WHERE project_id = 1 AND iid = 42",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(dirty[0].1, local_id);
}
/// Verify that ingested data in the DB matches the GitLab payload fields exactly.
// NOTE(review): no `.await` in the body — `#[test]` would suffice; confirm
// before simplifying.
#[tokio::test]
async fn test_surgical_payload_integrity() {
    let conn = setup_db();
    let config = test_config();
    // Hand-built payload exercising every field the mapper persists:
    // closed state, markdown description, assignee, two labels, due date.
    let issue = GitLabIssue {
        id: 5555,
        iid: 77,
        project_id: 42,
        title: "Payload integrity test".to_string(),
        description: Some("Detailed description with **markdown**".to_string()),
        state: "closed".to_string(),
        created_at: "2025-03-10T08:30:00.000+00:00".to_string(),
        updated_at: "2026-01-20T14:45:00.000+00:00".to_string(),
        closed_at: Some("2026-01-20T14:45:00.000+00:00".to_string()),
        author: GitLabAuthor {
            id: 99,
            username: "integrity_user".to_string(),
            name: "Integrity Tester".to_string(),
        },
        assignees: vec![GitLabAuthor {
            id: 100,
            username: "assignee1".to_string(),
            name: "Assignee One".to_string(),
        }],
        labels: vec!["priority::high".to_string(), "type::bug".to_string()],
        milestone: None,
        due_date: Some("2026-02-01".to_string()),
        web_url: "https://gitlab.example.com/group/repo/-/issues/77".to_string(),
    };
    let result = ingest_issue_by_iid(&conn, &config, 1, &issue).unwrap();
    assert!(result.upserted);
    // Verify core fields in DB match the payload
    let (db_title, db_state, db_description, db_author, db_web_url, db_iid, db_gitlab_id): (
        String,
        String,
        Option<String>,
        String,
        String,
        i64,
        i64,
    ) = conn
        .query_row(
            "SELECT title, state, description, author_username, web_url, iid, gitlab_id
             FROM issues
             WHERE project_id = 1 AND iid = 77",
            [],
            |r| {
                Ok((
                    r.get(0)?,
                    r.get(1)?,
                    r.get(2)?,
                    r.get(3)?,
                    r.get(4)?,
                    r.get(5)?,
                    r.get(6)?,
                ))
            },
        )
        .unwrap();
    assert_eq!(db_title, "Payload integrity test");
    assert_eq!(db_state, "closed");
    assert_eq!(
        db_description.as_deref(),
        Some("Detailed description with **markdown**")
    );
    assert_eq!(db_author, "integrity_user");
    assert_eq!(
        db_web_url,
        "https://gitlab.example.com/group/repo/-/issues/77"
    );
    assert_eq!(db_iid, 77);
    // GitLab's global id, distinct from the per-project iid.
    assert_eq!(db_gitlab_id, 5555);
    // Verify labels were created and linked
    let label_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issue_labels il
             JOIN labels l ON il.label_id = l.id
             JOIN issues i ON il.issue_id = i.id
             WHERE i.iid = 77 AND i.project_id = 1",
            [],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(label_count, 2, "Both labels should be linked");
}

View File

@@ -17,21 +17,22 @@ use lore::cli::commands::{
print_event_count_json, print_file_history, print_file_history_json, print_generate_docs, print_event_count_json, print_file_history, print_file_history_json, print_generate_docs,
print_generate_docs_json, print_ingest_summary, print_ingest_summary_json, print_list_issues, print_generate_docs_json, print_ingest_summary, print_ingest_summary_json, print_list_issues,
print_list_issues_json, print_list_mrs, print_list_mrs_json, print_list_notes, print_list_issues_json, print_list_mrs, print_list_mrs_json, print_list_notes,
print_list_notes_csv, print_list_notes_json, print_list_notes_jsonl, print_search_results, print_list_notes_csv, print_list_notes_json, print_list_notes_jsonl, print_related,
print_search_results_json, print_show_issue, print_show_issue_json, print_show_mr, print_related_json, print_search_results, print_search_results_json, print_show_issue,
print_show_mr_json, print_stats, print_stats_json, print_sync, print_sync_json, print_show_issue_json, print_show_mr, print_show_mr_json, print_stats, print_stats_json,
print_sync_status, print_sync_status_json, print_timeline, print_timeline_json_with_meta, print_sync, print_sync_json, print_sync_status, print_sync_status_json, print_timeline,
print_trace, print_trace_json, print_who_human, print_who_json, query_notes, run_auth_test, print_timeline_json_with_meta, print_trace, print_trace_json, print_who_human, print_who_json,
run_count, run_count_events, run_doctor, run_drift, run_embed, run_file_history, query_notes, run_auth_test, run_count, run_count_events, run_doctor, run_drift, run_embed,
run_generate_docs, run_ingest, run_ingest_dry_run, run_init, run_list_issues, run_list_mrs, run_file_history, run_generate_docs, run_ingest, run_ingest_dry_run, run_init, run_list_issues,
run_search, run_show_issue, run_show_mr, run_stats, run_sync, run_sync_status, run_timeline, run_list_mrs, run_related, run_search, run_show_issue, run_show_mr, run_stats, run_sync,
run_tui, run_who, run_sync_status, run_timeline, run_tui, run_who,
}; };
use lore::cli::render::{ColorMode, GlyphMode, Icons, LoreRenderer, Theme}; use lore::cli::render::{ColorMode, GlyphMode, Icons, LoreRenderer, Theme};
use lore::cli::robot::{RobotMeta, strip_schemas}; use lore::cli::robot::{RobotMeta, strip_schemas};
use lore::cli::{ use lore::cli::{
Cli, Commands, CountArgs, EmbedArgs, FileHistoryArgs, GenerateDocsArgs, IngestArgs, IssuesArgs, Cli, Commands, CountArgs, EmbedArgs, FileHistoryArgs, GenerateDocsArgs, IngestArgs, IssuesArgs,
MrsArgs, NotesArgs, SearchArgs, StatsArgs, SyncArgs, TimelineArgs, TraceArgs, WhoArgs, MrsArgs, NotesArgs, RelatedArgs, SearchArgs, StatsArgs, SyncArgs, TimelineArgs, TraceArgs,
WhoArgs,
}; };
use lore::core::db::{ use lore::core::db::{
LATEST_SCHEMA_VERSION, create_connection, get_schema_version, run_migrations, LATEST_SCHEMA_VERSION, create_connection, get_schema_version, run_migrations,
@@ -204,6 +205,9 @@ async fn main() {
handle_file_history(cli.config.as_deref(), args, robot_mode) handle_file_history(cli.config.as_deref(), args, robot_mode)
} }
Some(Commands::Trace(args)) => handle_trace(cli.config.as_deref(), args, robot_mode), Some(Commands::Trace(args)) => handle_trace(cli.config.as_deref(), args, robot_mode),
Some(Commands::Related(args)) => {
handle_related(cli.config.as_deref(), args, robot_mode).await
}
Some(Commands::Tui(args)) => run_tui(&args, robot_mode), Some(Commands::Tui(args)) => run_tui(&args, robot_mode),
Some(Commands::Drift { Some(Commands::Drift {
entity_type, entity_type,
@@ -732,6 +736,8 @@ fn suggest_similar_command(invalid: &str) -> String {
("drift", "drift"), ("drift", "drift"),
("file-history", "file-history"), ("file-history", "file-history"),
("trace", "trace"), ("trace", "trace"),
("related", "related"),
("similar", "related"),
]; ];
let invalid_lower = invalid.to_lowercase(); let invalid_lower = invalid.to_lowercase();
@@ -2214,6 +2220,14 @@ async fn handle_sync_cmd(
if args.no_status { if args.no_status {
config.sync.fetch_work_item_status = false; config.sync.fetch_work_item_status = false;
} }
// Dedup surgical IIDs
let mut issue_iids = args.issue;
let mut mr_iids = args.mr;
issue_iids.sort_unstable();
issue_iids.dedup();
mr_iids.sort_unstable();
mr_iids.dedup();
let options = SyncOptions { let options = SyncOptions {
full: args.full && !args.no_full, full: args.full && !args.no_full,
force: args.force && !args.no_force, force: args.force && !args.no_force,
@@ -2222,8 +2236,46 @@ async fn handle_sync_cmd(
no_events: args.no_events, no_events: args.no_events,
robot_mode, robot_mode,
dry_run, dry_run,
issue_iids,
mr_iids,
project: args.project,
preflight_only: args.preflight_only,
}; };
// Surgical sync validation
if options.is_surgical() {
let total = options.issue_iids.len() + options.mr_iids.len();
if total > SyncOptions::MAX_SURGICAL_TARGETS {
return Err(format!(
"Too many surgical targets ({total}). Maximum is {}.",
SyncOptions::MAX_SURGICAL_TARGETS
)
.into());
}
if options.full {
return Err("--full is incompatible with surgical sync (--issue / --mr).".into());
}
if options.no_docs && !options.no_embed {
return Err(
"--no-docs without --no-embed in surgical mode would leave stale embeddings. \
Add --no-embed or remove --no-docs."
.into(),
);
}
if config
.effective_project(options.project.as_deref())
.is_none()
{
return Err(
"Surgical sync requires a project. Use -p <project> or set defaultProject in config."
.into(),
);
}
}
if options.preflight_only && !options.is_surgical() {
return Err("--preflight-only requires --issue or --mr.".into());
}
// For dry run, skip recording and just show the preview // For dry run, skip recording and just show the preview
if dry_run { if dry_run {
let signal = ShutdownSignal::new(); let signal = ShutdownSignal::new();
@@ -2231,6 +2283,31 @@ async fn handle_sync_cmd(
return Ok(()); return Ok(());
} }
// Surgical sync manages its own recorder, lock, and signal internally.
// Dispatch early to avoid creating a redundant outer recorder.
if options.is_surgical() {
let signal = ShutdownSignal::new();
let signal_for_handler = signal.clone();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
signal_for_handler.cancel();
let _ = tokio::signal::ctrl_c().await;
std::process::exit(130);
});
let start = std::time::Instant::now();
let result = run_sync(&config, options, None, &signal).await?;
let elapsed = start.elapsed();
if robot_mode {
print_sync_json(&result, elapsed.as_millis() as u64, Some(metrics));
} else {
print_sync(&result, elapsed, Some(metrics), args.timings);
}
return Ok(());
}
let db_path = get_db_path(config.storage.db_path.as_deref()); let db_path = get_db_path(config.storage.db_path.as_deref());
let recorder_conn = create_connection(&db_path)?; let recorder_conn = create_connection(&db_path)?;
let run_id = uuid::Uuid::new_v4().simple().to_string(); let run_id = uuid::Uuid::new_v4().simple().to_string();
@@ -2504,13 +2581,24 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box<dyn std::e
} }
}, },
"sync": { "sync": {
"description": "Full sync pipeline: ingest -> generate-docs -> embed", "description": "Full sync pipeline: ingest -> generate-docs -> embed. Supports surgical per-IID sync with --issue/--mr.",
"flags": ["--full", "--no-full", "--force", "--no-force", "--no-embed", "--no-docs", "--no-events", "--no-file-changes", "--no-status", "--dry-run", "--no-dry-run"], "flags": ["--full", "--no-full", "--force", "--no-force", "--no-embed", "--no-docs", "--no-events", "--no-file-changes", "--no-status", "--dry-run", "--no-dry-run", "--issue <IID>", "--mr <IID>", "-p/--project <path>", "--preflight-only"],
"example": "lore --robot sync", "example": "lore --robot sync",
"notes": {
"surgical_sync": "Pass --issue <IID> and/or --mr <IID> (repeatable) with -p <project> to sync specific entities instead of a full pipeline. Incompatible with --full.",
"preflight_only": "--preflight-only validates that entities exist on GitLab without writing to the DB. Requires --issue or --mr."
},
"response_schema": { "response_schema": {
"bulk": {
"ok": "bool", "ok": "bool",
"data": {"issues_updated": "int", "mrs_updated": "int", "documents_regenerated": "int", "documents_embedded": "int", "resource_events_synced": "int", "resource_events_failed": "int"}, "data": {"issues_updated": "int", "mrs_updated": "int", "documents_regenerated": "int", "documents_embedded": "int", "resource_events_synced": "int", "resource_events_failed": "int"},
"meta": {"elapsed_ms": "int", "stages?": "[{name:string, elapsed_ms:int, items_processed:int}]"} "meta": {"elapsed_ms": "int", "stages?": "[{name:string, elapsed_ms:int, items_processed:int}]"}
},
"surgical": {
"ok": "bool",
"data": {"surgical_mode": "true", "surgical_iids": "{issues:[int], merge_requests:[int]}", "issues_updated": "int", "mrs_updated": "int", "entity_results": "[{entity_type:string, iid:int, outcome:string, error:string?, toctou_reason:string?}]", "preflight_only": "bool?", "documents_regenerated": "int", "documents_embedded": "int"},
"meta": {"elapsed_ms": "int"}
}
} }
}, },
"issues": { "issues": {
@@ -2967,6 +3055,63 @@ async fn handle_drift(
Ok(()) Ok(())
} }
async fn handle_related(
config_override: Option<&str>,
args: RelatedArgs,
robot_mode: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let start = std::time::Instant::now();
let config = Config::load(config_override)?;
// Determine mode: if first arg is a known entity type AND iid is provided, use entity mode.
// Otherwise treat the first arg as free-text query.
let is_entity_type = matches!(
args.query_or_type.as_str(),
"issues" | "issue" | "mrs" | "mr" | "merge-requests"
);
let effective_project = config
.effective_project(args.project.as_deref())
.map(String::from);
let response = if is_entity_type && args.iid.is_some() {
run_related(
&config,
Some(args.query_or_type.as_str()),
args.iid,
None,
effective_project.as_deref(),
args.limit,
)
.await?
} else if is_entity_type && args.iid.is_none() {
return Err(format!(
"Entity type '{}' requires an IID. Usage: lore related {} <IID>",
args.query_or_type, args.query_or_type
)
.into());
} else {
run_related(
&config,
None,
None,
Some(args.query_or_type.as_str()),
effective_project.as_deref(),
args.limit,
)
.await?
};
let elapsed_ms = start.elapsed().as_millis() as u64;
if robot_mode {
print_related_json(&response, elapsed_ms);
} else {
print_related(&response);
}
Ok(())
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn handle_list_compat( async fn handle_list_compat(
config_override: Option<&str>, config_override: Option<&str>,