Checkpoint 2: MR Ingestion - PRD
Note: The project was renamed from "gitlab-inbox" to "gitlore" and the CLI from "gi" to "lore". References to "gi" in this document should be read as "lore".
Version: 1.3 Status: Ready for Implementation Depends On: Checkpoint 1 (Issue Ingestion) Enables: Checkpoint 3A (Document Generation + FTS)
Overview
Objective
Ingest all merge requests, MR discussions, and notes (including DiffNote position metadata) from configured GitLab repositories. This checkpoint extends the cursor-based sync pattern established in CP1 to merge requests, capturing code review context that is essential for decision traceability.
Success Criteria
| Criterion | Validation |
|---|---|
| gi ingest --type=merge_requests fetches all MRs | gi count mrs matches GitLab UI |
| MR labels extracted and linked correctly | Label count per MR matches GitLab |
| MR assignees and reviewers captured | Junction tables populated |
| MR merge status is future-proof | detailed_merge_status populated; no reliance on deprecated merge_status |
| MR merge actor is future-proof | merge_user_username populated when merged/auto-merge set |
| Draft MRs are captured | draft populated and visible in gi list mrs |
| Draft is backwards-compatible | Older instances using work_in_progress still populate draft |
| MR state is complete | state supports `opened`, `merged`, `closed`, and `locked`; `locked` is filtered locally |
| MR head SHA captured | head_sha populated for CP3 diff/commit context |
| MR references captured | references_short and references_full populated for CP3 cross-project display |
| MR discussions fetched per-MR (dependent sync) | For MRs whose updated_at advanced, discussions and notes upserted |
| DiffNote position metadata captured | position_new_path, position_old_path, position_type populated for DiffNotes |
| DiffNote SHA triplet captured | position_base_sha, position_start_sha, position_head_sha populated for DiffNotes |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged MRs | Per-MR watermark prevents redundant fetches |
| Watermark not advanced on partial failure | Incomplete pagination OR parse failures do not mark MR as synced |
| Full sync resets discussion watermarks | --full flag resets both MR cursor AND discussion watermarks |
| Pagination uses robust fallback chain | Link header > x-next-page > full-page heuristic |
| MR CLI commands functional | gi show mr, gi count mrs work |
Internal Gates
CP2 is validated incrementally via internal gates:
| Gate | Scope | Validation |
|---|---|---|
| Gate A | MRs only | Cursor + upsert + raw payloads + list/count working; locked state (local filtering), work_in_progress fallback, head_sha + references captured; Link header pagination with fallback chain |
| Gate B | Labels + assignees + reviewers correct | Junction tables populated; counts match GitLab |
| Gate C | Dependent discussion sync | Watermark prevents redundant refetch; DiffNote paths + SHA triplet captured; upsert + sweep for notes; watermark does NOT advance on partial pagination failure OR note parse failures; atomic note replacement (parse before delete) |
| Gate D | Resumability proof | Kill mid-run, rerun; confirm bounded redo; --full resets cursor AND discussion watermarks |
| Gate E | CLI complete | gi show mr displays discussions with DiffNote file context |
Deliverables
1. Project Structure Additions
Add the following to the existing Rust structure from Checkpoint 1:
gitlab-inbox/
├── src/
│ ├── cli/
│ │ └── commands/
│ │ ├── list.rs # Update: add MR listing
│ │ ├── show.rs # Update: add MR detail view
│ │ └── count.rs # Update: add MR counting
│ ├── gitlab/
│ │ ├── types.rs # Add GitLabMergeRequest, GitLabReviewer
│ │ └── transformers/
│ │ └── merge_request.rs # NEW: GitLab -> normalized MR
│ └── ingestion/
│ ├── merge_requests.rs # NEW: MR fetcher with pagination
│ ├── mr_discussions.rs # NEW: MR discussion fetcher
│ └── orchestrator.rs # Update: support MR ingestion
├── tests/
│ ├── mr_transformer_tests.rs
│ ├── mr_ingestion_tests.rs
│ ├── mr_discussion_tests.rs
│ ├── diffnote_tests.rs
│ └── fixtures/
│ ├── gitlab_merge_request.json
│ ├── gitlab_merge_requests_page.json
│ ├── gitlab_mr_discussion.json
│ ├── gitlab_mr_discussion_with_diffnote.json
│ └── gitlab_diffnote_position.json
└── migrations/
└── 006_merge_requests.sql
2. GitLab API Endpoints
Merge Requests (Bulk Fetch):
GET /projects/:id/merge_requests?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
MR Discussions (Per-MR Fetch):
GET /projects/:id/merge_requests/:iid/discussions?per_page=100&page=N
Required Query Parameters:
- scope=all - Include all MRs, not just those authored by the current user
- state=all - Include merged/closed MRs (GitLab defaults may exclude them)
Key Differences from Issues:
- MRs have source_branch and target_branch
- MRs have merge_status, merged_by, merged_at fields
- MRs have reviewers (separate from assignees)
- MR discussions can contain DiffNotes with position metadata
MR State Values:
GitLab MR state can be opened, closed, merged, or locked. The locked state is a transitional state that indicates an MR is in the middle of being merged (merge-in-progress). Key considerations:
- Store as first-class: Persist locked in the database without coercion to preserve GitLab's exact state.
- Local filtering only: The GitLab API's state filter does not support locked as a query parameter. To filter for locked MRs, we must use state=all server-side and filter locally via SQL WHERE state = 'locked'.
- Transient nature: Most MRs in locked state will transition to merged within seconds or minutes. Expect very few locked MRs in typical queries.
- CLI exposure: The --state=locked filter is available for debugging/inspection but is not expected to be commonly used.
Reviewer Verification (Optional / Debug Mode): When reviewer data is suspected to be incomplete on a given GitLab version, enable an optional verification mode:
GET /projects/:id/merge_requests/:merge_request_iid/reviewers
This is disabled by default to avoid N+1 overhead. Activated via config flag sync.verify_reviewers: true for debugging problematic instances.
Pagination:
- Primary: Parse the Link: <url>; rel="next" header (RFC 8288). This is the most reliable signal as it works consistently across GitLab versions and proxy configurations.
- Fallback 1: If no Link header, check the x-next-page header until it is empty or absent.
- Fallback 2: If there are no pagination headers but the current page returned per_page items, attempt page + 1 and stop only when an empty response is received. This handles proxies/instances that strip headers.
- Per-page maximum: 100
- Optional keyset mode: For projects with 10,000+ MRs, enable keyset pagination via config flag sync.use_keyset_pagination: true. This uses pagination=keyset&order_by=updated_at&sort=asc with cursor tokens from Link headers for O(1) page fetches instead of O(n) offset scans.
Database Schema
Migration 006_merge_requests.sql
-- Merge requests table
CREATE TABLE merge_requests (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER UNIQUE NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id),
iid INTEGER NOT NULL,
title TEXT,
description TEXT,
state TEXT, -- 'opened' | 'merged' | 'closed' | 'locked'
draft INTEGER NOT NULL DEFAULT 0, -- 0/1 (SQLite boolean) - work-in-progress status
author_username TEXT,
source_branch TEXT,
target_branch TEXT,
head_sha TEXT, -- Current commit SHA at head of source branch (CP3-ready)
references_short TEXT, -- Short reference e.g. "!123" (CP3-ready for display)
references_full TEXT, -- Full reference e.g. "group/project!123" (CP3-ready for cross-project)
detailed_merge_status TEXT, -- preferred, non-deprecated (replaces merge_status)
merge_user_username TEXT, -- preferred over deprecated merged_by
created_at INTEGER, -- ms epoch UTC
updated_at INTEGER, -- ms epoch UTC
merged_at INTEGER, -- ms epoch UTC (NULL if not merged)
closed_at INTEGER, -- ms epoch UTC (NULL if not closed)
last_seen_at INTEGER NOT NULL, -- ms epoch UTC, updated on every upsert
-- Prevents re-fetching discussions on cursor rewind / reruns unless MR changed.
discussions_synced_for_updated_at INTEGER,
-- Sync health telemetry for debuggability
discussions_sync_last_attempt_at INTEGER, -- ms epoch UTC of last sync attempt
discussions_sync_attempts INTEGER DEFAULT 0, -- count of sync attempts for this MR version
discussions_sync_last_error TEXT, -- last error message if sync failed
web_url TEXT,
raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_mrs_project_updated ON merge_requests(project_id, updated_at);
CREATE INDEX idx_mrs_author ON merge_requests(author_username);
CREATE INDEX idx_mrs_target_branch ON merge_requests(project_id, target_branch);
CREATE INDEX idx_mrs_source_branch ON merge_requests(project_id, source_branch);
CREATE INDEX idx_mrs_state ON merge_requests(project_id, state);
CREATE INDEX idx_mrs_detailed_merge_status ON merge_requests(project_id, detailed_merge_status);
CREATE INDEX idx_mrs_draft ON merge_requests(project_id, draft);
CREATE INDEX idx_mrs_discussions_sync ON merge_requests(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_mrs_project_iid ON merge_requests(project_id, iid);
-- MR-Label junction (reuses labels table from CP1)
CREATE TABLE mr_labels (
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
PRIMARY KEY(merge_request_id, label_id)
);
CREATE INDEX idx_mr_labels_label ON mr_labels(label_id);
-- MR assignees (same pattern as issue_assignees)
CREATE TABLE mr_assignees (
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
username TEXT NOT NULL,
PRIMARY KEY(merge_request_id, username)
);
CREATE INDEX idx_mr_assignees_username ON mr_assignees(username);
-- MR reviewers (MR-specific, not applicable to issues)
CREATE TABLE mr_reviewers (
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
username TEXT NOT NULL,
PRIMARY KEY(merge_request_id, username)
);
CREATE INDEX idx_mr_reviewers_username ON mr_reviewers(username);
-- Add FK constraint to discussions table for merge_request_id
-- Note: SQLite doesn't support ADD CONSTRAINT; the FK was defined in CP1 but nullable
-- We just need to add an index if not already present
CREATE INDEX IF NOT EXISTS idx_discussions_mr_id ON discussions(merge_request_id);
CREATE INDEX IF NOT EXISTS idx_discussions_mr_resolved ON discussions(merge_request_id, resolved, resolvable);
-- Additional indexes for DiffNote queries (notes table from CP1)
-- These composite indexes enable efficient file-context queries for CP3
CREATE INDEX IF NOT EXISTS idx_notes_type ON notes(note_type);
CREATE INDEX IF NOT EXISTS idx_notes_new_path ON notes(position_new_path);
CREATE INDEX IF NOT EXISTS idx_notes_new_path_line ON notes(position_new_path, position_new_line);
CREATE INDEX IF NOT EXISTS idx_notes_old_path_line ON notes(position_old_path, position_old_line);
-- CP2: capture richer diff note position shapes (minimal, still MVP)
-- These fields support modern GitLab diff note semantics without full diff reconstruction
ALTER TABLE notes ADD COLUMN position_type TEXT; -- 'text' | 'image' | 'file'
ALTER TABLE notes ADD COLUMN position_line_range_start INTEGER; -- multi-line comment start
ALTER TABLE notes ADD COLUMN position_line_range_end INTEGER; -- multi-line comment end
-- DiffNote SHA triplet for commit context (CP3-ready, zero extra API cost)
ALTER TABLE notes ADD COLUMN position_base_sha TEXT; -- Base commit SHA for diff
ALTER TABLE notes ADD COLUMN position_start_sha TEXT; -- Start commit SHA for diff
ALTER TABLE notes ADD COLUMN position_head_sha TEXT; -- Head commit SHA for diff
-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (6, strftime('%s', 'now') * 1000, 'Merge requests, MR labels, assignees, reviewers, DiffNote position columns');
GitLab Types
Type Definitions
// src/gitlab/types.rs (additions)
use serde::Deserialize;
/// GitLab merge request from the API.
/// Note: Uses non-deprecated field names where possible (detailed_merge_status, merge_user).
/// Falls back gracefully for older GitLab versions.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabMergeRequest {
pub id: i64, // GitLab global ID
pub iid: i64, // Project-scoped MR number
pub project_id: i64,
pub title: String,
pub description: Option<String>,
pub state: String, // "opened" | "merged" | "closed" | "locked"
#[serde(default)]
pub draft: bool, // Work-in-progress status (preferred)
#[serde(default)]
pub work_in_progress: bool, // Deprecated; fallback for older instances
pub source_branch: String,
pub target_branch: String,
pub sha: Option<String>, // Current commit SHA at head of source branch (CP3-ready)
pub references: Option<GitLabReferences>, // Short and full reference strings (CP3-ready)
// Prefer detailed_merge_status (non-deprecated) over merge_status (deprecated)
pub detailed_merge_status: Option<String>, // "mergeable" | "not_mergeable" | "checking" | etc.
#[serde(alias = "merge_status")]
pub merge_status_legacy: Option<String>, // Keep for older/self-managed versions
pub created_at: String, // ISO 8601
pub updated_at: String, // ISO 8601
pub merged_at: Option<String>, // ISO 8601
pub closed_at: Option<String>, // ISO 8601
pub author: GitLabAuthor,
// Prefer merge_user (current) over merged_by (deprecated)
pub merge_user: Option<GitLabAuthor>,
pub merged_by: Option<GitLabAuthor>, // Keep for older/self-managed versions
#[serde(default)]
pub labels: Vec<String>, // Array of label names
#[serde(default)]
pub assignees: Vec<GitLabAuthor>,
#[serde(default)]
pub reviewers: Vec<GitLabReviewer>,
pub web_url: String,
}
/// GitLab references object (short and full reference strings).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabReferences {
pub short: String, // e.g. "!123"
pub full: String, // e.g. "group/project!123"
}
/// GitLab author from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
pub id: i64,
pub username: String,
pub name: String,
}
/// GitLab reviewer (can have approval state).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabReviewer {
pub id: i64,
pub username: String,
pub name: String,
// Note: approval state may require additional API call, defer to post-MVP
}
// Note: GitLabDiscussion and GitLabNote types already exist from CP1
// and support MR discussions with DiffNote position metadata.
Transformers
Merge Request Transformer
// src/gitlab/transformers/merge_request.rs
use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabMergeRequest;
/// Normalized merge request ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedMergeRequest {
pub gitlab_id: i64,
pub project_id: i64, // Local DB project ID
pub iid: i64,
pub title: String,
pub description: Option<String>,
pub state: String,
pub draft: bool,
pub author_username: String,
pub source_branch: String,
pub target_branch: String,
pub head_sha: Option<String>, // CP3-ready: current commit at source branch head
pub references_short: Option<String>, // CP3-ready: e.g. "!123"
pub references_full: Option<String>, // CP3-ready: e.g. "group/project!123"
pub detailed_merge_status: Option<String>,
pub merge_user_username: Option<String>,
pub created_at: i64, // ms epoch
pub updated_at: i64, // ms epoch
pub merged_at: Option<i64>, // ms epoch
pub closed_at: Option<i64>, // ms epoch
pub last_seen_at: i64, // ms epoch
pub web_url: String,
}
/// Normalized label ready for database insertion.
/// Reuses structure from issue transformer.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
pub project_id: i64,
pub name: String,
}
/// Result of transforming a GitLab MR.
#[derive(Debug, Clone)]
pub struct MergeRequestWithMetadata {
pub merge_request: NormalizedMergeRequest,
pub label_names: Vec<String>,
pub assignee_usernames: Vec<String>,
pub reviewer_usernames: Vec<String>,
}
/// Transform GitLab merge request to normalized schema.
pub fn transform_merge_request(
gitlab_mr: &GitLabMergeRequest,
local_project_id: i64,
) -> Result<MergeRequestWithMetadata, String> {
// Parse timestamps, return error if invalid (strict parsing, no silent zeroing)
let created_at = iso_to_ms(&gitlab_mr.created_at)
.ok_or_else(|| format!("Invalid created_at: {}", gitlab_mr.created_at))?;
let updated_at = iso_to_ms(&gitlab_mr.updated_at)
.ok_or_else(|| format!("Invalid updated_at: {}", gitlab_mr.updated_at))?;
let merged_at = gitlab_mr
.merged_at
.as_ref()
.and_then(|s| iso_to_ms(s));
let closed_at = gitlab_mr
.closed_at
.as_ref()
.and_then(|s| iso_to_ms(s));
// Draft detection: prefer draft (current field); work_in_progress is the
// deprecated fallback for older instances. Both are bools, so OR covers both.
let is_draft = gitlab_mr.draft || gitlab_mr.work_in_progress;
// Extract references (CP3-ready)
let (references_short, references_full) = gitlab_mr
.references
.as_ref()
.map(|r| (Some(r.short.clone()), Some(r.full.clone())))
.unwrap_or((None, None));
let merge_request = NormalizedMergeRequest {
gitlab_id: gitlab_mr.id,
project_id: local_project_id,
iid: gitlab_mr.iid,
title: gitlab_mr.title.clone(),
description: gitlab_mr.description.clone(),
state: gitlab_mr.state.clone(),
draft: is_draft,
author_username: gitlab_mr.author.username.clone(),
source_branch: gitlab_mr.source_branch.clone(),
target_branch: gitlab_mr.target_branch.clone(),
head_sha: gitlab_mr.sha.clone(),
references_short,
references_full,
detailed_merge_status: gitlab_mr
.detailed_merge_status
.clone()
.or_else(|| gitlab_mr.merge_status_legacy.clone()),
merge_user_username: gitlab_mr
.merge_user
.as_ref()
.map(|u| u.username.clone())
.or_else(|| gitlab_mr.merged_by.as_ref().map(|u| u.username.clone())),
created_at,
updated_at,
merged_at,
closed_at,
last_seen_at: now_ms(),
web_url: gitlab_mr.web_url.clone(),
};
let label_names = gitlab_mr.labels.clone();
let assignee_usernames = gitlab_mr
.assignees
.iter()
.map(|u| u.username.clone())
.collect();
let reviewer_usernames = gitlab_mr
.reviewers
.iter()
.map(|u| u.username.clone())
.collect();
Ok(MergeRequestWithMetadata {
merge_request,
label_names,
assignee_usernames,
reviewer_usernames,
})
}
/// Extract labels from GitLab MR (name-only, same as issues).
pub fn extract_labels(gitlab_mr: &GitLabMergeRequest, local_project_id: i64) -> Vec<NormalizedLabel> {
gitlab_mr
.labels
.iter()
.map(|name| NormalizedLabel {
project_id: local_project_id,
name: name.clone(),
})
.collect()
}
MR Discussion Transformer
// src/gitlab/transformers/discussion.rs (additions)
/// Transform GitLab discussion to normalized schema for MR context.
/// Note: The core transform_discussion and transform_notes functions
/// from CP1 are reused. This adds MR-specific handling.
pub fn transform_mr_discussion(
gitlab_discussion: &GitLabDiscussion,
local_project_id: i64,
local_mr_id: i64,
) -> NormalizedDiscussion {
let note_times: Vec<i64> = gitlab_discussion
.notes
.iter()
.filter_map(|n| iso_to_ms(&n.created_at))
.collect();
// Check if any note is resolvable (MR discussions often are)
let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
let resolved = resolvable
&& gitlab_discussion
.notes
.iter()
.all(|n| !n.resolvable || n.resolved);
NormalizedDiscussion {
gitlab_discussion_id: gitlab_discussion.id.clone(),
project_id: local_project_id,
issue_id: None, // Not an issue discussion
merge_request_id: Some(local_mr_id),
noteable_type: "MergeRequest".to_string(),
individual_note: gitlab_discussion.individual_note,
first_note_at: note_times.iter().min().copied(),
last_note_at: note_times.iter().max().copied(),
last_seen_at: now_ms(),
resolvable,
resolved,
}
}
/// Transform GitLab notes with DiffNote position extraction.
/// Captures file path and line metadata for code review comments.
///
/// Returns Result to enforce strict timestamp parsing - corrupted timestamps
/// (zero values) would break cursor logic and time-series ordering.
pub fn transform_notes_with_diff_position(
gitlab_discussion: &GitLabDiscussion,
local_project_id: i64,
) -> Result<Vec<NormalizedNote>, String> {
gitlab_discussion
.notes
.iter()
.enumerate()
.map(|(index, note)| {
// Strict timestamp parsing - no silent zeroing
let created_at = iso_to_ms(&note.created_at)
.ok_or_else(|| format!(
"Invalid note.created_at for note {}: {}",
note.id, note.created_at
))?;
let updated_at = iso_to_ms(&note.updated_at)
.ok_or_else(|| format!(
"Invalid note.updated_at for note {}: {}",
note.id, note.updated_at
))?;
// Extract DiffNote position metadata if present
// Includes position_type, line_range, and SHA triplet for modern GitLab diff note shapes
let (old_path, new_path, old_line, new_line, position_type, lr_start, lr_end, base_sha, start_sha, head_sha) = note
.position
.as_ref()
.map(|pos| (
pos.old_path.clone(),
pos.new_path.clone(),
pos.old_line,
pos.new_line,
pos.position_type.clone(),
pos.line_range.as_ref().map(|r| r.start_line),
pos.line_range.as_ref().map(|r| r.end_line),
pos.base_sha.clone(),
pos.start_sha.clone(),
pos.head_sha.clone(),
))
.unwrap_or((None, None, None, None, None, None, None, None, None, None));
Ok(NormalizedNote {
gitlab_id: note.id,
project_id: local_project_id,
note_type: note.note_type.clone(),
is_system: note.system,
author_username: note.author.username.clone(),
body: note.body.clone(),
created_at,
updated_at,
last_seen_at: now_ms(),
position: index as i32,
resolvable: note.resolvable,
resolved: note.resolved,
resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
resolved_at: note.resolved_at.as_ref().and_then(|s| iso_to_ms(s)),
// DiffNote position metadata
position_old_path: old_path,
position_new_path: new_path,
position_old_line: old_line,
position_new_line: new_line,
// Extended position metadata for modern GitLab
position_type,
position_line_range_start: lr_start,
position_line_range_end: lr_end,
// SHA triplet for commit context (CP3-ready)
position_base_sha: base_sha,
position_start_sha: start_sha,
position_head_sha: head_sha,
})
})
.collect()
}
GitLab Client Additions
Pagination with Async Streams
// src/gitlab/client.rs (additions)
/// A page of merge requests with pagination metadata.
#[derive(Debug)]
pub struct MergeRequestPage {
pub items: Vec<GitLabMergeRequest>,
pub next_page: Option<u32>,
pub is_last_page: bool,
}
impl GitLabClient {
/// Parse Link header to extract rel="next" URL (RFC 8288).
/// Returns Some(url) if a next page link exists, None otherwise.
fn parse_link_header_next(headers: &reqwest::header::HeaderMap) -> Option<String> {
headers
.get("link")
.and_then(|v| v.to_str().ok())
.and_then(|link_str| {
// Parse Link header format: <url>; rel="next", <url>; rel="last"
for part in link_str.split(',') {
let part = part.trim();
if part.contains("rel=\"next\"") || part.contains("rel=next") {
// Extract URL between < and >
if let Some(start) = part.find('<') {
if let Some(end) = part.find('>') {
return Some(part[start + 1..end].to_string());
}
}
}
}
None
})
}
/// Fetch a single page of merge requests for a project.
/// Returns the items plus pagination metadata for page-aware cursor updates.
pub async fn fetch_merge_requests_page(
&self,
gitlab_project_id: i64,
updated_after: Option<i64>,
cursor_rewind_seconds: u32,
page: u32,
per_page: u32,
) -> Result<MergeRequestPage> {
let mut params = vec![
("scope", "all".to_string()),
("state", "all".to_string()),
("order_by", "updated_at".to_string()),
("sort", "asc".to_string()),
("per_page", per_page.to_string()),
("page", page.to_string()),
];
if let Some(ts) = updated_after {
// Apply cursor rewind for safety, clamping to 0 to avoid underflow
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
let rewound = (ts - rewind_ms).max(0);
if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
params.push(("updated_after", dt.to_rfc3339()));
}
}
let (items, headers) = self
.request_with_headers::<Vec<GitLabMergeRequest>>(
&format!("/api/v4/projects/{gitlab_project_id}/merge_requests"),
&params,
)
.await?;
// Parse pagination with fallback chain:
// 1. Link header (most reliable, RFC 8288 compliant)
// 2. x-next-page header (GitLab-specific)
// 3. Full page heuristic (fallback for proxies that strip headers)
let link_next = Self::parse_link_header_next(&headers);
let x_next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
let next_page = match (link_next, x_next_page, items.len() as u32 == per_page) {
(Some(_), _, _) => Some(page + 1), // Link header present: continue
(None, Some(np), _) => Some(np), // x-next-page present: use it
(None, None, true) => Some(page + 1), // Full page, no headers: try next
(None, None, false) => None, // Partial page: we're done
};
let is_last_page = items.is_empty() || next_page.is_none();
Ok(MergeRequestPage {
items,
next_page,
is_last_page,
})
}
/// Paginate through merge requests for a project.
/// Returns a stream of MRs that handles pagination automatically.
/// Note: For page-aware cursor updates, prefer fetch_merge_requests_page directly.
pub fn paginate_merge_requests(
&self,
gitlab_project_id: i64,
updated_after: Option<i64>,
cursor_rewind_seconds: u32,
) -> Pin<Box<dyn Stream<Item = Result<GitLabMergeRequest>> + Send + '_>> {
Box::pin(async_stream::try_stream! {
let mut page = 1u32;
let per_page = 100u32;
loop {
let page_result = self.fetch_merge_requests_page(
gitlab_project_id,
updated_after,
cursor_rewind_seconds,
page,
per_page,
).await?;
for mr in page_result.items {
yield mr;
}
if page_result.is_last_page {
break;
}
if let Some(np) = page_result.next_page {
page = np;
} else {
break;
}
}
})
}
/// Paginate through discussions for a merge request.
pub fn paginate_mr_discussions(
&self,
gitlab_project_id: i64,
mr_iid: i64,
) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
Box::pin(async_stream::try_stream! {
let mut page = 1u32;
let per_page = 100u32;
loop {
let params = vec![
("per_page", per_page.to_string()),
("page", page.to_string()),
];
let (discussions, headers) = self
.request_with_headers::<Vec<GitLabDiscussion>>(
&format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{mr_iid}/discussions"),
&params,
)
.await?;
for discussion in discussions.iter() {
yield discussion.clone();
}
// Robust fallback if pagination headers are missing
let next_page_hdr = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
let next_page = match (next_page_hdr, discussions.len() as u32 == per_page) {
(Some(np), _) => Some(np), // Header present: trust it
(None, true) => Some(page + 1), // Full page, no header: try next
(None, false) => None, // Partial page: we're done
};
match next_page {
Some(np) if !discussions.is_empty() => page = np,
_ => break,
}
}
})
}
}
Orchestration: Dependent Discussion Sync
Canonical Pattern (CP2)
When gi ingest --type=merge_requests runs, it follows this orchestration (mirroring CP1 pattern):
1. Ingest merge requests (cursor-based, with incremental cursor updates per page)
2. Select MRs needing discussion sync (DB-driven) - After MR ingestion completes, query:

   SELECT id, iid, updated_at FROM merge_requests
   WHERE project_id = ?
     AND (discussions_synced_for_updated_at IS NULL
          OR updated_at > discussions_synced_for_updated_at)
   ORDER BY updated_at ASC;

   This is more scalable and debuggable than in-memory collection. The DB is the source of truth.
3. Execute discussion sync with bounded concurrency (dependent_concurrency from config)
4. Update watermark - After each MR's discussions are successfully ingested with no partial failures:

   UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?
Invariant: A rerun MUST NOT refetch discussions for MRs whose updated_at has not advanced, even with cursor rewind. Watermark is NOT advanced if any failure occurs (HTTP pagination failure OR note parse failure).
Ingestion Logic
MR Ingestion
// src/ingestion/merge_requests.rs
use rusqlite::Connection;
use tracing::{debug, info};
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::merge_request::{transform_merge_request, extract_labels};
/// Result of MR ingestion.
#[derive(Debug, Default)]
pub struct IngestMergeRequestsResult {
pub fetched: usize,
pub upserted: usize,
pub labels_created: usize,
pub assignees_linked: usize,
pub reviewers_linked: usize,
// Discussion sync is DB-driven, not in-memory collection.
// Use query: SELECT id, iid FROM merge_requests WHERE updated_at > discussions_synced_for_updated_at
// This avoids memory growth for large projects.
}
/// Info needed to sync discussions for an MR.
#[derive(Debug, Clone)]
pub struct MrForDiscussionSync {
pub local_mr_id: i64,
pub iid: i64,
pub updated_at: i64,
}
/// Result of upserting a merge request, including previous sync state.
#[derive(Debug)]
pub struct UpsertMergeRequestResult {
pub local_mr_id: i64,
pub changes: usize,
pub previous_discussions_synced_at: Option<i64>,
}
/// Ingest merge requests for a project using page-based cursor updates.
/// Discussion sync eligibility is determined via DB query AFTER ingestion completes.
pub async fn ingest_merge_requests(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64, // Local DB project ID
gitlab_project_id: i64, // GitLab project ID
full_sync: bool, // Reset cursor if true
) -> Result<IngestMergeRequestsResult> {
let mut result = IngestMergeRequestsResult::default();
// Reset cursor if full sync requested
if full_sync {
reset_cursor(conn, project_id, "merge_requests")?;
// Also reset discussion watermarks to force re-fetch of all discussions
// This ensures --full truly fetches everything, not just MRs
reset_discussion_watermarks(conn, project_id)?;
}
// Get current cursor
let cursor = get_cursor(conn, project_id)?;
let cursor_updated_at = cursor.0;
let cursor_gitlab_id = cursor.1;
let mut last_updated_at: Option<i64> = None;
let mut last_gitlab_id: Option<i64> = None;
let mut page = 1u32;
let per_page = 100u32;
// Page-based iteration for proper cursor boundary tracking
loop {
let page_result = client.fetch_merge_requests_page(
gitlab_project_id,
cursor_updated_at,
config.sync.cursor_rewind_seconds,
page,
per_page,
).await?;
if page_result.items.is_empty() {
break;
}
for mr in &page_result.items {
result.fetched += 1;
let mr_updated_at = crate::core::time::iso_to_ms(&mr.updated_at)
.ok_or_else(|| crate::core::error::GiError::ParseError {
field: "updated_at".to_string(),
value: mr.updated_at.clone(),
})?;
// Apply cursor filtering for tuple semantics
if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
if mr_updated_at < cursor_ts {
continue;
}
if mr_updated_at == cursor_ts && mr.id <= cursor_id {
continue;
}
}
// Begin transaction for this MR (atomicity + performance)
let tx = conn.unchecked_transaction()?;
// Store raw payload
let payload_id = store_payload(
&tx,
project_id,
"merge_request",
&mr.id.to_string(),
&mr,
config.storage.compress_raw_payloads,
)?;
// Transform and upsert MR
let transformed = transform_merge_request(mr, project_id)
.map_err(|e| crate::core::error::GiError::ParseError {
field: "merge_request".to_string(),
value: e,
})?;
let upsert_result = upsert_merge_request(
&tx,
&transformed.merge_request,
payload_id,
)?;
if upsert_result.changes > 0 {
result.upserted += 1;
}
let local_mr_id = upsert_result.local_mr_id;
// Discussion sync eligibility is determined AFTER ingestion via DB query:
// SELECT id, iid FROM merge_requests WHERE updated_at > discussions_synced_for_updated_at
// This avoids memory growth for large projects.
// Clear existing label links (ensures removed labels are unlinked)
clear_mr_labels(&tx, local_mr_id)?;
// Extract and upsert labels (name-only, same as issues)
let labels = extract_labels(mr, project_id);
for label in &labels {
let created = upsert_label(&tx, label)?;
if created {
result.labels_created += 1;
}
// Link MR to label
let label_id = get_label_id(&tx, project_id, &label.name)?;
link_mr_label(&tx, local_mr_id, label_id)?;
}
// Clear and relink assignees
clear_mr_assignees(&tx, local_mr_id)?;
for username in &transformed.assignee_usernames {
upsert_mr_assignee(&tx, local_mr_id, username)?;
result.assignees_linked += 1;
}
// Clear and relink reviewers
clear_mr_reviewers(&tx, local_mr_id)?;
for username in &transformed.reviewer_usernames {
upsert_mr_reviewer(&tx, local_mr_id, username)?;
result.reviewers_linked += 1;
}
tx.commit()?;
// Track for cursor update
last_updated_at = Some(mr_updated_at);
last_gitlab_id = Some(mr.id);
}
// Page-boundary cursor flush (not based on fetched % 100)
// This ensures cursor reflects actual processed page boundaries
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
update_cursor(conn, project_id, "merge_requests", updated_at, gitlab_id)?;
}
// Check for more pages
if page_result.is_last_page {
break;
}
match page_result.next_page {
Some(np) => page = np,
None => break,
}
}
// Final cursor update (in case we exited mid-loop)
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
update_cursor(conn, project_id, "merge_requests", updated_at, gitlab_id)?;
}
info!(
project_id,
fetched = result.fetched,
upserted = result.upserted,
labels_created = result.labels_created,
assignees_linked = result.assignees_linked,
reviewers_linked = result.reviewers_linked,
"MR ingestion complete"
);
Ok(result)
}
/// Upsert a merge request and return sync state for inline discussion decision.
fn upsert_merge_request(
conn: &Connection,
mr: &NormalizedMergeRequest,
payload_id: Option<i64>,
) -> Result<UpsertMergeRequestResult> {
// Perform upsert
let changes = conn.execute(
"INSERT INTO merge_requests (
gitlab_id, project_id, iid, title, description, state, draft,
author_username, source_branch, target_branch,
head_sha, references_short, references_full,
detailed_merge_status, merge_user_username,
created_at, updated_at, merged_at, closed_at, last_seen_at,
web_url, raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22)
ON CONFLICT(gitlab_id) DO UPDATE SET
title = excluded.title,
description = excluded.description,
state = excluded.state,
draft = excluded.draft,
head_sha = excluded.head_sha,
references_short = excluded.references_short,
references_full = excluded.references_full,
detailed_merge_status = excluded.detailed_merge_status,
merge_user_username = excluded.merge_user_username,
updated_at = excluded.updated_at,
merged_at = excluded.merged_at,
closed_at = excluded.closed_at,
last_seen_at = excluded.last_seen_at,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
mr.gitlab_id, mr.project_id, mr.iid, mr.title, mr.description,
mr.state, mr.draft as i32,
mr.author_username, mr.source_branch, mr.target_branch,
mr.head_sha, mr.references_short, mr.references_full,
mr.detailed_merge_status, mr.merge_user_username,
mr.created_at, mr.updated_at, mr.merged_at, mr.closed_at, mr.last_seen_at,
mr.web_url, payload_id,
],
)?;
// Get local ID (either existing or newly inserted)
let local_mr_id: i64 = conn.query_row(
"SELECT id FROM merge_requests WHERE gitlab_id = ?",
[mr.gitlab_id],
|row| row.get(0),
)?; // must exist after the upsert above; propagate as an error rather than panic
Ok(UpsertMergeRequestResult {
local_mr_id,
changes,
previous_discussions_synced_at: None,
})
}
// Database helper functions follow the same pattern as issues.rs
// (get_cursor, update_cursor, reset_cursor, etc.)
/// Reset discussion watermarks for all MRs in a project.
/// Called when --full sync is requested to force re-fetch of all discussions.
fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> {
conn.execute(
"UPDATE merge_requests
SET discussions_synced_for_updated_at = NULL,
discussions_sync_attempts = 0,
discussions_sync_last_error = NULL
WHERE project_id = ?",
[project_id],
)?;
Ok(())
}
/// Record sync health telemetry for debugging failed discussion syncs.
fn record_sync_health_error(conn: &Connection, mr_id: i64, error: &str) -> Result<()> {
conn.execute(
"UPDATE merge_requests SET
discussions_sync_last_attempt_at = ?,
discussions_sync_attempts = COALESCE(discussions_sync_attempts, 0) + 1,
discussions_sync_last_error = ?
WHERE id = ?",
rusqlite::params![now_ms(), error, mr_id],
)?;
Ok(())
}
/// Clear sync health error on successful sync.
fn clear_sync_health_error(conn: &Connection, mr_id: i64) -> Result<()> {
conn.execute(
"UPDATE merge_requests SET
discussions_sync_last_attempt_at = ?,
discussions_sync_attempts = 0,
discussions_sync_last_error = NULL
WHERE id = ?",
rusqlite::params![now_ms(), mr_id],
)?;
Ok(())
}
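The cursor filter inside the loop (skip when `mr_updated_at < cursor_ts`, or when timestamps tie and `mr.id <= cursor_id`) is easier to audit as a single tuple comparison. A minimal sketch of the same semantics; the `Cursor` struct here is a hypothetical simplification, not the real cursor row:

```rust
/// Hypothetical simplification of a sync cursor row: the `updated_at` (ms)
/// and GitLab ID of the last processed item.
#[derive(Clone, Copy)]
struct Cursor {
    updated_at: i64,
    gitlab_id: i64,
}

/// Tuple semantics: an item is new iff (updated_at, gitlab_id) is strictly
/// greater than the cursor tuple. Items sharing the cursor's timestamp are
/// disambiguated by ID, so re-reading the boundary page (e.g. within the
/// cursor_rewind_seconds window) never re-processes already-seen items.
fn is_after_cursor(updated_at: i64, gitlab_id: i64, cursor: Option<Cursor>) -> bool {
    match cursor {
        None => true, // no cursor yet: full sync, everything is new
        Some(c) => (updated_at, gitlab_id) > (c.updated_at, c.gitlab_id),
    }
}

fn main() {
    let c = Some(Cursor { updated_at: 1_000, gitlab_id: 42 });
    assert!(!is_after_cursor(999, 7, c));    // older timestamp: skip
    assert!(!is_after_cursor(1_000, 42, c)); // the cursor item itself: skip
    assert!(is_after_cursor(1_000, 43, c));  // tie on timestamp, higher ID: keep
    assert!(is_after_cursor(1_001, 1, c));   // newer timestamp: keep
}
```

Rust's lexicographic tuple ordering makes the two-branch skip logic a single `>`, which is also the form the rewind-window argument relies on.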
MR Discussion Ingestion
// src/ingestion/mr_discussions.rs
use futures::StreamExt;
use rusqlite::Connection;
use tracing::{debug, warn};
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{
transform_mr_discussion,
transform_notes_with_diff_position,
};
/// Result of discussion ingestion for a single MR.
#[derive(Debug, Default)]
pub struct IngestMrDiscussionsResult {
pub discussions_fetched: usize,
pub discussions_upserted: usize,
pub notes_upserted: usize,
pub notes_skipped_bad_timestamp: usize,
pub system_notes_count: usize,
pub diffnotes_count: usize,
pub pagination_succeeded: bool,
}
/// Ingest discussions for a single MR.
/// Called only when mr.updated_at > discussions_synced_for_updated_at.
///
/// CRITICAL INVARIANTS:
/// 1. Watermark is ONLY advanced if pagination completes successfully AND all notes parse successfully.
/// 2. Notes are parsed BEFORE any destructive DB operations to prevent data loss.
/// 3. Stale discussions are removed via last_seen_at sweep (not in-memory ID collection).
pub async fn ingest_mr_discussions(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
mr_iid: i64,
local_mr_id: i64,
mr_updated_at: i64,
) -> Result<IngestMrDiscussionsResult> {
let mut result = IngestMrDiscussionsResult {
pagination_succeeded: true, // Assume success, set false on error
..Default::default()
};
// Record sync start time for last_seen_at sweep
let run_seen_at = now_ms();
let mut stream = client.paginate_mr_discussions(gitlab_project_id, mr_iid);
while let Some(discussion_result) = stream.next().await {
let discussion = match discussion_result {
Ok(d) => d,
Err(e) => {
// Log error and mark pagination as failed
warn!(
project_id,
mr_iid,
error = %e,
"Error fetching MR discussion page"
);
result.pagination_succeeded = false;
break;
}
};
result.discussions_fetched += 1;
// CRITICAL: Parse/transform notes BEFORE any destructive DB operations.
// If parsing fails, we preserve prior data and treat as partial failure (no watermark advance).
let notes = match transform_notes_with_diff_position(&discussion, project_id) {
Ok(notes) => notes,
Err(e) => {
warn!(
project_id,
mr_iid,
discussion_id = %discussion.id,
error = %e,
"Note transform failed; preserving existing notes; MR watermark will NOT advance"
);
result.notes_skipped_bad_timestamp += discussion.notes.len();
result.pagination_succeeded = false;
continue;
}
};
// Begin transaction for this discussion (only AFTER parsing succeeded)
let tx = conn.unchecked_transaction()?;
// Store raw payload for discussion
let discussion_payload_id = store_payload(
&tx,
project_id,
"discussion",
&discussion.id,
&discussion,
config.storage.compress_raw_payloads,
)?;
// Transform and upsert discussion (MR context) with run_seen_at timestamp
let mut normalized = transform_mr_discussion(&discussion, project_id, local_mr_id);
normalized.last_seen_at = run_seen_at;
upsert_discussion(&tx, &normalized, discussion_payload_id)?;
result.discussions_upserted += 1;
// Get local discussion ID
let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;
// Upsert notes instead of delete-all-then-insert
// This reduces write amplification and preserves any local metadata
for note in &notes {
// Selective raw payload storage for notes:
// - Store for DiffNotes (position metadata useful for file-context queries)
// - Store for non-system notes (user-generated content)
// - Skip for system notes (numerous, low-value, discussion payload has them)
let should_store_note_payload =
!note.is_system() ||
note.position_new_path().is_some() ||
note.position_old_path().is_some();
let note_payload_id = if should_store_note_payload {
let gitlab_note = discussion.notes.iter().find(|n| n.id() == note.gitlab_id());
if let Some(gn) = gitlab_note {
store_payload(
&tx,
project_id,
"note",
&note.gitlab_id().to_string(),
gn,
config.storage.compress_raw_payloads,
)?
} else {
None
}
} else {
None
};
// Upsert note with run_seen_at timestamp for sweep
upsert_note(&tx, local_discussion_id, note, note_payload_id, run_seen_at)?;
result.notes_upserted += 1;
if note.is_system() {
result.system_notes_count += 1;
}
// Count DiffNotes
if note.position_new_path().is_some() || note.position_old_path().is_some() {
result.diffnotes_count += 1;
}
}
tx.commit()?;
}
// Remove stale discussions AND notes via last_seen_at sweep (only if pagination completed successfully)
// This is simpler and more scalable than collecting seen IDs in memory
if result.pagination_succeeded {
// Sweep stale discussions
conn.execute(
"DELETE FROM discussions
WHERE project_id = ? AND merge_request_id = ?
AND last_seen_at < ?",
rusqlite::params![project_id, local_mr_id, run_seen_at],
)?;
// Sweep stale notes for this MR's discussions
// Notes inherit staleness from their discussion, but we also sweep at note level
// for cases where notes are removed from an existing discussion
conn.execute(
"DELETE FROM notes
WHERE discussion_id IN (
SELECT id FROM discussions
WHERE project_id = ? AND merge_request_id = ?
)
AND last_seen_at < ?",
rusqlite::params![project_id, local_mr_id, run_seen_at],
)?;
}
// CRITICAL: Only advance watermark if pagination succeeded completely AND no parse failures.
// If we advance on partial failure, we permanently lose data for this MR version.
if result.pagination_succeeded {
mark_discussions_synced(conn, local_mr_id, mr_updated_at)?;
// Clear sync health error on success
clear_sync_health_error(conn, local_mr_id)?;
} else {
// Record sync health telemetry for debugging
record_sync_health_error(
conn,
local_mr_id,
"Pagination incomplete or parse failure; will retry on next sync"
)?;
warn!(
project_id,
mr_iid,
local_mr_id,
"Discussion sync incomplete; watermark NOT advanced (will retry on next sync)"
);
}
debug!(
project_id,
mr_iid,
discussions = result.discussions_fetched,
notes = result.notes_upserted,
diffnotes = result.diffnotes_count,
pagination_succeeded = result.pagination_succeeded,
"MR discussions ingested"
);
Ok(result)
}
fn mark_discussions_synced(conn: &Connection, mr_id: i64, mr_updated_at: i64) -> Result<()> {
conn.execute(
"UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?",
rusqlite::params![mr_updated_at, mr_id],
)?;
Ok(())
}
/// Record sync health telemetry for debugging failed discussion syncs.
fn record_sync_health_error(conn: &Connection, mr_id: i64, error: &str) -> Result<()> {
conn.execute(
"UPDATE merge_requests SET
discussions_sync_last_attempt_at = ?,
discussions_sync_attempts = COALESCE(discussions_sync_attempts, 0) + 1,
discussions_sync_last_error = ?
WHERE id = ?",
rusqlite::params![now_ms(), error, mr_id],
)?;
Ok(())
}
/// Clear sync health error on successful sync.
fn clear_sync_health_error(conn: &Connection, mr_id: i64) -> Result<()> {
conn.execute(
"UPDATE merge_requests SET
discussions_sync_last_attempt_at = ?,
discussions_sync_attempts = 0,
discussions_sync_last_error = NULL
WHERE id = ?",
rusqlite::params![now_ms(), mr_id],
)?;
Ok(())
}
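The upsert + sweep lifecycle above can be shown without SQLite: every row seen this run is stamped with `run_seen_at`, and only after pagination completes successfully does the sweep delete rows carrying an older stamp. A minimal in-memory sketch, with a `HashMap` standing in for the `notes` table:

```rust
use std::collections::HashMap;

/// In-memory stand-in for the notes table: note_id -> last_seen_at (ms).
type Notes = HashMap<i64, i64>;

/// Upsert phase: every note present in the API response gets the current
/// run's timestamp, whether it is new or already known locally.
fn upsert_seen(notes: &mut Notes, note_id: i64, run_seen_at: i64) {
    notes.insert(note_id, run_seen_at);
}

/// Sweep phase: runs ONLY after pagination completed successfully. Any row
/// not touched this run (last_seen_at < run_seen_at) was deleted upstream,
/// so it is removed locally.
fn sweep_stale(notes: &mut Notes, run_seen_at: i64) {
    notes.retain(|_, seen| *seen >= run_seen_at);
}

fn main() {
    let mut notes = Notes::new();
    // Previous run at t=100 saw notes 1, 2, 3.
    for id in [1, 2, 3] {
        upsert_seen(&mut notes, id, 100);
    }
    // This run at t=200 sees only notes 1 and 3 (note 2 was deleted in GitLab).
    let run_seen_at = 200;
    for id in [1, 3] {
        upsert_seen(&mut notes, id, run_seen_at);
    }
    sweep_stale(&mut notes, run_seen_at);
    assert_eq!(notes.len(), 2);
    assert!(!notes.contains_key(&2)); // stale note swept
}
```

If pagination fails, the sweep simply never runs, so nothing is deleted on partial data; this is the same invariant the watermark logic enforces.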
CLI Commands
gi ingest --type=merge_requests
Fetch and store all MRs from configured projects.
Implementation Updates:
// src/cli/commands/ingest.rs
/// Run the ingest command.
pub async fn run_ingest(
config: &Config,
resource_type: &str,
project_filter: Option<&str>,
force: bool,
full: bool,
) -> Result<IngestResult> {
match resource_type {
"issues" => {
// Existing CP1 implementation
run_issue_ingest(config, project_filter, force, full).await
}
"merge_requests" => {
// NEW: CP2 implementation
run_mr_ingest(config, project_filter, force, full).await
}
_ => Err(GiError::InvalidArgument {
name: "type".to_string(),
value: resource_type.to_string(),
expected: "issues or merge_requests".to_string(),
}),
}
}
Output:
Ingesting merge requests...
group/project-one: 567 MRs fetched, 12 new labels, 89 assignees, 45 reviewers
Fetching discussions (123 MRs with updates)...
group/project-one: 123 MRs -> 456 discussions, 1,234 notes (89 DiffNotes)
Total: 567 MRs, 456 discussions, 1,234 notes (excluding 2,345 system notes)
Skipped discussion sync for 444 unchanged MRs.
gi list mrs
Output:
Merge Requests (showing 20 of 1,234)
!847 Refactor auth to use JWT tokens merged @johndoe main <- feature/jwt 3 days ago
!846 Fix memory leak in websocket handler opened @janedoe main <- fix/websocket 5 days ago
!845 [DRAFT] Add dark mode CSS variables opened @bobsmith main <- ui/dark-mode 1 week ago
!844 Update dependencies closed @alice main <- chore/deps 1 week ago
...
Filter Options (same pattern as issues):
- `--state [opened|merged|closed|locked|all]`
- `--draft` (MR-specific: show only draft MRs)
- `--no-draft` (MR-specific: exclude draft MRs)
- `--author <username>`
- `--assignee <username>`
- `--reviewer <username>` (MR-specific)
- `--target-branch <branch>` (MR-specific)
- `--source-branch <branch>` (MR-specific)
- `--label <name>` (repeatable)
- `--project <path>`
- `--limit <n>`
- `--since <duration|date>`
- `--json`
- `--open` (open in browser)
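The `--draft`/`--no-draft` pair maps directly onto the `draft` column. A sketch of the intended flag semantics; the function name and error text are illustrative, not the real CLI code:

```rust
/// Translate the draft flags into an optional SQL predicate on the `draft`
/// column. Passing both flags is a user error; passing neither disables
/// draft filtering entirely.
fn draft_predicate(draft: bool, no_draft: bool) -> Result<Option<&'static str>, String> {
    match (draft, no_draft) {
        (true, true) => Err("--draft and --no-draft are mutually exclusive".to_string()),
        (true, false) => Ok(Some("draft = 1")),
        (false, true) => Ok(Some("draft = 0")),
        (false, false) => Ok(None),
    }
}

fn main() {
    assert_eq!(draft_predicate(true, false), Ok(Some("draft = 1")));
    assert_eq!(draft_predicate(false, true), Ok(Some("draft = 0")));
    assert_eq!(draft_predicate(false, false), Ok(None));
    assert!(draft_predicate(true, true).is_err());
}
```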
gi show mr <iid>
Output:
Merge Request !847: Refactor auth to use JWT tokens
================================================================================
Project: group/project-one
State: merged
Draft: No
Author: @johndoe
Assignees: @janedoe, @bobsmith
Reviewers: @alice, @charlie
Source: feature/jwt
Target: main
Merge Status: mergeable (detailed_merge_status)
Merged By: @alice
Merged At: 2024-03-20 14:30:00
Created: 2024-03-15
Updated: 2024-03-20
Labels: enhancement, auth, reviewed
URL: https://gitlab.example.com/group/project-one/-/merge_requests/847
Description:
Moving away from session cookies to JWT-based authentication for better
mobile client support and API consumption...
Discussions (8):
@janedoe (2024-03-16) [src/auth/jwt.ts:45]:
Should we use a separate signing key for refresh tokens?
@johndoe (2024-03-16):
Good point. I'll add a separate key with rotation support.
@alice (2024-03-18) [RESOLVED]:
Looks good! Just one nit about the token expiry constant.
gi count mrs
Output:
Merge Requests: 1,234
opened: 89
merged: 1,045
closed: 100
gi count discussions --type=mr
Output:
MR Discussions: 4,567
gi count notes --type=mr
Output:
MR Notes: 12,345 (excluding 2,345 system notes)
DiffNotes: 3,456 (with file position metadata)
Automated Tests
Unit Tests
// tests/mr_transformer_tests.rs
#[cfg(test)]
mod tests {
use gi::gitlab::transformers::merge_request::*;
use gi::gitlab::types::*;
#[test]
fn transforms_gitlab_mr_to_normalized_schema() { /* ... */ }
#[test]
fn extracts_labels_from_mr_payloads() { /* ... */ }
#[test]
fn extracts_assignees_from_mr_payloads() { /* ... */ }
#[test]
fn extracts_reviewers_from_mr_payloads() { /* ... */ }
#[test]
fn handles_missing_optional_fields_gracefully() { /* ... */ }
#[test]
fn converts_merged_at_and_closed_at_timestamps() { /* ... */ }
#[test]
fn handles_mrs_with_no_labels_assignees_reviewers() { /* ... */ }
}
// tests/diffnote_tests.rs
#[cfg(test)]
mod tests {
use gi::gitlab::transformers::discussion::*;
#[test]
fn extracts_diffnote_position_metadata() { /* ... */ }
#[test]
fn handles_notes_without_position() { /* ... */ }
#[test]
fn extracts_old_path_and_new_path() { /* ... */ }
#[test]
fn extracts_line_numbers() { /* ... */ }
#[test]
fn handles_renamed_files_in_diffnote() { /* ... */ }
}
// tests/mr_discussion_tests.rs
#[cfg(test)]
mod tests {
#[test]
fn transforms_mr_discussion_with_correct_noteable_type() { /* ... */ }
#[test]
fn computes_resolved_state_for_mr_discussions() { /* ... */ }
#[test]
fn links_discussion_to_merge_request_id() { /* ... */ }
#[test]
fn handles_individual_note_mr_discussions() { /* ... */ }
}
Integration Tests
// tests/mr_ingestion_tests.rs
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path_regex};
#[tokio::test]
async fn inserts_mrs_into_database() { /* ... */ }
#[tokio::test]
async fn creates_labels_from_mr_payloads() { /* ... */ }
#[tokio::test]
async fn links_mrs_to_labels_via_junction_table() { /* ... */ }
#[tokio::test]
async fn removes_stale_label_links_on_resync() { /* ... */ }
#[tokio::test]
async fn creates_assignee_links() { /* ... */ }
#[tokio::test]
async fn creates_reviewer_links() { /* ... */ }
#[tokio::test]
async fn stores_raw_payload_for_each_mr() { /* ... */ }
#[tokio::test]
async fn updates_cursor_at_page_boundaries() { /* ... */ }
#[tokio::test]
async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }
#[tokio::test]
async fn full_sync_resets_cursor() { /* ... */ }
#[tokio::test]
async fn full_sync_resets_discussion_watermarks() {
// Setup: Create MR with discussions_synced_for_updated_at set
// Action: Run ingest with --full flag
// Assert: discussions_synced_for_updated_at is NULL after sync
// Assert: Discussions are re-fetched for this MR
}
#[tokio::test]
async fn handles_mrs_with_no_labels_assignees_reviewers() { /* ... */ }
#[tokio::test]
async fn upserts_existing_mrs_on_refetch() { /* ... */ }
#[tokio::test]
async fn skips_discussion_refetch_for_unchanged_mrs() { /* ... */ }
#[tokio::test]
async fn captures_diffnote_position_metadata() { /* ... */ }
#[tokio::test]
async fn removes_stale_discussions_after_successful_pagination() { /* ... */ }
#[tokio::test]
async fn captures_draft_status_correctly() { /* ... */ }
#[tokio::test]
async fn uses_detailed_merge_status_over_deprecated_field() { /* ... */ }
#[tokio::test]
async fn uses_merge_user_over_deprecated_merged_by() { /* ... */ }
// CRITICAL: Verify preference ordering when BOTH deprecated and current fields are present.
// GitLab may return both for backward compatibility, and we must always prefer the current field.
#[tokio::test]
async fn prefers_detailed_merge_status_when_both_fields_present() {
// Setup: MR payload with BOTH detailed_merge_status AND merge_status
// Mock: Return MR with detailed_merge_status="discussions_not_resolved" AND merge_status="can_be_merged"
// Assert: detailed_merge_status in DB is "discussions_not_resolved" (not "can_be_merged")
}
#[tokio::test]
async fn prefers_merge_user_when_both_fields_present() {
// Setup: MR payload with BOTH merge_user AND merged_by
// Mock: Return MR with merge_user.username="alice" AND merged_by.username="bob"
// Assert: merge_user_username in DB is "alice" (not "bob")
}
#[tokio::test]
async fn prefers_draft_when_both_draft_and_work_in_progress_present() {
// Setup: MR payload with BOTH draft=false AND work_in_progress=true
// Mock: Return MR with draft=false, work_in_progress=true
// Assert: draft in DB is true (work_in_progress fallback applies when draft is false)
// Note: This tests the OR semantics: draft || work_in_progress
}
// CRITICAL: This test ensures we don't permanently lose data on partial pagination failures.
// If pagination fails mid-way, the watermark must NOT advance, so the next sync
// will retry fetching discussions for this MR.
#[tokio::test]
async fn does_not_advance_discussion_watermark_on_partial_failure() {
// Setup: Create MR with updated_at > discussions_synced_for_updated_at
// Mock: Page 1 of discussions returns successfully
// Mock: Page 2 returns 500 Internal Server Error
// Assert: discussions_synced_for_updated_at is unchanged (not advanced)
// Assert: Re-running ingest will retry this MR's discussions
}
// CRITICAL: This test ensures we don't lose data when note parsing fails.
// If any note in a discussion has an invalid timestamp, we must NOT delete
// existing notes, and we must NOT advance the watermark.
#[tokio::test]
async fn does_not_advance_discussion_watermark_on_note_parse_failure() {
// Setup: Create MR with existing discussion and valid notes
// Mock: Return discussion with one note having invalid created_at
// Assert: Original 3 notes are preserved
// Assert: No partial replacement occurred
}
// Verify atomic note replacement: parsing happens BEFORE deletion
#[tokio::test]
async fn atomic_note_replacement_preserves_data_on_parse_failure() {
// Setup: Discussion with 3 existing notes
// Mock: Return updated discussion where note 2 has invalid timestamp
// Assert: Original 3 notes are preserved
// Assert: No partial replacement occurred
}
#[tokio::test]
async fn uses_work_in_progress_when_draft_not_present() { /* ... */ }
#[tokio::test]
async fn handles_locked_mr_state() { /* ... */ }
#[tokio::test]
async fn pagination_continues_when_headers_missing_but_full_page_returned() { /* ... */ }
#[tokio::test]
async fn does_not_store_raw_payload_for_system_notes_without_position() { /* ... */ }
#[tokio::test]
async fn stores_raw_payload_for_diffnotes_even_if_system() { /* ... */ }
#[tokio::test]
async fn captures_position_type_and_line_range_for_diffnotes() { /* ... */ }
}
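The preference rules exercised by the `prefers_*` tests above reduce to three small pure functions. A sketch under the stated fallback rules; the field names mirror the GitLab payload, while the function names are illustrative:

```rust
/// Draft: prefer `draft`, but OR in the deprecated `work_in_progress` so
/// older instances that only send the legacy field still mark the MR as draft.
fn effective_draft(draft: Option<bool>, work_in_progress: Option<bool>) -> bool {
    draft.unwrap_or(false) || work_in_progress.unwrap_or(false)
}

/// Merge status: always prefer `detailed_merge_status`; fall back to the
/// deprecated `merge_status` only when the current field is absent.
fn effective_merge_status<'a>(detailed: Option<&'a str>, legacy: Option<&'a str>) -> Option<&'a str> {
    detailed.or(legacy)
}

/// Merge actor: prefer `merge_user.username`; fall back to the deprecated
/// `merged_by.username` only when `merge_user` is absent.
fn effective_merge_user<'a>(merge_user: Option<&'a str>, merged_by: Option<&'a str>) -> Option<&'a str> {
    merge_user.or(merged_by)
}

fn main() {
    // Both fields present: the current field wins.
    assert_eq!(
        effective_merge_status(Some("discussions_not_resolved"), Some("can_be_merged")),
        Some("discussions_not_resolved")
    );
    assert_eq!(effective_merge_user(Some("alice"), Some("bob")), Some("alice"));
    // draft=false + work_in_progress=true => draft (OR semantics).
    assert!(effective_draft(Some(false), Some(true)));
    // Legacy-only instance: fallback applies.
    assert_eq!(effective_merge_status(None, Some("can_be_merged")), Some("can_be_merged"));
    assert_eq!(effective_merge_user(None, Some("bob")), Some("bob"));
}
```

Keeping these as pure functions makes the "both fields present" integration tests trivial to mirror as unit tests.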
Manual Smoke Tests
| Command | Expected Output | Pass Criteria |
|---|---|---|
| `gi ingest --type=merge_requests` | Progress bar, final count | Completes without error |
| `gi list mrs --limit=10` | Table of 10 MRs | Shows iid, title, state, author, branches, [DRAFT] prefix |
| `gi list mrs --project=group/project-one` | Filtered list | Only shows MRs from that project |
| `gi list mrs --state=merged` | Filtered by state | Only merged MRs shown |
| `gi list mrs --state=locked` | Filtered by state | Only locked MRs shown (merge in progress) |
| `gi list mrs --draft` | Filtered by draft | Only draft MRs shown |
| `gi list mrs --no-draft` | Excludes drafts | No draft MRs shown |
| `gi list mrs --reviewer=username` | Filtered by reviewer | Only MRs with that reviewer |
| `gi list mrs --target-branch=main` | Filtered by target | Only MRs targeting main |
| `gi count mrs` | `Merge Requests: N` | Count matches GitLab UI |
| `gi show mr 123` | MR detail view | Shows title, description, branches, discussions, detailed_merge_status |
| `gi show mr 123` (draft MR) | MR detail view | Shows `Draft: Yes` |
| `gi show mr 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=mr` | `MR Discussions: N` | Non-zero count |
| `gi count notes --type=mr` | `MR Notes: N (excluding M system)` | Non-zero count, shows DiffNote count |
| `gi sync-status` | Shows MR cursor positions | MR cursors visible alongside issue cursors |
| `gi ingest --type=merge_requests` (re-run) | `0 new MRs` | Cursor is current |
| `gi ingest --type=merge_requests` (re-run) | `Skipped discussion sync for N unchanged MRs` | Watermark works |
| `gi ingest --type=merge_requests` (concurrent) | Lock error | Second run fails with clear message |
| `gi ingest --type=merge_requests --full` | Full re-sync | Resets cursor, fetches everything |
| Remove label from MR in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |
| Add reviewer in GitLab, re-sync | Reviewer link added | Junction table updated |
Data Integrity Checks
After successful ingestion, verify:
- `SELECT COUNT(*) FROM merge_requests` matches GitLab MR count for configured projects
- Every MR has a corresponding `raw_payloads` row
- `draft` field matches GitLab UI (draft MRs have draft=1, includes `work_in_progress` fallback)
- `state` includes all expected values (`opened`, `merged`, `closed`, `locked`)
- `detailed_merge_status` is populated for modern GitLab instances
- `head_sha` is populated (NULL only for very old MRs or edge cases)
- `references_short` and `references_full` are populated when GitLab returns them
- Labels in `mr_labels` junction all exist in `labels` table
- `mr_labels` count per MR matches GitLab UI label count
- `mr_assignees` usernames match GitLab UI
- `mr_reviewers` usernames match GitLab UI
- `sync_cursors` has an entry for each `(project_id, 'merge_requests')` pair
- Re-running `gi ingest --type=merge_requests` fetches 0 new items (cursor is current)
- Re-running skips discussion refetch for unchanged MRs (watermark works)
- `--full` sync resets `discussions_synced_for_updated_at` to NULL for all MRs
- After partial pagination failure, `discussions_synced_for_updated_at` is NOT advanced
- After note parse failure, `discussions_synced_for_updated_at` is NOT advanced
- After note parse failure, existing notes are preserved (not deleted)
- `SELECT COUNT(*) FROM discussions WHERE noteable_type='MergeRequest'` is non-zero
- Every MR discussion has at least one note
- DiffNotes have `position_new_path` populated when available
- DiffNotes have `position_type` populated when available (text/image/file)
- DiffNotes have `position_base_sha`, `position_start_sha`, `position_head_sha` populated when available
- Multi-line DiffNotes have `position_line_range_start`/`end` populated when available
- `SELECT COUNT(*) FROM notes WHERE position_new_path IS NOT NULL` matches expected DiffNote count
- Discussion `first_note_at` <= `last_note_at` for all rows
- No notes have `created_at = 0` or `updated_at = 0` (strict timestamp parsing)
- System notes without position do NOT have `raw_payloads` rows (selective storage)
- After removing a label/reviewer in GitLab and re-syncing, the link is removed
- Stale discussions removed via `last_seen_at` sweep after successful pagination
- Stale notes removed via `last_seen_at` sweep (upsert + sweep pattern)
Definition of Done
Gate A: MRs Only (Must Pass First)
- `gi ingest --type=merge_requests` fetches all MRs from configured projects
- MRs stored with correct schema, including branches and merge metadata
- `draft` field captured correctly (with `work_in_progress` fallback for older instances)
- `state` field supports all values: `opened`, `merged`, `closed`, `locked`
- `locked` state handled as transitional (local filtering only, not server-side filter)
- `detailed_merge_status` used (non-deprecated) with fallback to `merge_status`
- `merge_user_username` used (non-deprecated) with fallback to `merged_by`
- `head_sha` captured for CP3 readiness
- `references_short` and `references_full` captured for CP3 readiness
- Cursor-based sync is resumable (re-run fetches only new/updated)
- Page-boundary cursor updates (not based on item count modulo)
- Pagination uses robust fallback chain: Link header > x-next-page > full-page heuristic
- Raw payloads stored for each MR
- `gi list mrs` and `gi count mrs` work
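Gate A's pagination fallback chain can be sketched as a pure decision function. Header parsing is deliberately simplified here; a real implementation would parse the Link header's `next` URL per RFC 8288 rather than assume `page + 1`:

```rust
/// Decide the next page number from response metadata, in preference order:
/// 1. Link header containing rel="next" (presence implies another page)
/// 2. x-next-page header (empty on the last page)
/// 3. Full-page heuristic: a completely full page suggests more may follow.
/// Simplified sketch: a real implementation parses the Link URL's page param.
fn next_page(
    link_header: Option<&str>,
    x_next_page: Option<&str>,
    current_page: u32,
    items_in_page: usize,
    per_page: usize,
) -> Option<u32> {
    if let Some(link) = link_header {
        if link.contains("rel=\"next\"") {
            return Some(current_page + 1);
        }
        return None; // Link header present but no next relation: last page
    }
    if let Some(np) = x_next_page {
        return np.trim().parse::<u32>().ok().filter(|&n| n > 0);
    }
    // Headers missing entirely (e.g. stripped by a proxy): fall back to the
    // full-page heuristic, at the cost of one extra (empty) request.
    (items_in_page == per_page).then(|| current_page + 1)
}

fn main() {
    let link = "<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2>; rel=\"next\"";
    assert_eq!(next_page(Some(link), None, 1, 100, 100), Some(2));
    assert_eq!(next_page(None, Some("3"), 2, 100, 100), Some(3));
    assert_eq!(next_page(None, Some(""), 2, 100, 100), None); // last page
    assert_eq!(next_page(None, None, 4, 100, 100), Some(5));  // heuristic
    assert_eq!(next_page(None, None, 5, 37, 100), None);      // partial page
}
```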
Gate B: Labels + Assignees + Reviewers (Must Pass)
- Labels extracted and stored (name-only)
- Label links created correctly
- Stale label links removed on re-sync (verified with test)
- Assignees extracted and linked to `mr_assignees`
- Reviewers extracted and linked to `mr_reviewers`
- Stale assignee/reviewer links removed on re-sync
Gate C: Dependent Discussion Sync with DiffNotes (Must Pass)
- Discussions fetched for MRs with `updated_at` advancement
- Notes stored with `is_system` flag correctly set
- Upsert + sweep pattern for notes (not delete-all-then-insert; reduces write amplification)
- Atomic note replacement (notes parsed BEFORE existing notes deleted, prevents data loss)
- Strict timestamp parsing (no zero values from invalid timestamps)
- DiffNote position metadata captured (`position_new_path`, `position_type`, line numbers, line_range)
- DiffNote SHA triplet captured (`position_base_sha`, `position_start_sha`, `position_head_sha`)
- Selective raw payload storage (skip system notes without position)
- Raw payloads stored for discussions and DiffNotes
- `discussions_synced_for_updated_at` watermark updated after sync
- Watermark is NOT advanced if any discussion page fetch fails (verified with test)
- Watermark is NOT advanced if any note parse fails (verified with test)
- Unchanged MRs skip discussion refetch (verified with test)
- Stale discussions removed via `last_seen_at` sweep after successful pagination
- Stale notes removed via `last_seen_at` sweep (upsert + sweep pattern)
- Bounded concurrency (`dependent_concurrency` respected)
Gate D: Resumability Proof (Must Pass)
- Kill mid-run, rerun; bounded redo (cursor progress preserved at page boundaries)
- No redundant discussion refetch after crash recovery
- No watermark advancement on partial pagination failure
- No watermark advancement on note parse failure (preserves existing data)
- Single-flight lock prevents concurrent runs
- `--full` flag resets cursor and fetches all data
- `--full` flag also resets `discussions_synced_for_updated_at` (forces discussion refetch)
Gate E: CLI Complete (Must Pass)
- `gi list mrs` with all filter options including `--draft` and `--no-draft`
- `gi list mrs` shows [DRAFT] prefix for draft MRs
- `gi show mr <iid>` displays full detail with discussions
- `gi show mr` shows DiffNote file context in discussion display
- `gi show mr` shows `detailed_merge_status`
- `gi count mrs` shows state breakdown
- `gi sync-status` shows MR cursor positions
Final Gate (Must Pass)
- All unit tests pass (`cargo test`)
- All integration tests pass (mocked with wiremock)
- `does_not_advance_discussion_watermark_on_partial_failure` test passes
- `prefers_detailed_merge_status_when_both_fields_present` test passes
- `prefers_merge_user_when_both_fields_present` test passes
- `prefers_draft_when_both_draft_and_work_in_progress_present` test passes
- `cargo clippy` passes with no warnings
- `cargo fmt --check` passes
- Compiles with `--release`
Hardening (Optional Before CP3)
- Edge cases: MRs with 0 labels, 0 discussions, no assignees/reviewers
- Large pagination (100+ pages)
- Rate limit handling under sustained load
- Live tests pass against real GitLab instance
- Performance: 1000+ MRs ingested in <5 min
- DB size verification: system note payloads not stored
Implementation Order
1. Database migration (15 min)
   - `migrations/006_merge_requests.sql`
   - Update `MIGRATIONS` const in `src/core/db.rs`
2. GitLab types (15 min)
   - Add `GitLabMergeRequest`, `GitLabReviewer` to `src/gitlab/types.rs`
   - Test deserialization with fixtures
3. MR Transformer (25 min)
   - `src/gitlab/transformers/merge_request.rs`
   - `MergeRequestWithMetadata` struct
   - Unit tests
4. Discussion transformer updates (15 min)
   - Add `transform_mr_discussion()` function
   - Add `transform_notes_with_diff_position()` function
   - Unit tests for DiffNote extraction
5. GitLab client pagination (20 min)
   - Add `paginate_merge_requests()`
   - Add `paginate_mr_discussions()`
6. MR ingestion (45 min)
   - `src/ingestion/merge_requests.rs`
   - Transaction batching
   - Label/assignee/reviewer linking with stale removal
   - Incremental cursor updates
   - DB-driven selection of MRs needing discussion sync (no in-memory collection)
   - Integration tests
7. MR discussion ingestion (30 min)
   - `src/ingestion/mr_discussions.rs`
   - DiffNote position capture
   - Stale discussion removal
   - Watermark update
   - Integration tests
8. Update orchestrator (20 min)
   - `src/ingestion/orchestrator.rs`
   - Support both issue and MR ingestion
   - Aggregate results
9. Update ingest command (15 min)
   - `src/cli/commands/ingest.rs`
   - Route `merge_requests` type to MR ingest
10. Implement MR CLI commands (45 min)
    - Update `gi list` for MRs with filters
    - Update `gi show` for MR detail view
    - Update `gi count` for MR counts
    - Update `gi sync-status` for MR cursors
11. Final validation (20 min)
    - `cargo test`
    - `cargo clippy`
    - Gate A/B/C/D/E verification
    - Manual smoke tests
    - Data integrity checks
Risks & Mitigations
| Risk | Mitigation |
|---|---|
| GitLab rate limiting during large sync | Respect Retry-After, exponential backoff, configurable concurrency |
| MR discussion API N+1 problem | dependent_concurrency config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large MRs with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | is_system flag allows filtering |
| Label/assignee/reviewer deduplication | Clear and re-link pattern ensures correctness |
| DiffNote position field variations | Defensive parsing with Option types; extended fields (position_type, line_range, SHA triplet) for modern GitLab |
| Async stream complexity | Use async-stream crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Use LocalSet + spawn_local for non-Send tasks; each worker opens its own SQLite connection (WAL + busy_timeout). Avoid holding DB handles across .await. |
| Crash causes massive refetch | Page-boundary cursor updates (not item-count based) |
| Cursor rewind causes discussion refetch | Per-MR watermark (discussions_synced_for_updated_at) |
| Stale discussions accumulate | Remove discussions via last_seen_at < run_seen_at sweep (not in-memory ID collection) |
| Stale notes accumulate | Remove notes via last_seen_at sweep (upsert + sweep pattern) |
| Partial pagination failure loses data | Watermark NOT advanced unless pagination completes; explicit test coverage |
| Invalid timestamps corrupt cursor logic | Strict timestamp parsing (Result types, no unwrap_or(0)); bad notes logged and skipped; parse BEFORE delete |
| Note parse failure causes data loss | Parse notes BEFORE any destructive DB operations; if parse fails, preserve existing data and don't advance watermark |
| Raw payload size explodes with notes | Selective storage: DiffNotes + non-system notes only; system notes use discussion payload |
| Deprecated GitLab fields cause future churn | Use detailed_merge_status and merge_user with fallback to old; explicit tests verify preference when both present |
| Memory growth on large projects | DB-driven discussion sync selection; no in-memory collection of MRs needing sync |
| Pagination headers missing (proxy/instance issue) | Robust fallback chain: Link header (RFC 8288) > x-next-page > full-page heuristic |
| GitLab version differences | Handle locked state (transitional, local filtering); support both draft and work_in_progress; optional reviewer verification endpoint for debugging |
| --full sync leaves stale discussion data | --full resets both MR cursor AND discussions_synced_for_updated_at watermarks |
| Write amplification on note updates | Upsert + sweep pattern instead of delete-all-then-insert |
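Several mitigations above (stale discussion removal, stale note removal, write amplification) share one shape: upsert everything seen in the current run with a fresh `last_seen_at` stamp, then sweep rows whose stamp was not refreshed. A minimal in-memory sketch of that pattern; in the real implementation this is two SQL statements against SQLite (an `INSERT ... ON CONFLICT DO UPDATE` that stamps `last_seen_at`, then a `DELETE ... WHERE last_seen_at < run_seen_at`), and the names here are illustrative:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct Note {
    body: String,
    last_seen_at: u64, // run timestamp when this note was last observed
}

/// Upsert every note seen in this run, stamping it with `run_seen_at`.
fn upsert_notes(store: &mut HashMap<u64, Note>, seen: &[(u64, &str)], run_seen_at: u64) {
    for &(id, body) in seen {
        store.insert(id, Note { body: body.to_string(), last_seen_at: run_seen_at });
    }
}

/// Sweep: anything not re-stamped during this run was deleted upstream.
/// Returns the number of stale rows removed.
fn sweep_stale(store: &mut HashMap<u64, Note>, run_seen_at: u64) -> usize {
    let before = store.len();
    store.retain(|_, n| n.last_seen_at >= run_seen_at);
    before - store.len()
}

fn main() {
    let mut store = HashMap::new();
    upsert_notes(&mut store, &[(1, "LGTM"), (2, "typo here")], 100);
    // Next run: note 2 was deleted in GitLab, note 1 edited, note 3 added.
    upsert_notes(&mut store, &[(1, "LGTM!"), (3, "new thread")], 200);
    let removed = sweep_stale(&mut store, 200);
    println!("{} {}", removed, store.len()); // 1 stale note removed, 2 remain
}
```

Unlike delete-all-then-insert, unchanged rows are only touched by the stamp update, which is what keeps write amplification down.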
API Call Estimation
For a project with 500 MRs (average 10 discussions each, 3 notes per discussion):
| Operation | Calls | Notes |
|---|---|---|
| MR list pages | 5 | 100 per page |
| Discussion pages per MR | 1 | Most MRs have <100 discussions |
| Total initial sync | ~505 | 5 + (500 × 1) |
| Subsequent sync (10% change) | ~55 | 5 + (50 × 1) |
Rate limit safety: At 100 requests/second, full sync completes in ~5 seconds of API time. With dependent_concurrency: 10, wall clock time is ~50 seconds plus network latency.
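The call counts in the table reduce to one list call per page of 100 plus one discussion call per MR needing a discussion sync. A sketch of that arithmetic (illustrative helper, not part of the codebase):

```rust
/// Rough call-count model: ceil(total_mrs / page_size) MR-list calls,
/// plus one discussions call per MR whose updated_at advanced.
fn estimated_calls(total_mrs: u64, page_size: u64, mrs_needing_discussion_sync: u64) -> u64 {
    let list_pages = (total_mrs + page_size - 1) / page_size; // ceiling division
    list_pages + mrs_needing_discussion_sync
}

fn main() {
    // Initial sync: 500 MRs, all need discussions fetched.
    assert_eq!(estimated_calls(500, 100, 500), 505);
    // Subsequent sync: ~10% of MRs changed since the last cursor.
    assert_eq!(estimated_calls(500, 100, 50), 55);
    println!("ok");
}
```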
MR-Specific Considerations
DiffNote Position Metadata
GitLab DiffNotes include position metadata that identifies the file and line where the comment was placed. This is critical for code review context in CP3 document generation.
Position Object Structure (from GitLab API):
{
"position": {
"base_sha": "abc123...", // Base commit SHA for the diff
"start_sha": "def456...", // Start commit SHA for the diff
"head_sha": "ghi789...", // Head commit SHA for the diff
"old_path": "src/auth/login.ts", // File path before rename (if any)
"new_path": "src/auth/login.ts", // Current file path
"position_type": "text", // "text" | "image" | "file"
"old_line": null, // Line number in old version (null for additions)
"new_line": 45, // Line number in new version (null for deletions)
"line_range": { // For multi-line comments (GitLab 13.6+)
"start": { "line_code": "...", "type": "new", "old_line": null, "new_line": 45 },
"end": { "line_code": "...", "type": "new", "old_line": null, "new_line": 48 }
}
}
}
Why Capture SHA Triplet:
The SHA triplet (base_sha, start_sha, head_sha) is essential for future CP3 work:
- Precise diff context: Maps comment to exact commit range being reviewed
- Future file content fetch: Enables fetching file content at the exact commit when the comment was made
- Stale comment detection: Identifies comments on outdated code when source branch is updated
- Zero extra API cost: Already present in discussion response, just needs extraction
Stored Fields:
- `position_old_path` - File path in base version (for renames/deletions)
- `position_new_path` - File path in head version (for additions/modifications)
- `position_old_line` - Line number in base version
- `position_new_line` - Line number in head version
- `position_type` - Type of position: "text", "image", or "file"
- `position_line_range_start` - Start line for multi-line comments
- `position_line_range_end` - End line for multi-line comments
- `position_base_sha` - Base commit SHA for the diff
- `position_start_sha` - Start commit SHA for the diff
- `position_head_sha` - Head commit SHA for the diff
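Flattening the nested position object into these columns is where the "defensive parsing with Option types" mitigation applies: every field is optional, so older GitLab instances that omit `position_type` or `line_range` still map cleanly. A sketch under that assumption; the `RawPosition`/`StoredPosition` names are hypothetical stand-ins for the serde-derived types in the real client:

```rust
/// Simplified stand-in for the deserialized GitLab position payload.
/// All fields are Option so missing keys never fail the parse.
#[derive(Default)]
struct RawPosition {
    base_sha: Option<String>,
    start_sha: Option<String>,
    head_sha: Option<String>,
    old_path: Option<String>,
    new_path: Option<String>,
    position_type: Option<String>,
    old_line: Option<i64>,
    new_line: Option<i64>,
    line_range: Option<RawLineRange>,
}

/// Multi-line comment range (GitLab 13.6+); absent on single-line notes.
struct RawLineRange {
    start_new_line: Option<i64>,
    end_new_line: Option<i64>,
}

/// Flat column set as stored on the note row.
#[derive(Debug, Default, PartialEq)]
struct StoredPosition {
    position_old_path: Option<String>,
    position_new_path: Option<String>,
    position_old_line: Option<i64>,
    position_new_line: Option<i64>,
    position_type: Option<String>,
    position_line_range_start: Option<i64>,
    position_line_range_end: Option<i64>,
    position_base_sha: Option<String>,
    position_start_sha: Option<String>,
    position_head_sha: Option<String>,
}

fn flatten(p: RawPosition) -> StoredPosition {
    let (lr_start, lr_end) = match p.line_range {
        Some(lr) => (lr.start_new_line, lr.end_new_line),
        None => (None, None),
    };
    StoredPosition {
        position_old_path: p.old_path,
        position_new_path: p.new_path,
        position_old_line: p.old_line,
        position_new_line: p.new_line,
        position_type: p.position_type,
        position_line_range_start: lr_start,
        position_line_range_end: lr_end,
        position_base_sha: p.base_sha,
        position_start_sha: p.start_sha,
        position_head_sha: p.head_sha,
    }
}

fn main() {
    // Single-line DiffNote from an older instance: no position_type, no line_range.
    let stored = flatten(RawPosition {
        new_path: Some("src/auth/login.ts".into()),
        new_line: Some(45),
        head_sha: Some("ghi789".into()),
        ..Default::default()
    });
    assert_eq!(stored.position_new_line, Some(45));
    assert_eq!(stored.position_line_range_start, None);
    println!("ok");
}
```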
Display in CLI:
@janedoe (2024-03-16) [src/auth/login.ts:45]:
Should we validate the JWT signature here?
For multi-line comments:
@janedoe (2024-03-16) [src/auth/login.ts:45-48]:
This whole block should use async/await instead of callbacks.
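The two display formats above differ only in the line suffix. A minimal sketch of the formatting rule (hypothetical helper name, not the actual CLI code):

```rust
/// Render the file/line suffix in the note header: "[path:line]" for a
/// single-line comment, "[path:start-end]" when a line_range is present.
fn format_location(path: &str, line: i64, range_end: Option<i64>) -> String {
    match range_end {
        Some(end) if end != line => format!("[{}:{}-{}]", path, line, end),
        _ => format!("[{}:{}]", path, line),
    }
}

fn main() {
    assert_eq!(format_location("src/auth/login.ts", 45, None), "[src/auth/login.ts:45]");
    assert_eq!(format_location("src/auth/login.ts", 45, Some(48)), "[src/auth/login.ts:45-48]");
    println!("ok");
}
```

A degenerate range whose end equals its start falls back to the single-line form, so multi-line storage never produces "45-45".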
References
- GitLab MR API
- GitLab Discussions API
- GitLab Notes API
- GitLab Pagination
- RFC 8288 - Link Header
- Checkpoint 1 PRD (Issue Ingestion)