# Checkpoint 1: Issue Ingestion - PRD

> **Note:** The project was renamed from "gitlab-inbox" to "gitlore" and the CLI from "gi" to "lore". References to "gi" in this document should be read as "lore".

**Version:** 2.0
**Status:** Ready for Implementation
**Depends On:** Checkpoint 0 (Project Setup)
**Enables:** Checkpoint 2 (MR Ingestion)

---

## Overview

### Objective

Ingest all issues, labels, and issue discussions from configured GitLab repositories with resumable cursor-based incremental sync. This checkpoint establishes the core data ingestion pattern that will be reused for MRs in Checkpoint 2.

### Success Criteria

| Criterion | Validation |
|-----------|------------|
| `gi ingest --type=issues` fetches all issues | `gi count issues` matches GitLab UI |
| Labels extracted from issue payloads (name-only) | `labels` table populated |
| Label linkage reflects current GitLab state | Removed labels are unlinked on re-sync |
| Issue discussions fetched per-issue (dependent sync) | For issues whose `updated_at` advanced, discussions and notes upserted |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged issues | Per-issue watermark prevents redundant fetches |
| Sync tracking records all runs | `sync_runs` table has complete audit trail |
| Single-flight lock prevents concurrent runs | Second sync fails with clear error |

---

## Internal Gates

CP1 is validated incrementally via internal gates:

| Gate | Scope | Validation |
|------|-------|------------|
| **Gate A** | Issues only | Cursor + upsert + raw payloads + list/count/show working |
| **Gate B** | Labels correct | Stale-link removal verified; label count matches GitLab |
| **Gate C** | Dependent discussion sync | Watermark prevents redundant refetch; concurrency bounded |
| **Gate D** | Resumability proof | Kill mid-run, rerun; confirm bounded redo and no redundant discussion refetch |

---

## Deliverables

### 1. Project Structure Additions

Add the following to the existing Rust structure from Checkpoint 0:

```
gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── ingest.rs        # gi ingest --type=issues|merge_requests
│   │       ├── list.rs          # gi list issues|mrs
│   │       ├── count.rs         # gi count issues|mrs|discussions|notes
│   │       └── show.rs          # gi show issue|mr <iid>
│   ├── gitlab/
│   │   ├── types.rs             # Add GitLabIssue, GitLabDiscussion, GitLabNote
│   │   └── transformers/
│   │       ├── mod.rs
│   │       ├── issue.rs         # GitLab → normalized issue
│   │       └── discussion.rs    # GitLab → normalized discussion/notes
│   └── ingestion/
│       ├── mod.rs
│       ├── orchestrator.rs      # Coordinates issue + dependent discussion sync
│       ├── issues.rs            # Issue fetcher with pagination
│       └── discussions.rs       # Discussion fetcher (per-issue)
├── tests/
│   ├── issue_transformer_tests.rs
│   ├── discussion_transformer_tests.rs
│   ├── pagination_tests.rs
│   ├── issue_ingestion_tests.rs
│   ├── label_linkage_tests.rs   # Verifies stale link removal
│   ├── discussion_watermark_tests.rs
│   └── fixtures/
│       ├── gitlab_issue.json
│       ├── gitlab_issues_page.json
│       ├── gitlab_discussion.json
│       └── gitlab_discussions_page.json
└── migrations/
    └── 002_issues.sql
```


### 2. GitLab API Endpoints

**Issues (Bulk Fetch):**
```
GET /projects/:id/issues?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
```

**Issue Discussions (Per-Issue Fetch):**
```
GET /projects/:id/issues/:iid/discussions?per_page=100&page=N
```

**Required Query Parameters:**
- `scope=all` - Include all issues, not just those authored by the current user
- `state=all` - Include closed issues (GitLab may default to open only)

**MVP Note (Labels):**
- CP1 stores labels by **name only** for maximum compatibility and stability.
- Label color/description ingestion is deferred (post-CP1) via the Labels API if needed.
- This avoids relying on optional/variant payload shapes that differ across GitLab versions.

**Pagination:**
- Follow the `x-next-page` header until it is empty or absent
- Fall back to empty-page detection if headers are missing (robustness)
- Per-page maximum: 100

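The pagination rules above can be isolated as a tiny pure function. This is an illustrative sketch, not the client implementation: `next_page` is a hypothetical helper name, and the real code additionally streams results.

```rust
/// Decide the next page to request, per the pagination rules above:
/// follow `x-next-page` when present and the page was non-empty;
/// stop on an empty page even if a header is present (fallback for
/// servers that omit or mangle pagination headers).
fn next_page(x_next_page: Option<&str>, items_on_page: usize) -> Option<u32> {
    if items_on_page == 0 {
        return None; // empty page: terminate regardless of headers
    }
    x_next_page.and_then(|s| s.parse::<u32>().ok())
}

fn main() {
    // Header present, page non-empty: continue to page 2.
    assert_eq!(next_page(Some("2"), 100), Some(2));
    // Header absent (fallback): stop.
    assert_eq!(next_page(None, 100), None);
    // Empty page: stop regardless of header.
    assert_eq!(next_page(Some("3"), 0), None);
    println!("ok");
}
```

Treating the empty page as authoritative means a missing or unparsable header can never cause an infinite loop.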
---

## Database Schema

### Migration 002_issues.sql

```sql
-- Issues table
CREATE TABLE issues (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    project_id INTEGER NOT NULL REFERENCES projects(id),
    iid INTEGER NOT NULL,
    title TEXT,
    description TEXT,
    state TEXT,                           -- 'opened' | 'closed'
    author_username TEXT,
    created_at INTEGER,                   -- ms epoch UTC
    updated_at INTEGER,                   -- ms epoch UTC
    last_seen_at INTEGER NOT NULL,        -- ms epoch UTC, updated on every upsert
    -- Prevents re-fetching discussions on cursor rewind / reruns unless issue changed.
    -- Set to issue.updated_at after successfully syncing all discussions for this issue.
    discussions_synced_for_updated_at INTEGER,
    web_url TEXT,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_issues_project_updated ON issues(project_id, updated_at);
CREATE INDEX idx_issues_author ON issues(author_username);
CREATE INDEX idx_issues_discussions_sync ON issues(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_issues_project_iid ON issues(project_id, iid);

-- Labels (derived from issue payloads)
-- CP1: Name-only for stability. Color/description deferred to Labels API integration.
-- Uniqueness is (project_id, name) since gitlab_id isn't always available.
CREATE TABLE labels (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER,                    -- optional (populated if Labels API used later)
    project_id INTEGER NOT NULL REFERENCES projects(id),
    name TEXT NOT NULL,
    color TEXT,                           -- nullable, populated later if needed
    description TEXT                      -- nullable, populated later if needed
);
CREATE UNIQUE INDEX uq_labels_project_name ON labels(project_id, name);
CREATE INDEX idx_labels_name ON labels(name);

-- Issue-Label junction
-- IMPORTANT: On issue update, DELETE existing links then INSERT the current set.
-- This ensures removed labels are unlinked (not just added).
CREATE TABLE issue_labels (
    issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
    label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
    PRIMARY KEY(issue_id, label_id)
);
CREATE INDEX idx_issue_labels_label ON issue_labels(label_id);

-- Discussion threads for issues
CREATE TABLE discussions (
    id INTEGER PRIMARY KEY,
    gitlab_discussion_id TEXT NOT NULL,   -- GitLab's string ID (e.g., "6a9c1750b37d...")
    project_id INTEGER NOT NULL REFERENCES projects(id),
    issue_id INTEGER REFERENCES issues(id),
    merge_request_id INTEGER,             -- FK added in CP2 via ALTER TABLE
    noteable_type TEXT NOT NULL,          -- 'Issue' | 'MergeRequest'
    individual_note INTEGER NOT NULL,     -- 1 = standalone comment, 0 = threaded
    first_note_at INTEGER,                -- ms epoch UTC, for ordering discussions
    last_note_at INTEGER,                 -- ms epoch UTC, for "recently active" queries
    last_seen_at INTEGER NOT NULL,        -- ms epoch UTC, updated on every upsert
    resolvable INTEGER,                   -- MR discussions can be resolved
    resolved INTEGER,
    raw_payload_id INTEGER REFERENCES raw_payloads(id),
    CHECK (
        (noteable_type='Issue' AND issue_id IS NOT NULL AND merge_request_id IS NULL) OR
        (noteable_type='MergeRequest' AND merge_request_id IS NOT NULL AND issue_id IS NULL)
    )
);
CREATE UNIQUE INDEX uq_discussions_project_discussion_id ON discussions(project_id, gitlab_discussion_id);
CREATE INDEX idx_discussions_issue ON discussions(issue_id);
CREATE INDEX idx_discussions_mr ON discussions(merge_request_id);
CREATE INDEX idx_discussions_last_note ON discussions(last_note_at);

-- Notes belong to discussions (preserving thread context)
CREATE TABLE notes (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    discussion_id INTEGER NOT NULL REFERENCES discussions(id),
    project_id INTEGER NOT NULL REFERENCES projects(id),
    note_type TEXT,                       -- 'DiscussionNote' | 'DiffNote' | null
    is_system INTEGER NOT NULL DEFAULT 0, -- 1 for system notes (assignments, label changes)
    author_username TEXT,
    body TEXT,
    created_at INTEGER,                   -- ms epoch UTC
    updated_at INTEGER,                   -- ms epoch UTC
    last_seen_at INTEGER NOT NULL,        -- ms epoch UTC, updated on every upsert
    position INTEGER,                     -- derived from array order in API response (0-indexed)
    resolvable INTEGER,
    resolved INTEGER,
    resolved_by TEXT,
    resolved_at INTEGER,                  -- ms epoch UTC
    -- DiffNote position metadata (populated for MR DiffNotes in CP2)
    position_old_path TEXT,
    position_new_path TEXT,
    position_old_line INTEGER,
    position_new_line INTEGER,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_notes_discussion ON notes(discussion_id);
CREATE INDEX idx_notes_author ON notes(author_username);
CREATE INDEX idx_notes_system ON notes(is_system);

-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (2, strftime('%s', 'now') * 1000, 'Issues, labels, discussions, notes');
```

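The `issue_labels` comment above prescribes delete-then-insert semantics so the stored link set always equals the incoming payload's label set. A minimal in-memory sketch of that invariant (the real code issues `DELETE` + `INSERT OR IGNORE` in SQL; `relink` is an illustrative name):

```rust
use std::collections::HashSet;

/// Model of the issue_labels relink step: on each issue upsert, existing
/// links are deleted and the current label set is inserted, so stale
/// labels (removed on GitLab) are unlinked rather than accumulating.
fn relink(stored: &mut HashSet<String>, incoming: &[&str]) {
    stored.clear(); // models: DELETE FROM issue_labels WHERE issue_id = ?
    for name in incoming {
        stored.insert((*name).to_string()); // models: INSERT OR IGNORE ...
    }
}

fn main() {
    let mut links: HashSet<String> = ["bug", "p1"].iter().map(|s| s.to_string()).collect();
    // Re-sync: "p1" was removed on GitLab, "docs" was added.
    relink(&mut links, &["bug", "docs"]);
    assert!(links.contains("bug") && links.contains("docs"));
    assert!(!links.contains("p1")); // stale link unlinked
    println!("ok");
}
```

An insert-only approach would satisfy "labels populated" but fail the Gate B stale-link check; the clear step is what makes the link set converge to GitLab's current state.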
---

## GitLab Types

### Type Definitions

```rust
// src/gitlab/types.rs (additions)

use serde::Deserialize;

/// GitLab issue from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabIssue {
    pub id: i64,                    // GitLab global ID
    pub iid: i64,                   // Project-scoped issue number
    pub project_id: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,              // "opened" | "closed"
    pub created_at: String,         // ISO 8601
    pub updated_at: String,         // ISO 8601
    pub closed_at: Option<String>,
    pub author: GitLabAuthor,
    pub labels: Vec<String>,        // Array of label names (CP1 canonical)
    pub web_url: String,
    // NOTE: labels_details is intentionally NOT modeled for CP1.
    // The field name and shape vary across GitLab versions.
    // Color/description can be fetched via the Labels API if needed later.
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
    pub id: i64,
    pub username: String,
    pub name: String,
}

/// GitLab discussion (thread of notes).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabDiscussion {
    pub id: String,                 // String ID like "6a9c1750b37d..."
    pub individual_note: bool,      // true = standalone comment
    pub notes: Vec<GitLabNote>,
}

/// GitLab note (comment).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNote {
    pub id: i64,
    #[serde(rename = "type")]
    pub note_type: Option<String>,  // "DiscussionNote" | "DiffNote" | null
    pub body: String,
    pub author: GitLabAuthor,
    pub created_at: String,         // ISO 8601
    pub updated_at: String,         // ISO 8601
    pub system: bool,               // true for system-generated notes
    #[serde(default)]
    pub resolvable: bool,
    #[serde(default)]
    pub resolved: bool,
    pub resolved_by: Option<GitLabAuthor>,
    pub resolved_at: Option<String>,
    /// DiffNote specific (None for non-DiffNote)
    pub position: Option<GitLabNotePosition>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNotePosition {
    pub old_path: Option<String>,
    pub new_path: Option<String>,
    pub old_line: Option<i32>,
    pub new_line: Option<i32>,
}
```

---

## Transformers

### Issue Transformer

```rust
// src/gitlab/transformers/issue.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabIssue;

/// Normalized issue ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedIssue {
    pub gitlab_id: i64,
    pub project_id: i64,            // Local DB project ID
    pub iid: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,
    pub author_username: String,
    pub created_at: i64,            // ms epoch
    pub updated_at: i64,            // ms epoch
    pub last_seen_at: i64,          // ms epoch
    pub web_url: String,
}

/// Normalized label ready for database insertion.
/// CP1: Name-only for stability.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
    pub project_id: i64,
    pub name: String,
}

/// Transform a GitLab issue to the normalized schema.
pub fn transform_issue(gitlab_issue: &GitLabIssue, local_project_id: i64) -> NormalizedIssue {
    NormalizedIssue {
        gitlab_id: gitlab_issue.id,
        project_id: local_project_id,
        iid: gitlab_issue.iid,
        title: gitlab_issue.title.clone(),
        description: gitlab_issue.description.clone(),
        state: gitlab_issue.state.clone(),
        author_username: gitlab_issue.author.username.clone(),
        created_at: iso_to_ms(&gitlab_issue.created_at),
        updated_at: iso_to_ms(&gitlab_issue.updated_at),
        last_seen_at: now_ms(),
        web_url: gitlab_issue.web_url.clone(),
    }
}

/// Extract labels from a GitLab issue (CP1: name-only).
pub fn extract_labels(gitlab_issue: &GitLabIssue, local_project_id: i64) -> Vec<NormalizedLabel> {
    gitlab_issue
        .labels
        .iter()
        .map(|name| NormalizedLabel {
            project_id: local_project_id,
            name: name.clone(),
        })
        .collect()
}
```


### Discussion Transformer

```rust
// src/gitlab/transformers/discussion.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabDiscussion;

/// Normalized discussion ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedDiscussion {
    pub gitlab_discussion_id: String,
    pub project_id: i64,
    pub issue_id: i64,
    pub noteable_type: String,      // "Issue"
    pub individual_note: bool,
    pub first_note_at: Option<i64>,
    pub last_note_at: Option<i64>,
    pub last_seen_at: i64,
    pub resolvable: bool,
    pub resolved: bool,
}

/// Normalized note ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedNote {
    pub gitlab_id: i64,
    pub project_id: i64,
    pub note_type: Option<String>,
    pub is_system: bool,
    pub author_username: String,
    pub body: String,
    pub created_at: i64,
    pub updated_at: i64,
    pub last_seen_at: i64,
    pub position: i32,              // Array index in notes[]
    pub resolvable: bool,
    pub resolved: bool,
    pub resolved_by: Option<String>,
    pub resolved_at: Option<i64>,
}

/// Transform a GitLab discussion to the normalized schema.
pub fn transform_discussion(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
    local_issue_id: i64,
) -> NormalizedDiscussion {
    let note_times: Vec<i64> = gitlab_discussion
        .notes
        .iter()
        .map(|n| iso_to_ms(&n.created_at))
        .collect();

    // A discussion is resolvable if any note is resolvable,
    // and resolved only if every resolvable note is resolved.
    let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
    let resolved = resolvable
        && gitlab_discussion
            .notes
            .iter()
            .all(|n| !n.resolvable || n.resolved);

    NormalizedDiscussion {
        gitlab_discussion_id: gitlab_discussion.id.clone(),
        project_id: local_project_id,
        issue_id: local_issue_id,
        noteable_type: "Issue".to_string(),
        individual_note: gitlab_discussion.individual_note,
        first_note_at: note_times.iter().min().copied(),
        last_note_at: note_times.iter().max().copied(),
        last_seen_at: now_ms(),
        resolvable,
        resolved,
    }
}

/// Transform GitLab notes to the normalized schema.
pub fn transform_notes(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
) -> Vec<NormalizedNote> {
    gitlab_discussion
        .notes
        .iter()
        .enumerate()
        .map(|(index, note)| NormalizedNote {
            gitlab_id: note.id,
            project_id: local_project_id,
            note_type: note.note_type.clone(),
            is_system: note.system,
            author_username: note.author.username.clone(),
            body: note.body.clone(),
            created_at: iso_to_ms(&note.created_at),
            updated_at: iso_to_ms(&note.updated_at),
            last_seen_at: now_ms(),
            position: index as i32,
            resolvable: note.resolvable,
            resolved: note.resolved,
            resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
            resolved_at: note.resolved_at.as_ref().map(|s| iso_to_ms(s)),
        })
        .collect()
}
```

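The discussion-level resolvable/resolved aggregation in `transform_discussion` can be checked in isolation over `(resolvable, resolved)` note pairs. This is a behavioral sketch of that logic, with an illustrative `aggregate` helper name:

```rust
/// Resolvable/resolved aggregation, as in transform_discussion:
/// a discussion is resolvable if any note is resolvable, and resolved
/// only if it is resolvable and every resolvable note is resolved
/// (non-resolvable notes, e.g. system notes, are ignored).
fn aggregate(notes: &[(bool, bool)]) -> (bool, bool) {
    let resolvable = notes.iter().any(|&(r, _)| r);
    let resolved = resolvable && notes.iter().all(|&(r, res)| !r || res);
    (resolvable, resolved)
}

fn main() {
    // No resolvable notes: neither resolvable nor resolved.
    assert_eq!(aggregate(&[(false, false)]), (false, false));
    // One resolvable note still unresolved.
    assert_eq!(aggregate(&[(true, false), (false, false)]), (true, false));
    // All resolvable notes resolved; the non-resolvable note is ignored.
    assert_eq!(aggregate(&[(true, true), (false, false)]), (true, true));
    println!("ok");
}
```

The `!r || res` guard is what keeps an unresolved system note from blocking the resolved flag.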
---

## GitLab Client Additions

### Pagination with Async Streams

```rust
// src/gitlab/client.rs (additions)

use crate::gitlab::types::{GitLabDiscussion, GitLabIssue};
use reqwest::header::HeaderMap;
use std::pin::Pin;
use futures::Stream;

impl GitLabClient {
    /// Paginate through issues for a project.
    /// Returns a stream of issues that handles pagination automatically.
    pub fn paginate_issues(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let mut params = vec![
                    ("scope", "all".to_string()),
                    ("state", "all".to_string()),
                    ("order_by", "updated_at".to_string()),
                    ("sort", "asc".to_string()),
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                if let Some(ts) = updated_after {
                    // Apply cursor rewind for safety, clamping to 0 to avoid underflow
                    let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
                    let rewound = (ts - rewind_ms).max(0);
                    if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
                        params.push(("updated_after", dt.to_rfc3339()));
                    }
                    // If conversion fails (shouldn't happen with max(0)), omit the param
                    // and fetch all issues (safe fallback).
                }

                let (issues, headers) = self
                    .request_with_headers::<Vec<GitLabIssue>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/issues"),
                        &params,
                    )
                    .await?;

                for issue in issues.iter() {
                    yield issue.clone();
                }

                // Check for next page
                let next_page = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                match next_page {
                    Some(np) if !issues.is_empty() => page = np,
                    _ => break,
                }
            }
        })
    }

    /// Paginate through discussions for an issue.
    pub fn paginate_issue_discussions(
        &self,
        gitlab_project_id: i64,
        issue_iid: i64,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let params = vec![
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                let (discussions, headers) = self
                    .request_with_headers::<Vec<GitLabDiscussion>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/issues/{issue_iid}/discussions"),
                        &params,
                    )
                    .await?;

                for discussion in discussions.iter() {
                    yield discussion.clone();
                }

                // Check for next page
                let next_page = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                match next_page {
                    Some(np) if !discussions.is_empty() => page = np,
                    _ => break,
                }
            }
        })
    }

    /// Make a request and return the response with headers for pagination.
    async fn request_with_headers<T: serde::de::DeserializeOwned>(
        &self,
        path: &str,
        params: &[(&str, String)],
    ) -> Result<(T, HeaderMap)> {
        self.rate_limiter.lock().await.acquire().await;

        let url = format!("{}{}", self.base_url, path);
        tracing::debug!(url = %url, "GitLab request");

        let response = self
            .client
            .get(&url)
            .header("PRIVATE-TOKEN", &self.token)
            .query(params)
            .send()
            .await
            .map_err(|e| GiError::GitLabNetworkError {
                base_url: self.base_url.clone(),
                source: Some(e),
            })?;

        let headers = response.headers().clone();
        let data = self.handle_response(response, path).await?;
        Ok((data, headers))
    }
}
```

**Note:** Requires adding `async-stream` and `futures` to Cargo.toml:

```toml
# Cargo.toml additions
async-stream = "0.3"
futures = "0.3"
```

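The cursor-rewind computation inside `paginate_issues` is worth pinning down on its own, since the clamp is the part that prevents a subtle underflow bug. A pure sketch (`rewound_cursor_ms` is an illustrative name):

```rust
/// Rewind the cursor timestamp by N seconds for safety (catching items
/// whose updated_at was written slightly out of order), clamping at 0
/// so a small cursor value can never underflow into a negative epoch.
fn rewound_cursor_ms(cursor_ms: i64, rewind_seconds: u32) -> i64 {
    (cursor_ms - (rewind_seconds as i64) * 1000).max(0)
}

fn main() {
    // Normal case: a 300-second rewind moves the cursor back 300_000 ms.
    assert_eq!(rewound_cursor_ms(1_700_000_000_000, 300), 1_699_999_700_000);
    // Cursor near the epoch: clamped to 0 instead of going negative.
    assert_eq!(rewound_cursor_ms(1_000, 300), 0);
    println!("ok");
}
```

The clamp is also why the "conversion fails" fallback comment in the client is effectively dead code: `from_timestamp_millis` cannot fail for a non-negative millisecond value in range.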
---

## Orchestration: Dependent Discussion Sync

### Canonical Pattern (CP1)

When `gi ingest --type=issues` runs, it follows this orchestration:

1. **Ingest issues** (cursor-based, with incremental cursor updates per page)

2. **Collect touched issues** - For each issue that passed cursor tuple filtering, record:
   - `local_issue_id`
   - `issue_iid`
   - `issue_updated_at`
   - `discussions_synced_for_updated_at` (from DB)

3. **Filter for discussion sync** - Enqueue issues where:
   ```
   issue.updated_at > issues.discussions_synced_for_updated_at
   ```
   This prevents re-fetching discussions for issues that haven't changed, even with cursor rewind.

4. **Execute discussion sync** with bounded concurrency (`dependent_concurrency` from config)

5. **Update watermark** - After each issue's discussions are successfully ingested:
   ```sql
   UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?
   ```

**Invariant:** A rerun MUST NOT refetch discussions for issues whose `updated_at` has not advanced, even with cursor rewind.

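The step-3 filter can be stated as a pure predicate, which also makes the invariant directly testable. Sketch only; `needs_discussion_sync` is an illustrative name:

```rust
/// Sync discussions only when the issue's updated_at has advanced past
/// the per-issue watermark (discussions_synced_for_updated_at), which is
/// NULL (None) before the first successful discussion sync.
fn needs_discussion_sync(issue_updated_at: i64, watermark: Option<i64>) -> bool {
    match watermark {
        None => true,                     // never synced: always fetch
        Some(w) => issue_updated_at > w,  // fetch only if changed since last sync
    }
}

fn main() {
    assert!(needs_discussion_sync(2_000, None));         // first sync
    assert!(needs_discussion_sync(2_000, Some(1_000)));  // issue changed
    assert!(!needs_discussion_sync(2_000, Some(2_000))); // unchanged: skip
    println!("ok");
}
```

The unchanged case is exactly the rerun-after-rewind scenario: the rewound cursor re-surfaces the issue, but the equal watermark keeps its discussions from being refetched.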
---

## Ingestion Logic

### Runtime Strategy

**Decision:** Use a single-threaded Tokio runtime (`flavor = "current_thread"`) for CP1.

**Rationale:**
- `rusqlite::Connection` is `!Send`, which conflicts with multi-threaded runtimes
- A single-threaded runtime avoids `Send` bounds entirely
- Concurrency for discussion fetches uses `tokio::task::spawn_local` + `LocalSet`
- Keeps the code simple; can upgrade to a channel-based DB writer in CP2 if needed

```rust
// src/main.rs
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // ...
}
```

### Issue Ingestion
|
||
|
||
```rust
|
||
// src/ingestion/issues.rs
|
||
|
||
use futures::StreamExt;
|
||
use rusqlite::Connection;
|
||
use tracing::{debug, info};
|
||
|
||
use crate::core::config::Config;
|
||
use crate::core::error::Result;
|
||
use crate::core::payloads::store_payload;
|
||
use crate::core::time::now_ms;
|
||
use crate::gitlab::client::GitLabClient;
|
||
use crate::gitlab::transformers::issue::{extract_labels, transform_issue};
|
||
|
||
/// Result of issue ingestion.
|
||
#[derive(Debug, Default)]
|
||
pub struct IngestIssuesResult {
|
||
pub fetched: usize,
|
||
pub upserted: usize,
|
||
pub labels_created: usize,
|
||
/// Issues that need discussion sync (updated_at advanced)
|
||
pub issues_needing_discussion_sync: Vec<IssueForDiscussionSync>,
|
||
}
|
||
|
||
/// Info needed to sync discussions for an issue.
|
||
#[derive(Debug, Clone)]
|
||
pub struct IssueForDiscussionSync {
|
||
pub local_issue_id: i64,
|
||
pub iid: i64,
|
||
pub updated_at: i64,
|
||
}
|
||
|
||
/// Ingest issues for a project.
|
||
/// Returns list of issues that need discussion sync.
|
||
pub async fn ingest_issues(
|
||
conn: &Connection,
|
||
client: &GitLabClient,
|
||
config: &Config,
|
||
project_id: i64, // Local DB project ID
|
||
gitlab_project_id: i64, // GitLab project ID
|
||
) -> Result<IngestIssuesResult> {
|
||
let mut result = IngestIssuesResult::default();
|
||
|
||
// Get current cursor
|
||
let cursor = get_cursor(conn, project_id)?;
|
||
let cursor_updated_at = cursor.0;
|
||
let cursor_gitlab_id = cursor.1;
|
||
|
||
let mut last_updated_at: Option<i64> = None;
|
||
let mut last_gitlab_id: Option<i64> = None;
|
||
let mut issues_in_page: Vec<(i64, i64, i64)> = Vec::new(); // (local_id, iid, updated_at)
|
||
|
||
// Fetch issues with pagination
|
||
let mut stream = client.paginate_issues(
|
||
gitlab_project_id,
|
||
cursor_updated_at,
|
||
config.sync.cursor_rewind_seconds,
|
||
);
|
||
|
||
while let Some(issue_result) = stream.next().await {
|
||
let issue = issue_result?;
|
||
result.fetched += 1;
|
||
|
||
let issue_updated_at = crate::core::time::iso_to_ms(&issue.updated_at);
|
||
|
||
// Apply cursor filtering for tuple semantics
|
||
if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
|
||
if issue_updated_at < cursor_ts {
|
||
continue;
|
||
}
|
||
if issue_updated_at == cursor_ts && issue.id <= cursor_id {
|
||
continue;
|
||
}
|
||
}
|
||
|
||
// Begin transaction for this issue (atomicity + performance)
|
||
let tx = conn.unchecked_transaction()?;
|
||
|
||
// Store raw payload
|
||
let payload_id = store_payload(
|
||
&tx,
|
||
project_id,
|
||
"issue",
|
||
&issue.id.to_string(),
|
||
&issue,
|
||
config.storage.compress_raw_payloads,
|
||
)?;
|
||
|
||
// Transform and upsert issue
|
||
let normalized = transform_issue(&issue, project_id);
|
||
let changes = upsert_issue(&tx, &normalized, payload_id)?;
|
||
if changes > 0 {
|
||
result.upserted += 1;
|
||
}
|
||
|
||
// Get local issue ID for label linking
|
||
let local_issue_id = get_local_issue_id(&tx, normalized.gitlab_id)?;
|
||
|
||
// Clear existing label links (ensures removed labels are unlinked)
|
||
clear_issue_labels(&tx, local_issue_id)?;
|
||
|
||
// Extract and upsert labels (name-only for CP1)
|
||
let labels = extract_labels(&issue, project_id);
|
||
for label in &labels {
|
||
let created = upsert_label(&tx, label)?;
|
||
if created {
|
||
result.labels_created += 1;
|
||
}
|
||
|
||
// Link issue to label
|
||
let label_id = get_label_id(&tx, project_id, &label.name)?;
|
||
link_issue_label(&tx, local_issue_id, label_id)?;
|
||
}
|
||
|
||
tx.commit()?;
|
||
|
||
// Track for discussion sync eligibility
|
||
issues_in_page.push((local_issue_id, issue.iid, issue_updated_at));
|
||
|
||
// Track for cursor update
|
||
last_updated_at = Some(issue_updated_at);
|
||
last_gitlab_id = Some(issue.id);
|
||
|
||
// Incremental cursor update every 100 issues (page boundary)
|
||
// This ensures crashes don't cause massive refetch
|
||
if result.fetched % 100 == 0 {
|
||
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
|
||
update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
|
||
}
|
||
}
|
||
}
|
||
|
||
// Final cursor update
|
||
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
|
||
update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
|
||
}
|
||
|
||
// Determine which issues need discussion sync (updated_at advanced)
|
||
for (local_issue_id, iid, updated_at) in issues_in_page {
|
||
let synced_at = get_discussions_synced_at(conn, local_issue_id)?;
|
||
if synced_at.is_none() || updated_at > synced_at.unwrap() {
|
||
result.issues_needing_discussion_sync.push(IssueForDiscussionSync {
|
||
local_issue_id,
|
||
iid,
|
||
updated_at,
|
||
});
|
||
}
|
||
}
|
||
|
||
info!(
|
||
project_id,
|
||
fetched = result.fetched,
|
||
upserted = result.upserted,
|
||
labels_created = result.labels_created,
|
||
need_discussion_sync = result.issues_needing_discussion_sync.len(),
|
||
"Issue ingestion complete"
|
||
);
|
||
|
||
Ok(result)
|
||
}
|
||
|
||
fn get_cursor(conn: &Connection, project_id: i64) -> Result<(Option<i64>, Option<i64>)> {
|
||
let mut stmt = conn.prepare(
|
||
"SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
|
||
WHERE project_id = ? AND resource_type = 'issues'"
|
||
)?;
|
||
|
||
let result = stmt.query_row([project_id], |row| {
|
||
Ok((row.get::<_, Option<i64>>(0)?, row.get::<_, Option<i64>>(1)?))
|
||
});
|
||
|
||
match result {
|
||
Ok(cursor) => Ok(cursor),
|
||
Err(rusqlite::Error::QueryReturnedNoRows) => Ok((None, None)),
|
||
Err(e) => Err(e.into()),
|
||
}
|
||
}
|
||
|
||
fn get_discussions_synced_at(conn: &Connection, issue_id: i64) -> Result<Option<i64>> {
|
||
let result = conn.query_row(
|
||
"SELECT discussions_synced_for_updated_at FROM issues WHERE id = ?",
|
||
[issue_id],
|
||
|row| row.get(0),
|
||
);
|
||
|
||
match result {
|
||
Ok(ts) => Ok(ts),
|
||
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
|
||
Err(e) => Err(e.into()),
|
||
}
|
||
}
|
||
|
||
fn upsert_issue(
|
||
conn: &Connection,
|
||
issue: &crate::gitlab::transformers::issue::NormalizedIssue,
|
||
payload_id: Option<i64>,
|
||
) -> Result<usize> {
|
||
let changes = conn.execute(
|
||
"INSERT INTO issues (
|
||
gitlab_id, project_id, iid, title, description, state,
|
||
author_username, created_at, updated_at, last_seen_at, web_url, raw_payload_id
|
||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
|
||
ON CONFLICT(gitlab_id) DO UPDATE SET
|
||
title = excluded.title,
|
||
description = excluded.description,
|
||
state = excluded.state,
|
||
updated_at = excluded.updated_at,
|
||
last_seen_at = excluded.last_seen_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            issue.gitlab_id,
            issue.project_id,
            issue.iid,
            issue.title,
            issue.description,
            issue.state,
            issue.author_username,
            issue.created_at,
            issue.updated_at,
            issue.last_seen_at,
            issue.web_url,
            payload_id,
        ],
    )?;
    Ok(changes)
}

fn get_local_issue_id(conn: &Connection, gitlab_id: i64) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM issues WHERE gitlab_id = ?",
        [gitlab_id],
        |row| row.get(0),
    )?)
}

fn clear_issue_labels(conn: &Connection, issue_id: i64) -> Result<()> {
    conn.execute("DELETE FROM issue_labels WHERE issue_id = ?", [issue_id])?;
    Ok(())
}

fn upsert_label(
    conn: &Connection,
    label: &crate::gitlab::transformers::issue::NormalizedLabel,
) -> Result<bool> {
    // CP1: Name-only labels. Color/description columns remain NULL.
    let changes = conn.execute(
        "INSERT INTO labels (project_id, name)
         VALUES (?1, ?2)
         ON CONFLICT(project_id, name) DO NOTHING",
        rusqlite::params![label.project_id, label.name],
    )?;
    Ok(changes > 0)
}

fn get_label_id(conn: &Connection, project_id: i64, name: &str) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM labels WHERE project_id = ? AND name = ?",
        rusqlite::params![project_id, name],
        |row| row.get(0),
    )?)
}

fn link_issue_label(conn: &Connection, issue_id: i64, label_id: i64) -> Result<()> {
    conn.execute(
        "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)",
        [issue_id, label_id],
    )?;
    Ok(())
}

fn update_cursor(
    conn: &Connection,
    project_id: i64,
    resource_type: &str,
    updated_at: i64,
    gitlab_id: i64,
) -> Result<()> {
    conn.execute(
        "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
         VALUES (?1, ?2, ?3, ?4)
         ON CONFLICT(project_id, resource_type) DO UPDATE SET
             updated_at_cursor = excluded.updated_at_cursor,
             tie_breaker_id = excluded.tie_breaker_id",
        rusqlite::params![project_id, resource_type, updated_at, gitlab_id],
    )?;
    Ok(())
}
```
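
The `(updated_at_cursor, tie_breaker_id)` pair persisted by `update_cursor` is a tuple cursor: on resume, an item counts as new only when its `(updated_at, id)` tuple sorts strictly after the stored pair. A minimal sketch of that predicate (illustrative only, not the project's client code; `is_new` is a hypothetical helper):

```rust
// Tuple-cursor resume predicate: an item is "new" when its
// (updated_at, gitlab_id) pair sorts strictly after the stored cursor.
// Rust tuples compare lexicographically, which matches the semantics
// update_cursor() persists.
fn is_new(item: (i64, i64), cursor: Option<(i64, i64)>) -> bool {
    match cursor {
        None => true,        // no cursor yet: everything is new (full sync)
        Some(c) => item > c, // later ts, or same ts with a higher id
    }
}

fn main() {
    let cursor = Some((1_700_000_000_000, 42));
    assert!(!is_new((1_700_000_000_000, 42), cursor)); // exactly at cursor: skip
    assert!(is_new((1_700_000_000_000, 43), cursor)); // same ts, higher id: new
    assert!(is_new((1_700_000_000_001, 1), cursor)); // later ts: new
    println!("tuple cursor semantics hold");
}
```

The tie-breaker id is what lets a re-run safely re-read the page containing the cursor timestamp without double-processing items that share it.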

### Discussion Ingestion

```rust
// src/ingestion/discussions.rs

use futures::StreamExt;
use rusqlite::Connection;
use tracing::debug;

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{transform_discussion, transform_notes};

/// Result of discussion ingestion for a single issue.
#[derive(Debug, Default)]
pub struct IngestDiscussionsResult {
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub system_notes_count: usize,
}

/// Ingest discussions for a single issue.
/// Called only when issue.updated_at > discussions_synced_for_updated_at.
pub async fn ingest_issue_discussions(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    issue_iid: i64,
    local_issue_id: i64,
    issue_updated_at: i64,
) -> Result<IngestDiscussionsResult> {
    let mut result = IngestDiscussionsResult::default();

    let mut stream = client.paginate_issue_discussions(gitlab_project_id, issue_iid);

    while let Some(discussion_result) = stream.next().await {
        let discussion = discussion_result?;
        result.discussions_fetched += 1;

        // Begin transaction for this discussion (atomicity + performance)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload for discussion
        let discussion_payload_id = store_payload(
            &tx,
            project_id,
            "discussion",
            &discussion.id,
            &discussion,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert discussion
        let normalized = transform_discussion(&discussion, project_id, local_issue_id);
        upsert_discussion(&tx, &normalized, discussion_payload_id)?;
        result.discussions_upserted += 1;

        // Get local discussion ID
        let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;

        // Transform and upsert notes
        let notes = transform_notes(&discussion, project_id);
        for note in &notes {
            // Store raw payload for note
            let gitlab_note = discussion.notes.iter().find(|n| n.id == note.gitlab_id);
            let note_payload_id = if let Some(gn) = gitlab_note {
                store_payload(
                    &tx,
                    project_id,
                    "note",
                    &note.gitlab_id.to_string(),
                    gn,
                    config.storage.compress_raw_payloads,
                )?
            } else {
                None
            };

            upsert_note(&tx, local_discussion_id, note, note_payload_id)?;
            result.notes_upserted += 1;

            if note.is_system {
                result.system_notes_count += 1;
            }
        }

        tx.commit()?;
    }

    // Mark discussions as synced for this issue version
    mark_discussions_synced(conn, local_issue_id, issue_updated_at)?;

    debug!(
        project_id,
        issue_iid,
        discussions = result.discussions_fetched,
        notes = result.notes_upserted,
        "Issue discussions ingested"
    );

    Ok(result)
}

fn upsert_discussion(
    conn: &Connection,
    discussion: &crate::gitlab::transformers::discussion::NormalizedDiscussion,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO discussions (
            gitlab_discussion_id, project_id, issue_id, noteable_type,
            individual_note, first_note_at, last_note_at, last_seen_at,
            resolvable, resolved, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
        ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
            first_note_at = excluded.first_note_at,
            last_note_at = excluded.last_note_at,
            last_seen_at = excluded.last_seen_at,
            resolvable = excluded.resolvable,
            resolved = excluded.resolved,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            discussion.gitlab_discussion_id,
            discussion.project_id,
            discussion.issue_id,
            discussion.noteable_type,
            discussion.individual_note as i32,
            discussion.first_note_at,
            discussion.last_note_at,
            discussion.last_seen_at,
            discussion.resolvable as i32,
            discussion.resolved as i32,
            payload_id,
        ],
    )?;
    Ok(())
}

fn get_local_discussion_id(conn: &Connection, project_id: i64, gitlab_id: &str) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
        rusqlite::params![project_id, gitlab_id],
        |row| row.get(0),
    )?)
}

fn upsert_note(
    conn: &Connection,
    discussion_id: i64,
    note: &crate::gitlab::transformers::discussion::NormalizedNote,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO notes (
            gitlab_id, discussion_id, project_id, note_type, is_system,
            author_username, body, created_at, updated_at, last_seen_at,
            position, resolvable, resolved, resolved_by, resolved_at,
            raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            body = excluded.body,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            resolved = excluded.resolved,
            resolved_by = excluded.resolved_by,
            resolved_at = excluded.resolved_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            note.gitlab_id,
            discussion_id,
            note.project_id,
            note.note_type,
            note.is_system as i32,
            note.author_username,
            note.body,
            note.created_at,
            note.updated_at,
            note.last_seen_at,
            note.position,
            note.resolvable as i32,
            note.resolved as i32,
            note.resolved_by,
            note.resolved_at,
            payload_id,
        ],
    )?;
    Ok(())
}

fn mark_discussions_synced(conn: &Connection, issue_id: i64, issue_updated_at: i64) -> Result<()> {
    conn.execute(
        "UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?",
        rusqlite::params![issue_updated_at, issue_id],
    )?;
    Ok(())
}
```
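
The contract above (`ingest_issue_discussions` runs only when `updated_at` has advanced past the watermark) implies a caller-side filter. A hypothetical query the orchestrator might use to select stale issues — column names are the ones this checkpoint's migration introduces:

```sql
-- Issues whose discussions need (re)syncing: never synced, or the issue
-- was updated after the last successful discussion sync.
SELECT id, iid, updated_at
FROM issues
WHERE project_id = ?1
  AND (discussions_synced_for_updated_at IS NULL
       OR updated_at > discussions_synced_for_updated_at);
```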

---

## CLI Commands

### `gi ingest --type=issues`

Fetch and store all issues from configured projects.

**Clap Definition:**

```rust
// src/cli/mod.rs (addition to Commands enum)

#[derive(Subcommand)]
pub enum Commands {
    // ... existing commands ...

    /// Ingest data from GitLab
    Ingest {
        /// Resource type to ingest
        #[arg(long, value_parser = ["issues", "merge_requests"])]
        r#type: String,

        /// Filter to single project
        #[arg(long)]
        project: Option<String>,

        /// Override stale sync lock
        #[arg(long)]
        force: bool,
    },

    /// List entities
    List {
        /// Entity type to list
        #[arg(value_parser = ["issues", "mrs"])]
        entity: String,

        /// Maximum results
        #[arg(long, default_value = "20")]
        limit: usize,

        /// Filter by project path
        #[arg(long)]
        project: Option<String>,

        /// Filter by state
        #[arg(long, value_parser = ["opened", "closed", "all"])]
        state: Option<String>,
    },

    /// Count entities
    Count {
        /// Entity type to count
        #[arg(value_parser = ["issues", "mrs", "discussions", "notes"])]
        entity: String,

        /// Filter by noteable type
        #[arg(long, value_parser = ["issue", "mr"])]
        r#type: Option<String>,
    },

    /// Show entity details
    Show {
        /// Entity type
        #[arg(value_parser = ["issue", "mr"])]
        entity: String,

        /// Entity IID
        iid: i64,

        /// Project path (required if ambiguous)
        #[arg(long)]
        project: Option<String>,
    },
}
```

**Output:**
```
Ingesting issues...

group/project-one: 1,234 issues fetched, 45 new labels

Fetching discussions (312 issues with updates)...

group/project-one: 312 issues → 1,234 discussions, 5,678 notes

Total: 1,234 issues, 1,234 discussions, 5,678 notes (excluding 1,234 system notes)
Skipped discussion sync for 922 unchanged issues.
```

### `gi list issues`

**Output:**
```
Issues (showing 20 of 3,801)

#1234  Authentication redesign      opened  @johndoe   3 days ago
#1233  Fix memory leak in cache     closed  @janedoe   5 days ago
#1232  Add dark mode support        opened  @bobsmith  1 week ago
...
```

### `gi count issues`

**Output:**
```
Issues: 3,801
```

### `gi show issue <iid>`

**Output:**
```
Issue #1234: Authentication redesign
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Project:  group/project-one
State:    opened
Author:   @johndoe
Created:  2024-01-15
Updated:  2024-03-20
Labels:   enhancement, auth
URL:      https://gitlab.example.com/group/project-one/-/issues/1234

Description:
We need to redesign the authentication flow to support...

Discussions (5):

@janedoe (2024-01-16):
  I agree we should move to JWT-based auth...

@johndoe (2024-01-16):
  What about refresh token strategy?

@bobsmith (2024-01-17):
  Have we considered OAuth2?
```

---

## Automated Tests

### Unit Tests

```rust
// tests/issue_transformer_tests.rs

#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::issue::*;
    use gi::gitlab::types::*;

    #[test]
    fn transforms_gitlab_issue_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_labels_from_issue_payload() { /* ... */ }

    #[test]
    fn handles_missing_optional_fields_gracefully() { /* ... */ }

    #[test]
    fn converts_iso_timestamps_to_ms_epoch() { /* ... */ }

    #[test]
    fn sets_last_seen_at_to_current_time() { /* ... */ }
}
```

```rust
// tests/discussion_transformer_tests.rs

#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::discussion::*;

    #[test]
    fn transforms_discussion_payload_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_notes_array_from_discussion() { /* ... */ }

    #[test]
    fn sets_individual_note_flag_correctly() { /* ... */ }

    #[test]
    fn flags_system_notes_with_is_system_true() { /* ... */ }

    #[test]
    fn preserves_note_order_via_position_field() { /* ... */ }

    #[test]
    fn computes_first_note_at_and_last_note_at_correctly() { /* ... */ }

    #[test]
    fn computes_resolvable_and_resolved_status() { /* ... */ }
}
```

```rust
// tests/pagination_tests.rs

#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn fetches_all_pages_when_multiple_exist() { /* ... */ }

    #[tokio::test]
    async fn respects_per_page_parameter() { /* ... */ }

    #[tokio::test]
    async fn follows_x_next_page_header_until_empty() { /* ... */ }

    #[tokio::test]
    async fn falls_back_to_empty_page_stop_if_headers_missing() { /* ... */ }

    #[tokio::test]
    async fn applies_cursor_rewind_for_tuple_semantics() { /* ... */ }

    #[tokio::test]
    async fn clamps_negative_rewind_to_zero() { /* ... */ }
}
```

```rust
// tests/label_linkage_tests.rs

#[cfg(test)]
mod tests {
    #[test]
    fn clears_existing_labels_before_linking_new_set() { /* ... */ }

    #[test]
    fn removes_stale_label_links_on_issue_update() { /* ... */ }

    #[test]
    fn handles_issue_with_all_labels_removed() { /* ... */ }

    #[test]
    fn preserves_labels_that_still_exist() { /* ... */ }
}
```

```rust
// tests/discussion_watermark_tests.rs

#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn skips_discussion_fetch_when_updated_at_unchanged() { /* ... */ }

    #[tokio::test]
    async fn fetches_discussions_when_updated_at_advanced() { /* ... */ }

    #[tokio::test]
    async fn updates_watermark_after_successful_discussion_sync() { /* ... */ }

    #[tokio::test]
    async fn does_not_update_watermark_on_discussion_sync_failure() { /* ... */ }
}
```
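
These four tests exercise a single predicate. Reduced to a pure function (a sketch under the assumption that a `NULL` watermark means "never synced"; `needs_discussion_sync` is a hypothetical name, not the orchestrator's real API):

```rust
// Watermark check: discussions are stale only when the issue's updated_at
// has advanced past the value recorded by the last successful sync.
fn needs_discussion_sync(issue_updated_at: i64, watermark: Option<i64>) -> bool {
    match watermark {
        None => true,                    // never synced: always fetch
        Some(w) => issue_updated_at > w, // fetch only on advancement
    }
}

fn main() {
    assert!(needs_discussion_sync(100, None)); // first sync
    assert!(needs_discussion_sync(101, Some(100))); // issue changed
    assert!(!needs_discussion_sync(100, Some(100))); // unchanged: skip
    println!("watermark decision ok");
}
```

Because the watermark is written only after a successful sync, a failed sync leaves it untouched and the next run naturally retries.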

### Integration Tests

```rust
// tests/issue_ingestion_tests.rs

#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path_regex};

    #[tokio::test]
    async fn inserts_issues_into_database() { /* ... */ }

    #[tokio::test]
    async fn creates_labels_from_issue_payloads() { /* ... */ }

    #[tokio::test]
    async fn links_issues_to_labels_via_junction_table() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_label_links_on_resync() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_issue() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_discussion() { /* ... */ }

    #[tokio::test]
    async fn updates_cursor_incrementally_per_page() { /* ... */ }

    #[tokio::test]
    async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }

    #[tokio::test]
    async fn handles_issues_with_no_labels() { /* ... */ }

    #[tokio::test]
    async fn upserts_existing_issues_on_refetch() { /* ... */ }

    #[tokio::test]
    async fn skips_discussion_refetch_for_unchanged_issues() { /* ... */ }
}
```

---

## Manual Smoke Tests

| Command | Expected Output | Pass Criteria |
|---------|-----------------|---------------|
| `gi ingest --type=issues` | Progress bar, final count | Completes without error |
| `gi list issues --limit=10` | Table of 10 issues | Shows iid, title, state, author |
| `gi list issues --project=group/project-one` | Filtered list | Only shows issues from that project |
| `gi count issues` | `Issues: N` | Count matches GitLab UI |
| `gi show issue 123` | Issue detail view | Shows title, description, labels, discussions, URL |
| `gi show issue 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=issue` | `Issue Discussions: N` | Non-zero count |
| `gi count notes --type=issue` | `Issue Notes: N (excluding M system)` | Non-zero count |
| `gi sync-status` | Last sync time, cursor positions | Shows successful last run |
| `gi ingest --type=issues` (re-run) | `0 new issues` | Cursor prevents re-fetch |
| `gi ingest --type=issues` (re-run) | `Skipped discussion sync for N unchanged issues` | Watermark prevents refetch |
| `gi ingest --type=issues` (concurrent) | Lock error | Second run fails with clear message |
| Remove label from issue in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |
---

## Data Integrity Checks

After successful ingestion, verify:

- [ ] `SELECT COUNT(*) FROM issues` matches GitLab issue count for configured projects
- [ ] Every issue has a corresponding `raw_payloads` row
- [ ] Every discussion has a corresponding `raw_payloads` row
- [ ] Labels in `issue_labels` junction all exist in `labels` table
- [ ] `issue_labels` count per issue matches GitLab UI label count
- [ ] `sync_cursors` has entry for each `(project_id, 'issues')` pair
- [ ] Re-running `gi ingest --type=issues` fetches 0 new items (cursor is current)
- [ ] Re-running skips discussion sync for unchanged issues (watermark works)
- [ ] `SELECT COUNT(*) FROM discussions WHERE noteable_type='Issue'` is non-zero
- [ ] Every discussion has at least one note
- [ ] `individual_note = 1` discussions have exactly one note
- [ ] `SELECT COUNT(*) FROM notes WHERE is_system = 1` matches system note count in CLI output
- [ ] After removing a label in GitLab and re-syncing, the link is removed from `issue_labels`
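
Several of these checks translate directly into SQL. Hedged examples (table and column names as defined by this checkpoint's migrations; adjust if your schema differs) — each query should come back empty or zero:

```sql
-- Issues missing a raw payload (expect 0):
SELECT COUNT(*) FROM issues WHERE raw_payload_id IS NULL;

-- Orphaned label links (expect 0):
SELECT COUNT(*)
FROM issue_labels il
LEFT JOIN labels l ON l.id = il.label_id
WHERE l.id IS NULL;

-- individual_note discussions whose note count is not exactly 1 (expect no rows):
SELECT d.id
FROM discussions d
JOIN notes n ON n.discussion_id = d.id
WHERE d.individual_note = 1
GROUP BY d.id
HAVING COUNT(*) <> 1;
```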

---

## Definition of Done

### Gate A: Issues Only (Must Pass First)

- [ ] `gi ingest --type=issues` fetches all issues from configured projects
- [ ] Issues stored with correct schema, including `last_seen_at`
- [ ] Cursor-based sync is resumable (re-run fetches only new/updated)
- [ ] Incremental cursor updates every 100 issues
- [ ] Raw payloads stored for each issue
- [ ] `gi list issues` and `gi count issues` work

### Gate B: Labels Correct (Must Pass)

- [ ] Labels extracted and stored (name-only)
- [ ] Label links created correctly
- [ ] **Stale label links removed on re-sync** (verified with test)
- [ ] Label count per issue matches GitLab

### Gate C: Dependent Discussion Sync (Must Pass)

- [ ] Discussions fetched for issues with `updated_at` advancement
- [ ] Notes stored with `is_system` flag correctly set
- [ ] Raw payloads stored for discussions and notes
- [ ] `discussions_synced_for_updated_at` watermark updated after sync
- [ ] **Unchanged issues skip discussion refetch** (verified with test)
- [ ] Bounded concurrency (`dependent_concurrency` respected)

### Gate D: Resumability Proof (Must Pass)

- [ ] Kill mid-run, rerun; bounded redo (cursor progress preserved)
- [ ] No redundant discussion refetch after crash recovery
- [ ] Single-flight lock prevents concurrent runs

### Final Gate (Must Pass)

- [ ] All unit tests pass (`cargo test`)
- [ ] All integration tests pass (mocked with wiremock)
- [ ] `cargo clippy` passes with no warnings
- [ ] `cargo fmt --check` passes
- [ ] Compiles with `--release`

### Hardening (Optional Before CP2)

- [ ] Edge cases: issues with 0 labels, 0 discussions
- [ ] Large pagination (100+ pages)
- [ ] Rate limit handling under sustained load
- [ ] Live tests pass against real GitLab instance
- [ ] Performance: 1000+ issues ingested in <5 min

---

## Implementation Order

1. **Runtime decision** (5 min)
   - Confirm `#[tokio::main(flavor = "current_thread")]`
   - Add note about upgrade path for CP2 if needed

2. **Cargo.toml updates** (5 min)
   - Add `async-stream = "0.3"` and `futures = "0.3"`

3. **Database migration** (15 min)
   - `migrations/002_issues.sql` with `discussions_synced_for_updated_at` column
   - `raw_payload_id` on discussions table
   - Update `MIGRATIONS` const in `src/core/db.rs`

4. **GitLab types** (15 min)
   - Add types to `src/gitlab/types.rs` (no `labels_details`)
   - Test deserialization with fixtures

5. **Transformers** (25 min)
   - `src/gitlab/transformers/mod.rs`
   - `src/gitlab/transformers/issue.rs` (simplified `NormalizedLabel`)
   - `src/gitlab/transformers/discussion.rs`
   - Unit tests

6. **GitLab client pagination** (25 min)
   - Add `paginate_issues()` with underflow protection
   - Add `paginate_issue_discussions()`
   - Add `request_with_headers()` helper

7. **Issue ingestion** (45 min)
   - `src/ingestion/mod.rs`
   - `src/ingestion/issues.rs` with:
     - Transaction batching
     - `clear_issue_labels()` before linking
     - Incremental cursor updates
     - Return `issues_needing_discussion_sync`
   - Unit + integration tests including label stale-link removal

8. **Discussion ingestion** (30 min)
   - `src/ingestion/discussions.rs` with:
     - Transaction batching
     - `raw_payload_id` storage
     - `mark_discussions_synced()` watermark update
   - Integration tests including watermark behavior

9. **Orchestrator** (30 min)
   - `src/ingestion/orchestrator.rs`
   - Coordinates issue sync → filter for discussion needs → bounded discussion sync
   - Integration tests

10. **CLI commands** (45 min)
    - `gi ingest --type=issues`
    - `gi list issues`
    - `gi count issues|discussions|notes`
    - `gi show issue <iid>`
    - Enhanced `gi sync-status`

11. **Final validation** (20 min)
    - `cargo test`
    - `cargo clippy`
    - Gate A/B/C/D verification
    - Manual smoke tests
    - Data integrity checks

---

## Risks & Mitigations

| Risk | Mitigation |
|------|------------|
| GitLab rate limiting during large sync | Respect `Retry-After`, exponential backoff, configurable concurrency |
| Discussion API N+1 problem (thousands of calls) | `dependent_concurrency` config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large issues with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | `is_system` flag allows filtering |
| Label deduplication across projects | Unique constraint on `(project_id, name)` |
| Stale label links accumulate | `clear_issue_labels()` before linking ensures correctness |
| Async stream complexity | Use `async-stream` crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Single-threaded runtime (`current_thread`) avoids Send bounds |
| Crash causes massive refetch | Incremental cursor updates every 100 issues |
| Cursor rewind causes discussion refetch | Per-issue watermark (`discussions_synced_for_updated_at`) |
| Timestamp underflow on rewind | Clamp to 0 with `.max(0)` |
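
The underflow clamp from the last row, as a sketch (`rewound_cursor` is a hypothetical helper; the backfill window is the configurable value mentioned under cursor drift):

```rust
// Rewinding the cursor by a backfill window must saturate at 0 rather
// than go negative (timestamps are ms since the epoch).
fn rewound_cursor(cursor_ms: i64, backfill_window_ms: i64) -> i64 {
    (cursor_ms - backfill_window_ms).max(0)
}

fn main() {
    assert_eq!(rewound_cursor(5_000, 60_000), 0); // young cursor: clamped
    assert_eq!(rewound_cursor(1_700_000_000_000, 60_000), 1_699_999_940_000);
    println!("rewind clamp ok");
}
```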

---

## API Call Estimation

For a project with 3,000 issues:

- Issue list: `ceil(3000/100) = 30` calls
- Issue discussions (first run): `3000 × 1.2 average pages = 3,600` calls
- Issue discussions (subsequent runs, 10% updates): `300 × 1.2 = 360` calls
- **Total first run: ~3,630 calls per project**
- **Total subsequent run: ~390 calls per project** (90% savings from watermark)

At 10 requests/second:

- First run: ~6 minutes per project
- Subsequent run: ~40 seconds per project
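
The arithmetic above can be sketched in integer math (the 100-per-page, 1.2-pages-per-issue, and 10 req/s figures are the estimate's assumptions, not measurements; average pages are expressed in tenths to stay exact):

```rust
// Ceiling division for list-page counts.
fn list_calls(issues: u64, per_page: u64) -> u64 {
    (issues + per_page - 1) / per_page
}

// Discussion calls with average pages-per-issue given in tenths (1.2 -> 12).
fn discussion_calls(issues: u64, avg_pages_tenths: u64) -> u64 {
    (issues * avg_pages_tenths + 9) / 10
}

fn main() {
    let total_issues = 3_000;
    let first_run = list_calls(total_issues, 100) + discussion_calls(total_issues, 12);
    // Subsequent runs: the watermark limits discussion fetches to the ~10%
    // of issues whose updated_at advanced.
    let subsequent = list_calls(total_issues, 100) + discussion_calls(total_issues / 10, 12);
    assert_eq!(first_run, 3_630);
    assert_eq!(subsequent, 390);
    println!(
        "first: {first_run} calls (~{} s at 10 req/s); subsequent: {subsequent} calls (~{} s)",
        first_run / 10,
        subsequent / 10
    );
}
```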

---

## References

- [SPEC.md](../../SPEC.md) - Full system specification
- [checkpoint-0.md](checkpoint-0.md) - Project setup PRD
- [GitLab Issues API](https://docs.gitlab.com/ee/api/issues.html)
- [GitLab Discussions API](https://docs.gitlab.com/ee/api/discussions.html)
- [async-stream crate](https://docs.rs/async-stream) - Async generators for Rust
- [wiremock](https://docs.rs/wiremock) - HTTP mocking for tests