# Checkpoint 2: MR Ingestion - PRD

> **Note:** The project was renamed from "gitlab-inbox" to "gitlore" and the CLI from "gi" to "lore". References to "gi" in this document should be read as "lore".

**Version:** 1.3
**Status:** Ready for Implementation
**Depends On:** Checkpoint 1 (Issue Ingestion)
**Enables:** Checkpoint 3A (Document Generation + FTS)

---

## Overview

### Objective

Ingest all merge requests, MR discussions, and notes (including DiffNote position metadata) from configured GitLab repositories. This checkpoint extends the cursor-based sync pattern established in CP1 to merge requests, capturing code review context that is essential for decision traceability.

### Success Criteria

| Criterion | Validation |
|-----------|------------|
| `gi ingest --type=merge_requests` fetches all MRs | `gi count mrs` matches GitLab UI |
| MR labels extracted and linked correctly | Label count per MR matches GitLab |
| MR assignees and reviewers captured | Junction tables populated |
| MR merge status is future-proof | `detailed_merge_status` populated; no reliance on deprecated `merge_status` |
| MR merge actor is future-proof | `merge_user_username` populated when merged/auto-merge set |
| Draft MRs are captured | `draft` populated and visible in `gi list mrs` |
| Draft is backwards-compatible | Older instances using `work_in_progress` still populate `draft` |
| MR state is complete | `state` supports `opened|merged|closed|locked` (no drop/parse failures) |
| MR head SHA captured | `head_sha` populated for CP3 diff/commit context |
| MR references captured | `references_short` and `references_full` populated for CP3 cross-project display |
| MR discussions fetched per-MR (dependent sync) | For MRs whose `updated_at` advanced, discussions and notes upserted |
| DiffNote position metadata captured | `position_new_path`, `position_old_path`, `position_type` populated for DiffNotes |
| DiffNote SHA triplet captured | `position_base_sha`, `position_start_sha`, `position_head_sha` populated for DiffNotes |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged MRs | Per-MR watermark prevents redundant fetches |
| Watermark not advanced on partial failure | Incomplete pagination OR parse failures do not mark MR as synced |
| Full sync resets discussion watermarks | `--full` flag resets both MR cursor AND discussion watermarks |
| Pagination uses robust fallback chain | Link header > x-next-page > full-page heuristic |
| MR CLI commands functional | `gi show mr`, `gi count mrs` work |

---

## Internal Gates

CP2 is validated incrementally via internal gates:

| Gate | Scope | Validation |
|------|-------|------------|
| **Gate A** | MRs only | Cursor + upsert + raw payloads + list/count working; `locked` state (local filtering), `work_in_progress` fallback, `head_sha` + `references` captured; Link header pagination with fallback chain |
| **Gate B** | Labels + assignees + reviewers correct | Junction tables populated; counts match GitLab |
| **Gate C** | Dependent discussion sync | Watermark prevents redundant refetch; DiffNote paths + SHA triplet captured; upsert + sweep for notes; watermark does NOT advance on partial pagination failure OR note parse failures; atomic note replacement (parse before delete) |
| **Gate D** | Resumability proof | Kill mid-run, rerun; confirm bounded redo; `--full` resets cursor AND discussion watermarks |
| **Gate E** | CLI complete | `gi show mr` displays discussions with DiffNote file context |

---

## Deliverables

### 1. Project Structure Additions

Add the following to the existing Rust structure from Checkpoint 1:

```
gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── list.rs            # Update: add MR listing
│   │       ├── show.rs            # Update: add MR detail view
│   │       └── count.rs           # Update: add MR counting
│   ├── gitlab/
│   │   ├── types.rs               # Add GitLabMergeRequest, GitLabReviewer
│   │   └── transformers/
│   │       └── merge_request.rs   # NEW: GitLab -> normalized MR
│   └── ingestion/
│       ├── merge_requests.rs      # NEW: MR fetcher with pagination
│       ├── mr_discussions.rs      # NEW: MR discussion fetcher
│       └── orchestrator.rs        # Update: support MR ingestion
├── tests/
│   ├── mr_transformer_tests.rs
│   ├── mr_ingestion_tests.rs
│   ├── mr_discussion_tests.rs
│   ├── diffnote_tests.rs
│   └── fixtures/
│       ├── gitlab_merge_request.json
│       ├── gitlab_merge_requests_page.json
│       ├── gitlab_mr_discussion.json
│       ├── gitlab_mr_discussion_with_diffnote.json
│       └── gitlab_diffnote_position.json
└── migrations/
    └── 006_merge_requests.sql
```

### 2. GitLab API Endpoints

**Merge Requests (Bulk Fetch):**

```
GET /projects/:id/merge_requests?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
```

**MR Discussions (Per-MR Fetch):**

```
GET /projects/:id/merge_requests/:iid/discussions?per_page=100&page=N
```

**Required Query Parameters:**

- `scope=all` - Include all MRs, not just those authored by the current user
- `state=all` - Include merged/closed MRs (GitLab defaults may exclude them)

**Key Differences from Issues:**

- MRs have `source_branch` and `target_branch`
- MRs have `merge_status`, `merged_by`, `merged_at` fields
- MRs have reviewers (separate from assignees)
- MR discussions can contain DiffNotes with position metadata

**MR State Values:**

GitLab MR `state` can be `opened`, `closed`, `merged`, or `locked`. The `locked` state is a **transitional state** that indicates an MR is in the middle of being merged (merge-in-progress).
Key considerations:

- **Store as first-class:** Persist `locked` in the database without coercion to preserve GitLab's exact state.
- **Local filtering only:** The GitLab API's `state` filter does not support `locked` as a query parameter. To filter for locked MRs, we must use `state=all` server-side and filter locally via SQL `WHERE state = 'locked'`.
- **Transient nature:** Most MRs in `locked` state will transition to `merged` within seconds/minutes. Expect very few locked MRs in typical queries.
- **CLI exposure:** The `--state=locked` filter is available for debugging/inspection but is not expected to be commonly used.

**Reviewer Verification (Optional / Debug Mode):**

When reviewer data is suspected to be incomplete on a given GitLab version, enable an optional verification mode:

```
GET /projects/:id/merge_requests/:merge_request_iid/reviewers
```

This is disabled by default to avoid N+1 overhead. Activate it via the config flag `sync.verify_reviewers: true` when debugging problematic instances.

**Pagination:**

- **Primary:** Parse the `Link: <url>; rel="next"` header (RFC 8288). This is the most reliable signal, as it works consistently across GitLab versions and proxy configurations.
- **Fallback 1:** If no `Link` header, follow the `x-next-page` header until it is empty/absent.
- **Fallback 2:** If no pagination headers are present but the current page returned `per_page` items, attempt `page + 1` and stop only when an empty response is received. This handles proxies/instances that strip headers.
- Per-page maximum: 100
- **Optional keyset mode:** For projects with 10,000+ MRs, enable keyset pagination via the config flag `sync.use_keyset_pagination: true`. This uses `pagination=keyset&order_by=updated_at&sort=asc` with cursor tokens from Link headers for O(1) page fetches instead of O(n) offset scans.
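The fallback chain above can be sketched as a pure decision function. This is a minimal illustration, not the real client code; the function name and signature are invented for the example.

```rust
// Decide the next page to fetch, mirroring the fallback chain:
// Link header > x-next-page > full-page heuristic.
// All names here are illustrative, not part of the actual client.
fn decide_next_page(
    link_has_next: bool,      // Link header contained rel="next"
    x_next_page: Option<u32>, // parsed x-next-page header, if present
    items_returned: u32,      // number of items in the current page
    per_page: u32,
    current_page: u32,
) -> Option<u32> {
    if link_has_next {
        Some(current_page + 1)      // primary: trust the Link header
    } else if let Some(np) = x_next_page {
        Some(np)                    // fallback 1: GitLab's x-next-page
    } else if items_returned == per_page {
        Some(current_page + 1)      // fallback 2: full page, probe the next one
    } else {
        None                        // partial page: pagination is done
    }
}

fn main() {
    assert_eq!(decide_next_page(true, None, 100, 100, 1), Some(2));
    assert_eq!(decide_next_page(false, Some(3), 100, 100, 2), Some(3));
    assert_eq!(decide_next_page(false, None, 100, 100, 3), Some(4));
    assert_eq!(decide_next_page(false, None, 42, 100, 4), None);
}
```

Note how fallback 2 can issue one extra request (the probe of `page + 1` that returns empty); that cost is only paid on instances whose proxies strip pagination headers.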
---

## Database Schema

### Migration 006_merge_requests.sql

```sql
-- Merge requests table
CREATE TABLE merge_requests (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    project_id INTEGER NOT NULL REFERENCES projects(id),
    iid INTEGER NOT NULL,
    title TEXT,
    description TEXT,
    state TEXT,                        -- 'opened' | 'merged' | 'closed' | 'locked'
    draft INTEGER NOT NULL DEFAULT 0,  -- 0/1 (SQLite boolean) - work-in-progress status
    author_username TEXT,
    source_branch TEXT,
    target_branch TEXT,
    head_sha TEXT,                     -- Current commit SHA at head of source branch (CP3-ready)
    references_short TEXT,             -- Short reference e.g. "!123" (CP3-ready for display)
    references_full TEXT,              -- Full reference e.g. "group/project!123" (CP3-ready for cross-project)
    detailed_merge_status TEXT,        -- preferred, non-deprecated (replaces merge_status)
    merge_user_username TEXT,          -- preferred over deprecated merged_by
    created_at INTEGER,                -- ms epoch UTC
    updated_at INTEGER,                -- ms epoch UTC
    merged_at INTEGER,                 -- ms epoch UTC (NULL if not merged)
    closed_at INTEGER,                 -- ms epoch UTC (NULL if not closed)
    last_seen_at INTEGER NOT NULL,     -- ms epoch UTC, updated on every upsert
    -- Prevents re-fetching discussions on cursor rewind / reruns unless MR changed.
    discussions_synced_for_updated_at INTEGER,
    -- Sync health telemetry for debuggability
    discussions_sync_last_attempt_at INTEGER,     -- ms epoch UTC of last sync attempt
    discussions_sync_attempts INTEGER DEFAULT 0,  -- count of sync attempts for this MR version
    discussions_sync_last_error TEXT,             -- last error message if sync failed
    web_url TEXT,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);

CREATE INDEX idx_mrs_project_updated ON merge_requests(project_id, updated_at);
CREATE INDEX idx_mrs_author ON merge_requests(author_username);
CREATE INDEX idx_mrs_target_branch ON merge_requests(project_id, target_branch);
CREATE INDEX idx_mrs_source_branch ON merge_requests(project_id, source_branch);
CREATE INDEX idx_mrs_state ON merge_requests(project_id, state);
CREATE INDEX idx_mrs_detailed_merge_status ON merge_requests(project_id, detailed_merge_status);
CREATE INDEX idx_mrs_draft ON merge_requests(project_id, draft);
CREATE INDEX idx_mrs_discussions_sync ON merge_requests(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_mrs_project_iid ON merge_requests(project_id, iid);

-- MR-Label junction (reuses labels table from CP1)
CREATE TABLE mr_labels (
    merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
    label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
    PRIMARY KEY(merge_request_id, label_id)
);
CREATE INDEX idx_mr_labels_label ON mr_labels(label_id);

-- MR assignees (same pattern as issue_assignees)
CREATE TABLE mr_assignees (
    merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
    username TEXT NOT NULL,
    PRIMARY KEY(merge_request_id, username)
);
CREATE INDEX idx_mr_assignees_username ON mr_assignees(username);

-- MR reviewers (MR-specific, not applicable to issues)
CREATE TABLE mr_reviewers (
    merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
    username TEXT NOT NULL,
    PRIMARY KEY(merge_request_id, username)
);
CREATE INDEX idx_mr_reviewers_username ON mr_reviewers(username);

-- Add FK constraint to discussions table for merge_request_id
-- Note: SQLite doesn't support ADD CONSTRAINT; the FK was defined in CP1 but nullable.
-- We just need to add an index if not already present.
CREATE INDEX IF NOT EXISTS idx_discussions_mr_id ON discussions(merge_request_id);
CREATE INDEX IF NOT EXISTS idx_discussions_mr_resolved ON discussions(merge_request_id, resolved, resolvable);

-- Additional indexes for DiffNote queries (notes table from CP1)
-- These composite indexes enable efficient file-context queries for CP3
CREATE INDEX IF NOT EXISTS idx_notes_type ON notes(note_type);
CREATE INDEX IF NOT EXISTS idx_notes_new_path ON notes(position_new_path);
CREATE INDEX IF NOT EXISTS idx_notes_new_path_line ON notes(position_new_path, position_new_line);
CREATE INDEX IF NOT EXISTS idx_notes_old_path_line ON notes(position_old_path, position_old_line);

-- CP2: capture richer diff note position shapes (minimal, still MVP)
-- These fields support modern GitLab diff note semantics without full diff reconstruction
ALTER TABLE notes ADD COLUMN position_type TEXT;                 -- 'text' | 'image' | 'file'
ALTER TABLE notes ADD COLUMN position_line_range_start INTEGER;  -- multi-line comment start
ALTER TABLE notes ADD COLUMN position_line_range_end INTEGER;    -- multi-line comment end

-- DiffNote SHA triplet for commit context (CP3-ready, zero extra API cost)
ALTER TABLE notes ADD COLUMN position_base_sha TEXT;   -- Base commit SHA for diff
ALTER TABLE notes ADD COLUMN position_start_sha TEXT;  -- Start commit SHA for diff
ALTER TABLE notes ADD COLUMN position_head_sha TEXT;   -- Head commit SHA for diff

-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (6, strftime('%s', 'now') * 1000, 'Merge requests, MR labels, assignees, reviewers');
```

---

## GitLab Types

### Type Definitions

```rust
// src/gitlab/types.rs (additions)

use serde::Deserialize;

/// GitLab merge request from the API.
/// Note: Uses non-deprecated field names where possible (detailed_merge_status, merge_user).
/// Falls back gracefully for older GitLab versions.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabMergeRequest {
    pub id: i64,                               // GitLab global ID
    pub iid: i64,                              // Project-scoped MR number
    pub project_id: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,                         // "opened" | "merged" | "closed" | "locked"
    #[serde(default)]
    pub draft: bool,                           // Work-in-progress status (preferred)
    #[serde(default)]
    pub work_in_progress: bool,                // Deprecated; fallback for older instances
    pub source_branch: String,
    pub target_branch: String,
    pub sha: Option<String>,                   // Current commit SHA at head of source branch (CP3-ready)
    pub references: Option<GitLabReferences>,  // Short and full reference strings (CP3-ready)
    // Prefer detailed_merge_status (non-deprecated) over merge_status (deprecated)
    pub detailed_merge_status: Option<String>, // "mergeable" | "not_mergeable" | "checking" | etc.
    #[serde(alias = "merge_status")]
    pub merge_status_legacy: Option<String>,   // Keep for older/self-managed versions
    pub created_at: String,                    // ISO 8601
    pub updated_at: String,                    // ISO 8601
    pub merged_at: Option<String>,             // ISO 8601
    pub closed_at: Option<String>,             // ISO 8601
    pub author: GitLabAuthor,
    // Prefer merge_user (current) over merged_by (deprecated)
    pub merge_user: Option<GitLabAuthor>,
    pub merged_by: Option<GitLabAuthor>,       // Keep for older/self-managed versions
    #[serde(default)]
    pub labels: Vec<String>,                   // Array of label names
    #[serde(default)]
    pub assignees: Vec<GitLabAuthor>,
    #[serde(default)]
    pub reviewers: Vec<GitLabReviewer>,
    pub web_url: String,
}

/// GitLab references object (short and full reference strings).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabReferences {
    pub short: String, // e.g. "!123"
    pub full: String,  // e.g. "group/project!123"
}

/// GitLab author from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
    pub id: i64,
    pub username: String,
    pub name: String,
}

/// GitLab reviewer (can have approval state).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabReviewer {
    pub id: i64,
    pub username: String,
    pub name: String,
    // Note: approval state may require additional API call, defer to post-MVP
}

// Note: GitLabDiscussion and GitLabNote types already exist from CP1
// and support MR discussions with DiffNote position metadata.
```

---

## Transformers

### Merge Request Transformer

```rust
// src/gitlab/transformers/merge_request.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabMergeRequest;

/// Normalized merge request ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedMergeRequest {
    pub gitlab_id: i64,
    pub project_id: i64,                       // Local DB project ID
    pub iid: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,
    pub draft: bool,
    pub author_username: String,
    pub source_branch: String,
    pub target_branch: String,
    pub head_sha: Option<String>,              // CP3-ready: current commit at source branch head
    pub references_short: Option<String>,      // CP3-ready: e.g. "!123"
    pub references_full: Option<String>,       // CP3-ready: e.g. "group/project!123"
    pub detailed_merge_status: Option<String>,
    pub merge_user_username: Option<String>,
    pub created_at: i64,                       // ms epoch
    pub updated_at: i64,                       // ms epoch
    pub merged_at: Option<i64>,                // ms epoch
    pub closed_at: Option<i64>,                // ms epoch
    pub last_seen_at: i64,                     // ms epoch
    pub web_url: String,
}

/// Normalized label ready for database insertion.
/// Reuses structure from issue transformer.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
    pub project_id: i64,
    pub name: String,
}

/// Result of transforming a GitLab MR.
#[derive(Debug, Clone)]
pub struct MergeRequestWithMetadata {
    pub merge_request: NormalizedMergeRequest,
    pub label_names: Vec<String>,
    pub assignee_usernames: Vec<String>,
    pub reviewer_usernames: Vec<String>,
}

/// Transform GitLab merge request to normalized schema.
pub fn transform_merge_request(
    gitlab_mr: &GitLabMergeRequest,
    local_project_id: i64,
) -> Result<MergeRequestWithMetadata, String> {
    // Parse timestamps, return error if invalid (strict parsing, no silent zeroing)
    let created_at = iso_to_ms(&gitlab_mr.created_at)
        .ok_or_else(|| format!("Invalid created_at: {}", gitlab_mr.created_at))?;
    let updated_at = iso_to_ms(&gitlab_mr.updated_at)
        .ok_or_else(|| format!("Invalid updated_at: {}", gitlab_mr.updated_at))?;
    let merged_at = gitlab_mr.merged_at.as_ref().and_then(|s| iso_to_ms(s));
    let closed_at = gitlab_mr.closed_at.as_ref().and_then(|s| iso_to_ms(s));

    // Both are bools; draft takes precedence if true, otherwise fall back to work_in_progress
    let is_draft = gitlab_mr.draft || gitlab_mr.work_in_progress;

    // Extract references (CP3-ready)
    let (references_short, references_full) = gitlab_mr
        .references
        .as_ref()
        .map(|r| (Some(r.short.clone()), Some(r.full.clone())))
        .unwrap_or((None, None));

    let merge_request = NormalizedMergeRequest {
        gitlab_id: gitlab_mr.id,
        project_id: local_project_id,
        iid: gitlab_mr.iid,
        title: gitlab_mr.title.clone(),
        description: gitlab_mr.description.clone(),
        state: gitlab_mr.state.clone(),
        draft: is_draft,
        author_username: gitlab_mr.author.username.clone(),
        source_branch: gitlab_mr.source_branch.clone(),
        target_branch: gitlab_mr.target_branch.clone(),
        head_sha: gitlab_mr.sha.clone(),
        references_short,
        references_full,
        detailed_merge_status: gitlab_mr
            .detailed_merge_status
            .clone()
            .or_else(|| gitlab_mr.merge_status_legacy.clone()),
        // Prefer merge_user (current) over merged_by (deprecated)
        merge_user_username: gitlab_mr
            .merge_user
            .as_ref()
            .map(|u| u.username.clone())
            .or_else(|| gitlab_mr.merged_by.as_ref().map(|u| u.username.clone())),
        created_at,
        updated_at,
        merged_at,
        closed_at,
        last_seen_at: now_ms(),
        web_url: gitlab_mr.web_url.clone(),
    };

    let label_names = gitlab_mr.labels.clone();
    let assignee_usernames = gitlab_mr
        .assignees
        .iter()
        .map(|u| u.username.clone())
        .collect();
    let reviewer_usernames = gitlab_mr
        .reviewers
        .iter()
        .map(|u| u.username.clone())
        .collect();

    Ok(MergeRequestWithMetadata {
        merge_request,
        label_names,
        assignee_usernames,
        reviewer_usernames,
    })
}

/// Extract labels from GitLab MR (name-only, same as issues).
pub fn extract_labels(gitlab_mr: &GitLabMergeRequest, local_project_id: i64) -> Vec<NormalizedLabel> {
    gitlab_mr
        .labels
        .iter()
        .map(|name| NormalizedLabel {
            project_id: local_project_id,
            name: name.clone(),
        })
        .collect()
}
```

### MR Discussion Transformer

```rust
// src/gitlab/transformers/discussion.rs (additions)

/// Transform GitLab discussion to normalized schema for MR context.
/// Note: The core transform_discussion and transform_notes functions
/// from CP1 are reused. This adds MR-specific handling.
pub fn transform_mr_discussion(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
    local_mr_id: i64,
) -> NormalizedDiscussion {
    let note_times: Vec<i64> = gitlab_discussion
        .notes
        .iter()
        .filter_map(|n| iso_to_ms(&n.created_at))
        .collect();

    // Check if any note is resolvable (MR discussions often are)
    let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
    let resolved = resolvable
        && gitlab_discussion
            .notes
            .iter()
            .all(|n| !n.resolvable || n.resolved);

    NormalizedDiscussion {
        gitlab_discussion_id: gitlab_discussion.id.clone(),
        project_id: local_project_id,
        issue_id: None, // Not an issue discussion
        merge_request_id: Some(local_mr_id),
        noteable_type: "MergeRequest".to_string(),
        individual_note: gitlab_discussion.individual_note,
        first_note_at: note_times.iter().min().copied(),
        last_note_at: note_times.iter().max().copied(),
        last_seen_at: now_ms(),
        resolvable,
        resolved,
    }
}

/// Transform GitLab notes with DiffNote position extraction.
/// Captures file path and line metadata for code review comments.
///
/// Returns Result to enforce strict timestamp parsing - corrupted timestamps
/// (zero values) would break cursor logic and time-series ordering.
pub fn transform_notes_with_diff_position(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
) -> Result<Vec<NormalizedNote>, String> {
    gitlab_discussion
        .notes
        .iter()
        .enumerate()
        .map(|(index, note)| {
            // Strict timestamp parsing - no silent zeroing
            let created_at = iso_to_ms(&note.created_at).ok_or_else(|| {
                format!("Invalid note.created_at for note {}: {}", note.id, note.created_at)
            })?;
            let updated_at = iso_to_ms(&note.updated_at).ok_or_else(|| {
                format!("Invalid note.updated_at for note {}: {}", note.id, note.updated_at)
            })?;

            // Extract DiffNote position metadata if present.
            // Includes position_type, line_range, and SHA triplet for modern GitLab diff note shapes.
            let (old_path, new_path, old_line, new_line, position_type,
                 lr_start, lr_end, base_sha, start_sha, head_sha) = note
                .position
                .as_ref()
                .map(|pos| (
                    pos.old_path.clone(),
                    pos.new_path.clone(),
                    pos.old_line,
                    pos.new_line,
                    pos.position_type.clone(),
                    pos.line_range.as_ref().map(|r| r.start_line),
                    pos.line_range.as_ref().map(|r| r.end_line),
                    pos.base_sha.clone(),
                    pos.start_sha.clone(),
                    pos.head_sha.clone(),
                ))
                .unwrap_or((None, None, None, None, None, None, None, None, None, None));

            Ok(NormalizedNote {
                gitlab_id: note.id,
                project_id: local_project_id,
                note_type: note.note_type.clone(),
                is_system: note.system,
                author_username: note.author.username.clone(),
                body: note.body.clone(),
                created_at,
                updated_at,
                last_seen_at: now_ms(),
                position: index as i32,
                resolvable: note.resolvable,
                resolved: note.resolved,
                resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
                resolved_at: note.resolved_at.as_ref().and_then(|s| iso_to_ms(s)),
                // DiffNote position metadata
                position_old_path: old_path,
                position_new_path: new_path,
                position_old_line: old_line,
                position_new_line: new_line,
                // Extended position metadata for modern GitLab
                position_type,
                position_line_range_start: lr_start,
                position_line_range_end: lr_end,
                // SHA triplet for commit context (CP3-ready)
                position_base_sha: base_sha,
                position_start_sha: start_sha,
                position_head_sha: head_sha,
            })
        })
        .collect()
}
```

---

## GitLab Client Additions

### Pagination with Async Streams

```rust
// src/gitlab/client.rs (additions)

/// A page of merge requests with pagination metadata.
#[derive(Debug)]
pub struct MergeRequestPage {
    pub items: Vec<GitLabMergeRequest>,
    pub next_page: Option<u32>,
    pub is_last_page: bool,
}

impl GitLabClient {
    /// Parse Link header to extract rel="next" URL (RFC 8288).
    /// Returns Some(url) if a next page link exists, None otherwise.
    fn parse_link_header_next(headers: &reqwest::header::HeaderMap) -> Option<String> {
        headers
            .get("link")
            .and_then(|v| v.to_str().ok())
            .and_then(|link_str| {
                // Parse Link header format: <url>; rel="next", <url>; rel="last"
                for part in link_str.split(',') {
                    let part = part.trim();
                    if part.contains("rel=\"next\"") || part.contains("rel=next") {
                        // Extract URL between < and >
                        if let Some(start) = part.find('<') {
                            if let Some(end) = part.find('>') {
                                return Some(part[start + 1..end].to_string());
                            }
                        }
                    }
                }
                None
            })
    }

    /// Fetch a single page of merge requests for a project.
    /// Returns the items plus pagination metadata for page-aware cursor updates.
    pub async fn fetch_merge_requests_page(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
        page: u32,
        per_page: u32,
    ) -> Result<MergeRequestPage> {
        let mut params = vec![
            ("scope", "all".to_string()),
            ("state", "all".to_string()),
            ("order_by", "updated_at".to_string()),
            ("sort", "asc".to_string()),
            ("per_page", per_page.to_string()),
            ("page", page.to_string()),
        ];
        if let Some(ts) = updated_after {
            // Apply cursor rewind for safety, clamping to 0 to avoid underflow
            let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
            let rewound = (ts - rewind_ms).max(0);
            if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
                params.push(("updated_after", dt.to_rfc3339()));
            }
        }

        let (items, headers) = self
            .request_with_headers::<Vec<GitLabMergeRequest>>(
                &format!("/api/v4/projects/{gitlab_project_id}/merge_requests"),
                &params,
            )
            .await?;

        // Parse pagination with fallback chain:
        // 1. Link header (most reliable, RFC 8288 compliant)
        // 2. x-next-page header (GitLab-specific)
        // 3. Full page heuristic (fallback for proxies that strip headers)
        let link_next = Self::parse_link_header_next(&headers);
        let x_next_page = headers
            .get("x-next-page")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u32>().ok());

        let next_page = match (link_next, x_next_page, items.len() as u32 == per_page) {
            (Some(_), _, _) => Some(page + 1),    // Link header present: continue
            (None, Some(np), _) => Some(np),      // x-next-page present: use it
            (None, None, true) => Some(page + 1), // Full page, no headers: try next
            (None, None, false) => None,          // Partial page: we're done
        };
        let is_last_page = items.is_empty() || next_page.is_none();

        Ok(MergeRequestPage {
            items,
            next_page,
            is_last_page,
        })
    }

    /// Paginate through merge requests for a project.
    /// Returns a stream of MRs that handles pagination automatically.
    /// Note: For page-aware cursor updates, prefer fetch_merge_requests_page directly.
    pub fn paginate_merge_requests(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabMergeRequest>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;
            loop {
                let page_result = self.fetch_merge_requests_page(
                    gitlab_project_id,
                    updated_after,
                    cursor_rewind_seconds,
                    page,
                    per_page,
                ).await?;

                for mr in page_result.items {
                    yield mr;
                }

                if page_result.is_last_page {
                    break;
                }
                if let Some(np) = page_result.next_page {
                    page = np;
                } else {
                    break;
                }
            }
        })
    }

    /// Paginate through discussions for a merge request.
    pub fn paginate_mr_discussions(
        &self,
        gitlab_project_id: i64,
        mr_iid: i64,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
        Box::pin(async_stream::try_stream!
        {
            let mut page = 1u32;
            let per_page = 100u32;
            loop {
                let params = vec![
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];
                let (discussions, headers) = self
                    .request_with_headers::<Vec<GitLabDiscussion>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{mr_iid}/discussions"),
                        &params,
                    )
                    .await?;

                for discussion in discussions.iter() {
                    yield discussion.clone();
                }

                // Robust fallback if pagination headers are missing
                let next_page_hdr = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());
                let next_page = match (next_page_hdr, discussions.len() as u32 == per_page) {
                    (Some(np), _) => Some(np),      // Header present: trust it
                    (None, true) => Some(page + 1), // Full page, no header: try next
                    (None, false) => None,          // Partial page: we're done
                };

                match next_page {
                    Some(np) if !discussions.is_empty() => page = np,
                    _ => break,
                }
            }
        })
    }
}
```

---

## Orchestration: Dependent Discussion Sync

### Canonical Pattern (CP2)

When `gi ingest --type=merge_requests` runs, it follows this orchestration (mirroring the CP1 pattern):

1. **Ingest merge requests** (cursor-based, with incremental cursor updates per page)
2. **Select MRs needing discussion sync (DB-driven)** - After MR ingestion completes, query:

   ```sql
   SELECT id, iid, updated_at
   FROM merge_requests
   WHERE project_id = ?
     AND (discussions_synced_for_updated_at IS NULL
          OR updated_at > discussions_synced_for_updated_at)
   ORDER BY updated_at ASC;
   ```

   This is more scalable and debuggable than in-memory collection. The DB is the source of truth.
3. **Execute discussion sync** with bounded concurrency (`dependent_concurrency` from config)
4. **Update watermark** - After each MR's discussions are successfully ingested *with no partial failures*:

   ```sql
   UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?
   ```

**Invariant:** A rerun MUST NOT refetch discussions for MRs whose `updated_at` has not advanced, even with cursor rewind.
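The watermark behavior can be illustrated with a small in-memory model. This is a sketch of the invariant only; names are invented, and the real implementation persists the watermark in SQLite via the queries shown above.

```rust
// Minimal in-memory model of the per-MR discussion watermark.
// Illustrative only: the real implementation stores this in the
// merge_requests.discussions_synced_for_updated_at column.
struct MrRow {
    updated_at: i64,
    discussions_synced_for_updated_at: Option<i64>,
}

// Select predicate: discussions need syncing when the watermark is
// missing or older than the MR's updated_at.
fn needs_discussion_sync(mr: &MrRow) -> bool {
    match mr.discussions_synced_for_updated_at {
        None => true,
        Some(w) => mr.updated_at > w,
    }
}

// Advance the watermark only after a fully successful sync.
fn mark_synced(mr: &mut MrRow, sync_succeeded: bool) {
    if sync_succeeded {
        mr.discussions_synced_for_updated_at = Some(mr.updated_at);
    } // on partial failure the watermark stays put, so a rerun retries
}

fn main() {
    let mut mr = MrRow { updated_at: 1_000, discussions_synced_for_updated_at: None };
    assert!(needs_discussion_sync(&mr));  // never synced: eligible
    mark_synced(&mut mr, true);
    assert!(!needs_discussion_sync(&mr)); // rerun skips unchanged MR
    mr.updated_at = 2_000;                // MR changed upstream
    assert!(needs_discussion_sync(&mr));  // eligible again
    mark_synced(&mut mr, false);          // partial failure: watermark untouched
    assert!(needs_discussion_sync(&mr));  // still eligible on the next run
}
```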
Watermark is NOT advanced if any failure occurs (HTTP pagination failure OR note parse failure).

---

## Ingestion Logic

### MR Ingestion

```rust
// src/ingestion/merge_requests.rs

use rusqlite::Connection;
use tracing::{debug, info};

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::merge_request::{transform_merge_request, extract_labels};

/// Result of MR ingestion.
#[derive(Debug, Default)]
pub struct IngestMergeRequestsResult {
    pub fetched: usize,
    pub upserted: usize,
    pub labels_created: usize,
    pub assignees_linked: usize,
    pub reviewers_linked: usize,
    // Discussion sync is DB-driven, not in-memory collection.
    // Use query: SELECT id, iid FROM merge_requests WHERE updated_at > discussions_synced_for_updated_at
    // This avoids memory growth for large projects.
}

/// Info needed to sync discussions for an MR.
#[derive(Debug, Clone)]
pub struct MrForDiscussionSync {
    pub local_mr_id: i64,
    pub iid: i64,
    pub updated_at: i64,
}

/// Result of upserting a merge request, including previous sync state.
#[derive(Debug)]
pub struct UpsertMergeRequestResult {
    pub local_mr_id: i64,
    pub changes: usize,
    pub previous_discussions_synced_at: Option<i64>,
}

/// Ingest merge requests for a project using page-based cursor updates.
/// Discussion sync eligibility is determined via DB query AFTER ingestion completes.
pub async fn ingest_merge_requests(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,        // Local DB project ID
    gitlab_project_id: i64, // GitLab project ID
    full_sync: bool,        // Reset cursor if true
) -> Result<IngestMergeRequestsResult> {
    let mut result = IngestMergeRequestsResult::default();

    // Reset cursor if full sync requested
    if full_sync {
        reset_cursor(conn, project_id, "merge_requests")?;
        // Also reset discussion watermarks to force re-fetch of all discussions.
        // This ensures --full truly fetches everything, not just MRs.
        reset_discussion_watermarks(conn, project_id)?;
    }

    // Get current cursor
    let cursor = get_cursor(conn, project_id)?;
    let cursor_updated_at = cursor.0;
    let cursor_gitlab_id = cursor.1;

    let mut last_updated_at: Option<i64> = None;
    let mut last_gitlab_id: Option<i64> = None;
    let mut page = 1u32;
    let per_page = 100u32;

    // Page-based iteration for proper cursor boundary tracking
    loop {
        let page_result = client.fetch_merge_requests_page(
            gitlab_project_id,
            cursor_updated_at,
            config.sync.cursor_rewind_seconds,
            page,
            per_page,
        ).await?;

        if page_result.items.is_empty() {
            break;
        }

        for mr in &page_result.items {
            result.fetched += 1;

            let mr_updated_at = crate::core::time::iso_to_ms(&mr.updated_at)
                .ok_or_else(|| crate::core::error::GiError::ParseError {
                    field: "updated_at".to_string(),
                    value: mr.updated_at.clone(),
                })?;

            // Apply cursor filtering for tuple semantics
            if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
                if mr_updated_at < cursor_ts {
                    continue;
                }
                if mr_updated_at == cursor_ts && mr.id <= cursor_id {
                    continue;
                }
            }

            // Begin transaction for this MR (atomicity + performance)
            let tx = conn.unchecked_transaction()?;

            // Store raw payload
            let payload_id = store_payload(
                &tx,
                project_id,
                "merge_request",
                &mr.id.to_string(),
                mr,
                config.storage.compress_raw_payloads,
            )?;

            // Transform and upsert MR
            let transformed = transform_merge_request(mr, project_id)
                .map_err(|e| crate::core::error::GiError::ParseError {
                    field: "merge_request".to_string(),
                    value: e,
                })?;

            let upsert_result = upsert_merge_request(
                &tx,
                &transformed.merge_request,
                payload_id,
            )?;
            if upsert_result.changes > 0 {
                result.upserted += 1;
            }
            let local_mr_id = upsert_result.local_mr_id;

            // Discussion sync eligibility is determined AFTER ingestion via DB query:
            // SELECT id, iid FROM merge_requests WHERE updated_at > discussions_synced_for_updated_at
            // This avoids memory growth for large projects.

            // Clear existing label links (ensures removed labels are unlinked)
            clear_mr_labels(&tx, local_mr_id)?;

            // Extract and upsert labels (name-only, same as issues)
            let labels = extract_labels(mr, project_id);
            for label in &labels {
                let created = upsert_label(&tx, label)?;
                if created {
                    result.labels_created += 1;
                }
                // Link MR to label
                let label_id = get_label_id(&tx, project_id, &label.name)?;
                link_mr_label(&tx, local_mr_id, label_id)?;
            }

            // Clear and relink assignees
            clear_mr_assignees(&tx, local_mr_id)?;
            for username in &transformed.assignee_usernames {
                upsert_mr_assignee(&tx, local_mr_id, username)?;
                result.assignees_linked += 1;
            }

            // Clear and relink reviewers
            clear_mr_reviewers(&tx, local_mr_id)?;
            for username in &transformed.reviewer_usernames {
                upsert_mr_reviewer(&tx, local_mr_id, username)?;
                result.reviewers_linked += 1;
            }

            tx.commit()?;

            // Track for cursor update
            last_updated_at = Some(mr_updated_at);
            last_gitlab_id = Some(mr.id);
        }

        // Page-boundary cursor flush (not based on fetched % 100).
        // This ensures the cursor reflects actual processed page boundaries.
        if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
            update_cursor(conn, project_id, "merge_requests", updated_at, gitlab_id)?;
        }

        // Check for more pages
        if page_result.is_last_page {
            break;
        }
        match page_result.next_page {
            Some(np) => page = np,
            None => break,
        }
    }

    // Final cursor update (in case we exited mid-loop)
    if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
        update_cursor(conn, project_id, "merge_requests", updated_at, gitlab_id)?;
    }

    info!(
        project_id,
        fetched = result.fetched,
        upserted = result.upserted,
        labels_created = result.labels_created,
        assignees_linked = result.assignees_linked,
        reviewers_linked = result.reviewers_linked,
        "MR ingestion complete"
    );

    Ok(result)
}

/// Upsert a merge request and return sync state for inline discussion decision.
fn upsert_merge_request(
    conn: &Connection,
    mr: &NormalizedMergeRequest,
    payload_id: Option<i64>,
) -> Result<UpsertMergeRequestResult> {
    // Perform upsert
    let changes = conn.execute(
        "INSERT INTO merge_requests (
            gitlab_id, project_id, iid, title, description, state, draft,
            author_username, source_branch, target_branch, head_sha,
            references_short, references_full, detailed_merge_status,
            merge_user_username, created_at, updated_at, merged_at, closed_at,
            last_seen_at, web_url, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14,
                  ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            title = excluded.title,
            description = excluded.description,
            state = excluded.state,
            draft = excluded.draft,
            head_sha = excluded.head_sha,
            references_short = excluded.references_short,
            references_full = excluded.references_full,
            detailed_merge_status = excluded.detailed_merge_status,
            merge_user_username = excluded.merge_user_username,
            updated_at = excluded.updated_at,
            merged_at = excluded.merged_at,
            closed_at = excluded.closed_at,
            last_seen_at = excluded.last_seen_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            mr.gitlab_id,
            mr.project_id,
            mr.iid,
            mr.title,
            mr.description,
            mr.state,
            mr.draft as i32,
            mr.author_username,
            mr.source_branch,
            mr.target_branch,
            mr.head_sha,
            mr.references_short,
            mr.references_full,
            mr.detailed_merge_status,
            mr.merge_user_username,
            mr.created_at,
            mr.updated_at,
            mr.merged_at,
            mr.closed_at,
            mr.last_seen_at,
            mr.web_url,
            payload_id,
        ],
    )?;

    // Get local ID (either existing or newly inserted)
    let local_mr_id = conn.query_row(
        "SELECT id FROM merge_requests WHERE
gitlab_id = ?", [mr.gitlab_id], |row| row.get(0), ).expect("MR must exist after upsert"); Ok(UpsertMergeRequestResult { local_mr_id, changes, previous_discussions_synced_at: None, }) } // Database helper functions follow the same pattern as issues.rs // (get_cursor, update_cursor, reset_cursor, upsert_merge_request, etc.) /// Reset discussion watermarks for all MRs in a project. /// Called when --full sync is requested to force re-fetch of all discussions. fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_synced_for_updated_at = NULL, discussions_sync_attempts = 0, discussions_sync_last_error = NULL WHERE project_id = ?", [project_id], )?; Ok(()) } /// Record sync health telemetry for debugging failed discussion syncs. fn record_sync_health_error(conn: &Connection, mr_id: i64, error: &str) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_sync_last_attempt_at = ?, discussions_sync_attempts = COALESCE(discussions_sync_attempts, 0) + 1, discussions_sync_last_error = ? WHERE id = ?", rusqlite::params![now_ms(), error, mr_id], )?; Ok(()) } /// Clear sync health error on successful sync. fn clear_sync_health_error(conn: &Connection, mr_id: i64) -> Result<()> { conn.execute( "UPDATE merge_requests SET discussions_sync_last_attempt_at = ?, discussions_sync_attempts = 0, discussions_sync_last_error = NULL WHERE id = ?", rusqlite::params![now_ms(), mr_id], )?; Ok(()) } ``` ### MR Discussion Ingestion ```rust // src/ingestion/mr_discussions.rs use futures::StreamExt; use rusqlite::Connection; use tracing::{debug, warn}; use crate::core::config::Config; use crate::core::error::Result; use crate::core::payloads::store_payload; use crate::core::time::now_ms; use crate::gitlab::client::GitLabClient; use crate::gitlab::transformers::discussion::{ transform_mr_discussion, transform_notes_with_diff_position, }; /// Result of discussion ingestion for a single MR. 
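// The "dependent sync" selection described in merge_requests.rs reduces to a
// simple per-MR predicate. A dependency-free sketch for illustration only —
// `needs_discussion_sync` is a hypothetical helper, not part of this module's
// actual API; the real selection runs as SQL against merge_requests:
fn needs_discussion_sync(mr_updated_at: i64, watermark: Option<i64>) -> bool {
    match watermark {
        // NULL watermark: discussions were never synced for this MR.
        None => true,
        // Otherwise the MR must have changed since the last discussion sync.
        Some(synced_for) => mr_updated_at > synced_for,
    }
}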
/// Result of discussion ingestion for a single MR.
#[derive(Debug, Default)]
pub struct IngestMrDiscussionsResult {
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub notes_skipped_bad_timestamp: usize,
    pub system_notes_count: usize,
    pub diffnotes_count: usize,
    pub pagination_succeeded: bool,
}

/// Ingest discussions for a single MR.
/// Called only when mr.updated_at > discussions_synced_for_updated_at.
///
/// CRITICAL INVARIANTS:
/// 1. The watermark is ONLY advanced if pagination completes successfully AND
///    all notes parse successfully.
/// 2. Notes are parsed BEFORE any destructive DB operations, to prevent data loss.
/// 3. Stale discussions are removed via a last_seen_at sweep (not in-memory
///    ID collection).
pub async fn ingest_mr_discussions(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    mr_iid: i64,
    local_mr_id: i64,
    mr_updated_at: i64,
) -> Result<IngestMrDiscussionsResult> {
    let mut result = IngestMrDiscussionsResult {
        pagination_succeeded: true, // Assume success; set false on error
        ..Default::default()
    };

    // Record sync start time for the last_seen_at sweep
    let run_seen_at = now_ms();

    let mut stream = client.paginate_mr_discussions(gitlab_project_id, mr_iid);

    while let Some(discussion_result) = stream.next().await {
        let discussion = match discussion_result {
            Ok(d) => d,
            Err(e) => {
                // Log the error and mark pagination as failed
                warn!(
                    project_id,
                    mr_iid,
                    error = %e,
                    "Error fetching MR discussion page"
                );
                result.pagination_succeeded = false;
                break;
            }
        };

        result.discussions_fetched += 1;

        // CRITICAL: Parse/transform notes BEFORE any destructive DB operations.
        // If parsing fails, we preserve prior data and treat this as a partial
        // failure (no watermark advance).
        let notes = match transform_notes_with_diff_position(&discussion, project_id) {
            Ok(notes) => notes,
            Err(e) => {
                warn!(
                    project_id,
                    mr_iid,
                    discussion_id = %discussion.id,
                    error = %e,
                    "Note transform failed; preserving existing notes; MR watermark will NOT advance"
                );
                result.notes_skipped_bad_timestamp += discussion.notes.len();
                result.pagination_succeeded = false;
                continue;
            }
        };

        // Begin transaction for this discussion (only AFTER parsing succeeded)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload for the discussion
        let discussion_payload_id = store_payload(
            &tx,
            project_id,
            "discussion",
            &discussion.id,
            &discussion,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert the discussion (MR context) with the run_seen_at timestamp
        let mut normalized = transform_mr_discussion(&discussion, project_id, local_mr_id);
        normalized.last_seen_at = run_seen_at;
        upsert_discussion(&tx, &normalized, discussion_payload_id)?;
        result.discussions_upserted += 1;

        // Get local discussion ID
        let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;

        // Upsert notes instead of delete-all-then-insert.
        // This reduces write amplification and preserves any local metadata.
        for note in &notes {
            // Selective raw payload storage for notes:
            // - Store for DiffNotes (position metadata useful for file-context queries)
            // - Store for non-system notes (user-generated content)
            // - Skip for system notes (numerous, low-value; the discussion payload has them)
            let should_store_note_payload = !note.is_system()
                || note.position_new_path().is_some()
                || note.position_old_path().is_some();

            let note_payload_id = if should_store_note_payload {
                let gitlab_note = discussion.notes.iter().find(|n| n.id() == note.gitlab_id());
                if let Some(gn) = gitlab_note {
                    store_payload(
                        &tx,
                        project_id,
                        "note",
                        &note.gitlab_id().to_string(),
                        gn,
                        config.storage.compress_raw_payloads,
                    )?
                } else {
                    None
                }
            } else {
                None
            };

            // Upsert the note with the run_seen_at timestamp for the sweep
            upsert_note(&tx, local_discussion_id, note, note_payload_id, run_seen_at)?;
            result.notes_upserted += 1;

            if note.is_system() {
                result.system_notes_count += 1;
            }

            // Count DiffNotes
            if note.position_new_path().is_some() || note.position_old_path().is_some() {
                result.diffnotes_count += 1;
            }
        }

        tx.commit()?;
    }

    // Remove stale discussions AND notes via a last_seen_at sweep (only if
    // pagination completed successfully). This is simpler and more scalable
    // than collecting seen IDs in memory.
    if result.pagination_succeeded {
        // Sweep stale discussions
        conn.execute(
            "DELETE FROM discussions
             WHERE project_id = ? AND merge_request_id = ? AND last_seen_at < ?",
            rusqlite::params![project_id, local_mr_id, run_seen_at],
        )?;

        // Sweep stale notes for this MR's discussions.
        // Notes inherit staleness from their discussion, but we also sweep at
        // the note level for cases where notes are removed from an existing
        // discussion.
        conn.execute(
            "DELETE FROM notes
             WHERE discussion_id IN (
                 SELECT id FROM discussions
                 WHERE project_id = ? AND merge_request_id = ?
             ) AND last_seen_at < ?",
            rusqlite::params![project_id, local_mr_id, run_seen_at],
        )?;
    }

    // CRITICAL: Only advance the watermark if pagination succeeded completely
    // AND there were no parse failures. If we advance on partial failure, we
    // permanently lose data for this MR version.
    if result.pagination_succeeded {
        mark_discussions_synced(conn, local_mr_id, mr_updated_at)?;
        // Clear sync health error on success
        clear_sync_health_error(conn, local_mr_id)?;
    } else {
        // Record sync health telemetry for debugging
        record_sync_health_error(
            conn,
            local_mr_id,
            "Pagination incomplete or parse failure; will retry on next sync",
        )?;
        warn!(
            project_id,
            mr_iid,
            local_mr_id,
            "Discussion sync incomplete; watermark NOT advanced (will retry on next sync)"
        );
    }

    debug!(
        project_id,
        mr_iid,
        discussions = result.discussions_fetched,
        notes = result.notes_upserted,
        diffnotes = result.diffnotes_count,
        pagination_succeeded = result.pagination_succeeded,
        "MR discussions ingested"
    );

    Ok(result)
}

fn mark_discussions_synced(conn: &Connection, mr_id: i64, mr_updated_at: i64) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?",
        rusqlite::params![mr_updated_at, mr_id],
    )?;
    Ok(())
}

/// Record sync health telemetry for debugging failed discussion syncs.
fn record_sync_health_error(conn: &Connection, mr_id: i64, error: &str) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests
         SET discussions_sync_last_attempt_at = ?,
             discussions_sync_attempts = COALESCE(discussions_sync_attempts, 0) + 1,
             discussions_sync_last_error = ?
         WHERE id = ?",
        rusqlite::params![now_ms(), error, mr_id],
    )?;
    Ok(())
}

/// Clear the sync health error on successful sync.
fn clear_sync_health_error(conn: &Connection, mr_id: i64) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests
         SET discussions_sync_last_attempt_at = ?,
             discussions_sync_attempts = 0,
             discussions_sync_last_error = NULL
         WHERE id = ?",
        rusqlite::params![now_ms(), mr_id],
    )?;
    Ok(())
}
```

---

## CLI Commands

### `gi ingest --type=merge_requests`

Fetch and store all MRs from configured projects.

**Implementation Updates:**

```rust
// src/cli/commands/ingest.rs
/// Run the ingest command.
pub async fn run_ingest(
    config: &Config,
    resource_type: &str,
    project_filter: Option<&str>,
    force: bool,
    full: bool,
) -> Result<()> {
    match resource_type {
        "issues" => {
            // Existing CP1 implementation
            run_issue_ingest(config, project_filter, force, full).await
        }
        "merge_requests" => {
            // NEW: CP2 implementation
            run_mr_ingest(config, project_filter, force, full).await
        }
        _ => Err(GiError::InvalidArgument {
            name: "type".to_string(),
            value: resource_type.to_string(),
            expected: "issues or merge_requests".to_string(),
        }),
    }
}
```

**Output:**

```
Ingesting merge requests...
  group/project-one: 567 MRs fetched, 12 new labels, 89 assignees, 45 reviewers

Fetching discussions (123 MRs with updates)...
  group/project-one: 123 MRs -> 456 discussions, 1,234 notes (89 DiffNotes)

Total: 567 MRs, 456 discussions, 1,234 notes (excluding 2,345 system notes)
Skipped discussion sync for 444 unchanged MRs.
```

### `gi list mrs`

**Output:**

```
Merge Requests (showing 20 of 1,234)

!847  Refactor auth to use JWT tokens        merged  @johndoe   main <- feature/jwt    3 days ago
!846  Fix memory leak in websocket handler   opened  @janedoe   main <- fix/websocket  5 days ago
!845  [DRAFT] Add dark mode CSS variables    opened  @bobsmith  main <- ui/dark-mode   1 week ago
!844  Update dependencies                    closed  @alice     main <- chore/deps     1 week ago
...
```

**Filter Options (same pattern as issues):**

- `--state [opened|merged|closed|locked|all]`
- `--draft` (MR-specific: show only draft MRs)
- `--no-draft` (MR-specific: exclude draft MRs)
- `--author <username>`
- `--assignee <username>`
- `--reviewer <username>` (MR-specific)
- `--target-branch <branch>` (MR-specific)
- `--source-branch <branch>` (MR-specific)
- `--label <name>` (repeatable)
- `--project <path>`
- `--limit <n>`
- `--since <date>`
- `--json`
- `--open` (open in browser)

### `gi show mr <iid>`

**Output:**

```
Merge Request !847: Refactor auth to use JWT tokens
================================================================================
Project:      group/project-one
State:        merged
Draft:        No
Author:       @johndoe
Assignees:    @janedoe, @bobsmith
Reviewers:    @alice, @charlie
Source:       feature/jwt
Target:       main
Merge Status: mergeable (detailed_merge_status)
Merged By:    @alice
Merged At:    2024-03-20 14:30:00
Created:      2024-03-15
Updated:      2024-03-20
Labels:       enhancement, auth, reviewed
URL:          https://gitlab.example.com/group/project-one/-/merge_requests/847

Description:
Moving away from session cookies to JWT-based authentication for better
mobile client support and API consumption...

Discussions (8):

  @janedoe (2024-03-16) [src/auth/jwt.ts:45]:
    Should we use a separate signing key for refresh tokens?

    @johndoe (2024-03-16):
      Good point. I'll add a separate key with rotation support.

  @alice (2024-03-18) [RESOLVED]:
    Looks good! Just one nit about the token expiry constant.
```

### `gi count mrs`

**Output:**

```
Merge Requests: 1,234
  opened: 89
  merged: 1,045
  closed: 100
```

### `gi count discussions --type=mr`

**Output:**

```
MR Discussions: 4,567
```

### `gi count notes --type=mr`

**Output:**

```
MR Notes: 12,345 (excluding 2,345 system notes)
DiffNotes: 3,456 (with file position metadata)
```

---

## Automated Tests

### Unit Tests

```rust
// tests/mr_transformer_tests.rs
#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::merge_request::*;
    use gi::gitlab::types::*;

    #[test]
    fn transforms_gitlab_mr_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_labels_from_mr_payloads() { /* ... */ }

    #[test]
    fn extracts_assignees_from_mr_payloads() { /* ... */ }

    #[test]
    fn extracts_reviewers_from_mr_payloads() { /* ... */ }

    #[test]
    fn handles_missing_optional_fields_gracefully() { /* ... */ }

    #[test]
    fn converts_merged_at_and_closed_at_timestamps() { /* ... */ }

    #[test]
    fn handles_mrs_with_no_labels_assignees_reviewers() { /* ... */ }
}
```

```rust
// tests/diffnote_tests.rs
#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::discussion::*;

    #[test]
    fn extracts_diffnote_position_metadata() { /* ... */ }

    #[test]
    fn handles_notes_without_position() { /* ... */ }

    #[test]
    fn extracts_old_path_and_new_path() { /* ... */ }

    #[test]
    fn extracts_line_numbers() { /* ... */ }

    #[test]
    fn handles_renamed_files_in_diffnote() { /* ... */ }
}
```

```rust
// tests/mr_discussion_tests.rs
#[cfg(test)]
mod tests {
    #[test]
    fn transforms_mr_discussion_with_correct_noteable_type() { /* ... */ }

    #[test]
    fn computes_resolved_state_for_mr_discussions() { /* ... */ }

    #[test]
    fn links_discussion_to_merge_request_id() { /* ... */ }

    #[test]
    fn handles_individual_note_mr_discussions() { /* ... */ }
}
```

### Integration Tests

```rust
// tests/mr_ingestion_tests.rs
#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path_regex};

    #[tokio::test]
    async fn inserts_mrs_into_database() { /* ... */ }

    #[tokio::test]
    async fn creates_labels_from_mr_payloads() { /* ... */ }

    #[tokio::test]
    async fn links_mrs_to_labels_via_junction_table() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_label_links_on_resync() { /* ... */ }

    #[tokio::test]
    async fn creates_assignee_links() { /* ... */ }

    #[tokio::test]
    async fn creates_reviewer_links() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_mr() { /* ... */ }

    #[tokio::test]
    async fn updates_cursor_at_page_boundaries() { /* ... */ }

    #[tokio::test]
    async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }

    #[tokio::test]
    async fn full_sync_resets_cursor() { /* ... */ }

    #[tokio::test]
    async fn full_sync_resets_discussion_watermarks() {
        // Setup: Create MR with discussions_synced_for_updated_at set
        // Action: Run ingest with --full flag
        // Assert: discussions_synced_for_updated_at is NULL after sync
        // Assert: Discussions are re-fetched for this MR
    }

    #[tokio::test]
    async fn handles_mrs_with_no_labels_assignees_reviewers() { /* ... */ }

    #[tokio::test]
    async fn upserts_existing_mrs_on_refetch() { /* ... */ }

    #[tokio::test]
    async fn skips_discussion_refetch_for_unchanged_mrs() { /* ... */ }

    #[tokio::test]
    async fn captures_diffnote_position_metadata() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_discussions_after_successful_pagination() { /* ... */ }

    #[tokio::test]
    async fn captures_draft_status_correctly() { /* ... */ }

    #[tokio::test]
    async fn uses_detailed_merge_status_over_deprecated_field() { /* ... */ }

    #[tokio::test]
    async fn uses_merge_user_over_deprecated_merged_by() { /* ... */ }
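
    // The pagination fallback chain exercised by
    // `pagination_continues_when_headers_missing_but_full_page_returned` can be
    // sketched as a pure helper (a hedged illustration: the real implementation
    // lives in the GitLab client, and `next_page` here is a hypothetical
    // stand-in, not the actual API):
    fn next_page(
        link_header: Option<&str>,
        x_next_page: Option<&str>,
        items_on_page: usize,
        per_page: usize,
        current_page: u32,
    ) -> Option<u32> {
        // 1. RFC 8288 Link header: find the rel="next" segment and its page= param.
        if let Some(link) = link_header {
            for part in link.split(',') {
                if !part.contains("rel=\"next\"") {
                    continue;
                }
                if let Some(url) = part.split('<').nth(1).and_then(|s| s.split('>').next()) {
                    for kv in url.split(|c| c == '?' || c == '&') {
                        if let Some(v) = kv.strip_prefix("page=") {
                            if let Ok(n) = v.parse() {
                                return Some(n);
                            }
                        }
                    }
                }
            }
        }
        // 2. GitLab's x-next-page header: an empty value means "last page".
        if let Some(np) = x_next_page {
            let np = np.trim();
            if np.is_empty() {
                return None;
            }
            if let Ok(n) = np.parse() {
                return Some(n);
            }
        }
        // 3. Heuristic: a full page suggests another page may exist.
        if items_on_page == per_page {
            return Some(current_page + 1);
        }
        None
    }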
    // CRITICAL: Verify preference ordering when BOTH deprecated and current
    // fields are present. GitLab may return both for backward compatibility,
    // and we must always prefer the current field.
    #[tokio::test]
    async fn prefers_detailed_merge_status_when_both_fields_present() {
        // Setup: MR payload with BOTH detailed_merge_status AND merge_status
        // Mock: Return MR with detailed_merge_status="discussions_not_resolved" AND merge_status="can_be_merged"
        // Assert: detailed_merge_status in DB is "discussions_not_resolved" (not "can_be_merged")
    }

    #[tokio::test]
    async fn prefers_merge_user_when_both_fields_present() {
        // Setup: MR payload with BOTH merge_user AND merged_by
        // Mock: Return MR with merge_user.username="alice" AND merged_by.username="bob"
        // Assert: merge_user_username in DB is "alice" (not "bob")
    }

    #[tokio::test]
    async fn prefers_draft_when_both_draft_and_work_in_progress_present() {
        // Setup: MR payload with BOTH draft=false AND work_in_progress=true
        // Mock: Return MR with draft=false, work_in_progress=true
        // Assert: draft in DB is true (work_in_progress fallback applies when draft is false)
        // Note: This tests the OR semantics: draft || work_in_progress
    }

    // CRITICAL: This test ensures we don't permanently lose data on partial
    // pagination failures. If pagination fails mid-way, the watermark must NOT
    // advance, so the next sync will retry fetching discussions for this MR.
    #[tokio::test]
    async fn does_not_advance_discussion_watermark_on_partial_failure() {
        // Setup: Create MR with updated_at > discussions_synced_for_updated_at
        // Mock: Page 1 of discussions returns successfully
        // Mock: Page 2 returns 500 Internal Server Error
        // Assert: discussions_synced_for_updated_at is unchanged (not advanced)
        // Assert: Re-running ingest will retry this MR's discussions
    }

    // CRITICAL: This test ensures we don't lose data when note parsing fails.
    // If any note in a discussion has an invalid timestamp, we must NOT delete
    // existing notes, and we must NOT advance the watermark.
    #[tokio::test]
    async fn does_not_advance_discussion_watermark_on_note_parse_failure() {
        // Setup: Create MR with an existing discussion and 3 valid notes
        // Mock: Return the discussion with one note having an invalid created_at
        // Assert: The original 3 notes are preserved
        // Assert: discussions_synced_for_updated_at is unchanged
    }

    // Verify atomic note replacement: parsing happens BEFORE deletion
    #[tokio::test]
    async fn atomic_note_replacement_preserves_data_on_parse_failure() {
        // Setup: Discussion with 3 existing notes
        // Mock: Return updated discussion where note 2 has an invalid timestamp
        // Assert: Original 3 notes are preserved
        // Assert: No partial replacement occurred
    }

    #[tokio::test]
    async fn uses_work_in_progress_when_draft_not_present() { /* ... */ }

    #[tokio::test]
    async fn handles_locked_mr_state() { /* ... */ }

    #[tokio::test]
    async fn pagination_continues_when_headers_missing_but_full_page_returned() { /* ... */ }

    #[tokio::test]
    async fn does_not_store_raw_payload_for_system_notes_without_position() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_diffnotes_even_if_system() { /* ... */ }

    #[tokio::test]
    async fn captures_position_type_and_line_range_for_diffnotes() { /* ... */ }
}
```

---

## Manual Smoke Tests

| Command | Expected Output | Pass Criteria |
|---------|-----------------|---------------|
| `gi ingest --type=merge_requests` | Progress bar, final count | Completes without error |
| `gi list mrs --limit=10` | Table of 10 MRs | Shows iid, title, state, author, branches, [DRAFT] prefix |
| `gi list mrs --project=group/project-one` | Filtered list | Only shows MRs from that project |
| `gi list mrs --state=merged` | Filtered by state | Only merged MRs shown |
| `gi list mrs --state=locked` | Filtered by state | Only locked MRs shown (merge in progress) |
| `gi list mrs --draft` | Filtered by draft | Only draft MRs shown |
| `gi list mrs --no-draft` | Excludes drafts | No draft MRs shown |
| `gi list mrs --reviewer=username` | Filtered by reviewer | Only MRs with that reviewer |
| `gi list mrs --target-branch=main` | Filtered by target | Only MRs targeting main |
| `gi count mrs` | `Merge Requests: N` | Count matches GitLab UI |
| `gi show mr 123` | MR detail view | Shows title, description, branches, discussions, detailed_merge_status |
| `gi show mr 123` (draft MR) | MR detail view | Shows `Draft: Yes` |
| `gi show mr 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=mr` | `MR Discussions: N` | Non-zero count |
| `gi count notes --type=mr` | `MR Notes: N (excluding M system)` | Non-zero count, shows DiffNote count |
| `gi sync-status` | Shows MR cursor positions | MR cursors visible alongside issue cursors |
| `gi ingest --type=merge_requests` (re-run) | `0 new MRs` | Cursor is current |
| `gi ingest --type=merge_requests` (re-run) | `Skipped discussion sync for N unchanged MRs` | Watermark works |
| `gi ingest --type=merge_requests` (concurrent) | Lock error | Second run fails with clear message |
| `gi ingest --type=merge_requests --full` | Full re-sync | Resets cursor, fetches everything |
| Remove label from MR in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |
| Add reviewer in GitLab, re-sync | Reviewer link added | Junction table updated |

---

## Data Integrity Checks

After successful ingestion, verify:

- [ ] `SELECT COUNT(*) FROM merge_requests` matches GitLab MR count for configured projects
- [ ] Every MR has a corresponding `raw_payloads` row
- [ ] `draft` field matches GitLab UI (draft MRs have draft=1, includes `work_in_progress` fallback)
- [ ] `state` includes all expected values (`opened`, `merged`, `closed`, `locked`)
- [ ] `detailed_merge_status` is populated for modern GitLab instances
- [ ] `head_sha` is populated (NULL only for very old MRs or edge cases)
- [ ] `references_short` and `references_full` are populated when GitLab returns them
- [ ] Labels in `mr_labels` junction all exist in the `labels` table
- [ ] `mr_labels` count per MR matches GitLab UI label count
- [ ] `mr_assignees` usernames match GitLab UI
- [ ] `mr_reviewers` usernames match GitLab UI
- [ ] `sync_cursors` has an entry for each `(project_id, 'merge_requests')` pair
- [ ] Re-running `gi ingest --type=merge_requests` fetches 0 new items (cursor is current)
- [ ] Re-running skips discussion refetch for unchanged MRs (watermark works)
- [ ] `--full` sync resets `discussions_synced_for_updated_at` to NULL for all MRs
- [ ] After partial pagination failure, `discussions_synced_for_updated_at` is NOT advanced
- [ ] After note parse failure, `discussions_synced_for_updated_at` is NOT advanced
- [ ] After note parse failure, existing notes are preserved (not deleted)
- [ ] `SELECT COUNT(*) FROM discussions WHERE noteable_type='MergeRequest'` is non-zero
- [ ] Every MR discussion has at least one note
- [ ] DiffNotes have `position_new_path` populated when available
- [ ] DiffNotes have `position_type` populated when available (text/image/file)
- [ ] DiffNotes have `position_base_sha`, `position_start_sha`, `position_head_sha` populated when available
- [ ] Multi-line DiffNotes have `position_line_range_start/end` populated when available
- [ ] `SELECT COUNT(*) FROM notes WHERE position_new_path IS NOT NULL` matches expected DiffNote count
- [ ] Discussion `first_note_at` <= `last_note_at` for all rows
- [ ] No notes have `created_at = 0` or `updated_at = 0` (strict timestamp parsing)
- [ ] System notes without position do NOT have `raw_payloads` rows (selective storage)
- [ ] After removing a label/reviewer in GitLab and re-syncing, the link is removed
- [ ] Stale discussions removed via `last_seen_at` sweep after successful pagination
- [ ] Stale notes removed via `last_seen_at` sweep (upsert + sweep pattern)

---

## Definition of Done

### Gate A: MRs Only (Must Pass First)

- [ ] `gi ingest --type=merge_requests` fetches all MRs from configured projects
- [ ] MRs stored with correct schema, including branches and merge metadata
- [ ] `draft` field captured correctly (with `work_in_progress` fallback for older instances)
- [ ] `state` field supports all values: `opened`, `merged`, `closed`, `locked`
- [ ] `locked` state handled as transitional (local filtering only, not server-side filter)
- [ ] `detailed_merge_status` used (non-deprecated) with fallback to `merge_status`
- [ ] `merge_user_username` used (non-deprecated) with fallback to `merged_by`
- [ ] `head_sha` captured for CP3 readiness
- [ ] `references_short` and `references_full` captured for CP3 readiness
- [ ] Cursor-based sync is resumable (re-run fetches only new/updated)
- [ ] Page-boundary cursor updates (not based on item count modulo)
- [ ] Pagination uses robust fallback chain: Link header > x-next-page > full-page heuristic
- [ ] Raw payloads stored for each MR
- [ ] `gi list mrs` and `gi count mrs` work

### Gate B: Labels + Assignees + Reviewers (Must Pass)

- [ ] Labels extracted and stored (name-only)
- [ ] Label links created correctly
- [ ] **Stale label links removed on re-sync** (verified with test)
- [ ] Assignees extracted and linked to `mr_assignees`
- [ ] Reviewers extracted and linked to `mr_reviewers`
- [ ] **Stale assignee/reviewer links removed on re-sync**

### Gate C: Dependent Discussion Sync with DiffNotes (Must Pass)

- [ ] Discussions fetched for MRs with `updated_at` advancement
- [ ] Notes stored with `is_system` flag correctly set
- [ ] **Upsert + sweep pattern for notes** (not delete-all-then-insert; reduces write amplification)
- [ ] **Atomic note replacement** (notes parsed BEFORE existing notes deleted; prevents data loss)
- [ ] **Strict timestamp parsing** (no zero values from invalid timestamps)
- [ ] **DiffNote position metadata captured** (`position_new_path`, `position_type`, line numbers, line_range)
- [ ] **DiffNote SHA triplet captured** (`position_base_sha`, `position_start_sha`, `position_head_sha`)
- [ ] **Selective raw payload storage** (skip system notes without position)
- [ ] Raw payloads stored for discussions and DiffNotes
- [ ] `discussions_synced_for_updated_at` watermark updated after sync
- [ ] **Watermark is NOT advanced if any discussion page fetch fails** (verified with test)
- [ ] **Watermark is NOT advanced if any note parse fails** (verified with test)
- [ ] **Unchanged MRs skip discussion refetch** (verified with test)
- [ ] Stale discussions removed via `last_seen_at` sweep after successful pagination
- [ ] Stale notes removed via `last_seen_at` sweep (upsert + sweep pattern)
- [ ] Bounded concurrency (`dependent_concurrency` respected)

### Gate D: Resumability Proof (Must Pass)

- [ ] Kill mid-run, rerun; bounded redo (cursor progress preserved at page boundaries)
- [ ] No redundant discussion refetch after crash recovery
- [ ] No watermark advancement on partial pagination failure
- [ ] No watermark advancement on note parse failure (preserves existing data)
- [ ] Single-flight lock prevents concurrent runs
- [ ] `--full` flag resets cursor and fetches all data
- [ ] `--full` flag also resets `discussions_synced_for_updated_at` (forces discussion refetch)

### Gate E: CLI Complete (Must Pass)

- [ ] `gi list mrs` with all filter options, including `--draft` and `--no-draft`
- [ ] `gi list mrs` shows the [DRAFT] prefix for draft MRs
- [ ] `gi show mr <iid>` displays full detail with discussions
- [ ] `gi show mr` shows DiffNote file context in the discussion display
- [ ] `gi show mr` shows `detailed_merge_status`
- [ ] `gi count mrs` shows state breakdown
- [ ] `gi sync-status` shows MR cursor positions

### Final Gate (Must Pass)

- [ ] All unit tests pass (`cargo test`)
- [ ] All integration tests pass (mocked with wiremock)
- [ ] `does_not_advance_discussion_watermark_on_partial_failure` test passes
- [ ] `prefers_detailed_merge_status_when_both_fields_present` test passes
- [ ] `prefers_merge_user_when_both_fields_present` test passes
- [ ] `prefers_draft_when_both_draft_and_work_in_progress_present` test passes
- [ ] `cargo clippy` passes with no warnings
- [ ] `cargo fmt --check` passes
- [ ] Compiles with `--release`

### Hardening (Optional Before CP3)

- [ ] Edge cases: MRs with 0 labels, 0 discussions, no assignees/reviewers
- [ ] Large pagination (100+ pages)
- [ ] Rate limit handling under sustained load
- [ ] Live tests pass against a real GitLab instance
- [ ] Performance: 1000+ MRs ingested in <5 min
- [ ] DB size verification: system note payloads not stored

---

## Implementation Order

1. **Database migration** (15 min)
   - `migrations/006_merge_requests.sql`
   - Update `MIGRATIONS` const in `src/core/db.rs`
2. **GitLab types** (15 min)
   - Add `GitLabMergeRequest`, `GitLabReviewer` to `src/gitlab/types.rs`
   - Test deserialization with fixtures
3. **MR transformer** (25 min)
   - `src/gitlab/transformers/merge_request.rs`
   - `MergeRequestWithMetadata` struct
   - Unit tests
4. **Discussion transformer updates** (15 min)
   - Add `transform_mr_discussion()` function
   - Add `transform_notes_with_diff_position()` function
   - Unit tests for DiffNote extraction
5. **GitLab client pagination** (20 min)
   - Add `paginate_merge_requests()`
   - Add `paginate_mr_discussions()`
**MR ingestion** (45 min)

- `src/ingestion/merge_requests.rs`
- Transaction batching
- Label/assignee/reviewer linking with stale removal
- Incremental cursor updates
- Return `mrs_needing_discussion_sync`
- Integration tests

7. **MR discussion ingestion** (30 min)
    - `src/ingestion/mr_discussions.rs`
    - DiffNote position capture
    - Stale discussion removal
    - Watermark update
    - Integration tests
8. **Update orchestrator** (20 min)
    - `src/ingestion/orchestrator.rs`
    - Support both issue and MR ingestion
    - Aggregate results
9. **Update ingest command** (15 min)
    - `src/cli/commands/ingest.rs`
    - Route `merge_requests` type to MR ingest
10. **Implement MR CLI commands** (45 min)
    - Update `gi list` for MRs with filters
    - Update `gi show` for MR detail view
    - Update `gi count` for MR counts
    - Update `gi sync-status` for MR cursors
11. **Final validation** (20 min)
    - `cargo test`
    - `cargo clippy`
    - Gate A/B/C/D/E verification
    - Manual smoke tests
    - Data integrity checks

---

## Risks & Mitigations

| Risk | Mitigation |
|------|------------|
| GitLab rate limiting during large sync | Respect `Retry-After`, exponential backoff, configurable concurrency |
| MR discussion API N+1 problem | `dependent_concurrency` config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large MRs with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | `is_system` flag allows filtering |
| Label/assignee/reviewer deduplication | Clear-and-re-link pattern ensures correctness |
| DiffNote position field variations | Defensive parsing with `Option` types; extended fields (`position_type`, `line_range`, SHA triplet) for modern GitLab |
| Async stream complexity | Use `async-stream` crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Use `LocalSet` + `spawn_local` for non-Send tasks; each worker opens its own SQLite connection (WAL + busy_timeout); avoid holding DB handles across `.await` |
| Crash causes massive refetch | Page-boundary cursor updates (not item-count based) |
| Cursor rewind causes discussion refetch | Per-MR watermark (`discussions_synced_for_updated_at`) |
| Stale discussions accumulate | Remove discussions via `last_seen_at < run_seen_at` sweep (not in-memory ID collection) |
| Stale notes accumulate | Remove notes via `last_seen_at` sweep (upsert + sweep pattern) |
| **Partial pagination failure loses data** | Watermark NOT advanced unless pagination completes; explicit test coverage |
| **Invalid timestamps corrupt cursor logic** | Strict timestamp parsing (`Result` types, no `unwrap_or(0)`); bad notes logged and skipped; parse BEFORE delete |
| **Note parse failure causes data loss** | Parse notes BEFORE any destructive DB operations; if parsing fails, preserve existing data and do not advance the watermark |
| **Raw payload size explodes with notes** | Selective storage: DiffNotes + non-system notes only; system notes use the discussion payload |
| **Deprecated GitLab fields cause future churn** | Use `detailed_merge_status` and `merge_user` with fallback to the old fields; explicit tests verify preference when both are present |
| **Memory growth on large projects** | DB-driven discussion sync selection; no in-memory collection of MRs needing sync |
| **Pagination headers missing (proxy/instance issue)** | Robust fallback chain: Link header (RFC 8288) > `x-next-page` > full-page heuristic |
| **GitLab version differences** | Handle `locked` state (transitional, local filtering); support both `draft` and `work_in_progress`; optional reviewer verification endpoint for debugging |
| **`--full` sync leaves stale discussion data** | `--full` resets both the MR cursor AND `discussions_synced_for_updated_at` watermarks |
| **Write amplification on note updates** | Upsert + sweep pattern instead of delete-all-then-insert |

---

## API Call Estimation

For a project with **500 MRs** (average 10 discussions each, 3
notes per discussion):

| Operation | Calls | Notes |
|-----------|-------|-------|
| MR list pages | 5 | 100 per page |
| Discussion pages per MR | 1 | Most MRs have <100 discussions |
| **Total initial sync** | ~505 | 5 + (500 × 1) |
| **Subsequent sync (10% change)** | ~55 | 5 + (50 × 1) |

**Rate limit safety:** At 100 requests/second, a full sync completes in ~5 seconds of API time. With `dependent_concurrency: 10`, the ~500 discussion requests run in ~50 rounds of 10, so wall-clock time is roughly ~50 seconds plus network latency (assuming ~1 second per request).

---

## MR-Specific Considerations

### DiffNote Position Metadata

GitLab DiffNotes include position metadata that identifies the file and line where the comment was placed. This is critical for code review context in CP3 document generation.

**Position Object Structure (from GitLab API):**

```json
{
  "position": {
    "base_sha": "abc123...",          // Base commit SHA for the diff
    "start_sha": "def456...",         // Start commit SHA for the diff
    "head_sha": "ghi789...",          // Head commit SHA for the diff
    "old_path": "src/auth/login.ts",  // File path before rename (if any)
    "new_path": "src/auth/login.ts",  // Current file path
    "position_type": "text",          // "text" | "image" | "file"
    "old_line": null,                 // Line number in old version (null for additions)
    "new_line": 45,                   // Line number in new version (null for deletions)
    "line_range": {                   // For multi-line comments (GitLab 13.6+)
      "start": { "line_code": "...", "type": "new", "old_line": null, "new_line": 45 },
      "end":   { "line_code": "...", "type": "new", "old_line": null, "new_line": 48 }
    }
  }
}
```

**Why Capture SHA Triplet:**

The SHA triplet (`base_sha`, `start_sha`, `head_sha`) is essential for future CP3 work:

- **Precise diff context:** Maps a comment to the exact commit range being reviewed
- **Future file content fetch:** Enables fetching file content at the exact commit when the comment was made
- **Stale comment detection:** Identifies comments on outdated code when the source branch is updated
- **Zero extra API cost:** Already present in the discussion response; just needs extraction
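Defensive extraction of these optional fields, and the `[path:line]` file context shown in the CLI, can be sketched as a pure function. This is a minimal illustration, not the project's actual code: the `NotePosition` struct and `file_context` function are hypothetical names, and every field is an `Option` because GitLab omits or nulls position fields depending on note type and server version.

```rust
/// Sketch of the stored DiffNote position fields (all optional).
#[derive(Default)]
struct NotePosition {
    new_path: Option<String>,
    old_path: Option<String>,
    position_type: Option<String>, // "text" | "image" | "file"
    new_line: Option<u32>,
    old_line: Option<u32>,
    line_range_start: Option<u32>,
    line_range_end: Option<u32>,
}

/// Build the `[path:line]` / `[path:start-end]` suffix for display.
/// Returns None when there is not enough metadata to place the comment.
fn file_context(pos: &NotePosition) -> Option<String> {
    // Prefer the head-version path; fall back to the base-version path
    // (covers comments on deleted files).
    let path = pos.new_path.as_ref().or(pos.old_path.as_ref())?;
    // File-level comments carry no line numbers.
    if pos.position_type.as_deref() == Some("file") {
        return Some(format!("[{path}]"));
    }
    match (pos.line_range_start, pos.line_range_end) {
        // Multi-line comment: render the full range.
        (Some(start), Some(end)) => Some(format!("[{path}:{start}-{end}]")),
        // Single-line comment: prefer the new line, fall back to the old.
        _ => {
            let line = pos.new_line.or(pos.old_line)?;
            Some(format!("[{path}:{line}]"))
        }
    }
}

fn main() {
    let single = NotePosition {
        new_path: Some("src/auth/login.ts".into()),
        new_line: Some(45),
        position_type: Some("text".into()),
        ..Default::default()
    };
    assert_eq!(file_context(&single).as_deref(), Some("[src/auth/login.ts:45]"));

    let multi = NotePosition {
        new_path: Some("src/auth/login.ts".into()),
        line_range_start: Some(45),
        line_range_end: Some(48),
        ..Default::default()
    };
    assert_eq!(file_context(&multi).as_deref(), Some("[src/auth/login.ts:45-48]"));

    // A note with no position at all (e.g. a plain comment) yields None.
    assert_eq!(file_context(&NotePosition::default()), None);
}
```

Returning `None` rather than a placeholder keeps the "defensive parsing with `Option` types" contract: a note whose position cannot be resolved is still displayed, just without file context.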
**Stored Fields:**

- `position_old_path` - File path in base version (for renames/deletions)
- `position_new_path` - File path in head version (for additions/modifications)
- `position_old_line` - Line number in base version
- `position_new_line` - Line number in head version
- `position_type` - Type of position: "text", "image", or "file"
- `position_line_range_start` - Start line for multi-line comments
- `position_line_range_end` - End line for multi-line comments
- `position_base_sha` - Base commit SHA for the diff
- `position_start_sha` - Start commit SHA for the diff
- `position_head_sha` - Head commit SHA for the diff

**Display in CLI:**

```
@janedoe (2024-03-16) [src/auth/login.ts:45]: Should we validate the JWT signature here?
```

For multi-line comments:

```
@janedoe (2024-03-16) [src/auth/login.ts:45-48]: This whole block should use async/await instead of callbacks.
```

---

## References

- [GitLab MR API](https://docs.gitlab.com/ee/api/merge_requests.html)
- [GitLab Discussions API](https://docs.gitlab.com/ee/api/discussions.html)
- [GitLab Notes API](https://docs.gitlab.com/ee/api/notes.html)
- [GitLab Pagination](https://docs.gitlab.com/ee/api/rest/index.html#pagination)
- [RFC 8288 - Link Header](https://datatracker.ietf.org/doc/html/rfc8288)
- Checkpoint 1 PRD (Issue Ingestion)
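As an appendix, the pagination fallback chain called out under Risks & Mitigations (Link header > `x-next-page` > full-page heuristic) can be sketched as a pure decision function. This is an illustrative sketch, not the project's implementation: the `NextPage` enum and `next_page` function are hypothetical names, and the Link-header parse is simplified (it assumes page URLs contain no commas).

```rust
/// Next-page decision produced by the fallback chain.
#[derive(Debug, PartialEq)]
enum NextPage {
    Url(String), // from the Link header's rel="next" target
    Number(u64), // from the x-next-page header
    Probe(u64),  // full-page heuristic: try current page + 1
    Done,
}

/// Fallback chain: Link header (RFC 8288) > x-next-page > full-page heuristic.
/// `link` and `x_next_page` are the raw header values, if present.
fn next_page(
    link: Option<&str>,
    x_next_page: Option<&str>,
    current_page: u64,
    items_returned: usize,
    per_page: usize,
) -> NextPage {
    // 1. Link header: look for a rel="next" target (simplified split on ',').
    if let Some(link) = link {
        for part in link.split(',') {
            let part = part.trim();
            if part.contains(r#"rel="next""#) {
                if let Some(url) = part.split(';').next() {
                    let url = url.trim().trim_start_matches('<').trim_end_matches('>');
                    return NextPage::Url(url.to_string());
                }
            }
        }
        // A Link header without rel="next" means we are on the last page.
        return NextPage::Done;
    }
    // 2. x-next-page: GitLab sends an empty value on the last page.
    if let Some(n) = x_next_page {
        return match n.trim().parse::<u64>() {
            Ok(n) => NextPage::Number(n),
            Err(_) => NextPage::Done,
        };
    }
    // 3. Heuristic: a full page suggests more may follow; probe the next page.
    if items_returned == per_page {
        NextPage::Probe(current_page + 1)
    } else {
        NextPage::Done
    }
}

fn main() {
    let link = r#"<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2>; rel="next", <https://gitlab.example.com/api/v4/projects/1/merge_requests?page=5>; rel="last""#;
    assert_eq!(
        next_page(Some(link), None, 1, 100, 100),
        NextPage::Url("https://gitlab.example.com/api/v4/projects/1/merge_requests?page=2".into())
    );
    assert_eq!(next_page(None, Some("3"), 2, 100, 100), NextPage::Number(3));
    assert_eq!(next_page(None, None, 4, 100, 100), NextPage::Probe(5));
    assert_eq!(next_page(None, None, 4, 37, 100), NextPage::Done);
}
```

Note the heuristic can over-probe: a final page of exactly `per_page` items triggers one extra fetch, so the caller should treat an empty probe result as end of pagination.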