---
plan: true
title: ""
status: iterating
iteration: 6
target_iterations: 8
beads_revision: 0
related_plans: []
created: 2026-02-16
updated: 2026-02-17
---
# Surgical Per-IID Sync
## Context
Agents working on active issues/MRs need to refresh data for specific entities without a full sync. Currently `lore sync` always paginates through ALL issues/MRs from the cursor forward. This adds `--issue` and `--mr` flags to `sync` that accept lists of IIDs, fetching only those entities from GitLab and running the full pipeline (ingest → discussions → events → generate-docs → embed) scoped to just those items. Status enrichment is skipped in surgical mode.
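The phase ordering and lock window this implies (per Design Constraints 1, 10, and 11 below) can be sketched as a tiny self-contained model; the `Phase` names match the plan's phase strings, while `holds_sync_lock` is an illustrative helper, not proposed API:

```rust
/// Phase order for a surgical run. Preflight is network-only (no lock);
/// embed is idempotent and runs after the lock is released.
#[allow(dead_code)]
#[derive(Debug)]
enum Phase {
    Preflight,
    Ingest,
    Dependents,
    Docs,
    Embed,
    Done,
}

/// Illustrative predicate: which phases run under AppLock("sync")?
fn holds_sync_lock(p: &Phase) -> bool {
    matches!(p, Phase::Ingest | Phase::Dependents | Phase::Docs)
}

fn main() {
    // Lock is acquired just before the first DB mutation and released before embed.
    assert!(!holds_sync_lock(&Phase::Preflight));
    assert!(holds_sync_lock(&Phase::Ingest));
    assert!(holds_sync_lock(&Phase::Docs));
    assert!(!holds_sync_lock(&Phase::Embed));
}
```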
### Design Constraints
1. **Sync locking**: The surgical path acquires `AppLock("sync")` but ONLY for mutation phases. The preflight (network-only) phase runs WITHOUT the lock to minimize contention with concurrent normal syncs.
2. **Dirty queue scoping**: `dirty_sources` is keyed by `(source_type, source_id)` with UPSERT semantics (no autoincrement `id` column). Surgical scoping MUST use explicit touched source keys collected during ingest — each `ingest_*_by_iid_from_payload` returns the `(source_type, source_id)` pairs it touched. Surgical docs MUST call a scoped API (`run_generate_docs_for_sources`) filtering by these exact keys, and MUST NOT drain the global dirty queue.
3. **Embed scoping**: Embedding MUST be explicitly scoped to documents regenerated by this surgical run. `run_generate_docs_for_sources` returns regenerated `document_ids` via `GenerateDocsResult.regenerated_document_ids: Vec<i64>`; surgical mode calls `run_embed_for_document_ids(document_ids)` and never global `run_embed`. This remains correct even after lock release and under concurrent normal sync activity. **Guard condition**: embed stage MUST NOT run when `--no-docs` is used in surgical mode, as it would embed unrelated backlog docs. Validation: `--no-docs` without `--no-embed` is rejected in surgical mode.
4. **Surgical dependent execution**: Surgical mode MUST bypass `pending_dependent_fetches`. Dependents (resource_events, mr_closes_issues, mr_diffs) run inline for targeted entities only via direct per-entity helper functions. The global `pending_dependent_fetches` queue remains exclusively for normal sync. This eliminates queue-scoping complexity, orphaned job cleanup, and schema migration to the shared queue table.
5. **MR dependent stages**: Normal MR ingest runs closes_issues and diffs stages. The surgical path must also run these for MR entities.
6. **Primary-entity atomicity**: All requested issue/MR payload fetches complete before the first content write. If any primary IID fetch fails (404, network error), primary ingest does zero content writes. Preflight **aggregates all failures** (does not fail-fast on the first error) so agents get a complete error report in one pass. Dependent stages (discussions, resource events, MR closes_issues, MR diffs) are post-ingest and best-effort — individual dependent stage failures are recorded per-entity but do not roll back the primary ingest.
7. **Control-plane exception**: `sync_runs` writes are allowed during preflight for observability and crash diagnostics. These are control-plane rows, not content data, and do not affect query results.
8. **Dry-run is zero-write**: The dry-run check MUST precede lock acquisition and DB connection. Dry-run produces zero side effects — no lock acquired, no DB connection opened, no network calls.
9. **defaultProject fallback**: When `-p` is omitted, fall back to `config.default_project` before erroring.
10. **Durable run state**: Surgical sync MUST use `SyncRunRecorder` end-to-end (no ad-hoc SQL updates to `sync_runs`). Extend `SyncRunRecorder` with APIs for surgical mode: `start_surgical(...)`, `set_phase(&str)`, `set_counters(SurgicalCounters)`, `finish_succeeded()`, `finish_succeeded_with_warnings(warnings_count)`, `finish_failed(error)`, `finish_cancelled()`, and periodic `heartbeat_if_due()`. Phase transitions (`preflight`, `ingest`, `dependents`, `docs`, `embed`, `done`, `failed`, `cancelled`) enable crash recovery and observability.
11. **Lock window minimization**: Preflight fetch runs WITHOUT the sync lock. Lock is acquired immediately before the first DB mutation (Stage 1) and held through ingest, dependents, and docs stages. Lock is RELEASED before the embed stage. Embed is naturally idempotent (processes only unembedded docs) and does not require the sync lock.
12. **Preflight-only mode**: `--preflight-only` performs zero content writes; control-plane run-ledger writes are allowed. Distinct from `--dry-run` (which is zero-network). Allows agents to verify IIDs are valid before committing to a full surgical run.
13. **Stale-write protection (TOCTOU)**: Because preflight runs WITHOUT the sync lock, a concurrent normal sync may update the same entity between preflight fetch and surgical ingest. Surgical ingest MUST NOT overwrite fresher local rows. Skip stale when: (a) `local.updated_at > payload.updated_at`, OR (b) `local.updated_at == payload.updated_at AND local.last_seen_at > preflight_started_at_ms`. This prevents equal-timestamp regressions under concurrent sync — the `last_seen_at` column acts as a monotonic tie-breaker when `updated_at` is identical.
14. **Surgical failure hygiene**: Surgical mode leaves no queue artifacts because it does not enqueue jobs into `pending_dependent_fetches`. Dependent stages execute inline and report failures per-entity in `SurgicalIngestResult.entity_failures`. No orphaned job cleanup is needed.
15. **Cancellation semantics**: If shutdown is observed after run start, the recorder finalizes with `finish_cancelled()`: phase is set to `cancelled`, status is `cancelled`, `finished_at` is written, and the lock is released before return. No silent `running` rows are left behind.
16. **Per-entity timeout**: Each dependent network fetch (discussions, resource events, MR dependents) is wrapped in `tokio::time::timeout` with a configurable per-entity budget. Timed-out entities are recorded in `entity_failures` with code `TIMEOUT` and the run continues best-effort. Config knob: `sync.surgical_entity_timeout_seconds` (default 30).
17. **Payload integrity**: Preflight validates that each returned payload's `project_id` matches the requested `gitlab_project_id`. On mismatch, the entity is recorded as `EntityFailure { code: "PROJECT_MISMATCH", stage: "fetch" }` and excluded from ingest. This catches API proxy misconfigurations that could silently corrupt data.
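Constraint 13's two-tier staleness check can be sketched as a pure predicate. This is a minimal self-contained model: `LocalRow` and `payload_is_stale` are illustrative names, the real check lives in `surgical.rs` against the `issues`/`merge_requests` rows.

```rust
/// Illustrative local-row snapshot: GitLab's updated_at plus our own
/// last_seen_at write marker, both in epoch millis.
#[derive(Debug, Clone, Copy)]
struct LocalRow {
    updated_at: i64,
    last_seen_at: i64,
}

/// Two-tier staleness check (constraint 13): skip the payload when
/// (a) the local row carries a strictly newer GitLab timestamp, or
/// (b) timestamps are equal but a concurrent sync refreshed the row
///     after our preflight began (last_seen_at as monotonic tie-breaker).
fn payload_is_stale(local: LocalRow, payload_updated_at: i64, preflight_started_at_ms: i64) -> bool {
    if local.updated_at > payload_updated_at {
        return true;
    }
    local.updated_at == payload_updated_at && local.last_seen_at > preflight_started_at_ms
}

fn main() {
    // Local updated_at 2000 beats payload 1000: skip.
    assert!(payload_is_stale(LocalRow { updated_at: 2000, last_seen_at: 2000 }, 1000, 0));
    // Equal updated_at, but local was refreshed (3000) after preflight start (2000): skip.
    assert!(payload_is_stale(LocalRow { updated_at: 1500, last_seen_at: 3000 }, 1500, 2000));
    // Equal updated_at, local last seen before preflight start: safe to ingest.
    assert!(!payload_is_stale(LocalRow { updated_at: 1500, last_seen_at: 1500 }, 1500, 2000));
}
```

The ordering of the two checks matters only for clarity; the predicate is equivalent to a single boolean expression, and the tests in Step 1f exercise both tiers.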
## CLI Interface
```bash
lore sync --issue 123 --issue 456 -p myproject
lore sync --mr 789 --mr 101 -p myproject
lore sync --issue 123 --mr 789 -p myproject
lore --robot sync --issue 123 -p myproject
# -p is optional if config.defaultProject is set
lore sync --issue 123
# dry-run shows what would be fetched without writes or network calls
lore sync --dry-run --issue 123 -p myproject
# preflight-only validates entities exist on GitLab without any DB writes
lore sync --preflight-only --issue 123 -p myproject
```
---
## Step 1: TDD — Write Failing Tests First
### 1a. Test helper: `test_config()` (`src/ingestion/surgical_tests.rs`)
`Config` has no `Default` impl (fields like `gitlab.base_url` are required). All surgical tests need a minimal config helper:
```rust
fn test_config() -> crate::Config {
    serde_json::from_value(serde_json::json!({
        "gitlab": {
            "baseUrl": "https://gitlab.example.com",
            "projects": ["group/project"]
        },
        "storage": {}
    })).unwrap()
}
```
### 1b. Test: `GitLabClient::get_issue_by_iid` (`src/gitlab/client.rs`)
Add to existing `#[cfg(test)] mod tests` at line 766:
```rust
#[tokio::test]
async fn get_issue_by_iid_returns_issue() {
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path};
    let mock_server = MockServer::start().await;
    let issue_json = serde_json::json!({
        "id": 42,
        "iid": 7,
        "project_id": 1,
        "title": "Test issue",
        "description": "desc",
        "state": "opened",
        "created_at": "2024-01-15T10:00:00.000Z",
        "updated_at": "2024-01-16T10:00:00.000Z",
        "web_url": "https://gitlab.example.com/issues/7",
        "author": {"id": 1, "username": "testuser", "name": "Test User"},
        "assignees": [],
        "labels": [],
        "milestone": null,
        "due_date": null,
        "references": {"short": "#7", "full": "group/project#7"}
    });
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/1/issues/7"))
        .respond_with(ResponseTemplate::new(200).set_body_json(&issue_json))
        .mount(&mock_server)
        .await;
    let client = GitLabClient::new(&mock_server.uri(), "test-token", None);
    let issue = client.get_issue_by_iid(1, 7).await.unwrap();
    assert_eq!(issue.iid, 7);
    assert_eq!(issue.title, "Test issue");
}

#[tokio::test]
async fn get_issue_by_iid_returns_not_found() {
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path};
    let mock_server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/1/issues/999"))
        .respond_with(ResponseTemplate::new(404).set_body_json(
            serde_json::json!({"message": "404 Not found"})
        ))
        .mount(&mock_server)
        .await;
    let client = GitLabClient::new(&mock_server.uri(), "test-token", None);
    let result = client.get_issue_by_iid(1, 999).await;
    assert!(result.is_err());
    assert!(matches!(result.unwrap_err(), LoreError::GitLabNotFound { .. }));
}
```
Same pattern for `get_mr_by_iid` with `GitLabMergeRequest` JSON.
### 1c. Test: Surgical ingest functions (`src/ingestion/surgical_tests.rs`)
Create `src/ingestion/surgical_tests.rs` (referenced from `surgical.rs` with `#[cfg(test)] #[path = "surgical_tests.rs"] mod tests;`):
```rust
use std::path::Path;

use crate::core::db::{create_connection, run_migrations};
use crate::ingestion::surgical::{
    ingest_issue_by_iid_from_payload, ingest_mr_by_iid_from_payload,
    SurgicalIngestResult,
};
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};

fn test_config() -> crate::Config {
    serde_json::from_value(serde_json::json!({
        "gitlab": {
            "baseUrl": "https://gitlab.example.com",
            "projects": ["group/project"]
        },
        "storage": {}
    })).unwrap()
}

fn setup_db() -> rusqlite::Connection {
    let conn = create_connection(Path::new(":memory:")).unwrap();
    run_migrations(&conn).unwrap();
    conn.execute(
        "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
         VALUES (100, 'group/project', 'https://gitlab.example.com/group/project')",
        [],
    ).unwrap();
    conn
}

fn make_test_issue(iid: i64) -> GitLabIssue {
    serde_json::from_value(serde_json::json!({
        "id": 1000 + iid,
        "iid": iid,
        "project_id": 100,
        "title": format!("Issue {iid}"),
        "description": "test description",
        "state": "opened",
        "created_at": "2024-01-15T10:00:00.000Z",
        "updated_at": "2024-01-16T10:00:00.000Z",
        "web_url": format!("https://gitlab.example.com/issues/{iid}"),
        "author": {"id": 1, "username": "testuser", "name": "Test User"},
        "assignees": [],
        "labels": [],
        "milestone": null,
        "due_date": null,
        "references": {"short": format!("#{iid}"), "full": format!("group/project#{iid}")}
    })).unwrap()
}

#[test]
fn ingest_issue_by_iid_inserts_and_marks_dirty() {
    let conn = setup_db();
    let project_id = 1i64; // auto-assigned by INSERT above
    let config = test_config();
    let issue = make_test_issue(7);
    let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    assert_eq!(result.upserted, 1);
    // Verify issue exists
    let count: i64 = conn.query_row(
        "SELECT COUNT(*) FROM issues WHERE project_id = ? AND iid = 7",
        [project_id], |r| r.get(0),
    ).unwrap();
    assert_eq!(count, 1);
    // Verify dirty marker exists
    let dirty: i64 = conn.query_row(
        "SELECT COUNT(*) FROM dirty_sources WHERE source_type = 'issue'",
        [], |r| r.get(0),
    ).unwrap();
    assert!(dirty >= 1);
    // Verify dirty_source_keys were collected
    assert!(!result.dirty_source_keys.is_empty(),
        "Should have collected dirty source keys for scoped doc regeneration");
}

#[test]
fn ingest_issue_by_iid_resets_discussion_watermark() {
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    let issue = make_test_issue(7);
    // First insert
    ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    // Simulate previous discussion sync by setting watermark
    conn.execute(
        "UPDATE issues SET discussions_synced_for_updated_at = updated_at
         WHERE project_id = ? AND iid = 7",
        [project_id],
    ).unwrap();
    // Second surgical ingest should reset the watermark
    ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    let watermark: Option<i64> = conn.query_row(
        "SELECT discussions_synced_for_updated_at FROM issues WHERE project_id = ? AND iid = 7",
        [project_id], |r| r.get(0),
    ).unwrap();
    assert!(watermark.is_none(), "Surgical ingest should reset discussion watermark to NULL");
}

#[test]
fn ingest_issue_by_iid_resets_event_watermark() {
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    let issue = make_test_issue(7);
    ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    // Set event watermark
    conn.execute(
        "UPDATE issues SET resource_events_synced_for_updated_at = updated_at
         WHERE project_id = ? AND iid = 7",
        [project_id],
    ).unwrap();
    // Surgical re-ingest
    ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    let watermark: Option<i64> = conn.query_row(
        "SELECT resource_events_synced_for_updated_at FROM issues WHERE project_id = ? AND iid = 7",
        [project_id], |r| r.get(0),
    ).unwrap();
    assert!(watermark.is_none(), "Surgical ingest should reset event watermark to NULL");
}

#[test]
fn duplicate_iids_are_idempotent() {
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    let issue = make_test_issue(7);
    // Ingest same issue twice
    ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    assert_eq!(result.upserted, 1);
    // Only one row
    let count: i64 = conn.query_row(
        "SELECT COUNT(*) FROM issues WHERE project_id = ? AND iid = 7",
        [project_id], |r| r.get(0),
    ).unwrap();
    assert_eq!(count, 1);
}
```
Same pattern for MR tests (with `make_test_mr` and `ingest_mr_by_iid_from_payload`).
### 1d. Scoping invariant tests (`src/ingestion/surgical_tests.rs`)
These tests enforce correctness of docs scoping — the most critical safety property of surgical sync:
```rust
#[test]
fn surgical_docs_scope_ignores_preexisting_dirty_rows() {
    // Setup: insert a dirty_sources row for a DIFFERENT entity (simulating prior failed sync)
    // Run surgical ingest for a new entity
    // Call run_generate_docs_for_sources with only the surgical run's dirty keys
    // Assert: pre-existing dirty row is UNTOUCHED (still in dirty_sources)
    // Assert: only the surgical entity's docs were regenerated
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    // Pre-existing dirty row for issue iid=99 (from prior sync)
    conn.execute(
        "INSERT INTO issues (project_id, gitlab_issue_id, iid, title, state, created_at, updated_at, web_url, last_seen_at)
         VALUES (?1, 999, 99, 'Old issue', 'opened', 1000, 1000, 'https://example.com/99', 1000)",
        [project_id],
    ).unwrap();
    let old_issue_id: i64 = conn.query_row(
        "SELECT id FROM issues WHERE iid = 99", [], |r| r.get(0),
    ).unwrap();
    conn.execute(
        "INSERT INTO dirty_sources (source_type, source_id, queued_at) VALUES ('issue', ?1, 1000)",
        [old_issue_id],
    ).unwrap();
    // Surgical ingest of issue iid=7
    let issue = make_test_issue(7);
    let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    // The pre-existing dirty row must still exist
    let old_dirty_exists: bool = conn.query_row(
        "SELECT COUNT(*) > 0 FROM dirty_sources WHERE source_type = 'issue' AND source_id = ?1",
        [old_issue_id], |r| r.get(0),
    ).unwrap();
    assert!(old_dirty_exists, "Pre-existing dirty rows must be preserved");
    // The surgical result's dirty_source_keys should NOT include the pre-existing row's key
    assert!(!result.dirty_source_keys.iter().any(|(st, sid)| st == "issue" && *sid == old_issue_id),
        "Surgical dirty_source_keys must not include pre-existing dirty rows");
}

#[test]
fn surgical_docs_scope_ignores_preexisting_dirty_rows_for_same_entity() {
    // Edge case: pre-existing dirty row for the SAME entity (iid=7) from a prior failed sync
    // Surgical re-ingest of iid=7 should still collect the key (UPSERT updates queued_at)
    // but scoped doc regen uses the collected keys, which correctly identifies this entity
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    // First ingest creates dirty row
    let issue = make_test_issue(7);
    let _first_result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    // Simulate: the dirty row from first ingest was never processed (orphaned)
    // Under UPSERT semantics, the (source_type, source_id) key persists
    // Second surgical ingest of same entity should still collect the key
    let second_result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &issue, 0).unwrap();
    // Result should contain the key for this entity (UPSERT touched the row)
    assert!(!second_result.dirty_source_keys.is_empty(),
        "Dirty source keys should be collected even for re-ingested entities");
}

#[tokio::test]
async fn preflight_aggregates_multiple_missing_iids() {
    // Setup: mock server returns 404 for iids 888 and 999, 200 for iid 7
    // Call preflight_fetch with all three
    // Assert: result contains 1 fetched issue AND 2 failures (not fail-fast)
    use std::time::Duration;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path};
    use crate::ingestion::surgical::preflight_fetch;
    let mock_server = MockServer::start().await;
    // iid 7 succeeds
    Mock::given(method("GET"))
        .and(path("/api/v4/projects/1/issues/7"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "id": 42, "iid": 7, "project_id": 1, "title": "Good issue",
            "description": "", "state": "opened",
            "created_at": "2024-01-15T10:00:00.000Z",
            "updated_at": "2024-01-16T10:00:00.000Z",
            "web_url": "https://gitlab.example.com/issues/7",
            "author": {"id": 1, "username": "u", "name": "U"},
            "assignees": [], "labels": [], "milestone": null,
            "due_date": null, "references": {"short": "#7", "full": "g/p#7"}
        })))
        .mount(&mock_server).await;
    // iid 888 and 999 return 404
    for iid in [888, 999] {
        Mock::given(method("GET"))
            .and(path(format!("/api/v4/projects/1/issues/{iid}")))
            .respond_with(ResponseTemplate::new(404).set_body_json(
                serde_json::json!({"message": "404 Not found"})
            ))
            .mount(&mock_server).await;
    }
    let client = crate::gitlab::GitLabClient::new(&mock_server.uri(), "test-token", None);
    let result = preflight_fetch(
        &client, 1, &[7, 888, 999], &[], Duration::from_secs(30),
    ).await.unwrap();
    assert_eq!(result.issues.len(), 1, "Should have 1 successful fetch");
    assert_eq!(result.failures.len(), 2, "Should aggregate both 404 failures");
    assert!(result.failures.iter().all(|f| f.stage == "fetch"));
}
```
### 1e. Test: `sync_runs` table (`src/ingestion/surgical_tests.rs`)
```rust
#[test]
fn sync_run_is_persisted_and_updated() {
    let conn = setup_db();
    // Insert a sync_run row using existing table + new columns
    conn.execute(
        "INSERT INTO sync_runs (started_at, heartbeat_at, status, command, mode, phase, surgical_iids_json)
         VALUES (strftime('%s','now') * 1000, strftime('%s','now') * 1000, 'running', 'sync', 'surgical', 'preflight', '[7]')",
        [],
    ).unwrap();
    let run_id = conn.last_insert_rowid();
    // Update phase
    conn.execute(
        "UPDATE sync_runs SET phase = 'ingest' WHERE id = ?1",
        [run_id],
    ).unwrap();
    let phase: String = conn.query_row(
        "SELECT phase FROM sync_runs WHERE id = ?1",
        [run_id], |r| r.get(0),
    ).unwrap();
    assert_eq!(phase, "ingest");
}
```
### 1f. Transactional rollback, TOCTOU, and failure hygiene tests (`src/ingestion/surgical_tests.rs`)
These tests cover race conditions, rollback guarantees, and failure cleanup — where regressions hit first in production:
```rust
#[test]
fn stale_payload_is_skipped_when_local_updated_at_is_newer() {
    // Setup: insert an issue with updated_at = 2000
    // Create a preflight payload with updated_at = 1000 (stale)
    // Surgical ingest should skip the entity and record skipped_stale
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    // Insert issue with newer timestamp
    conn.execute(
        "INSERT INTO issues (project_id, gitlab_issue_id, iid, title, state, created_at, updated_at, web_url, last_seen_at)
         VALUES (?1, 1007, 7, 'Fresh issue', 'opened', 1000, 2000, 'https://example.com/7', 2000)",
        [project_id],
    ).unwrap();
    // Stale payload from preflight (updated_at older than local)
    let stale_issue = make_test_issue_with_updated_at(7, 1000);
    let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &stale_issue, 0).unwrap();
    assert_eq!(result.skipped_stale, 1, "Stale payload should be skipped");
    assert_eq!(result.upserted, 0, "No upsert for stale payload");
    // Local row should be unchanged
    let local_updated: i64 = conn.query_row(
        "SELECT updated_at FROM issues WHERE project_id = ?1 AND iid = 7",
        [project_id], |r| r.get(0),
    ).unwrap();
    assert_eq!(local_updated, 2000, "Local fresher row must not be overwritten");
}

#[test]
fn equal_updated_at_but_newer_last_seen_is_skipped() {
    // TOCTOU edge case: updated_at is equal but local last_seen_at is newer than preflight start
    // This catches concurrent normal sync that fetched the same data after our preflight
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    // Insert issue where updated_at = 1500 and last_seen_at = 3000 (seen after preflight started)
    conn.execute(
        "INSERT INTO issues (project_id, gitlab_issue_id, iid, title, state, created_at, updated_at, web_url, last_seen_at)
         VALUES (?1, 1007, 7, 'Concurrent issue', 'opened', 1000, 1500, 'https://example.com/7', 3000)",
        [project_id],
    ).unwrap();
    // Payload has same updated_at=1500, but preflight started at time 2000 (before the concurrent sync at 3000)
    let payload = make_test_issue_with_updated_at(7, 1500);
    let preflight_started_at_ms = 2000i64;
    let result = ingest_issue_by_iid_from_payload(&conn, &config, project_id, &payload, preflight_started_at_ms).unwrap();
    assert_eq!(result.skipped_stale, 1, "Equal updated_at but newer last_seen_at should be skipped");
    assert_eq!(result.upserted, 0, "No upsert when local was refreshed after our preflight");
}

#[test]
fn preflight_success_then_ingest_failure_rolls_back_all_content_writes() {
    // Setup: preflight succeeds for 2 issues
    // First issue ingests fine, second causes an error (e.g., constraint violation)
    // Assert: NEITHER issue is in the DB (transaction rolled back)
    // Assert: sync_runs phase is 'failed'
    let conn = setup_db();
    let project_id = 1i64;
    let config = test_config();
    let issue_ok = make_test_issue(7);
    let issue_bad = make_test_issue_bad_data(8); // Missing required field, will fail ingest
    let preflight = PreflightResult {
        issues: vec![issue_ok, issue_bad],
        merge_requests: vec![],
        failures: vec![],
    };
    let result = ingest_preflight_results(&conn, &config, project_id, &preflight, 0);
    assert!(result.is_err(), "Ingest should fail on bad data");
    // Verify rollback: no issues written
    let count: i64 = conn.query_row(
        "SELECT COUNT(*) FROM issues WHERE project_id = ?1",
        [project_id], |r| r.get(0),
    ).unwrap();
    assert_eq!(count, 0, "Transaction rollback should leave zero issues");
}

#[test]
fn surgical_no_docs_requires_no_embed_validation() {
    // Verify that surgical mode with --no-docs but without --no-embed is rejected
    let options = SyncOptions {
        issue_iids: vec![7],
        no_docs: true,
        no_embed: false,
        ..SyncOptions::default()
    };
    assert!(options.is_surgical());
    // The validation logic in handle_sync_cmd should reject this combination
    // (tested via integration or by calling the validation function directly)
}
```
### 1g. Cancellation, timeout, scoped embed isolation, and payload integrity tests (`src/ingestion/surgical_tests.rs`)
These tests cover the new failure-prone runtime paths added by constraints 15-17:
```rust
#[tokio::test]
async fn cancellation_marks_sync_run_cancelled() {
    // Setup: start a surgical sync run, trigger cancellation signal after preflight
    // Assert: sync_runs row has status='cancelled', phase='cancelled', finished_at populated
    // Assert: no content writes occurred after cancellation
    todo!("implement after SyncRunRecorder extensions in Step 8b")
}

#[tokio::test]
async fn dependent_timeout_records_entity_failure_and_continues() {
    // Setup: mock server with one fast-responding entity and one that hangs
    // Set surgical_entity_timeout_seconds = 1
    // Assert: fast entity's dependents succeed
    // Assert: slow entity has EntityFailure { code: "TIMEOUT", stage: "discussions" }
    // Assert: run still completes (not aborted)
    todo!("implement after inline dependent helpers in Step 9b")
}

#[tokio::test]
async fn scoped_embed_does_not_embed_unrelated_docs_created_after_docs_stage() {
    // Setup: surgical sync generates docs for entity A (collect document_ids)
    // Between docs and embed stage, simulate another sync creating unembedded docs for entity B
    // Call run_embed_for_document_ids with ONLY entity A's doc IDs
    // Assert: entity B's docs remain unembedded
    // Assert: entity A's docs are embedded
    todo!("implement after run_embed_for_document_ids in Step 9a")
}

#[test]
fn payload_project_id_mismatch_is_rejected_in_preflight() {
    // Setup: create a payload where project_id != expected gitlab_project_id
    // Assert: preflight records EntityFailure { code: "PROJECT_MISMATCH" }
    // Assert: mismatched payload is NOT included in successful fetches
    todo!("implement after preflight_fetch integrity check in Step 7")
}
```
---
## Step 2: Add `--issue`, `--mr`, `-p`, `--preflight-only` to `SyncArgs`
**File: `src/cli/mod.rs`, struct `SyncArgs` (line 755)**
Add after the existing `no_file_changes` field (around line 784):
```rust
    /// Surgically sync specific issues by IID (repeatable, must be positive)
    #[arg(long, value_parser = clap::value_parser!(u64).range(1..), action = clap::ArgAction::Append)]
    pub issue: Vec<u64>,

    /// Surgically sync specific merge requests by IID (repeatable, must be positive)
    #[arg(long, value_parser = clap::value_parser!(u64).range(1..), action = clap::ArgAction::Append)]
    pub mr: Vec<u64>,

    /// Scope to a single project (required when --issue or --mr is used, falls back to config.defaultProject)
    #[arg(short = 'p', long)]
    pub project: Option<String>,

    /// Validate remote entities exist and permissions are valid, without any DB writes.
    /// Runs the preflight network fetch phase only. Useful for agents to verify IIDs before committing to a full surgical sync.
    #[arg(long, default_value_t = false)]
    pub preflight_only: bool,
```
**Why `u64` with `range(1..)`**: IIDs are always positive integers. Parse-time validation via clap gives immediate, clear error messages (e.g., `error: 0 is not in 1..`) with zero runtime plumbing. The `u64` type makes invalid states unrepresentable.
---
## Step 3: Extend `SyncOptions`
**File: `src/cli/commands/sync.rs`, struct `SyncOptions` (line 20)**
Add fields:
```rust
#[derive(Debug, Default)]
pub struct SyncOptions {
    pub full: bool,
    pub force: bool,
    pub no_embed: bool,
    pub no_docs: bool,
    pub no_events: bool,
    pub robot_mode: bool,
    pub dry_run: bool,
    // NEW:
    pub issue_iids: Vec<u64>,
    pub mr_iids: Vec<u64>,
    pub project: Option<String>,
    pub preflight_only: bool,
}
```
Add helper method:
```rust
impl SyncOptions {
    /// Maximum combined IIDs allowed in a single surgical sync.
    const MAX_SURGICAL_TARGETS: usize = 100;

    pub fn is_surgical(&self) -> bool {
        !self.issue_iids.is_empty() || !self.mr_iids.is_empty()
    }
}
```
---
## Step 4: Wire new fields in `handle_sync_cmd`
**File: `src/main.rs`, function `handle_sync_cmd` (line 2034)**
After line 2058 (existing `dry_run` field), add:
```rust
// Deduplicate IIDs before constructing SyncOptions
let mut issue_iids = args.issue;
let mut mr_iids = args.mr;
issue_iids.sort_unstable();
issue_iids.dedup();
mr_iids.sort_unstable();
mr_iids.dedup();

let options = SyncOptions {
    full: args.full && !args.no_full,
    force: args.force && !args.no_force,
    no_embed: args.no_embed,
    no_docs: args.no_docs,
    no_events: args.no_events,
    robot_mode,
    dry_run,
    // NEW:
    issue_iids,
    mr_iids,
    project: args.project,
    preflight_only: args.preflight_only,
};
```
Add validation before recording starts (after options creation):
```rust
// Validate surgical mode constraints
if options.is_surgical() {
    // Enforce hard cap on combined target count
    let total_targets = options.issue_iids.len() + options.mr_iids.len();
    if total_targets > SyncOptions::MAX_SURGICAL_TARGETS {
        return Err(Box::new(LoreError::Other(format!(
            "Too many surgical targets ({total_targets}). Maximum is {}.",
            SyncOptions::MAX_SURGICAL_TARGETS
        ))));
    }
    // Fall back to config.defaultProject when -p is omitted
    let project = options.project.clone().or_else(|| config.default_project.clone());
    if project.is_none() {
        return Err(Box::new(LoreError::Other(
            "The --issue and --mr flags require --project (-p) or config.defaultProject".to_string(),
        )));
    }
    // Reassign the resolved project back into options (requires `let mut options`)
    options.project = project;
    // Reject incompatible flags
    if options.full {
        return Err(Box::new(LoreError::Other(
            "--full is incompatible with surgical sync (--issue/--mr)".to_string(),
        )));
    }
    // Reject --no-docs without --no-embed in surgical mode (embed leakage prevention)
    if options.no_docs && !options.no_embed {
        return Err(Box::new(LoreError::Other(
            "In surgical mode, --no-docs requires --no-embed (to prevent embedding unrelated backlog docs)".to_string(),
        )));
    }
}

// Validate preflight_only requires surgical mode
if options.preflight_only && !options.is_surgical() {
    return Err(Box::new(LoreError::Other(
        "--preflight-only requires --issue or --mr".to_string(),
    )));
}
```
---
## Step 5: Add `get_issue_by_iid` and `get_mr_by_iid` to `GitLabClient`
**File: `src/gitlab/client.rs`**
Add after `paginate_issues` (around line 330), before `paginate_merge_requests`:
```rust
/// Fetch a single issue by its project-scoped IID.
/// Uses: GET /api/v4/projects/:id/issues/:iid
pub async fn get_issue_by_iid(
    &self,
    gitlab_project_id: i64,
    iid: u64,
) -> Result<GitLabIssue> {
    let path = format!("/api/v4/projects/{gitlab_project_id}/issues/{iid}");
    self.request(&path).await
}

/// Fetch a single merge request by its project-scoped IID.
/// Uses: GET /api/v4/projects/:id/merge_requests/:iid
pub async fn get_mr_by_iid(
    &self,
    gitlab_project_id: i64,
    iid: u64,
) -> Result<GitLabMergeRequest> {
    let path = format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}");
    self.request(&path).await
}
```
These reuse the existing `request()` method (line 117) which handles:
- Auth via `PRIVATE-TOKEN` header
- Rate limiting via `RateLimiter`
- Retry on 429 with `retry_after`
- JSON deserialization via `handle_response`
- Error mapping (401 → `GitLabAuthFailed`, 404 → `GitLabNotFound`)
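The preflight failure aggregation (Design Constraint 6) builds on this error mapping: each per-IID fetch error becomes an `EntityFailure` code and the loop moves on instead of failing fast. A self-contained sketch; `FetchError` is an illustrative stand-in for the crate's `LoreError`, and `failure_code` is a hypothetical helper:

```rust
// Illustrative stand-in for the crate's error type; variant names are assumptions.
#[allow(dead_code)]
#[derive(Debug)]
enum FetchError {
    NotFound,
    AuthFailed,
    Network(String),
}

/// Map a per-IID fetch error to the code recorded in
/// EntityFailure { code, stage: "fetch", .. }.
fn failure_code(err: &FetchError) -> &'static str {
    match err {
        FetchError::NotFound => "NOT_FOUND",
        FetchError::AuthFailed => "AUTH_FAILED",
        FetchError::Network(_) => "NETWORK_ERROR",
    }
}

fn main() {
    // Simulated per-IID fetch outcomes for a preflight over three issues.
    let results: Vec<(u64, Result<(), FetchError>)> = vec![
        (7, Ok(())),
        (888, Err(FetchError::NotFound)),
        (999, Err(FetchError::Network("timeout".into()))),
    ];
    // One pass: keep successes, collect ALL failures (no fail-fast).
    let mut fetched = 0;
    let mut failures = Vec::new();
    for (iid, r) in results {
        match r {
            Ok(()) => fetched += 1,
            Err(e) => failures.push((iid, failure_code(&e))),
        }
    }
    assert_eq!(fetched, 1);
    assert_eq!(failures, vec![(888, "NOT_FOUND"), (999, "NETWORK_ERROR")]);
}
```

This mirrors the `preflight_aggregates_multiple_missing_iids` test in Step 1d: one good IID plus two 404s yields one fetched entity and two recorded failures.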
---
## Step 6: Make `process_single_issue` and `process_single_mr` `pub(crate)`
**File: `src/ingestion/issues.rs`, line 143**
Change `fn process_single_issue` → `pub(crate) fn process_single_issue`
**File: `src/ingestion/merge_requests.rs`, line 144**
Change `fn process_single_mr` → `pub(crate) fn process_single_mr`
No `pub` re-exports are added to `src/ingestion/mod.rs`; `pub(crate)` visibility is enough to reach the functions as `crate::ingestion::issues::process_single_issue` and `crate::ingestion::merge_requests::process_single_mr`.
---
## Step 7: Create `src/ingestion/surgical.rs`
This is the core new module. It provides three layers:
1. **Payload processing** (sync, no network — testable) — takes a `GitLabIssue`/`GitLabMergeRequest` already fetched
2. **Preflight fetch** (async, hits GitLab API) — fetches all entities BEFORE any DB writes, **aggregating all failures** instead of failing fast
3. **Transactional ingest** — applies all DB mutations inside a transaction after successful preflight
```rust
use std::time::Duration;
use rusqlite::{Connection, OptionalExtension};
use tracing::debug;
use crate::Config;
use crate::core::error::Result;
use crate::gitlab::GitLabClient;
use crate::gitlab::types::{GitLabIssue, GitLabMergeRequest};
use crate::ingestion::issues::IssueForDiscussionSync;
use crate::ingestion::merge_requests::MrForDiscussionSync;
use super::issues::process_single_issue;
use super::merge_requests::process_single_mr;
/// Per-entity failure info for robot mode structured error reporting.
#[derive(Debug, Clone)]
pub struct EntityFailure {
pub entity_type: &'static str, // "issue" or "merge_request"
pub iid: u64,
pub stage: &'static str, // "fetch", "ingest", "discussions", etc.
pub code: String, // e.g. "NOT_FOUND", "NETWORK_ERROR"
pub message: String,
}
/// A dirty source key: (source_type, source_id) matching dirty_sources PK.
pub type DirtySourceKey = (String, i64);
#[derive(Debug, Default)]
pub struct SurgicalIngestResult {
pub upserted: usize,
pub labels_created: usize,
pub issue_disc_sync: Vec<IssueForDiscussionSync>,
pub mr_disc_sync: Vec<MrForDiscussionSync>,
/// Dirty source keys touched during this surgical ingest (for scoped doc regeneration).
/// Each key is `(source_type, source_id)` matching `dirty_sources` PK.
pub dirty_source_keys: Vec<DirtySourceKey>,
/// Per-entity failures for structured error reporting in robot mode
pub entity_failures: Vec<EntityFailure>,
/// Entities skipped because local row was newer than preflight payload (TOCTOU protection)
pub skipped_stale: usize,
}
/// Check whether the local issue row has a fresher state than the payload.
/// Returns true if local is newer (payload is stale and should be skipped).
/// Uses two-tier check: (a) updated_at strictly newer, or (b) equal updated_at
/// but last_seen_at is newer than preflight start (concurrent sync fetched same data).
fn is_local_newer_issue(conn: &Connection, project_id: i64, iid: i64, payload_updated_at: i64, preflight_started_at_ms: i64) -> Result<bool> {
let row: Option<(i64, i64)> = conn.query_row(
"SELECT updated_at, last_seen_at FROM issues WHERE project_id = ?1 AND iid = ?2",
(project_id, iid),
|row| Ok((row.get(0)?, row.get(1)?)),
).optional()?;
Ok(row.map_or(false, |(local_ts, local_seen)| {
local_ts > payload_updated_at
|| (local_ts == payload_updated_at && local_seen > preflight_started_at_ms)
}))
}
/// Check whether the local MR row has a fresher state than the payload.
fn is_local_newer_mr(conn: &Connection, project_id: i64, iid: i64, payload_updated_at: i64, preflight_started_at_ms: i64) -> Result<bool> {
let row: Option<(i64, i64)> = conn.query_row(
"SELECT updated_at, last_seen_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2",
(project_id, iid),
|row| Ok((row.get(0)?, row.get(1)?)),
).optional()?;
Ok(row.map_or(false, |(local_ts, local_seen)| {
local_ts > payload_updated_at
|| (local_ts == payload_updated_at && local_seen > preflight_started_at_ms)
}))
}
/// Process a single issue that has already been fetched from GitLab.
/// Upserts into DB, resets discussion + event watermarks so dependents re-sync.
/// Tracks dirty source keys for scoped doc regeneration.
/// Skips stale payloads to avoid TOCTOU overwrite after unlocked preflight.
///
/// `preflight_started_at_ms`: ms-epoch timestamp captured before preflight fetch began.
/// Used for equal-timestamp TOCTOU tie-breaking via `last_seen_at`.
pub fn ingest_issue_by_iid_from_payload(
conn: &Connection,
config: &Config,
project_id: i64,
issue: &GitLabIssue,
preflight_started_at_ms: i64,
) -> Result<SurgicalIngestResult> {
let mut result = SurgicalIngestResult::default();
// TOCTOU guard: skip if local row is fresher than preflight payload
if is_local_newer_issue(conn, project_id, issue.iid, issue.updated_at, preflight_started_at_ms)? {
result.skipped_stale = 1;
debug!(iid = issue.iid, "Skipping stale payload (local is newer)");
return Ok(result);
}
let labels_created = process_single_issue(conn, config, project_id, issue)?;
result.upserted = 1;
result.labels_created = labels_created;
// Reset watermarks so discussions + events re-sync for this issue
conn.execute(
"UPDATE issues SET
discussions_synced_for_updated_at = NULL,
resource_events_synced_for_updated_at = NULL
WHERE project_id = ?1 AND iid = ?2",
(project_id, issue.iid),
)?;
// Collect dirty source key for scoped doc regeneration
let local_issue_id: i64 = conn.query_row(
"SELECT id FROM issues WHERE project_id = ?1 AND iid = ?2",
(project_id, issue.iid),
|row| row.get(0),
)?;
result.dirty_source_keys.push(("issue".to_string(), local_issue_id));
// Build the discussion sync descriptor
let row = conn.query_row(
"SELECT id, iid, updated_at FROM issues WHERE project_id = ?1 AND iid = ?2",
(project_id, issue.iid),
|row| {
Ok(IssueForDiscussionSync {
local_issue_id: row.get(0)?,
iid: row.get(1)?,
updated_at: row.get(2)?,
})
},
)?;
result.issue_disc_sync.push(row);
debug!(iid = issue.iid, "Surgical issue ingest complete");
Ok(result)
}
/// Process a single MR that has already been fetched from GitLab.
/// Skips stale payloads to avoid TOCTOU overwrite after unlocked preflight.
pub fn ingest_mr_by_iid_from_payload(
conn: &Connection,
config: &Config,
project_id: i64,
mr: &GitLabMergeRequest,
preflight_started_at_ms: i64,
) -> Result<SurgicalIngestResult> {
let mut result = SurgicalIngestResult::default();
// TOCTOU guard: skip if local row is fresher than preflight payload
if is_local_newer_mr(conn, project_id, mr.iid, mr.updated_at, preflight_started_at_ms)? {
result.skipped_stale = 1;
debug!(iid = mr.iid, "Skipping stale MR payload (local is newer)");
return Ok(result);
}
let mr_result = process_single_mr(conn, config, project_id, mr)?;
result.upserted = 1;
result.labels_created = mr_result.labels_created;
// Reset watermarks
conn.execute(
"UPDATE merge_requests SET
discussions_synced_for_updated_at = NULL,
resource_events_synced_for_updated_at = NULL
WHERE project_id = ?1 AND iid = ?2",
(project_id, mr.iid),
)?;
// Collect dirty source key
let local_mr_id: i64 = conn.query_row(
"SELECT id FROM merge_requests WHERE project_id = ?1 AND iid = ?2",
(project_id, mr.iid),
|row| row.get(0),
)?;
result.dirty_source_keys.push(("merge_request".to_string(), local_mr_id));
let row = conn.query_row(
"SELECT id, iid, updated_at FROM merge_requests WHERE project_id = ?1 AND iid = ?2",
(project_id, mr.iid),
|row| {
Ok(MrForDiscussionSync {
local_mr_id: row.get(0)?,
iid: row.get(1)?,
updated_at: row.get(2)?,
})
},
)?;
result.mr_disc_sync.push(row);
debug!(iid = mr.iid, "Surgical MR ingest complete");
Ok(result)
}
/// Preflight: fetch all requested entities from GitLab without any DB writes.
/// Aggregates ALL failures instead of failing fast — agents get a complete error report in one pass.
/// Returns the fetched payloads plus any per-IID failures.
/// Caller MUST check `result.failures` and abort writes if non-empty.
pub async fn preflight_fetch(
client: &GitLabClient,
gitlab_project_id: i64,
issue_iids: &[u64],
mr_iids: &[u64],
entity_timeout: Duration,
) -> Result<PreflightResult> {
let mut result = PreflightResult::default();
for &iid in issue_iids {
debug!(iid, "Preflight: fetching issue");
match tokio::time::timeout(entity_timeout, client.get_issue_by_iid(gitlab_project_id, iid)).await {
Ok(Ok(issue)) => {
// Payload integrity check: project_id must match requested gitlab_project_id
if issue.project_id != gitlab_project_id {
result.failures.push(EntityFailure {
entity_type: "issue",
iid,
stage: "fetch",
code: "PROJECT_MISMATCH".to_string(),
message: format!(
"Payload project_id {} does not match requested {}",
issue.project_id, gitlab_project_id
),
});
} else {
result.issues.push(issue);
}
}
Ok(Err(e)) => {
let code = classify_error_code(&e);
result.failures.push(EntityFailure {
entity_type: "issue",
iid,
stage: "fetch",
code,
message: e.to_string(),
});
}
Err(_elapsed) => {
result.failures.push(EntityFailure {
entity_type: "issue",
iid,
stage: "fetch",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
for &iid in mr_iids {
debug!(iid, "Preflight: fetching MR");
match tokio::time::timeout(entity_timeout, client.get_mr_by_iid(gitlab_project_id, iid)).await {
Ok(Ok(mr)) => {
// Payload integrity check: project_id must match
if mr.target_project_id.unwrap_or(mr.project_id) != gitlab_project_id {
result.failures.push(EntityFailure {
entity_type: "merge_request",
iid,
stage: "fetch",
code: "PROJECT_MISMATCH".to_string(),
message: format!(
"Payload project_id does not match requested {}",
gitlab_project_id
),
});
} else {
result.merge_requests.push(mr);
}
}
Ok(Err(e)) => {
let code = classify_error_code(&e);
result.failures.push(EntityFailure {
entity_type: "merge_request",
iid,
stage: "fetch",
code,
message: e.to_string(),
});
}
Err(_elapsed) => {
result.failures.push(EntityFailure {
entity_type: "merge_request",
iid,
stage: "fetch",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
Ok(result)
}
/// Map a LoreError to a machine-readable error code string.
fn classify_error_code(e: &crate::core::error::LoreError) -> String {
match e {
crate::core::error::LoreError::GitLabNotFound { .. } => "NOT_FOUND".to_string(),
crate::core::error::LoreError::GitLabAuthFailed { .. } => "AUTH_FAILED".to_string(),
crate::core::error::LoreError::RateLimited { .. } => "RATE_LIMITED".to_string(),
_ => "FETCH_ERROR".to_string(),
}
}
#[derive(Debug, Default)]
pub struct PreflightResult {
pub issues: Vec<GitLabIssue>,
pub merge_requests: Vec<GitLabMergeRequest>,
/// Per-IID failures collected during preflight (empty = all succeeded)
pub failures: Vec<EntityFailure>,
}
impl PreflightResult {
pub fn has_failures(&self) -> bool {
!self.failures.is_empty()
}
}
/// Ingest all preflight-fetched entities into the DB inside a transaction.
/// Returns combined result with all dirty source keys and discussion sync descriptors.
///
/// `preflight_started_at_ms`: ms-epoch timestamp captured before preflight began,
/// used for TOCTOU tie-breaking on equal `updated_at` timestamps.
pub fn ingest_preflight_results(
conn: &Connection,
config: &Config,
project_id: i64,
preflight: &PreflightResult,
preflight_started_at_ms: i64,
) -> Result<SurgicalIngestResult> {
let mut combined = SurgicalIngestResult::default();
// All writes happen inside a transaction — if any fails, all roll back
let tx = conn.unchecked_transaction()?;
for issue in &preflight.issues {
let single = ingest_issue_by_iid_from_payload(&tx, config, project_id, issue, preflight_started_at_ms)?;
combined.upserted += single.upserted;
combined.skipped_stale += single.skipped_stale;
combined.labels_created += single.labels_created;
combined.issue_disc_sync.extend(single.issue_disc_sync);
combined.dirty_source_keys.extend(single.dirty_source_keys);
}
for mr in &preflight.merge_requests {
let single = ingest_mr_by_iid_from_payload(&tx, config, project_id, mr, preflight_started_at_ms)?;
combined.upserted += single.upserted;
combined.skipped_stale += single.skipped_stale;
combined.labels_created += single.labels_created;
combined.mr_disc_sync.extend(single.mr_disc_sync);
combined.dirty_source_keys.extend(single.dirty_source_keys);
}
tx.commit()?;
Ok(combined)
}
#[cfg(test)]
#[path = "surgical_tests.rs"]
mod tests;
```
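The two-tier staleness rule in `is_local_newer_issue`/`is_local_newer_mr` reduces to a small pure predicate, which is easy to unit-test without a database. A sketch under that framing — `payload_is_stale` is a hypothetical extraction, with `local` modeling the optional `(updated_at, last_seen_at)` row:

```rust
/// Returns true when the local row should win over the preflight payload:
/// either its updated_at is strictly newer, or the timestamps tie and a
/// concurrent sync stamped last_seen_at after preflight began.
fn payload_is_stale(
    local: Option<(i64, i64)>, // (updated_at, last_seen_at)
    payload_updated_at: i64,
    preflight_started_at_ms: i64,
) -> bool {
    local.map_or(false, |(local_ts, local_seen)| {
        local_ts > payload_updated_at
            || (local_ts == payload_updated_at && local_seen > preflight_started_at_ms)
    })
}
```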
---
## Step 8: Register `surgical` module
**File: `src/ingestion/mod.rs`**
Add after line 8 (`pub mod orchestrator;`):
```rust
pub mod surgical;
```
---
## Step 8a: Extend existing `sync_runs` table via migration
**File: `src/core/db.rs`**
The `sync_runs` table already exists (migration 001, enriched in migration 014). Add a new migration to extend it with surgical sync columns:
```sql
-- Migration 027: Extend sync_runs for surgical sync observability
-- Adds mode/phase tracking and surgical-specific counters.
-- Reuses existing sync_runs row lifecycle (SyncRunRecorder).
ALTER TABLE sync_runs ADD COLUMN mode TEXT; -- 'standard' | 'surgical' (NULL for pre-existing rows)
ALTER TABLE sync_runs ADD COLUMN phase TEXT; -- preflight|ingest|dependents|docs|embed|done|failed|cancelled
ALTER TABLE sync_runs ADD COLUMN surgical_iids_json TEXT; -- JSON: {"issues":[7,8],"mrs":[101]}
ALTER TABLE sync_runs ADD COLUMN issues_fetched INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN mrs_fetched INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN issues_ingested INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN mrs_ingested INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN skipped_stale INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN docs_regenerated INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN docs_embedded INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN warnings_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE sync_runs ADD COLUMN cancelled_at INTEGER;
-- Indexes for observability queries on surgical runs
CREATE INDEX IF NOT EXISTS idx_sync_runs_mode_started
ON sync_runs(mode, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_sync_runs_status_phase_started
ON sync_runs(status, phase, started_at DESC);
```
**Note:** No changes to `pending_dependent_fetches` — surgical mode bypasses the queue entirely (see constraint 4).
---
## Step 8b: Extend `SyncRunRecorder` for surgical mode
**File: `src/core/sync.rs`** (or wherever `SyncRunRecorder` lives)
Add methods for surgical mode lifecycle management. This replaces all ad-hoc SQL that Step 9 would otherwise need. The recorder owns the row ID and handles all `sync_runs` mutations:
```rust
impl SyncRunRecorder {
/// Start a new surgical sync run. Records mode='surgical' and phase='preflight'.
pub fn start_surgical(
conn: &Connection,
iids_json: &str,
) -> Result<Self> {
// INSERT INTO sync_runs with mode='surgical', phase='preflight', surgical_iids_json
// Returns Self with the row id for subsequent updates
todo!()
}
/// Update the current phase (preflight -> ingest -> dependents -> docs -> embed -> done)
pub fn set_phase(&self, conn: &Connection, phase: &str) -> Result<()> {
conn.execute(
"UPDATE sync_runs SET phase = ?1, heartbeat_at = strftime('%s','now') * 1000 WHERE id = ?2",
rusqlite::params![phase, self.row_id],
)?;
Ok(())
}
/// Update surgical-specific counters (issues_fetched, mrs_fetched, etc.)
pub fn set_counters(&self, conn: &Connection, counters: &SurgicalCounters) -> Result<()> {
// Single UPDATE with all counter fields
todo!()
}
/// Finalize: succeeded
pub fn finish_succeeded(&self, conn: &Connection) -> Result<()> {
// SET phase='done', status='succeeded', finished_at=now
todo!()
}
/// Finalize: succeeded with warnings (partial dependent failures)
pub fn finish_succeeded_with_warnings(&self, conn: &Connection, warnings_count: usize) -> Result<()> {
// SET phase='done', status='succeeded', warnings_count=N, finished_at=now
todo!()
}
/// Finalize: failed
pub fn finish_failed(&self, conn: &Connection, error: &str) -> Result<()> {
// SET phase='failed', status='failed', error=msg, finished_at=now
todo!()
}
/// Finalize: cancelled (shutdown signal received)
pub fn finish_cancelled(&self, conn: &Connection) -> Result<()> {
// SET phase='cancelled', status='cancelled', cancelled_at=now, finished_at=now
todo!()
}
/// Heartbeat if enough time has elapsed since last heartbeat
pub fn heartbeat_if_due(&self, conn: &Connection) -> Result<()> {
// UPDATE heartbeat_at if interval exceeded
todo!()
}
}
/// Counter snapshot for surgical sync run updates
#[derive(Debug, Default)]
pub struct SurgicalCounters {
pub issues_fetched: usize,
pub mrs_fetched: usize,
pub issues_ingested: usize,
pub mrs_ingested: usize,
pub skipped_stale: usize,
pub docs_regenerated: usize,
pub docs_embedded: usize,
}
```
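The `phase` column effectively encodes a small state machine: preflight → ingest → dependents → docs → embed → done, with failed/cancelled as terminal exits from any phase, and with docs/embed skippable via `--no-docs`/`--no-embed` and a direct preflight → done path for `--preflight-only`. A pure-logic sketch of the allowed transitions, useful in a recorder unit test — `phase_transition_ok` is a hypothetical helper, and the phase names simply mirror the migration comment:

```rust
/// Returns true if `from` -> `to` is a legal phase transition for a
/// surgical sync run. Terminal states accept no further transitions.
fn phase_transition_ok(from: &str, to: &str) -> bool {
    let terminal = |p: &str| matches!(p, "done" | "failed" | "cancelled");
    if terminal(from) {
        return false;
    }
    // Any non-terminal phase may abort to failed or cancelled.
    if to == "failed" || to == "cancelled" {
        return true;
    }
    matches!(
        (from, to),
        ("preflight", "ingest")
            | ("preflight", "done")   // --preflight-only
            | ("ingest", "dependents")
            | ("dependents", "docs")
            | ("dependents", "done")  // --no-docs
            | ("docs", "embed")
            | ("docs", "done")        // --no-embed or nothing regenerated
            | ("embed", "done")
    )
}
```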
---
## Step 9: Create `run_sync_surgical` in `src/cli/commands/sync.rs`
This is the surgical variant of `run_sync`. It uses a preflight-then-commit pattern: all primary entity fetches happen first (WITHOUT the sync lock), then DB writes happen transactionally (WITH the sync lock). Dependent stages (discussions, resource events, MR closes_issues, MR diffs) run inline per-entity — they do NOT use `pending_dependent_fetches`.
**IMPORTANT implementation notes:**
- **Dry-run is zero-write**: Check `options.dry_run` BEFORE lock acquisition and DB connection. No side effects at all.
- **Preflight-only**: Check `options.preflight_only` after preflight fetch completes. Return results (including any failures) with zero content DB writes. Control-plane run-ledger writes are allowed.
- **Lock window minimization**: Lock is NOT held during preflight network I/O. Acquired immediately before Stage 1 (first content DB mutation). Released before embed stage.
- **Aggregate preflight failures**: Preflight collects ALL per-IID failures (including payload integrity mismatches). If any failures exist, abort with zero content DB writes and return structured error report.
- **Durable run state via SyncRunRecorder**: Use `SyncRunRecorder::start_surgical(...)` at run start. Call `recorder.set_phase()` after each stage transition. On success call `recorder.finish_succeeded()` (or `finish_succeeded_with_warnings`). On failure call `recorder.finish_failed()`. On cancellation call `recorder.finish_cancelled()`. No raw SQL updates to `sync_runs`.
- **Sync lock**: Acquire `AppLock("sync")` using the same stale lock and heartbeat settings as `run_ingest`. Hold lock for ingest + dependents + docs only. Release before embed.
- **Transactional ingest**: After successful preflight, apply all DB mutations inside `unchecked_transaction()`.
- **Stale-write protection**: Each entity's ingest checks local `updated_at` (and `last_seen_at` for equal-timestamp tie-breaking) vs preflight payload. Stale payloads are skipped (not overwritten).
- **Inline dependents**: Discussions, resource events, MR closes_issues, and MR diffs are fetched and written inline per-entity. No jobs are enqueued to `pending_dependent_fetches`. Individual dependent stage failures are recorded per-entity in `entity_failures` but do not abort the run.
- **Per-entity timeout**: Each dependent network call is wrapped in `tokio::time::timeout(Duration::from_secs(config.sync.surgical_entity_timeout_seconds.unwrap_or(30)))`. Timeouts produce `EntityFailure { code: "TIMEOUT" }`.
- **Scoped doc regeneration**: Call `run_generate_docs_for_sources(config, &dirty_source_keys)` which returns `GenerateDocsResult` including `regenerated_document_ids`.
- **Scoped embed**: Call `run_embed_for_document_ids(config, &regenerated_document_ids, signal)` — NOT global `run_embed`. This ensures isolation even after lock release and under concurrent normal sync.
- **Embed guard**: Embed stage only runs when surgical docs actually regenerated documents in this run (`!options.no_docs && !regenerated_document_ids.is_empty()`).
- **prefetch_mr_discussions signature**: Takes `(client, gitlab_project_id, local_project_id, MrForDiscussionSync)` — NOT a slice.
- **Preflight timestamp**: Capture `preflight_started_at_ms` before the first network call and pass it through to ingest functions for TOCTOU tie-breaking.
- **Cancellation handling**: Every `signal.is_cancelled()` check MUST call `recorder.finish_cancelled()` before returning. No early returns without finalizing the run row.
- **Payload integrity**: Preflight validates `payload.project_id == gitlab_project_id` for every fetched entity. Mismatches become `EntityFailure { code: "PROJECT_MISMATCH" }`.
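One detail worth handling before the docs stage: if the same IID is passed twice on the CLI, `dirty_source_keys` will contain duplicate `(source_type, source_id)` pairs. Deduplicating them avoids regenerating the same document twice. A pure sketch — `dedup_dirty_keys` is a hypothetical helper; the `DirtySourceKey` alias matches Step 7:

```rust
use std::collections::HashSet;

type DirtySourceKey = (String, i64);

/// Deduplicate dirty source keys while preserving first-seen order,
/// so repeated IIDs on the CLI don't regenerate the same doc twice.
fn dedup_dirty_keys(keys: Vec<DirtySourceKey>) -> Vec<DirtySourceKey> {
    let mut seen = HashSet::new();
    keys.into_iter()
        .filter(|k| seen.insert(k.clone()))
        .collect()
}
```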
Add this function after `run_sync` (around line 360):
```rust
pub async fn run_sync_surgical(
config: &Config,
options: &SyncOptions,
run_id: &str,
signal: &ShutdownSignal,
) -> Result<SyncResult> {
use crate::core::db::create_connection;
use crate::core::lock::{AppLock, LockOptions};
use crate::core::paths::get_db_path;
use crate::core::project::resolve_project;
use crate::core::sync::{SyncRunRecorder, SurgicalCounters};
use crate::gitlab::GitLabClient;
use crate::ingestion::discussions::ingest_issue_discussions;
use crate::ingestion::mr_discussions::prefetch_mr_discussions;
use crate::ingestion::mr_discussions::write_prefetched_mr_discussions;
use crate::ingestion::surgical::{preflight_fetch, ingest_preflight_results, EntityFailure};
let span = tracing::info_span!("sync_surgical", %run_id);
async move {
let mut result = SyncResult {
run_id: run_id.to_string(),
..SyncResult::default()
};
// ── Dry-run: zero side effects (no lock, no DB, no network) ──
if options.dry_run {
let project_str = options.project.as_deref().expect("validated in handle_sync_cmd");
info!(
issues = ?options.issue_iids,
mrs = ?options.mr_iids,
project = project_str,
"Surgical sync dry-run: would fetch these IIDs"
);
return Ok(result);
}
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
let project_str = options.project.as_deref().expect("validated in handle_sync_cmd");
let project_id = resolve_project(&conn, project_str)?;
let gitlab_project_id: i64 = conn.query_row(
"SELECT gitlab_project_id FROM projects WHERE id = ?1",
[project_id],
|r| r.get(0),
)?;
let token = config.resolve_token()?;
let client = GitLabClient::new(
&config.gitlab.base_url, &token, config.sync.rate_limit_rps,
);
// ── Record sync run via SyncRunRecorder (not ad-hoc SQL) ──
let iids_json = serde_json::to_string(&serde_json::json!({
"issues": options.issue_iids,
"mrs": options.mr_iids,
})).unwrap_or_default();
let recorder = SyncRunRecorder::start_surgical(&conn, &iids_json)?;
// ── Capture preflight start timestamp for TOCTOU tie-breaking ──
let preflight_started_at_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// ── Stage 0: Preflight fetch (NO lock, NO content DB writes — network only) ──
// Lock is NOT held during preflight to minimize contention with normal syncs.
let stage_start = Instant::now();
let total_items = options.issue_iids.len() + options.mr_iids.len();
let entity_timeout = Duration::from_secs(
config.sync.surgical_entity_timeout_seconds.unwrap_or(30) as u64
);
let spinner = stage_spinner_v2(
Icons::sync(),
"Preflight",
&format!("fetching {} items...", total_items),
options.robot_mode,
);
let preflight = preflight_fetch(
&client, gitlab_project_id, &options.issue_iids, &options.mr_iids, entity_timeout,
).await?;
// Update run ledger via recorder
recorder.set_phase(&conn, "preflight")?;
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
..Default::default()
})?;
let preflight_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("preflight".to_string(), preflight_elapsed.as_millis() as u64);
// Check for preflight failures — abort with structured error report if any
if preflight.has_failures() {
let failure_summary = preflight.failures.iter()
.map(|f| format!("{} #{}: {} ({})", f.entity_type, f.iid, f.code, f.message))
.collect::<Vec<_>>()
.join("; ");
let fail_icon = color_icon(Icons::error(), true);
emit_stage_line(&spinner, &fail_icon, "Preflight",
&format!("{} of {} failed: {}", preflight.failures.len(), total_items, failure_summary),
preflight_elapsed);
// Record failure via recorder
recorder.finish_failed(&conn, &failure_summary)?;
result.entity_failures = preflight.failures.clone();
return Err(LoreError::SurgicalPreflightFailed {
run_id: run_id.to_string(),
total: total_items,
failures: preflight.failures,
}.into());
}
let preflight_summary = format!(
"{} issues, {} MRs fetched",
preflight.issues.len(), preflight.merge_requests.len()
);
let preflight_icon = color_icon(Icons::success(), false);
emit_stage_line(&spinner, &preflight_icon, "Preflight", &preflight_summary, preflight_elapsed);
// ── Preflight-only mode: return after successful preflight, zero content DB writes ──
if options.preflight_only {
result.requested_count = total_items;
result.fetched_count = preflight.issues.len() + preflight.merge_requests.len();
recorder.finish_succeeded(&conn)?;
return Ok(result);
}
if signal.is_cancelled() {
recorder.finish_cancelled(&conn)?;
return Ok(result);
}
// ── Acquire sync lock ONLY for mutation phases ──
// Lock is acquired here, AFTER preflight completes, to minimize contention.
let lock_conn = create_connection(&db_path)?;
let mut lock = AppLock::new(
lock_conn,
LockOptions {
name: "sync".to_string(),
stale_lock_minutes: config.sync.stale_lock_minutes,
heartbeat_interval_seconds: config.sync.heartbeat_interval_seconds,
},
);
lock.acquire(false)?; // not force — respect existing locks
// ── Stage 1: Transactional ingest (all-or-nothing content DB writes) ──
let stage_start = Instant::now();
let spinner = stage_spinner_v2(
Icons::sync(),
"Ingest",
"writing to database...",
options.robot_mode,
);
recorder.set_phase(&conn, "ingest")?;
let ingest_result = match ingest_preflight_results(
&conn, config, project_id, &preflight, preflight_started_at_ms,
) {
Ok(r) => r,
Err(e) => {
recorder.finish_failed(&conn, &e.to_string())?;
return Err(e);
}
};
result.issues_updated = preflight.issues.len();
result.mrs_updated = preflight.merge_requests.len();
result.skipped_stale = ingest_result.skipped_stale;
let all_dirty_source_keys = ingest_result.dirty_source_keys.clone();
// Update run ledger counters
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
issues_ingested: result.issues_updated,
mrs_ingested: result.mrs_updated,
skipped_stale: result.skipped_stale,
..Default::default()
})?;
let ingest_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("ingest".to_string(), ingest_elapsed.as_millis() as u64);
let ingest_summary = format!(
"{} issues, {} MRs ingested{}",
result.issues_updated, result.mrs_updated,
if result.skipped_stale > 0 { format!(", {} skipped (stale)", result.skipped_stale) } else { String::new() }
);
let ingest_icon = color_icon(Icons::success(), false);
emit_stage_line(&spinner, &ingest_icon, "Ingest", &ingest_summary, ingest_elapsed);
if signal.is_cancelled() {
recorder.finish_cancelled(&conn)?;
return Ok(result);
}
// ── Stage 2: Dependent stages (discussions, events, MR dependents) ──
// All dependents run INLINE per-entity — no jobs are enqueued to pending_dependent_fetches.
// Individual dependent failures are recorded per-entity but do not abort the run.
// Each network call is wrapped in tokio::time::timeout for bounded execution.
recorder.set_phase(&conn, "dependents")?;
let stage_start = Instant::now();
// Stage 2a: Discussions for issues
if !ingest_result.issue_disc_sync.is_empty() {
for issue_info in &ingest_result.issue_disc_sync {
match tokio::time::timeout(entity_timeout, ingest_issue_discussions(
&conn, &client, config, gitlab_project_id, project_id,
std::slice::from_ref(issue_info),
)).await {
Ok(Ok(_)) => result.discussions_fetched += 1,
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "discussions",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "discussions",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2b: Resource events for issues (inline, no queue)
if !options.no_events && config.sync.fetch_resource_events {
for issue_info in &ingest_result.issue_disc_sync {
match tokio::time::timeout(entity_timeout, sync_issue_resource_events_direct(
&conn, &client, config, gitlab_project_id, project_id, issue_info,
)).await {
Ok(Ok(count)) => result.resource_events_fetched += count,
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "resource_events",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "issue",
iid: issue_info.iid as u64,
stage: "resource_events",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2c: Discussions for MRs
if !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
let prefetched = match tokio::time::timeout(entity_timeout, prefetch_mr_discussions(
    &client, gitlab_project_id, project_id, mr_info.clone(),
)).await {
    Ok(Ok(p)) => p,
    Ok(Err(e)) => {
        result.entity_failures.push(EntityFailure {
            entity_type: "merge_request",
            iid: mr_info.iid as u64,
            stage: "discussions",
            code: "FETCH_ERROR".to_string(),
            message: e.to_string(),
        });
        continue;
    }
    Err(_elapsed) => {
        result.entity_failures.push(EntityFailure {
            entity_type: "merge_request",
            iid: mr_info.iid as u64,
            stage: "discussions",
            code: "TIMEOUT".to_string(),
            message: format!("Timed out after {}s", entity_timeout.as_secs()),
        });
        continue;
    }
};
match write_prefetched_mr_discussions(
&conn, config, project_id, &[prefetched],
) {
Ok(disc_result) => result.discussions_fetched += disc_result.len(),
Err(e) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "discussions",
code: "WRITE_ERROR".to_string(),
message: e.to_string(),
});
}
}
}
}
// Stage 2d: Resource events for MRs (inline, no queue)
if !options.no_events && config.sync.fetch_resource_events && !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
match tokio::time::timeout(entity_timeout, sync_mr_resource_events_direct(
&conn, &client, config, gitlab_project_id, project_id, mr_info,
)).await {
Ok(Ok(count)) => result.resource_events_fetched += count,
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "resource_events",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "resource_events",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2e: MR closes_issues (inline, no queue)
if config.sync.fetch_mr_closes_issues.unwrap_or(true) && !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
match tokio::time::timeout(entity_timeout, sync_mr_closes_issues_direct(
&conn, &client, config, gitlab_project_id, project_id, mr_info,
)).await {
Ok(Ok(_)) => {},
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_closes_issues",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_closes_issues",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
// Stage 2f: MR diffs (inline, no queue)
if config.sync.fetch_mr_diffs.unwrap_or(true) && !ingest_result.mr_disc_sync.is_empty() {
for mr_info in &ingest_result.mr_disc_sync {
match tokio::time::timeout(entity_timeout, sync_mr_diffs_direct(
&conn, &client, config, gitlab_project_id, project_id, mr_info,
)).await {
Ok(Ok(_)) => {},
Ok(Err(e)) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_diffs",
code: "FETCH_ERROR".to_string(),
message: e.to_string(),
});
}
Err(_elapsed) => {
result.entity_failures.push(EntityFailure {
entity_type: "merge_request",
iid: mr_info.iid as u64,
stage: "mr_diffs",
code: "TIMEOUT".to_string(),
message: format!("Timed out after {}s", entity_timeout.as_secs()),
});
}
}
}
}
let dependents_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("dependents".to_string(), dependents_elapsed.as_millis() as u64);
if signal.is_cancelled() {
recorder.finish_cancelled(&conn)?;
return Ok(result);
}
// ── Stage 3: Docs ── (scoped to dirty sources from THIS surgical run only)
let mut regenerated_document_ids: Vec<i64> = Vec::new();
if !options.no_docs && !all_dirty_source_keys.is_empty() {
recorder.set_phase(&conn, "docs")?;
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Docs", "generating...", options.robot_mode);
// Process ONLY the dirty sources touched by this surgical run.
// Uses run_generate_docs_for_sources which filters by (source_type, source_id) PK
// and deletes only those specific dirty_sources rows after processing.
// Returns regenerated_document_ids for scoped embedding.
let docs_result = run_generate_docs_for_sources(config, &all_dirty_source_keys)?;
result.documents_regenerated = docs_result.regenerated;
result.documents_errored = docs_result.errored;
regenerated_document_ids = docs_result.regenerated_document_ids;
// Update run ledger
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
issues_ingested: result.issues_updated,
mrs_ingested: result.mrs_updated,
skipped_stale: result.skipped_stale,
docs_regenerated: result.documents_regenerated,
..Default::default()
})?;
let docs_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("docs".to_string(), docs_elapsed.as_millis() as u64);
let docs_summary = format!("{} documents generated", result.documents_regenerated);
let docs_icon = color_icon(
if docs_result.errored > 0 { Icons::warning() } else { Icons::success() },
docs_result.errored > 0,
);
emit_stage_line(&spinner, &docs_icon, "Docs", &docs_summary, docs_elapsed);
}
// ── Release sync lock before embed (embed is idempotent, doesn't need sync lock) ──
drop(lock);
// ── Stage 4: Embed ──
// SCOPED: only embeds documents regenerated by THIS surgical run (via document_ids).
// Uses run_embed_for_document_ids — NOT global run_embed — to ensure isolation
// even after lock release and under concurrent normal sync activity.
if !options.no_embed && !options.no_docs && !regenerated_document_ids.is_empty() {
recorder.set_phase(&conn, "embed")?;
let stage_start = Instant::now();
let spinner = stage_spinner_v2(Icons::sync(), "Embed", "preparing...", options.robot_mode);
match run_embed_for_document_ids(config, &regenerated_document_ids, signal).await {
Ok(embed_result) => {
result.documents_embedded = embed_result.docs_embedded;
result.embedding_failed = embed_result.failed;
// Update run ledger
recorder.set_counters(&conn, &SurgicalCounters {
issues_fetched: preflight.issues.len(),
mrs_fetched: preflight.merge_requests.len(),
issues_ingested: result.issues_updated,
mrs_ingested: result.mrs_updated,
skipped_stale: result.skipped_stale,
docs_regenerated: result.documents_regenerated,
docs_embedded: result.documents_embedded,
})?;
let embed_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("embed".to_string(), embed_elapsed.as_millis() as u64);
let embed_summary = format!("{} chunks embedded", embed_result.chunks_embedded);
let embed_icon = color_icon(
if embed_result.failed > 0 { Icons::warning() } else { Icons::success() },
embed_result.failed > 0,
);
emit_stage_line(&spinner, &embed_icon, "Embed", &embed_summary, embed_elapsed);
}
Err(e) => {
let embed_elapsed = stage_start.elapsed();
result.stage_timings_ms.insert("embed".to_string(), embed_elapsed.as_millis() as u64);
let warn_icon = color_icon(Icons::warning(), true);
emit_stage_line(&spinner, &warn_icon, "Embed", &format!("skipped ({e})"), embed_elapsed);
warn!(error = %e, "Embedding stage failed, continuing");
}
}
}
// ── Mark run complete ──
let warnings_count = result.entity_failures.len();
if warnings_count > 0 {
recorder.finish_succeeded_with_warnings(&conn, warnings_count)?;
} else {
recorder.finish_succeeded(&conn)?;
}
result.requested_count = total_items;
result.fetched_count = preflight.issues.len() + preflight.merge_requests.len();
result.processed_count = result.issues_updated + result.mrs_updated;
Ok(result)
}
.instrument(span)
.await
}
```
### Step 9a: Implement `run_generate_docs_for_sources` and `run_embed_for_document_ids`
**File: `src/documents/mod.rs`** (or wherever `run_generate_docs` lives)
`run_generate_docs_for_sources` is a ~20-line variant of the existing `run_generate_docs` that filters by `(source_type, source_id)` primary key, processes only those docs, then deletes only those specific `dirty_sources` rows. It additionally returns the IDs of regenerated documents for scoped embedding:
```rust
/// Scoped doc regeneration: process ONLY the specified dirty source keys.
/// Used by surgical sync to avoid draining the global dirty queue.
/// Each key is `(source_type, source_id)` matching `dirty_sources` PK.
/// Returns regenerated document IDs for use with scoped embedding.
pub fn run_generate_docs_for_sources(
config: &Config,
source_keys: &[(String, i64)],
) -> Result<GenerateDocsResult> {
// Same as run_generate_docs but with:
// 1. SELECT ... FROM dirty_sources WHERE (source_type, source_id) IN (...) instead of full scan
// 2. DELETE FROM dirty_sources WHERE (source_type, source_id) IN (...) instead of full drain
// 3. Collect document IDs of regenerated rows into regenerated_document_ids
// All other logic (doc template rendering, indexing) is identical.
todo!("implement scoped variant")
}
```
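The `WHERE (source_type, source_id) IN (...)` filter above relies on SQLite row-value syntax (available since SQLite 3.15). As a minimal sketch of how the scoped query might be assembled — `composite_key_filter` is a hypothetical helper, not an existing function, and the actual rusqlite statement preparation and binding are omitted:

```rust
/// Hypothetical helper: build the SQL fragment and flattened parameter list for
/// filtering dirty_sources by composite PK, using SQLite row-value syntax:
/// (source_type, source_id) IN (VALUES (?, ?), (?, ?), ...).
fn composite_key_filter(keys: &[(String, i64)]) -> (String, Vec<String>) {
    let placeholders = keys.iter().map(|_| "(?, ?)").collect::<Vec<_>>().join(", ");
    let sql = format!(
        "SELECT source_type, source_id FROM dirty_sources \
         WHERE (source_type, source_id) IN (VALUES {placeholders})"
    );
    // Flatten key pairs in positional bind order; rusqlite would bind these
    // with params_from_iter or equivalent.
    let params = keys
        .iter()
        .flat_map(|(ty, id)| [ty.clone(), id.to_string()])
        .collect();
    (sql, params)
}
```

The matching `DELETE FROM dirty_sources WHERE (source_type, source_id) IN (...)` would reuse the same placeholder fragment, guaranteeing the SELECT and DELETE cover identical rows.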
**`GenerateDocsResult`** must be extended with:
```rust
pub struct GenerateDocsResult {
pub regenerated: usize,
pub errored: usize,
/// Document IDs of all regenerated documents (for scoped embedding in surgical mode)
pub regenerated_document_ids: Vec<i64>,
}
```
**File: `src/embedding/mod.rs`** (or wherever `run_embed` lives)
Add a scoped embedding function that processes only specific document IDs:
```rust
/// Scoped embedding: embed ONLY the specified document IDs.
/// Used by surgical sync to ensure embed isolation after lock release.
/// Unlike global `run_embed` (which processes all unembedded docs),
/// this ensures only documents from the current surgical run are embedded.
pub async fn run_embed_for_document_ids(
config: &Config,
document_ids: &[i64],
signal: &ShutdownSignal,
) -> Result<EmbedResult> {
// Same as run_embed but with:
// 1. SELECT ... FROM documents WHERE id IN (...) AND embedding IS NULL
// instead of SELECT ... FROM documents WHERE embedding IS NULL
// 2. All other logic (chunking, batching, Ollama calls) is identical
todo!("implement scoped variant")
}
```
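One practical detail the scoped variant should account for: SQLite caps the number of host parameters per statement (999 in older builds by default). If a surgical run ever regenerates many documents, the `id IN (...)` query should be chunked. A sketch under that assumption — `scoped_embed_queries` and the `documents` column names are illustrative, not actual codebase identifiers:

```rust
/// Hypothetical helper: split document IDs into chunks that stay safely under
/// SQLite's conservative default host-parameter limit of 999, and build the
/// scoped SELECT for each chunk.
const MAX_PARAMS_PER_QUERY: usize = 900;

fn scoped_embed_queries(document_ids: &[i64]) -> Vec<(String, Vec<i64>)> {
    document_ids
        .chunks(MAX_PARAMS_PER_QUERY)
        .map(|chunk| {
            let placeholders = vec!["?"; chunk.len()].join(", ");
            // The AND embedding IS NULL clause keeps this idempotent on retry.
            let sql = format!(
                "SELECT id, content FROM documents \
                 WHERE id IN ({placeholders}) AND embedding IS NULL"
            );
            (sql, chunk.to_vec())
        })
        .collect()
}
```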
### Step 9b: Implement inline dependent helpers
**File: `src/ingestion/surgical.rs`** (or a new `src/ingestion/surgical_dependents.rs`)
These functions fetch and write dependent data inline for a single entity, bypassing `pending_dependent_fetches` entirely. They are thin wrappers around the existing fetch + write logic from `orchestrator.rs`, extracted to operate on a single entity:
```rust
/// Fetch and write resource events for a single issue, inline (no queue).
pub(crate) async fn sync_issue_resource_events_direct(
conn: &Connection,
client: &GitLabClient,
config: &Config,
gitlab_project_id: i64,
project_id: i64,
issue_info: &IssueForDiscussionSync,
) -> Result<usize> {
// Fetch resource events for this issue from GitLab API
// Write to resource_state_events / resource_label_events / resource_milestone_events
// Return count of events written
todo!("extract from orchestrator drain logic")
}
/// Fetch and write resource events for a single MR, inline (no queue).
pub(crate) async fn sync_mr_resource_events_direct(
conn: &Connection,
client: &GitLabClient,
config: &Config,
gitlab_project_id: i64,
project_id: i64,
mr_info: &MrForDiscussionSync,
) -> Result<usize> {
todo!("extract from orchestrator drain logic")
}
/// Fetch and write closes_issues data for a single MR, inline (no queue).
pub(crate) async fn sync_mr_closes_issues_direct(
conn: &Connection,
client: &GitLabClient,
config: &Config,
gitlab_project_id: i64,
project_id: i64,
mr_info: &MrForDiscussionSync,
) -> Result<()> {
todo!("extract from orchestrator drain logic")
}
/// Fetch and write diff data for a single MR, inline (no queue).
pub(crate) async fn sync_mr_diffs_direct(
conn: &Connection,
client: &GitLabClient,
config: &Config,
gitlab_project_id: i64,
project_id: i64,
mr_info: &MrForDiscussionSync,
) -> Result<()> {
todo!("extract from orchestrator drain logic")
}
```
These are extracted from the existing drain loop bodies in `orchestrator.rs`. The drain functions iterate over queued jobs and call per-entity logic — these helpers are that per-entity logic, made callable directly.
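These helpers feed the `EntityFailure.code` values seen in the dependents stage (`FETCH_ERROR`, `TIMEOUT`) and in preflight (`NOT_FOUND`). As a sketch of the `classify_error_code` mapping referenced in the files table — the `FetchError` enum is a hypothetical stand-in for the GitLab client's real error type:

```rust
/// Sketch of the error-code classification assumed by EntityFailure.code.
/// The real classifier would inspect the GitLab client's error type; this
/// minimal enum is a stand-in for illustration only.
#[derive(Debug)]
enum FetchError {
    HttpStatus(u16),
    Timeout,
    Network(String),
}

fn classify_error_code(err: &FetchError) -> &'static str {
    match err {
        FetchError::HttpStatus(404) => "NOT_FOUND",
        FetchError::HttpStatus(401) | FetchError::HttpStatus(403) => "AUTH_ERROR",
        FetchError::HttpStatus(s) if *s >= 500 => "SERVER_ERROR",
        FetchError::Timeout => "TIMEOUT",
        FetchError::HttpStatus(_) | FetchError::Network(_) => "FETCH_ERROR",
    }
}
```

Keeping the codes as a small closed set makes robot-mode output stable for agents that branch on them.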
---
### Step 9d: Add `LoreError::SurgicalPreflightFailed` variant

**File: `src/core/error.rs`**
Add a typed error variant for surgical preflight failures. This preserves machine semantics for robot mode and enables structured exit codes:
```rust
/// Surgical sync preflight failed — one or more IIDs could not be fetched.
/// Contains structured per-entity failure details for robot mode output.
SurgicalPreflightFailed {
run_id: String,
total: usize,
failures: Vec<crate::ingestion::surgical::EntityFailure>,
},
```
Map to exit code 6 (resource not found) or a new dedicated exit code. The `Display` impl should produce a human-readable summary:
```rust
Self::SurgicalPreflightFailed { run_id, total, failures } => {
write!(f, "Surgical preflight failed for {} of {} IIDs (run {}): {}",
failures.len(), total, run_id,
failures.iter().map(|f| format!("{} #{}: {}", f.entity_type, f.iid, f.code)).collect::<Vec<_>>().join(", "))
}
```
In robot mode, this serializes to structured JSON with per-entity details and actionable recovery commands, rather than a generic `Other` string.
---
## Step 10: Add branch in `run_sync` for surgical mode
**File: `src/cli/commands/sync.rs`, function `run_sync` (line 68)**
**CRITICAL: dry_run must be checked BEFORE surgical to prevent accidental writes:**
```rust
// Handle dry_run mode - must check BEFORE surgical to prevent writes
if options.dry_run {
if options.is_surgical() {
// Surgical dry-run is handled inside run_sync_surgical (returns before any writes/locks)
return run_sync_surgical(config, &options, run_id, signal).await;
}
return run_sync_dry_run(config, &options).await;
}
// Handle preflight-only mode
if options.preflight_only {
return run_sync_surgical(config, &options, run_id, signal).await;
}
// Handle surgical mode (specific IIDs)
if options.is_surgical() {
return run_sync_surgical(config, &options, run_id, signal).await;
}
```
Note: `run_sync_surgical` checks `options.dry_run` and `options.preflight_only` internally and returns early before ANY side effects. This avoids needing separate functions for each mode.
---
## Step 11: Make `GitLabClient` constructible from config
Check whether `GitLabClient::from_config` already exists; if not, the surgical path must construct the client itself. Currently `run_ingest` constructs it via:
```rust
let token = config.resolve_token()?;
let client = GitLabClient::new(&config.gitlab.base_url, &token, config.sync.rate_limit_rps);
```
The surgical function uses the same 2-line inline construction (shown in Step 9).
---
## Step 12: `ProcessMrResult` visibility
`process_single_mr` returns `ProcessMrResult` which is currently private. Make it `pub(crate)`:
**File: `src/ingestion/merge_requests.rs`, line 138**
```rust
pub(crate) struct ProcessMrResult {
```
---
## Step 13: Make orchestrator per-entity logic extractable
The surgical inline dependent helpers (Step 9b) need the per-entity fetch+write logic currently embedded in orchestrator drain loops. There are two approaches:
**Option A (preferred):** Extract the per-entity body of each drain loop into a standalone function. The drain function then calls the extracted function in a loop, and the surgical helpers call it directly. This avoids code duplication.
**Option B:** If the drain loop body is tightly coupled to queue state (locked_at, attempts, etc.), write standalone versions for surgical that call the same underlying GitLab API functions and DB write functions, skipping queue management.
**Functions to make accessible or extract:**
- Resource events fetch + write for a single entity (issue or MR)
- MR closes_issues fetch + write for a single MR
- MR diffs fetch + write for a single MR
The existing orchestrator functions should remain unchanged for normal sync — only surgical mode uses the extracted per-entity versions.
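Option A can be sketched in shape as follows. All names here are hypothetical stand-ins (the real signatures carry `conn`/`client`/`config` as in the existing drain loops); the point is the division of labor — queue bookkeeping stays in the drain function, while the extracted per-entity body is callable directly from surgical mode:

```rust
// Hypothetical shapes illustrating the Option A extraction.
struct QueuedJob {
    iid: u64,
}

/// Extracted per-entity body: fetch + write for one entity, no queue state.
/// (The modulo here is a stand-in for "rows written".)
fn sync_one_entity(iid: u64) -> Result<usize, String> {
    Ok(iid as usize % 10)
}

/// Existing drain function, now a thin loop over the extracted body.
/// Queue bookkeeping (locked_at, attempts, delete-on-complete) stays here.
fn drain_queue(jobs: &mut Vec<QueuedJob>) -> usize {
    let mut written = 0;
    jobs.retain(|job| match sync_one_entity(job.iid) {
        Ok(n) => {
            written += n;
            false // delete-on-complete
        }
        Err(_) => true, // keep for retry; attempts bump elided
    });
    written
}
```

Surgical mode would call `sync_one_entity` directly per target IID and never touch the queue, which is what makes acceptance criterion 44 (zero `pending_dependent_fetches` rows) hold by construction.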
---
## Step 14: Update `robot-docs` manifest
The `robot-docs` command outputs a manifest of all commands and flags. If it's auto-generated from clap derive, the new `--issue`, `--mr`, `-p`, `--preflight-only` flags will appear automatically. If it's hardcoded, update the sync command entry.
**File:** Check `src/cli/robot.rs` or wherever `robot-docs` content is defined. The new flags should appear in the sync command's schema. Document the `--preflight-only` flag's behavior and its distinction from `--dry-run`.
---
## Step 15: Update `SyncResult` for robot mode structured output
**File: `src/cli/commands/sync.rs`** (wherever `SyncResult` is defined)
Add fields for surgical-specific structured output:
```rust
pub struct SyncResult {
// ... existing fields ...
// NEW: Surgical sync metadata for robot mode
/// Per-entity failures (empty on success)
pub entity_failures: Vec<EntityFailure>,
/// Per-entity outcome map for deterministic retry and richer UI messaging
pub entity_outcomes: Vec<EntityOutcome>,
/// Number of entities requested
pub requested_count: usize,
/// Number of entities successfully fetched in preflight
pub fetched_count: usize,
/// Number of entities successfully processed (ingested + dependents)
pub processed_count: usize,
/// Entities skipped because local row was newer than preflight payload (TOCTOU protection)
pub skipped_stale: usize,
/// Per-entity IIDs that were skipped as stale
pub skipped_stale_iids: Vec<(String, u64)>,
/// Per-stage elapsed ms for deterministic performance tracking
pub stage_timings_ms: std::collections::BTreeMap<String, u64>,
/// Suggested recovery commands when failures occur (robot ergonomics)
pub recovery_actions: Vec<String>,
/// Overall completion status: succeeded | succeeded_with_warnings | failed | cancelled
pub completion_status: String,
}
/// Per-entity outcome for robot mode structured output.
/// Enables deterministic per-IID retry and richer UI messaging.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EntityOutcome {
pub entity_type: String, // "issue" or "merge_request"
pub iid: u64,
pub fetched: bool,
pub ingested: bool,
pub stale_skipped: bool,
pub dependent_failures: Vec<EntityFailure>,
}
```
Robot mode JSON output includes these fields when in surgical mode, enabling agents to:
- Programmatically detect partial failures via `entity_failures`
- Get complete per-entity lifecycle via `entity_outcomes` for deterministic retry
- Track performance regressions via `stage_timings_ms`
- Auto-recover via `recovery_actions` (e.g., `["lore sync --issue 888 -p myproject"]` for retry)
- Detect TOCTOU skips via `skipped_stale` and `skipped_stale_iids`
- Distinguish partial success via `completion_status`
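The derivation rules for `completion_status` and `recovery_actions` are not spelled out above; a plausible sketch, assuming the status values mirror the `SyncRunRecorder` `finish_*` calls used in the stage code (both functions are hypothetical helpers):

```rust
/// Assumed derivation of the overall completion status, mirroring the
/// finish_succeeded / finish_succeeded_with_warnings / finish_* recorder calls.
fn completion_status(cancelled: bool, preflight_failed: bool, warnings: usize) -> &'static str {
    if cancelled {
        "cancelled"
    } else if preflight_failed {
        "failed"
    } else if warnings > 0 {
        "succeeded_with_warnings"
    } else {
        "succeeded"
    }
}

/// One retry command per failed entity, matching the example in the text:
/// ["lore sync --issue 888 -p myproject"].
fn recovery_actions(project: &str, failures: &[(&str, u64)]) -> Vec<String> {
    failures
        .iter()
        .map(|(entity_type, iid)| {
            let flag = if *entity_type == "issue" { "--issue" } else { "--mr" };
            format!("lore sync {flag} {iid} -p {project}")
        })
        .collect()
}
```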
---
## Files Modified Summary
| File | Change | Lines |
|------|--------|-------|
| `src/cli/mod.rs` | Add `issue: Vec<u64>`, `mr: Vec<u64>`, `project: Option<String>`, `preflight_only: bool` to `SyncArgs` | ~790 |
| `src/cli/commands/sync.rs` | Add fields to `SyncOptions`, `is_surgical()`, `MAX_SURGICAL_TARGETS`, dry-run/surgical/preflight-only branch ordering, `run_sync_surgical` fn, `SyncResult` fields, `EntityOutcome` struct | ~20-30, new ~350 lines |
| `src/main.rs` | Wire new fields in `handle_sync_cmd`, add validation + defaultProject fallback + dedup + hard cap + preflight_only + no-docs/no-embed guard validation | ~2052-2060 |
| `src/gitlab/client.rs` | Add `get_issue_by_iid()`, `get_mr_by_iid()` (~10 lines each, `u64` iid param), add tests (~60 lines) | ~330, ~770 |
| `src/ingestion/surgical.rs` | **New file**: `EntityFailure`, `DirtySourceKey`, `PreflightResult` (with `failures` vec, payload integrity checks), `preflight_fetch` (aggregate-failures, per-entity timeout, project_id validation), `classify_error_code`, TOCTOU guards (`is_local_newer_*` with `last_seen_at` tie-breaking), dirty source key collection, inline dependent helpers, payload processors, `ingest_preflight_results` (~450 lines) | new |
| `src/ingestion/surgical_tests.rs` | **New file**: unit tests with `test_config()` helper, scoping invariant tests (including same-entity), TOCTOU tests (including equal-timestamp), rollback tests, preflight aggregation test, cancellation test, timeout test, scoped embed isolation test, payload integrity test (~450 lines) | new |
| `src/ingestion/mod.rs` | Add `pub mod surgical;` | line 9 |
| `src/ingestion/issues.rs` | `fn process_single_issue``pub(crate) fn` | line 143 |
| `src/ingestion/merge_requests.rs` | `fn process_single_mr``pub(crate) fn`, `struct ProcessMrResult``pub(crate)` | lines 138, 144 |
| `src/ingestion/orchestrator.rs` | Extract per-entity fetch+write logic into standalone functions callable by surgical inline helpers | TBD |
| `src/documents/mod.rs` | Add `run_generate_docs_for_sources` scoped variant (filters by PK, returns `regenerated_document_ids`), extend `GenerateDocsResult` | new fn ~30 lines |
| `src/embedding/mod.rs` | Add `run_embed_for_document_ids` scoped variant (embeds only specified document IDs) | new fn ~30 lines |
| `src/core/db.rs` | New migration: extend `sync_runs` with `mode`, `phase`, `surgical_iids_json`, surgical counters, `warnings_count`, `cancelled_at`, indexes | new migration |
| `src/core/sync.rs` | Extend `SyncRunRecorder` with `start_surgical`, `set_phase`, `set_counters`, `finish_*`, `heartbeat_if_due`, add `SurgicalCounters` struct | ~80 lines |
| `src/core/error.rs` | Add `SurgicalPreflightFailed` variant with structured fields | ~15 lines |
---
## Acceptance Criteria
### For agents to verify (automated):
1. **Compilation**: `cargo check --all-targets` passes with zero errors
2. **Clippy**: `cargo clippy --all-targets -- -D warnings` passes (pedantic + nursery enabled)
3. **Format**: `cargo fmt --check` passes
4. **Tests**: `cargo test` passes, including all new tests
5. **New tests exist**:
- `get_issue_by_iid_returns_issue` — wiremock test in `src/gitlab/client.rs`
- `get_issue_by_iid_returns_not_found` — 404 error handling
- `get_mr_by_iid_returns_mr` — wiremock test
- `get_mr_by_iid_returns_not_found`
- `ingest_issue_by_iid_inserts_and_marks_dirty` — in-memory DB test
- `ingest_issue_by_iid_resets_discussion_watermark`
- `ingest_issue_by_iid_resets_event_watermark`
- `ingest_mr_by_iid_inserts_and_marks_dirty`
- `ingest_mr_by_iid_resets_discussion_watermark`
- `ingest_mr_by_iid_resets_event_watermark`
- `duplicate_iids_are_idempotent` — dedup/upsert behavior
- `surgical_docs_scope_ignores_preexisting_dirty_rows` — scoping invariant
- `surgical_docs_scope_ignores_preexisting_dirty_rows_for_same_entity` — same-entity scoping edge case
- `preflight_aggregates_multiple_missing_iids` — aggregate-failures behavior
- `sync_run_is_persisted_and_updated` — durable run ledger (extends existing table)
- `stale_payload_is_skipped_when_local_updated_at_is_newer` — TOCTOU protection
- `equal_updated_at_but_newer_last_seen_is_skipped` — TOCTOU equal-timestamp tie-breaking
- `preflight_success_then_ingest_failure_rolls_back_all_content_writes` — transactional rollback
- `surgical_no_docs_requires_no_embed_validation` — embed leakage prevention
- `cancellation_marks_sync_run_cancelled` — cancellation durability
- `dependent_timeout_records_entity_failure_and_continues` — timeout handling
- `scoped_embed_does_not_embed_unrelated_docs_created_after_docs_stage` — embed isolation under concurrency
- `payload_project_id_mismatch_is_rejected_in_preflight` — payload integrity
### For me to evaluate (functional):
6. **Basic surgical sync works**: `cargo run --release -- sync --issue <real_iid> -p <real_project>` fetches the issue, syncs its discussions and events, regenerates docs, and embeds
7. **Multiple IIDs work**: `cargo run --release -- sync --issue 1 --issue 2 --mr 3 -p <project>`
8. **Robot mode works**: `cargo run --release -- --robot sync --issue <iid> -p <project>` returns `{"ok":true,"data":{...},"meta":{...}}` with `stage_timings_ms` and `entity_outcomes` populated
9. **Error: missing project**: `cargo run --release -- sync --issue 1` fails with clear error about `-p` or `defaultProject` being required
10. **Error: nonexistent IID**: `cargo run --release -- sync --issue 999999 -p <project>` returns a clear "not found" error with the IID in the message, and zero DB mutations
11. **Error: mixed valid+invalid IIDs**: `cargo run --release -- sync --issue <valid> --issue 999999 -p <project>` reports BOTH results (1 success, 1 failure) in structured output, with zero DB mutations
12. **No status enrichment**: Surgical sync does NOT call GraphQL status enrichment
13. **--no-docs/--no-embed respected**: `cargo run --release -- sync --issue <iid> -p <project> --no-docs --no-embed` skips those stages
14. **Dirty queue scoping**: After surgical sync of 1 issue, only that issue's documents are regenerated (not all dirty docs from previous syncs — verify by checking the count in docs stage output)
15. **Watermark reset**: After surgical sync, the issue's discussions and events are re-fetched even if they were previously synced (verify via the discussions_fetched count > 0)
16. **Normal sync unaffected**: `cargo run --release -- sync` (without --issue/--mr) still works exactly as before
17. **`lore robot-docs`**: The new `--issue`, `--mr`, `-p`, `--preflight-only` flags appear in the sync command schema
18. **defaultProject fallback**: `cargo run --release -- sync --issue <iid>` works when config has `defaultProject` set
19. **Sync lock**: Surgical sync acquires lock — running two surgical syncs concurrently produces a lock error, not corruption
20. **--full incompatible**: `cargo run --release -- sync --full --issue 1 -p project` fails with clear error
21. **MR dependent stages**: Surgical MR sync fetches closes_issues and MR file diffs when enabled by config
22. **Parse-time validation**: `--issue 0` and `--issue -1` rejected by clap with clear error before any code runs
23. **Hard cap enforcement**: `--issue 1 --issue 2 ... --issue 101` (>100 combined targets) rejected with clear error
24. **Preflight-only mode**: `cargo run --release -- sync --preflight-only --issue <iid> -p <project>` validates the IID exists on GitLab and returns success with `fetched_count` populated, but no content DB mutations (only control-plane run-ledger entries)
25. **Preflight-only requires surgical**: `cargo run --release -- sync --preflight-only` (no --issue/--mr) fails with clear error
26. **Durable run ledger**: After surgical sync, `SELECT * FROM sync_runs WHERE mode = 'surgical' ORDER BY id DESC LIMIT 1` shows the correct phase (`done` on success, `failed` on error, `cancelled` on Ctrl+C) with counters populated
27. **Stage timings in robot output**: Robot mode JSON includes `stage_timings_ms` with entries for each completed stage
28. **Typed preflight error**: Preflight failures serialize as structured `SurgicalPreflightFailed` errors with per-entity details and machine-usable codes, not generic `Other` strings
29. **Dependent stage failures are best-effort**: A failing discussion fetch for one entity does not prevent other entities' dependents from running. Failures are recorded in `entity_failures`.
### Edge cases to verify:
30. **Duplicate IIDs**: `--issue 123 --issue 123` should work without error (deduplicated, idempotent)
31. **Mixed existing + new**: Syncing an IID that's already in the DB should update it
32. **Closed issues**: Surgical sync works for closed issues (state = "closed")
33. **Signal handling**: Ctrl+C during surgical sync records `status='cancelled'`, `phase='cancelled'`, and `finished_at` in `sync_runs` (no dangling `running` rows)
34. **Dry-run zero-write assertion**: `--dry-run --issue 123 -p project` produces zero side effects — no lock acquired, no DB connection opened, no network calls. DB file is byte-identical before and after.
35. **--no-docs without --no-embed rejected**: `--issue 1 -p project --no-docs` without `--no-embed` is rejected with clear error
36. **Lock released before embed**: A normal `lore sync` can acquire the sync lock while a surgical sync is in its embed phase.
### Correctness under contention and rollback:
37. **No partial writes on missing IID**: A mixed set of valid + invalid IIDs (e.g., `--issue 123 --issue 999999`) causes zero DB mutations — the preflight phase catches the 404 before any writes. The structured error report lists ALL failures, not just the first.
38. **Scoped docs**: Only documents tied to this surgical run's dirty source keys are regenerated. Other pending dirty_sources are untouched.
39. **Lock contention test**: A second concurrent surgical sync fails fast with a lock error and produces no writes.
40. **Lock not held during preflight**: Verify that a normal `lore sync` can acquire the lock while a surgical sync is in its preflight (network) phase.
41. **TOCTOU safety**: If a normal sync updates entity after preflight but before ingest, surgical run skips stale payload (no overwrite). Stale skips are reported in `skipped_stale` count.
42. **TOCTOU equal-timestamp safety**: If a concurrent sync fetches the same entity with identical `updated_at` but after our preflight started, surgical run detects the fresher `last_seen_at` and skips.
43. **No embed leakage**: Surgical embed uses `run_embed_for_document_ids` with explicit document IDs from the docs stage — never global `run_embed`. Even if another sync creates unembedded docs between lock release and embed, they are not processed.
44. **No queue artifacts**: Surgical sync creates zero rows in `pending_dependent_fetches`. Verify table is untouched after a surgical run.
### Automated scoping invariants (covered by tests in 1d and 1f):
45. **Scoped docs invariants are enforced by automated tests**, not manual-only verification. Tests `surgical_docs_scope_ignores_preexisting_dirty_rows`, `surgical_docs_scope_ignores_preexisting_dirty_rows_for_same_entity`, and `preflight_aggregates_multiple_missing_iids` prevent regressions.
46. **Rollback and race invariants are enforced by automated tests**: no partial writes on ingest failure, no stale overwrite (strict and equal-timestamp variants).
47. **Cancellation durability**: Ctrl+C during surgical sync records `status='cancelled'`, `phase='cancelled'`, and `finished_at` in `sync_runs`. Verified by `cancellation_marks_sync_run_cancelled` test.
48. **Payload integrity**: Payloads with unexpected `project_id` are rejected in preflight and produce zero content writes. Verified by `payload_project_id_mismatch_is_rejected_in_preflight` test.
49. **Scoped embed isolation under concurrency**: Verified by `scoped_embed_does_not_embed_unrelated_docs_created_after_docs_stage` test.
50. **Timeout path**: Verified by `dependent_timeout_records_entity_failure_and_continues` test (TIMEOUT code + continued processing).
---
## Rejected Recommendations
- **SyncMode enum replacing flat fields in SyncOptions** — rejected because it's overengineered for a boolean distinction (surgical vs standard). The `is_surgical()` helper is simpler, and the enum would require refactoring all existing SyncOptions construction sites. The flat fields approach is how the rest of the codebase works (see `full`, `force`, `dry_run`).
- **`value_delimiter = ','` on --issue/--mr** — rejected because `--issue 1,2,3` is non-idiomatic for this CLI (all other repeatable flags use `--flag val --flag val` pattern) and commas in shell can interact poorly with quoting.
- **Chunked `list_issues_by_iids` / `list_mrs_by_iids` batch fetch via `iids[]` query param** — rejected because surgical sync targets 1-5 IIDs typically (agent refreshing active work). Individual GET requests give precise per-IID error reporting (404 for missing IID) and add zero complexity. Batch optimization is premature for this use case.
- **`--strict` flag for fail-on-missing-IID behavior** — rejected for v1 because the default behavior (fail on 404) is already strict. A `--lenient` flag to skip missing IIDs would be the future extension if needed, not the other way around. Adding --strict now adds flag surface area for a mode that's already the default.
- **Separate `run_sync_surgical_dry_run` function** — rejected because the dry-run check is 5 lines inside `run_sync_surgical` (log + return early). A separate function would duplicate all the setup code (db connection, project resolution) just to print a message.
- **`SurgicalPlan` normalized object** — rejected because validation/dedup/project fallback is ~15 lines in one location (`handle_sync_cmd`). Introducing a dedicated struct with a builder method adds a new type, a new file, and indirection for code that runs exactly once. The flat validation approach is how all other SyncOptions constraints are checked in this codebase.
- **`--with-related` flag for auto-expanding related entities** — rejected for v1 as scope creep. Surgical sync's purpose is agent-driven refresh of known IIDs. Auto-expansion introduces unbounded work (related issues have related MRs have related issues...), requires expansion caps, and complicates the preflight-then-commit contract. This is a good v2 feature tracked separately.
- **Bounded concurrency (`buffer_unordered`) for IID fetches** — rejected for v1 because surgical sync targets 1-5 IIDs (agent refreshing active work). Sequential fetch of 5 items takes <2s. Adding `futures::stream`, `buffer_unordered`, config knobs (`surgical_max_in_flight`), and post-fetch sorting for determinism is premature complexity. If usage patterns show >10 IIDs becoming common, add concurrency then.
- **`origin_run_id` column on `dirty_sources` table** — rejected because it modifies shared schema (`dirty_sources`) for a surgical-only concern. The source-key-based scoping approach is self-contained and requires zero schema changes to `dirty_sources`. Adding a column would require updating all code paths that insert dirty rows.
- **Accept `&Transaction` instead of `&Connection` in surgical ingest functions** — rejected because the existing codebase uniformly uses `&Connection` (rusqlite's `Transaction` derefs to `Connection`). `process_single_issue` and all existing ingestion functions take `&Connection`. Changing surgical functions to `&Transaction` would create an inconsistency and require refactoring callers. The `unchecked_transaction` + `&Connection` pattern is used elsewhere (see `enrich_issue_statuses_txn`).
- **Transient retry policy (5xx/timeout with jittered backoff)** — rejected for surgical sync scope because the existing `request()` method already handles 429 retries. Adding 5xx retry applies to ALL GitLabClient methods, not just surgical. It should be a separate enhancement to the client layer, not coupled to surgical sync.
- **`sync_run_entities` per-entity lifecycle table** — rejected for v1 because it adds significant schema complexity (new table, FK, index, per-entity per-stage row inserts) for observability that can be achieved with simpler means: `SyncResult` already carries `skipped_stale_iids` and `entity_failures` for robot output, and `sync_runs` has aggregate counters. If retry-by-entity becomes a real need, this table is a clean v2 addition.
- **`--retry-failed` flag on `sync-runs` command** — rejected as scope creep for v1. Deterministic retry can be built on top of `sync_runs` data later. For now, agents can simply re-run `lore sync --issue <failed-iid>` based on the structured error output.
- **`--issue-url` / `--mr-url` URL-native surgical targets** — rejected for v1 because agents already have project + IID from their context (lore queries return both). Parsing GitLab URLs introduces URL format fragility (self-hosted instances vary), namespace disambiguation complexity, and additional validation code for marginal ergonomic gain. If copy-paste from browser becomes a common workflow, this is a clean v2 addition.
- **`sync-runs` read command (`lore --robot sync-runs`)** — rejected for v1 scope. The run ledger is useful for observability but the read path is not required for the surgical sync feature itself. Agents can query `sync_runs` directly via `sqlite3` or via a future `lore --robot sync-runs` command. Tracked as a v2 enhancement.
- **Scoped drain helpers with `scope_run_id` column on `pending_dependent_fetches`** — rejected because the inline execution approach (constraint 4) is strictly simpler. Adding `scope_run_id` + `aborted_reason` columns, a new covering index, scoped drain variants, and orphan cleanup logic to `pending_dependent_fetches` is significant schema and code churn for a table with an existing UNIQUE constraint and delete-on-complete semantics. The inline approach avoids all of this and eliminates an entire class of failure modes (orphaned jobs, queue scoping bugs).
- **`failed_run_aborts_pending_scoped_jobs` test** — rejected because surgical mode no longer enqueues jobs to any shared queue. The inline execution model eliminates the need for queue failure hygiene tests. Dependent stage failures are tracked per-entity in `entity_failures`.
- **Separate dependent fetch/write split with lock release between phases** — rejected because it significantly increases complexity for marginal contention reduction. Surgical sync targets 1-5 entities; dependent stage lock hold time is seconds. Splitting fetch (unlocked) and write (short locked transactions) per dependent type would require managing intermediate state, coordinating multiple lock acquire/release cycles, and applying per-entity freshness guards at the dependent level. The inline approach is simpler and adequate at our scale.
- **Global stage timeout budget (`surgical_dependents_budget_seconds`)** — rejected for v1 because per-entity timeout (constraint 16) provides sufficient bounding. A global budget across all entities adds complexity (tracking cumulative time, deciding which entities to skip when budget exhausted) for minimal incremental safety. If surgical sync scales to 50+ entities, this becomes worthwhile.