| plan | title | status | iteration | target_iterations | beads_revision | related_plans | created | updated |
|---|---|---|---|---|---|---|---|---|
| true |  | iterating | 6 | 8 | 0 |  | 2026-02-16 | 2026-02-17 |
Surgical Per-IID Sync
Context
Agents working on active issues/MRs need to refresh data for specific entities without a full sync. Currently lore sync always paginates through ALL issues/MRs from the cursor forward. This adds --issue and --mr flags to sync that accept lists of IIDs, fetching only those entities from GitLab and running the full pipeline (ingest → discussions → events → generate-docs → embed) scoped to just those items. Status enrichment is skipped in surgical mode.
Design Constraints
1. Sync locking: The surgical path acquires `AppLock("sync")` but ONLY for mutation phases. The preflight (network-only) phase runs WITHOUT the lock to minimize contention with concurrent normal syncs.
2. Dirty queue scoping: `dirty_sources` is keyed by `(source_type, source_id)` with UPSERT semantics (no autoincrement `id` column). Surgical scoping MUST use explicit touched source keys collected during ingest — each `ingest_*_by_iid_from_payload` returns the `(source_type, source_id)` pairs it touched. Surgical docs MUST call a scoped API (`run_generate_docs_for_sources`) filtering by these exact keys, and MUST NOT drain the global dirty queue.
3. Embed scoping: Embedding MUST be explicitly scoped to documents regenerated by this surgical run. `run_generate_docs_for_sources` returns regenerated `document_ids` via `GenerateDocsResult.regenerated_document_ids: Vec<i64>`; surgical mode calls `run_embed_for_document_ids(document_ids)` and never global `run_embed`. This remains correct even after lock release and under concurrent normal sync activity. Guard condition: the embed stage MUST NOT run when `--no-docs` is used in surgical mode, as it would embed unrelated backlog docs. Validation: `--no-docs` without `--no-embed` is rejected in surgical mode.
4. Surgical dependent execution: Surgical mode MUST bypass `pending_dependent_fetches`. Dependents (resource_events, mr_closes_issues, mr_diffs) run inline for targeted entities only, via direct per-entity helper functions. The global `pending_dependent_fetches` queue remains exclusively for normal sync. This eliminates queue-scoping complexity, orphaned-job cleanup, and a schema migration to the shared queue table.
5. MR dependent stages: Normal MR ingest runs the closes_issues and diffs stages. The surgical path must also run these for MR entities.
6. Primary-entity atomicity: All requested issue/MR payload fetches complete before the first content write. If any primary IID fetch fails (404, network error), primary ingest does zero content writes. Preflight aggregates all failures (it does not fail fast on the first error) so agents get a complete error report in one pass. Dependent stages (discussions, resource events, MR closes_issues, MR diffs) are post-ingest and best-effort — individual dependent-stage failures are recorded per-entity but do not roll back the primary ingest.
7. Control-plane exception: `sync_runs` writes are allowed during preflight for observability and crash diagnostics. These are control-plane rows, not content data, and do not affect query results.
8. Dry-run is zero-write: The dry-run check MUST precede lock acquisition and DB connection. Dry-run produces zero side effects — no lock acquired, no DB connection opened, no network calls.
9. defaultProject fallback: When `-p` is omitted, fall back to `config.default_project` before erroring.
10. Durable run state: Surgical sync MUST use `SyncRunRecorder` end-to-end (no ad-hoc SQL updates to `sync_runs`). Extend `SyncRunRecorder` with APIs for surgical mode: `start_surgical(...)`, `set_phase(&str)`, `set_counters(SurgicalCounters)`, `finish_succeeded()`, `finish_succeeded_with_warnings(warnings_count)`, `finish_failed(error)`, `finish_cancelled()`, and periodic `heartbeat_if_due()`. Phase transitions (`preflight`, `ingest`, `dependents`, `docs`, `embed`, `done`, `failed`, `cancelled`) enable crash recovery and observability.
11. Lock window minimization: The preflight fetch runs WITHOUT the sync lock. The lock is acquired immediately before the first DB mutation (Stage 1) and held through the ingest, dependents, and docs stages. The lock is RELEASED before the embed stage. Embed is naturally idempotent (it processes only unembedded docs) and does not require the sync lock.
12. Preflight-only mode: `--preflight-only` performs zero content writes; control-plane run-ledger writes are allowed. Distinct from `--dry-run` (which is zero-network). Allows agents to verify IIDs are valid before committing to a full surgical run.
13. Stale-write protection (TOCTOU): Because preflight runs WITHOUT the sync lock, a concurrent normal sync may update the same entity between the preflight fetch and surgical ingest. Surgical ingest MUST NOT overwrite fresher local rows. Skip as stale when: (a) `local.updated_at > payload.updated_at`, OR (b) `local.updated_at == payload.updated_at AND local.last_seen_at > preflight_started_at_ms`. This prevents equal-timestamp regressions under concurrent sync — the `last_seen_at` column acts as a monotonic tie-breaker when `updated_at` is identical.
14. Surgical failure hygiene: Surgical mode leaves no queue artifacts because it does not enqueue jobs into `pending_dependent_fetches`. Dependent stages execute inline and report failures per-entity in `SurgicalIngestResult.entity_failures`. No orphaned-job cleanup is needed.
15. Cancellation semantics: If shutdown is observed after run start, the recorder finalizes with `finish_cancelled()`: phase is set to `cancelled`, status is `cancelled`, `finished_at` is written, and the lock is released before return. No silent `running` rows are left behind.
16. Per-entity timeout: Each dependent network fetch (discussions, resource events, MR dependents) is wrapped in `tokio::time::timeout` with a configurable per-entity budget. Timed-out entities are recorded in `entity_failures` with code `TIMEOUT` and the run continues best-effort. Config knob: `sync.surgical_entity_timeout_seconds` (default 30).
17. Payload integrity: Preflight validates that each returned payload's `project_id` matches the requested `gitlab_project_id`. On mismatch, the entity is recorded as `EntityFailure { code: "PROJECT_MISMATCH", stage: "fetch" }` and excluded from ingest. This catches API proxy misconfigurations that could silently corrupt data.
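The stale-write rule can be distilled into a pure predicate. The sketch below is illustrative (the function name and signature are not the plan's API); it encodes the two-tier check exactly as stated in the constraint:

```rust
/// Returns true when the local row is fresher than the preflight payload,
/// so the surgical ingest should skip it (TOCTOU protection).
/// Tier (a): local updated_at is strictly newer than the payload's.
/// Tier (b): timestamps tie, but a concurrent sync re-fetched the row
/// after our preflight started (last_seen_at is the tie-breaker).
fn payload_is_stale(
    local_updated_at: i64,
    local_last_seen_at: i64,
    payload_updated_at: i64,
    preflight_started_at_ms: i64,
) -> bool {
    local_updated_at > payload_updated_at
        || (local_updated_at == payload_updated_at
            && local_last_seen_at > preflight_started_at_ms)
}

fn main() {
    // (a) local strictly newer: skip
    assert!(payload_is_stale(2000, 2000, 1000, 500));
    // (b) equal updated_at, local seen after preflight start: skip
    assert!(payload_is_stale(1500, 3000, 1500, 2000));
    // equal updated_at, local seen before preflight start: ingest
    assert!(!payload_is_stale(1500, 1000, 1500, 2000));
    // payload strictly newer: ingest
    assert!(!payload_is_stale(1000, 3000, 1500, 2000));
}
```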
CLI Interface
lore sync --issue 123 --issue 456 -p myproject
lore sync --mr 789 --mr 101 -p myproject
lore sync --issue 123 --mr 789 -p myproject
lore --robot sync --issue 123 -p myproject
# -p is optional if config.defaultProject is set
lore sync --issue 123
# dry-run shows what would be fetched without writes or network calls
lore sync --dry-run --issue 123 -p myproject
# preflight-only validates entities exist on GitLab without any DB writes
lore sync --preflight-only --issue 123 -p myproject
Step 1: TDD — Write Failing Tests First
1a. Test helper: test_config() (src/ingestion/surgical_tests.rs)
Config has no Default impl (fields like gitlab.base_url are required). All surgical tests need a minimal config helper:
fn test_config() -> crate::Config {
serde_json::from_value(serde_json::json!({
"gitlab": {
"baseUrl": "https://gitlab.example.com",
"projects": ["group/project"]
},
"storage": {}
})).unwrap()
}
1b. Test: GitLabClient::get_issue_by_iid (src/gitlab/client.rs)
Add to existing #[cfg(test)] mod tests at line 766:
#[tokio::test]
async fn get_issue_by_iid_returns_issue() {
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path};
let mock_server = MockServer::start().await;
let issue_json = serde_json::json!({
"id": 42,
"iid": 7,
"project_id": 1,
"title": "Test issue",
"description": "desc",
"state": "opened",
"created_at": "2024-01-15T10:00:00.000Z",
"updated_at": "2024-01-16T10:00:00.000Z",
"web_url": "https://gitlab.example.com/issues/7",
"author": {"id": 1, "username": "testuser", "name": "Test User"},
"assignees": [],
"labels": [],
"milestone": null,
"due_date": null,
"references": {"short": "#7", "full": "group/project#7"}
});
Mock::given(method("GET"))
.and(path("/api/v4/projects/1/issues/7"))
.respond_with(ResponseTemplate::new(200).set_body_json(&issue_json))
.mount(&mock_server)
.await;
let client = GitLabClient::new(&mock_server.uri(), "test-token", None);
let issue = client.get_issue_by_iid(1, 7).await.unwrap();
assert_eq!(issue.iid, 7);
assert_eq!(issue.title, "Test issue");
}
#[tokio::test]
async fn get_issue_by_iid_returns_not_found() {
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path};
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/api/v4/projects/1/issues/999"))
.respond_with(ResponseTemplate::new(404).set_body_json(
serde_json::json!({"message": "404 Not found"})
))
.mount(&mock_server)
.await;
let client = GitLabClient::new(&mock_server.uri(), "test-token", None);
let result = client.get_issue_by_iid(1, 999).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), LoreError::GitLabNotFound { .. }));
}
Same pattern for get_mr_by_iid with GitLabMergeRequest JSON.
1c. Test: Surgical ingest functions (src/ingestion/surgical_tests.rs)
Create src/ingestion/surgical_tests.rs (referenced from surgical.rs with #[cfg(test)] #[path = "surgical_tests.rs"] mod tests;):
use std::path::Path;
use std::time::Duration; // needed by the preflight aggregation test in 1d
use crate::core::db::{create_connection, run_migrations};
use crate::ingestion::surgical::{
ingest_issue_by_iid_from_payload, ingest_mr_by_iid_from_payload,
SurgicalIngestResult,
};
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
fn test_config() -> crate::Config {
serde_json::from_value(serde_json::json!({
"gitlab": {
"baseUrl": "https://gitlab.example.com",
"projects": ["group/project"]
},
"storage": {}
})).unwrap()
}
fn setup_db() -> rusqlite::Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
VALUES (100, 'group/project', 'https://gitlab.example.com/group/project')",
[],
).unwrap();
conn
}
fn make_test_issue(iid: i64) -> GitLabIssue {
serde_json::from_value(serde_json::json!({
"id": 1000 + iid,
"iid": iid,
"project_id": 100,
"title": format!("Issue {iid}"),
"description": "test description",
"state": "opened",
"created_at": "2024-01-15T10:00:00.000Z",
"updated_at": "2024-01-16T10:00:00.000Z",
"web_url": format!("https://gitlab.example.com/issues/{iid}"),
"author": {"id": 1, "username": "testuser", "name": "Test User"},
"assignees": [],
"labels": [],
"milestone": null,
"due_date": null,
"references": {"short": format!("#{iid}"), "full": format!("group/project#{iid}")}
})).unwrap()
}
#[test]
fn ingest_issue_by_iid_inserts_and_marks_dirty() {
let conn = setup_db();
let project_id = 1i64; // auto-assigned by INSERT above
let config = test_config();
let issue = make_test_issue(7);
let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
assert_eq!(result.upserted, 1);
// Verify issue exists
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE project_id = ? AND iid = 7",
[project_id], |r| r.get(0),
).unwrap();
assert_eq!(count, 1);
// Verify dirty marker exists
let dirty: i64 = conn.query_row(
"SELECT COUNT(*) FROM dirty_sources WHERE source_type = 'issue'",
[], |r| r.get(0),
).unwrap();
assert!(dirty >= 1);
// Verify dirty_source_keys were collected
assert!(!result.dirty_source_keys.is_empty(),
"Should have collected dirty source keys for scoped doc regeneration");
}
#[test]
fn ingest_issue_by_iid_resets_discussion_watermark() {
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
let issue = make_test_issue(7);
// First insert
ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
// Simulate previous discussion sync by setting watermark
conn.execute(
"UPDATE issues SET discussions_synced_for_updated_at = updated_at
WHERE project_id = ? AND iid = 7",
[project_id],
).unwrap();
// Second surgical ingest should reset the watermark
ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
let watermark: Option<i64> = conn.query_row(
"SELECT discussions_synced_for_updated_at FROM issues WHERE project_id = ? AND iid = 7",
[project_id], |r| r.get(0),
).unwrap();
assert!(watermark.is_none(), "Surgical ingest should reset discussion watermark to NULL");
}
#[test]
fn ingest_issue_by_iid_resets_event_watermark() {
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
let issue = make_test_issue(7);
ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
// Set event watermark
conn.execute(
"UPDATE issues SET resource_events_synced_for_updated_at = updated_at
WHERE project_id = ? AND iid = 7",
[project_id],
).unwrap();
// Surgical re-ingest
ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
let watermark: Option<i64> = conn.query_row(
"SELECT resource_events_synced_for_updated_at FROM issues WHERE project_id = ? AND iid = 7",
[project_id], |r| r.get(0),
).unwrap();
assert!(watermark.is_none(), "Surgical ingest should reset event watermark to NULL");
}
#[test]
fn duplicate_iids_are_idempotent() {
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
let issue = make_test_issue(7);
// Ingest same issue twice
ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
assert_eq!(result.upserted, 1);
// Only one row
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE project_id = ? AND iid = 7",
[project_id], |r| r.get(0),
).unwrap();
assert_eq!(count, 1);
}
Same pattern for MR tests (with make_test_mr and ingest_mr_by_iid_from_payload).
1d. Scoping invariant tests (src/ingestion/surgical_tests.rs)
These tests enforce correctness of docs scoping — the most critical safety property of surgical sync:
#[test]
fn surgical_docs_scope_ignores_preexisting_dirty_rows() {
// Setup: insert a dirty_sources row for a DIFFERENT entity (simulating prior failed sync)
// Run surgical ingest for a new entity
// Call run_generate_docs_for_sources with only the surgical run's dirty keys
// Assert: pre-existing dirty row is UNTOUCHED (still in dirty_sources)
// Assert: only the surgical entity's docs were regenerated
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
// Pre-existing dirty row for issue iid=99 (from prior sync)
conn.execute(
"INSERT INTO issues (project_id, gitlab_issue_id, iid, title, state, created_at, updated_at, web_url, last_seen_at)
VALUES (?1, 999, 99, 'Old issue', 'opened', 1000, 1000, 'https://example.com/99', 1000)",
[project_id],
).unwrap();
let old_issue_id: i64 = conn.query_row(
"SELECT id FROM issues WHERE iid = 99", [], |r| r.get(0),
).unwrap();
conn.execute(
"INSERT INTO dirty_sources (source_type, source_id, queued_at) VALUES ('issue', ?1, 1000)",
[old_issue_id],
).unwrap();
// Surgical ingest of issue iid=7
let issue = make_test_issue(7);
let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
// The pre-existing dirty row must still exist
let old_dirty_exists: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM dirty_sources WHERE source_type = 'issue' AND source_id = ?1",
[old_issue_id], |r| r.get(0),
).unwrap();
assert!(old_dirty_exists, "Pre-existing dirty rows must be preserved");
// The surgical result's dirty_source_keys should NOT include the pre-existing row's key
assert!(!result.dirty_source_keys.iter().any(|(st, sid)| st == "issue" && *sid == old_issue_id),
"Surgical dirty_source_keys must not include pre-existing dirty rows");
}
#[test]
fn surgical_docs_scope_ignores_preexisting_dirty_rows_for_same_entity() {
// Edge case: pre-existing dirty row for the SAME entity (iid=7) from a prior failed sync
// Surgical re-ingest of iid=7 should still collect the key (UPSERT updates queued_at)
// but scoped doc regen uses the collected keys, which correctly identifies this entity
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
// First ingest creates dirty row
let issue = make_test_issue(7);
let first_result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
// Simulate: the dirty row from first ingest was never processed (orphaned)
// Under UPSERT semantics, the (source_type, source_id) key persists
// Second surgical ingest of same entity — should still collect the key
let second_result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
// Result should contain the key for this entity (UPSERT touched the row)
assert!(!second_result.dirty_source_keys.is_empty(),
"Dirty source keys should be collected even for re-ingested entities");
}
#[tokio::test]
async fn preflight_aggregates_multiple_missing_iids() {
// Setup: mock server returns 404 for iids 888 and 999, 200 for iid 7
// Call preflight_fetch with all three
// Assert: result contains 1 fetched issue AND 2 failures (not fail-fast)
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path};
use crate::ingestion::surgical::preflight_fetch;
let mock_server = MockServer::start().await;
// iid 7 succeeds
Mock::given(method("GET"))
.and(path("/api/v4/projects/1/issues/7"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"id": 42, "iid": 7, "project_id": 1, "title": "Good issue",
"description": "", "state": "opened",
"created_at": "2024-01-15T10:00:00.000Z",
"updated_at": "2024-01-16T10:00:00.000Z",
"web_url": "https://gitlab.example.com/issues/7",
"author": {"id": 1, "username": "u", "name": "U"},
"assignees": [], "labels": [], "milestone": null,
"due_date": null, "references": {"short": "#7", "full": "g/p#7"}
})))
.mount(&mock_server).await;
// iid 888 and 999 return 404
for iid in [888, 999] {
Mock::given(method("GET"))
.and(path(format!("/api/v4/projects/1/issues/{iid}")))
.respond_with(ResponseTemplate::new(404).set_body_json(
serde_json::json!({"message": "404 Not found"})
))
.mount(&mock_server).await;
}
let client = crate::gitlab::GitLabClient::new(&mock_server.uri(), "test-token", None);
let result = preflight_fetch(
&client, 1, &[7, 888, 999], &[], Duration::from_secs(30),
).await.unwrap();
assert_eq!(result.issues.len(), 1, "Should have 1 successful fetch");
assert_eq!(result.failures.len(), 2, "Should aggregate both 404 failures");
assert!(result.failures.iter().all(|f| f.stage == "fetch"));
}
1e. Test: sync_runs table (src/ingestion/surgical_tests.rs)
#[test]
fn sync_run_is_persisted_and_updated() {
let conn = setup_db();
// Insert a sync_run row using existing table + new columns
conn.execute(
"INSERT INTO sync_runs (started_at, heartbeat_at, status, command, mode, phase, surgical_iids_json)
VALUES (strftime('%s','now') * 1000, strftime('%s','now') * 1000, 'running', 'sync', 'surgical', 'preflight', '[7]')",
[],
).unwrap();
let run_id = conn.last_insert_rowid();
// Update phase
conn.execute(
"UPDATE sync_runs SET phase = 'ingest' WHERE id = ?1",
[run_id],
).unwrap();
let phase: String = conn.query_row(
"SELECT phase FROM sync_runs WHERE id = ?1",
[run_id], |r| r.get(0),
).unwrap();
assert_eq!(phase, "ingest");
}
1f. Transactional rollback, TOCTOU, and failure hygiene tests (src/ingestion/surgical_tests.rs)
These tests cover race conditions, rollback guarantees, and failure cleanup — where regressions hit first in production:
#[test]
fn stale_payload_is_skipped_when_local_updated_at_is_newer() {
// Setup: insert an issue with updated_at = 2000
// Create a preflight payload with updated_at = 1000 (stale)
// Surgical ingest should skip the entity and record skipped_stale
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
// Insert issue with newer timestamp
conn.execute(
"INSERT INTO issues (project_id, gitlab_issue_id, iid, title, state, created_at, updated_at, web_url, last_seen_at)
VALUES (?1, 1007, 7, 'Fresh issue', 'opened', 1000, 2000, 'https://example.com/7', 2000)",
[project_id],
).unwrap();
// Stale payload from preflight (updated_at older than local)
let stale_issue = make_test_issue_with_updated_at(7, 1000);
let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &stale_issue, 0).unwrap();
assert_eq!(result.skipped_stale, 1, "Stale payload should be skipped");
assert_eq!(result.upserted, 0, "No upsert for stale payload");
// Local row should be unchanged
let local_updated: i64 = conn.query_row(
"SELECT updated_at FROM issues WHERE project_id = ?1 AND iid = 7",
[project_id], |r| r.get(0),
).unwrap();
assert_eq!(local_updated, 2000, "Local fresher row must not be overwritten");
}
#[test]
fn equal_updated_at_but_newer_last_seen_is_skipped() {
// TOCTOU edge case: updated_at is equal but local last_seen_at is newer than preflight start
// This catches concurrent normal sync that fetched the same data after our preflight
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
// Insert issue where updated_at = 1500 and last_seen_at = 3000 (seen after preflight started)
conn.execute(
"INSERT INTO issues (project_id, gitlab_issue_id, iid, title, state, created_at, updated_at, web_url, last_seen_at)
VALUES (?1, 1007, 7, 'Concurrent issue', 'opened', 1000, 1500, 'https://example.com/7', 3000)",
[project_id],
).unwrap();
// Payload has same updated_at=1500, but preflight started at time 2000 (before the concurrent sync at 3000)
let payload = make_test_issue_with_updated_at(7, 1500);
let preflight_started_at_ms = 2000i64;
let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &payload, preflight_started_at_ms).unwrap();
assert_eq!(result.skipped_stale, 1, "Equal updated_at but newer last_seen_at should be skipped");
assert_eq!(result.upserted, 0, "No upsert when local was refreshed after our preflight");
}
#[test]
fn preflight_success_then_ingest_failure_rolls_back_all_content_writes() {
// Setup: preflight succeeds for 2 issues
// First issue ingests fine, second causes an error (e.g., constraint violation)
// Assert: NEITHER issue is in the DB (transaction rolled back)
// Assert: sync_runs phase is 'failed'
let conn = setup_db();
let project_id = 1i64;
let config = test_config();
let issue_ok = make_test_issue(7);
let issue_bad = make_test_issue_bad_data(8); // Missing required field, will fail ingest
let preflight = PreflightResult {
issues: vec![issue_ok, issue_bad],
merge_requests: vec![],
failures: vec![],
};
let result = ingest_preflight_results(&conn, &config, project_id, &preflight, 0);
assert!(result.is_err(), "Ingest should fail on bad data");
// Verify rollback: no issues written
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM issues WHERE project_id = ?1",
[project_id], |r| r.get(0),
).unwrap();
assert_eq!(count, 0, "Transaction rollback should leave zero issues");
}
#[test]
fn surgical_no_docs_requires_no_embed_validation() {
// Verify that surgical mode with --no-docs but without --no-embed is rejected
let options = SyncOptions {
issue_iids: vec![7],
no_docs: true,
no_embed: false,
..SyncOptions::default()
};
assert!(options.is_surgical());
// The validation logic in handle_sync_cmd should reject this combination
// (tested via integration or by calling the validation function directly)
}
1g. Cancellation, timeout, scoped embed isolation, and payload integrity tests (src/ingestion/surgical_tests.rs)
These tests cover the new failure-prone runtime paths added by constraints 15-17:
#[tokio::test]
async fn cancellation_marks_sync_run_cancelled() {
// Setup: start a surgical sync run, trigger cancellation signal after preflight
// Assert: sync_runs row has status='cancelled', phase='cancelled', finished_at populated
// Assert: no content writes occurred after cancellation
todo!("implement after SyncRunRecorder extensions in Step 8b")
}
#[tokio::test]
async fn dependent_timeout_records_entity_failure_and_continues() {
// Setup: mock server with one fast-responding entity and one that hangs
// Set surgical_entity_timeout_seconds = 1
// Assert: fast entity's dependents succeed
// Assert: slow entity has EntityFailure { code: "TIMEOUT", stage: "discussions" }
// Assert: run still completes (not aborted)
todo!("implement after inline dependent helpers in Step 9b")
}
#[tokio::test]
async fn scoped_embed_does_not_embed_unrelated_docs_created_after_docs_stage() {
// Setup: surgical sync generates docs for entity A (collect document_ids)
// Between docs and embed stage, simulate another sync creating unembedded docs for entity B
// Call run_embed_for_document_ids with ONLY entity A's doc IDs
// Assert: entity B's docs remain unembedded
// Assert: entity A's docs are embedded
todo!("implement after run_embed_for_document_ids in Step 9a")
}
#[test]
fn payload_project_id_mismatch_is_rejected_in_preflight() {
// Setup: create a payload where project_id != expected gitlab_project_id
// Assert: preflight records EntityFailure { code: "PROJECT_MISMATCH" }
// Assert: mismatched payload is NOT included in successful fetches
todo!("implement after preflight_fetch integrity check in Step 7")
}
Step 2: Add --issue, --mr, -p, --preflight-only to SyncArgs
File: src/cli/mod.rs, struct SyncArgs (line 755)
Add after the existing no_file_changes field (around line 784):
/// Surgically sync specific issues by IID (repeatable, must be positive)
#[arg(long, value_parser = clap::value_parser!(u64).range(1..), action = clap::ArgAction::Append)]
pub issue: Vec<u64>,
/// Surgically sync specific merge requests by IID (repeatable, must be positive)
#[arg(long, value_parser = clap::value_parser!(u64).range(1..), action = clap::ArgAction::Append)]
pub mr: Vec<u64>,
/// Scope to a single project (required when --issue or --mr is used, falls back to config.defaultProject)
#[arg(short = 'p', long)]
pub project: Option<String>,
/// Validate remote entities exist and permissions are valid, without any DB writes.
/// Runs the preflight network fetch phase only. Useful for agents to verify IIDs before committing to a full surgical sync.
#[arg(long, default_value_t = false)]
pub preflight_only: bool,
Why `u64` with `range(1..)`: IIDs are always positive integers. Parse-time validation via clap gives immediate, clear error messages (e.g., `error: 0 is not in 1..`) with zero runtime plumbing. The `u64` type makes invalid states unrepresentable.
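The same parse-time guarantee can be demonstrated without clap. This stdlib-only sketch (`parse_iid` is a hypothetical stand-in, not the plan's code) shows what `value_parser!(u64).range(1..)` rejects:

```rust
/// Hypothetical stand-in for clap's value_parser!(u64).range(1..):
/// parse an IID argument, rejecting non-numbers, negatives, and zero.
fn parse_iid(raw: &str) -> Result<u64, String> {
    match raw.parse::<u64>() {
        Ok(0) => Err(format!("{raw} is not in 1..")),
        Ok(n) => Ok(n),
        // u64 parsing already fails on negatives and non-numeric input
        Err(_) => Err(format!("invalid IID: {raw}")),
    }
}

fn main() {
    assert_eq!(parse_iid("123"), Ok(123));
    assert!(parse_iid("0").is_err());
    assert!(parse_iid("-5").is_err());
    assert!(parse_iid("abc").is_err());
}
```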
Step 3: Extend SyncOptions
File: src/cli/commands/sync.rs, struct SyncOptions (line 20)
Add fields:
#[derive(Debug, Default)]
pub struct SyncOptions {
pub full: bool,
pub force: bool,
pub no_embed: bool,
pub no_docs: bool,
pub no_events: bool,
pub robot_mode: bool,
pub dry_run: bool,
// NEW:
pub issue_iids: Vec<u64>,
pub mr_iids: Vec<u64>,
pub project: Option<String>,
pub preflight_only: bool,
}
Add helper method:
impl SyncOptions {
/// Maximum combined IIDs allowed in a single surgical sync.
const MAX_SURGICAL_TARGETS: usize = 100;
pub fn is_surgical(&self) -> bool {
!self.issue_iids.is_empty() || !self.mr_iids.is_empty()
}
}
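The flag rules that Step 4 enforces inline can also be read as a single pure validation over the options, which is easy to unit test. A sketch under stated assumptions: `Opts` is a simplified local stand-in for `SyncOptions` plus the resolved config default, and `validate_surgical` is a hypothetical helper, not the plan's API:

```rust
/// Simplified stand-in for SyncOptions + config.default_project (illustrative only).
#[derive(Default)]
struct Opts {
    full: bool,
    no_docs: bool,
    no_embed: bool,
    preflight_only: bool,
    issue_iids: Vec<u64>,
    mr_iids: Vec<u64>,
    project: Option<String>,
    default_project: Option<String>,
}

const MAX_SURGICAL_TARGETS: usize = 100;

/// Hypothetical pure validator mirroring Step 4's checks.
fn validate_surgical(o: &Opts) -> Result<(), String> {
    let surgical = !o.issue_iids.is_empty() || !o.mr_iids.is_empty();
    if o.preflight_only && !surgical {
        return Err("--preflight-only requires --issue or --mr".into());
    }
    if !surgical {
        return Ok(());
    }
    let total = o.issue_iids.len() + o.mr_iids.len();
    if total > MAX_SURGICAL_TARGETS {
        return Err(format!("Too many surgical targets ({total})"));
    }
    if o.project.is_none() && o.default_project.is_none() {
        return Err("--issue/--mr require -p or config.defaultProject".into());
    }
    if o.full {
        return Err("--full is incompatible with surgical sync".into());
    }
    if o.no_docs && !o.no_embed {
        return Err("--no-docs requires --no-embed in surgical mode".into());
    }
    Ok(())
}

fn main() {
    let ok = Opts { issue_iids: vec![7], project: Some("g/p".into()), ..Default::default() };
    assert!(validate_surgical(&ok).is_ok());
    // Embed-leakage guard: --no-docs without --no-embed is rejected
    let leak = Opts { issue_iids: vec![7], project: Some("g/p".into()), no_docs: true, ..Default::default() };
    assert!(validate_surgical(&leak).is_err());
    // --preflight-only without any targets is rejected
    let pf = Opts { preflight_only: true, ..Default::default() };
    assert!(validate_surgical(&pf).is_err());
}
```

Keeping the checks in one pure function would let the `surgical_no_docs_requires_no_embed_validation` test in 1f call it directly instead of going through an integration path.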
Step 4: Wire new fields in handle_sync_cmd
File: src/main.rs, function handle_sync_cmd (line 2034)
After line 2058 (existing dry_run field), add:
// Deduplicate IIDs before constructing SyncOptions
let mut issue_iids = args.issue;
let mut mr_iids = args.mr;
issue_iids.sort_unstable();
issue_iids.dedup();
mr_iids.sort_unstable();
mr_iids.dedup();
let options = SyncOptions {
full: args.full && !args.no_full,
force: args.force && !args.no_force,
no_embed: args.no_embed,
no_docs: args.no_docs,
no_events: args.no_events,
robot_mode,
dry_run,
// NEW:
issue_iids,
mr_iids,
project: args.project,
preflight_only: args.preflight_only,
};
Add validation before recording starts (after options creation):
// Validate surgical mode constraints
if options.is_surgical() {
// Enforce hard cap on combined target count
let total_targets = options.issue_iids.len() + options.mr_iids.len();
if total_targets > SyncOptions::MAX_SURGICAL_TARGETS {
return Err(Box::new(LoreError::Other(
format!(
"Too many surgical targets ({total_targets}). Maximum is {}.",
SyncOptions::MAX_SURGICAL_TARGETS
)
)));
}
// Fall back to config.defaultProject when -p is omitted
let project = options.project.clone().or_else(|| config.default_project.clone());
if project.is_none() {
return Err(Box::new(LoreError::Other(
"The --issue and --mr flags require --project (-p) or config.defaultProject".to_string()
)));
}
// (reassign resolved project back into options)
// Reject incompatible flags
if options.full {
return Err(Box::new(LoreError::Other(
"--full is incompatible with surgical sync (--issue/--mr)".to_string()
)));
}
// Reject --no-docs without --no-embed in surgical mode (embed leakage prevention)
if options.no_docs && !options.no_embed {
return Err(Box::new(LoreError::Other(
"In surgical mode, --no-docs requires --no-embed (to prevent embedding unrelated backlog docs)".to_string()
)));
}
}
// Validate preflight_only requires surgical mode
if options.preflight_only && !options.is_surgical() {
return Err(Box::new(LoreError::Other(
"--preflight-only requires --issue or --mr".to_string()
)));
}
Step 5: Add get_issue_by_iid and get_mr_by_iid to GitLabClient
File: src/gitlab/client.rs
Add after paginate_issues (around line 330), before paginate_merge_requests:
/// Fetch a single issue by its project-scoped IID.
/// Uses: GET /api/v4/projects/:id/issues/:iid
pub async fn get_issue_by_iid(
&self,
gitlab_project_id: i64,
iid: u64,
) -> Result<GitLabIssue> {
let path = format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}");
self.request(&path).await
}
/// Fetch a single merge request by its project-scoped IID.
/// Uses: GET /api/v4/projects/:id/merge_requests/:iid
pub async fn get_mr_by_iid(
&self,
gitlab_project_id: i64,
iid: u64,
) -> Result<GitLabMergeRequest> {
let path = format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}");
self.request(&path).await
}
These reuse the existing request() method (line 117) which handles:
- Auth via `PRIVATE-TOKEN` header
- Rate limiting via `RateLimiter`
- Retry on 429 with `retry_after`
- JSON deserialization via `handle_response`
- Error mapping (401 → `GitLabAuthFailed`, 404 → `GitLabNotFound`)
Step 6: Make process_single_issue and process_single_mr pub(crate)
File: src/ingestion/issues.rs, line 143
Change fn process_single_issue → pub(crate) fn process_single_issue
File: src/ingestion/merge_requests.rs, line 144
Change fn process_single_mr → pub(crate) fn process_single_mr
Also add to src/ingestion/mod.rs exports (not pub, just ensure accessible via crate::ingestion::issues::process_single_issue).
Step 7: Create src/ingestion/surgical.rs
This is the core new module. It provides three layers:
- Payload processing (sync, no network — testable) — takes a `GitLabIssue`/`GitLabMergeRequest` already fetched
- Preflight fetch (async, hits GitLab API) — fetches all entities BEFORE any DB writes, aggregating all failures instead of failing fast
- Transactional ingest — applies all DB mutations inside a transaction after successful preflight
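As an aside, the per-entity failures this module collects lend themselves to a stage-grouped summary for robot output. A stdlib-only sketch, using a simplified local mirror of `EntityFailure` and a hypothetical `failures_by_stage` helper (neither is part of the module's API):

```rust
use std::collections::BTreeMap;

/// Simplified mirror of the module's EntityFailure (illustrative).
struct EntityFailure {
    entity_type: &'static str,
    iid: u64,
    stage: &'static str,
    code: String,
}

/// Hypothetical helper: count failures per stage for a one-line summary.
fn failures_by_stage(failures: &[EntityFailure]) -> BTreeMap<&'static str, usize> {
    let mut by_stage = BTreeMap::new();
    for f in failures {
        *by_stage.entry(f.stage).or_insert(0) += 1;
    }
    by_stage
}

fn main() {
    let failures = vec![
        EntityFailure { entity_type: "issue", iid: 888, stage: "fetch", code: "NOT_FOUND".into() },
        EntityFailure { entity_type: "issue", iid: 999, stage: "fetch", code: "NOT_FOUND".into() },
        EntityFailure { entity_type: "merge_request", iid: 7, stage: "discussions", code: "TIMEOUT".into() },
    ];
    let summary = failures_by_stage(&failures);
    assert_eq!(summary["fetch"], 2);
    assert_eq!(summary["discussions"], 1);
}
```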
use rusqlite::{Connection, OptionalExtension};
use tracing::debug;
use crate::Config;
use crate::core::error::Result;
use crate::gitlab::GitLabClient;
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
use crate::ingestion::issues::IssueForDiscussionSync;
use crate::ingestion::merge_requests::MrForDiscussionSync;
use super::issues::process_single_issue;
use super::merge_requests::process_single_mr;
/// Per-entity failure info for robot mode structured error reporting.
#[derive(Debug, Clone)]
pub struct EntityFailure {
pub entity_type: &'static str, // "issue" or "merge_request"
pub iid: u64,
pub stage: &'static str, // "fetch", "ingest", "discussions", etc.
pub code: String, // e.g. "NOT_FOUND", "NETWORK_ERROR"
pub message: String,
}
/// A dirty source key: (source_type, source_id) matching dirty_sources PK.
pub type DirtySourceKey = (String, i64);
#[derive(Debug, Default)]
pub struct SurgicalIngestResult {
pub upserted: usize,
pub labels_created: usize,
pub issue_disc_sync: Vec<IssueForDiscussionSync>,
pub mr_disc_sync: Vec<MrForDiscussionSync>,
/// Dirty source keys touched during this surgical ingest (for scoped doc regeneration).
/// Each key is `(source_type, source_id)` matching `dirty_sources` PK.
pub dirty_source_keys: Vec<DirtySourceKey>,
/// Per-entity failures for structured error reporting in robot mode
pub entity_failures: Vec<EntityFailure>,
/// Entities skipped because local row was newer than preflight payload (TOCTOU protection)
pub skipped_stale: usize,
}
/// Check whether the local issue row has a fresher state than the payload.
/// Returns true if local is newer (payload is stale and should be skipped).
/// Uses two-tier check: (a) updated_at strictly newer, or (b) equal updated_at
/// but last_seen_at is newer than preflight start (concurrent sync fetched same data).
fn is_local_newer_issue(conn: &Connection, project_id: i64, iid: i64, payload_updated_at: i64, preflight_started_at_ms: i64) -> Result<bool> {
let row: Option<(i64, i64)> = conn.query_row(
"SELECT updated_at, last_seen_at FROM issues WHERE project_id = ?1 AND iid = ?2",
(project_id, iid),
|row| Ok((row.get(0)?, row.get(1)?)),
).optional()?;
Ok(row.map_or(false, |(local_ts, local_seen)| {
local_ts > payload_updated_at
|| (local_ts == payload_updated_at && local_seen > preflight_started_at_ms)
}))
}
/// Check whether the local MR row has a fresher state than the payload.
fn is_local_newer_mr(conn: &Connection, project_id: i64, iid: i64, payload_updated_at: i64, preflight_started_at_ms: i64) -> Result<bool> {
let row: Option<(i64, i64)> = conn.query_row(
"SELECT updated_at, last_seen_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2",
(project_id, iid),
|row| Ok((row.get(0)?, row.get(1)?)),
).optional()?;
Ok(row.map_or(false, |(local_ts, local_seen)| {
local_ts > payload_updated_at
|| (local_ts == payload_updated_at && local_seen > preflight_started_at_ms)
}))
}
/// Process a single issue that has already been fetched from GitLab.
/// Upserts into DB, resets discussion + event watermarks so dependents re-sync.
/// Tracks dirty source keys for scoped doc regeneration.
/// Skips stale payloads to avoid TOCTOU overwrite after unlocked preflight.
///
/// `preflight_started_at_ms`: ms-epoch timestamp captured before preflight fetch began.
/// Used for equal-timestamp TOCTOU tie-breaking via `last_seen_at`.
pub fn ingest_issue_by_iid_from_payload(
conn: &Connection,
config: &Config,
project_id: i64,
issue: &GitLabIssue,
preflight_started_at_ms: i64,
) -> Result<SurgicalIngestResult> {
let mut result = SurgicalIngestResult::default();
// TOCTOU guard: skip if local row is fresher than preflight payload
if is_local_newer_issue(conn, project_id, issue.iid, issue.updated_at, preflight_started_at_ms)? {
result.skipped_stale = 1;
debug!(iid = issue.iid, "Skipping stale payload (local is newer)");
return Ok(result);
}
let labels_created = process_single_issue(conn, config, project_id, issue)?;
result.upserted = 1;
result.labels_created = labels_created;
// Reset watermarks so discussions + events re-sync for this issue
conn.execute(
"UPDATE issues SET
discussions_synced_for_updated_at = NULL,
resource_events_synced_for_updated_at = NULL
WHERE project_id = ?1 AND iid = ?2",
(project_id, issue.iid),
)?;
// Collect dirty source key for scoped doc regeneration
let local_issue_id: i64 = conn.query_row(
"SELECT id FROM issues WHERE project_id = ?1 AND iid = ?2",
(project_id, issue.iid),
|row| row.get(0),
)?;
result.dirty_source_keys.push(("issue".to_string(), local_issue_id));
// Build the discussion sync descriptor
let row = conn.query_row(
"SELECT id, iid, updated_at FROM issues WHERE project_id = ?1 AND iid = ?2",
(project_id, issue.iid),
|row| {
Ok(IssueForDiscussionSync {
local_issue_id: row.get(0)?,
iid: row.get(1)?,
updated_at: row.get(2)?,
})
},
)?;
result.issue_disc_sync.push(row);
debug!(iid = issue.iid, "Surgical issue ingest complete");
Ok(result)
}
/// Process a single MR that has already been fetched from GitLab.
/// Skips stale payloads to avoid TOCTOU overwrite after unlocked preflight.
pub fn ingest_mr_by_iid_from_payload(
conn: &Connection,
config: &Config,
project_id: i64,
mr: &GitLabMergeRequest,
preflight_started_at_ms: i64,
) -> Result<SurgicalIngestResult> {
let mut result = SurgicalIngestResult::default();
// TOCTOU guard: skip if local row is fresher than preflight payload
if is_local_newer_mr(conn, project_id, mr.iid, mr.updated_at, preflight_started_at_ms)? {
result.skipped_stale = 1;
debug!(iid = mr.iid, "Skipping stale MR payload (local is newer)");
return Ok(result);
}
let mr_result = process_single_mr(conn, config, project_id, mr)?;
result.upserted = 1;
result.labels_created = mr_result.labels_created;
// Reset watermarks
conn.execute(
"UPDATE merge_requests SET
discussions_synced_for_updated_at = NULL,
resource_events_synced_for_updated_at = NULL
WHERE project_id = ?1 AND iid = ?2",
(project_id, mr.iid),
)?;
// Collect dirty source key
let local_mr_id: i64 = conn.query_row(
"SELECT id FROM merge_requests WHERE project_id = ?1 AND iid = ?2",
(project_id, mr.iid),
|row| row.get(0),
)?;
result.dirty_source_keys.push(("merge_request".to_string(), local_mr_id));
let row = conn.query_row(
"SELECT id, iid, updated_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2",
(project_id, mr.iid),
|row| {
Ok(MrForDiscussionSync {
local_mr_id: row.get(0)?,
iid: row.get(1)?,
updated_at: row.get(2)?,
})
},
)?;
result.mr_disc_sync.push(row);
debug!(iid = mr.iid, "Surgical MR ingest complete");
Ok(result)
}
/// Preflight: fetch all requested entities from GitLab without any DB writes.
/// Aggregates ALL failures instead of failing fast — agents get a complete error report in one pass.
/// Returns the fetched payloads plus any per-IID failures.
/// Caller MUST check `result.failures` and abort writes if non-empty.
pub async fn preflight_fetch(
client: &GitLabClient,
gitlab_project_id: i64,
issue_iids: &[u64],
mr_iids: &[u64],
entity_timeout: Duration,
) -> Result<PreflightResult> {
let mut result = PreflightResult::default();
for &iid in issue_iids {
debug!(iid, "Preflight: fetching issue");
match tokio::time::timeout(entity_timeout, client.get_issue_by_iid(gitlab_project_id, iid)).await {
Ok(Ok(issue)) => {
// Payload integrity check: project_id must match requested gitlab_project_id
if issue.project_id != gitlab_project_id {
result.failures.push(EntityFailure {
entity_type: "issue",
iid,
stage: "fetch",
code: "PROJECT_MISMATCH".to_string(),
message: format!(
"Payload project_id {} does not match requested {}",
issue.project_id, gitlab_project_id
),
});
} else {
result.issues.push(issue);
}
}
Ok(Err(e)) => {
let code = classify_error_code(&e);
result.failures.push(EntityFailure {
entity_type: "issue",
iid,
stage: "fetch",
code,
message: e.to_string(),
});
}
Err(_elapsed) => {
result.failures.push(EntityFailure {
entity_type: "issue",
iid,
stage: "fetch",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
for &iid in mr_iids {
debug!(iid, "Preflight: fetching MR");
match tokio::time::timeout(entity_timeout, client.get_mr_by_iid(gitlab_project_id, iid)).await {
Ok(Ok(mr)) => {
// Payload integrity check: project_id must match
if mr.target_project_id.unwrap_or(mr.project_id) != gitlab_project_id {
result.failures.push(EntityFailure {
entity_type: "merge_request",
iid,
stage: "fetch",
code: "PROJECT_MISMATCH".to_string(),
message: format!(
"Payload project_id does not match requested {}",
gitlab_project_id
),
});
} else {
result.merge_requests.push(mr);
}
}
Ok(Err(e)) => {
let code = classify_error_code(&e);
result.failures.push(EntityFailure {
entity_type: "merge_request",
iid,
stage: "fetch",
code,
message: e.to_string(),
});
}
Err(_elapsed) => {
result.failures.push(EntityFailure {
entity_type: "merge_request",
iid,
stage: "fetch",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
Ok(result)
}
/// Map a LoreError to a machine-readable error code string.
fn classify_error_code(e: &crate::core::error::LoreError) -> String {
match e {
crate::core::error::LoreError::GitLabNotFound { .. } => "NOT_FOUND".to_string(),
crate::core::error::LoreError::GitLabAuthFailed { .. } => "AUTH_FAILED".to_string(),
crate::core::error::LoreError::RateLimited { .. } => "RATE_LIMITED".to_string(),
_ => "FETCH_ERROR".to_string(),
}
}
#[derive(Debug, Default)]
pub struct PreflightResult {
pub issues: Vec<GitLabIssue>,
pub merge_requests: Vec<GitLabMergeRequest>,
/// Per-IID failures collected during preflight (empty = all succeeded)
pub failures: Vec<EntityFailure>,
}
impl PreflightResult {
pub fn has_failures(&self) -> bool {
!self.failures.is_empty()
}
}
/// Ingest all preflight-fetched entities into the DB inside a transaction.
/// Returns combined result with all dirty source keys and discussion sync descriptors.
///
/// `preflight_started_at_ms`: ms-epoch timestamp captured before preflight began,
/// used for TOCTOU tie-breaking on equal `updated_at` timestamps.
pub fn ingest_preflight_results(
conn: &Connection,
config: &Config,
project_id: i64,
preflight: &PreflightResult,
preflight_started_at_ms: i64,
) -> Result<SurgicalIngestResult> {
let mut combined = SurgicalIngestResult::default();
// All writes happen inside a transaction — if any fails, all roll back
let tx = conn.unchecked_transaction()?;
for issue in &preflight.issues {
let single = ingest_issue_by_iid_from_payload(&tx, config, project_id, issue, preflight_started_at_ms)?;
combined.upserted += single.upserted;
combined.skipped_stale += single.skipped_stale;
combined.labels_created += single.labels_created;
combined.issue_disc_sync.extend(single.issue_disc_sync);
combined.dirty_source_keys.extend(single.dirty_source_keys);
}
for mr in &preflight.merge_requests {
let single = ingest_mr_by_iid_from_payload(&tx, config, project_id, mr, preflight_started_at_ms)?;
combined.upserted += single.upserted;
combined.skipped_stale += single.skipped_stale;
combined.labels_created += single.labels_created;
combined.mr_disc_sync.extend(single.mr_disc_sync);
combined.dirty_source_keys.extend(single.dirty_source_keys);
}
tx.commit()?;
Ok(combined)
}
#[cfg(test)]
#[path = "surgical_tests.rs"]
mod tests;
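The two-tier staleness check in `is_local_newer_issue`/`is_local_newer_mr` reduces to a pure predicate over three timestamps; a minimal sketch of just the comparison logic (standalone, no rusqlite):

```rust
/// True when the local row is fresher than the preflight payload:
/// (a) local updated_at is strictly newer, or (b) the timestamps tie but a
/// concurrent sync stamped last_seen_at after this preflight started.
fn local_is_newer(
    local: Option<(i64, i64)>, // (updated_at, last_seen_at); None if no row
    payload_updated_at: i64,
    preflight_started_at_ms: i64,
) -> bool {
    local.map_or(false, |(local_ts, local_seen)| {
        local_ts > payload_updated_at
            || (local_ts == payload_updated_at && local_seen > preflight_started_at_ms)
    })
}

fn main() {
    assert!(!local_is_newer(None, 100, 50));            // no local row: ingest
    assert!(local_is_newer(Some((200, 0)), 100, 50));   // strictly newer: skip
    assert!(local_is_newer(Some((100, 60)), 100, 50));  // tie, seen after preflight: skip
    assert!(!local_is_newer(Some((100, 40)), 100, 50)); // tie, seen before: ingest
}
```
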
Step 8: Register surgical module
File: src/ingestion/mod.rs
Add after line 8 (`pub mod orchestrator;`):
pub mod surgical;
Step 8a: Extend existing sync_runs table via migration
File: src/core/db.rs
The sync_runs table already exists (migration 001, enriched in migration 014). Add a new migration to extend it with surgical sync columns:
-- Migration 027: Extend sync_runs for surgical sync observability
-- Adds mode/phase tracking and surgical-specific counters.
-- Reuses existing sync_runs row lifecycle (SyncRunRecorder).
ALTER TABLE sync_runs ADD COLUMN mode TEXT; -- 'standard' | 'surgical' (NULL for pre-existing rows)
ALTER TABLE sync_runs ADD COLUMN phase TEXT; -- preflight|ingest|dependents|docs|embed|done|failed
ALTER TABLE sync_runs ADD COLUMN surgical_iids_json TEXT; -- JSON: {"issues":[7,8],"mrs":[101]}
ALTER TABLE sync_runs ADD COLUMN issues_fetched INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN mrs_fetched INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN issues_ingested INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN mrs_ingested INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN skipped_stale INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN docs_regenerated INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN docs_embedded INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN warnings_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN cancelled_at INTEGER;
-- Indexes for observability queries on surgical runs
CREATE INDEX IF NOT EXISTS idx_sync_runs_mode_started
ON sync_runs(mode, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_sync_runs_status_phase_started
ON sync_runs(status, phase, started_at DESC);
Note: No changes to pending_dependent_fetches — surgical mode bypasses the queue entirely (see constraint 4).
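SQLite has no `ADD COLUMN IF NOT EXISTS`, so if the migration runner ever re-enters migration 027, the ALTERs fail on duplicate columns. A guard can be derived purely from the current column list (a sketch; it assumes the runner can supply names from `PRAGMA table_info(sync_runs)` — not part of the plan above):

```rust
/// Return only the ALTER statements whose target column is not yet present.
fn pending_alters<'a>(
    existing: &[&str],
    wanted: &[(&'a str, &'a str)], // (column name, full ALTER statement)
) -> Vec<&'a str> {
    wanted
        .iter()
        .filter(|(col, _)| !existing.contains(col))
        .map(|&(_, stmt)| stmt)
        .collect()
}

fn main() {
    let existing = ["id", "status", "mode"]; // "mode" already applied
    let wanted = [
        ("mode", "ALTER TABLE sync_runs ADD COLUMN mode TEXT"),
        ("phase", "ALTER TABLE sync_runs ADD COLUMN phase TEXT"),
    ];
    // Only the missing column's ALTER is emitted on re-entry.
    let stmts = pending_alters(&existing, &wanted);
    assert_eq!(stmts, vec!["ALTER TABLE sync_runs ADD COLUMN phase TEXT"]);
}
```
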
Step 8b: Extend SyncRunRecorder for surgical mode
File: src/core/sync.rs (or wherever SyncRunRecorder lives)
Add methods for surgical mode lifecycle management. This replaces all ad-hoc SQL that Step 9 would otherwise need. The recorder owns the row ID and handles all sync_runs mutations:
impl SyncRunRecorder {
/// Start a new surgical sync run. Records mode='surgical' and phase='preflight'.
pub fn start_surgical(
conn: &Connection,
iids_json: &str,
) -> Result<Self> {
// INSERT INTO sync_runs with mode='surgical', phase='preflight', surgical_iids_json
// Returns Self with the row id for subsequent updates
todo!()
}
/// Update the current phase (preflight -> ingest -> dependents -> docs -> embed -> done)
pub fn set_phase(&self, conn: &Connection, phase: &str) -> Result<()> {
conn.execute(
"UPDATE sync_runs SET phase = ?1, heartbeat_at = strftime('%s','now') * 1000 WHERE id = ?2",
rusqlite::params![phase, self.row_id],
)?;
Ok(())
}
/// Update surgical-specific counters (issues_fetched, mrs_fetched, etc.)
pub fn set_counters(&self, conn: &Connection, counters: &SurgicalCounters) -> Result<()> {
// Single UPDATE with all counter fields
todo!()
}
/// Finalize: succeeded
pub fn finish_succeeded(&self, conn: &Connection) -> Result<()> {
// SET phase='done', status='succeeded', finished_at=now
todo!()
}
/// Finalize: succeeded with warnings (partial dependent failures)
pub fn finish_succeeded_with_warnings(&self, conn: &Connection, warnings_count: usize) -> Result<()> {
// SET phase='done', status='succeeded', warnings_count=N, finished_at=now
todo!()
}
/// Finalize: failed
pub fn finish_failed(&self, conn: &Connection, error: &str) -> Result<()> {
// SET phase='failed', status='failed', error=msg, finished_at=now
todo!()
}
/// Finalize: cancelled (shutdown signal received)
pub fn finish_cancelled(&self, conn: &Connection) -> Result<()> {
// SET phase='cancelled', status='cancelled', cancelled_at=now, finished_at=now
todo!()
}
/// Heartbeat if enough time has elapsed since last heartbeat
pub fn heartbeat_if_due(&self, conn: &Connection) -> Result<()> {
// UPDATE heartbeat_at if interval exceeded
todo!()
}
}
/// Counter snapshot for surgical sync run updates
#[derive(Debug, Default)]
pub struct SurgicalCounters {
pub issues_fetched: usize,
pub mrs_fetched: usize,
pub issues_ingested: usize,
pub mrs_ingested: usize,
pub skipped_stale: usize,
pub docs_regenerated: usize,
pub docs_embedded: usize,
}
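`heartbeat_if_due` reduces to an elapsed-interval predicate; a minimal sketch of that check (field names assumed, not the recorder's actual layout):

```rust
/// True when the heartbeat interval has elapsed since the last stamp.
/// Timestamps are ms-epoch, matching the sync_runs heartbeat_at column.
fn heartbeat_due(last_heartbeat_ms: i64, now_ms: i64, interval_secs: i64) -> bool {
    now_ms - last_heartbeat_ms >= interval_secs * 1000
}

fn main() {
    assert!(!heartbeat_due(10_000, 14_999, 5)); // 4.999s elapsed: not yet due
    assert!(heartbeat_due(10_000, 15_000, 5));  // exactly 5s: due
}
```
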
Step 9: Create run_sync_surgical in src/cli/commands/sync.rs
This is the surgical variant of run_sync. It uses a preflight-then-commit pattern: all primary entity fetches happen first (WITHOUT the sync lock), then DB writes happen transactionally (WITH the sync lock). Dependent stages (discussions, resource events, MR closes_issues, MR diffs) run inline per-entity — they do NOT use pending_dependent_fetches.
IMPORTANT implementation notes:
- Dry-run is zero-write: Check `options.dry_run` BEFORE lock acquisition and DB connection. No side effects at all.
- Preflight-only: Check `options.preflight_only` after preflight fetch completes. Return results (including any failures) with zero content DB writes. Control-plane run-ledger writes are allowed.
- Lock window minimization: Lock is NOT held during preflight network I/O. Acquired immediately before Stage 1 (first content DB mutation). Released before embed stage.
- Aggregate preflight failures: Preflight collects ALL per-IID failures (including payload integrity mismatches). If any failures exist, abort with zero content DB writes and return a structured error report.
- Durable run state via SyncRunRecorder: Use `SyncRunRecorder::start_surgical(...)` at run start. Call `recorder.set_phase()` after each stage transition. On success call `recorder.finish_succeeded()` (or `finish_succeeded_with_warnings`). On failure call `recorder.finish_failed()`. On cancellation call `recorder.finish_cancelled()`. No raw SQL updates to `sync_runs`.
- Sync lock: Acquire `AppLock("sync")` using the same stale lock and heartbeat settings as `run_ingest`. Hold the lock for ingest + dependents + docs only. Release before embed.
- Transactional ingest: After successful preflight, apply all DB mutations inside `unchecked_transaction()`.
- Stale-write protection: Each entity's ingest checks local `updated_at` (and `last_seen_at` for equal-timestamp tie-breaking) vs the preflight payload. Stale payloads are skipped (not overwritten).
- Inline dependents: Discussions, resource events, MR closes_issues, and MR diffs are fetched and written inline per-entity. No jobs are enqueued to `pending_dependent_fetches`. Individual dependent stage failures are recorded per-entity in `entity_failures` but do not abort the run.
- Per-entity timeout: Each dependent network call is wrapped in `tokio::time::timeout(Duration::from_secs(config.sync.surgical_entity_timeout_seconds.unwrap_or(30)))`. Timeouts produce `EntityFailure { code: "TIMEOUT" }`.
- Scoped doc regeneration: Call `run_generate_docs_for_sources(config, &dirty_source_keys)`, which returns `GenerateDocsResult` including `regenerated_document_ids`.
- Scoped embed: Call `run_embed_for_document_ids(config, &regenerated_document_ids, signal)` — NOT global `run_embed`. This ensures isolation even after lock release and under concurrent normal sync.
- Embed guard: The embed stage only runs when surgical docs actually regenerated documents in this run (`!options.no_docs && !regenerated_document_ids.is_empty()`).
- prefetch_mr_discussions signature: Takes `(client, gitlab_project_id, local_project_id, MrForDiscussionSync)` — NOT a slice.
- Preflight timestamp: Capture `preflight_started_at_ms` before the first network call and pass it through to the ingest functions for TOCTOU tie-breaking.
- Cancellation handling: Every `signal.is_cancelled()` check MUST call `recorder.finish_cancelled()` before returning. No early returns without finalizing the run row.
- Payload integrity: Preflight validates `payload.project_id == gitlab_project_id` for every fetched entity. Mismatches become `EntityFailure { code: "PROJECT_MISMATCH" }`.
Add this function after run_sync (around line 360):
pub async fn run_sync_surgical(
config: &Config,
options: &SyncOptions,
run_id: &str,
signal: &ShutdownSignal,
) -> Result<SyncResult> {
use crate::core::db::create_connection;
use crate::core::lock::{AppLock, LockOptions};
use crate::core::paths::get_db_path;
use crate::core::project::resolve_project;
use crate::core::sync::{SyncRunRecorder, SurgicalCounters};
use crate::gitlab::GitLabClient;
use crate::ingestion::discussions::ingest_issue_discussions;
use crate::ingestion::mr_discussions::prefetch_mr_discussions;
use crate::ingestion::mr_discussions::write_prefetched_mr_discussions;
use crate::ingestion::surgical::{preflight_fetch, ingest_preflight_results, EntityFailure};
let span = tracing::info_span!("sync_surgical", %run_id);
async move {
let mut result = SyncResult {
run_id: run_id.to_string(),
..SyncResult::default()
};
// ── Dry-run: zero side effects (no lock, no DB, no network) ──
if options.dry_run {
let project_str = options.project.as_deref().expect("validated in handle_sync_cmd");
info!(
issues = ?options.issue_iids,
mrs = ?options.mr_iids,
project = project_str,
"Surgical sync dry-run: would fetch these IIDs"
);
return Ok(result);
}
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
let project_str = options.project.as_deref().expect("validated in handle_sync_cmd");
let project_id = resolve_project(&conn, project_str)?;
let gitlab_project_id: i64 = conn.query_row(
"SELECT gitlab_project_id FROM projects WHERE id = ?1",
[project_id],
|r| r.get(0),
)?;
let token = config.resolve_token()?;
let client = GitLabClient::new(
&config.gitlab.base_url, &token, config.sync.rate_limit_rps,
);
// ── Record sync run via SyncRunRecorder (not ad-hoc SQL) ──
let iids_json = serde_json::to_string(&serde_json::json!({
"issues": options.issue_iids,
"mrs": options.mr_iids,
})).unwrap_or_default();
let recorder = SyncRunRecorder::start_surgical(&conn, &iids_json)?;
// ── Capture preflight start timestamp for TOCTOU tie-breaking ──
let preflight_started_at_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// ── Stage 0: Preflight fetch (NO lock, NO content DB writes — network only) ──
// Lock is NOT held during preflight to minimize contention with normal syncs.
let stage_start = Instant::now();
let total_items = options.issue_iids.len() + options.mr_iids.len();
let entity_timeout = Duration::from_secs(
config.sync.surgical_entity_timeout_seconds.unwrap_or(30) as u64
);
let spinner = stage_spinner_v2(
Icons::sync(),
"Preflight",
&format!("fetching {} items...", total_items),
options.robot_mode,
);
let preflight = preflight_fetch(
&client, gitlab_project_id, &options.issue_iids, &options.mr_iids, entity_timeout,
).await?;
// Update run ledger via recorder
recorder.set_phase(&conn, "preflight")?;
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
..Default::default()
})?;
let preflight_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("preflight".to_string(), preflight_elapsed.as_millis() as u64);
// Check for preflight failures — abort with structured error report if any
if preflight.has_failures() {
let failure_summary = preflight.failures.iter()
.map(|f| format!("{} #{}: {} ({})", f.entity_type, f.iid, f.code, f.message))
.collect::<Vec<_>>()
.join("; ");
let fail_icon = color_icon(Icons::error(), true);
emit_stage_line(&spinner, &fail_icon, "Preflight",
&format!("{} of {} failed: {}", preflight.failures.len(), total_items, failure_summary),
preflight_elapsed);
// Record failure via recorder
recorder.finish_failed(&conn, &failure_summary)?;
result.entity_failures = preflight.failures.clone();
return Err(LoreError::SurgicalPreflightFailed {
run_id: run_id.to_string(),
total: total_items,
failures: preflight.failures,
}.into());
}
let preflight_summary = format!(
"{} issues, {} MRs fetched",
preflight.issues.len(), preflight.merge_requests.len()
);
let preflight_icon = color_icon(Icons::success(), false);
emit_stage_line(&spinner, &preflight_icon, "Preflight", &preflight_summary, preflight_elapsed);
// ── Preflight-only mode: return after successful preflight, zero content DB writes ──
if options.preflight_only {
result.requested_count = total_items;
result.fetched_count = preflight.issues.len() + preflight.merge_requests.len();
recorder.finish_succeeded(&conn)?;
return Ok(result);
}
if signal.is_cancelled() {
recorder.finish_cancelled(&conn)?;
return Ok(result);
}
// ── Acquire sync lock ONLY for mutation phases ──
// Lock is acquired here, AFTER preflight completes, to minimize contention.
let lock_conn = create_connection(&db_path)?;
let mut lock = AppLock::new(
lock_conn,
LockOptions {
name: "sync".to_string(),
stale_lock_minutes: config.sync.stale_lock_minutes,
heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds,
},
);
lock.acquire(false)?; // not force — respect existing locks
// ── Stage 1: Transactional ingest (all-or-nothing content DB writes) ──
let stage_start = Instant::now();
let spinner = stage_spinner_v2(
Icons::sync(),
"Ingest",
"writing to database...",
options.robot_mode,
);
recorder.set_phase(&conn, "ingest")?;
let ingest_result = match ingest_preflight_results(
&conn, config, project_id, &preflight, preflight_started_at_ms,
) {
Ok(r) => r,
Err(e) => {
recorder.finish_failed(&conn, &e.to_string())?;
return Err(e);
}
};
result.issues_updated = preflight.issues.len();
result.mrs_updated = preflight.merge_requests.len();
result.skipped_stale = ingest_result.skipped_stale;
let all_dirty_source_keys = ingest_result.dirty_source_keys.clone();
// Update run ledger counters
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
issues_ingested: result.issues_updated,
mrs_ingested: result.mrs_updated,
skipped_stale: result.skipped_stale,
..Default::default()
})?;
let ingest_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("ingest".to_string(), ingest_elapsed.as_millis() as u64);
let ingest_summary = format!(
"{} issues, {} MRs ingested{}",
result.issues_updated, result.mrs_updated,
if result.skipped_stale > 0 { format!(", {} skipped (stale)", result.skipped_stale) } else { String::new() }
);
let ingest_icon = color_icon(Icons::success(), false);
emit_stage_line(&spinner, &ingest_icon, "Ingest", &ingest_summary, ingest_elapsed);
if signal.is_cancelled() {
recorder.finish_cancelled(&conn)?;
return Ok(result);
}
// ── Stage 2: Dependent stages (discussions, events, MR dependents) ──
// All dependents run INLINE per-entity — no jobs are enqueued to pending_dependent_fetches.
// Individual dependent failures are recorded per-entity but do not abort the run.
// Each network call is wrapped in tokio::time::timeout for bounded execution.
recorder.set_phase(&conn, "dependents")?;
let stage_start = Instant::now();
// Stage 2a: Discussions for issues
if !ingest_result.issue_disc_sync.is_empty() {
for issue_info in &ingest_result.issue_disc_sync {
match tokio::time::timeout(entity_timeout, ingest_issue_discussions(
&conn, &client, config, gitlab_project_id, project_id,
std::slice::from_ref(issue_info),
)).await {
Ok(Ok(_)) => result.discussions_fetched += 1,
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "discussions",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "discussions",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2b: Resource events for issues (inline, no queue)
if !options.no_events && config.sync.fetch_resource_events {
for issue_info in &ingest_result.issue_disc_sync {
match tokio::time::timeout(entity_timeout, sync_issue_resource_events_direct(
&conn, &client, config, gitlab_project_id, project_id, issue_info,
)).await {
Ok(Ok(count)) => result.resource_events_fetched += count,
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "resource_events",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "resource_events",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2c: Discussions for MRs
if !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
let prefetched = match tokio::time::timeout(entity_timeout, prefetch_mr_discussions(
&client, gitlab_project_id, project_id, mr_info.clone(),
)).await {
Ok(p) => p,
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "discussions",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
continue;
}
};
match write_prefetched_mr_discussions(
&conn, config, project_id, &[prefetched],
) {
Ok(disc_result) => result.discussions_fetched += disc_result.len(),
Err(e) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "discussions",
code: "WRITE_ERROR".to_string(),
message: e.to_string(),
});
}
}
}
}
// Stage 2d: Resource events for MRs (inline, no queue)
if !options.no_events && config.sync.fetch_resource_events && !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
match tokio::time::timeout(entity_timeout, sync_mr_resource_events_direct(
&conn, &client, config, gitlab_project_id, project_id, mr_info,
)).await {
Ok(Ok(count)) => result.resource_events_fetched += count,
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "resource_events",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "resource_events",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2e: MR closes_issues (inline, no queue)
if config.sync.fetch_mr_closes_issues.unwrap_or(true) && !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
match tokio::time::timeout(entity_timeout, sync_mr_closes_issues_direct(
&conn, &client, config, gitlab_project_id, project_id, mr_info,
)).await {
Ok(Ok(_)) => {},
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_closes_issues",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_closes_issues",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2f: MR diffs (inline, no queue)
if config.sync.fetch_mr_diffs.unwrap_or(true) && !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
match tokio::time::timeout(entity_timeout, sync_mr_diffs_direct(
&conn, &client, config, gitlab_project_id, project_id, mr_info,
)).await {
Ok(Ok(_)) => {},
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_diffs",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_diffs",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
let dependents_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("dependents".to_string(), dependents_elapsed.as_millis() as u64);
if signal.is_cancelled() {
recorder.finish_cancelled(&conn)?;
return Ok(result);
}
// ── Stage 3: Docs ── (scoped to dirty sources from THIS surgical run only)
let mut regenerated_document_ids: Vec<i64> = Vec::new();
if !options.no_docs && !all_dirty_source_keys.is_empty() {
recorder.set_phase(&conn, "docs")?;
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Docs", "generating...", options.robot_mode);
// Process ONLY the dirty sources touched by this surgical run.
// Uses run_generate_docs_for_sources which filters by (source_type, source_id) PK
// and deletes only those specific dirty_sources rows after processing.
// Returns regenerated_document_ids for scoped embedding.
let docs_result = run_generate_docs_for_sources(config, &all_dirty_source_keys)?;
result.documents_regenerated = docs_result.regenerated;
result.documents_errored = docs_result.errored;
regenerated_document_ids = docs_result.regenerated_document_ids;
// Update run ledger
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
issues_ingested: result.issues_updated,
mrs_ingested: result.mrs_updated,
skipped_stale: result.skipped_stale,
docs_regenerated: result.documents_regenerated,
..Default::default()
})?;
let docs_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("docs".to_string(), docs_elapsed.as_millis() as u64);
let docs_summary = format!("{} documents generated", result.documents_regenerated);
let docs_icon = color_icon(
if docs_result.errored > 0 { Icons::warning() } else { Icons::success() },
docs_result.errored > 0,
);
emit_stage_line(&spinner, &docs_icon, "Docs", &docs_summary, docs_elapsed);
}
// ── Release sync lock before embed (embed is idempotent, doesn't need sync lock) ──
drop(lock);
// ── Stage 4: Embed ──
// SCOPED: only embeds documents regenerated by THIS surgical run (via document_ids).
// Uses run_embed_for_document_ids — NOT global run_embed — to ensure isolation
// even after lock release and under concurrent normal sync activity.
if !options.no_embed && !options.no_docs && !regenerated_document_ids.is_empty() {
recorder.set_phase(&conn, "embed")?;
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Embed", "preparing...", options.robot_mode);
match run_embed_for_document_ids(config, &regenerated_document_ids, signal).await {
Ok(embed_result) => {
result.documents_embedded = embed_result.docs_embedded;
result.embedding_failed = embed_result.failed;
// Update run ledger
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
issues_ingested: result.issues_updated,
mrs_ingested: result.mrs_updated,
skipped_stale: result.skipped_stale,
docs_regenerated: result.documents_regenerated,
docs_embedded: result.documents_embedded,
})?;
let embed_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("embed".to_string(), embed_elapsed.as_millis() as u64);
let embed_summary = format!("{} chunks embedded", embed_result.chunks_embedded);
let embed_icon = color_icon(
if embed_result.failed > 0 { Icons::warning() } else { Icons::success() },
embed_result.failed > 0,
);
emit_stage_line(&spinner, &embed_icon, "Embed", &embed_summary, embed_elapsed);
}
Err(e) => {
let embed_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("embed".to_string(), embed_elapsed.as_millis() as u64);
let warn_icon = color_icon(Icons::warning(), true);
emit_stage_line(&spinner, &warn_icon, "Embed", &format!("skipped ({e})"), embed_elapsed);
warn!(error = %e, "Embedding stage failed, continuing");
}
}
}
// ── Mark run complete ──
let warnings_count = result.entity_failures.len();
if warnings_count > 0 {
recorder.finish_succeeded_with_warnings(&conn, warnings_count)?;
} else {
recorder.finish_succeeded(&conn)?;
}
result.requested_count = total_items;
result.fetched_count = preflight.issues.len() + preflight.merge_requests.len();
result.processed_count = result.issues_updated + result.mrs_updated;
Ok(result)
}
.instrument(span)
.await
}
### Step 9a: Implement `run_generate_docs_for_sources` and `run_embed_for_document_ids`

**File:** `src/documents/mod.rs` (or wherever `run_generate_docs` lives)

`run_generate_docs_for_sources` is a ~20-line variant of the existing `run_generate_docs` that filters by the `(source_type, source_id)` primary key, processes only those docs, then deletes only those specific `dirty_sources` rows. It additionally returns the IDs of regenerated documents for scoped embedding:
```rust
/// Scoped doc regeneration: process ONLY the specified dirty source keys.
/// Used by surgical sync to avoid draining the global dirty queue.
/// Each key is `(source_type, source_id)` matching the `dirty_sources` PK.
/// Returns regenerated document IDs for use with scoped embedding.
pub fn run_generate_docs_for_sources(
    config: &Config,
    source_keys: &[(String, i64)],
) -> Result<GenerateDocsResult> {
    // Same as run_generate_docs but with:
    // 1. SELECT ... FROM dirty_sources WHERE (source_type, source_id) IN (...) instead of full scan
    // 2. DELETE FROM dirty_sources WHERE (source_type, source_id) IN (...) instead of full drain
    // 3. Collect document IDs of regenerated rows into regenerated_document_ids
    // All other logic (doc template rendering, indexing) is identical.
    todo!("implement scoped variant")
}
```
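Because the dirty-source key is composite, the `IN (...)` filters above need SQLite row values. A minimal sketch of a predicate builder, with a hypothetical helper name, assuming SQLite row-value support (available since 3.15) and positional `?` placeholders:

```rust
/// Hypothetical helper: build a row-value IN predicate over the composite
/// `(source_type, source_id)` key, plus the flattened positional parameters.
fn composite_in_predicate(keys: &[(String, i64)]) -> (String, Vec<String>) {
    // One "(?, ?)" placeholder pair per key.
    let placeholders: Vec<&str> = keys.iter().map(|_| "(?, ?)").collect();
    let sql = format!(
        "(source_type, source_id) IN (VALUES {})",
        placeholders.join(", ")
    );
    // Flatten each key into two positional parameters, preserving order.
    let params = keys
        .iter()
        .flat_map(|(ty, id)| [ty.clone(), id.to_string()])
        .collect();
    (sql, params)
}
```

The real function would bind `source_id` as an integer rather than stringifying it; strings are used here only to keep the sketch dependency-free.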
`GenerateDocsResult` must be extended with:

```rust
pub struct GenerateDocsResult {
    pub regenerated: usize,
    pub errored: usize,
    /// Document IDs of all regenerated documents (for scoped embedding in surgical mode)
    pub regenerated_document_ids: Vec<i64>,
}
```
**File:** `src/embedding/mod.rs` (or wherever `run_embed` lives)

Add a scoped embedding function that processes only specific document IDs:
```rust
/// Scoped embedding: embed ONLY the specified document IDs.
/// Used by surgical sync to ensure embed isolation after lock release.
/// Unlike global `run_embed` (which processes all unembedded docs),
/// this ensures only documents from the current surgical run are embedded.
pub async fn run_embed_for_document_ids(
    config: &Config,
    document_ids: &[i64],
    signal: &ShutdownSignal,
) -> Result<EmbedResult> {
    // Same as run_embed but with:
    // 1. SELECT ... FROM documents WHERE id IN (...) AND embedding IS NULL
    //    instead of SELECT ... FROM documents WHERE embedding IS NULL
    // 2. All other logic (chunking, batching, Ollama calls) is identical
    todo!("implement scoped variant")
}
```
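Since `id IN (...)` expands to one host parameter per document ID, the scoped query must respect SQLite's per-statement parameter limit (historically 999). A sketch with a hypothetical helper; the 900 cap and statement text are illustrative:

```rust
// Stay safely under SQLite's default host-parameter limit.
const MAX_PARAMS_PER_STMT: usize = 900;

/// Split document IDs into safely sized IN-clauses; each chunk would
/// become one SELECT in the real scoped-embed function.
fn scoped_select_stmts(document_ids: &[i64]) -> Vec<String> {
    document_ids
        .chunks(MAX_PARAMS_PER_STMT)
        .map(|chunk| {
            let marks = vec!["?"; chunk.len()].join(", ");
            format!(
                "SELECT id, content FROM documents WHERE id IN ({marks}) AND embedding IS NULL"
            )
        })
        .collect()
}
```

For typical surgical runs (a handful of documents) this always produces a single statement; the chunking only matters if a run ever regenerates hundreds of docs.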
### Step 9b: Implement inline dependent helpers

**File:** `src/ingestion/surgical.rs` (or a new `src/ingestion/surgical_dependents.rs`)

These functions fetch and write dependent data inline for a single entity, bypassing `pending_dependent_fetches` entirely. They are thin wrappers around the existing fetch + write logic from `orchestrator.rs`, extracted to operate on a single entity:
```rust
/// Fetch and write resource events for a single issue, inline (no queue).
pub(crate) async fn sync_issue_resource_events_direct(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    project_id: i64,
    issue_info: &IssueForDiscussionSync,
) -> Result<usize> {
    // Fetch resource events for this issue from the GitLab API.
    // Write to resource_state_events / resource_label_events / resource_milestone_events.
    // Return the count of events written.
    todo!("extract from orchestrator drain logic")
}

/// Fetch and write resource events for a single MR, inline (no queue).
pub(crate) async fn sync_mr_resource_events_direct(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    project_id: i64,
    mr_info: &MrForDiscussionSync,
) -> Result<usize> {
    todo!("extract from orchestrator drain logic")
}

/// Fetch and write closes_issues data for a single MR, inline (no queue).
pub(crate) async fn sync_mr_closes_issues_direct(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    project_id: i64,
    mr_info: &MrForDiscussionSync,
) -> Result<()> {
    todo!("extract from orchestrator drain logic")
}

/// Fetch and write diff data for a single MR, inline (no queue).
pub(crate) async fn sync_mr_diffs_direct(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    project_id: i64,
    mr_info: &MrForDiscussionSync,
) -> Result<()> {
    todo!("extract from orchestrator drain logic")
}
```
These are extracted from the existing drain loop bodies in `orchestrator.rs`. The drain functions iterate over queued jobs and call per-entity logic — these helpers are that per-entity logic, made callable directly.
### Step 9d: Add `LoreError::SurgicalPreflightFailed` variant

**File:** `src/core/error.rs`

Add a typed error variant for surgical preflight failures. This preserves machine semantics for robot mode and enables structured exit codes:
```rust
/// Surgical sync preflight failed — one or more IIDs could not be fetched.
/// Contains structured per-entity failure details for robot mode output.
SurgicalPreflightFailed {
    run_id: String,
    total: usize,
    failures: Vec<crate::ingestion::surgical::EntityFailure>,
},
```
Map to exit code 6 (resource not found) or a new dedicated exit code. The `Display` impl should produce a human-readable summary:
```rust
Self::SurgicalPreflightFailed { run_id, total, failures } => {
    write!(
        f,
        "Surgical preflight failed for {} of {} IIDs (run {}): {}",
        failures.len(),
        total,
        run_id,
        failures
            .iter()
            .map(|f| format!("{} #{}: {}", f.entity_type, f.iid, f.code))
            .collect::<Vec<_>>()
            .join(", ")
    )
}
```
In robot mode, this serializes to structured JSON with per-entity details and actionable recovery commands, rather than a generic `Other` string.
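The summary formatting above can be exercised standalone with stub types; this sketch reproduces the same message shape (stub `EntityFailure` and a free function, illustrative only):

```rust
// Stub mirroring the fields the Display arm reads; illustrative only.
struct EntityFailure {
    entity_type: &'static str,
    iid: u64,
    code: &'static str,
}

/// Build the human-readable preflight failure summary.
fn preflight_summary(run_id: &str, total: usize, failures: &[EntityFailure]) -> String {
    let detail = failures
        .iter()
        .map(|f| format!("{} #{}: {}", f.entity_type, f.iid, f.code))
        .collect::<Vec<_>>()
        .join(", ");
    format!(
        "Surgical preflight failed for {} of {} IIDs (run {}): {}",
        failures.len(),
        total,
        run_id,
        detail
    )
}
```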
### Step 10: Add branch in `run_sync` for surgical mode

**File:** `src/cli/commands/sync.rs`, function `run_sync` (line 68)

CRITICAL: `dry_run` must be checked BEFORE surgical to prevent accidental writes:
```rust
// Handle dry_run mode - must check BEFORE surgical to prevent writes
if options.dry_run {
    if options.is_surgical() {
        // Surgical dry-run is handled inside run_sync_surgical (returns before any writes/locks)
        return run_sync_surgical(config, &options, run_id, signal).await;
    }
    return run_sync_dry_run(config, &options).await;
}

// Handle preflight-only mode
if options.preflight_only {
    return run_sync_surgical(config, &options, run_id, signal).await;
}

// Handle surgical mode (specific IIDs)
if options.is_surgical() {
    return run_sync_surgical(config, &options, run_id, signal).await;
}
```
Note: `run_sync_surgical` checks `options.dry_run` and `options.preflight_only` internally and returns early before ANY side effects. This avoids needing separate functions for each mode.
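The precedence (dry-run first, then preflight-only, then surgical) can be captured as a tiny pure function for illustration; names are hypothetical and the real branch lives in `run_sync`:

```rust
/// Illustrative only: which path run_sync takes for a given flag combination.
/// Mirrors the branch ordering above.
fn sync_dispatch(dry_run: bool, preflight_only: bool, surgical: bool) -> &'static str {
    if dry_run {
        // Surgical dry-run short-circuits inside the surgical path itself.
        if surgical { "surgical_dry_run" } else { "dry_run" }
    } else if preflight_only {
        "surgical_preflight_only"
    } else if surgical {
        "surgical"
    } else {
        "normal"
    }
}
```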
### Step 11: Make `GitLabClient` constructible from config

Check if `GitLabClient::from_config` already exists. If not, the surgical path needs to construct the client. Currently `run_ingest` constructs it via:

```rust
let token = config.resolve_token()?;
let client = GitLabClient::new(&config.gitlab.base_url, &token, config.sync.rate_limit_rps);
```

The surgical function uses the same 2-line inline construction (shown in Step 9).
### Step 12: `ProcessMrResult` visibility

`process_single_mr` returns `ProcessMrResult`, which is currently private. Make it `pub(crate)`:

**File:** `src/ingestion/merge_requests.rs`, line 138

```rust
pub(crate) struct ProcessMrResult {
```
### Step 13: Make orchestrator per-entity logic extractable

The surgical inline dependent helpers (Step 9b) need the per-entity fetch+write logic currently embedded in orchestrator drain loops. There are two approaches:

**Option A (preferred):** Extract the per-entity body of each drain loop into a standalone function. The drain function then calls the extracted function in a loop, and the surgical helpers call it directly. This avoids code duplication.

**Option B:** If the drain loop body is tightly coupled to queue state (`locked_at`, `attempts`, etc.), write standalone versions for surgical that call the same underlying GitLab API functions and DB write functions, skipping queue management.

Functions to make accessible or extract:

- Resource events fetch + write for a single entity (issue or MR)
- MR `closes_issues` fetch + write for a single MR
- MR diffs fetch + write for a single MR

The existing orchestrator functions should remain unchanged for normal sync — only surgical mode uses the extracted per-entity versions.
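Option A's shape can be sketched with stub types: the drain loop and the surgical helper both call one extracted per-entity function. All names and bodies here are illustrative, not the real orchestrator code:

```rust
// Stub of a queued dependent-fetch job.
struct Job {
    iid: u64,
}

/// The extracted per-entity body. In the real code this performs the GitLab
/// API fetch plus DB writes for one entity; stubbed here to return a fake
/// event count so the call pattern can be exercised.
fn fetch_and_write_events_for_entity(iid: u64) -> Result<usize, String> {
    Ok(iid as usize % 3)
}

/// Normal sync: iterate queued jobs, delegating per-entity work to the
/// extracted function.
fn drain_event_jobs(jobs: &[Job]) -> usize {
    jobs.iter()
        .filter_map(|j| fetch_and_write_events_for_entity(j.iid).ok())
        .sum()
}

/// Surgical sync: call the extracted function directly, no queue involved.
fn sync_events_direct(iid: u64) -> Result<usize, String> {
    fetch_and_write_events_for_entity(iid)
}
```

The point of the split is that queue bookkeeping (`locked_at`, `attempts`) stays in the drain loop, while the fetch+write body has no knowledge of the queue at all.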
### Step 14: Update robot-docs manifest

The `robot-docs` command outputs a manifest of all commands and flags. If it's auto-generated from the clap derive, the new `--issue`, `--mr`, `-p`, `--preflight-only` flags will appear automatically. If it's hardcoded, update the `sync` command entry.

**File:** Check `src/cli/robot.rs` or wherever the robot-docs content is defined. The new flags should appear in the `sync` command's schema. Document the `--preflight-only` flag's behavior and its distinction from `--dry-run`.
### Step 15: Update `SyncResult` for robot mode structured output

**File:** `src/cli/commands/sync.rs` (wherever `SyncResult` is defined)

Add fields for surgical-specific structured output:
```rust
pub struct SyncResult {
    // ... existing fields ...

    // NEW: Surgical sync metadata for robot mode
    /// Per-entity failures (empty on success)
    pub entity_failures: Vec<EntityFailure>,
    /// Per-entity outcome map for deterministic retry and richer UI messaging
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Number of entities requested
    pub requested_count: usize,
    /// Number of entities successfully fetched in preflight
    pub fetched_count: usize,
    /// Number of entities successfully processed (ingested + dependents)
    pub processed_count: usize,
    /// Entities skipped because local row was newer than preflight payload (TOCTOU protection)
    pub skipped_stale: usize,
    /// Per-entity IIDs that were skipped as stale
    pub skipped_stale_iids: Vec<(String, u64)>,
    /// Per-stage elapsed ms for deterministic performance tracking
    pub stage_timings_ms: std::collections::BTreeMap<String, u64>,
    /// Suggested recovery commands when failures occur (robot ergonomics)
    pub recovery_actions: Vec<String>,
    /// Overall completion status: succeeded | succeeded_with_warnings | failed | cancelled
    pub completion_status: String,
}

/// Per-entity outcome for robot mode structured output.
/// Enables deterministic per-IID retry and richer UI messaging.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EntityOutcome {
    pub entity_type: String, // "issue" or "merge_request"
    pub iid: u64,
    pub fetched: bool,
    pub ingested: bool,
    pub stale_skipped: bool,
    pub dependent_failures: Vec<EntityFailure>,
}
```
Robot mode JSON output includes these fields when in surgical mode, enabling agents to:

- Programmatically detect partial failures via `entity_failures`
- Get complete per-entity lifecycle via `entity_outcomes` for deterministic retry
- Track performance regressions via `stage_timings_ms`
- Auto-recover via `recovery_actions` (e.g., `["lore sync --issue 888 -p myproject"]` for retry)
- Detect TOCTOU skips via `skipped_stale` and `skipped_stale_iids`
- Distinguish partial success via `completion_status`
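For instance, `completion_status` might be derived from the run outcome like this. The rules are an assumption based on the four statuses listed above, not the actual implementation:

```rust
/// Assumed derivation of the overall completion status from run outcomes.
fn completion_status(cancelled: bool, fatal_error: bool, warnings: usize) -> &'static str {
    if cancelled {
        "cancelled"
    } else if fatal_error {
        "failed"
    } else if warnings > 0 {
        // Entity-level failures were recorded but the run itself finished.
        "succeeded_with_warnings"
    } else {
        "succeeded"
    }
}
```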
## Files Modified Summary

| File | Change | Lines |
|---|---|---|
| `src/cli/mod.rs` | Add `issue: Vec<u64>`, `mr: Vec<u64>`, `project: Option<String>`, `preflight_only: bool` to `SyncArgs` | ~790 |
| `src/cli/commands/sync.rs` | Add fields to `SyncOptions`, `is_surgical()`, `MAX_SURGICAL_TARGETS`, dry-run/surgical/preflight-only branch ordering, `run_sync_surgical` fn, `SyncResult` fields, `EntityOutcome` struct | ~20-30, new ~350 lines |
| `src/main.rs` | Wire new fields in `handle_sync_cmd`, add validation + `defaultProject` fallback + dedup + hard cap + `preflight_only` + no-docs/no-embed guard validation | ~2052-2060 |
| `src/gitlab/client.rs` | Add `get_issue_by_iid()`, `get_mr_by_iid()` (~10 lines each, `u64` iid param), add tests (~60 lines) | ~330, ~770 |
| `src/ingestion/surgical.rs` | New file: `EntityFailure`, `DirtySourceKey`, `PreflightResult` (with failures vec, payload integrity checks), `preflight_fetch` (aggregate-failures, per-entity timeout, project_id validation), `classify_error_code`, TOCTOU guards (`is_local_newer_*` with `last_seen_at` tie-breaking), dirty source key collection, inline dependent helpers, payload processors, `ingest_preflight_results` (~450 lines) | new |
| `src/ingestion/surgical_tests.rs` | New file: unit tests with `test_config()` helper, scoping invariant tests (including same-entity), TOCTOU tests (including equal-timestamp), rollback tests, preflight aggregation test, cancellation test, timeout test, scoped embed isolation test, payload integrity test (~450 lines) | new |
| `src/ingestion/mod.rs` | Add `pub mod surgical;` | line 9 |
| `src/ingestion/issues.rs` | `fn process_single_issue` → `pub(crate) fn` | line 143 |
| `src/ingestion/merge_requests.rs` | `fn process_single_mr` → `pub(crate) fn`, `struct ProcessMrResult` → `pub(crate)` | lines 138, 144 |
| `src/ingestion/orchestrator.rs` | Extract per-entity fetch+write logic into standalone functions callable by surgical inline helpers | TBD |
| `src/documents/mod.rs` | Add `run_generate_docs_for_sources` scoped variant (filters by PK, returns `regenerated_document_ids`), extend `GenerateDocsResult` | new fn ~30 lines |
| `src/embedding/mod.rs` | Add `run_embed_for_document_ids` scoped variant (embeds only specified document IDs) | new fn ~30 lines |
| `src/core/db.rs` | New migration: extend `sync_runs` with `mode`, `phase`, `surgical_iids_json`, surgical counters, `warnings_count`, `cancelled_at`, indexes | new migration |
| `src/core/sync.rs` | Extend `SyncRunRecorder` with `start_surgical`, `set_phase`, `set_counters`, `finish_*`, `heartbeat_if_due`, add `SurgicalCounters` struct | ~80 lines |
| `src/core/error.rs` | Add `SurgicalPreflightFailed` variant with structured fields | ~15 lines |
## Acceptance Criteria

For agents to verify (automated):

- **Compilation:** `cargo check --all-targets` passes with zero errors
- **Clippy:** `cargo clippy --all-targets -- -D warnings` passes (pedantic + nursery enabled)
- **Format:** `cargo fmt --check` passes
- **Tests:** `cargo test` passes, including all new tests
- **New tests exist:**
  - `get_issue_by_iid_returns_issue` — wiremock test in `src/gitlab/client.rs`
  - `get_issue_by_iid_returns_not_found` — 404 error handling
  - `get_mr_by_iid_returns_mr` — wiremock test
  - `get_mr_by_iid_returns_not_found`
  - `ingest_issue_by_iid_inserts_and_marks_dirty` — in-memory DB test
  - `ingest_issue_by_iid_resets_discussion_watermark`
  - `ingest_issue_by_iid_resets_event_watermark`
  - `ingest_mr_by_iid_inserts_and_marks_dirty`
  - `ingest_mr_by_iid_resets_discussion_watermark`
  - `ingest_mr_by_iid_resets_event_watermark`
  - `duplicate_iids_are_idempotent` — dedup/upsert behavior
  - `surgical_docs_scope_ignores_preexisting_dirty_rows` — scoping invariant
  - `surgical_docs_scope_ignores_preexisting_dirty_rows_for_same_entity` — same-entity scoping edge case
  - `preflight_aggregates_multiple_missing_iids` — aggregate-failures behavior
  - `sync_run_is_persisted_and_updated` — durable run ledger (extends existing table)
  - `stale_payload_is_skipped_when_local_updated_at_is_newer` — TOCTOU protection
  - `equal_updated_at_but_newer_last_seen_is_skipped` — TOCTOU equal-timestamp tie-breaking
  - `preflight_success_then_ingest_failure_rolls_back_all_content_writes` — transactional rollback
  - `surgical_no_docs_requires_no_embed_validation` — embed leakage prevention
  - `cancellation_marks_sync_run_cancelled` — cancellation durability
  - `dependent_timeout_records_entity_failure_and_continues` — timeout handling
  - `scoped_embed_does_not_embed_unrelated_docs_created_after_docs_stage` — embed isolation under concurrency
  - `payload_project_id_mismatch_is_rejected_in_preflight` — payload integrity
For me to evaluate (functional):

- **Basic surgical sync works:** `cargo run --release -- sync --issue <real_iid> -p <real_project>` fetches the issue, syncs its discussions and events, regenerates docs, and embeds
- **Multiple IIDs work:** `cargo run --release -- sync --issue 1 --issue 2 --mr 3 -p <project>`
- **Robot mode works:** `cargo run --release -- --robot sync --issue <iid> -p <project>` returns `{"ok":true,"data":{...},"meta":{...}}` with `stage_timings_ms` and `entity_outcomes` populated
- **Error: missing project:** `cargo run --release -- sync --issue 1` fails with clear error about `-p` or `defaultProject` being required
- **Error: nonexistent IID:** `cargo run --release -- sync --issue 999999 -p <project>` returns a clear "not found" error with the IID in the message, and zero DB mutations
- **Error: mixed valid+invalid IIDs:** `cargo run --release -- sync --issue <valid> --issue 999999 -p <project>` reports BOTH results (1 success, 1 failure) in structured output, with zero DB mutations
- **No status enrichment:** Surgical sync does NOT call GraphQL status enrichment
- **--no-docs/--no-embed respected:** `cargo run --release -- sync --issue <iid> -p <project> --no-docs --no-embed` skips those stages
- **Dirty queue scoping:** After surgical sync of 1 issue, only that issue's documents are regenerated (not all dirty docs from previous syncs — verify by checking the count in docs stage output)
- **Watermark reset:** After surgical sync, the issue's discussions and events are re-fetched even if they were previously synced (verify via the `discussions_fetched` count > 0)
- **Normal sync unaffected:** `cargo run --release -- sync` (without `--issue`/`--mr`) still works exactly as before
- **`lore robot-docs`:** The new `--issue`, `--mr`, `-p`, `--preflight-only` flags appear in the sync command schema
- **defaultProject fallback:** `cargo run --release -- sync --issue <iid>` works when config has `defaultProject` set
- **Sync lock:** Surgical sync acquires lock — running two surgical syncs concurrently produces a lock error, not corruption
- **--full incompatible:** `cargo run --release -- sync --full --issue 1 -p project` fails with clear error
- **MR dependent stages:** Surgical MR sync fetches `closes_issues` and MR file diffs when enabled by config
- **Parse-time validation:** `--issue 0` and `--issue -1` rejected by clap with clear error before any code runs
- **Hard cap enforcement:** `--issue 1 --issue 2 ... --issue 101` (>100 combined targets) rejected with clear error
- **Preflight-only mode:** `cargo run --release -- sync --preflight-only --issue <iid> -p <project>` validates the IID exists on GitLab and returns success with `fetched_count` populated, but no content DB mutations (only control-plane run-ledger entries)
- **Preflight-only requires surgical:** `cargo run --release -- sync --preflight-only` (no `--issue`/`--mr`) fails with clear error
- **Durable run ledger:** After surgical sync, `SELECT * FROM sync_runs WHERE mode = 'surgical' ORDER BY id DESC LIMIT 1` shows the correct phase (`done` on success, `failed` on error, `cancelled` on Ctrl+C) with counters populated
- **Stage timings in robot output:** Robot mode JSON includes `stage_timings_ms` with entries for each completed stage
- **Typed preflight error:** Preflight failures serialize as structured `SurgicalPreflightFailed` errors with per-entity details and machine-usable codes, not generic `Other` strings
- **Dependent stage failures are best-effort:** A failing discussion fetch for one entity does not prevent other entities' dependents from running. Failures are recorded in `entity_failures`.
Edge cases to verify:

- **Duplicate IIDs:** `--issue 123 --issue 123` should work without error (deduplicated, idempotent)
- **Mixed existing + new:** Syncing an IID that's already in the DB should update it
- **Closed issues:** Surgical sync works for closed issues (`state = "closed"`)
- **Signal handling:** Ctrl+C during surgical sync records `status='cancelled'`, `phase='cancelled'`, and `finished_at` in `sync_runs` (no dangling `running` rows)
- **Dry-run zero-write assertion:** `--dry-run --issue 123 -p project` produces zero side effects — no lock acquired, no DB connection opened, no network calls. DB file is byte-identical before and after.
- **--no-docs without --no-embed rejected:** `--issue 1 -p project --no-docs` without `--no-embed` is rejected with clear error
- **Lock released before embed:** A normal `lore sync` can acquire the sync lock while a surgical sync is in its embed phase.
Correctness under contention and rollback:

- **No partial writes on missing IID:** A mixed set of valid + invalid IIDs (e.g., `--issue 123 --issue 999999`) causes zero DB mutations — the preflight phase catches the 404 before any writes. The structured error report lists ALL failures, not just the first.
- **Scoped docs:** Only documents tied to this surgical run's dirty source keys are regenerated. Other pending `dirty_sources` are untouched.
- **Lock contention test:** A second concurrent surgical sync fails fast with a lock error and produces no writes.
- **Lock not held during preflight:** Verify that a normal `lore sync` can acquire the lock while a surgical sync is in its preflight (network) phase.
- **TOCTOU safety:** If a normal sync updates an entity after preflight but before ingest, the surgical run skips the stale payload (no overwrite). Stale skips are reported in the `skipped_stale` count.
- **TOCTOU equal-timestamp safety:** If a concurrent sync fetches the same entity with identical `updated_at` but after our preflight started, the surgical run detects the fresher `last_seen_at` and skips.
- **No embed leakage:** Surgical embed uses `run_embed_for_document_ids` with explicit document IDs from the docs stage — never global `run_embed`. Even if another sync creates unembedded docs between lock release and embed, they are not processed.
- **No queue artifacts:** Surgical sync creates zero rows in `pending_dependent_fetches`. Verify the table is untouched after a surgical run.
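The TOCTOU guard, including the equal-timestamp tie-break, can be sketched as a pure comparison. The helper name is hypothetical and it assumes normalized RFC 3339 UTC timestamps, which compare correctly as strings:

```rust
/// Should the surgical run SKIP its preflight payload for this entity?
/// Assumes all timestamps are normalized RFC 3339 UTC strings.
fn is_local_newer(
    local_updated_at: &str,
    payload_updated_at: &str,
    local_last_seen_at: &str,
    preflight_started_at: &str,
) -> bool {
    if local_updated_at > payload_updated_at {
        // A concurrent sync wrote a strictly newer revision: our payload is stale.
        return true;
    }
    // Equal-timestamp tie-break: identical updated_at, but a concurrent sync
    // refreshed the row after our preflight began, so its copy is at least
    // as fresh as the payload we fetched.
    local_updated_at == payload_updated_at && local_last_seen_at > preflight_started_at
}
```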
Automated scoping invariants (covered by tests in 1d and 1f):

- Scoped docs invariants are enforced by automated tests, not manual-only verification. Tests `surgical_docs_scope_ignores_preexisting_dirty_rows`, `surgical_docs_scope_ignores_preexisting_dirty_rows_for_same_entity`, and `preflight_aggregates_multiple_missing_iids` prevent regressions.
- Rollback and race invariants are enforced by automated tests: no partial writes on ingest failure, no stale overwrite (strict and equal-timestamp variants).
- **Cancellation durability:** Ctrl+C during surgical sync records `status='cancelled'`, `phase='cancelled'`, and `finished_at` in `sync_runs`. Verified by the `cancellation_marks_sync_run_cancelled` test.
- **Payload integrity:** Payloads with unexpected `project_id` are rejected in preflight and produce zero content writes. Verified by the `payload_project_id_mismatch_is_rejected_in_preflight` test.
- **Scoped embed isolation under concurrency:** Verified by the `scoped_embed_does_not_embed_unrelated_docs_created_after_docs_stage` test.
- **Timeout path:** Verified by the `dependent_timeout_records_entity_failure_and_continues` test (TIMEOUT code + continued processing).
## Rejected Recommendations

- **`SyncMode` enum replacing flat fields in `SyncOptions`** — rejected because it's overengineered for a boolean distinction (surgical vs standard). The `is_surgical()` helper is simpler, and the enum would require refactoring all existing `SyncOptions` construction sites. The flat fields approach is how the rest of the codebase works (see `full`, `force`, `dry_run`).
- **`value_delimiter = ','` on `--issue`/`--mr`** — rejected because `--issue 1,2,3` is non-idiomatic for this CLI (all other repeatable flags use the `--flag val --flag val` pattern) and commas in shell can interact poorly with quoting.
- **Chunked `list_issues_by_iids`/`list_mrs_by_iids` batch fetch via `iids[]` query param** — rejected because surgical sync targets 1-5 IIDs typically (agent refreshing active work). Individual GET requests give precise per-IID error reporting (404 for missing IID) and add zero complexity. Batch optimization is premature for this use case.
- **`--strict` flag for fail-on-missing-IID behavior** — rejected for v1 because the default behavior (fail on 404) is already strict. A `--lenient` flag to skip missing IIDs would be the future extension if needed, not the other way around. Adding `--strict` now adds flag surface area for a mode that's already the default.
- **Separate `run_sync_surgical_dry_run` function** — rejected because the dry-run check is 5 lines inside `run_sync_surgical` (log + return early). A separate function would duplicate all the setup code (db connection, project resolution) just to print a message.
- **`SurgicalPlan` normalized object** — rejected because validation/dedup/project fallback is ~15 lines in one location (`handle_sync_cmd`). Introducing a dedicated struct with a builder method adds a new type, a new file, and indirection for code that runs exactly once. The flat validation approach is how all other `SyncOptions` constraints are checked in this codebase.
- **`--with-related` flag for auto-expanding related entities** — rejected for v1 as scope creep. Surgical sync's purpose is agent-driven refresh of known IIDs. Auto-expansion introduces unbounded work (related issues have related MRs have related issues...), requires expansion caps, and complicates the preflight-then-commit contract. This is a good v2 feature tracked separately.
- **Bounded concurrency (`buffer_unordered`) for IID fetches** — rejected for v1 because surgical sync targets 1-5 IIDs (agent refreshing active work). Sequential fetch of 5 items takes <2s. Adding `futures::stream`, `buffer_unordered`, config knobs (`surgical_max_in_flight`), and post-fetch sorting for determinism is premature complexity. If usage patterns show >10 IIDs becoming common, add concurrency then.
- **`origin_run_id` column on `dirty_sources` table** — rejected because it modifies shared schema (`dirty_sources`) for a surgical-only concern. The source-key-based scoping approach is self-contained and requires zero schema changes to `dirty_sources`. Adding a column would require updating all code paths that insert dirty rows.
- **Accept `&Transaction` instead of `&Connection` in surgical ingest functions** — rejected because the existing codebase uniformly uses `&Connection` (rusqlite's `Transaction` derefs to `Connection`). `process_single_issue` and all existing ingestion functions take `&Connection`. Changing surgical functions to `&Transaction` would create an inconsistency and require refactoring callers. The `unchecked_transaction` + `&Connection` pattern is used elsewhere (see `enrich_issue_statuses_txn`).
- **Transient retry policy (5xx/timeout with jittered backoff)** — rejected for surgical sync scope because the existing `request()` method already handles 429 retries. Adding 5xx retry applies to ALL `GitLabClient` methods, not just surgical. It should be a separate enhancement to the client layer, not coupled to surgical sync.
- **`sync_run_entities` per-entity lifecycle table** — rejected for v1 because it adds significant schema complexity (new table, FK, index, per-entity per-stage row inserts) for observability that can be achieved with simpler means: `SyncResult` already carries `skipped_stale_iids` and `entity_failures` for robot output, and `sync_runs` has aggregate counters. If retry-by-entity becomes a real need, this table is a clean v2 addition.
- **`--retry-failed` flag on `sync-runs` command** — rejected as scope creep for v1. Deterministic retry can be built on top of `sync_runs` data later. For now, agents can simply re-run `lore sync --issue <failed-iid>` based on the structured error output.
- **`--issue-url`/`--mr-url` URL-native surgical targets** — rejected for v1 because agents already have project + IID from their context (lore queries return both). Parsing GitLab URLs introduces URL format fragility (self-hosted instances vary), namespace disambiguation complexity, and additional validation code for marginal ergonomic gain. If copy-paste from browser becomes a common workflow, this is a clean v2 addition.
- **`sync-runs` read command (`lore --robot sync-runs`)** — rejected for v1 scope. The run ledger is useful for observability but the read path is not required for the surgical sync feature itself. Agents can query `sync_runs` directly via `sqlite3` or via a future `lore --robot sync-runs` command. Tracked as a v2 enhancement.
- **Scoped drain helpers with `scope_run_id` column on `pending_dependent_fetches`** — rejected because the inline execution approach (constraint 4) is strictly simpler. Adding `scope_run_id` + `aborted_reason` columns, a new covering index, scoped drain variants, and orphan cleanup logic to `pending_dependent_fetches` is significant schema and code churn for a table with an existing UNIQUE constraint and delete-on-complete semantics. The inline approach avoids all of this and eliminates an entire class of failure modes (orphaned jobs, queue scoping bugs).
- **`failed_run_aborts_pending_scoped_jobs` test** — rejected because surgical mode no longer enqueues jobs to any shared queue. The inline execution model eliminates the need for queue failure hygiene tests. Dependent stage failures are tracked per-entity in `entity_failures`.
- **Separate dependent fetch/write split with lock release between phases** — rejected because it significantly increases complexity for marginal contention reduction. Surgical sync targets 1-5 entities; dependent stage lock hold time is seconds. Splitting fetch (unlocked) and write (short locked transactions) per dependent type would require managing intermediate state, coordinating multiple lock acquire/release cycles, and applying per-entity freshness guards at the dependent level. The inline approach is simpler and adequate at our scale.
- **Global stage timeout budget (`surgical_dependents_budget_seconds`)** — rejected for v1 because per-entity timeout (constraint 16) provides sufficient bounding. A global budget across all entities adds complexity (tracking cumulative time, deciding which entities to skip when budget exhausted) for minimal incremental safety. If surgical sync scales to 50+ entities, this becomes worthwhile.