feat: Add commit SHAs, closes_issues watermark, and PRD alignment

Migration 015 adds merge_commit_sha/squash_commit_sha to merge_requests
(Gate 4/5 prerequisites), closes_issues_synced_for_updated_at watermark
for incremental sync, and the missing idx_label_events_label index.

The MR transformer and ingestion pipeline now populate commit SHAs during
sync. The orchestrator uses watermark-based filtering for closes_issues
jobs instead of re-enqueuing all MRs every sync.

The Phase B PRD is updated to match the actual codebase: corrected
migration numbering (011-015), documented nullable label/milestone
fields (migration 012), watermark patterns (013), observability
infrastructure (014), simplified source_method values, and updated
entity_references schema to match implementation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-05 15:29:51 -05:00
parent ddcfff1026
commit 233eb546af
7 changed files with 189 additions and 63 deletions

View File

@@ -785,9 +785,32 @@ fn update_resource_event_watermark(
Ok(())
}
/// Marks a merge request's closes_issues data as synced for its current revision.
///
/// Copies the row's `updated_at` value into `closes_issues_synced_for_updated_at`
/// for the `merge_requests` row whose `id` equals `mr_local_id`. The enqueue query
/// in this file compares `updated_at` against that column, so once the two are
/// equal the merge request is no longer selected for a closes_issues job.
///
/// # Errors
///
/// Propagates any error returned while executing the `UPDATE` statement.
fn update_closes_issues_watermark(conn: &Connection, mr_local_id: i64) -> Result<()> {
    let sql =
        "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?";
    // The affected-row count is intentionally discarded; only failure matters here.
    let _rows_changed = conn.execute(sql, [mr_local_id])?;
    Ok(())
}
fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result<usize> {
let mut stmt =
conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?;
// Remove stale jobs for MRs that haven't changed since their last closes_issues sync
conn.execute(
"DELETE FROM pending_dependent_fetches \
WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'mr_closes_issues' \
AND entity_local_id IN ( \
SELECT id FROM merge_requests \
WHERE project_id = ?1 \
AND updated_at <= COALESCE(closes_issues_synced_for_updated_at, 0) \
)",
[project_id],
)?;
let mut stmt = conn.prepare_cached(
"SELECT id, iid FROM merge_requests \
WHERE project_id = ?1 \
AND updated_at > COALESCE(closes_issues_synced_for_updated_at, 0)",
)?;
let entities: Vec<(i64, i64)> = stmt
.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<std::result::Result<Vec<_>, _>>()?;
@@ -886,6 +909,7 @@ async fn drain_mr_closes_issues(
match store_result {
Ok(()) => {
complete_job(conn, job.id)?;
update_closes_issues_watermark(conn, job.entity_local_id)?;
result.fetched += 1;
}
Err(e) => {
@@ -907,6 +931,7 @@ async fn drain_mr_closes_issues(
"Permanent API error for closes_issues, marking complete"
);
complete_job(conn, job.id)?;
update_closes_issues_watermark(conn, job.entity_local_id)?;
result.skipped_not_found += 1;
} else {
warn!(