feat(surgical-sync): add per-IID surgical sync pipeline with preflight validation

Add the ability to sync specific issues or merge requests by IID without
running a full incremental sync. This enables fast, targeted data refresh
for individual entities — useful for agent workflows, debugging, and
real-time investigation of specific issues or MRs.

Architecture:
- New CLI flags: --issue <IID> and --mr <IID> (repeatable, up to 100 total)
  scoped to a single project via -p/--project
- Preflight phase validates all IIDs exist on GitLab before any DB writes,
  with TOCTOU-aware soft verification at ingest time
- 6-stage pipeline: preflight -> fetch -> ingest -> dependents -> docs -> embed
- Each stage is cancellation-aware via ShutdownSignal
- Dedicated SyncRunRecorder extensions track surgical-specific counters
  (issues_fetched, mrs_ingested, docs_regenerated, etc.)

New modules:
- src/ingestion/surgical.rs: Core surgical fetch/ingest/dependent logic
  with preflight_fetch(), ingest_issue_by_iid(), ingest_mr_by_iid(),
  and fetch_dependents_for_{issue,mr}()
- src/cli/commands/sync_surgical.rs: Full CLI orchestrator with progress
  spinners, human/robot output, and cancellation handling
- src/embedding/pipeline.rs: embed_documents_by_ids() for scoped embedding
- src/documents/regenerator.rs: regenerate_dirty_documents_for_sources()
  for scoped document regeneration

Database changes:
- Migration 027: Extends sync_runs with mode, phase, surgical_iids_json,
  per-entity counters, and cancelled_at column
- New indexes: idx_sync_runs_mode_started, idx_sync_runs_status_phase_started

GitLab client:
- get_issue_by_iid() and get_mr_by_iid() single-entity fetch methods

Error handling:
- New SurgicalPreflightFailed error variant with entity_type, iid, project,
  and reason fields. Shares exit code 6 with GitLabNotFound.

Includes comprehensive test coverage:
- 645 lines of surgical ingestion tests (wiremock-based)
- 184 lines of scoped embedding tests
- 85 lines of scoped regeneration tests
- 113 lines of GitLab client single-entity tests
- 236 lines of sync_run surgical column/counter tests
- Unit tests for SyncOptions, error codes, and CLI validation
This commit is contained in:
teernisse
2026-02-18 16:27:59 -05:00
parent ea6e45e43f
commit 9ec1344945
25 changed files with 3354 additions and 37 deletions

View File

@@ -7,5 +7,5 @@ pub mod similarity;
pub use change_detector::{PendingDocument, count_pending_documents, find_pending_documents};
pub use chunking::{CHUNK_MAX_BYTES, CHUNK_OVERLAP_CHARS, split_into_chunks};
pub use pipeline::{EmbedResult, embed_documents};
pub use pipeline::{EmbedForIdsResult, EmbedResult, embed_documents, embed_documents_by_ids};
pub use similarity::cosine_similarity;

View File

@@ -578,3 +578,207 @@ fn sha256_hash(input: &str) -> String {
hasher.update(input.as_bytes());
format!("{:x}", hasher.finalize())
}
/// Outcome counters for a scoped embedding run via [`embed_documents_by_ids`].
#[derive(Debug, Default)]
pub struct EmbedForIdsResult {
    /// Chunks confirmed present in `embedding_metadata` after the run.
    pub chunks_embedded: usize,
    /// Documents for which all expected chunks were embedded.
    pub docs_embedded: usize,
    /// Chunks that failed during the embed page (from page-level stats).
    pub failed: usize,
    /// Requested IDs that required no work. NOTE(review): this counts both
    /// IDs already embedded with matching config AND IDs absent from the
    /// `documents` table — callers cannot distinguish the two; confirm
    /// whether missing IDs should be reported separately.
    pub skipped: usize,
}
/// Embed only the documents whose IDs are listed in `document_ids`.
///
/// IDs that are already embedded with a matching configuration (model,
/// dims, chunk size, content hash) are counted as skipped and never sent
/// to Ollama. All database writes happen inside a SAVEPOINT that is rolled
/// back on error or on cancellation, so a partial page never persists.
pub async fn embed_documents_by_ids(
    conn: &Connection,
    client: &OllamaClient,
    model_name: &str,
    concurrency: usize,
    document_ids: &[i64],
    signal: &ShutdownSignal,
) -> Result<EmbedForIdsResult> {
    let mut outcome = EmbedForIdsResult::default();

    // Nothing requested, or already shutting down: do no work at all.
    if document_ids.is_empty() || signal.is_cancelled() {
        return Ok(outcome);
    }

    // Resolve the requested IDs to documents that still need embedding.
    let pending = find_documents_by_ids(conn, document_ids, model_name)?;
    if pending.is_empty() {
        // Every requested ID needed no work (already embedded, or not in DB).
        outcome.skipped = document_ids.len();
        return Ok(outcome);
    }
    outcome.skipped = document_ids.len() - pending.len();

    info!(
        requested = document_ids.len(),
        pending = pending.len(),
        skipped = outcome.skipped,
        "Scoped embedding: processing documents by ID"
    );

    // Mirror the main pipeline: a single SAVEPOINT around one embed_page call.
    let total = pending.len();
    let mut stats = EmbedResult::default();
    let mut last_id: i64 = 0;
    let mut processed: usize = 0;

    conn.execute_batch("SAVEPOINT embed_by_ids")?;
    let page_outcome = embed_page(
        conn,
        client,
        model_name,
        concurrency,
        &pending,
        &mut stats,
        &mut last_id,
        &mut processed,
        total,
        &None,
        signal,
    )
    .await;

    match page_outcome {
        Err(e) => {
            // Best-effort rollback; propagate the original error.
            let _ = conn.execute_batch("ROLLBACK TO embed_by_ids; RELEASE embed_by_ids");
            return Err(e);
        }
        Ok(()) => {
            if signal.is_cancelled() {
                // Discard the partial page produced by an interrupted run.
                let _ = conn.execute_batch("ROLLBACK TO embed_by_ids; RELEASE embed_by_ids");
                info!("Rolled back scoped embed page due to cancellation");
            } else {
                conn.execute_batch("RELEASE embed_by_ids")?;
                // Report what actually landed in the DB instead of trusting
                // in-memory counters.
                let (chunks, docs) = count_embedded_results(conn, &pending)?;
                outcome.chunks_embedded = chunks;
                outcome.docs_embedded = docs;
                outcome.failed = stats.failed;
            }
        }
    }

    info!(
        chunks_embedded = outcome.chunks_embedded,
        docs_embedded = outcome.docs_embedded,
        failed = outcome.failed,
        skipped = outcome.skipped,
        "Scoped embedding complete"
    );
    Ok(outcome)
}
/// Load the documents with the given IDs that still need embedding,
/// excluding any whose chunk-0 metadata already matches the current
/// config (same filter as `find_pending_documents`, but scoped to IDs).
fn find_documents_by_ids(
    conn: &Connection,
    document_ids: &[i64],
    model_name: &str,
) -> Result<Vec<crate::embedding::change_detector::PendingDocument>> {
    use crate::embedding::chunking::{CHUNK_MAX_BYTES, EXPECTED_DIMS};

    if document_ids.is_empty() {
        return Ok(Vec::new());
    }

    let n = document_ids.len();
    // One numbered placeholder per requested ID: "?1, ?2, ...".
    let in_clause = (1..=n)
        .map(|i| format!("?{i}"))
        .collect::<Vec<_>>()
        .join(", ");
    // Config placeholders come after all the ID placeholders.
    let sql = format!(
        r#"
        SELECT d.id, d.content_text, d.content_hash
        FROM documents d
        LEFT JOIN embedding_metadata em
            ON em.document_id = d.id AND em.chunk_index = 0
        WHERE d.id IN ({in_clause})
        AND (
            em.document_id IS NULL
            OR em.document_hash != d.content_hash
            OR em.chunk_max_bytes IS NULL
            OR em.chunk_max_bytes != ?{chunk_bytes_idx}
            OR em.model != ?{model_idx}
            OR em.dims != ?{dims_idx}
        )
        ORDER BY d.id
        "#,
        chunk_bytes_idx = n + 1,
        model_idx = n + 2,
        dims_idx = n + 3,
    );

    // Bind params in placeholder order: the IDs, then chunk size, model, dims.
    let mut bound: Vec<Box<dyn rusqlite::types::ToSql>> = document_ids
        .iter()
        .map(|id| Box::new(*id) as Box<dyn rusqlite::types::ToSql>)
        .collect();
    bound.push(Box::new(CHUNK_MAX_BYTES as i64));
    bound.push(Box::new(model_name.to_string()));
    bound.push(Box::new(EXPECTED_DIMS as i64));
    let refs: Vec<&dyn rusqlite::types::ToSql> = bound.iter().map(|b| b.as_ref()).collect();

    let mut stmt = conn.prepare(&sql)?;
    let docs = stmt
        .query_map(refs.as_slice(), |row| {
            Ok(crate::embedding::change_detector::PendingDocument {
                document_id: row.get(0)?,
                content_text: row.get(1)?,
                content_hash: row.get(2)?,
            })
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    Ok(docs)
}
/// Tally, from the database, how many chunks were embedded for the given
/// pending docs and how many of those docs are complete (all expected
/// chunks present, per the chunk_count stored on chunk_index 0).
fn count_embedded_results(
    conn: &Connection,
    pending: &[crate::embedding::change_detector::PendingDocument],
) -> Result<(usize, usize)> {
    let mut chunks_total: usize = 0;
    let mut docs_total: usize = 0;

    for doc in pending {
        // Chunks that embedded cleanly for this document.
        let embedded: i64 = conn.query_row(
            "SELECT COUNT(*) FROM embedding_metadata WHERE document_id = ?1 AND last_error IS NULL",
            [doc.document_id],
            |row| row.get(0),
        )?;
        if embedded == 0 {
            continue;
        }
        chunks_total += embedded as usize;

        // The expected chunk total lives on the chunk_index=0 metadata row;
        // a doc counts as complete only when every expected chunk is present.
        let expected: Option<i64> = conn.query_row(
            "SELECT chunk_count FROM embedding_metadata WHERE document_id = ?1 AND chunk_index = 0",
            [doc.document_id],
            |row| row.get(0),
        )?;
        if expected.is_some_and(|want| embedded >= want) {
            docs_total += 1;
        }
    }

    Ok((chunks_total, docs_total))
}
#[cfg(test)]
#[path = "pipeline_tests.rs"]
mod tests;

View File

@@ -0,0 +1,184 @@
use std::path::Path;
use rusqlite::Connection;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
use crate::core::db::{create_connection, run_migrations};
use crate::core::shutdown::ShutdownSignal;
use crate::embedding::chunking::EXPECTED_DIMS;
use crate::embedding::ollama::{OllamaClient, OllamaConfig};
use crate::embedding::pipeline::embed_documents_by_ids;
const MODEL: &str = "nomic-embed-text";

/// Fresh in-memory SQLite database with all migrations applied.
fn setup_db() -> Connection {
    let db = create_connection(Path::new(":memory:")).unwrap();
    run_migrations(&db).unwrap();
    db
}
/// Insert a fixed test project row and return its rowid.
fn insert_test_project(conn: &Connection) -> i64 {
    let sql = "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
         VALUES (1, 'group/test', 'https://gitlab.example.com/group/test')";
    conn.execute(sql, []).unwrap();
    conn.last_insert_rowid()
}
/// Insert an issue-typed document row and return its rowid.
fn insert_test_document(
    conn: &Connection,
    project_id: i64,
    source_id: i64,
    content: &str,
    hash: &str,
) -> i64 {
    let sql = "INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash)
         VALUES ('issue', ?1, ?2, ?3, ?4)";
    let args = rusqlite::params![source_id, project_id, content, hash];
    conn.execute(sql, args).unwrap();
    conn.last_insert_rowid()
}
/// Deterministic dummy embedding of the expected dimensionality.
fn make_fake_embedding() -> Vec<f32> {
    std::iter::repeat(0.1_f32).take(EXPECTED_DIMS).collect()
}
/// JSON body matching Ollama's /api/embed response, with `count` embeddings.
fn make_ollama_response(count: usize) -> serde_json::Value {
    let embeddings = vec![make_fake_embedding(); count];
    serde_json::json!({
        "model": MODEL,
        "embeddings": embeddings
    })
}
/// Number of embedding_metadata rows recorded for one document.
fn count_embeddings_for_doc(conn: &Connection, doc_id: i64) -> i64 {
    let sql = "SELECT COUNT(*) FROM embedding_metadata WHERE document_id = ?1";
    conn.query_row(sql, [doc_id], |row| row.get(0)).unwrap()
}
/// Ollama client pointed at `base_url` with the test model and a short timeout.
fn make_client(base_url: &str) -> OllamaClient {
    let config = OllamaConfig {
        base_url: base_url.to_owned(),
        model: MODEL.to_string(),
        timeout_secs: 10,
    };
    OllamaClient::new(config)
}
/// Only the requested document is embedded; an unrequested sibling stays untouched.
#[tokio::test]
async fn test_embed_by_ids_only_embeds_specified_docs() {
    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/api/embed"))
        .respond_with(ResponseTemplate::new(200).set_body_json(make_ollama_response(1)))
        .mount(&server)
        .await;

    let conn = setup_db();
    let project = insert_test_project(&conn);
    let target = insert_test_document(&conn, project, 1, "Hello world content for doc 1", "hash_a");
    let untouched = insert_test_document(&conn, project, 2, "Hello world content for doc 2", "hash_b");

    let client = make_client(&server.uri());
    let signal = ShutdownSignal::new();

    // Only embed doc1
    let outcome = embed_documents_by_ids(&conn, &client, MODEL, 1, &[target], &signal)
        .await
        .unwrap();

    assert_eq!(outcome.docs_embedded, 1, "Should embed exactly 1 doc");
    assert!(outcome.chunks_embedded > 0, "Should have embedded chunks");
    // doc1 should have embeddings
    assert!(
        count_embeddings_for_doc(&conn, target) > 0,
        "doc1 should have embeddings"
    );
    // doc2 should have NO embeddings
    assert_eq!(
        count_embeddings_for_doc(&conn, untouched),
        0,
        "doc2 should have no embeddings"
    );
}
/// A second scoped run over an already-embedded document must not hit
/// Ollama again; the mock's `.expect(1)` enforces exactly one HTTP call.
#[tokio::test]
async fn test_embed_by_ids_skips_already_embedded() {
    let server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path("/api/embed"))
        .respond_with(ResponseTemplate::new(200).set_body_json(make_ollama_response(1)))
        .expect(1) // Should only be called once
        .mount(&server)
        .await;

    let conn = setup_db();
    let project = insert_test_project(&conn);
    let doc = insert_test_document(&conn, project, 1, "Hello world content for doc 1", "hash_a");

    let client = make_client(&server.uri());
    let signal = ShutdownSignal::new();

    // First pass actually embeds.
    let first = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc], &signal)
        .await
        .unwrap();
    assert_eq!(first.docs_embedded, 1);

    // Second pass finds nothing pending and skips.
    let second = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc], &signal)
        .await
        .unwrap();
    assert_eq!(second.docs_embedded, 0, "Should embed 0 on second call");
    assert_eq!(second.skipped, 1, "Should report 1 skipped");
    assert_eq!(second.chunks_embedded, 0, "No new chunks");
}
/// An empty ID list is a no-op: all counters zero, no HTTP traffic.
#[tokio::test]
async fn test_embed_by_ids_empty_input() {
    let conn = setup_db();
    let signal = ShutdownSignal::new();
    // Client URL doesn't matter — should never be called
    let client = make_client("http://localhost:99999");

    let outcome = embed_documents_by_ids(&conn, &client, MODEL, 1, &[], &signal)
        .await
        .unwrap();

    assert_eq!(outcome.docs_embedded, 0);
    assert_eq!(outcome.chunks_embedded, 0);
    assert_eq!(outcome.failed, 0);
    assert_eq!(outcome.skipped, 0);
}
/// A pre-cancelled signal short-circuits before any work or HTTP call.
#[tokio::test]
async fn test_embed_by_ids_respects_cancellation() {
    let conn = setup_db();
    let project = insert_test_project(&conn);
    let doc = insert_test_document(&conn, project, 1, "Hello world content for doc 1", "hash_a");

    let signal = ShutdownSignal::new();
    signal.cancel(); // Pre-cancel

    let client = make_client("http://localhost:99999");
    let outcome = embed_documents_by_ids(&conn, &client, MODEL, 1, &[doc], &signal)
        .await
        .unwrap();

    assert_eq!(outcome.docs_embedded, 0, "Should embed 0 when cancelled");
    assert_eq!(outcome.chunks_embedded, 0, "No chunks when cancelled");
}