feat(who): expand expert + overlap queries with mr_file_changes and mr_reviewers

Chain: bd-jec (config flag) -> bd-2yo (fetch MR diffs) -> bd-3qn6 (rewrite who queries)

- Add fetch_mr_file_changes config option and --no-file-changes CLI flag
- Add GitLab MR diffs API fetch pipeline with watermark-based sync
- Create migration 020 for diffs_synced_for_updated_at watermark column
- Rewrite query_expert() and query_overlap() to use 4-signal UNION ALL:
  DiffNote reviewers, DiffNote MR authors, file-change authors, file-change reviewers
- Deduplicate across signal types via COUNT(DISTINCT CASE WHEN ... THEN mr_id END)
- Add insert_file_change test helper, 8 new who tests, all 397 tests pass
- Also includes: list performance migration 019, autocorrect module, README updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-08 13:35:14 -05:00
parent 435a208c93
commit 95b7183add
19 changed files with 2139 additions and 291 deletions

View File

@@ -3,6 +3,7 @@ pub mod discussion_queue;
pub mod discussions;
pub mod issues;
pub mod merge_requests;
pub mod mr_diffs;
pub mod mr_discussions;
pub mod orchestrator;

268
src/ingestion/mr_diffs.rs Normal file
View File

@@ -0,0 +1,268 @@
use rusqlite::Connection;
use tracing::debug;
use crate::core::error::Result;
use crate::gitlab::types::GitLabMrDiff;
/// Derive the change type from GitLab's boolean flags.
fn derive_change_type(diff: &GitLabMrDiff) -> &'static str {
if diff.new_file {
"added"
} else if diff.renamed_file {
"renamed"
} else if diff.deleted_file {
"deleted"
} else {
"modified"
}
}
/// Replace all file change records for a given MR with the provided diffs.
/// Uses DELETE+INSERT (simpler than UPSERT for array replacement).
pub fn upsert_mr_file_changes(
conn: &Connection,
mr_local_id: i64,
project_id: i64,
diffs: &[GitLabMrDiff],
) -> Result<usize> {
conn.execute(
"DELETE FROM mr_file_changes WHERE merge_request_id = ?1",
[mr_local_id],
)?;
let mut stmt = conn.prepare_cached(
"INSERT INTO mr_file_changes (merge_request_id, project_id, old_path, new_path, change_type) \
VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
let mut inserted = 0;
for diff in diffs {
let old_path = if diff.renamed_file {
Some(diff.old_path.as_str())
} else {
None
};
let change_type = derive_change_type(diff);
stmt.execute(rusqlite::params![
mr_local_id,
project_id,
old_path,
diff.new_path,
change_type,
])?;
inserted += 1;
}
if inserted > 0 {
debug!(inserted, mr_local_id, "Stored MR file changes");
}
Ok(inserted)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::{create_connection, run_migrations};
use std::path::Path;
fn setup() -> Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
// Insert a test project
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) VALUES (1, 'group/repo', 'https://gitlab.com/group/repo')",
[],
).unwrap();
// Insert a test MR
conn.execute(
"INSERT INTO merge_requests (gitlab_id, iid, project_id, title, state, draft, source_branch, target_branch, author_username, created_at, updated_at, last_seen_at) \
VALUES (100, 1, 1, 'Test MR', 'merged', 0, 'feature', 'main', 'testuser', 1000, 2000, 3000)",
[],
).unwrap();
conn
}
#[test]
fn test_derive_change_type_added() {
let diff = GitLabMrDiff {
old_path: String::new(),
new_path: "src/new.rs".to_string(),
new_file: true,
renamed_file: false,
deleted_file: false,
};
assert_eq!(derive_change_type(&diff), "added");
}
#[test]
fn test_derive_change_type_renamed() {
let diff = GitLabMrDiff {
old_path: "src/old.rs".to_string(),
new_path: "src/new.rs".to_string(),
new_file: false,
renamed_file: true,
deleted_file: false,
};
assert_eq!(derive_change_type(&diff), "renamed");
}
#[test]
fn test_derive_change_type_deleted() {
let diff = GitLabMrDiff {
old_path: "src/gone.rs".to_string(),
new_path: "src/gone.rs".to_string(),
new_file: false,
renamed_file: false,
deleted_file: true,
};
assert_eq!(derive_change_type(&diff), "deleted");
}
#[test]
fn test_derive_change_type_modified() {
let diff = GitLabMrDiff {
old_path: "src/lib.rs".to_string(),
new_path: "src/lib.rs".to_string(),
new_file: false,
renamed_file: false,
deleted_file: false,
};
assert_eq!(derive_change_type(&diff), "modified");
}
#[test]
fn test_upsert_inserts_file_changes() {
let conn = setup();
let diffs = [
GitLabMrDiff {
old_path: String::new(),
new_path: "src/new.rs".to_string(),
new_file: true,
renamed_file: false,
deleted_file: false,
},
GitLabMrDiff {
old_path: "src/lib.rs".to_string(),
new_path: "src/lib.rs".to_string(),
new_file: false,
renamed_file: false,
deleted_file: false,
},
];
let inserted = upsert_mr_file_changes(&conn, 1, 1, &diffs).unwrap();
assert_eq!(inserted, 2);
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM mr_file_changes WHERE merge_request_id = 1",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(count, 2);
}
#[test]
fn test_upsert_replaces_existing() {
let conn = setup();
let diffs_v1 = [GitLabMrDiff {
old_path: String::new(),
new_path: "src/old.rs".to_string(),
new_file: true,
renamed_file: false,
deleted_file: false,
}];
upsert_mr_file_changes(&conn, 1, 1, &diffs_v1).unwrap();
let diffs_v2 = [
GitLabMrDiff {
old_path: "src/a.rs".to_string(),
new_path: "src/a.rs".to_string(),
new_file: false,
renamed_file: false,
deleted_file: false,
},
GitLabMrDiff {
old_path: "src/b.rs".to_string(),
new_path: "src/b.rs".to_string(),
new_file: false,
renamed_file: false,
deleted_file: false,
},
];
let inserted = upsert_mr_file_changes(&conn, 1, 1, &diffs_v2).unwrap();
assert_eq!(inserted, 2);
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM mr_file_changes WHERE merge_request_id = 1",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(count, 2);
// The old "src/old.rs" should be gone
let old_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM mr_file_changes WHERE new_path = 'src/old.rs'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(old_count, 0);
}
#[test]
fn test_renamed_stores_old_path() {
let conn = setup();
let diffs = [GitLabMrDiff {
old_path: "src/old_name.rs".to_string(),
new_path: "src/new_name.rs".to_string(),
new_file: false,
renamed_file: true,
deleted_file: false,
}];
upsert_mr_file_changes(&conn, 1, 1, &diffs).unwrap();
let (old_path, change_type): (Option<String>, String) = conn
.query_row(
"SELECT old_path, change_type FROM mr_file_changes WHERE new_path = 'src/new_name.rs'",
[],
|r| Ok((r.get(0)?, r.get(1)?)),
)
.unwrap();
assert_eq!(old_path.as_deref(), Some("src/old_name.rs"));
assert_eq!(change_type, "renamed");
}
#[test]
fn test_non_renamed_has_null_old_path() {
let conn = setup();
let diffs = [GitLabMrDiff {
old_path: "src/lib.rs".to_string(),
new_path: "src/lib.rs".to_string(),
new_file: false,
renamed_file: false,
deleted_file: false,
}];
upsert_mr_file_changes(&conn, 1, 1, &diffs).unwrap();
let old_path: Option<String> = conn
.query_row(
"SELECT old_path FROM mr_file_changes WHERE new_path = 'src/lib.rs'",
[],
|r| r.get(0),
)
.unwrap();
assert!(old_path.is_none());
}
}

View File

@@ -42,6 +42,9 @@ pub enum ProgressEvent {
ClosesIssuesFetchStarted { total: usize },
ClosesIssueFetched { current: usize, total: usize },
ClosesIssuesFetchComplete { fetched: usize, failed: usize },
MrDiffsFetchStarted { total: usize },
MrDiffFetched { current: usize, total: usize },
MrDiffsFetchComplete { fetched: usize, failed: usize },
}
#[derive(Debug, Default)]
@@ -76,6 +79,8 @@ pub struct IngestMrProjectResult {
pub resource_events_failed: usize,
pub closes_issues_fetched: usize,
pub closes_issues_failed: usize,
pub mr_diffs_fetched: usize,
pub mr_diffs_failed: usize,
}
pub async fn ingest_project_issues(
@@ -466,6 +471,31 @@ pub async fn ingest_project_merge_requests_with_progress(
result.closes_issues_failed = closes_result.failed;
}
if signal.is_cancelled() {
info!("Shutdown requested, returning partial MR results");
return Ok(result);
}
if config.sync.fetch_mr_file_changes {
let enqueued = enqueue_mr_diffs_jobs(conn, project_id)?;
if enqueued > 0 {
debug!(enqueued, "Enqueued mr_diffs jobs");
}
let diffs_result = drain_mr_diffs(
conn,
client,
config,
project_id,
gitlab_project_id,
&progress,
signal,
)
.await?;
result.mr_diffs_fetched = diffs_result.fetched;
result.mr_diffs_failed = diffs_result.failed;
}
info!(
mrs_fetched = result.mrs_fetched,
mrs_upserted = result.mrs_upserted,
@@ -479,6 +509,8 @@ pub async fn ingest_project_merge_requests_with_progress(
resource_events_failed = result.resource_events_failed,
closes_issues_fetched = result.closes_issues_fetched,
closes_issues_failed = result.closes_issues_failed,
mr_diffs_fetched = result.mr_diffs_fetched,
mr_diffs_failed = result.mr_diffs_failed,
"MR project ingestion complete"
);
@@ -1188,6 +1220,235 @@ fn store_closes_issues_refs(
}
}
// ─── MR Diffs (file changes) ────────────────────────────────────────────────
fn enqueue_mr_diffs_jobs(conn: &Connection, project_id: i64) -> Result<usize> {
// Remove stale jobs for MRs that haven't changed since their last diffs sync
conn.execute(
"DELETE FROM pending_dependent_fetches \
WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'mr_diffs' \
AND entity_local_id IN ( \
SELECT id FROM merge_requests \
WHERE project_id = ?1 \
AND updated_at <= COALESCE(diffs_synced_for_updated_at, 0) \
)",
[project_id],
)?;
let mut stmt = conn.prepare_cached(
"SELECT id, iid FROM merge_requests \
WHERE project_id = ?1 \
AND updated_at > COALESCE(diffs_synced_for_updated_at, 0)",
)?;
let entities: Vec<(i64, i64)> = stmt
.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut enqueued = 0;
for (local_id, iid) in &entities {
if enqueue_job(
conn,
project_id,
"merge_request",
*iid,
*local_id,
"mr_diffs",
None,
)? {
enqueued += 1;
}
}
Ok(enqueued)
}
struct PrefetchedMrDiffs {
job_id: i64,
entity_iid: i64,
entity_local_id: i64,
result:
std::result::Result<Vec<crate::gitlab::types::GitLabMrDiff>, crate::core::error::LoreError>,
}
async fn prefetch_mr_diffs(
client: &GitLabClient,
gitlab_project_id: i64,
job_id: i64,
entity_iid: i64,
entity_local_id: i64,
) -> PrefetchedMrDiffs {
let result = client.fetch_mr_diffs(gitlab_project_id, entity_iid).await;
PrefetchedMrDiffs {
job_id,
entity_iid,
entity_local_id,
result,
}
}
#[instrument(
skip(conn, client, config, progress, signal),
fields(project_id, gitlab_project_id, items_processed, errors)
)]
async fn drain_mr_diffs(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
progress: &Option<ProgressCallback>,
signal: &ShutdownSignal,
) -> Result<DrainResult> {
let mut result = DrainResult::default();
let batch_size = config.sync.dependent_concurrency as usize;
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
info!(reclaimed, "Reclaimed stale mr_diffs locks");
}
let claimable_counts = count_claimable_jobs(conn, project_id)?;
let total_pending = claimable_counts.get("mr_diffs").copied().unwrap_or(0);
if total_pending == 0 {
return Ok(result);
}
let emit = |event: ProgressEvent| {
if let Some(cb) = progress {
cb(event);
}
};
emit(ProgressEvent::MrDiffsFetchStarted {
total: total_pending,
});
let mut processed = 0;
let mut seen_job_ids = std::collections::HashSet::new();
loop {
if signal.is_cancelled() {
info!("Shutdown requested during mr_diffs drain, returning partial results");
break;
}
let jobs = claim_jobs(conn, "mr_diffs", project_id, batch_size)?;
if jobs.is_empty() {
break;
}
// Phase 1: Concurrent HTTP fetches
let futures: Vec<_> = jobs
.iter()
.filter(|j| seen_job_ids.insert(j.id))
.map(|j| {
prefetch_mr_diffs(
client,
gitlab_project_id,
j.id,
j.entity_iid,
j.entity_local_id,
)
})
.collect();
if futures.is_empty() {
warn!("All claimed mr_diffs jobs were already processed, breaking drain loop");
break;
}
let prefetched = join_all(futures).await;
// Phase 2: Serial DB writes
for p in prefetched {
match p.result {
Ok(diffs) => {
let store_result = super::mr_diffs::upsert_mr_file_changes(
conn,
p.entity_local_id,
project_id,
&diffs,
);
match store_result {
Ok(_) => {
let tx = conn.unchecked_transaction()?;
complete_job_tx(&tx, p.job_id)?;
update_diffs_watermark_tx(&tx, p.entity_local_id)?;
tx.commit()?;
result.fetched += 1;
}
Err(e) => {
warn!(
entity_iid = p.entity_iid,
error = %e,
"Failed to store MR file changes"
);
fail_job(conn, p.job_id, &e.to_string())?;
result.failed += 1;
}
}
}
Err(e) => {
if e.is_permanent_api_error() {
debug!(
entity_iid = p.entity_iid,
error = %e,
"Permanent API error for mr_diffs, marking complete"
);
let tx = conn.unchecked_transaction()?;
complete_job_tx(&tx, p.job_id)?;
update_diffs_watermark_tx(&tx, p.entity_local_id)?;
tx.commit()?;
result.skipped_not_found += 1;
} else {
warn!(
entity_iid = p.entity_iid,
error = %e,
"Failed to fetch MR diffs from GitLab"
);
fail_job(conn, p.job_id, &e.to_string())?;
result.failed += 1;
}
}
}
processed += 1;
emit(ProgressEvent::MrDiffFetched {
current: processed,
total: total_pending,
});
}
}
emit(ProgressEvent::MrDiffsFetchComplete {
fetched: result.fetched,
failed: result.failed,
});
if result.fetched > 0 || result.failed > 0 {
info!(
fetched = result.fetched,
failed = result.failed,
"mr_diffs drain complete"
);
}
tracing::Span::current().record("items_processed", result.fetched);
tracing::Span::current().record("errors", result.failed);
Ok(result)
}
fn update_diffs_watermark_tx(tx: &rusqlite::Transaction<'_>, mr_local_id: i64) -> Result<()> {
tx.execute(
"UPDATE merge_requests SET diffs_synced_for_updated_at = updated_at WHERE id = ?",
[mr_local_id],
)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;