# Checkpoint 2: MR Ingestion - PRD

> **Note:** The project was renamed from "gitlab-inbox" to "gitlore" and the CLI from "gi" to "lore". References to "gi" in this document should be read as "lore".

**Version:** 1.3
**Status:** Ready for Implementation
**Depends On:** Checkpoint 1 (Issue Ingestion)
**Enables:** Checkpoint 3A (Document Generation + FTS)

---

## Overview

### Objective

Ingest all merge requests, MR discussions, and notes (including DiffNote position metadata) from configured GitLab repositories. This checkpoint extends the cursor-based sync pattern established in CP1 to merge requests, capturing code review context that is essential for decision traceability.

### Success Criteria

| Criterion | Validation |
|-----------|------------|
| `gi ingest --type=merge_requests` fetches all MRs | `gi count mrs` matches GitLab UI |
| MR labels extracted and linked correctly | Label count per MR matches GitLab |
| MR assignees and reviewers captured | Junction tables populated |
| MR merge status is future-proof | `detailed_merge_status` populated; no reliance on deprecated `merge_status` |
| MR merge actor is future-proof | `merge_user_username` populated when merged/auto-merge set |
| Draft MRs are captured | `draft` populated and visible in `gi list mrs` |
| Draft is backwards-compatible | Older instances using `work_in_progress` still populate `draft` |
| MR state is complete | `state` supports `opened`, `merged`, `closed`, `locked` (no drop/parse failures) |
| MR head SHA captured | `head_sha` populated for CP3 diff/commit context |
| MR references captured | `references_short` and `references_full` populated for CP3 cross-project display |
| MR discussions fetched per-MR (dependent sync) | For MRs whose `updated_at` advanced, discussions and notes upserted |
| DiffNote position metadata captured | `position_new_path`, `position_old_path`, `position_type` populated for DiffNotes |
| DiffNote SHA triplet captured | `position_base_sha`, `position_start_sha`, `position_head_sha` populated for DiffNotes |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged MRs | Per-MR watermark prevents redundant fetches |
| Watermark not advanced on partial failure | Incomplete pagination OR parse failures do not mark MR as synced |
| Full sync resets discussion watermarks | `--full` flag resets both MR cursor AND discussion watermarks |
| Pagination uses robust fallback chain | Link header > x-next-page > full-page heuristic |
| MR CLI commands functional | `gi show mr`, `gi count mrs` work |

---

## Internal Gates

CP2 is validated incrementally via internal gates:

| Gate | Scope | Validation |
|------|-------|------------|
| **Gate A** | MRs only | Cursor + upsert + raw payloads + list/count working; `locked` state (local filtering), `work_in_progress` fallback, `head_sha` + `references` captured; Link header pagination with fallback chain |
| **Gate B** | Labels + assignees + reviewers | Junction tables populated; counts match GitLab |
| **Gate C** | Dependent discussion sync | Watermark prevents redundant refetch; DiffNote paths + SHA triplet captured; upsert + sweep for notes; watermark does NOT advance on partial pagination failure OR note parse failures; atomic note replacement (parse before delete) |
| **Gate D** | Resumability proof | Kill mid-run, rerun; confirm bounded redo; `--full` resets cursor AND discussion watermarks |
| **Gate E** | CLI complete | `gi show mr` displays discussions with DiffNote file context |

---

## Deliverables

### 1. Project Structure Additions

Add the following to the existing Rust structure from Checkpoint 1:

```
gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── list.rs              # Update: add MR listing
│   │       ├── show.rs              # Update: add MR detail view
│   │       └── count.rs             # Update: add MR counting
│   ├── gitlab/
│   │   ├── types.rs                 # Add GitLabMergeRequest, GitLabReviewer
│   │   └── transformers/
│   │       └── merge_request.rs     # NEW: GitLab -> normalized MR
│   └── ingestion/
│       ├── merge_requests.rs        # NEW: MR fetcher with pagination
│       ├── mr_discussions.rs        # NEW: MR discussion fetcher
│       └── orchestrator.rs          # Update: support MR ingestion
├── tests/
│   ├── mr_transformer_tests.rs
│   ├── mr_ingestion_tests.rs
│   ├── mr_discussion_tests.rs
│   ├── diffnote_tests.rs
│   └── fixtures/
│       ├── gitlab_merge_request.json
│       ├── gitlab_merge_requests_page.json
│       ├── gitlab_mr_discussion.json
│       ├── gitlab_mr_discussion_with_diffnote.json
│       └── gitlab_diffnote_position.json
└── migrations/
    └── 006_merge_requests.sql
```

### 2. GitLab API Endpoints

**Merge Requests (Bulk Fetch):**
```
GET /projects/:id/merge_requests?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
```

**MR Discussions (Per-MR Fetch):**
```
GET /projects/:id/merge_requests/:iid/discussions?per_page=100&page=N
```

**Required Query Parameters:**
- `scope=all` - Include all MRs, not just those authored by the current user
- `state=all` - Include merged/closed MRs (GitLab defaults may exclude them)

**Key Differences from Issues:**
- MRs have `source_branch` and `target_branch`
- MRs have `merge_status`, `merged_by`, `merged_at` fields
- MRs have reviewers (separate from assignees)
- MR discussions can contain DiffNotes with position metadata

**MR State Values:**
GitLab MR `state` can be `opened`, `closed`, `merged`, or `locked`. The `locked` state is a **transitional state** that indicates an MR is in the middle of being merged (merge-in-progress). Key considerations:
- **Store as first-class:** Persist `locked` in the database without coercion to preserve GitLab's exact state.
- **Local filtering only:** The GitLab API's `state` filter does not support `locked` as a query parameter. To filter for locked MRs, we must use `state=all` server-side and filter locally via SQL `WHERE state = 'locked'`.
- **Transient nature:** Most MRs in `locked` state will transition to `merged` within seconds/minutes. Expect very few locked MRs in typical queries.
- **CLI exposure:** The `--state=locked` filter is available for debugging/inspection but is not expected to be commonly used.

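Because `locked` cannot be requested server-side, state handling is purely local. A minimal sketch of a state guard that keeps the "no drop/parse failures" criterion honest (the `validate_mr_state` helper is hypothetical, not part of the codebase; it illustrates surfacing unknown states as errors instead of silently dropping them):

```rust
/// Hypothetical guard: accept exactly the four documented MR states,
/// surfacing anything else as an error instead of silently dropping it.
fn validate_mr_state(state: &str) -> Result<&str, String> {
    match state {
        "opened" | "merged" | "closed" | "locked" => Ok(state),
        other => Err(format!("unexpected MR state: {other}")),
    }
}

fn main() {
    // `locked` is stored as a first-class state, not coerced
    assert_eq!(validate_mr_state("locked"), Ok("locked"));
    // unknown states fail loudly so the sync can report them
    assert!(validate_mr_state("wip").is_err());
    println!("ok");
}
```

Filtering for locked MRs then happens entirely in SQL (`WHERE state = 'locked'`), as described above.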
**Reviewer Verification (Optional / Debug Mode):**
When reviewer data is suspected to be incomplete on a given GitLab version, enable an optional verification mode:
```
GET /projects/:id/merge_requests/:merge_request_iid/reviewers
```
This is disabled by default to avoid N+1 overhead. Activated via config flag `sync.verify_reviewers: true` for debugging problematic instances.

**Pagination:**
- **Primary:** Parse `Link: <url>; rel="next"` header (RFC 8288). This is the most reliable signal as it works consistently across GitLab versions and proxy configurations.
- **Fallback 1:** If no `Link` header, check `x-next-page` header until empty/absent
- **Fallback 2:** If no pagination headers but the current page returned `per_page` items, attempt `page + 1` and stop only when an empty response is received. This handles proxies/instances that strip headers.
- Per-page maximum: 100
- **Optional keyset mode:** For projects with 10,000+ MRs, enable keyset pagination via config flag `sync.use_keyset_pagination: true`. This uses `pagination=keyset&order_by=updated_at&sort=asc` with cursor tokens from Link headers for O(1) page fetches instead of O(n) offset scans.

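The fallback chain above condenses into a single pure decision. This is a sketch (the `next_page` signature is illustrative, not the client's actual API):

```rust
/// Hypothetical sketch of the pagination fallback chain:
/// Link header > x-next-page header > full-page heuristic.
fn next_page(
    link_next: Option<&str>,  // rel="next" URL from the Link header, if any
    x_next_page: Option<u32>, // parsed x-next-page header, if any
    page: u32,
    items_len: usize,
    per_page: usize,
) -> Option<u32> {
    match (link_next, x_next_page, items_len == per_page) {
        (Some(_), _, _) => Some(page + 1),    // Link header present: continue
        (None, Some(np), _) => Some(np),      // x-next-page present: use it
        (None, None, true) => Some(page + 1), // full page, no headers: probe next
        (None, None, false) => None,          // partial page: done
    }
}

fn main() {
    // headers stripped by a proxy, but the page came back full: probe page 3
    assert_eq!(next_page(None, None, 2, 100, 100), Some(3));
    // partial page with no headers: pagination is finished
    assert_eq!(next_page(None, None, 2, 37, 100), None);
    println!("ok");
}
```

Keeping this decision pure makes the fallback chain trivially unit-testable without any HTTP mocking.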
---

## Database Schema

### Migration 006_merge_requests.sql

```sql
-- Merge requests table
CREATE TABLE merge_requests (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    project_id INTEGER NOT NULL REFERENCES projects(id),
    iid INTEGER NOT NULL,
    title TEXT,
    description TEXT,
    state TEXT,                        -- 'opened' | 'merged' | 'closed' | 'locked'
    draft INTEGER NOT NULL DEFAULT 0,  -- 0/1 (SQLite boolean) - work-in-progress status
    author_username TEXT,
    source_branch TEXT,
    target_branch TEXT,
    head_sha TEXT,                     -- Current commit SHA at head of source branch (CP3-ready)
    references_short TEXT,             -- Short reference e.g. "!123" (CP3-ready for display)
    references_full TEXT,              -- Full reference e.g. "group/project!123" (CP3-ready for cross-project)
    detailed_merge_status TEXT,        -- preferred, non-deprecated (replaces merge_status)
    merge_user_username TEXT,          -- preferred over deprecated merged_by
    created_at INTEGER,                -- ms epoch UTC
    updated_at INTEGER,                -- ms epoch UTC
    merged_at INTEGER,                 -- ms epoch UTC (NULL if not merged)
    closed_at INTEGER,                 -- ms epoch UTC (NULL if not closed)
    last_seen_at INTEGER NOT NULL,     -- ms epoch UTC, updated on every upsert
    -- Prevents re-fetching discussions on cursor rewind / reruns unless MR changed.
    discussions_synced_for_updated_at INTEGER,
    -- Sync health telemetry for debuggability
    discussions_sync_last_attempt_at INTEGER,    -- ms epoch UTC of last sync attempt
    discussions_sync_attempts INTEGER DEFAULT 0, -- count of sync attempts for this MR version
    discussions_sync_last_error TEXT,            -- last error message if sync failed
    web_url TEXT,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_mrs_project_updated ON merge_requests(project_id, updated_at);
CREATE INDEX idx_mrs_author ON merge_requests(author_username);
CREATE INDEX idx_mrs_target_branch ON merge_requests(project_id, target_branch);
CREATE INDEX idx_mrs_source_branch ON merge_requests(project_id, source_branch);
CREATE INDEX idx_mrs_state ON merge_requests(project_id, state);
CREATE INDEX idx_mrs_detailed_merge_status ON merge_requests(project_id, detailed_merge_status);
CREATE INDEX idx_mrs_draft ON merge_requests(project_id, draft);
CREATE INDEX idx_mrs_discussions_sync ON merge_requests(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_mrs_project_iid ON merge_requests(project_id, iid);

-- MR-Label junction (reuses labels table from CP1)
CREATE TABLE mr_labels (
    merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
    label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
    PRIMARY KEY(merge_request_id, label_id)
);
CREATE INDEX idx_mr_labels_label ON mr_labels(label_id);

-- MR assignees (same pattern as issue_assignees)
CREATE TABLE mr_assignees (
    merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
    username TEXT NOT NULL,
    PRIMARY KEY(merge_request_id, username)
);
CREATE INDEX idx_mr_assignees_username ON mr_assignees(username);

-- MR reviewers (MR-specific, not applicable to issues)
CREATE TABLE mr_reviewers (
    merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
    username TEXT NOT NULL,
    PRIMARY KEY(merge_request_id, username)
);
CREATE INDEX idx_mr_reviewers_username ON mr_reviewers(username);

-- Indexes on the discussions table for merge_request_id lookups.
-- Note: SQLite doesn't support ADD CONSTRAINT; the nullable FK was already
-- defined in CP1, so only the indexes need to be added here.
CREATE INDEX IF NOT EXISTS idx_discussions_mr_id ON discussions(merge_request_id);
CREATE INDEX IF NOT EXISTS idx_discussions_mr_resolved ON discussions(merge_request_id, resolved, resolvable);

-- Additional indexes for DiffNote queries (notes table from CP1)
-- These composite indexes enable efficient file-context queries for CP3
CREATE INDEX IF NOT EXISTS idx_notes_type ON notes(note_type);
CREATE INDEX IF NOT EXISTS idx_notes_new_path ON notes(position_new_path);
CREATE INDEX IF NOT EXISTS idx_notes_new_path_line ON notes(position_new_path, position_new_line);
CREATE INDEX IF NOT EXISTS idx_notes_old_path_line ON notes(position_old_path, position_old_line);

-- CP2: capture richer diff note position shapes (minimal, still MVP)
-- These fields support modern GitLab diff note semantics without full diff reconstruction
ALTER TABLE notes ADD COLUMN position_type TEXT;                -- 'text' | 'image' | 'file'
ALTER TABLE notes ADD COLUMN position_line_range_start INTEGER; -- multi-line comment start
ALTER TABLE notes ADD COLUMN position_line_range_end INTEGER;   -- multi-line comment end
-- DiffNote SHA triplet for commit context (CP3-ready, zero extra API cost)
ALTER TABLE notes ADD COLUMN position_base_sha TEXT;  -- Base commit SHA for diff
ALTER TABLE notes ADD COLUMN position_start_sha TEXT; -- Start commit SHA for diff
ALTER TABLE notes ADD COLUMN position_head_sha TEXT;  -- Head commit SHA for diff

-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (6, strftime('%s', 'now') * 1000, 'Merge requests, MR labels, assignees, reviewers');
```

---

## GitLab Types

### Type Definitions

```rust
// src/gitlab/types.rs (additions)

use serde::Deserialize;

/// GitLab merge request from the API.
/// Note: Uses non-deprecated field names where possible (detailed_merge_status, merge_user).
/// Falls back gracefully for older GitLab versions.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabMergeRequest {
    pub id: i64,  // GitLab global ID
    pub iid: i64, // Project-scoped MR number
    pub project_id: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String, // "opened" | "merged" | "closed" | "locked"
    #[serde(default)]
    pub draft: bool, // Work-in-progress status (preferred)
    #[serde(default)]
    pub work_in_progress: bool, // Deprecated; fallback for older instances
    pub source_branch: String,
    pub target_branch: String,
    pub sha: Option<String>, // Current commit SHA at head of source branch (CP3-ready)
    pub references: Option<GitLabReferences>, // Short and full reference strings (CP3-ready)
    // Prefer detailed_merge_status (non-deprecated) over merge_status (deprecated)
    pub detailed_merge_status: Option<String>, // "mergeable" | "not_mergeable" | "checking" | etc.
    #[serde(alias = "merge_status")]
    pub merge_status_legacy: Option<String>, // Keep for older/self-managed versions
    pub created_at: String,        // ISO 8601
    pub updated_at: String,        // ISO 8601
    pub merged_at: Option<String>, // ISO 8601
    pub closed_at: Option<String>, // ISO 8601
    pub author: GitLabAuthor,
    // Prefer merge_user (current) over merged_by (deprecated)
    pub merge_user: Option<GitLabAuthor>,
    pub merged_by: Option<GitLabAuthor>, // Keep for older/self-managed versions
    #[serde(default)]
    pub labels: Vec<String>, // Array of label names
    #[serde(default)]
    pub assignees: Vec<GitLabAuthor>,
    #[serde(default)]
    pub reviewers: Vec<GitLabReviewer>,
    pub web_url: String,
}

/// GitLab references object (short and full reference strings).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabReferences {
    pub short: String, // e.g. "!123"
    pub full: String,  // e.g. "group/project!123"
}

/// GitLab author from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
    pub id: i64,
    pub username: String,
    pub name: String,
}

/// GitLab reviewer (can have approval state).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabReviewer {
    pub id: i64,
    pub username: String,
    pub name: String,
    // Note: approval state may require an additional API call, defer to post-MVP
}

// Note: GitLabDiscussion and GitLabNote types already exist from CP1
// and support MR discussions with DiffNote position metadata.
```

---

## Transformers

### Merge Request Transformer

```rust
// src/gitlab/transformers/merge_request.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabMergeRequest;

/// Normalized merge request ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedMergeRequest {
    pub gitlab_id: i64,
    pub project_id: i64, // Local DB project ID
    pub iid: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,
    pub draft: bool,
    pub author_username: String,
    pub source_branch: String,
    pub target_branch: String,
    pub head_sha: Option<String>,         // CP3-ready: current commit at source branch head
    pub references_short: Option<String>, // CP3-ready: e.g. "!123"
    pub references_full: Option<String>,  // CP3-ready: e.g. "group/project!123"
    pub detailed_merge_status: Option<String>,
    pub merge_user_username: Option<String>,
    pub created_at: i64,          // ms epoch
    pub updated_at: i64,          // ms epoch
    pub merged_at: Option<i64>,   // ms epoch
    pub closed_at: Option<i64>,   // ms epoch
    pub last_seen_at: i64,        // ms epoch
    pub web_url: String,
}

/// Normalized label ready for database insertion.
/// Reuses structure from issue transformer.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
    pub project_id: i64,
    pub name: String,
}

/// Result of transforming a GitLab MR.
#[derive(Debug, Clone)]
pub struct MergeRequestWithMetadata {
    pub merge_request: NormalizedMergeRequest,
    pub label_names: Vec<String>,
    pub assignee_usernames: Vec<String>,
    pub reviewer_usernames: Vec<String>,
}

/// Transform GitLab merge request to normalized schema.
pub fn transform_merge_request(
    gitlab_mr: &GitLabMergeRequest,
    local_project_id: i64,
) -> Result<MergeRequestWithMetadata, String> {
    // Parse timestamps, return error if invalid (strict parsing, no silent zeroing)
    let created_at = iso_to_ms(&gitlab_mr.created_at)
        .ok_or_else(|| format!("Invalid created_at: {}", gitlab_mr.created_at))?;
    let updated_at = iso_to_ms(&gitlab_mr.updated_at)
        .ok_or_else(|| format!("Invalid updated_at: {}", gitlab_mr.updated_at))?;

    let merged_at = gitlab_mr.merged_at.as_ref().and_then(|s| iso_to_ms(s));
    let closed_at = gitlab_mr.closed_at.as_ref().and_then(|s| iso_to_ms(s));

    // Draft status: prefer `draft`; fall back to the deprecated `work_in_progress`
    // for older instances. Both are booleans, so a simple OR covers both shapes.
    let is_draft = gitlab_mr.draft || gitlab_mr.work_in_progress;

    // Extract references (CP3-ready)
    let (references_short, references_full) = gitlab_mr
        .references
        .as_ref()
        .map(|r| (Some(r.short.clone()), Some(r.full.clone())))
        .unwrap_or((None, None));

    let merge_request = NormalizedMergeRequest {
        gitlab_id: gitlab_mr.id,
        project_id: local_project_id,
        iid: gitlab_mr.iid,
        title: gitlab_mr.title.clone(),
        description: gitlab_mr.description.clone(),
        state: gitlab_mr.state.clone(),
        draft: is_draft,
        author_username: gitlab_mr.author.username.clone(),
        source_branch: gitlab_mr.source_branch.clone(),
        target_branch: gitlab_mr.target_branch.clone(),
        head_sha: gitlab_mr.sha.clone(),
        references_short,
        references_full,
        // Prefer detailed_merge_status (non-deprecated) over legacy merge_status
        detailed_merge_status: gitlab_mr
            .detailed_merge_status
            .clone()
            .or_else(|| gitlab_mr.merge_status_legacy.clone()),
        // Prefer merge_user (current) over merged_by (deprecated)
        merge_user_username: gitlab_mr
            .merge_user
            .as_ref()
            .map(|u| u.username.clone())
            .or_else(|| gitlab_mr.merged_by.as_ref().map(|u| u.username.clone())),
        created_at,
        updated_at,
        merged_at,
        closed_at,
        last_seen_at: now_ms(),
        web_url: gitlab_mr.web_url.clone(),
    };

    let label_names = gitlab_mr.labels.clone();

    let assignee_usernames = gitlab_mr
        .assignees
        .iter()
        .map(|u| u.username.clone())
        .collect();

    let reviewer_usernames = gitlab_mr
        .reviewers
        .iter()
        .map(|u| u.username.clone())
        .collect();

    Ok(MergeRequestWithMetadata {
        merge_request,
        label_names,
        assignee_usernames,
        reviewer_usernames,
    })
}

/// Extract labels from GitLab MR (name-only, same as issues).
pub fn extract_labels(gitlab_mr: &GitLabMergeRequest, local_project_id: i64) -> Vec<NormalizedLabel> {
    gitlab_mr
        .labels
        .iter()
        .map(|name| NormalizedLabel {
            project_id: local_project_id,
            name: name.clone(),
        })
        .collect()
}
```

### MR Discussion Transformer

```rust
// src/gitlab/transformers/discussion.rs (additions)

/// Transform GitLab discussion to normalized schema for MR context.
/// Note: The core transform_discussion and transform_notes functions
/// from CP1 are reused. This adds MR-specific handling.
pub fn transform_mr_discussion(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
    local_mr_id: i64,
) -> NormalizedDiscussion {
    let note_times: Vec<i64> = gitlab_discussion
        .notes
        .iter()
        .filter_map(|n| iso_to_ms(&n.created_at))
        .collect();

    // A discussion is resolvable if any note is resolvable (MR discussions often are);
    // it counts as resolved only when every resolvable note has been resolved.
    let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
    let resolved = resolvable
        && gitlab_discussion
            .notes
            .iter()
            .all(|n| !n.resolvable || n.resolved);

    NormalizedDiscussion {
        gitlab_discussion_id: gitlab_discussion.id.clone(),
        project_id: local_project_id,
        issue_id: None, // Not an issue discussion
        merge_request_id: Some(local_mr_id),
        noteable_type: "MergeRequest".to_string(),
        individual_note: gitlab_discussion.individual_note,
        first_note_at: note_times.iter().min().copied(),
        last_note_at: note_times.iter().max().copied(),
        last_seen_at: now_ms(),
        resolvable,
        resolved,
    }
}

/// Transform GitLab notes with DiffNote position extraction.
/// Captures file path and line metadata for code review comments.
///
/// Returns Result to enforce strict timestamp parsing - corrupted timestamps
/// (zero values) would break cursor logic and time-series ordering.
pub fn transform_notes_with_diff_position(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
) -> Result<Vec<NormalizedNote>, String> {
    gitlab_discussion
        .notes
        .iter()
        .enumerate()
        .map(|(index, note)| {
            // Strict timestamp parsing - no silent zeroing
            let created_at = iso_to_ms(&note.created_at)
                .ok_or_else(|| format!(
                    "Invalid note.created_at for note {}: {}",
                    note.id, note.created_at
                ))?;
            let updated_at = iso_to_ms(&note.updated_at)
                .ok_or_else(|| format!(
                    "Invalid note.updated_at for note {}: {}",
                    note.id, note.updated_at
                ))?;

            // Extract DiffNote position metadata if present
            // Includes position_type, line_range, and SHA triplet for modern GitLab diff note shapes
            let (old_path, new_path, old_line, new_line, position_type, lr_start, lr_end, base_sha, start_sha, head_sha) = note
                .position
                .as_ref()
                .map(|pos| (
                    pos.old_path.clone(),
                    pos.new_path.clone(),
                    pos.old_line,
                    pos.new_line,
                    pos.position_type.clone(),
                    pos.line_range.as_ref().map(|r| r.start_line),
                    pos.line_range.as_ref().map(|r| r.end_line),
                    pos.base_sha.clone(),
                    pos.start_sha.clone(),
                    pos.head_sha.clone(),
                ))
                .unwrap_or((None, None, None, None, None, None, None, None, None, None));

            Ok(NormalizedNote {
                gitlab_id: note.id,
                project_id: local_project_id,
                note_type: note.note_type.clone(),
                is_system: note.system,
                author_username: note.author.username.clone(),
                body: note.body.clone(),
                created_at,
                updated_at,
                last_seen_at: now_ms(),
                position: index as i32,
                resolvable: note.resolvable,
                resolved: note.resolved,
                resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
                resolved_at: note.resolved_at.as_ref().and_then(|s| iso_to_ms(s)),
                // DiffNote position metadata
                position_old_path: old_path,
                position_new_path: new_path,
                position_old_line: old_line,
                position_new_line: new_line,
                // Extended position metadata for modern GitLab
                position_type,
                position_line_range_start: lr_start,
                position_line_range_end: lr_end,
                // SHA triplet for commit context (CP3-ready)
                position_base_sha: base_sha,
                position_start_sha: start_sha,
                position_head_sha: head_sha,
            })
        })
        .collect()
}
```

---

## GitLab Client Additions

### Pagination with Async Streams

```rust
// src/gitlab/client.rs (additions)

/// A page of merge requests with pagination metadata.
#[derive(Debug)]
pub struct MergeRequestPage {
    pub items: Vec<GitLabMergeRequest>,
    pub next_page: Option<u32>,
    pub is_last_page: bool,
}

impl GitLabClient {
    /// Parse Link header to extract rel="next" URL (RFC 8288).
    /// Returns Some(url) if a next page link exists, None otherwise.
    fn parse_link_header_next(headers: &reqwest::header::HeaderMap) -> Option<String> {
        headers
            .get("link")
            .and_then(|v| v.to_str().ok())
            .and_then(|link_str| {
                // Parse Link header format: <url>; rel="next", <url>; rel="last"
                for part in link_str.split(',') {
                    let part = part.trim();
                    if part.contains("rel=\"next\"") || part.contains("rel=next") {
                        // Extract URL between < and >
                        if let Some(start) = part.find('<') {
                            if let Some(end) = part.find('>') {
                                return Some(part[start + 1..end].to_string());
                            }
                        }
                    }
                }
                None
            })
    }

    /// Fetch a single page of merge requests for a project.
    /// Returns the items plus pagination metadata for page-aware cursor updates.
    pub async fn fetch_merge_requests_page(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
        page: u32,
        per_page: u32,
    ) -> Result<MergeRequestPage> {
        let mut params = vec![
            ("scope", "all".to_string()),
            ("state", "all".to_string()),
            ("order_by", "updated_at".to_string()),
            ("sort", "asc".to_string()),
            ("per_page", per_page.to_string()),
            ("page", page.to_string()),
        ];

        if let Some(ts) = updated_after {
            // Apply cursor rewind for safety, clamping to 0 to avoid underflow
            let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
            let rewound = (ts - rewind_ms).max(0);
            if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
                params.push(("updated_after", dt.to_rfc3339()));
            }
        }

        let (items, headers) = self
            .request_with_headers::<Vec<GitLabMergeRequest>>(
                &format!("/api/v4/projects/{gitlab_project_id}/merge_requests"),
                &params,
            )
            .await?;

        // Parse pagination with fallback chain:
        // 1. Link header (most reliable, RFC 8288 compliant)
        // 2. x-next-page header (GitLab-specific)
        // 3. Full page heuristic (fallback for proxies that strip headers)
        let link_next = Self::parse_link_header_next(&headers);
        let x_next_page = headers
            .get("x-next-page")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u32>().ok());

        let next_page = match (link_next, x_next_page, items.len() as u32 == per_page) {
            (Some(_), _, _) => Some(page + 1),    // Link header present: continue
            (None, Some(np), _) => Some(np),      // x-next-page present: use it
            (None, None, true) => Some(page + 1), // Full page, no headers: try next
            (None, None, false) => None,          // Partial page: we're done
        };

        let is_last_page = items.is_empty() || next_page.is_none();

        Ok(MergeRequestPage {
            items,
            next_page,
            is_last_page,
        })
    }

    /// Paginate through merge requests for a project.
    /// Returns a stream of MRs that handles pagination automatically.
    /// Note: For page-aware cursor updates, prefer fetch_merge_requests_page directly.
    pub fn paginate_merge_requests(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabMergeRequest>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let page_result = self.fetch_merge_requests_page(
                    gitlab_project_id,
                    updated_after,
                    cursor_rewind_seconds,
                    page,
                    per_page,
                ).await?;

                for mr in page_result.items {
                    yield mr;
                }

                if page_result.is_last_page {
                    break;
                }

                if let Some(np) = page_result.next_page {
                    page = np;
                } else {
                    break;
                }
            }
        })
    }

    /// Paginate through discussions for a merge request.
    pub fn paginate_mr_discussions(
        &self,
        gitlab_project_id: i64,
        mr_iid: i64,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let params = vec![
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                let (discussions, headers) = self
                    .request_with_headers::<Vec<GitLabDiscussion>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/merge_requests/{mr_iid}/discussions"),
                        &params,
                    )
                    .await?;

                for discussion in discussions.iter() {
                    yield discussion.clone();
                }

                // Robust fallback if pagination headers are missing
                let next_page_hdr = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                let next_page = match (next_page_hdr, discussions.len() as u32 == per_page) {
                    (Some(np), _) => Some(np),      // Header present: trust it
                    (None, true) => Some(page + 1), // Full page, no header: try next
                    (None, false) => None,          // Partial page: we're done
                };

                match next_page {
                    Some(np) if !discussions.is_empty() => page = np,
                    _ => break,
                }
            }
        })
    }
}
```

---

## Orchestration: Dependent Discussion Sync

### Canonical Pattern (CP2)

When `gi ingest --type=merge_requests` runs, it follows this orchestration (mirroring the CP1 pattern):

1. **Ingest merge requests** (cursor-based, with incremental cursor updates per page)

2. **Select MRs needing discussion sync (DB-driven)** - After MR ingestion completes, query:

   ```sql
   SELECT id, iid, updated_at
   FROM merge_requests
   WHERE project_id = ?
     AND (discussions_synced_for_updated_at IS NULL
          OR updated_at > discussions_synced_for_updated_at)
   ORDER BY updated_at ASC;
   ```

   This is more scalable and debuggable than in-memory collection: the DB is the source of truth.

3. **Execute discussion sync** with bounded concurrency (`dependent_concurrency` from config)

4. **Update watermark** - After each MR's discussions are successfully ingested *with no partial failures*:

   ```sql
   UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?
   ```

**Invariant:** A rerun MUST NOT refetch discussions for MRs whose `updated_at` has not advanced, even with cursor rewind. The watermark is NOT advanced if any failure occurs (HTTP pagination failure OR note parse failure).
|
||
|
||
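The watermark comparison that drives step 2 can be expressed as a pure predicate mirroring the SQL `WHERE` clause above. A minimal sketch; the `needs_discussion_sync` helper name is hypothetical, not part of the codebase:

```rust
/// Hypothetical helper mirroring the SQL eligibility check: an MR needs a
/// discussion sync when it has never been synced (watermark is NULL) or when
/// its updated_at has advanced past the watermark from the last successful sync.
fn needs_discussion_sync(updated_at: i64, synced_for_updated_at: Option<i64>) -> bool {
    match synced_for_updated_at {
        None => true,                              // never synced
        Some(watermark) => updated_at > watermark, // only re-sync when the MR advanced
    }
}

fn main() {
    assert!(needs_discussion_sync(100, None));       // never synced: eligible
    assert!(needs_discussion_sync(200, Some(100)));  // MR updated since last sync
    assert!(!needs_discussion_sync(100, Some(100))); // unchanged: skip
    assert!(!needs_discussion_sync(50, Some(100)));  // rewound fetch: still skip
}
```

Because the predicate is strict (`>`), a cursor rewind that refetches an unchanged MR does not trigger a redundant discussion fetch, which is exactly the invariant above.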
---

## Ingestion Logic

### MR Ingestion

```rust
// src/ingestion/merge_requests.rs

use rusqlite::Connection;
use tracing::{debug, info};

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::merge_request::{transform_merge_request, extract_labels};

/// Result of MR ingestion.
#[derive(Debug, Default)]
pub struct IngestMergeRequestsResult {
    pub fetched: usize,
    pub upserted: usize,
    pub labels_created: usize,
    pub assignees_linked: usize,
    pub reviewers_linked: usize,
    // Discussion sync is DB-driven, not in-memory collection.
    // Use query: SELECT id, iid FROM merge_requests WHERE updated_at > discussions_synced_for_updated_at
    // This avoids memory growth for large projects.
}

/// Info needed to sync discussions for an MR.
#[derive(Debug, Clone)]
pub struct MrForDiscussionSync {
    pub local_mr_id: i64,
    pub iid: i64,
    pub updated_at: i64,
}

/// Result of upserting a merge request, including previous sync state.
#[derive(Debug)]
pub struct UpsertMergeRequestResult {
    pub local_mr_id: i64,
    pub changes: usize,
    pub previous_discussions_synced_at: Option<i64>,
}

/// Ingest merge requests for a project using page-based cursor updates.
/// Discussion sync eligibility is determined via DB query AFTER ingestion completes.
pub async fn ingest_merge_requests(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,        // Local DB project ID
    gitlab_project_id: i64, // GitLab project ID
    full_sync: bool,        // Reset cursor if true
) -> Result<IngestMergeRequestsResult> {
    let mut result = IngestMergeRequestsResult::default();

    // Reset cursor if full sync requested
    if full_sync {
        reset_cursor(conn, project_id, "merge_requests")?;
        // Also reset discussion watermarks to force re-fetch of all discussions.
        // This ensures --full truly fetches everything, not just MRs.
        reset_discussion_watermarks(conn, project_id)?;
    }

    // Get current cursor
    let cursor = get_cursor(conn, project_id)?;
    let cursor_updated_at = cursor.0;
    let cursor_gitlab_id = cursor.1;

    let mut last_updated_at: Option<i64> = None;
    let mut last_gitlab_id: Option<i64> = None;
    let mut page = 1u32;
    let per_page = 100u32;

    // Page-based iteration for proper cursor boundary tracking
    loop {
        let page_result = client.fetch_merge_requests_page(
            gitlab_project_id,
            cursor_updated_at,
            config.sync.cursor_rewind_seconds,
            page,
            per_page,
        ).await?;

        if page_result.items.is_empty() {
            break;
        }

        for mr in &page_result.items {
            result.fetched += 1;

            let mr_updated_at = crate::core::time::iso_to_ms(&mr.updated_at)
                .ok_or_else(|| crate::core::error::GiError::ParseError {
                    field: "updated_at".to_string(),
                    value: mr.updated_at.clone(),
                })?;

            // Apply cursor filtering with (updated_at, gitlab_id) tuple semantics
            if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
                if mr_updated_at < cursor_ts {
                    continue;
                }
                if mr_updated_at == cursor_ts && mr.id <= cursor_id {
                    continue;
                }
            }

            // Begin transaction for this MR (atomicity + performance)
            let tx = conn.unchecked_transaction()?;

            // Store raw payload
            let payload_id = store_payload(
                &tx,
                project_id,
                "merge_request",
                &mr.id.to_string(),
                &mr,
                config.storage.compress_raw_payloads,
            )?;

            // Transform and upsert MR
            let transformed = transform_merge_request(&mr, project_id)
                .map_err(|e| crate::core::error::GiError::ParseError {
                    field: "merge_request".to_string(),
                    value: e,
                })?;

            let upsert_result = upsert_merge_request(
                &tx,
                &transformed.merge_request,
                payload_id,
            )?;

            if upsert_result.changes > 0 {
                result.upserted += 1;
            }

            let local_mr_id = upsert_result.local_mr_id;

            // Discussion sync eligibility is determined AFTER ingestion via DB query:
            // SELECT id, iid FROM merge_requests WHERE updated_at > discussions_synced_for_updated_at
            // This avoids memory growth for large projects.

            // Clear existing label links (ensures removed labels are unlinked)
            clear_mr_labels(&tx, local_mr_id)?;

            // Extract and upsert labels (name-only, same as issues)
            let labels = extract_labels(&mr, project_id);
            for label in &labels {
                let created = upsert_label(&tx, label)?;
                if created {
                    result.labels_created += 1;
                }

                // Link MR to label
                let label_id = get_label_id(&tx, project_id, &label.name)?;
                link_mr_label(&tx, local_mr_id, label_id)?;
            }

            // Clear and relink assignees
            clear_mr_assignees(&tx, local_mr_id)?;
            for username in &transformed.assignee_usernames {
                upsert_mr_assignee(&tx, local_mr_id, username)?;
                result.assignees_linked += 1;
            }

            // Clear and relink reviewers
            clear_mr_reviewers(&tx, local_mr_id)?;
            for username in &transformed.reviewer_usernames {
                upsert_mr_reviewer(&tx, local_mr_id, username)?;
                result.reviewers_linked += 1;
            }

            tx.commit()?;

            // Track for cursor update
            last_updated_at = Some(mr_updated_at);
            last_gitlab_id = Some(mr.id);
        }

        // Page-boundary cursor flush (not based on fetched % 100).
        // This ensures the cursor reflects actual processed page boundaries.
        if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
            update_cursor(conn, project_id, "merge_requests", updated_at, gitlab_id)?;
        }

        // Check for more pages
        if page_result.is_last_page {
            break;
        }

        match page_result.next_page {
            Some(np) => page = np,
            None => break,
        }
    }

    // Final cursor update (in case we exited mid-loop)
    if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
        update_cursor(conn, project_id, "merge_requests", updated_at, gitlab_id)?;
    }

    info!(
        project_id,
        fetched = result.fetched,
        upserted = result.upserted,
        labels_created = result.labels_created,
        assignees_linked = result.assignees_linked,
        reviewers_linked = result.reviewers_linked,
        "MR ingestion complete"
    );

    Ok(result)
}

/// Upsert a merge request and return sync state for inline discussion decision.
fn upsert_merge_request(
    conn: &Connection,
    mr: &NormalizedMergeRequest,
    payload_id: Option<i64>,
) -> Result<UpsertMergeRequestResult> {
    // Perform upsert
    let changes = conn.execute(
        "INSERT INTO merge_requests (
            gitlab_id, project_id, iid, title, description, state, draft,
            author_username, source_branch, target_branch,
            head_sha, references_short, references_full,
            detailed_merge_status, merge_user_username,
            created_at, updated_at, merged_at, closed_at, last_seen_at,
            web_url, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            title = excluded.title,
            description = excluded.description,
            state = excluded.state,
            draft = excluded.draft,
            head_sha = excluded.head_sha,
            references_short = excluded.references_short,
            references_full = excluded.references_full,
            detailed_merge_status = excluded.detailed_merge_status,
            merge_user_username = excluded.merge_user_username,
            updated_at = excluded.updated_at,
            merged_at = excluded.merged_at,
            closed_at = excluded.closed_at,
            last_seen_at = excluded.last_seen_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            mr.gitlab_id, mr.project_id, mr.iid, mr.title, mr.description,
            mr.state, mr.draft as i32,
            mr.author_username, mr.source_branch, mr.target_branch,
            mr.head_sha, mr.references_short, mr.references_full,
            mr.detailed_merge_status, mr.merge_user_username,
            mr.created_at, mr.updated_at, mr.merged_at, mr.closed_at, mr.last_seen_at,
            mr.web_url, payload_id,
        ],
    )?;

    // Get local ID (either existing or newly inserted); propagate DB errors
    // with `?` rather than panicking inside a Result-returning function.
    let local_mr_id = conn.query_row(
        "SELECT id FROM merge_requests WHERE gitlab_id = ?",
        [mr.gitlab_id],
        |row| row.get(0),
    )?;

    Ok(UpsertMergeRequestResult {
        local_mr_id,
        changes,
        previous_discussions_synced_at: None,
    })
}

// Database helper functions follow the same pattern as issues.rs
// (get_cursor, update_cursor, reset_cursor, etc.)

/// Reset discussion watermarks for all MRs in a project.
/// Called when --full sync is requested to force re-fetch of all discussions.
fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests
         SET discussions_synced_for_updated_at = NULL,
             discussions_sync_attempts = 0,
             discussions_sync_last_error = NULL
         WHERE project_id = ?",
        [project_id],
    )?;
    Ok(())
}

/// Record sync health telemetry for debugging failed discussion syncs.
fn record_sync_health_error(conn: &Connection, mr_id: i64, error: &str) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests SET
            discussions_sync_last_attempt_at = ?,
            discussions_sync_attempts = COALESCE(discussions_sync_attempts, 0) + 1,
            discussions_sync_last_error = ?
         WHERE id = ?",
        rusqlite::params![now_ms(), error, mr_id],
    )?;
    Ok(())
}

/// Clear sync health error on successful sync.
fn clear_sync_health_error(conn: &Connection, mr_id: i64) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests SET
            discussions_sync_last_attempt_at = ?,
            discussions_sync_attempts = 0,
            discussions_sync_last_error = NULL
         WHERE id = ?",
        rusqlite::params![now_ms(), mr_id],
    )?;
    Ok(())
}
```

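The `(updated_at, gitlab_id)` cursor filter inside the per-MR loop can be isolated as a pure predicate: an item is processed only if it sorts strictly after the cursor position. A minimal sketch; the `is_after_cursor` helper is illustrative and not an actual function in the codebase:

```rust
/// Hypothetical helper showing the tuple-cursor semantics used in the loop
/// above: skip anything at or before the (updated_at, gitlab_id) cursor.
fn is_after_cursor(cursor: Option<(i64, i64)>, updated_at: i64, gitlab_id: i64) -> bool {
    match cursor {
        None => true, // no cursor yet: process everything
        // Lexicographic tuple comparison matches the two-branch check above.
        Some((cursor_ts, cursor_id)) => (updated_at, gitlab_id) > (cursor_ts, cursor_id),
    }
}

fn main() {
    assert!(is_after_cursor(None, 100, 1));            // no cursor: process
    assert!(!is_after_cursor(Some((100, 5)), 99, 9));  // older timestamp: skip
    assert!(!is_after_cursor(Some((100, 5)), 100, 5)); // same tuple: skip (already seen)
    assert!(is_after_cursor(Some((100, 5)), 100, 6));  // same ts, higher id: process
    assert!(is_after_cursor(Some((100, 5)), 101, 1));  // newer timestamp: process
}
```

The `gitlab_id` tiebreaker is what makes cursor rewind safe: rows sharing the cursor's `updated_at` are neither lost nor reprocessed.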
### MR Discussion Ingestion

```rust
// src/ingestion/mr_discussions.rs

use futures::StreamExt;
use rusqlite::Connection;
use tracing::{debug, warn};

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{
    transform_mr_discussion,
    transform_notes_with_diff_position,
};

/// Result of discussion ingestion for a single MR.
#[derive(Debug, Default)]
pub struct IngestMrDiscussionsResult {
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub notes_skipped_bad_timestamp: usize,
    pub system_notes_count: usize,
    pub diffnotes_count: usize,
    pub pagination_succeeded: bool,
}

/// Ingest discussions for a single MR.
/// Called only when mr.updated_at > discussions_synced_for_updated_at.
///
/// CRITICAL INVARIANTS:
/// 1. Watermark is ONLY advanced if pagination completes successfully AND all notes parse successfully.
/// 2. Notes are parsed BEFORE any destructive DB operations to prevent data loss.
/// 3. Stale discussions are removed via last_seen_at sweep (not in-memory ID collection).
pub async fn ingest_mr_discussions(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    mr_iid: i64,
    local_mr_id: i64,
    mr_updated_at: i64,
) -> Result<IngestMrDiscussionsResult> {
    let mut result = IngestMrDiscussionsResult {
        pagination_succeeded: true, // Assume success, set false on error
        ..Default::default()
    };

    // Record sync start time for last_seen_at sweep
    let run_seen_at = now_ms();

    let mut stream = client.paginate_mr_discussions(gitlab_project_id, mr_iid);

    while let Some(discussion_result) = stream.next().await {
        let discussion = match discussion_result {
            Ok(d) => d,
            Err(e) => {
                // Log error and mark pagination as failed
                warn!(
                    project_id,
                    mr_iid,
                    error = %e,
                    "Error fetching MR discussion page"
                );
                result.pagination_succeeded = false;
                break;
            }
        };
        result.discussions_fetched += 1;

        // CRITICAL: Parse/transform notes BEFORE any destructive DB operations.
        // If parsing fails, we preserve prior data and treat it as a partial failure (no watermark advance).
        let notes = match transform_notes_with_diff_position(&discussion, project_id) {
            Ok(notes) => notes,
            Err(e) => {
                warn!(
                    project_id,
                    mr_iid,
                    discussion_id = %discussion.id,
                    error = %e,
                    "Note transform failed; preserving existing notes; MR watermark will NOT advance"
                );
                result.notes_skipped_bad_timestamp += discussion.notes.len();
                result.pagination_succeeded = false;
                continue;
            }
        };

        // Begin transaction for this discussion (only AFTER parsing succeeded)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload for discussion
        let discussion_payload_id = store_payload(
            &tx,
            project_id,
            "discussion",
            &discussion.id,
            &discussion,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert discussion (MR context) with run_seen_at timestamp
        let mut normalized = transform_mr_discussion(&discussion, project_id, local_mr_id);
        normalized.last_seen_at = run_seen_at;
        upsert_discussion(&tx, &normalized, discussion_payload_id)?;
        result.discussions_upserted += 1;

        // Get local discussion ID
        let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;

        // Upsert notes instead of delete-all-then-insert.
        // This reduces write amplification and preserves any local metadata.
        for note in &notes {
            // Selective raw payload storage for notes:
            // - Store for DiffNotes (position metadata useful for file-context queries)
            // - Store for non-system notes (user-generated content)
            // - Skip for system notes (numerous, low-value, discussion payload has them)
            let should_store_note_payload =
                !note.is_system() ||
                note.position_new_path().is_some() ||
                note.position_old_path().is_some();

            let note_payload_id = if should_store_note_payload {
                let gitlab_note = discussion.notes.iter().find(|n| n.id() == note.gitlab_id());
                if let Some(gn) = gitlab_note {
                    store_payload(
                        &tx,
                        project_id,
                        "note",
                        &note.gitlab_id().to_string(),
                        gn,
                        config.storage.compress_raw_payloads,
                    )?
                } else {
                    None
                }
            } else {
                None
            };

            // Upsert note with run_seen_at timestamp for sweep
            upsert_note(&tx, local_discussion_id, note, note_payload_id, run_seen_at)?;
            result.notes_upserted += 1;

            if note.is_system() {
                result.system_notes_count += 1;
            }

            // Count DiffNotes
            if note.position_new_path().is_some() || note.position_old_path().is_some() {
                result.diffnotes_count += 1;
            }
        }

        tx.commit()?;
    }

    // Remove stale discussions AND notes via last_seen_at sweep (only if pagination completed successfully).
    // This is simpler and more scalable than collecting seen IDs in memory.
    if result.pagination_succeeded {
        // Sweep stale discussions
        conn.execute(
            "DELETE FROM discussions
             WHERE project_id = ? AND merge_request_id = ?
               AND last_seen_at < ?",
            rusqlite::params![project_id, local_mr_id, run_seen_at],
        )?;

        // Sweep stale notes for this MR's discussions.
        // Notes inherit staleness from their discussion, but we also sweep at note level
        // for cases where notes are removed from an existing discussion.
        conn.execute(
            "DELETE FROM notes
             WHERE discussion_id IN (
                 SELECT id FROM discussions
                 WHERE project_id = ? AND merge_request_id = ?
             )
             AND last_seen_at < ?",
            rusqlite::params![project_id, local_mr_id, run_seen_at],
        )?;
    }

    // CRITICAL: Only advance the watermark if pagination succeeded completely AND there were no parse failures.
    // If we advance on partial failure, we permanently lose data for this MR version.
    if result.pagination_succeeded {
        mark_discussions_synced(conn, local_mr_id, mr_updated_at)?;
        // Clear sync health error on success
        clear_sync_health_error(conn, local_mr_id)?;
    } else {
        // Record sync health telemetry for debugging
        record_sync_health_error(
            conn,
            local_mr_id,
            "Pagination incomplete or parse failure; will retry on next sync",
        )?;
        warn!(
            project_id,
            mr_iid,
            local_mr_id,
            "Discussion sync incomplete; watermark NOT advanced (will retry on next sync)"
        );
    }

    debug!(
        project_id,
        mr_iid,
        discussions = result.discussions_fetched,
        notes = result.notes_upserted,
        diffnotes = result.diffnotes_count,
        pagination_succeeded = result.pagination_succeeded,
        "MR discussions ingested"
    );

    Ok(result)
}

fn mark_discussions_synced(conn: &Connection, mr_id: i64, mr_updated_at: i64) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?",
        rusqlite::params![mr_updated_at, mr_id],
    )?;
    Ok(())
}

/// Record sync health telemetry for debugging failed discussion syncs.
fn record_sync_health_error(conn: &Connection, mr_id: i64, error: &str) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests SET
            discussions_sync_last_attempt_at = ?,
            discussions_sync_attempts = COALESCE(discussions_sync_attempts, 0) + 1,
            discussions_sync_last_error = ?
         WHERE id = ?",
        rusqlite::params![now_ms(), error, mr_id],
    )?;
    Ok(())
}

/// Clear sync health error on successful sync.
fn clear_sync_health_error(conn: &Connection, mr_id: i64) -> Result<()> {
    conn.execute(
        "UPDATE merge_requests SET
            discussions_sync_last_attempt_at = ?,
            discussions_sync_attempts = 0,
            discussions_sync_last_error = NULL
         WHERE id = ?",
        rusqlite::params![now_ms(), mr_id],
    )?;
    Ok(())
}
```

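The selective note-payload rule used during note upserts reduces storage for noisy system notes while keeping everything needed for file-context queries. A standalone sketch of the same boolean rule; the helper name and flag-based signature are hypothetical, for illustration only:

```rust
/// Hypothetical standalone form of the rule above: keep raw payloads for
/// user-authored notes and for any note carrying diff position metadata
/// (DiffNotes), even system ones; skip plain system notes.
fn should_store_note_payload(is_system: bool, has_new_path: bool, has_old_path: bool) -> bool {
    !is_system || has_new_path || has_old_path
}

fn main() {
    assert!(should_store_note_payload(false, false, false)); // user note: store
    assert!(!should_store_note_payload(true, false, false)); // plain system note: skip
    assert!(should_store_note_payload(true, true, false));   // note with new_path: store
    assert!(should_store_note_payload(true, false, true));   // old_path only (e.g. deleted file): store
}
```

Skipped system notes are not lost: their content still lives in the stored discussion payload, so only the per-note raw copy is elided.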
---

## CLI Commands

### `gi ingest --type=merge_requests`

Fetch and store all MRs from configured projects.

**Implementation Updates:**

```rust
// src/cli/commands/ingest.rs

/// Run the ingest command.
pub async fn run_ingest(
    config: &Config,
    resource_type: &str,
    project_filter: Option<&str>,
    force: bool,
    full: bool,
) -> Result<IngestResult> {
    match resource_type {
        "issues" => {
            // Existing CP1 implementation
            run_issue_ingest(config, project_filter, force, full).await
        }
        "merge_requests" => {
            // NEW: CP2 implementation
            run_mr_ingest(config, project_filter, force, full).await
        }
        _ => Err(GiError::InvalidArgument {
            name: "type".to_string(),
            value: resource_type.to_string(),
            expected: "issues or merge_requests".to_string(),
        }),
    }
}
```

**Output:**
```
Ingesting merge requests...

group/project-one: 567 MRs fetched, 12 new labels, 89 assignees, 45 reviewers

Fetching discussions (123 MRs with updates)...

group/project-one: 123 MRs -> 456 discussions, 1,234 notes (89 DiffNotes)

Total: 567 MRs, 456 discussions, 1,234 notes (excluding 2,345 system notes)
Skipped discussion sync for 444 unchanged MRs.
```

### `gi list mrs`

**Output:**
```
Merge Requests (showing 20 of 1,234)

!847  Refactor auth to use JWT tokens        merged  @johndoe   main <- feature/jwt    3 days ago
!846  Fix memory leak in websocket handler   opened  @janedoe   main <- fix/websocket  5 days ago
!845  [DRAFT] Add dark mode CSS variables    opened  @bobsmith  main <- ui/dark-mode   1 week ago
!844  Update dependencies                    closed  @alice     main <- chore/deps     1 week ago
...
```

**Filter Options (same pattern as issues):**
- `--state [opened|merged|closed|locked|all]`
- `--draft` (MR-specific: show only draft MRs)
- `--no-draft` (MR-specific: exclude draft MRs)
- `--author <username>`
- `--assignee <username>`
- `--reviewer <username>` (MR-specific)
- `--target-branch <branch>` (MR-specific)
- `--source-branch <branch>` (MR-specific)
- `--label <name>` (repeatable)
- `--project <path>`
- `--limit <n>`
- `--since <duration|date>`
- `--json`
- `--open` (open in browser)

### `gi show mr <iid>`

**Output:**
```
Merge Request !847: Refactor auth to use JWT tokens
================================================================================

Project:      group/project-one
State:        merged
Draft:        No
Author:       @johndoe
Assignees:    @janedoe, @bobsmith
Reviewers:    @alice, @charlie
Source:       feature/jwt
Target:       main
Merge Status: mergeable (detailed_merge_status)
Merged By:    @alice
Merged At:    2024-03-20 14:30:00
Created:      2024-03-15
Updated:      2024-03-20
Labels:       enhancement, auth, reviewed
URL:          https://gitlab.example.com/group/project-one/-/merge_requests/847

Description:
Moving away from session cookies to JWT-based authentication for better
mobile client support and API consumption...

Discussions (8):

@janedoe (2024-03-16) [src/auth/jwt.ts:45]:
  Should we use a separate signing key for refresh tokens?

  @johndoe (2024-03-16):
    Good point. I'll add a separate key with rotation support.

@alice (2024-03-18) [RESOLVED]:
  Looks good! Just one nit about the token expiry constant.
```

### `gi count mrs`

**Output:**
```
Merge Requests: 1,234
  opened: 89
  merged: 1,045
  closed: 100
```

### `gi count discussions --type=mr`

**Output:**
```
MR Discussions: 4,567
```

### `gi count notes --type=mr`

**Output:**
```
MR Notes: 12,345 (excluding 2,345 system notes)
DiffNotes: 3,456 (with file position metadata)
```

---

## Automated Tests

### Unit Tests

```rust
// tests/mr_transformer_tests.rs

#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::merge_request::*;
    use gi::gitlab::types::*;

    #[test]
    fn transforms_gitlab_mr_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_labels_from_mr_payloads() { /* ... */ }

    #[test]
    fn extracts_assignees_from_mr_payloads() { /* ... */ }

    #[test]
    fn extracts_reviewers_from_mr_payloads() { /* ... */ }

    #[test]
    fn handles_missing_optional_fields_gracefully() { /* ... */ }

    #[test]
    fn converts_merged_at_and_closed_at_timestamps() { /* ... */ }

    #[test]
    fn handles_mrs_with_no_labels_assignees_reviewers() { /* ... */ }
}
```

```rust
// tests/diffnote_tests.rs

#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::discussion::*;

    #[test]
    fn extracts_diffnote_position_metadata() { /* ... */ }

    #[test]
    fn handles_notes_without_position() { /* ... */ }

    #[test]
    fn extracts_old_path_and_new_path() { /* ... */ }

    #[test]
    fn extracts_line_numbers() { /* ... */ }

    #[test]
    fn handles_renamed_files_in_diffnote() { /* ... */ }
}
```

```rust
// tests/mr_discussion_tests.rs

#[cfg(test)]
mod tests {
    #[test]
    fn transforms_mr_discussion_with_correct_noteable_type() { /* ... */ }

    #[test]
    fn computes_resolved_state_for_mr_discussions() { /* ... */ }

    #[test]
    fn links_discussion_to_merge_request_id() { /* ... */ }

    #[test]
    fn handles_individual_note_mr_discussions() { /* ... */ }
}
```

### Integration Tests

```rust
// tests/mr_ingestion_tests.rs

#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path_regex};

    #[tokio::test]
    async fn inserts_mrs_into_database() { /* ... */ }

    #[tokio::test]
    async fn creates_labels_from_mr_payloads() { /* ... */ }

    #[tokio::test]
    async fn links_mrs_to_labels_via_junction_table() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_label_links_on_resync() { /* ... */ }

    #[tokio::test]
    async fn creates_assignee_links() { /* ... */ }

    #[tokio::test]
    async fn creates_reviewer_links() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_mr() { /* ... */ }

    #[tokio::test]
    async fn updates_cursor_at_page_boundaries() { /* ... */ }

    #[tokio::test]
    async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }

    #[tokio::test]
    async fn full_sync_resets_cursor() { /* ... */ }

    #[tokio::test]
    async fn full_sync_resets_discussion_watermarks() {
        // Setup: Create MR with discussions_synced_for_updated_at set
        // Action: Run ingest with --full flag
        // Assert: discussions_synced_for_updated_at is NULL after sync
        // Assert: Discussions are re-fetched for this MR
    }

    #[tokio::test]
    async fn handles_mrs_with_no_labels_assignees_reviewers() { /* ... */ }

    #[tokio::test]
    async fn upserts_existing_mrs_on_refetch() { /* ... */ }

    #[tokio::test]
    async fn skips_discussion_refetch_for_unchanged_mrs() { /* ... */ }

    #[tokio::test]
    async fn captures_diffnote_position_metadata() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_discussions_after_successful_pagination() { /* ... */ }

    #[tokio::test]
    async fn captures_draft_status_correctly() { /* ... */ }

    #[tokio::test]
    async fn uses_detailed_merge_status_over_deprecated_field() { /* ... */ }

    #[tokio::test]
    async fn uses_merge_user_over_deprecated_merged_by() { /* ... */ }

    // CRITICAL: Verify preference ordering when BOTH deprecated and current fields are present.
    // GitLab may return both for backward compatibility, and we must always prefer the current field.
    #[tokio::test]
    async fn prefers_detailed_merge_status_when_both_fields_present() {
        // Setup: MR payload with BOTH detailed_merge_status AND merge_status
        // Mock: Return MR with detailed_merge_status="discussions_not_resolved" AND merge_status="can_be_merged"
        // Assert: detailed_merge_status in DB is "discussions_not_resolved" (not "can_be_merged")
    }

    #[tokio::test]
    async fn prefers_merge_user_when_both_fields_present() {
        // Setup: MR payload with BOTH merge_user AND merged_by
        // Mock: Return MR with merge_user.username="alice" AND merged_by.username="bob"
        // Assert: merge_user_username in DB is "alice" (not "bob")
    }

    #[tokio::test]
    async fn prefers_draft_when_both_draft_and_work_in_progress_present() {
        // Setup: MR payload with BOTH draft=false AND work_in_progress=true
        // Mock: Return MR with draft=false, work_in_progress=true
        // Assert: draft in DB is true (work_in_progress fallback applies when draft is false)
        // Note: This tests the OR semantics: draft || work_in_progress
    }

    // CRITICAL: This test ensures we don't permanently lose data on partial pagination failures.
    // If pagination fails mid-way, the watermark must NOT advance, so the next sync
    // will retry fetching discussions for this MR.
    #[tokio::test]
    async fn does_not_advance_discussion_watermark_on_partial_failure() {
        // Setup: Create MR with updated_at > discussions_synced_for_updated_at
        // Mock: Page 1 of discussions returns successfully
        // Mock: Page 2 returns 500 Internal Server Error
        // Assert: discussions_synced_for_updated_at is unchanged (not advanced)
        // Assert: Re-running ingest will retry this MR's discussions
    }

    // CRITICAL: This test ensures we don't lose data when note parsing fails.
    // If any note in a discussion has an invalid timestamp, we must NOT delete
    // existing notes, and we must NOT advance the watermark.
    #[tokio::test]
    async fn does_not_advance_discussion_watermark_on_note_parse_failure() {
        // Setup: Create MR with an existing discussion containing 3 valid notes
        // Mock: Return discussion with one note having invalid created_at
        // Assert: Original 3 notes are preserved
        // Assert: Watermark is NOT advanced
    }

    // Verify atomic note replacement: parsing happens BEFORE deletion
    #[tokio::test]
    async fn atomic_note_replacement_preserves_data_on_parse_failure() {
        // Setup: Discussion with 3 existing notes
        // Mock: Return updated discussion where note 2 has invalid timestamp
        // Assert: Original 3 notes are preserved
        // Assert: No partial replacement occurred
    }

    #[tokio::test]
    async fn uses_work_in_progress_when_draft_not_present() { /* ... */ }

    #[tokio::test]
    async fn handles_locked_mr_state() { /* ... */ }

    #[tokio::test]
    async fn pagination_continues_when_headers_missing_but_full_page_returned() { /* ... */ }

    #[tokio::test]
    async fn does_not_store_raw_payload_for_system_notes_without_position() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_diffnotes_even_if_system() { /* ... */ }

    #[tokio::test]
    async fn captures_position_type_and_line_range_for_diffnotes() { /* ... */ }
}
```

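The backward-compatibility rules these tests pin down can be sketched as pure helpers. The names and `Option`-based signatures below are assumptions for illustration, not the actual transformer API; the field names follow the GitLab REST payload:

```rust
/// Draft flag with OR semantics: draft || work_in_progress, so older GitLab
/// instances that only send `work_in_progress` still populate `draft`.
/// (Hypothetical helper for illustration.)
fn effective_draft(draft: Option<bool>, work_in_progress: Option<bool>) -> bool {
    draft.unwrap_or(false) || work_in_progress.unwrap_or(false)
}

/// Prefer the current `detailed_merge_status` field; fall back to the
/// deprecated `merge_status` only when the current field is absent.
/// (Hypothetical helper for illustration.)
fn effective_merge_status<'a>(
    detailed_merge_status: Option<&'a str>,
    merge_status: Option<&'a str>,
) -> Option<&'a str> {
    detailed_merge_status.or(merge_status)
}

fn main() {
    // draft=false but work_in_progress=true -> draft is true (OR semantics)
    assert!(effective_draft(Some(false), Some(true)));
    assert!(!effective_draft(Some(false), None));
    // Both fields present -> the current field wins
    assert_eq!(
        effective_merge_status(Some("discussions_not_resolved"), Some("can_be_merged")),
        Some("discussions_not_resolved")
    );
    // Only the deprecated field present -> fall back to it
    assert_eq!(
        effective_merge_status(None, Some("can_be_merged")),
        Some("can_be_merged")
    );
}
```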
---

## Manual Smoke Tests

| Command | Expected Output | Pass Criteria |
|---------|-----------------|---------------|
| `gi ingest --type=merge_requests` | Progress bar, final count | Completes without error |
| `gi list mrs --limit=10` | Table of 10 MRs | Shows iid, title, state, author, branches, [DRAFT] prefix |
| `gi list mrs --project=group/project-one` | Filtered list | Only shows MRs from that project |
| `gi list mrs --state=merged` | Filtered by state | Only merged MRs shown |
| `gi list mrs --state=locked` | Filtered by state | Only locked MRs shown (merge in progress) |
| `gi list mrs --draft` | Filtered by draft | Only draft MRs shown |
| `gi list mrs --no-draft` | Excludes drafts | No draft MRs shown |
| `gi list mrs --reviewer=username` | Filtered by reviewer | Only MRs with that reviewer |
| `gi list mrs --target-branch=main` | Filtered by target | Only MRs targeting main |
| `gi count mrs` | `Merge Requests: N` | Count matches GitLab UI |
| `gi show mr 123` | MR detail view | Shows title, description, branches, discussions, detailed_merge_status |
| `gi show mr 123` (draft MR) | MR detail view | Shows `Draft: Yes` |
| `gi show mr 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=mr` | `MR Discussions: N` | Non-zero count |
| `gi count notes --type=mr` | `MR Notes: N (excluding M system)` | Non-zero count, shows DiffNote count |
| `gi sync-status` | Shows MR cursor positions | MR cursors visible alongside issue cursors |
| `gi ingest --type=merge_requests` (re-run) | `0 new MRs` | Cursor is current |
| `gi ingest --type=merge_requests` (re-run) | `Skipped discussion sync for N unchanged MRs` | Watermark works |
| `gi ingest --type=merge_requests` (concurrent) | Lock error | Second run fails with clear message |
| `gi ingest --type=merge_requests --full` | Full re-sync | Resets cursor, fetches everything |
| Remove label from MR in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |
| Add reviewer in GitLab, re-sync | Reviewer link added | Junction table updated |

---

## Data Integrity Checks

After successful ingestion, verify:

- [ ] `SELECT COUNT(*) FROM merge_requests` matches GitLab MR count for configured projects
- [ ] Every MR has a corresponding `raw_payloads` row
- [ ] `draft` field matches GitLab UI (draft MRs have `draft=1`, including via the `work_in_progress` fallback)
- [ ] `state` includes all expected values (`opened`, `merged`, `closed`, `locked`)
- [ ] `detailed_merge_status` is populated for modern GitLab instances
- [ ] `head_sha` is populated (NULL only for very old MRs or edge cases)
- [ ] `references_short` and `references_full` are populated when GitLab returns them
- [ ] Labels in `mr_labels` junction all exist in `labels` table
- [ ] `mr_labels` count per MR matches GitLab UI label count
- [ ] `mr_assignees` usernames match GitLab UI
- [ ] `mr_reviewers` usernames match GitLab UI
- [ ] `sync_cursors` has entry for each `(project_id, 'merge_requests')` pair
- [ ] Re-running `gi ingest --type=merge_requests` fetches 0 new items (cursor is current)
- [ ] Re-running skips discussion refetch for unchanged MRs (watermark works)
- [ ] `--full` sync resets `discussions_synced_for_updated_at` to NULL for all MRs
- [ ] After partial pagination failure, `discussions_synced_for_updated_at` is NOT advanced
- [ ] After note parse failure, `discussions_synced_for_updated_at` is NOT advanced
- [ ] After note parse failure, existing notes are preserved (not deleted)
- [ ] `SELECT COUNT(*) FROM discussions WHERE noteable_type='MergeRequest'` is non-zero
- [ ] Every MR discussion has at least one note
- [ ] DiffNotes have `position_new_path` populated when available
- [ ] DiffNotes have `position_type` populated when available (text/image/file)
- [ ] DiffNotes have `position_base_sha`, `position_start_sha`, `position_head_sha` populated when available
- [ ] Multi-line DiffNotes have `position_line_range_start/end` populated when available
- [ ] `SELECT COUNT(*) FROM notes WHERE position_new_path IS NOT NULL` matches expected DiffNote count
- [ ] Discussion `first_note_at` <= `last_note_at` for all rows
- [ ] No notes have `created_at = 0` or `updated_at = 0` (strict timestamp parsing)
- [ ] System notes without position do NOT have `raw_payloads` rows (selective storage)
- [ ] After removing a label/reviewer in GitLab and re-syncing, the link is removed
- [ ] Stale discussions removed via `last_seen_at` sweep after successful pagination
- [ ] Stale notes removed via `last_seen_at` sweep (upsert + sweep pattern)
---

## Definition of Done

### Gate A: MRs Only (Must Pass First)

- [ ] `gi ingest --type=merge_requests` fetches all MRs from configured projects
- [ ] MRs stored with correct schema, including branches and merge metadata
- [ ] `draft` field captured correctly (with `work_in_progress` fallback for older instances)
- [ ] `state` field supports all values: `opened`, `merged`, `closed`, `locked`
- [ ] `locked` state handled as transitional (local filtering only, not server-side filter)
- [ ] `detailed_merge_status` used (non-deprecated) with fallback to `merge_status`
- [ ] `merge_user_username` used (non-deprecated) with fallback to `merged_by`
- [ ] `head_sha` captured for CP3 readiness
- [ ] `references_short` and `references_full` captured for CP3 readiness
- [ ] Cursor-based sync is resumable (re-run fetches only new/updated)
- [ ] Page-boundary cursor updates (not based on item count modulo)
- [ ] Pagination uses robust fallback chain: Link header > x-next-page > full-page heuristic
- [ ] Raw payloads stored for each MR
- [ ] `gi list mrs` and `gi count mrs` work
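The deprecated-field fallbacks in Gate A (modern field wins when both are present) can be sketched as plain helpers. This is a minimal sketch; the function names and string values are illustrative, not the actual gitlore API:

```rust
/// Prefer the modern field, falling back to the deprecated one.
/// Covers `detailed_merge_status` > `merge_status` and `merge_user` > `merged_by`.
fn prefer_modern(modern: Option<&str>, deprecated: Option<&str>) -> Option<String> {
    modern.or(deprecated).map(str::to_string)
}

/// Prefer `draft`; older GitLab instances only send `work_in_progress`.
fn resolve_draft(draft: Option<bool>, work_in_progress: Option<bool>) -> bool {
    draft.or(work_in_progress).unwrap_or(false)
}

fn main() {
    // Both present: the modern field wins.
    assert_eq!(
        prefer_modern(Some("mergeable"), Some("can_be_merged")),
        Some("mergeable".to_string())
    );
    // Older instance: fall back to the deprecated field.
    assert_eq!(
        prefer_modern(None, Some("can_be_merged")),
        Some("can_be_merged".to_string())
    );
    // `draft` takes precedence over `work_in_progress` when both are present.
    assert!(!resolve_draft(Some(false), Some(true)));
    assert!(resolve_draft(None, Some(true)));
    println!("field preference ok");
}
```

The same precedence is what the `prefers_*_when_both_fields_present` tests in the Final Gate verify.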
### Gate B: Labels + Assignees + Reviewers (Must Pass)

- [ ] Labels extracted and stored (name-only)
- [ ] Label links created correctly
- [ ] **Stale label links removed on re-sync** (verified with test)
- [ ] Assignees extracted and linked to `mr_assignees`
- [ ] Reviewers extracted and linked to `mr_reviewers`
- [ ] **Stale assignee/reviewer links removed on re-sync**
### Gate C: Dependent Discussion Sync with DiffNotes (Must Pass)

- [ ] Discussions fetched for MRs with `updated_at` advancement
- [ ] Notes stored with `is_system` flag correctly set
- [ ] **Upsert + sweep pattern for notes** (not delete-all-then-insert; reduces write amplification)
- [ ] **Atomic note replacement** (notes parsed BEFORE existing notes deleted - prevents data loss)
- [ ] **Strict timestamp parsing** (no zero values from invalid timestamps)
- [ ] **DiffNote position metadata captured** (`position_new_path`, `position_type`, line numbers, line_range)
- [ ] **DiffNote SHA triplet captured** (`position_base_sha`, `position_start_sha`, `position_head_sha`)
- [ ] **Selective raw payload storage** (skip system notes without position)
- [ ] Raw payloads stored for discussions and DiffNotes
- [ ] `discussions_synced_for_updated_at` watermark updated after sync
- [ ] **Watermark is NOT advanced if any discussion page fetch fails** (verified with test)
- [ ] **Watermark is NOT advanced if any note parse fails** (verified with test)
- [ ] **Unchanged MRs skip discussion refetch** (verified with test)
- [ ] Stale discussions removed via `last_seen_at` sweep after successful pagination
- [ ] Stale notes removed via `last_seen_at` sweep (upsert + sweep pattern)
- [ ] Bounded concurrency (`dependent_concurrency` respected)
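The upsert + sweep pattern above can be illustrated with an in-memory stand-in for the notes table. The real implementation stamps `last_seen_at` via SQL upserts; the struct and field names here are assumptions for the sketch:

```rust
use std::collections::HashMap;

struct NoteRow {
    body: String,
    last_seen_at: u64, // stamped with the current run's timestamp when observed
}

/// Upsert every fetched note, then sweep rows not seen in this run.
/// Unlike delete-all-then-insert, unchanged rows are only re-stamped,
/// and nothing is deleted until the full fetch has succeeded.
fn sync_notes(store: &mut HashMap<i64, NoteRow>, fetched: &[(i64, &str)], run_seen_at: u64) {
    for (id, body) in fetched {
        store
            .entry(*id)
            .and_modify(|row| {
                row.body = body.to_string();
                row.last_seen_at = run_seen_at;
            })
            .or_insert_with(|| NoteRow {
                body: body.to_string(),
                last_seen_at: run_seen_at,
            });
    }
    // Sweep: anything not observed this run was deleted upstream.
    store.retain(|_, row| row.last_seen_at >= run_seen_at);
}

fn main() {
    let mut store = HashMap::new();
    sync_notes(&mut store, &[(1, "LGTM"), (2, "typo here"), (3, "nit")], 100);
    assert_eq!(store.len(), 3);
    // Second run: note 2 was edited, note 3 was deleted on GitLab.
    sync_notes(&mut store, &[(1, "LGTM"), (2, "typo fixed")], 101);
    assert_eq!(store.len(), 2);
    assert_eq!(store.get(&2).unwrap().body, "typo fixed");
    println!("upsert + sweep ok");
}
```

On failure (pagination or parse error), the sweep simply isn't run and the watermark isn't advanced, so existing rows survive.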
### Gate D: Resumability Proof (Must Pass)

- [ ] Kill mid-run, rerun; bounded redo (cursor progress preserved at page boundaries)
- [ ] No redundant discussion refetch after crash recovery
- [ ] No watermark advancement on partial pagination failure
- [ ] No watermark advancement on note parse failure (preserves existing data)
- [ ] Single-flight lock prevents concurrent runs
- [ ] `--full` flag resets cursor and fetches all data
- [ ] `--full` flag also resets `discussions_synced_for_updated_at` (forces discussion refetch)
### Gate E: CLI Complete (Must Pass)

- [ ] `gi list mrs` with all filter options including `--draft` and `--no-draft`
- [ ] `gi list mrs` shows [DRAFT] prefix for draft MRs
- [ ] `gi show mr <iid>` displays full detail with discussions
- [ ] `gi show mr` shows DiffNote file context in discussion display
- [ ] `gi show mr` shows `detailed_merge_status`
- [ ] `gi count mrs` shows state breakdown
- [ ] `gi sync-status` shows MR cursor positions
### Final Gate (Must Pass)

- [ ] All unit tests pass (`cargo test`)
- [ ] All integration tests pass (mocked with wiremock)
- [ ] `does_not_advance_discussion_watermark_on_partial_failure` test passes
- [ ] `prefers_detailed_merge_status_when_both_fields_present` test passes
- [ ] `prefers_merge_user_when_both_fields_present` test passes
- [ ] `prefers_draft_when_both_draft_and_work_in_progress_present` test passes
- [ ] `cargo clippy` passes with no warnings
- [ ] `cargo fmt --check` passes
- [ ] Compiles with `--release`
### Hardening (Optional Before CP3)

- [ ] Edge cases: MRs with 0 labels, 0 discussions, no assignees/reviewers
- [ ] Large pagination (100+ pages)
- [ ] Rate limit handling under sustained load
- [ ] Live tests pass against real GitLab instance
- [ ] Performance: 1000+ MRs ingested in <5 min
- [ ] DB size verification: system note payloads not stored
---

## Implementation Order

1. **Database migration** (15 min)
   - `migrations/006_merge_requests.sql`
   - Update `MIGRATIONS` const in `src/core/db.rs`

2. **GitLab types** (15 min)
   - Add `GitLabMergeRequest`, `GitLabReviewer` to `src/gitlab/types.rs`
   - Test deserialization with fixtures

3. **MR transformer** (25 min)
   - `src/gitlab/transformers/merge_request.rs`
   - `MergeRequestWithMetadata` struct
   - Unit tests

4. **Discussion transformer updates** (15 min)
   - Add `transform_mr_discussion()` function
   - Add `transform_notes_with_diff_position()` function
   - Unit tests for DiffNote extraction

5. **GitLab client pagination** (20 min)
   - Add `paginate_merge_requests()`
   - Add `paginate_mr_discussions()`

6. **MR ingestion** (45 min)
   - `src/ingestion/merge_requests.rs`
   - Transaction batching
   - Label/assignee/reviewer linking with stale removal
   - Incremental cursor updates
   - Return `mrs_needing_discussion_sync`
   - Integration tests

7. **MR discussion ingestion** (30 min)
   - `src/ingestion/mr_discussions.rs`
   - DiffNote position capture
   - Stale discussion removal
   - Watermark update
   - Integration tests

8. **Update orchestrator** (20 min)
   - `src/ingestion/orchestrator.rs`
   - Support both issue and MR ingestion
   - Aggregate results

9. **Update ingest command** (15 min)
   - `src/cli/commands/ingest.rs`
   - Route `merge_requests` type to MR ingest

10. **Implement MR CLI commands** (45 min)
    - Update `gi list` for MRs with filters
    - Update `gi show` for MR detail view
    - Update `gi count` for MR counts
    - Update `gi sync-status` for MR cursors

11. **Final validation** (20 min)
    - `cargo test`
    - `cargo clippy`
    - Gate A/B/C/D/E verification
    - Manual smoke tests
    - Data integrity checks
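The pagination helpers in step 5 need the fallback chain from Gate A (Link header > x-next-page > full-page heuristic). A sketch, assuming response headers have already been collected into a lowercase-keyed map — not the real client types:

```rust
use std::collections::HashMap;

/// Extract the `page` query parameter from the `rel="next"` entry
/// of an RFC 8288 Link header, if present.
fn parse_link_next(link: &str) -> Option<u64> {
    link.split(',')
        .find(|part| part.contains("rel=\"next\""))
        .and_then(|part| part.split(';').next())
        .map(|url| url.trim().trim_start_matches('<').trim_end_matches('>'))
        .and_then(|url| url.split(&['?', '&'][..]).find_map(|q| q.strip_prefix("page=")))
        .and_then(|p| p.parse().ok())
}

fn next_page(
    headers: &HashMap<String, String>,
    items_on_page: usize,
    per_page: usize,
    current_page: u64,
) -> Option<u64> {
    // 1. RFC 8288 Link header is authoritative when present.
    if let Some(link) = headers.get("link") {
        return parse_link_next(link);
    }
    // 2. x-next-page header (blank on the last page).
    if let Some(p) = headers.get("x-next-page") {
        return p.parse::<u64>().ok();
    }
    // 3. Heuristic: a full page with no pagination headers suggests more may follow.
    (items_on_page == per_page).then(|| current_page + 1)
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert(
        "link".to_string(),
        "<https://gitlab.example.com/api/v4/projects/1/merge_requests?page=3&per_page=100>; rel=\"next\"".to_string(),
    );
    assert_eq!(next_page(&headers, 100, 100, 2), Some(3));

    // No Link header: fall back to x-next-page.
    let mut headers = HashMap::new();
    headers.insert("x-next-page".to_string(), "4".to_string());
    assert_eq!(next_page(&headers, 100, 100, 3), Some(4));

    // No headers at all (proxy stripped them): full-page heuristic.
    assert_eq!(next_page(&HashMap::new(), 100, 100, 1), Some(2));
    assert_eq!(next_page(&HashMap::new(), 37, 100, 1), None);
    println!("pagination fallback ok");
}
```

The heuristic in step 3 may issue one extra request for collections whose size is an exact multiple of `per_page`; that request returns an empty page and terminates the loop.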
---

## Risks & Mitigations

| Risk | Mitigation |
|------|------------|
| GitLab rate limiting during large sync | Respect `Retry-After`, exponential backoff, configurable concurrency |
| MR discussion API N+1 problem | `dependent_concurrency` config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large MRs with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | `is_system` flag allows filtering |
| Label/assignee/reviewer deduplication | Clear and re-link pattern ensures correctness |
| DiffNote position field variations | Defensive parsing with Option types; extended fields (position_type, line_range, SHA triplet) for modern GitLab |
| Async stream complexity | Use `async-stream` crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Use `LocalSet` + `spawn_local` for non-Send tasks; each worker opens its own SQLite connection (WAL + busy_timeout). Avoid holding DB handles across `.await`. |
| Crash causes massive refetch | Page-boundary cursor updates (not item-count based) |
| Cursor rewind causes discussion refetch | Per-MR watermark (`discussions_synced_for_updated_at`) |
| Stale discussions accumulate | Remove discussions via `last_seen_at < run_seen_at` sweep (not in-memory ID collection) |
| Stale notes accumulate | Remove notes via `last_seen_at` sweep (upsert + sweep pattern) |
| **Partial pagination failure loses data** | Watermark NOT advanced unless pagination completes; explicit test coverage |
| **Invalid timestamps corrupt cursor logic** | Strict timestamp parsing (Result types, no unwrap_or(0)); bad notes logged and skipped; parse BEFORE delete |
| **Note parse failure causes data loss** | Parse notes BEFORE any destructive DB operations; if parse fails, preserve existing data and don't advance watermark |
| **Raw payload size explodes with notes** | Selective storage: DiffNotes + non-system notes only; system notes use discussion payload |
| **Deprecated GitLab fields cause future churn** | Use `detailed_merge_status` and `merge_user` with fallback to old; explicit tests verify preference when both present |
| **Memory growth on large projects** | DB-driven discussion sync selection; no in-memory collection of MRs needing sync |
| **Pagination headers missing (proxy/instance issue)** | Robust fallback chain: Link header (RFC 8288) > x-next-page > full-page heuristic |
| **GitLab version differences** | Handle `locked` state (transitional, local filtering); support both `draft` and `work_in_progress`; optional reviewer verification endpoint for debugging |
| **--full sync leaves stale discussion data** | `--full` resets both MR cursor AND `discussions_synced_for_updated_at` watermarks |
| **Write amplification on note updates** | Upsert + sweep pattern instead of delete-all-then-insert |
---

## API Call Estimation

For a project with **500 MRs** (average 10 discussions each, 3 notes per discussion):

| Operation | Calls | Notes |
|-----------|-------|-------|
| MR list pages | 5 | 100 per page |
| Discussion pages per MR | 1 | Most MRs have <100 discussions |
| **Total initial sync** | ~505 | 5 + (500 × 1) |
| **Subsequent sync (10% change)** | ~55 | 5 + (50 × 1) |

**Rate limit safety:** At 100 requests/second, full sync completes in ~5 seconds of API time. With `dependent_concurrency: 10`, wall clock time is ~50 seconds plus network latency.
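The arithmetic in the table above can be reproduced with a tiny helper (an assumed function for illustration, not part of the CLI):

```rust
/// Returns (initial_sync_calls, subsequent_sync_calls) for one project.
fn estimated_calls(mr_count: u64, per_page: u64, changed_mrs: u64) -> (u64, u64) {
    let list_pages = (mr_count + per_page - 1) / per_page; // ceiling division
    // Initial sync: list pages plus one discussion page per MR.
    // Subsequent sync: list pages plus one discussion page per changed MR.
    (list_pages + mr_count, list_pages + changed_mrs)
}

fn main() {
    // 500 MRs, 100 per page, 10% changed on re-sync.
    assert_eq!(estimated_calls(500, 100, 50), (505, 55));
    println!("estimate ok");
}
```

This assumes one discussion page per MR; MRs with more than 100 discussions add one call per extra page.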
---

## MR-Specific Considerations

### DiffNote Position Metadata

GitLab DiffNotes include position metadata that identifies the file and line where the comment was placed. This is critical for code review context in CP3 document generation.

**Position Object Structure (from GitLab API):**

```json
{
  "position": {
    "base_sha": "abc123...",          // Base commit SHA for the diff
    "start_sha": "def456...",         // Start commit SHA for the diff
    "head_sha": "ghi789...",          // Head commit SHA for the diff
    "old_path": "src/auth/login.ts",  // File path before rename (if any)
    "new_path": "src/auth/login.ts",  // Current file path
    "position_type": "text",          // "text" | "image" | "file"
    "old_line": null,                 // Line number in old version (null for additions)
    "new_line": 45,                   // Line number in new version (null for deletions)
    "line_range": {                   // For multi-line comments (GitLab 13.6+)
      "start": { "line_code": "...", "type": "new", "old_line": null, "new_line": 45 },
      "end": { "line_code": "...", "type": "new", "old_line": null, "new_line": 48 }
    }
  }
}
```
**Why Capture SHA Triplet:**

The SHA triplet (`base_sha`, `start_sha`, `head_sha`) is essential for future CP3 work:

- **Precise diff context:** Maps comment to exact commit range being reviewed
- **Future file content fetch:** Enables fetching file content at the exact commit when the comment was made
- **Stale comment detection:** Identifies comments on outdated code when source branch is updated
- **Zero extra API cost:** Already present in discussion response, just needs extraction

**Stored Fields:**

- `position_old_path` - File path in base version (for renames/deletions)
- `position_new_path` - File path in head version (for additions/modifications)
- `position_old_line` - Line number in base version
- `position_new_line` - Line number in head version
- `position_type` - Type of position: "text", "image", or "file"
- `position_line_range_start` - Start line for multi-line comments
- `position_line_range_end` - End line for multi-line comments
- `position_base_sha` - Base commit SHA for the diff
- `position_start_sha` - Start commit SHA for the diff
- `position_head_sha` - Head commit SHA for the diff
**Display in CLI:**

```
@janedoe (2024-03-16) [src/auth/login.ts:45]:
Should we validate the JWT signature here?
```

For multi-line comments:

```
@janedoe (2024-03-16) [src/auth/login.ts:45-48]:
This whole block should use async/await instead of callbacks.
```
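The file context shown above could be rendered by a small formatter over the stored position fields. A sketch; the signature and the tuple-based line range are assumptions, not the actual display code:

```rust
/// Render `[path:line]` or `[path:start-end]` context for a DiffNote.
fn format_position(
    new_path: &str,
    new_line: Option<u64>,
    line_range: Option<(u64, u64)>, // (position_line_range_start, position_line_range_end)
) -> String {
    match (line_range, new_line) {
        // Multi-line comment: show the start-end range.
        (Some((start, end)), _) if start != end => format!("[{new_path}:{start}-{end}]"),
        (Some((start, _)), _) => format!("[{new_path}:{start}]"),
        (None, Some(line)) => format!("[{new_path}:{line}]"),
        // No line info: file- or image-level comment.
        (None, None) => format!("[{new_path}]"),
    }
}

fn main() {
    assert_eq!(
        format_position("src/auth/login.ts", Some(45), None),
        "[src/auth/login.ts:45]"
    );
    assert_eq!(
        format_position("src/auth/login.ts", None, Some((45, 48))),
        "[src/auth/login.ts:45-48]"
    );
    println!("position formatting ok");
}
```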
---

## References

- [GitLab MR API](https://docs.gitlab.com/ee/api/merge_requests.html)
- [GitLab Discussions API](https://docs.gitlab.com/ee/api/discussions.html)
- [GitLab Notes API](https://docs.gitlab.com/ee/api/notes.html)
- [GitLab Pagination](https://docs.gitlab.com/ee/api/rest/index.html#pagination)
- [RFC 8288 - Link Header](https://datatracker.ietf.org/doc/html/rfc8288)
- Checkpoint 1 PRD (Issue Ingestion)