Add the ability to sync specific issues or merge requests by IID without
running a full incremental sync. This enables fast, targeted data refresh
for individual entities — useful for agent workflows, debugging, and
real-time investigation of specific issues or MRs.
Architecture:
- New CLI flags: --issue <IID> and --mr <IID> (repeatable, up to 100 total)
scoped to a single project via -p/--project
- Preflight phase validates all IIDs exist on GitLab before any DB writes,
with TOCTOU-aware soft verification at ingest time
- 6-stage pipeline: preflight -> fetch -> ingest -> dependents -> docs -> embed
- Each stage is cancellation-aware via ShutdownSignal
- Dedicated SyncRunRecorder extensions track surgical-specific counters
(issues_fetched, mrs_ingested, docs_regenerated, etc.)
New modules:
- src/ingestion/surgical.rs: Core surgical fetch/ingest/dependent logic
with preflight_fetch(), ingest_issue_by_iid(), ingest_mr_by_iid(),
and fetch_dependents_for_{issue,mr}()
- src/cli/commands/sync_surgical.rs: Full CLI orchestrator with progress
spinners, human/robot output, and cancellation handling
- src/embedding/pipeline.rs: embed_documents_by_ids() for scoped embedding
- src/documents/regenerator.rs: regenerate_dirty_documents_for_sources()
for scoped document regeneration
Database changes:
- Migration 027: Extends sync_runs with mode, phase, surgical_iids_json,
per-entity counters, and cancelled_at column
- New indexes: idx_sync_runs_mode_started, idx_sync_runs_status_phase_started
GitLab client:
- get_issue_by_iid() and get_mr_by_iid() single-entity fetch methods
Error handling:
- New SurgicalPreflightFailed error variant with entity_type, iid, project,
and reason fields. Shares exit code 6 with GitLabNotFound.
Includes comprehensive test coverage:
- 645 lines of surgical ingestion tests (wiremock-based)
- 184 lines of scoped embedding tests
- 85 lines of scoped regeneration tests
- 113 lines of GitLab client single-entity tests
- 236 lines of sync_run surgical column/counter tests
- Unit tests for SyncOptions, error codes, and CLI validation
482 lines
14 KiB
Rust
use std::ops::Deref;
|
|
|
|
use rusqlite::{Connection, Transaction, params};
|
|
use tracing::{debug, warn};
|
|
|
|
use crate::Config;
|
|
use crate::core::error::{LoreError, Result};
|
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
|
use crate::core::shutdown::ShutdownSignal;
|
|
use crate::core::time::now_ms;
|
|
use crate::documents::SourceType;
|
|
use crate::gitlab::GitLabClient;
|
|
use crate::gitlab::transformers::merge_request::transform_merge_request;
|
|
use crate::gitlab::types::GitLabMergeRequest;
|
|
use crate::ingestion::dirty_tracker;
|
|
|
|
/// Counters accumulated by [`ingest_merge_requests`] over one ingestion run.
#[derive(Debug, Default)]
pub struct IngestMergeRequestsResult {
    /// Every MR returned by GitLab, including ones later skipped (bad timestamp
    /// or already covered by the sync cursor).
    pub fetched: usize,
    /// MRs actually written to the local database.
    pub upserted: usize,
    /// Label rows that did not exist before and were inserted.
    pub labels_created: usize,
    /// Sum of assignee usernames across all upserted MRs.
    pub assignees_linked: usize,
    /// Sum of reviewer usernames across all upserted MRs.
    pub reviewers_linked: usize,
}
|
|
|
|
/// A locally stored MR whose discussions are older than its `updated_at`,
/// as returned by [`get_mrs_needing_discussion_sync`].
#[derive(Debug, Clone)]
pub struct MrForDiscussionSync {
    /// Primary key of the row in `merge_requests`.
    pub local_mr_id: i64,
    /// Project-scoped MR number.
    pub iid: i64,
    /// The MR's stored `updated_at` value.
    pub updated_at: i64,
}
|
|
|
|
/// Incremental-sync position for a project's merge requests.
///
/// Both fields are `None` when no cursor row exists yet.
#[derive(Debug, Default)]
struct SyncCursor {
    /// `updated_at` (epoch milliseconds, as produced by `parse_timestamp`) of the
    /// last MR recorded at a page boundary.
    updated_at_cursor: Option<i64>,
    /// GitLab id of that MR; orders MRs that share the same `updated_at`.
    tie_breaker_id: Option<i64>,
}
|
|
|
|
pub async fn ingest_merge_requests(
|
|
conn: &Connection,
|
|
client: &GitLabClient,
|
|
config: &Config,
|
|
project_id: i64,
|
|
gitlab_project_id: i64,
|
|
full_sync: bool,
|
|
signal: &ShutdownSignal,
|
|
) -> Result<IngestMergeRequestsResult> {
|
|
let mut result = IngestMergeRequestsResult::default();
|
|
|
|
if full_sync {
|
|
reset_sync_cursor(conn, project_id)?;
|
|
reset_discussion_watermarks(conn, project_id)?;
|
|
debug!("Full sync: cursor and discussion watermarks reset");
|
|
}
|
|
|
|
let cursor = get_sync_cursor(conn, project_id)?;
|
|
debug!(?cursor, "Starting MR ingestion with cursor");
|
|
|
|
let mut page = 1u32;
|
|
let per_page = 100u32;
|
|
|
|
loop {
|
|
if signal.is_cancelled() {
|
|
debug!("MR ingestion interrupted by shutdown signal");
|
|
break;
|
|
}
|
|
let page_result = client
|
|
.fetch_merge_requests_page(
|
|
gitlab_project_id,
|
|
cursor.updated_at_cursor,
|
|
config.sync.cursor_rewind_seconds,
|
|
page,
|
|
per_page,
|
|
)
|
|
.await?;
|
|
|
|
let mut last_updated_at: Option<i64> = None;
|
|
let mut last_gitlab_id: Option<i64> = None;
|
|
|
|
for mr in &page_result.items {
|
|
result.fetched += 1;
|
|
|
|
let mr_updated_at = match parse_timestamp(&mr.updated_at) {
|
|
Ok(ts) => ts,
|
|
Err(e) => {
|
|
warn!(
|
|
gitlab_id = mr.id,
|
|
error = %e,
|
|
"Skipping MR with invalid timestamp"
|
|
);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
if !passes_cursor_filter_with_ts(mr.id, mr_updated_at, &cursor) {
|
|
debug!(gitlab_id = mr.id, "Skipping already-processed MR");
|
|
continue;
|
|
}
|
|
|
|
let mr_result = process_single_mr(conn, config, project_id, mr)?;
|
|
result.upserted += 1;
|
|
result.labels_created += mr_result.labels_created;
|
|
result.assignees_linked += mr_result.assignees_linked;
|
|
result.reviewers_linked += mr_result.reviewers_linked;
|
|
|
|
last_updated_at = Some(mr_updated_at);
|
|
last_gitlab_id = Some(mr.id);
|
|
}
|
|
|
|
if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) {
|
|
update_sync_cursor(conn, project_id, ts, id)?;
|
|
debug!(page, "Page-boundary cursor update");
|
|
}
|
|
|
|
if page_result.is_last_page {
|
|
break;
|
|
}
|
|
match page_result.next_page {
|
|
Some(np) => page = np,
|
|
None => break,
|
|
}
|
|
}
|
|
|
|
debug!(
|
|
summary = crate::ingestion::nonzero_summary(&[
|
|
("fetched", result.fetched),
|
|
("upserted", result.upserted),
|
|
("labels", result.labels_created),
|
|
("assignees", result.assignees_linked),
|
|
("reviewers", result.reviewers_linked),
|
|
]),
|
|
"MR ingestion"
|
|
);
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Per-MR counters reported by [`process_single_mr`].
pub(crate) struct ProcessMrResult {
    /// Label rows newly inserted while linking this MR's labels.
    pub(crate) labels_created: usize,
    /// Number of assignee usernames supplied for this MR.
    pub(crate) assignees_linked: usize,
    /// Number of reviewer usernames supplied for this MR.
    pub(crate) reviewers_linked: usize,
}
|
|
|
|
pub(crate) fn process_single_mr(
|
|
conn: &Connection,
|
|
config: &Config,
|
|
project_id: i64,
|
|
mr: &GitLabMergeRequest,
|
|
) -> Result<ProcessMrResult> {
|
|
let payload_bytes = serde_json::to_vec(mr)?;
|
|
let transformed = transform_merge_request(mr, project_id)
|
|
.map_err(|e| LoreError::Other(format!("MR transform failed: {}", e)))?;
|
|
|
|
let tx = conn.unchecked_transaction()?;
|
|
let result =
|
|
process_mr_in_transaction(&tx, config, project_id, mr, &payload_bytes, &transformed)?;
|
|
tx.commit()?;
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
fn process_mr_in_transaction(
|
|
tx: &Transaction<'_>,
|
|
config: &Config,
|
|
project_id: i64,
|
|
mr: &GitLabMergeRequest,
|
|
payload_bytes: &[u8],
|
|
transformed: &crate::gitlab::transformers::merge_request::MergeRequestWithMetadata,
|
|
) -> Result<ProcessMrResult> {
|
|
let mut labels_created = 0;
|
|
let mr_row = &transformed.merge_request;
|
|
let now = now_ms();
|
|
|
|
let payload_id = store_payload(
|
|
tx.deref(),
|
|
StorePayloadOptions {
|
|
project_id: Some(project_id),
|
|
resource_type: "merge_request",
|
|
gitlab_id: &mr.id.to_string(),
|
|
json_bytes: payload_bytes,
|
|
compress: config.storage.compress_raw_payloads,
|
|
},
|
|
)?;
|
|
|
|
tx.execute(
|
|
"INSERT INTO merge_requests (
|
|
gitlab_id, project_id, iid, title, description, state, draft,
|
|
author_username, source_branch, target_branch, head_sha,
|
|
references_short, references_full, detailed_merge_status,
|
|
merge_user_username, created_at, updated_at, merged_at, closed_at,
|
|
last_seen_at, web_url, raw_payload_id,
|
|
merge_commit_sha, squash_commit_sha
|
|
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24)
|
|
ON CONFLICT(gitlab_id) DO UPDATE SET
|
|
title = excluded.title,
|
|
description = excluded.description,
|
|
state = excluded.state,
|
|
draft = excluded.draft,
|
|
author_username = excluded.author_username,
|
|
source_branch = excluded.source_branch,
|
|
target_branch = excluded.target_branch,
|
|
head_sha = excluded.head_sha,
|
|
references_short = excluded.references_short,
|
|
references_full = excluded.references_full,
|
|
detailed_merge_status = excluded.detailed_merge_status,
|
|
merge_user_username = excluded.merge_user_username,
|
|
updated_at = excluded.updated_at,
|
|
merged_at = excluded.merged_at,
|
|
closed_at = excluded.closed_at,
|
|
last_seen_at = excluded.last_seen_at,
|
|
web_url = excluded.web_url,
|
|
raw_payload_id = excluded.raw_payload_id,
|
|
merge_commit_sha = excluded.merge_commit_sha,
|
|
squash_commit_sha = excluded.squash_commit_sha",
|
|
params![
|
|
mr_row.gitlab_id,
|
|
project_id,
|
|
mr_row.iid,
|
|
&mr_row.title,
|
|
&mr_row.description,
|
|
&mr_row.state,
|
|
mr_row.draft,
|
|
&mr_row.author_username,
|
|
&mr_row.source_branch,
|
|
&mr_row.target_branch,
|
|
&mr_row.head_sha,
|
|
&mr_row.references_short,
|
|
&mr_row.references_full,
|
|
&mr_row.detailed_merge_status,
|
|
&mr_row.merge_user_username,
|
|
mr_row.created_at,
|
|
mr_row.updated_at,
|
|
mr_row.merged_at,
|
|
mr_row.closed_at,
|
|
now,
|
|
&mr_row.web_url,
|
|
payload_id,
|
|
&mr_row.merge_commit_sha,
|
|
&mr_row.squash_commit_sha,
|
|
],
|
|
)?;
|
|
|
|
let local_mr_id: i64 = tx.query_row(
|
|
"SELECT id FROM merge_requests WHERE project_id = ? AND iid = ?",
|
|
(project_id, mr_row.iid),
|
|
|row| row.get(0),
|
|
)?;
|
|
|
|
dirty_tracker::mark_dirty_tx(tx, SourceType::MergeRequest, local_mr_id)?;
|
|
|
|
tx.execute(
|
|
"DELETE FROM mr_labels WHERE merge_request_id = ?",
|
|
[local_mr_id],
|
|
)?;
|
|
for label_name in &transformed.label_names {
|
|
let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?;
|
|
tx.execute(
|
|
"INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES (?, ?)",
|
|
(local_mr_id, label_id),
|
|
)?;
|
|
}
|
|
|
|
tx.execute(
|
|
"DELETE FROM mr_assignees WHERE merge_request_id = ?",
|
|
[local_mr_id],
|
|
)?;
|
|
let assignees_linked = transformed.assignee_usernames.len();
|
|
for username in &transformed.assignee_usernames {
|
|
tx.execute(
|
|
"INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES (?, ?)",
|
|
(local_mr_id, username),
|
|
)?;
|
|
}
|
|
|
|
tx.execute(
|
|
"DELETE FROM mr_reviewers WHERE merge_request_id = ?",
|
|
[local_mr_id],
|
|
)?;
|
|
let reviewers_linked = transformed.reviewer_usernames.len();
|
|
for username in &transformed.reviewer_usernames {
|
|
tx.execute(
|
|
"INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES (?, ?)",
|
|
(local_mr_id, username),
|
|
)?;
|
|
}
|
|
|
|
Ok(ProcessMrResult {
|
|
labels_created,
|
|
assignees_linked,
|
|
reviewers_linked,
|
|
})
|
|
}
|
|
|
|
fn upsert_label_tx(
|
|
tx: &Transaction<'_>,
|
|
project_id: i64,
|
|
name: &str,
|
|
created_count: &mut usize,
|
|
) -> Result<i64> {
|
|
tx.execute(
|
|
"INSERT OR IGNORE INTO labels (project_id, name) VALUES (?1, ?2)",
|
|
(project_id, name),
|
|
)?;
|
|
|
|
if tx.changes() > 0 {
|
|
*created_count += 1;
|
|
}
|
|
|
|
let id: i64 = tx.query_row(
|
|
"SELECT id FROM labels WHERE project_id = ?1 AND name = ?2",
|
|
(project_id, name),
|
|
|row| row.get(0),
|
|
)?;
|
|
|
|
Ok(id)
|
|
}
|
|
|
|
fn passes_cursor_filter_with_ts(gitlab_id: i64, mr_ts: i64, cursor: &SyncCursor) -> bool {
|
|
let Some(cursor_ts) = cursor.updated_at_cursor else {
|
|
return true;
|
|
};
|
|
|
|
if mr_ts < cursor_ts {
|
|
return false;
|
|
}
|
|
|
|
if mr_ts == cursor_ts
|
|
&& let Some(cursor_id) = cursor.tie_breaker_id
|
|
&& gitlab_id <= cursor_id
|
|
{
|
|
return false;
|
|
}
|
|
|
|
true
|
|
}
|
|
|
|
fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result<SyncCursor> {
|
|
let row: Option<(Option<i64>, Option<i64>)> = conn
|
|
.query_row(
|
|
"SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
|
|
WHERE project_id = ? AND resource_type = 'merge_requests'",
|
|
[project_id],
|
|
|row| Ok((row.get(0)?, row.get(1)?)),
|
|
)
|
|
.ok();
|
|
|
|
Ok(match row {
|
|
Some((updated_at, tie_breaker)) => SyncCursor {
|
|
updated_at_cursor: updated_at,
|
|
tie_breaker_id: tie_breaker,
|
|
},
|
|
None => SyncCursor::default(),
|
|
})
|
|
}
|
|
|
|
fn update_sync_cursor(
|
|
conn: &Connection,
|
|
project_id: i64,
|
|
updated_at: i64,
|
|
gitlab_id: i64,
|
|
) -> Result<()> {
|
|
conn.execute(
|
|
"INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
|
|
VALUES (?1, 'merge_requests', ?2, ?3)
|
|
ON CONFLICT(project_id, resource_type) DO UPDATE SET
|
|
updated_at_cursor = excluded.updated_at_cursor,
|
|
tie_breaker_id = excluded.tie_breaker_id",
|
|
(project_id, updated_at, gitlab_id),
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn reset_sync_cursor(conn: &Connection, project_id: i64) -> Result<()> {
|
|
conn.execute(
|
|
"DELETE FROM sync_cursors WHERE project_id = ? AND resource_type = 'merge_requests'",
|
|
[project_id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> {
|
|
conn.execute(
|
|
"UPDATE merge_requests
|
|
SET discussions_synced_for_updated_at = NULL,
|
|
discussions_sync_attempts = 0,
|
|
discussions_sync_last_error = NULL,
|
|
resource_events_synced_for_updated_at = NULL,
|
|
closes_issues_synced_for_updated_at = NULL,
|
|
diffs_synced_for_updated_at = NULL
|
|
WHERE project_id = ?",
|
|
[project_id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn get_mrs_needing_discussion_sync(
|
|
conn: &Connection,
|
|
project_id: i64,
|
|
) -> Result<Vec<MrForDiscussionSync>> {
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, iid, updated_at FROM merge_requests
|
|
WHERE project_id = ?
|
|
AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)",
|
|
)?;
|
|
|
|
let mrs: std::result::Result<Vec<_>, _> = stmt
|
|
.query_map([project_id], |row| {
|
|
Ok(MrForDiscussionSync {
|
|
local_mr_id: row.get(0)?,
|
|
iid: row.get(1)?,
|
|
updated_at: row.get(2)?,
|
|
})
|
|
})?
|
|
.collect();
|
|
|
|
Ok(mrs?)
|
|
}
|
|
|
|
fn parse_timestamp(ts: &str) -> Result<i64> {
|
|
chrono::DateTime::parse_from_rfc3339(ts)
|
|
.map(|dt| dt.timestamp_millis())
|
|
.map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e)))
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp (epoch ms) at which the shared test cursor sits.
    const CURSOR_TS: i64 = 1705312800000;
    /// One day in milliseconds.
    const DAY_MS: i64 = 86_400_000;

    /// A cursor at `CURSOR_TS` with tie-breaker GitLab id 100.
    fn cursor_at_id_100() -> SyncCursor {
        SyncCursor {
            updated_at_cursor: Some(CURSOR_TS),
            tie_breaker_id: Some(100),
        }
    }

    #[test]
    fn result_default_has_zero_counts() {
        let r = IngestMergeRequestsResult::default();
        let counts = [
            r.fetched,
            r.upserted,
            r.labels_created,
            r.assignees_linked,
            r.reviewers_linked,
        ];
        assert_eq!(counts, [0; 5]);
    }

    #[test]
    fn cursor_filter_allows_newer_mrs() {
        let cursor = cursor_at_id_100();
        assert!(passes_cursor_filter_with_ts(101, CURSOR_TS + DAY_MS, &cursor));
    }

    #[test]
    fn cursor_filter_blocks_older_mrs() {
        let cursor = cursor_at_id_100();
        assert!(!passes_cursor_filter_with_ts(99, CURSOR_TS - DAY_MS, &cursor));
    }

    #[test]
    fn cursor_filter_uses_tie_breaker_for_same_timestamp() {
        let cursor = cursor_at_id_100();

        // Only ids strictly above the tie-breaker pass at the cursor timestamp.
        assert!(passes_cursor_filter_with_ts(101, CURSOR_TS, &cursor));
        assert!(!passes_cursor_filter_with_ts(100, CURSOR_TS, &cursor));
        assert!(!passes_cursor_filter_with_ts(99, CURSOR_TS, &cursor));
    }

    #[test]
    fn cursor_filter_allows_all_when_no_cursor() {
        let empty = SyncCursor::default();
        let long_ago = 1577836800000;
        assert!(passes_cursor_filter_with_ts(1, long_ago, &empty));
    }
}
|