Files
gitlore/src/ingestion/surgical.rs
teernisse 9ec1344945 feat(surgical-sync): add per-IID surgical sync pipeline with preflight validation
Add the ability to sync specific issues or merge requests by IID without
running a full incremental sync. This enables fast, targeted data refresh
for individual entities — useful for agent workflows, debugging, and
real-time investigation of specific issues or MRs.

Architecture:
- New CLI flags: --issue <IID> and --mr <IID> (repeatable, up to 100 total)
  scoped to a single project via -p/--project
- Preflight phase validates all IIDs exist on GitLab before any DB writes,
  with TOCTOU-aware soft verification at ingest time
- 6-stage pipeline: preflight -> fetch -> ingest -> dependents -> docs -> embed
- Each stage is cancellation-aware via ShutdownSignal
- Dedicated SyncRunRecorder extensions track surgical-specific counters
  (issues_fetched, mrs_ingested, docs_regenerated, etc.)

New modules:
- src/ingestion/surgical.rs: Core surgical fetch/ingest/dependent logic
  with preflight_fetch(), ingest_issue_by_iid(), ingest_mr_by_iid(),
  and fetch_dependents_for_{issue,mr}()
- src/cli/commands/sync_surgical.rs: Full CLI orchestrator with progress
  spinners, human/robot output, and cancellation handling
- src/embedding/pipeline.rs: embed_documents_by_ids() for scoped embedding
- src/documents/regenerator.rs: regenerate_dirty_documents_for_sources()
  for scoped document regeneration

Database changes:
- Migration 027: Extends sync_runs with mode, phase, surgical_iids_json,
  per-entity counters, and cancelled_at column
- New indexes: idx_sync_runs_mode_started, idx_sync_runs_status_phase_started

GitLab client:
- get_issue_by_iid() and get_mr_by_iid() single-entity fetch methods

Error handling:
- New SurgicalPreflightFailed error variant with entity_type, iid, project,
  and reason fields. Shares exit code 6 with GitLabNotFound.

Includes comprehensive test coverage:
- 645 lines of surgical ingestion tests (wiremock-based)
- 184 lines of scoped embedding tests
- 85 lines of scoped regeneration tests
- 113 lines of GitLab client single-entity tests
- 236 lines of sync_run surgical column/counter tests
- Unit tests for SyncOptions, error codes, and CLI validation
2026-02-18 16:28:21 -05:00

470 lines
14 KiB
Rust

use futures::stream::StreamExt;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use tracing::{debug, warn};
use crate::Config;
use crate::core::error::{LoreError, Result};
use crate::documents::SourceType;
use crate::gitlab::GitLabClient;
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
use crate::ingestion::dirty_tracker;
use crate::ingestion::discussions::ingest_issue_discussions;
use crate::ingestion::issues::{IssueForDiscussionSync, process_single_issue};
use crate::ingestion::merge_requests::{MrForDiscussionSync, process_single_mr};
use crate::ingestion::mr_diffs::upsert_mr_file_changes;
use crate::ingestion::mr_discussions::ingest_mr_discussions;
use crate::ingestion::orchestrator::{store_closes_issues_refs, store_resource_events};
// ---------------------------------------------------------------------------
// Result types
// ---------------------------------------------------------------------------
#[derive(Debug)]
pub(crate) struct IngestIssueResult {
pub skipped_stale: bool,
pub dirty_source_keys: Vec<(SourceType, i64)>,
}
#[derive(Debug)]
pub(crate) struct IngestMrResult {
pub skipped_stale: bool,
pub dirty_source_keys: Vec<(SourceType, i64)>,
}
#[derive(Debug)]
pub(crate) struct PreflightResult {
pub issues: Vec<GitLabIssue>,
pub merge_requests: Vec<GitLabMergeRequest>,
pub failures: Vec<PreflightFailure>,
}
#[derive(Debug)]
pub(crate) struct PreflightFailure {
pub entity_type: String,
pub iid: i64,
pub error: LoreError,
}
// ---------------------------------------------------------------------------
// TOCTOU guard
// ---------------------------------------------------------------------------
/// Returns `true` if the payload is stale (same age or older than what the DB
/// already has). Returns `false` when the entity is new (no DB row) or when
/// the payload is strictly newer.
pub(crate) fn is_stale(payload_updated_at: &str, db_updated_at_ms: Option<i64>) -> Result<bool> {
let Some(db_ms) = db_updated_at_ms else {
return Ok(false);
};
let payload_ms = chrono::DateTime::parse_from_rfc3339(payload_updated_at)
.map(|dt| dt.timestamp_millis())
.map_err(|e| {
LoreError::Other(format!(
"Failed to parse timestamp '{}': {}",
payload_updated_at, e
))
})?;
Ok(payload_ms <= db_ms)
}
// ---------------------------------------------------------------------------
// Ingestion wrappers
// ---------------------------------------------------------------------------
/// Ingest a single issue by IID with TOCTOU guard and dirty marking.
pub(crate) fn ingest_issue_by_iid(
conn: &Connection,
config: &Config,
project_id: i64,
issue: &GitLabIssue,
) -> Result<IngestIssueResult> {
let db_updated_at = get_db_updated_at(conn, "issues", issue.iid, project_id)?;
if is_stale(&issue.updated_at, db_updated_at)? {
debug!(iid = issue.iid, "Skipping stale issue (TOCTOU guard)");
return Ok(IngestIssueResult {
skipped_stale: true,
dirty_source_keys: vec![],
});
}
process_single_issue(conn, config, project_id, issue)?;
let local_id: i64 = conn.query_row(
"SELECT id FROM issues WHERE project_id = ? AND iid = ?",
(project_id, issue.iid),
|row| row.get(0),
)?;
dirty_tracker::mark_dirty(conn, SourceType::Issue, local_id)?;
Ok(IngestIssueResult {
skipped_stale: false,
dirty_source_keys: vec![(SourceType::Issue, local_id)],
})
}
/// Ingest a single merge request by IID with TOCTOU guard and dirty marking.
pub(crate) fn ingest_mr_by_iid(
conn: &Connection,
config: &Config,
project_id: i64,
mr: &GitLabMergeRequest,
) -> Result<IngestMrResult> {
let db_updated_at = get_db_updated_at(conn, "merge_requests", mr.iid, project_id)?;
if is_stale(&mr.updated_at, db_updated_at)? {
debug!(iid = mr.iid, "Skipping stale MR (TOCTOU guard)");
return Ok(IngestMrResult {
skipped_stale: true,
dirty_source_keys: vec![],
});
}
process_single_mr(conn, config, project_id, mr)?;
let local_id: i64 = conn.query_row(
"SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?",
(project_id, mr.iid),
|row| row.get(0),
)?;
dirty_tracker::mark_dirty(conn, SourceType::MergeRequest, local_id)?;
Ok(IngestMrResult {
skipped_stale: false,
dirty_source_keys: vec![(SourceType::MergeRequest, local_id)],
})
}
// ---------------------------------------------------------------------------
// Preflight fetch
// ---------------------------------------------------------------------------
/// Fetch specific issues and MRs by IID from GitLab. Collects successes and
/// failures without aborting on individual 404s.
///
/// Requests are dispatched concurrently (up to 10 in-flight at once) to avoid
/// sequential round-trip latency when syncing many IIDs.
pub(crate) async fn preflight_fetch(
client: &GitLabClient,
gitlab_project_id: i64,
targets: &[(String, i64)],
) -> PreflightResult {
/// Max concurrent HTTP requests during preflight.
const PREFLIGHT_CONCURRENCY: usize = 10;
#[allow(clippy::large_enum_variant)]
enum FetchOutcome {
Issue(std::result::Result<GitLabIssue, (String, i64, LoreError)>),
MergeRequest(std::result::Result<GitLabMergeRequest, (String, i64, LoreError)>),
UnknownType(String, i64),
}
let mut result = PreflightResult {
issues: Vec::new(),
merge_requests: Vec::new(),
failures: Vec::new(),
};
let mut stream = futures::stream::iter(targets.iter().map(|(entity_type, iid)| {
let entity_type = entity_type.clone();
let iid = *iid;
async move {
match entity_type.as_str() {
"issue" => FetchOutcome::Issue(
client
.get_issue_by_iid(gitlab_project_id, iid)
.await
.map_err(|e| (entity_type, iid, e)),
),
"merge_request" => FetchOutcome::MergeRequest(
client
.get_mr_by_iid(gitlab_project_id, iid)
.await
.map_err(|e| (entity_type, iid, e)),
),
_ => FetchOutcome::UnknownType(entity_type, iid),
}
}
}))
.buffer_unordered(PREFLIGHT_CONCURRENCY);
while let Some(outcome) = stream.next().await {
match outcome {
FetchOutcome::Issue(Ok(issue)) => result.issues.push(issue),
FetchOutcome::Issue(Err((et, iid, e))) => {
result.failures.push(PreflightFailure {
entity_type: et,
iid,
error: e,
});
}
FetchOutcome::MergeRequest(Ok(mr)) => result.merge_requests.push(mr),
FetchOutcome::MergeRequest(Err((et, iid, e))) => {
result.failures.push(PreflightFailure {
entity_type: et,
iid,
error: e,
});
}
FetchOutcome::UnknownType(et, iid) => {
result.failures.push(PreflightFailure {
entity_type: et.clone(),
iid,
error: LoreError::Other(format!("Unknown entity type: {et}")),
});
}
}
}
result
}
// ---------------------------------------------------------------------------
// Dependent fetch helpers (surgical mode)
// ---------------------------------------------------------------------------
/// Counts returned from fetching dependents for a single entity.
#[derive(Debug, Default)]
pub(crate) struct DependentFetchResult {
pub resource_events_fetched: usize,
pub discussions_fetched: usize,
pub closes_issues_stored: usize,
pub file_changes_stored: usize,
}
/// Fetch and store all dependents for a single issue:
/// resource events (state, label, milestone) and discussions.
pub(crate) async fn fetch_dependents_for_issue(
client: &GitLabClient,
conn: &Connection,
project_id: i64,
gitlab_project_id: i64,
iid: i64,
local_id: i64,
config: &Config,
) -> Result<DependentFetchResult> {
let mut result = DependentFetchResult::default();
// --- Resource events ---
match client
.fetch_all_resource_events(gitlab_project_id, "issue", iid)
.await
{
Ok((state_events, label_events, milestone_events)) => {
let count = state_events.len() + label_events.len() + milestone_events.len();
let tx = conn.unchecked_transaction()?;
store_resource_events(
&tx,
project_id,
"issue",
local_id,
&state_events,
&label_events,
&milestone_events,
)?;
tx.execute(
"UPDATE issues SET resource_events_synced_for_updated_at = updated_at WHERE id = ?",
[local_id],
)?;
tx.commit()?;
result.resource_events_fetched = count;
}
Err(e) => {
warn!(
iid,
error = %e,
"Failed to fetch resource events for issue, continuing"
);
}
}
// --- Discussions ---
let sync_item = IssueForDiscussionSync {
local_issue_id: local_id,
iid,
updated_at: 0, // not used for filtering in surgical mode
};
match ingest_issue_discussions(
conn,
client,
config,
gitlab_project_id,
project_id,
&[sync_item],
)
.await
{
Ok(disc_result) => {
result.discussions_fetched = disc_result.discussions_fetched;
}
Err(e) => {
warn!(
iid,
error = %e,
"Failed to ingest discussions for issue, continuing"
);
}
}
Ok(result)
}
/// Fetch and store all dependents for a single merge request:
/// resource events, discussions, closes-issues references, and file changes (diffs).
pub(crate) async fn fetch_dependents_for_mr(
client: &GitLabClient,
conn: &Connection,
project_id: i64,
gitlab_project_id: i64,
iid: i64,
local_id: i64,
config: &Config,
) -> Result<DependentFetchResult> {
let mut result = DependentFetchResult::default();
// --- Resource events ---
match client
.fetch_all_resource_events(gitlab_project_id, "merge_request", iid)
.await
{
Ok((state_events, label_events, milestone_events)) => {
let count = state_events.len() + label_events.len() + milestone_events.len();
let tx = conn.unchecked_transaction()?;
store_resource_events(
&tx,
project_id,
"merge_request",
local_id,
&state_events,
&label_events,
&milestone_events,
)?;
tx.execute(
"UPDATE merge_requests SET resource_events_synced_for_updated_at = updated_at WHERE id = ?",
[local_id],
)?;
tx.commit()?;
result.resource_events_fetched = count;
}
Err(e) => {
warn!(
iid,
error = %e,
"Failed to fetch resource events for MR, continuing"
);
}
}
// --- Discussions ---
let sync_item = MrForDiscussionSync {
local_mr_id: local_id,
iid,
updated_at: 0,
};
match ingest_mr_discussions(
conn,
client,
config,
gitlab_project_id,
project_id,
&[sync_item],
)
.await
{
Ok(disc_result) => {
result.discussions_fetched = disc_result.discussions_fetched;
}
Err(e) => {
warn!(
iid,
error = %e,
"Failed to ingest discussions for MR, continuing"
);
}
}
// --- Closes issues ---
match client.fetch_mr_closes_issues(gitlab_project_id, iid).await {
Ok(closes_issues) => {
let count = closes_issues.len();
let tx = conn.unchecked_transaction()?;
store_closes_issues_refs(&tx, project_id, local_id, &closes_issues)?;
tx.execute(
"UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?",
[local_id],
)?;
tx.commit()?;
result.closes_issues_stored = count;
}
Err(e) => {
warn!(
iid,
error = %e,
"Failed to fetch closes_issues for MR, continuing"
);
}
}
// --- File changes (diffs) ---
match client.fetch_mr_diffs(gitlab_project_id, iid).await {
Ok(diffs) => {
let tx = conn.unchecked_transaction()?;
let stored = upsert_mr_file_changes(&tx, local_id, project_id, &diffs)?;
tx.execute(
"UPDATE merge_requests SET diffs_synced_for_updated_at = updated_at WHERE id = ?",
[local_id],
)?;
tx.commit()?;
result.file_changes_stored = stored;
}
Err(e) => {
warn!(
iid,
error = %e,
"Failed to fetch diffs for MR, continuing"
);
}
}
Ok(result)
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn get_db_updated_at(
conn: &Connection,
table: &str,
iid: i64,
project_id: i64,
) -> Result<Option<i64>> {
// Using a match on known table names avoids SQL injection from the table parameter.
let sql = match table {
"issues" => "SELECT updated_at FROM issues WHERE project_id = ?1 AND iid = ?2",
"merge_requests" => {
"SELECT updated_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2"
}
_ => {
return Err(LoreError::Other(format!(
"Unknown table for updated_at lookup: {table}"
)));
}
};
let result: Option<i64> = conn
.query_row(sql, (project_id, iid), |row| row.get(0))
.optional()?;
Ok(result)
}
#[cfg(test)]
#[path = "surgical_tests.rs"]
mod tests;