feat(sync): fetch and store GitLab issue links (bd-343o)

Add end-to-end support for GitLab issue link fetching:
- New GitLabIssueLink type + fetch_issue_links API client method
- Migration 029: add issue_links job type and watermark column
- issue_links.rs: bidirectional entity_reference storage with
  self-link skip, cross-project fallback, idempotent upsert
- Drain pipeline in orchestrator following mr_closes_issues pattern
- Display related issues in 'lore show issues' output
- --no-issue-links CLI flag with config, autocorrect, robot-docs
- 6 unit tests for storage logic
This commit is contained in:
teernisse
2026-02-19 09:26:28 -05:00
parent 9a1dbda522
commit 1e679a6d72
18 changed files with 2051 additions and 8 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1 +1 @@
bd-2fc
bd-9lbr

View File

@@ -5,7 +5,7 @@
//! Built on FrankenTUI (Elm architecture): Model, update, view.
//! The `lore` CLI spawns `lore-tui` via PATH lookup at runtime.
use anyhow::Result;
use anyhow::{Context, Result};
// Phase 0 modules.
pub mod clock; // Clock trait: SystemClock + FakeClock (bd-2lg6)
@@ -71,9 +71,40 @@ pub struct LaunchOptions {
/// 2. **Data readiness** — check whether the database has any entity data.
/// If empty, start on the Bootstrap screen; otherwise start on Dashboard.
pub fn launch_tui(options: LaunchOptions) -> Result<()> {
    // Options are not consumed by the launch sequence itself yet; bind them so
    // the public signature is stable without an unused-parameter warning.
    // NOTE(review): confirm no LaunchOptions field should influence startup.
    // (Fixed: the old Phase-0 stub `eprintln!("…not yet implemented…")` was
    // left in front of the real implementation and printed on every launch.)
    let _options = options;
    // 1. Resolve database path.
    let db_path = lore::core::paths::get_db_path(None);
    if !db_path.exists() {
        anyhow::bail!(
            "No lore database found at {}.\n\
            Run 'lore init' to create a config, then 'lore sync' to fetch data.",
            db_path.display()
        );
    }
    // 2. Open DB and run schema preflight.
    let db = db::DbManager::open(&db_path)
        .with_context(|| format!("opening database at {}", db_path.display()))?;
    db.with_reader(|conn| schema_preflight(conn))?;
    // 3. Check data readiness — bootstrap screen if empty.
    let start_on_bootstrap = db.with_reader(|conn| {
        let readiness = action::check_data_readiness(conn)?;
        Ok(!readiness.has_any_data())
    })?;
    // 4. Build the app model.
    let mut app = app::LoreApp::new();
    app.db = Some(db);
    if start_on_bootstrap {
        app.navigation.reset_to(message::Screen::Bootstrap);
    }
    // 5. Enter the FrankenTUI event loop.
    ftui::App::fullscreen(app)
        .with_mouse()
        .run()
        .context("running TUI event loop")?;
    Ok(())
}

View File

@@ -0,0 +1,43 @@
-- Migration 029: Expand pending_dependent_fetches CHECK to include 'issue_links' job type.
-- Also adds issue_links_synced_for_updated_at watermark to issues table.
-- SQLite cannot ALTER CHECK constraints, so we recreate the table.
-- Step 1: Recreate pending_dependent_fetches with expanded CHECK
CREATE TABLE pending_dependent_fetches_new (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
entity_type TEXT NOT NULL CHECK (entity_type IN ('issue', 'merge_request')),
entity_iid INTEGER NOT NULL,
entity_local_id INTEGER NOT NULL,
job_type TEXT NOT NULL CHECK (job_type IN (
'resource_events', 'mr_closes_issues', 'mr_diffs', 'issue_links'
)),
payload_json TEXT,
enqueued_at INTEGER NOT NULL,
locked_at INTEGER,
attempts INTEGER NOT NULL DEFAULT 0,
next_retry_at INTEGER,
last_error TEXT
);
INSERT INTO pending_dependent_fetches_new
SELECT * FROM pending_dependent_fetches;
DROP TABLE pending_dependent_fetches;
ALTER TABLE pending_dependent_fetches_new RENAME TO pending_dependent_fetches;
-- Recreate indexes from migration 011
CREATE UNIQUE INDEX uq_pending_fetches
ON pending_dependent_fetches(project_id, entity_type, entity_iid, job_type);
CREATE INDEX idx_pending_fetches_claimable
ON pending_dependent_fetches(job_type, locked_at) WHERE locked_at IS NULL;
CREATE INDEX idx_pending_fetches_retryable
ON pending_dependent_fetches(next_retry_at) WHERE locked_at IS NULL AND next_retry_at IS NOT NULL;
-- Step 2: Add watermark column for issue link sync tracking
ALTER TABLE issues ADD COLUMN issue_links_synced_for_updated_at INTEGER;
-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (29, strftime('%s', 'now') * 1000, 'Expand dependent fetch queue for issue links');

View File

@@ -125,6 +125,7 @@ const COMMAND_FLAGS: &[(&str, &[&str])] = &[
"--no-events",
"--no-file-changes",
"--no-status",
"--no-issue-links",
"--dry-run",
"--no-dry-run",
"--timings",

1177
src/cli/commands/explain.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -590,6 +590,9 @@ async fn run_ingest_inner(
}
}
ProgressEvent::StatusEnrichmentSkipped => {}
ProgressEvent::IssueLinksFetchStarted { .. }
| ProgressEvent::IssueLinkFetched { .. }
| ProgressEvent::IssueLinksFetchComplete { .. } => {}
})
};

View File

@@ -3,6 +3,7 @@ pub mod count;
pub mod doctor;
pub mod drift;
pub mod embed;
pub mod explain;
pub mod file_history;
pub mod generate_docs;
pub mod ingest;
@@ -29,6 +30,7 @@ pub use count::{
pub use doctor::{DoctorChecks, print_doctor_results, run_doctor};
pub use drift::{DriftResponse, print_drift_human, print_drift_json, run_drift};
pub use embed::{print_embed, print_embed_json, run_embed};
pub use explain::{ExplainResponse, print_explain_human, print_explain_json, run_explain};
pub use file_history::{print_file_history, print_file_history_json, run_file_history};
pub use generate_docs::{print_generate_docs, print_generate_docs_json, run_generate_docs};
pub use ingest::{

View File

@@ -65,6 +65,16 @@ pub struct ClosingMrRef {
pub web_url: Option<String>,
}
/// One related issue shown in `lore show issues` output.
///
/// Built by `get_related_issues` from `entity_references` rows with
/// `reference_type = 'related'`.
#[derive(Debug, Clone, Serialize)]
pub struct RelatedIssueRef {
/// Issue iid; for unresolved cross-project links this is the remote iid.
pub iid: i64,
/// Title; empty string when the link is unresolved (row has NULL title).
pub title: String,
/// Issue state; "unknown" when the link is unresolved (row has NULL state).
pub state: String,
pub web_url: Option<String>,
/// For unresolved cross-project refs
pub project_path: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct IssueDetail {
pub id: i64,
@@ -87,6 +97,7 @@ pub struct IssueDetail {
pub user_notes_count: i64,
pub merge_requests_count: usize,
pub closing_merge_requests: Vec<ClosingMrRef>,
pub related_issues: Vec<RelatedIssueRef>,
pub discussions: Vec<DiscussionDetail>,
pub status_name: Option<String>,
pub status_category: Option<String>,
@@ -125,6 +136,8 @@ pub fn run_show_issue(
let closing_mrs = get_closing_mrs(&conn, issue.id)?;
let related_issues = get_related_issues(&conn, issue.id)?;
let discussions = get_issue_discussions(&conn, issue.id)?;
let references_full = format!("{}#{}", issue.project_path, issue.iid);
@@ -151,6 +164,7 @@ pub fn run_show_issue(
user_notes_count: issue.user_notes_count,
merge_requests_count,
closing_merge_requests: closing_mrs,
related_issues,
discussions,
status_name: issue.status_name,
status_category: issue.status_category,
@@ -321,6 +335,54 @@ fn get_closing_mrs(conn: &Connection, issue_id: i64) -> Result<Vec<ClosingMrRef>
Ok(mrs)
}
/// Collect related issues for `issue_id` from `entity_references`.
///
/// Three UNION branches (deduplicated by UNION semantics):
/// 1. resolved links where this issue is the source,
/// 2. resolved links where this issue is the target (reverse direction),
/// 3. unresolved cross-project links (`target_entity_id IS NULL`), surfaced
///    with their stored project path and remote iid.
fn get_related_issues(conn: &Connection, issue_id: i64) -> Result<Vec<RelatedIssueRef>> {
    let mut stmt = conn.prepare(
        "SELECT DISTINCT i.iid, i.title, i.state, i.web_url, NULL AS project_path
FROM entity_references er
JOIN issues i ON i.id = er.target_entity_id
WHERE er.source_entity_type = 'issue'
AND er.source_entity_id = ?1
AND er.target_entity_type = 'issue'
AND er.reference_type = 'related'
AND er.target_entity_id IS NOT NULL
UNION
SELECT DISTINCT i.iid, i.title, i.state, i.web_url, NULL AS project_path
FROM entity_references er
JOIN issues i ON i.id = er.source_entity_id
WHERE er.target_entity_type = 'issue'
AND er.target_entity_id = ?1
AND er.source_entity_type = 'issue'
AND er.reference_type = 'related'
UNION
SELECT er.target_entity_iid AS iid, NULL AS title, NULL AS state, NULL AS web_url,
er.target_project_path AS project_path
FROM entity_references er
WHERE er.source_entity_type = 'issue'
AND er.source_entity_id = ?1
AND er.target_entity_type = 'issue'
AND er.reference_type = 'related'
AND er.target_entity_id IS NULL
ORDER BY iid",
    )?;
    let rows = stmt.query_map([issue_id], |row| {
        // Unresolved rows carry NULL title/state; substitute display defaults.
        let title: Option<String> = row.get(1)?;
        let state: Option<String> = row.get(2)?;
        Ok(RelatedIssueRef {
            iid: row.get(0)?,
            title: title.unwrap_or_default(),
            state: state.unwrap_or_else(|| "unknown".to_string()),
            web_url: row.get(3)?,
            project_path: row.get(4)?,
        })
    })?;
    let mut related = Vec::new();
    for row in rows {
        related.push(row?);
    }
    Ok(related)
}
fn get_issue_discussions(conn: &Connection, issue_id: i64) -> Result<Vec<DiscussionDetail>> {
let mut disc_stmt = conn.prepare(
"SELECT id, individual_note FROM discussions
@@ -729,6 +791,38 @@ pub fn print_show_issue(issue: &IssueDetail) {
}
}
// Related Issues section
if !issue.related_issues.is_empty() {
println!(
"{}",
render::section_divider(&format!("Related Issues ({})", issue.related_issues.len()))
);
for rel in &issue.related_issues {
let (icon, style) = match rel.state.as_str() {
"opened" => (Icons::issue_opened(), Theme::success()),
"closed" => (Icons::issue_closed(), Theme::dim()),
_ => (Icons::issue_opened(), Theme::muted()),
};
if let Some(project_path) = &rel.project_path {
println!(
" {} {}#{} {}",
Theme::muted().render(icon),
project_path,
rel.iid,
Theme::muted().render("(cross-project, unresolved)"),
);
} else {
println!(
" {} #{} {} {}",
style.render(icon),
rel.iid,
rel.title,
style.render(&rel.state),
);
}
}
}
// Description section
println!("{}", render::section_divider("Description"));
if let Some(desc) = &issue.description {

View File

@@ -804,6 +804,10 @@ pub struct SyncArgs {
#[arg(long = "no-status")]
pub no_status: bool,
/// Skip issue link fetching (overrides config)
#[arg(long = "no-issue-links")]
pub no_issue_links: bool,
/// Preview what would be synced without making changes
#[arg(long, overrides_with = "no_dry_run")]
pub dry_run: bool,

View File

@@ -55,6 +55,9 @@ pub struct SyncConfig {
#[serde(rename = "fetchWorkItemStatus", default = "default_true")]
pub fetch_work_item_status: bool,
#[serde(rename = "fetchIssueLinks", default = "default_true")]
pub fetch_issue_links: bool,
}
fn default_true() -> bool {
@@ -74,6 +77,7 @@ impl Default for SyncConfig {
fetch_resource_events: true,
fetch_mr_file_changes: true,
fetch_work_item_status: true,
fetch_issue_links: true,
}
}
}

View File

@@ -97,6 +97,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
"028",
include_str!("../../migrations/028_surgical_sync_runs.sql"),
),
(
"029",
include_str!("../../migrations/029_issue_links_job_type.sql"),
),
];
pub fn create_connection(db_path: &Path) -> Result<Connection> {

View File

@@ -627,6 +627,15 @@ impl GitLabClient {
self.fetch_all_pages(&path).await
}
/// Fetch the issue links for one issue, following pagination.
///
/// Calls `GET /api/v4/projects/{id}/issues/{iid}/links` via `fetch_all_pages`.
/// The result is passed through `coalesce_not_found`; presumably this
/// normalizes missing-resource (404) responses — the drain path matches
/// `LoreError::NotFound`, so verify it maps 404s to that variant rather than
/// an empty list.
pub async fn fetch_issue_links(
&self,
gitlab_project_id: i64,
issue_iid: i64,
) -> Result<Vec<crate::gitlab::types::GitLabIssueLink>> {
let path = format!("/api/v4/projects/{gitlab_project_id}/issues/{issue_iid}/links");
coalesce_not_found(self.fetch_all_pages(&path).await)
}
pub async fn fetch_mr_diffs(
&self,
gitlab_project_id: i64,

View File

@@ -263,6 +263,21 @@ pub struct GitLabMergeRequest {
pub squash_commit_sha: Option<String>,
}
/// Linked issue returned by GitLab's issue links API.
/// GET /projects/:id/issues/:iid/links
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabIssueLink {
/// NOTE(review): per the links API this appears to be the linked issue's
/// global id (the payload is an issue object) — confirm it is not the link id.
pub id: i64,
/// Project-scoped iid of the linked issue.
pub iid: i64,
/// GitLab project id of the linked issue (may differ from the source's project).
pub project_id: i64,
pub title: String,
pub state: String,
pub web_url: String,
/// "relates_to", "blocks", or "is_blocked_by"
pub link_type: String,
/// ISO-8601 timestamp; nullable in the API payload.
pub link_created_at: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkItemStatus {
pub name: String,

View File

@@ -0,0 +1,397 @@
use rusqlite::Connection;
use tracing::debug;
use crate::core::error::Result;
use crate::core::references::{
EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path,
};
use crate::gitlab::types::GitLabIssueLink;
/// Store issue links as bidirectional entity_references.
///
/// For each linked issue:
/// - Creates A -> B reference (source -> target)
/// - Creates B -> A reference (target -> source)
/// - Skips self-links
/// - Stores unresolved cross-project links (target_entity_id = NULL)
///
/// Returns the number of references actually inserted; `insert_entity_reference`
/// reports `false` for pre-existing rows, so repeat calls are idempotent.
pub fn store_issue_links(
    conn: &Connection,
    project_id: i64,
    source_issue_local_id: i64,
    source_issue_iid: i64,
    links: &[GitLabIssueLink],
) -> Result<usize> {
    // The project's GitLab-side id is loop-invariant; resolve it once.
    // (Previously this query ran up to twice per link inside the loop.)
    // -1 sentinel: no project row exists, so no link can match "same project".
    let own_gitlab_project_id = resolve_gitlab_project_id(conn, project_id)?.unwrap_or(-1);
    let mut stored = 0;
    for link in links {
        let same_project = link.project_id == own_gitlab_project_id;
        // Skip self-links
        if same_project && link.iid == source_issue_iid {
            debug!(source_iid = source_issue_iid, "Skipping self-link");
            continue;
        }
        let target_local_id = if same_project {
            resolve_issue_local_id(conn, project_id, link.iid)?
        } else {
            // Cross-project link: try to find in our DB
            resolve_issue_by_gitlab_project(conn, link.project_id, link.iid)?
        };
        let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id {
            (Some(local_id), None, None)
        } else {
            // Unresolved target: keep a project path (or a gitlab_project:<id>
            // fallback) plus the remote iid so the ref can still be displayed.
            let path = resolve_project_path(conn, link.project_id)?;
            let fallback = path.unwrap_or_else(|| format!("gitlab_project:{}", link.project_id));
            (None, Some(fallback), Some(link.iid))
        };
        // Forward reference: source -> target
        let forward = EntityReference {
            project_id,
            source_entity_type: "issue",
            source_entity_id: source_issue_local_id,
            target_entity_type: "issue",
            target_entity_id: target_id,
            target_project_path: target_path.as_deref(),
            target_entity_iid: target_iid,
            reference_type: "related",
            source_method: "api",
        };
        if insert_entity_reference(conn, &forward)? {
            stored += 1;
        }
        // Reverse reference: target -> source (only if target is resolved locally)
        if let Some(target_local) = target_id {
            let reverse = EntityReference {
                project_id,
                source_entity_type: "issue",
                source_entity_id: target_local,
                target_entity_type: "issue",
                target_entity_id: Some(source_issue_local_id),
                target_project_path: None,
                target_entity_iid: None,
                reference_type: "related",
                source_method: "api",
            };
            if insert_entity_reference(conn, &reverse)? {
                stored += 1;
            }
        }
    }
    Ok(stored)
}
/// Look up the GitLab-side project id for a local `projects.id`.
///
/// Yields `None` when no matching project row exists.
fn resolve_gitlab_project_id(conn: &Connection, project_id: i64) -> Result<Option<i64>> {
    use rusqlite::OptionalExtension;
    Ok(conn
        .query_row(
            "SELECT gitlab_project_id FROM projects WHERE id = ?1",
            [project_id],
            |row| row.get(0),
        )
        .optional()?)
}
/// Find a local issue id by its GitLab project id + iid (for cross-project links).
///
/// Yields `None` when the issue is not mirrored in the local database.
fn resolve_issue_by_gitlab_project(
    conn: &Connection,
    gitlab_project_id: i64,
    issue_iid: i64,
) -> Result<Option<i64>> {
    use rusqlite::OptionalExtension;
    let local_id = conn
        .query_row(
            "SELECT i.id FROM issues i
JOIN projects p ON p.id = i.project_id
WHERE p.gitlab_project_id = ?1 AND i.iid = ?2",
            rusqlite::params![gitlab_project_id, issue_iid],
            |row| row.get(0),
        )
        .optional()?;
    Ok(local_id)
}
/// Update the issue_links watermark after successful sync.
pub fn update_issue_links_watermark(conn: &Connection, issue_local_id: i64) -> Result<()> {
conn.execute(
"UPDATE issues SET issue_links_synced_for_updated_at = updated_at WHERE id = ?",
[issue_local_id],
)?;
Ok(())
}
/// Update the issue_links watermark within a transaction.
pub fn update_issue_links_watermark_tx(
tx: &rusqlite::Transaction<'_>,
issue_local_id: i64,
) -> Result<()> {
tx.execute(
"UPDATE issues SET issue_links_synced_for_updated_at = updated_at WHERE id = ?",
[issue_local_id],
)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::{create_connection, run_migrations};
use std::path::Path;
// Fixture: in-memory DB with all migrations, one project (local id 1,
// gitlab_project_id 100) and two issues (local ids 10/20, iids 1/2,
// updated_at = 2000 for both).
fn setup_test_db() -> Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
// Insert a project
conn.execute(
"INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url)
VALUES (1, 100, 'group/project', 'https://gitlab.example.com/group/project')",
[],
)
.unwrap();
// Insert two issues
conn.execute(
"INSERT INTO issues (id, gitlab_id, iid, project_id, title, state, author_username, created_at, updated_at, last_seen_at)
VALUES (10, 1001, 1, 1, 'Issue One', 'opened', 'alice', 1000, 2000, 3000)",
[],
)
.unwrap();
conn.execute(
"INSERT INTO issues (id, gitlab_id, iid, project_id, title, state, author_username, created_at, updated_at, last_seen_at)
VALUES (20, 1002, 2, 1, 'Issue Two', 'opened', 'bob', 1000, 2000, 3000)",
[],
)
.unwrap();
conn
}
// Same-project resolved link must produce both A->B and B->A rows.
#[test]
fn test_store_issue_links_creates_bidirectional_references() {
let conn = setup_test_db();
let links = vec![GitLabIssueLink {
id: 999,
iid: 2,
project_id: 100, // same project
title: "Issue Two".to_string(),
state: "opened".to_string(),
web_url: "https://gitlab.example.com/group/project/-/issues/2".to_string(),
link_type: "relates_to".to_string(),
link_created_at: None,
}];
let stored = store_issue_links(&conn, 1, 10, 1, &links).unwrap();
assert_eq!(stored, 2, "Should create 2 references (forward + reverse)");
// Verify forward reference: issue 10 (iid 1) -> issue 20 (iid 2)
let forward_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM entity_references
WHERE source_entity_type = 'issue' AND source_entity_id = 10
AND target_entity_type = 'issue' AND target_entity_id = 20
AND reference_type = 'related' AND source_method = 'api'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(forward_count, 1);
// Verify reverse reference: issue 20 (iid 2) -> issue 10 (iid 1)
let reverse_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM entity_references
WHERE source_entity_type = 'issue' AND source_entity_id = 20
AND target_entity_type = 'issue' AND target_entity_id = 10
AND reference_type = 'related' AND source_method = 'api'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(reverse_count, 1);
}
// A link whose iid AND project match the source issue is dropped entirely.
#[test]
fn test_self_link_skipped() {
let conn = setup_test_db();
let links = vec![GitLabIssueLink {
id: 999,
iid: 1, // same iid as source
project_id: 100,
title: "Issue One".to_string(),
state: "opened".to_string(),
web_url: "https://gitlab.example.com/group/project/-/issues/1".to_string(),
link_type: "relates_to".to_string(),
link_created_at: None,
}];
let stored = store_issue_links(&conn, 1, 10, 1, &links).unwrap();
assert_eq!(stored, 0, "Self-link should be skipped");
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM entity_references WHERE project_id = 1",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 0);
}
// Cross-project target missing from the local DB: one forward row with a
// NULL target_entity_id and a "gitlab_project:<id>" path fallback.
#[test]
fn test_cross_project_link_unresolved() {
let conn = setup_test_db();
// Link to an issue in a different project (not in our DB)
let links = vec![GitLabIssueLink {
id: 999,
iid: 42,
project_id: 200, // different project, not in DB
title: "External Issue".to_string(),
state: "opened".to_string(),
web_url: "https://gitlab.example.com/other/project/-/issues/42".to_string(),
link_type: "relates_to".to_string(),
link_created_at: None,
}];
let stored = store_issue_links(&conn, 1, 10, 1, &links).unwrap();
assert_eq!(
stored, 1,
"Should create 1 forward reference (no reverse for unresolved)"
);
// Verify unresolved reference
let (target_id, target_path, target_iid): (Option<i64>, Option<String>, Option<i64>) = conn
.query_row(
"SELECT target_entity_id, target_project_path, target_entity_iid
FROM entity_references
WHERE source_entity_type = 'issue' AND source_entity_id = 10",
[],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.unwrap();
assert!(target_id.is_none(), "Target should be unresolved");
assert_eq!(
target_path.as_deref(),
Some("gitlab_project:200"),
"Should store gitlab_project fallback"
);
assert_eq!(target_iid, Some(42));
}
// Re-storing the same link reports 0 new rows and leaves the table unchanged.
#[test]
fn test_duplicate_links_idempotent() {
let conn = setup_test_db();
let links = vec![GitLabIssueLink {
id: 999,
iid: 2,
project_id: 100,
title: "Issue Two".to_string(),
state: "opened".to_string(),
web_url: "https://gitlab.example.com/group/project/-/issues/2".to_string(),
link_type: "relates_to".to_string(),
link_created_at: None,
}];
// Store twice
let stored1 = store_issue_links(&conn, 1, 10, 1, &links).unwrap();
let stored2 = store_issue_links(&conn, 1, 10, 1, &links).unwrap();
assert_eq!(stored1, 2);
assert_eq!(
stored2, 0,
"Second insert should be idempotent (INSERT OR IGNORE)"
);
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM entity_references WHERE project_id = 1 AND reference_type = 'related'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 2, "Should still have exactly 2 references");
}
// GitLabIssueLink deserializes a realistic API payload, including a null
// link_created_at.
#[test]
fn test_issue_link_deserialization() {
let json = r#"[
{
"id": 123,
"iid": 42,
"project_id": 100,
"title": "Linked Issue",
"state": "opened",
"web_url": "https://gitlab.example.com/group/project/-/issues/42",
"link_type": "relates_to",
"link_created_at": "2026-01-15T10:30:00.000Z"
},
{
"id": 456,
"iid": 99,
"project_id": 200,
"title": "Blocking Issue",
"state": "closed",
"web_url": "https://gitlab.example.com/other/project/-/issues/99",
"link_type": "blocks",
"link_created_at": null
}
]"#;
let links: Vec<GitLabIssueLink> = serde_json::from_str(json).unwrap();
assert_eq!(links.len(), 2);
assert_eq!(links[0].iid, 42);
assert_eq!(links[0].link_type, "relates_to");
assert_eq!(
links[0].link_created_at.as_deref(),
Some("2026-01-15T10:30:00.000Z")
);
assert_eq!(links[1].link_type, "blocks");
assert!(links[1].link_created_at.is_none());
}
// Watermark starts NULL and is set to the issue's updated_at (2000 in the fixture).
#[test]
fn test_update_issue_links_watermark() {
let conn = setup_test_db();
// Initially NULL
let wm: Option<i64> = conn
.query_row(
"SELECT issue_links_synced_for_updated_at FROM issues WHERE id = 10",
[],
|row| row.get(0),
)
.unwrap();
assert!(wm.is_none());
// Update watermark
update_issue_links_watermark(&conn, 10).unwrap();
// Should now equal updated_at (2000)
let wm: Option<i64> = conn
.query_row(
"SELECT issue_links_synced_for_updated_at FROM issues WHERE id = 10",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(wm, Some(2000));
}
}

View File

@@ -1,6 +1,7 @@
pub mod dirty_tracker;
pub mod discussion_queue;
pub mod discussions;
pub mod issue_links;
pub mod issues;
pub mod merge_requests;
pub mod mr_diffs;

View File

@@ -45,6 +45,9 @@ pub enum ProgressEvent {
MrDiffsFetchStarted { total: usize },
MrDiffFetched { current: usize, total: usize },
MrDiffsFetchComplete { fetched: usize, failed: usize },
IssueLinksFetchStarted { total: usize },
IssueLinkFetched { current: usize, total: usize },
IssueLinksFetchComplete { fetched: usize, failed: usize },
StatusEnrichmentStarted { total: usize },
StatusEnrichmentPageFetched { items_so_far: usize },
StatusEnrichmentWriting { total: usize },
@@ -64,6 +67,8 @@ pub struct IngestProjectResult {
pub issues_skipped_discussion_sync: usize,
pub resource_events_fetched: usize,
pub resource_events_failed: usize,
pub issue_links_fetched: usize,
pub issue_links_failed: usize,
pub statuses_enriched: usize,
pub statuses_cleared: usize,
pub statuses_seen: usize,
@@ -357,6 +362,27 @@ pub async fn ingest_project_issues_with_progress(
}
}
// ── Issue Links ──────────────────────────────────────────────────
if config.sync.fetch_issue_links && !signal.is_cancelled() {
let enqueued = enqueue_issue_links(conn, project_id)?;
if enqueued > 0 {
debug!(enqueued, "Enqueued issue_links jobs");
}
let drain_result = drain_issue_links(
conn,
client,
config,
project_id,
gitlab_project_id,
&progress,
signal,
)
.await?;
result.issue_links_fetched = drain_result.fetched;
result.issue_links_failed = drain_result.failed;
}
debug!(
summary = crate::ingestion::nonzero_summary(&[
("fetched", result.issues_fetched),
@@ -368,6 +394,8 @@ pub async fn ingest_project_issues_with_progress(
("skipped", result.issues_skipped_discussion_sync),
("events", result.resource_events_fetched),
("event errors", result.resource_events_failed),
("links", result.issue_links_fetched),
("link errors", result.issue_links_failed),
]),
"Project complete"
);
@@ -1441,6 +1469,233 @@ pub(crate) fn store_closes_issues_refs(
Ok(())
}
// ─── Issue Links ────────────────────────────────────────────────────────────
/// Enqueue one 'issue_links' job per issue whose `updated_at` has moved past
/// its issue_links watermark; prune queued jobs that are no longer needed.
/// Returns the number of jobs newly enqueued.
fn enqueue_issue_links(conn: &Connection, project_id: i64) -> Result<usize> {
// Remove stale jobs for issues that haven't changed since their last issue_links sync
conn.execute(
"DELETE FROM pending_dependent_fetches \
WHERE project_id = ?1 AND entity_type = 'issue' AND job_type = 'issue_links' \
AND entity_local_id IN ( \
SELECT id FROM issues \
WHERE project_id = ?1 \
AND updated_at <= COALESCE(issue_links_synced_for_updated_at, 0) \
)",
[project_id],
)?;
// NULL watermark coalesces to 0, so never-synced issues always qualify.
let mut stmt = conn.prepare_cached(
"SELECT id, iid FROM issues \
WHERE project_id = ?1 \
AND updated_at > COALESCE(issue_links_synced_for_updated_at, 0)",
)?;
let entities: Vec<(i64, i64)> = stmt
.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut enqueued = 0;
for (local_id, iid) in &entities {
// enqueue_job presumably returns false when an identical job already
// exists (uq_pending_fetches unique index) — making re-runs idempotent;
// confirm against its definition.
if enqueue_job(
conn,
project_id,
"issue",
*iid,
*local_id,
"issue_links",
None,
)? {
enqueued += 1;
}
}
Ok(enqueued)
}
/// Outcome of one concurrent issue-links HTTP fetch, paired with the queue
/// bookkeeping the serial DB phase needs to complete or fail the job.
struct PrefetchedIssueLinks {
/// pending_dependent_fetches.id of the claimed job.
job_id: i64,
/// Issue iid the links were fetched for.
entity_iid: i64,
/// Local issues.id — used for reference storage and the watermark update.
entity_local_id: i64,
/// Raw fetch result; the drain loop classifies errors (NotFound vs other).
result: std::result::Result<
Vec<crate::gitlab::types::GitLabIssueLink>,
crate::core::error::LoreError,
>,
}
/// Run the HTTP fetch for one claimed issue_links job, carrying the job's
/// bookkeeping through so the serial DB phase can act on the outcome.
async fn prefetch_issue_links(
    client: &GitLabClient,
    gitlab_project_id: i64,
    job_id: i64,
    entity_iid: i64,
    entity_local_id: i64,
) -> PrefetchedIssueLinks {
    PrefetchedIssueLinks {
        job_id,
        entity_iid,
        entity_local_id,
        result: client.fetch_issue_links(gitlab_project_id, entity_iid).await,
    }
}
/// Drain queued 'issue_links' jobs for one project: claim batches, fetch over
/// HTTP concurrently (phase 1), then apply results in serial DB transactions
/// (phase 2). Emits progress events and honors the shutdown signal between
/// batches. Follows the same drain pattern as mr_closes_issues.
#[instrument(
skip(conn, client, config, progress, signal),
fields(project_id, gitlab_project_id, items_processed, errors)
)]
async fn drain_issue_links(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
progress: &Option<ProgressCallback>,
signal: &ShutdownSignal,
) -> Result<DrainResult> {
let mut result = DrainResult::default();
let batch_size = config.sync.dependent_concurrency as usize;
// Jobs locked by a crashed or aborted earlier run become claimable again.
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
debug!(reclaimed, "Reclaimed stale issue_links locks");
}
let claimable_counts = count_claimable_jobs(conn, project_id)?;
let total_pending = claimable_counts.get("issue_links").copied().unwrap_or(0);
if total_pending == 0 {
return Ok(result);
}
let emit = |event: ProgressEvent| {
if let Some(cb) = progress {
cb(event);
}
};
emit(ProgressEvent::IssueLinksFetchStarted {
total: total_pending,
});
let mut processed = 0;
// Guards against re-processing a job id claimed twice within this drain
// (e.g. a failed job becoming claimable again before the loop exits).
let mut seen_job_ids = std::collections::HashSet::new();
loop {
if signal.is_cancelled() {
debug!("Shutdown requested during issue_links drain");
break;
}
let jobs = claim_jobs(conn, "issue_links", project_id, batch_size)?;
if jobs.is_empty() {
break;
}
// Phase 1: Concurrent HTTP fetches
let futures: Vec<_> = jobs
.iter()
.filter(|j| seen_job_ids.insert(j.id))
.map(|j| {
prefetch_issue_links(
client,
gitlab_project_id,
j.id,
j.entity_iid,
j.entity_local_id,
)
})
.collect();
if futures.is_empty() {
warn!("All claimed issue_links jobs were already processed");
break;
}
let prefetched = futures::future::join_all(futures).await;
// Phase 2: Serial DB writes
for p in prefetched {
match p.result {
Ok(links) => {
// Reference storage, job completion, and watermark advance
// commit atomically in one transaction.
let tx = conn.unchecked_transaction()?;
let store_result = crate::ingestion::issue_links::store_issue_links(
&tx,
project_id,
p.entity_local_id,
p.entity_iid,
&links,
);
match store_result {
Ok(stored) => {
complete_job_tx(&tx, p.job_id)?;
crate::ingestion::issue_links::update_issue_links_watermark_tx(
&tx,
p.entity_local_id,
)?;
tx.commit()?;
result.fetched += 1;
if stored > 0 {
debug!(
entity_iid = p.entity_iid,
stored, "Stored issue link references"
);
}
}
Err(e) => {
// Dropping the transaction rolls it back (rusqlite's
// default drop behavior), so the job stays retryable.
drop(tx);
warn!(
entity_iid = p.entity_iid,
error = %e,
"Failed to store issue link references"
);
fail_job(conn, p.job_id, &e.to_string())?;
result.failed += 1;
}
}
}
Err(e) => {
let is_not_found = matches!(&e, crate::core::error::LoreError::NotFound(_));
if is_not_found {
// Issue vanished upstream: complete the job so it is not
// retried forever; counted separately from failures.
debug!(
entity_iid = p.entity_iid,
"Issue not found for links (probably deleted)"
);
let tx = conn.unchecked_transaction()?;
complete_job_tx(&tx, p.job_id)?;
tx.commit()?;
result.skipped_not_found += 1;
} else {
warn!(
entity_iid = p.entity_iid,
error = %e,
"HTTP error fetching issue links"
);
fail_job(conn, p.job_id, &e.to_string())?;
result.failed += 1;
}
}
}
processed += 1;
emit(ProgressEvent::IssueLinkFetched {
current: processed,
total: total_pending,
});
}
}
emit(ProgressEvent::IssueLinksFetchComplete {
fetched: result.fetched,
failed: result.failed,
});
tracing::Span::current().record("items_processed", result.fetched);
tracing::Span::current().record("errors", result.failed);
Ok(result)
}
// ─── MR Diffs (file changes) ────────────────────────────────────────────────
fn enqueue_mr_diffs_jobs(conn: &Connection, project_id: i64) -> Result<usize> {

View File

@@ -2231,6 +2231,9 @@ async fn handle_sync_cmd(
if args.no_status {
config.sync.fetch_work_item_status = false;
}
if args.no_issue_links {
config.sync.fetch_issue_links = false;
}
// Dedup surgical IIDs
let mut issue_iids = args.issue;
let mut mr_iids = args.mr;
@@ -2593,7 +2596,7 @@ fn handle_robot_docs(robot_mode: bool, brief: bool) -> Result<(), Box<dyn std::e
},
"sync": {
"description": "Full sync pipeline: ingest -> generate-docs -> embed. Supports surgical per-IID sync with --issue/--mr.",
"flags": ["--full", "--no-full", "--force", "--no-force", "--no-embed", "--no-docs", "--no-events", "--no-file-changes", "--no-status", "--dry-run", "--no-dry-run", "--issue <IID>", "--mr <IID>", "-p/--project <path>", "--preflight-only"],
"flags": ["--full", "--no-full", "--force", "--no-force", "--no-embed", "--no-docs", "--no-events", "--no-file-changes", "--no-status", "--no-issue-links", "--dry-run", "--no-dry-run", "--issue <IID>", "--mr <IID>", "-p/--project <path>", "--preflight-only"],
"example": "lore --robot sync",
"notes": {
"surgical_sync": "Pass --issue <IID> and/or --mr <IID> (repeatable) with -p <project> to sync specific entities instead of a full pipeline. Incompatible with --full.",