gitlore/docs/prd/checkpoint-1.md
2026-01-28 15:49:14 -05:00
# Checkpoint 1: Issue Ingestion - PRD
> **Note:** The project was renamed from "gitlab-inbox" to "gitlore" and the CLI from "gi" to "lore". References to "gi" in this document should be read as "lore".
**Version:** 2.0
**Status:** Ready for Implementation
**Depends On:** Checkpoint 0 (Project Setup)
**Enables:** Checkpoint 2 (MR Ingestion)
---
## Overview
### Objective
Ingest all issues, labels, and issue discussions from configured GitLab repositories with resumable cursor-based incremental sync. This checkpoint establishes the core data ingestion pattern that will be reused for MRs in Checkpoint 2.
### Success Criteria
| Criterion | Validation |
|-----------|------------|
| `gi ingest --type=issues` fetches all issues | `gi count issues` matches GitLab UI |
| Labels extracted from issue payloads (name-only) | `labels` table populated |
| Label linkage reflects current GitLab state | Removed labels are unlinked on re-sync |
| Issue discussions fetched per-issue (dependent sync) | For issues whose `updated_at` advanced, discussions and notes upserted |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged issues | Per-issue watermark prevents redundant fetches |
| Sync tracking records all runs | `sync_runs` table has complete audit trail |
| Single-flight lock prevents concurrent runs | Second sync fails with clear error |
---
## Internal Gates
CP1 is validated incrementally via internal gates:
| Gate | Scope | Validation |
|------|-------|------------|
| **Gate A** | Issues only | Cursor + upsert + raw payloads + list/count/show working |
| **Gate B** | Labels correct | Stale-link removal verified; label count matches GitLab |
| **Gate C** | Dependent discussion sync | Watermark prevents redundant refetch; concurrency bounded |
| **Gate D** | Resumability proof | Kill mid-run, rerun; confirm bounded redo and no redundant discussion refetch |
---
## Deliverables
### 1. Project Structure Additions
Add the following to the existing Rust structure from Checkpoint 0:
```
gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── ingest.rs        # gi ingest --type=issues|merge_requests
│   │       ├── list.rs          # gi list issues|mrs
│   │       ├── count.rs         # gi count issues|mrs|discussions|notes
│   │       └── show.rs          # gi show issue|mr <iid>
│   ├── gitlab/
│   │   ├── types.rs             # Add GitLabIssue, GitLabDiscussion, GitLabNote
│   │   └── transformers/
│   │       ├── mod.rs
│   │       ├── issue.rs         # GitLab → normalized issue
│   │       └── discussion.rs    # GitLab → normalized discussion/notes
│   └── ingestion/
│       ├── mod.rs
│       ├── orchestrator.rs      # Coordinates issue + dependent discussion sync
│       ├── issues.rs            # Issue fetcher with pagination
│       └── discussions.rs       # Discussion fetcher (per-issue)
├── tests/
│   ├── issue_transformer_tests.rs
│   ├── discussion_transformer_tests.rs
│   ├── pagination_tests.rs
│   ├── issue_ingestion_tests.rs
│   ├── label_linkage_tests.rs   # Verifies stale link removal
│   ├── discussion_watermark_tests.rs
│   └── fixtures/
│       ├── gitlab_issue.json
│       ├── gitlab_issues_page.json
│       ├── gitlab_discussion.json
│       └── gitlab_discussions_page.json
└── migrations/
    └── 002_issues.sql
```
### 2. GitLab API Endpoints
**Issues (Bulk Fetch):**
```
GET /projects/:id/issues?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
```
**Issue Discussions (Per-Issue Fetch):**
```
GET /projects/:id/issues/:iid/discussions?per_page=100&page=N
```
**Required Query Parameters:**
- `scope=all` - Include all issues, not just authored by current user
- `state=all` - Include closed issues (GitLab may default to open only)
**MVP Note (Labels):**
- CP1 stores labels by **name only** for maximum compatibility and stability.
- Label color/description ingestion is deferred (post-CP1) via Labels API if needed.
- This avoids relying on optional/variant payload shapes that differ across GitLab versions.
**Pagination:**
- Follow `x-next-page` header until empty/absent
- Fall back to empty-page detection if headers missing (robustness)
- Per-page maximum: 100
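The page-advance rules above can be sketched as one pure helper (an illustrative sketch only; `next_page` and its parameters are not names from the codebase):

```rust
/// Decide which page to request next, per the rules above:
/// - an empty page always terminates pagination;
/// - if `x-next-page` is present, trust it (empty/unparseable value = last page);
/// - if pagination headers are missing entirely, probe the next page number
///   and rely on empty-page detection to stop.
fn next_page(current: u32, x_next_page: Option<&str>, items_on_page: usize) -> Option<u32> {
    if items_on_page == 0 {
        return None; // empty-page detection: nothing more to fetch
    }
    match x_next_page {
        Some(s) => s.trim().parse::<u32>().ok(), // empty header => last page
        None => Some(current + 1),               // headers missing: advance and probe
    }
}

fn main() {
    assert_eq!(next_page(1, Some("2"), 100), Some(2)); // header drives paging
    assert_eq!(next_page(2, Some(""), 100), None);     // empty header: done
    assert_eq!(next_page(1, None, 100), Some(2));      // fallback: probe next page
    assert_eq!(next_page(3, None, 0), None);           // empty page: done
}
```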
---
## Database Schema
### Migration 002_issues.sql
```sql
-- Issues table
CREATE TABLE issues (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    project_id INTEGER NOT NULL REFERENCES projects(id),
    iid INTEGER NOT NULL,
    title TEXT,
    description TEXT,
    state TEXT,                                -- 'opened' | 'closed'
    author_username TEXT,
    created_at INTEGER,                        -- ms epoch UTC
    updated_at INTEGER,                        -- ms epoch UTC
    last_seen_at INTEGER NOT NULL,             -- ms epoch UTC, updated on every upsert
    -- Prevents re-fetching discussions on cursor rewind / reruns unless issue changed.
    -- Set to issue.updated_at after successfully syncing all discussions for this issue.
    discussions_synced_for_updated_at INTEGER,
    web_url TEXT,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);

CREATE INDEX idx_issues_project_updated ON issues(project_id, updated_at);
CREATE INDEX idx_issues_author ON issues(author_username);
CREATE INDEX idx_issues_discussions_sync ON issues(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_issues_project_iid ON issues(project_id, iid);

-- Labels (derived from issue payloads)
-- CP1: Name-only for stability. Color/description deferred to Labels API integration.
-- Uniqueness is (project_id, name) since gitlab_id isn't always available.
CREATE TABLE labels (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER,                         -- optional (populated if Labels API used later)
    project_id INTEGER NOT NULL REFERENCES projects(id),
    name TEXT NOT NULL,
    color TEXT,                                -- nullable, populated later if needed
    description TEXT                           -- nullable, populated later if needed
);

CREATE UNIQUE INDEX uq_labels_project_name ON labels(project_id, name);
CREATE INDEX idx_labels_name ON labels(name);

-- Issue-Label junction
-- IMPORTANT: On issue update, DELETE existing links then INSERT current set.
-- This ensures removed labels are unlinked (not just added).
CREATE TABLE issue_labels (
    issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
    label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
    PRIMARY KEY(issue_id, label_id)
);

CREATE INDEX idx_issue_labels_label ON issue_labels(label_id);

-- Discussion threads for issues
CREATE TABLE discussions (
    id INTEGER PRIMARY KEY,
    gitlab_discussion_id TEXT NOT NULL,        -- GitLab's string ID (e.g., "6a9c1750b37d...")
    project_id INTEGER NOT NULL REFERENCES projects(id),
    issue_id INTEGER REFERENCES issues(id),
    merge_request_id INTEGER,                  -- FK added in CP2 via ALTER TABLE
    noteable_type TEXT NOT NULL,               -- 'Issue' | 'MergeRequest'
    individual_note INTEGER NOT NULL,          -- 1 = standalone comment, 0 = threaded
    first_note_at INTEGER,                     -- ms epoch UTC, for ordering discussions
    last_note_at INTEGER,                      -- ms epoch UTC, for "recently active" queries
    last_seen_at INTEGER NOT NULL,             -- ms epoch UTC, updated on every upsert
    resolvable INTEGER,                        -- MR discussions can be resolved
    resolved INTEGER,
    raw_payload_id INTEGER REFERENCES raw_payloads(id),
    CHECK (
        (noteable_type='Issue' AND issue_id IS NOT NULL AND merge_request_id IS NULL) OR
        (noteable_type='MergeRequest' AND merge_request_id IS NOT NULL AND issue_id IS NULL)
    )
);

CREATE UNIQUE INDEX uq_discussions_project_discussion_id ON discussions(project_id, gitlab_discussion_id);
CREATE INDEX idx_discussions_issue ON discussions(issue_id);
CREATE INDEX idx_discussions_mr ON discussions(merge_request_id);
CREATE INDEX idx_discussions_last_note ON discussions(last_note_at);

-- Notes belong to discussions (preserving thread context)
CREATE TABLE notes (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    discussion_id INTEGER NOT NULL REFERENCES discussions(id),
    project_id INTEGER NOT NULL REFERENCES projects(id),
    note_type TEXT,                            -- 'DiscussionNote' | 'DiffNote' | null
    is_system INTEGER NOT NULL DEFAULT 0,      -- 1 for system notes (assignments, label changes)
    author_username TEXT,
    body TEXT,
    created_at INTEGER,                        -- ms epoch UTC
    updated_at INTEGER,                        -- ms epoch UTC
    last_seen_at INTEGER NOT NULL,             -- ms epoch UTC, updated on every upsert
    position INTEGER,                          -- derived from array order in API response (0-indexed)
    resolvable INTEGER,
    resolved INTEGER,
    resolved_by TEXT,
    resolved_at INTEGER,                       -- ms epoch UTC
    -- DiffNote position metadata (populated for MR DiffNotes in CP2)
    position_old_path TEXT,
    position_new_path TEXT,
    position_old_line INTEGER,
    position_new_line INTEGER,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);

CREATE INDEX idx_notes_discussion ON notes(discussion_id);
CREATE INDEX idx_notes_author ON notes(author_username);
CREATE INDEX idx_notes_system ON notes(is_system);

-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (2, strftime('%s', 'now') * 1000, 'Issues, labels, discussions, notes');
```
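The delete-then-insert rule in the `issue_labels` comment amounts to set replacement: after an upsert, the stored links equal exactly the labels in the latest payload. A minimal model of that semantics (the `relink` helper is hypothetical, not part of the schema or codebase):

```rust
use std::collections::HashSet;

/// Model of the junction-table rule: DELETE the issue's existing links,
/// then INSERT one row per label in the current payload. The result is
/// exactly the payload's label set, so removed labels are unlinked on re-sync.
fn relink(current_payload: &[&str]) -> HashSet<String> {
    // DELETE FROM issue_labels WHERE issue_id = ?  -> start from empty
    // INSERT one (issue_id, label_id) per label    -> exactly the payload set
    current_payload.iter().map(|s| s.to_string()).collect()
}

fn main() {
    // Issue previously linked to {bug, p1}; the latest payload dropped "p1".
    let links = relink(&["bug", "frontend"]);
    assert!(links.contains("frontend")); // newly added label is linked
    assert!(!links.contains("p1"));      // removed label is gone, not merely unrefreshed
    assert_eq!(links.len(), 2);
}
```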
---
## GitLab Types
### Type Definitions
```rust
// src/gitlab/types.rs (additions)
use serde::Deserialize;

/// GitLab issue from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabIssue {
    pub id: i64,                    // GitLab global ID
    pub iid: i64,                   // Project-scoped issue number
    pub project_id: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,              // "opened" | "closed"
    pub created_at: String,         // ISO 8601
    pub updated_at: String,         // ISO 8601
    pub closed_at: Option<String>,
    pub author: GitLabAuthor,
    pub labels: Vec<String>,        // Array of label names (CP1 canonical)
    pub web_url: String,
    // NOTE: labels_details is intentionally NOT modeled for CP1.
    // The field name and shape vary across GitLab versions.
    // Color/description can be fetched via Labels API if needed later.
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
    pub id: i64,
    pub username: String,
    pub name: String,
}

/// GitLab discussion (thread of notes).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabDiscussion {
    pub id: String,                 // String ID like "6a9c1750b37d..."
    pub individual_note: bool,      // true = standalone comment
    pub notes: Vec<GitLabNote>,
}

/// GitLab note (comment).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNote {
    pub id: i64,
    #[serde(rename = "type")]
    pub note_type: Option<String>,  // "DiscussionNote" | "DiffNote" | null
    pub body: String,
    pub author: GitLabAuthor,
    pub created_at: String,         // ISO 8601
    pub updated_at: String,         // ISO 8601
    pub system: bool,               // true for system-generated notes
    #[serde(default)]
    pub resolvable: bool,
    #[serde(default)]
    pub resolved: bool,
    pub resolved_by: Option<GitLabAuthor>,
    pub resolved_at: Option<String>,
    /// DiffNote specific (null for non-DiffNote)
    pub position: Option<GitLabNotePosition>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNotePosition {
    pub old_path: Option<String>,
    pub new_path: Option<String>,
    pub old_line: Option<i32>,
    pub new_line: Option<i32>,
}
```
---
## Transformers
### Issue Transformer
```rust
// src/gitlab/transformers/issue.rs
use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabIssue;
/// Normalized issue ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedIssue {
pub gitlab_id: i64,
pub project_id: i64, // Local DB project ID
pub iid: i64,
pub title: String,
pub description: Option<String>,
pub state: String,
pub author_username: String,
pub created_at: i64, // ms epoch
pub updated_at: i64, // ms epoch
pub last_seen_at: i64, // ms epoch
pub web_url: String,
}
/// Normalized label ready for database insertion.
/// CP1: Name-only for stability.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
pub project_id: i64,
pub name: String,
}
/// Transform GitLab issue to normalized schema.
pub fn transform_issue(gitlab_issue: &GitLabIssue, local_project_id: i64) -> NormalizedIssue {
NormalizedIssue {
gitlab_id: gitlab_issue.id,
project_id: local_project_id,
iid: gitlab_issue.iid,
title: gitlab_issue.title.clone(),
description: gitlab_issue.description.clone(),
state: gitlab_issue.state.clone(),
author_username: gitlab_issue.author.username.clone(),
created_at: iso_to_ms(&gitlab_issue.created_at),
updated_at: iso_to_ms(&gitlab_issue.updated_at),
last_seen_at: now_ms(),
web_url: gitlab_issue.web_url.clone(),
}
}
/// Extract labels from GitLab issue (CP1: name-only).
pub fn extract_labels(gitlab_issue: &GitLabIssue, local_project_id: i64) -> Vec<NormalizedLabel> {
gitlab_issue
.labels
.iter()
.map(|name| NormalizedLabel {
project_id: local_project_id,
name: name.clone(),
})
.collect()
}
```
### Discussion Transformer
```rust
// src/gitlab/transformers/discussion.rs
use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabDiscussion;
/// Normalized discussion ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedDiscussion {
pub gitlab_discussion_id: String,
pub project_id: i64,
pub issue_id: i64,
pub noteable_type: String, // "Issue"
pub individual_note: bool,
pub first_note_at: Option<i64>,
pub last_note_at: Option<i64>,
pub last_seen_at: i64,
pub resolvable: bool,
pub resolved: bool,
}
/// Normalized note ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedNote {
pub gitlab_id: i64,
pub project_id: i64,
pub note_type: Option<String>,
pub is_system: bool,
pub author_username: String,
pub body: String,
pub created_at: i64,
pub updated_at: i64,
pub last_seen_at: i64,
pub position: i32, // Array index in notes[]
pub resolvable: bool,
pub resolved: bool,
pub resolved_by: Option<String>,
pub resolved_at: Option<i64>,
}
/// Transform GitLab discussion to normalized schema.
pub fn transform_discussion(
gitlab_discussion: &GitLabDiscussion,
local_project_id: i64,
local_issue_id: i64,
) -> NormalizedDiscussion {
let note_times: Vec<i64> = gitlab_discussion
.notes
.iter()
.map(|n| iso_to_ms(&n.created_at))
.collect();
// Check if any note is resolvable
let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
let resolved = resolvable
&& gitlab_discussion
.notes
.iter()
.all(|n| !n.resolvable || n.resolved);
NormalizedDiscussion {
gitlab_discussion_id: gitlab_discussion.id.clone(),
project_id: local_project_id,
issue_id: local_issue_id,
noteable_type: "Issue".to_string(),
individual_note: gitlab_discussion.individual_note,
first_note_at: note_times.iter().min().copied(),
last_note_at: note_times.iter().max().copied(),
last_seen_at: now_ms(),
resolvable,
resolved,
}
}
/// Transform GitLab notes to normalized schema.
pub fn transform_notes(
gitlab_discussion: &GitLabDiscussion,
local_project_id: i64,
) -> Vec<NormalizedNote> {
gitlab_discussion
.notes
.iter()
.enumerate()
.map(|(index, note)| NormalizedNote {
gitlab_id: note.id,
project_id: local_project_id,
note_type: note.note_type.clone(),
is_system: note.system,
author_username: note.author.username.clone(),
body: note.body.clone(),
created_at: iso_to_ms(&note.created_at),
updated_at: iso_to_ms(&note.updated_at),
last_seen_at: now_ms(),
position: index as i32,
resolvable: note.resolvable,
resolved: note.resolved,
resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
resolved_at: note.resolved_at.as_ref().map(|s| iso_to_ms(s)),
})
.collect()
}
```
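The thread-level resolution rule inside `transform_discussion` can be isolated as a tiny pure function (illustrative sketch; `thread_state` is not a name from the codebase):

```rust
/// Thread-level resolution derived from notes, mirroring `transform_discussion`:
/// a thread is resolvable if any note is resolvable, and resolved only if
/// every resolvable note is resolved. Each tuple is (resolvable, resolved).
fn thread_state(notes: &[(bool, bool)]) -> (bool, bool) {
    let resolvable = notes.iter().any(|n| n.0);
    let resolved = resolvable && notes.iter().all(|n| !n.0 || n.1);
    (resolvable, resolved)
}

fn main() {
    // Plain comment thread: nothing resolvable, so never "resolved".
    assert_eq!(thread_state(&[(false, false)]), (false, false));
    // All resolvable notes resolved; system notes don't block resolution.
    assert_eq!(thread_state(&[(true, true), (false, false)]), (true, true));
    // One unresolved resolvable note keeps the thread open.
    assert_eq!(thread_state(&[(true, true), (true, false)]), (true, false));
}
```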
---
## GitLab Client Additions
### Pagination with Async Streams
```rust
// src/gitlab/client.rs (additions)
use crate::gitlab::types::{GitLabDiscussion, GitLabIssue};
use reqwest::header::HeaderMap;
use std::pin::Pin;
use futures::Stream;
impl GitLabClient {
/// Paginate through issues for a project.
/// Returns a stream of issues that handles pagination automatically.
pub fn paginate_issues(
&self,
gitlab_project_id: i64,
updated_after: Option<i64>,
cursor_rewind_seconds: u32,
) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
Box::pin(async_stream::try_stream! {
let mut page = 1u32;
let per_page = 100u32;
loop {
let mut params = vec![
("scope", "all".to_string()),
("state", "all".to_string()),
("order_by", "updated_at".to_string()),
("sort", "asc".to_string()),
("per_page", per_page.to_string()),
("page", page.to_string()),
];
if let Some(ts) = updated_after {
// Apply cursor rewind for safety, clamping to 0 to avoid underflow
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
let rewound = (ts - rewind_ms).max(0);
if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
params.push(("updated_after", dt.to_rfc3339()));
}
// If conversion fails (shouldn't happen with max(0)), omit the param
// and fetch all issues (safe fallback).
}
let (issues, headers) = self
.request_with_headers::<Vec<GitLabIssue>>(
&format!("/api/v4/projects/{gitlab_project_id}/issues"),
&params,
)
.await?;
for issue in issues.iter() {
yield issue.clone();
}
// Check for next page
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(np) if !issues.is_empty() => page = np,
_ => break,
}
}
})
}
/// Paginate through discussions for an issue.
pub fn paginate_issue_discussions(
&self,
gitlab_project_id: i64,
issue_iid: i64,
) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
Box::pin(async_stream::try_stream! {
let mut page = 1u32;
let per_page = 100u32;
loop {
let params = vec![
("per_page", per_page.to_string()),
("page", page.to_string()),
];
let (discussions, headers) = self
.request_with_headers::<Vec<GitLabDiscussion>>(
&format!("/api/v4/projects/{gitlab_project_id}/issues/{issue_iid}/discussions"),
&params,
)
.await?;
for discussion in discussions.iter() {
yield discussion.clone();
}
// Check for next page
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(np) if !discussions.is_empty() => page = np,
_ => break,
}
}
})
}
/// Make request and return response with headers for pagination.
async fn request_with_headers<T: serde::de::DeserializeOwned>(
&self,
path: &str,
params: &[(&str, String)],
) -> Result<(T, HeaderMap)> {
self.rate_limiter.lock().await.acquire().await;
let url = format!("{}{}", self.base_url, path);
tracing::debug!(url = %url, "GitLab request");
let response = self
.client
.get(&url)
.header("PRIVATE-TOKEN", &self.token)
.query(params)
.send()
.await
.map_err(|e| GiError::GitLabNetworkError {
base_url: self.base_url.clone(),
source: Some(e),
})?;
let headers = response.headers().clone();
let data = self.handle_response(response, path).await?;
Ok((data, headers))
}
}
```
**Note:** Requires adding `async-stream` and `futures` to Cargo.toml:
```toml
# Cargo.toml additions
async-stream = "0.3"
futures = "0.3"
```
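For reference, the rewind arithmetic applied inside `paginate_issues` in isolation (sketch; `rewound_cursor_ms` is an illustrative name, not a function in the codebase):

```rust
/// Rewind the cursor by a safety margin before building `updated_after`,
/// clamping at 0 so an early cursor can't produce a negative timestamp
/// (mirrors the `.max(0)` clamp in `paginate_issues`).
fn rewound_cursor_ms(cursor_ms: i64, rewind_seconds: u32) -> i64 {
    (cursor_ms - (rewind_seconds as i64) * 1000).max(0)
}

fn main() {
    // A 300-second rewind shifts the cursor back 300_000 ms.
    assert_eq!(rewound_cursor_ms(1_700_000_000_000, 300), 1_699_999_700_000);
    // Cursors near the epoch clamp to zero instead of underflowing.
    assert_eq!(rewound_cursor_ms(120_000, 300), 0);
}
```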
---
## Orchestration: Dependent Discussion Sync
### Canonical Pattern (CP1)
When `gi ingest --type=issues` runs, it follows this orchestration:
1. **Ingest issues** (cursor-based, with incremental cursor updates per page)
2. **Collect touched issues** - For each issue that passed cursor tuple filtering, record:
- `local_issue_id`
- `issue_iid`
- `issue_updated_at`
- `discussions_synced_for_updated_at` (from DB)
3. **Filter for discussion sync** - Enqueue issues where:
```
issue.updated_at > issues.discussions_synced_for_updated_at
```
This prevents re-fetching discussions for issues that haven't changed, even with cursor rewind.
4. **Execute discussion sync** with bounded concurrency (`dependent_concurrency` from config)
5. **Update watermark** - After each issue's discussions are successfully ingested:
```sql
UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?
```
**Invariant:** A rerun MUST NOT refetch discussions for issues whose `updated_at` has not advanced, even with cursor rewind.
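The invariant reduces to a single predicate over the per-issue watermark (sketch; `needs_discussion_sync` is an illustrative name):

```rust
/// Discussion-sync eligibility per the orchestration above: fetch discussions
/// only when the issue's `updated_at` has advanced past the stored watermark
/// (`discussions_synced_for_updated_at`), or when no watermark exists yet.
fn needs_discussion_sync(issue_updated_at: i64, synced_for: Option<i64>) -> bool {
    match synced_for {
        None => true,                    // discussions never synced for this issue
        Some(w) => issue_updated_at > w, // changed since the last full discussion sync
    }
}

fn main() {
    assert!(needs_discussion_sync(2_000, None));          // first sync
    assert!(needs_discussion_sync(2_000, Some(1_000)));   // issue changed
    assert!(!needs_discussion_sync(2_000, Some(2_000)));  // rerun with rewind: skipped
}
```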
---
## Ingestion Logic
### Runtime Strategy
**Decision:** Use single-threaded Tokio runtime (`flavor = "current_thread"`) for CP1.
**Rationale:**
- `rusqlite::Connection` is `!Send`, which conflicts with multi-threaded runtimes
- Single-threaded runtime avoids Send bounds entirely
- Concurrency for discussion fetches uses `tokio::task::spawn_local` + `LocalSet`
- Keeps code simple; can upgrade to channel-based DB writer in CP2 if needed
```rust
// src/main.rs
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // ...
}
```
### Issue Ingestion
```rust
// src/ingestion/issues.rs
use futures::StreamExt;
use rusqlite::Connection;
use tracing::{debug, info};
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::issue::{extract_labels, transform_issue};
/// Result of issue ingestion.
#[derive(Debug, Default)]
pub struct IngestIssuesResult {
pub fetched: usize,
pub upserted: usize,
pub labels_created: usize,
/// Issues that need discussion sync (updated_at advanced)
pub issues_needing_discussion_sync: Vec<IssueForDiscussionSync>,
}
/// Info needed to sync discussions for an issue.
#[derive(Debug, Clone)]
pub struct IssueForDiscussionSync {
pub local_issue_id: i64,
pub iid: i64,
pub updated_at: i64,
}
/// Ingest issues for a project.
/// Returns list of issues that need discussion sync.
pub async fn ingest_issues(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64, // Local DB project ID
gitlab_project_id: i64, // GitLab project ID
) -> Result<IngestIssuesResult> {
let mut result = IngestIssuesResult::default();
// Get current cursor
let cursor = get_cursor(conn, project_id)?;
let cursor_updated_at = cursor.0;
let cursor_gitlab_id = cursor.1;
let mut last_updated_at: Option<i64> = None;
let mut last_gitlab_id: Option<i64> = None;
let mut issues_in_page: Vec<(i64, i64, i64)> = Vec::new(); // (local_id, iid, updated_at)
// Fetch issues with pagination
let mut stream = client.paginate_issues(
gitlab_project_id,
cursor_updated_at,
config.sync.cursor_rewind_seconds,
);
while let Some(issue_result) = stream.next().await {
let issue = issue_result?;
result.fetched += 1;
let issue_updated_at = crate::core::time::iso_to_ms(&issue.updated_at);
// Apply cursor filtering for tuple semantics
if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
if issue_updated_at < cursor_ts {
continue;
}
if issue_updated_at == cursor_ts && issue.id <= cursor_id {
continue;
}
}
// Begin transaction for this issue (atomicity + performance)
let tx = conn.unchecked_transaction()?;
// Store raw payload
let payload_id = store_payload(
&tx,
project_id,
"issue",
&issue.id.to_string(),
&issue,
config.storage.compress_raw_payloads,
)?;
// Transform and upsert issue
let normalized = transform_issue(&issue, project_id);
let changes = upsert_issue(&tx, &normalized, payload_id)?;
if changes > 0 {
result.upserted += 1;
}
// Get local issue ID for label linking
let local_issue_id = get_local_issue_id(&tx, normalized.gitlab_id)?;
// Clear existing label links (ensures removed labels are unlinked)
clear_issue_labels(&tx, local_issue_id)?;
// Extract and upsert labels (name-only for CP1)
let labels = extract_labels(&issue, project_id);
for label in &labels {
let created = upsert_label(&tx, label)?;
if created {
result.labels_created += 1;
}
// Link issue to label
let label_id = get_label_id(&tx, project_id, &label.name)?;
link_issue_label(&tx, local_issue_id, label_id)?;
}
tx.commit()?;
// Track for discussion sync eligibility
issues_in_page.push((local_issue_id, issue.iid, issue_updated_at));
// Track for cursor update
last_updated_at = Some(issue_updated_at);
last_gitlab_id = Some(issue.id);
// Incremental cursor update every 100 issues (page boundary)
// This ensures crashes don't cause massive refetch
if result.fetched % 100 == 0 {
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
}
}
}
// Final cursor update
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
}
// Determine which issues need discussion sync (updated_at advanced)
for (local_issue_id, iid, updated_at) in issues_in_page {
let synced_at = get_discussions_synced_at(conn, local_issue_id)?;
if synced_at.is_none() || updated_at > synced_at.unwrap() {
result.issues_needing_discussion_sync.push(IssueForDiscussionSync {
local_issue_id,
iid,
updated_at,
});
}
}
info!(
project_id,
fetched = result.fetched,
upserted = result.upserted,
labels_created = result.labels_created,
need_discussion_sync = result.issues_needing_discussion_sync.len(),
"Issue ingestion complete"
);
Ok(result)
}
fn get_cursor(conn: &Connection, project_id: i64) -> Result<(Option<i64>, Option<i64>)> {
let mut stmt = conn.prepare(
"SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
WHERE project_id = ? AND resource_type = 'issues'"
)?;
let result = stmt.query_row([project_id], |row| {
Ok((row.get::<_, Option<i64>>(0)?, row.get::<_, Option<i64>>(1)?))
});
match result {
Ok(cursor) => Ok(cursor),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok((None, None)),
Err(e) => Err(e.into()),
}
}
fn get_discussions_synced_at(conn: &Connection, issue_id: i64) -> Result<Option<i64>> {
let result = conn.query_row(
"SELECT discussions_synced_for_updated_at FROM issues WHERE id = ?",
[issue_id],
|row| row.get(0),
);
match result {
Ok(ts) => Ok(ts),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn upsert_issue(
conn: &Connection,
issue: &crate::gitlab::transformers::issue::NormalizedIssue,
payload_id: Option<i64>,
) -> Result<usize> {
let changes = conn.execute(
"INSERT INTO issues (
gitlab_id, project_id, iid, title, description, state,
author_username, created_at, updated_at, last_seen_at, web_url, raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
ON CONFLICT(gitlab_id) DO UPDATE SET
title = excluded.title,
description = excluded.description,
state = excluded.state,
updated_at = excluded.updated_at,
last_seen_at = excluded.last_seen_at,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
issue.gitlab_id,
issue.project_id,
issue.iid,
issue.title,
issue.description,
issue.state,
issue.author_username,
issue.created_at,
issue.updated_at,
issue.last_seen_at,
issue.web_url,
payload_id,
],
)?;
Ok(changes)
}
fn get_local_issue_id(conn: &Connection, gitlab_id: i64) -> Result<i64> {
Ok(conn.query_row(
"SELECT id FROM issues WHERE gitlab_id = ?",
[gitlab_id],
|row| row.get(0),
)?)
}
fn clear_issue_labels(conn: &Connection, issue_id: i64) -> Result<()> {
conn.execute("DELETE FROM issue_labels WHERE issue_id = ?", [issue_id])?;
Ok(())
}
fn upsert_label(
conn: &Connection,
label: &crate::gitlab::transformers::issue::NormalizedLabel,
) -> Result<bool> {
// CP1: Name-only labels. Color/description columns remain NULL.
let changes = conn.execute(
"INSERT INTO labels (project_id, name)
VALUES (?1, ?2)
ON CONFLICT(project_id, name) DO NOTHING",
rusqlite::params![label.project_id, label.name],
)?;
Ok(changes > 0)
}
fn get_label_id(conn: &Connection, project_id: i64, name: &str) -> Result<i64> {
Ok(conn.query_row(
"SELECT id FROM labels WHERE project_id = ? AND name = ?",
rusqlite::params![project_id, name],
|row| row.get(0),
)?)
}
fn link_issue_label(conn: &Connection, issue_id: i64, label_id: i64) -> Result<()> {
conn.execute(
"INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)",
[issue_id, label_id],
)?;
Ok(())
}
fn update_cursor(
conn: &Connection,
project_id: i64,
resource_type: &str,
updated_at: i64,
gitlab_id: i64,
) -> Result<()> {
conn.execute(
"INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(project_id, resource_type) DO UPDATE SET
updated_at_cursor = excluded.updated_at_cursor,
tie_breaker_id = excluded.tie_breaker_id",
rusqlite::params![project_id, resource_type, updated_at, gitlab_id],
)?;
Ok(())
}
```
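The tuple-cursor skip logic from `ingest_issues` can be checked in isolation (sketch; `passes_cursor` is an illustrative helper, not part of the codebase):

```rust
/// Cursor-tuple filter mirroring `ingest_issues`: an item is skipped when it
/// is strictly older than the cursor timestamp, or at the same timestamp with
/// a `gitlab_id` at or below the tie-breaker. Items are (updated_at, gitlab_id).
fn passes_cursor(item: (i64, i64), cursor: Option<(i64, i64)>) -> bool {
    match cursor {
        None => true, // first sync: take everything
        Some((cur_ts, cur_id)) => {
            let (ts, id) = item;
            ts > cur_ts || (ts == cur_ts && id > cur_id)
        }
    }
}

fn main() {
    assert!(passes_cursor((1_000, 7), None));              // no cursor yet
    assert!(!passes_cursor((900, 7), Some((1_000, 5))));   // older: skipped (rewind refetch)
    assert!(!passes_cursor((1_000, 5), Some((1_000, 5)))); // exactly the cursor: skipped
    assert!(passes_cursor((1_000, 6), Some((1_000, 5))));  // same ts, tie-broken by id
    assert!(passes_cursor((1_100, 1), Some((1_000, 5))));  // newer: processed
}
```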
### Discussion Ingestion
```rust
// src/ingestion/discussions.rs
use futures::StreamExt;
use rusqlite::Connection;
use tracing::debug;
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{transform_discussion, transform_notes};
/// Result of discussion ingestion for a single issue.
#[derive(Debug, Default)]
pub struct IngestDiscussionsResult {
pub discussions_fetched: usize,
pub discussions_upserted: usize,
pub notes_upserted: usize,
pub system_notes_count: usize,
}
/// Ingest discussions for a single issue.
/// Called only when issue.updated_at > discussions_synced_for_updated_at.
pub async fn ingest_issue_discussions(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
issue_iid: i64,
local_issue_id: i64,
issue_updated_at: i64,
) -> Result<IngestDiscussionsResult> {
let mut result = IngestDiscussionsResult::default();
let mut stream = client.paginate_issue_discussions(gitlab_project_id, issue_iid);
while let Some(discussion_result) = stream.next().await {
let discussion = discussion_result?;
result.discussions_fetched += 1;
// Begin transaction for this discussion (atomicity + performance)
let tx = conn.unchecked_transaction()?;
// Store raw payload for discussion
let discussion_payload_id = store_payload(
&tx,
project_id,
"discussion",
&discussion.id,
&discussion,
config.storage.compress_raw_payloads,
)?;
// Transform and upsert discussion
let normalized = transform_discussion(&discussion, project_id, local_issue_id);
upsert_discussion(&tx, &normalized, discussion_payload_id)?;
result.discussions_upserted += 1;
// Get local discussion ID
let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;
// Transform and upsert notes
let notes = transform_notes(&discussion, project_id);
for note in &notes {
// Store raw payload for note
let gitlab_note = discussion.notes.iter().find(|n| n.id == note.gitlab_id);
let note_payload_id = if let Some(gn) = gitlab_note {
store_payload(
&tx,
project_id,
"note",
&note.gitlab_id.to_string(),
gn,
config.storage.compress_raw_payloads,
)?
} else {
None
};
upsert_note(&tx, local_discussion_id, note, note_payload_id)?;
result.notes_upserted += 1;
if note.is_system {
result.system_notes_count += 1;
}
}
tx.commit()?;
}
// Mark discussions as synced for this issue version
mark_discussions_synced(conn, local_issue_id, issue_updated_at)?;
debug!(
project_id,
issue_iid,
discussions = result.discussions_fetched,
notes = result.notes_upserted,
"Issue discussions ingested"
);
Ok(result)
}
fn upsert_discussion(
conn: &Connection,
discussion: &crate::gitlab::transformers::discussion::NormalizedDiscussion,
payload_id: Option<i64>,
) -> Result<()> {
conn.execute(
"INSERT INTO discussions (
gitlab_discussion_id, project_id, issue_id, noteable_type,
individual_note, first_note_at, last_note_at, last_seen_at,
resolvable, resolved, raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
first_note_at = excluded.first_note_at,
last_note_at = excluded.last_note_at,
last_seen_at = excluded.last_seen_at,
resolvable = excluded.resolvable,
resolved = excluded.resolved,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
discussion.gitlab_discussion_id,
discussion.project_id,
discussion.issue_id,
discussion.noteable_type,
discussion.individual_note as i32,
discussion.first_note_at,
discussion.last_note_at,
discussion.last_seen_at,
discussion.resolvable as i32,
discussion.resolved as i32,
payload_id,
],
)?;
Ok(())
}
fn get_local_discussion_id(conn: &Connection, project_id: i64, gitlab_id: &str) -> Result<i64> {
Ok(conn.query_row(
"SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
rusqlite::params![project_id, gitlab_id],
|row| row.get(0),
)?)
}
fn upsert_note(
conn: &Connection,
discussion_id: i64,
note: &crate::gitlab::transformers::discussion::NormalizedNote,
payload_id: Option<i64>,
) -> Result<()> {
conn.execute(
"INSERT INTO notes (
gitlab_id, discussion_id, project_id, note_type, is_system,
author_username, body, created_at, updated_at, last_seen_at,
position, resolvable, resolved, resolved_by, resolved_at,
raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
ON CONFLICT(gitlab_id) DO UPDATE SET
body = excluded.body,
updated_at = excluded.updated_at,
last_seen_at = excluded.last_seen_at,
resolved = excluded.resolved,
resolved_by = excluded.resolved_by,
resolved_at = excluded.resolved_at,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
note.gitlab_id,
discussion_id,
note.project_id,
note.note_type,
note.is_system as i32,
note.author_username,
note.body,
note.created_at,
note.updated_at,
note.last_seen_at,
note.position,
note.resolvable as i32,
note.resolved as i32,
note.resolved_by,
note.resolved_at,
payload_id,
],
)?;
Ok(())
}
fn mark_discussions_synced(conn: &Connection, issue_id: i64, issue_updated_at: i64) -> Result<()> {
conn.execute(
"UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?",
rusqlite::params![issue_updated_at, issue_id],
)?;
Ok(())
}
```
---
## CLI Commands
### `gi ingest --type=issues`
Fetch and store all issues from configured projects.
**Clap Definition:**
```rust
// src/cli/mod.rs (addition to Commands enum)
#[derive(Subcommand)]
pub enum Commands {
// ... existing commands ...
/// Ingest data from GitLab
Ingest {
/// Resource type to ingest
#[arg(long, value_parser = ["issues", "merge_requests"])]
r#type: String,
/// Filter to single project
#[arg(long)]
project: Option<String>,
/// Override stale sync lock
#[arg(long)]
force: bool,
},
/// List entities
List {
/// Entity type to list
#[arg(value_parser = ["issues", "mrs"])]
entity: String,
/// Maximum results
#[arg(long, default_value = "20")]
limit: usize,
/// Filter by project path
#[arg(long)]
project: Option<String>,
/// Filter by state
#[arg(long, value_parser = ["opened", "closed", "all"])]
state: Option<String>,
},
/// Count entities
Count {
/// Entity type to count
#[arg(value_parser = ["issues", "mrs", "discussions", "notes"])]
entity: String,
/// Filter by noteable type
#[arg(long, value_parser = ["issue", "mr"])]
r#type: Option<String>,
},
/// Show entity details
Show {
/// Entity type
#[arg(value_parser = ["issue", "mr"])]
entity: String,
/// Entity IID
iid: i64,
/// Project path (required if ambiguous)
#[arg(long)]
project: Option<String>,
},
}
```
**Output:**
```
Ingesting issues...
group/project-one: 1,234 issues fetched, 45 new labels
Fetching discussions (312 issues with updates)...
group/project-one: 312 issues → 1,234 discussions, 5,678 notes
Total: 1,234 issues, 1,234 discussions, 5,678 notes (excluding 1,234 system notes)
Skipped discussion sync for 922 unchanged issues.
```
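The "skipped" line above falls out of a per-issue watermark comparison. A minimal sketch of that predicate (function name hypothetical; the real implementation lives in the orchestrator):

```rust
/// Decide whether an issue's discussions need refetching, given the issue's
/// current `updated_at` and the stored `discussions_synced_for_updated_at`
/// watermark (both ms-epoch). `None` means discussions were never synced.
fn needs_discussion_sync(updated_at: i64, synced_for: Option<i64>) -> bool {
    match synced_for {
        None => true,                              // never synced: always fetch
        Some(watermark) => updated_at > watermark, // fetch only if the issue advanced
    }
}

fn main() {
    assert!(needs_discussion_sync(1_700_000_000_000, None));
    assert!(needs_discussion_sync(1_700_000_000_000, Some(1_600_000_000_000)));
    assert!(!needs_discussion_sync(1_700_000_000_000, Some(1_700_000_000_000)));
}
```

Because the comparison is strict (`>`), a cursor rewind that re-presents an unchanged issue does not trigger a discussion refetch.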
### `gi list issues`
**Output:**
```
Issues (showing 20 of 3,801)
#1234 Authentication redesign opened @johndoe 3 days ago
#1233 Fix memory leak in cache closed @janedoe 5 days ago
#1232 Add dark mode support opened @bobsmith 1 week ago
...
```
### `gi count issues`
**Output:**
```
Issues: 3,801
```
### `gi show issue <iid>`
**Output:**
```
Issue #1234: Authentication redesign
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Project: group/project-one
State: opened
Author: @johndoe
Created: 2024-01-15
Updated: 2024-03-20
Labels: enhancement, auth
URL: https://gitlab.example.com/group/project-one/-/issues/1234
Description:
We need to redesign the authentication flow to support...
Discussions (5):
@janedoe (2024-01-16):
I agree we should move to JWT-based auth...
@johndoe (2024-01-16):
What about refresh token strategy?
@bobsmith (2024-01-17):
Have we considered OAuth2?
```
---
## Automated Tests
### Unit Tests
```rust
// tests/issue_transformer_tests.rs
#[cfg(test)]
mod tests {
use gi::gitlab::transformers::issue::*;
use gi::gitlab::types::*;
#[test]
fn transforms_gitlab_issue_to_normalized_schema() { /* ... */ }
#[test]
fn extracts_labels_from_issue_payload() { /* ... */ }
#[test]
fn handles_missing_optional_fields_gracefully() { /* ... */ }
#[test]
fn converts_iso_timestamps_to_ms_epoch() { /* ... */ }
#[test]
fn sets_last_seen_at_to_current_time() { /* ... */ }
}
```
```rust
// tests/discussion_transformer_tests.rs
#[cfg(test)]
mod tests {
use gi::gitlab::transformers::discussion::*;
#[test]
fn transforms_discussion_payload_to_normalized_schema() { /* ... */ }
#[test]
fn extracts_notes_array_from_discussion() { /* ... */ }
#[test]
fn sets_individual_note_flag_correctly() { /* ... */ }
#[test]
fn flags_system_notes_with_is_system_true() { /* ... */ }
#[test]
fn preserves_note_order_via_position_field() { /* ... */ }
#[test]
fn computes_first_note_at_and_last_note_at_correctly() { /* ... */ }
#[test]
fn computes_resolvable_and_resolved_status() { /* ... */ }
}
```
```rust
// tests/pagination_tests.rs
#[cfg(test)]
mod tests {
#[tokio::test]
async fn fetches_all_pages_when_multiple_exist() { /* ... */ }
#[tokio::test]
async fn respects_per_page_parameter() { /* ... */ }
#[tokio::test]
async fn follows_x_next_page_header_until_empty() { /* ... */ }
#[tokio::test]
async fn falls_back_to_empty_page_stop_if_headers_missing() { /* ... */ }
#[tokio::test]
async fn applies_cursor_rewind_for_tuple_semantics() { /* ... */ }
#[tokio::test]
async fn clamps_negative_rewind_to_zero() { /* ... */ }
}
```
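The two termination rules exercised above (follow `x-next-page` until empty; fall back to stopping on the first empty page when headers are missing) can be captured in one pure predicate. A sketch with illustrative names, not the client's real API:

```rust
/// Should the paginator request another page? GitLab sets `x-next-page` to the
/// next page number, or to an empty string on the last page. If the header is
/// absent entirely, fall back to stopping on the first empty page.
fn has_next_page(x_next_page: Option<&str>, items_on_page: usize) -> bool {
    match x_next_page {
        Some(v) => !v.trim().is_empty(), // header present: trust it
        None => items_on_page > 0,       // header missing: an empty page terminates
    }
}

fn main() {
    assert!(has_next_page(Some("2"), 100));
    assert!(!has_next_page(Some(""), 37)); // last page per header
    assert!(has_next_page(None, 100));     // fallback: keep going while pages are full
    assert!(!has_next_page(None, 0));      // fallback: empty page stops the loop
}
```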
```rust
// tests/label_linkage_tests.rs
#[cfg(test)]
mod tests {
#[test]
fn clears_existing_labels_before_linking_new_set() { /* ... */ }
#[test]
fn removes_stale_label_links_on_issue_update() { /* ... */ }
#[test]
fn handles_issue_with_all_labels_removed() { /* ... */ }
#[test]
fn preserves_labels_that_still_exist() { /* ... */ }
}
```
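The clear-before-link behavior these tests pin down can be modeled with plain sets: after every re-sync the stored links equal exactly the incoming label set, so stale links cannot survive. A sketch mirroring `clear_issue_labels()`, not the real DB code:

```rust
use std::collections::HashSet;

/// Model of clear-then-relink: wipe the issue's existing label links, then
/// link exactly the labels present in the fresh GitLab payload.
fn relink_labels(links: &mut HashSet<String>, incoming: &[&str]) {
    links.clear(); // mirrors clear_issue_labels() before linking
    links.extend(incoming.iter().map(|s| s.to_string()));
}

fn main() {
    let mut links: HashSet<String> =
        ["bug", "auth"].iter().map(|s| s.to_string()).collect();
    relink_labels(&mut links, &["auth", "enhancement"]); // "bug" removed upstream
    assert!(!links.contains("bug"));        // stale link gone
    assert!(links.contains("enhancement")); // new label linked
    relink_labels(&mut links, &[]);         // all labels removed in GitLab
    assert!(links.is_empty());
}
```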
```rust
// tests/discussion_watermark_tests.rs
#[cfg(test)]
mod tests {
#[tokio::test]
async fn skips_discussion_fetch_when_updated_at_unchanged() { /* ... */ }
#[tokio::test]
async fn fetches_discussions_when_updated_at_advanced() { /* ... */ }
#[tokio::test]
async fn updates_watermark_after_successful_discussion_sync() { /* ... */ }
#[tokio::test]
async fn does_not_update_watermark_on_discussion_sync_failure() { /* ... */ }
}
```
### Integration Tests
```rust
// tests/issue_ingestion_tests.rs
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path_regex};
#[tokio::test]
async fn inserts_issues_into_database() { /* ... */ }
#[tokio::test]
async fn creates_labels_from_issue_payloads() { /* ... */ }
#[tokio::test]
async fn links_issues_to_labels_via_junction_table() { /* ... */ }
#[tokio::test]
async fn removes_stale_label_links_on_resync() { /* ... */ }
#[tokio::test]
async fn stores_raw_payload_for_each_issue() { /* ... */ }
#[tokio::test]
async fn stores_raw_payload_for_each_discussion() { /* ... */ }
#[tokio::test]
async fn updates_cursor_incrementally_per_page() { /* ... */ }
#[tokio::test]
async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }
#[tokio::test]
async fn handles_issues_with_no_labels() { /* ... */ }
#[tokio::test]
async fn upserts_existing_issues_on_refetch() { /* ... */ }
#[tokio::test]
async fn skips_discussion_refetch_for_unchanged_issues() { /* ... */ }
}
```
---
## Manual Smoke Tests
| Command | Expected Output | Pass Criteria |
|---------|-----------------|---------------|
| `gi ingest --type=issues` | Progress bar, final count | Completes without error |
| `gi list issues --limit=10` | Table of 10 issues | Shows iid, title, state, author |
| `gi list issues --project=group/project-one` | Filtered list | Only shows issues from that project |
| `gi count issues` | `Issues: N` | Count matches GitLab UI |
| `gi show issue 123` | Issue detail view | Shows title, description, labels, discussions, URL |
| `gi show issue 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=issue` | `Issue Discussions: N` | Non-zero count |
| `gi count notes --type=issue` | `Issue Notes: N (excluding M system)` | Non-zero count |
| `gi sync-status` | Last sync time, cursor positions | Shows successful last run |
| `gi ingest --type=issues` (re-run) | `0 new issues` | Cursor prevents re-fetch |
| `gi ingest --type=issues` (re-run) | `Skipped discussion sync for N unchanged issues` | Watermark prevents refetch |
| `gi ingest --type=issues` (concurrent) | Lock error | Second run fails with clear message |
| Remove label from issue in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |
---
## Data Integrity Checks
After successful ingestion, verify:
- [ ] `SELECT COUNT(*) FROM issues` matches GitLab issue count for configured projects
- [ ] Every issue has a corresponding `raw_payloads` row
- [ ] Every discussion has a corresponding `raw_payloads` row
- [ ] Labels in `issue_labels` junction all exist in `labels` table
- [ ] `issue_labels` count per issue matches GitLab UI label count
- [ ] `sync_cursors` has entry for each `(project_id, 'issues')` pair
- [ ] Re-running `gi ingest --type=issues` fetches 0 new items (cursor is current)
- [ ] Re-running skips discussion sync for unchanged issues (watermark works)
- [ ] `SELECT COUNT(*) FROM discussions WHERE noteable_type='Issue'` is non-zero
- [ ] Every discussion has at least one note
- [ ] `individual_note = 1` discussions have exactly one note
- [ ] `SELECT COUNT(*) FROM notes WHERE is_system = 1` matches system note count in CLI output
- [ ] After removing a label in GitLab and re-syncing, the link is removed from `issue_labels`
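Several of the checks above reduce to queries that should all return 0 on a clean ingest. A sketch bundling a few of them as `(description, SQL)` pairs against this checkpoint's schema, suitable for feeding to the `sqlite3` shell or rusqlite:

```rust
/// Integrity checks: each query should return 0 after a successful ingest.
/// Table and column names follow this PRD's schema.
fn integrity_checks() -> Vec<(&'static str, &'static str)> {
    vec![
        (
            "issues missing a raw payload",
            "SELECT COUNT(*) FROM issues WHERE raw_payload_id IS NULL",
        ),
        (
            "issue_labels links pointing at missing labels",
            "SELECT COUNT(*) FROM issue_labels il \
             LEFT JOIN labels l ON l.id = il.label_id WHERE l.id IS NULL",
        ),
        (
            "discussions with no notes",
            "SELECT COUNT(*) FROM discussions d \
             WHERE NOT EXISTS (SELECT 1 FROM notes n WHERE n.discussion_id = d.id)",
        ),
        (
            "individual_note discussions without exactly one note",
            "SELECT COUNT(*) FROM discussions d WHERE d.individual_note = 1 \
             AND (SELECT COUNT(*) FROM notes n WHERE n.discussion_id = d.id) <> 1",
        ),
    ]
}

fn main() {
    for (desc, sql) in integrity_checks() {
        println!("{desc}: {sql}");
    }
}
```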
---
## Definition of Done
### Gate A: Issues Only (Must Pass First)
- [ ] `gi ingest --type=issues` fetches all issues from configured projects
- [ ] Issues stored with correct schema, including `last_seen_at`
- [ ] Cursor-based sync is resumable (re-run fetches only new/updated)
- [ ] Incremental cursor updates every 100 issues
- [ ] Raw payloads stored for each issue
- [ ] `gi list issues` and `gi count issues` work
### Gate B: Labels Correct (Must Pass)
- [ ] Labels extracted and stored (name-only)
- [ ] Label links created correctly
- [ ] **Stale label links removed on re-sync** (verified with test)
- [ ] Label count per issue matches GitLab
### Gate C: Dependent Discussion Sync (Must Pass)
- [ ] Discussions fetched for issues with `updated_at` advancement
- [ ] Notes stored with `is_system` flag correctly set
- [ ] Raw payloads stored for discussions and notes
- [ ] `discussions_synced_for_updated_at` watermark updated after sync
- [ ] **Unchanged issues skip discussion refetch** (verified with test)
- [ ] Bounded concurrency (`dependent_concurrency` respected)
### Gate D: Resumability Proof (Must Pass)
- [ ] Kill mid-run, rerun; bounded redo (cursor progress preserved)
- [ ] No redundant discussion refetch after crash recovery
- [ ] Single-flight lock prevents concurrent runs
### Final Gate (Must Pass)
- [ ] All unit tests pass (`cargo test`)
- [ ] All integration tests pass (mocked with wiremock)
- [ ] `cargo clippy` passes with no warnings
- [ ] `cargo fmt --check` passes
- [ ] Compiles with `--release`
### Hardening (Optional Before CP2)
- [ ] Edge cases: issues with 0 labels, 0 discussions
- [ ] Large pagination (100+ pages)
- [ ] Rate limit handling under sustained load
- [ ] Live tests pass against real GitLab instance
- [ ] Performance: 1000+ issues ingested in <5 min
---
## Implementation Order
1. **Runtime decision** (5 min)
- Confirm `#[tokio::main(flavor = "current_thread")]`
- Add note about upgrade path for CP2 if needed
2. **Cargo.toml updates** (5 min)
- Add `async-stream = "0.3"` and `futures = "0.3"`
3. **Database migration** (15 min)
- `migrations/002_issues.sql` with `discussions_synced_for_updated_at` column
- `raw_payload_id` on discussions table
- Update `MIGRATIONS` const in `src/core/db.rs`
4. **GitLab types** (15 min)
- Add types to `src/gitlab/types.rs` (no `labels_details`)
- Test deserialization with fixtures
5. **Transformers** (25 min)
- `src/gitlab/transformers/mod.rs`
- `src/gitlab/transformers/issue.rs` (simplified NormalizedLabel)
- `src/gitlab/transformers/discussion.rs`
- Unit tests
6. **GitLab client pagination** (25 min)
- Add `paginate_issues()` with underflow protection
- Add `paginate_issue_discussions()`
- Add `request_with_headers()` helper
7. **Issue ingestion** (45 min)
- `src/ingestion/mod.rs`
- `src/ingestion/issues.rs` with:
- Transaction batching
- `clear_issue_labels()` before linking
- Incremental cursor updates
- Return `issues_needing_discussion_sync`
- Unit + integration tests including label stale-link removal
8. **Discussion ingestion** (30 min)
- `src/ingestion/discussions.rs` with:
- Transaction batching
- `raw_payload_id` storage
- `mark_discussions_synced()` watermark update
- Integration tests including watermark behavior
9. **Orchestrator** (30 min)
- `src/ingestion/orchestrator.rs`
- Coordinates issue sync → filter for discussion needs → bounded discussion sync
- Integration tests
10. **CLI commands** (45 min)
- `gi ingest --type=issues`
- `gi list issues`
- `gi count issues|discussions|notes`
- `gi show issue <iid>`
- Enhanced `gi sync-status`
11. **Final validation** (20 min)
- `cargo test`
- `cargo clippy`
- Gate A/B/C/D verification
- Manual smoke tests
- Data integrity checks
---
## Risks & Mitigations
| Risk | Mitigation |
|------|------------|
| GitLab rate limiting during large sync | Respect `Retry-After`, exponential backoff, configurable concurrency |
| Discussion API N+1 problem (thousands of calls) | `dependent_concurrency` config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large issues with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | `is_system` flag allows filtering |
| Label deduplication across projects | Unique constraint on `(project_id, name)` |
| Stale label links accumulate | `clear_issue_labels()` before linking ensures correctness |
| Async stream complexity | Use `async-stream` crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Single-threaded runtime (`current_thread`) avoids Send bounds |
| Crash causes massive refetch | Incremental cursor updates every 100 issues |
| Cursor rewind causes discussion refetch | Per-issue watermark (`discussions_synced_for_updated_at`) |
| Timestamp underflow on rewind | Clamp to 0 with `.max(0)` |
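The last two rows interact: rewinding the `updated_at` cursor re-fetches a safety window of issues, the `.max(0)` clamp keeps the rewound timestamp from going negative on a fresh database, and the per-issue watermark keeps the rewind from cascading into discussion refetches. A sketch of the clamp (function name hypothetical):

```rust
/// Rewind an ms-epoch cursor by a safety window, clamping at the epoch so a
/// fresh database (cursor 0) cannot underflow into a negative timestamp.
fn rewound_cursor(cursor_ms: i64, rewind_window_ms: i64) -> i64 {
    (cursor_ms - rewind_window_ms).max(0)
}

fn main() {
    let two_days = 2 * 24 * 3_600 * 1_000;
    assert_eq!(
        rewound_cursor(1_700_000_000_000, two_days),
        1_700_000_000_000 - two_days
    );
    assert_eq!(rewound_cursor(0, two_days), 0); // clamped, not negative
}
```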
---
## API Call Estimation
For a project with 3,000 issues:
- Issue list: `ceil(3000/100) = 30` calls
- Issue discussions (first run): `3000 issues × ~1.2 pages each ≈ 3,600` calls
- Issue discussions (subsequent runs, ~10% of issues updated): `300 × 1.2 = 360` calls
- **Total first run: ~3,630 calls per project**
- **Total subsequent run: ~390 calls per project** (90% savings from watermark)
At 10 requests/second:
- First run: ~6 minutes per project
- Subsequent run: ~40 seconds per project
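The arithmetic above as a sketch; the page size, pages-per-issue average, and request rate are the assumptions stated in this section, not measured values:

```rust
/// Rough call-count estimate for one project: (first run, subsequent run).
fn estimate(issues: u64, updated_fraction: f64) -> (u64, u64) {
    let per_page: u64 = 100;
    let pages_per_issue = 1.2; // assumed average discussion pages per issue
    let list_calls = (issues + per_page - 1) / per_page; // ceil division
    let first_run = list_calls + (issues as f64 * pages_per_issue).round() as u64;
    let later_run =
        list_calls + (issues as f64 * updated_fraction * pages_per_issue).round() as u64;
    (first_run, later_run)
}

fn main() {
    let (first, later) = estimate(3_000, 0.10);
    assert_eq!(first, 3_630); // 30 list calls + 3,600 discussion calls
    assert_eq!(later, 390);   // 30 list calls + 360 discussion calls
    let rate = 10; // requests/second
    println!("first run: ~{}s, subsequent: ~{}s", first / rate, later / rate);
}
```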
---
## References
- [SPEC.md](../../SPEC.md) - Full system specification
- [checkpoint-0.md](checkpoint-0.md) - Project setup PRD
- [GitLab Issues API](https://docs.gitlab.com/ee/api/issues.html)
- [GitLab Discussions API](https://docs.gitlab.com/ee/api/discussions.html)
- [async-stream crate](https://docs.rs/async-stream) - Async generators for Rust
- [wiremock](https://docs.rs/wiremock) - HTTP mocking for tests