# Checkpoint 1: Issue Ingestion - PRD
**Version:** 2.0 | **Status:** Ready for Implementation | **Depends On:** Checkpoint 0 (Project Setup) | **Enables:** Checkpoint 2 (MR Ingestion)
## Overview
### Objective
Ingest all issues, labels, and issue discussions from configured GitLab repositories with resumable cursor-based incremental sync. This checkpoint establishes the core data ingestion pattern that will be reused for MRs in Checkpoint 2.
### Success Criteria
| Criterion | Validation |
|---|---|
| `gi ingest --type=issues` fetches all issues | `gi count issues` matches GitLab UI |
| Labels extracted from issue payloads (name-only) | `labels` table populated |
| Label linkage reflects current GitLab state | Removed labels are unlinked on re-sync |
| Issue discussions fetched per-issue (dependent sync) | For issues whose `updated_at` advanced, discussions and notes upserted |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged issues | Per-issue watermark prevents redundant fetches |
| Sync tracking records all runs | `sync_runs` table has complete audit trail |
| Single-flight lock prevents concurrent runs | Second sync fails with clear error |
### Internal Gates
CP1 is validated incrementally via internal gates:
| Gate | Scope | Validation |
|---|---|---|
| Gate A | Issues only | Cursor + upsert + raw payloads + list/count/show working |
| Gate B | Labels correct | Stale-link removal verified; label count matches GitLab |
| Gate C | Dependent discussion sync | Watermark prevents redundant refetch; concurrency bounded |
| Gate D | Resumability proof | Kill mid-run, rerun; confirm bounded redo and no redundant discussion refetch |
## Deliverables
### 1. Project Structure Additions
Add the following to the existing Rust structure from Checkpoint 0:
```
gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── ingest.rs        # gi ingest --type=issues|merge_requests
│   │       ├── list.rs          # gi list issues|mrs
│   │       ├── count.rs         # gi count issues|mrs|discussions|notes
│   │       └── show.rs          # gi show issue|mr <iid>
│   ├── gitlab/
│   │   ├── types.rs             # Add GitLabIssue, GitLabDiscussion, GitLabNote
│   │   └── transformers/
│   │       ├── mod.rs
│   │       ├── issue.rs         # GitLab → normalized issue
│   │       └── discussion.rs    # GitLab → normalized discussion/notes
│   └── ingestion/
│       ├── mod.rs
│       ├── orchestrator.rs      # Coordinates issue + dependent discussion sync
│       ├── issues.rs            # Issue fetcher with pagination
│       └── discussions.rs       # Discussion fetcher (per-issue)
├── tests/
│   ├── issue_transformer_tests.rs
│   ├── discussion_transformer_tests.rs
│   ├── pagination_tests.rs
│   ├── issue_ingestion_tests.rs
│   ├── label_linkage_tests.rs   # Verifies stale link removal
│   ├── discussion_watermark_tests.rs
│   └── fixtures/
│       ├── gitlab_issue.json
│       ├── gitlab_issues_page.json
│       ├── gitlab_discussion.json
│       └── gitlab_discussions_page.json
└── migrations/
    └── 002_issues.sql
```
### 2. GitLab API Endpoints
**Issues (Bulk Fetch):**

```
GET /projects/:id/issues?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
```

**Issue Discussions (Per-Issue Fetch):**

```
GET /projects/:id/issues/:iid/discussions?per_page=100&page=N
```
**Required Query Parameters:**

- `scope=all` - include all issues, not just those authored by the current user
- `state=all` - include closed issues (GitLab may default to open only)
**MVP Note (Labels):**
- CP1 stores labels by name only for maximum compatibility and stability.
- Label color/description ingestion is deferred (post-CP1) via Labels API if needed.
- This avoids relying on optional/variant payload shapes that differ across GitLab versions.
**Pagination:**

- Follow the `x-next-page` header until it is empty or absent
- Fall back to empty-page detection if headers are missing (robustness)
- Per-page maximum: 100
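The stop condition above reduces to a small piece of pure logic. The sketch below uses a hypothetical helper name (not part of the real client) but mirrors the header-plus-fallback rule:

```rust
/// Decide the next page to request from the `x-next-page` header value and
/// the empty-page fallback. Returns `None` when pagination should stop.
/// (Illustrative helper; not the actual client API.)
fn next_page(header: Option<&str>, items_on_page: usize) -> Option<u32> {
    if items_on_page == 0 {
        return None; // empty-page fallback: nothing more to fetch
    }
    header.and_then(|s| s.trim().parse::<u32>().ok())
}

fn main() {
    assert_eq!(next_page(Some("2"), 100), Some(2)); // header present, page full
    assert_eq!(next_page(Some(""), 100), None);     // empty header value: stop
    assert_eq!(next_page(None, 100), None);         // header absent: stop
    assert_eq!(next_page(Some("2"), 0), None);      // empty page: stop
}
```

This matches the `match next_page { Some(np) if !issues.is_empty() => ... }` logic used in the client code later in this document.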
### Database Schema
#### Migration `002_issues.sql`
-- Issues table
CREATE TABLE issues (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER UNIQUE NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id),
iid INTEGER NOT NULL,
title TEXT,
description TEXT,
state TEXT, -- 'opened' | 'closed'
author_username TEXT,
created_at INTEGER, -- ms epoch UTC
updated_at INTEGER, -- ms epoch UTC
last_seen_at INTEGER NOT NULL, -- ms epoch UTC, updated on every upsert
-- Prevents re-fetching discussions on cursor rewind / reruns unless issue changed.
-- Set to issue.updated_at after successfully syncing all discussions for this issue.
discussions_synced_for_updated_at INTEGER,
web_url TEXT,
raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_issues_project_updated ON issues(project_id, updated_at);
CREATE INDEX idx_issues_author ON issues(author_username);
CREATE INDEX idx_issues_discussions_sync ON issues(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_issues_project_iid ON issues(project_id, iid);
-- Labels (derived from issue payloads)
-- CP1: Name-only for stability. Color/description deferred to Labels API integration.
-- Uniqueness is (project_id, name) since gitlab_id isn't always available.
CREATE TABLE labels (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER, -- optional (populated if Labels API used later)
project_id INTEGER NOT NULL REFERENCES projects(id),
name TEXT NOT NULL,
color TEXT, -- nullable, populated later if needed
description TEXT -- nullable, populated later if needed
);
CREATE UNIQUE INDEX uq_labels_project_name ON labels(project_id, name);
CREATE INDEX idx_labels_name ON labels(name);
-- Issue-Label junction
-- IMPORTANT: On issue update, DELETE existing links then INSERT current set.
-- This ensures removed labels are unlinked (not just added).
CREATE TABLE issue_labels (
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
PRIMARY KEY(issue_id, label_id)
);
CREATE INDEX idx_issue_labels_label ON issue_labels(label_id);
-- Discussion threads for issues
CREATE TABLE discussions (
id INTEGER PRIMARY KEY,
gitlab_discussion_id TEXT NOT NULL, -- GitLab's string ID (e.g., "6a9c1750b37d...")
project_id INTEGER NOT NULL REFERENCES projects(id),
issue_id INTEGER REFERENCES issues(id),
merge_request_id INTEGER, -- FK added in CP2 via ALTER TABLE
noteable_type TEXT NOT NULL, -- 'Issue' | 'MergeRequest'
individual_note INTEGER NOT NULL, -- 1 = standalone comment, 0 = threaded
first_note_at INTEGER, -- ms epoch UTC, for ordering discussions
last_note_at INTEGER, -- ms epoch UTC, for "recently active" queries
last_seen_at INTEGER NOT NULL, -- ms epoch UTC, updated on every upsert
resolvable INTEGER, -- MR discussions can be resolved
resolved INTEGER,
raw_payload_id INTEGER REFERENCES raw_payloads(id),
CHECK (
(noteable_type='Issue' AND issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(noteable_type='MergeRequest' AND merge_request_id IS NOT NULL AND issue_id IS NULL)
)
);
CREATE UNIQUE INDEX uq_discussions_project_discussion_id ON discussions(project_id, gitlab_discussion_id);
CREATE INDEX idx_discussions_issue ON discussions(issue_id);
CREATE INDEX idx_discussions_mr ON discussions(merge_request_id);
CREATE INDEX idx_discussions_last_note ON discussions(last_note_at);
-- Notes belong to discussions (preserving thread context)
CREATE TABLE notes (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER UNIQUE NOT NULL,
discussion_id INTEGER NOT NULL REFERENCES discussions(id),
project_id INTEGER NOT NULL REFERENCES projects(id),
note_type TEXT, -- 'DiscussionNote' | 'DiffNote' | null
is_system INTEGER NOT NULL DEFAULT 0, -- 1 for system notes (assignments, label changes)
author_username TEXT,
body TEXT,
created_at INTEGER, -- ms epoch UTC
updated_at INTEGER, -- ms epoch UTC
last_seen_at INTEGER NOT NULL, -- ms epoch UTC, updated on every upsert
position INTEGER, -- derived from array order in API response (0-indexed)
resolvable INTEGER,
resolved INTEGER,
resolved_by TEXT,
resolved_at INTEGER, -- ms epoch UTC
-- DiffNote position metadata (populated for MR DiffNotes in CP2)
position_old_path TEXT,
position_new_path TEXT,
position_old_line INTEGER,
position_new_line INTEGER,
raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_notes_discussion ON notes(discussion_id);
CREATE INDEX idx_notes_author ON notes(author_username);
CREATE INDEX idx_notes_system ON notes(is_system);
-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (2, strftime('%s', 'now') * 1000, 'Issues, labels, discussions, notes');
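The delete-then-insert contract on `issue_labels` means the stored link set always equals the label set in the latest payload, which is what Gate B verifies. As a set-level sketch (pure logic; names are illustrative, not the real DB code):

```rust
use std::collections::BTreeSet;

/// Replace semantics for issue -> label links: the resulting link set is
/// exactly the current payload's labels, independent of prior state.
/// (Models the DELETE-then-INSERT pattern on issue_labels.)
fn relink<'a>(_previous: &BTreeSet<&'a str>, current: &[&'a str]) -> BTreeSet<&'a str> {
    current.iter().copied().collect()
}

fn main() {
    let previous: BTreeSet<_> = ["bug", "p1"].into_iter().collect();
    let now = relink(&previous, &["bug", "backend"]);
    assert!(now.contains("backend")); // newly added label is linked
    assert!(!now.contains("p1"));     // removed label is unlinked
    assert_eq!(now.len(), 2);
}
```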
### GitLab Types

#### Type Definitions
// src/gitlab/types.rs (additions)
use serde::Deserialize;
/// GitLab issue from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabIssue {
pub id: i64, // GitLab global ID
pub iid: i64, // Project-scoped issue number
pub project_id: i64,
pub title: String,
pub description: Option<String>,
pub state: String, // "opened" | "closed"
pub created_at: String, // ISO 8601
pub updated_at: String, // ISO 8601
pub closed_at: Option<String>,
pub author: GitLabAuthor,
pub labels: Vec<String>, // Array of label names (CP1 canonical)
pub web_url: String,
// NOTE: labels_details is intentionally NOT modeled for CP1.
// The field name and shape varies across GitLab versions.
// Color/description can be fetched via Labels API if needed later.
}
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
pub id: i64,
pub username: String,
pub name: String,
}
/// GitLab discussion (thread of notes).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabDiscussion {
pub id: String, // String ID like "6a9c1750b37d..."
pub individual_note: bool, // true = standalone comment
pub notes: Vec<GitLabNote>,
}
/// GitLab note (comment).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNote {
pub id: i64,
#[serde(rename = "type")]
pub note_type: Option<String>, // "DiscussionNote" | "DiffNote" | null
pub body: String,
pub author: GitLabAuthor,
pub created_at: String, // ISO 8601
pub updated_at: String, // ISO 8601
pub system: bool, // true for system-generated notes
#[serde(default)]
pub resolvable: bool,
#[serde(default)]
pub resolved: bool,
pub resolved_by: Option<GitLabAuthor>,
pub resolved_at: Option<String>,
/// DiffNote specific (null for non-DiffNote)
pub position: Option<GitLabNotePosition>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNotePosition {
pub old_path: Option<String>,
pub new_path: Option<String>,
pub old_line: Option<i32>,
pub new_line: Option<i32>,
}
### Transformers

#### Issue Transformer
// src/gitlab/transformers/issue.rs
use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabIssue;
/// Normalized issue ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedIssue {
pub gitlab_id: i64,
pub project_id: i64, // Local DB project ID
pub iid: i64,
pub title: String,
pub description: Option<String>,
pub state: String,
pub author_username: String,
pub created_at: i64, // ms epoch
pub updated_at: i64, // ms epoch
pub last_seen_at: i64, // ms epoch
pub web_url: String,
}
/// Normalized label ready for database insertion.
/// CP1: Name-only for stability.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
pub project_id: i64,
pub name: String,
}
/// Transform GitLab issue to normalized schema.
pub fn transform_issue(gitlab_issue: &GitLabIssue, local_project_id: i64) -> NormalizedIssue {
NormalizedIssue {
gitlab_id: gitlab_issue.id,
project_id: local_project_id,
iid: gitlab_issue.iid,
title: gitlab_issue.title.clone(),
description: gitlab_issue.description.clone(),
state: gitlab_issue.state.clone(),
author_username: gitlab_issue.author.username.clone(),
created_at: iso_to_ms(&gitlab_issue.created_at),
updated_at: iso_to_ms(&gitlab_issue.updated_at),
last_seen_at: now_ms(),
web_url: gitlab_issue.web_url.clone(),
}
}
/// Extract labels from GitLab issue (CP1: name-only).
pub fn extract_labels(gitlab_issue: &GitLabIssue, local_project_id: i64) -> Vec<NormalizedLabel> {
gitlab_issue
.labels
.iter()
.map(|name| NormalizedLabel {
project_id: local_project_id,
name: name.clone(),
})
.collect()
}
#### Discussion Transformer
// src/gitlab/transformers/discussion.rs
use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabDiscussion;
/// Normalized discussion ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedDiscussion {
pub gitlab_discussion_id: String,
pub project_id: i64,
pub issue_id: i64,
pub noteable_type: String, // "Issue"
pub individual_note: bool,
pub first_note_at: Option<i64>,
pub last_note_at: Option<i64>,
pub last_seen_at: i64,
pub resolvable: bool,
pub resolved: bool,
}
/// Normalized note ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedNote {
pub gitlab_id: i64,
pub project_id: i64,
pub note_type: Option<String>,
pub is_system: bool,
pub author_username: String,
pub body: String,
pub created_at: i64,
pub updated_at: i64,
pub last_seen_at: i64,
pub position: i32, // Array index in notes[]
pub resolvable: bool,
pub resolved: bool,
pub resolved_by: Option<String>,
pub resolved_at: Option<i64>,
}
/// Transform GitLab discussion to normalized schema.
pub fn transform_discussion(
gitlab_discussion: &GitLabDiscussion,
local_project_id: i64,
local_issue_id: i64,
) -> NormalizedDiscussion {
let note_times: Vec<i64> = gitlab_discussion
.notes
.iter()
.map(|n| iso_to_ms(&n.created_at))
.collect();
// Check if any note is resolvable
let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
let resolved = resolvable
&& gitlab_discussion
.notes
.iter()
.all(|n| !n.resolvable || n.resolved);
NormalizedDiscussion {
gitlab_discussion_id: gitlab_discussion.id.clone(),
project_id: local_project_id,
issue_id: local_issue_id,
noteable_type: "Issue".to_string(),
individual_note: gitlab_discussion.individual_note,
first_note_at: note_times.iter().min().copied(),
last_note_at: note_times.iter().max().copied(),
last_seen_at: now_ms(),
resolvable,
resolved,
}
}
/// Transform GitLab notes to normalized schema.
pub fn transform_notes(
gitlab_discussion: &GitLabDiscussion,
local_project_id: i64,
) -> Vec<NormalizedNote> {
gitlab_discussion
.notes
.iter()
.enumerate()
.map(|(index, note)| NormalizedNote {
gitlab_id: note.id,
project_id: local_project_id,
note_type: note.note_type.clone(),
is_system: note.system,
author_username: note.author.username.clone(),
body: note.body.clone(),
created_at: iso_to_ms(&note.created_at),
updated_at: iso_to_ms(&note.updated_at),
last_seen_at: now_ms(),
position: index as i32,
resolvable: note.resolvable,
resolved: note.resolved,
resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
resolved_at: note.resolved_at.as_ref().map(|s| iso_to_ms(s)),
})
.collect()
}
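The thread-level `resolvable`/`resolved` derivation in `transform_discussion` reduces to two folds over the notes. A minimal sketch, with `(resolvable, resolved)` pairs standing in for notes:

```rust
/// Thread state from per-note flags: a thread is resolvable if any note is,
/// and resolved only if every resolvable note is resolved.
fn thread_state(notes: &[(bool, bool)]) -> (bool, bool) {
    let resolvable = notes.iter().any(|&(r, _)| r);
    let resolved = resolvable && notes.iter().all(|&(r, d)| !r || d);
    (resolvable, resolved)
}

fn main() {
    assert_eq!(thread_state(&[(false, false)]), (false, false)); // e.g. system notes only
    assert_eq!(thread_state(&[(true, true)]), (true, true));
    assert_eq!(thread_state(&[(true, true), (true, false)]), (true, false));
    assert_eq!(thread_state(&[(true, true), (false, false)]), (true, true)); // non-resolvable notes ignored
}
```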
### GitLab Client Additions

#### Pagination with Async Streams
// src/gitlab/client.rs (additions)
use crate::gitlab::types::{GitLabDiscussion, GitLabIssue};
use reqwest::header::HeaderMap;
use std::pin::Pin;
use futures::Stream;
impl GitLabClient {
/// Paginate through issues for a project.
/// Returns a stream of issues that handles pagination automatically.
pub fn paginate_issues(
&self,
gitlab_project_id: i64,
updated_after: Option<i64>,
cursor_rewind_seconds: u32,
) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
Box::pin(async_stream::try_stream! {
let mut page = 1u32;
let per_page = 100u32;
loop {
let mut params = vec![
("scope", "all".to_string()),
("state", "all".to_string()),
("order_by", "updated_at".to_string()),
("sort", "asc".to_string()),
("per_page", per_page.to_string()),
("page", page.to_string()),
];
if let Some(ts) = updated_after {
// Apply cursor rewind for safety, clamping to 0 to avoid underflow
let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
let rewound = (ts - rewind_ms).max(0);
if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
params.push(("updated_after", dt.to_rfc3339()));
}
// If conversion fails (shouldn't happen with max(0)), omit the param
// and fetch all issues (safe fallback).
}
let (issues, headers) = self
.request_with_headers::<Vec<GitLabIssue>>(
&format!("/api/v4/projects/{gitlab_project_id}/issues"),
&params,
)
.await?;
for issue in issues.iter() {
yield issue.clone();
}
// Check for next page
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(np) if !issues.is_empty() => page = np,
_ => break,
}
}
})
}
/// Paginate through discussions for an issue.
pub fn paginate_issue_discussions(
&self,
gitlab_project_id: i64,
issue_iid: i64,
) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
Box::pin(async_stream::try_stream! {
let mut page = 1u32;
let per_page = 100u32;
loop {
let params = vec![
("per_page", per_page.to_string()),
("page", page.to_string()),
];
let (discussions, headers) = self
.request_with_headers::<Vec<GitLabDiscussion>>(
&format!("/api/v4/projects/{gitlab_project_id}/issues/{issue_iid}/discussions"),
&params,
)
.await?;
for discussion in discussions.iter() {
yield discussion.clone();
}
// Check for next page
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(np) if !discussions.is_empty() => page = np,
_ => break,
}
}
})
}
/// Make request and return response with headers for pagination.
async fn request_with_headers<T: serde::de::DeserializeOwned>(
&self,
path: &str,
params: &[(&str, String)],
) -> Result<(T, HeaderMap)> {
self.rate_limiter.lock().await.acquire().await;
let url = format!("{}{}", self.base_url, path);
tracing::debug!(url = %url, "GitLab request");
let response = self
.client
.get(&url)
.header("PRIVATE-TOKEN", &self.token)
.query(params)
.send()
.await
.map_err(|e| GiError::GitLabNetworkError {
base_url: self.base_url.clone(),
source: Some(e),
})?;
let headers = response.headers().clone();
let data = self.handle_response(response, path).await?;
Ok((data, headers))
}
}
Note: requires adding `async-stream` and `futures` to `Cargo.toml`:

```toml
# Cargo.toml additions
async-stream = "0.3"
futures = "0.3"
```
### Orchestration: Dependent Discussion Sync

#### Canonical Pattern (CP1)
When `gi ingest --type=issues` runs, it follows this orchestration:
1. **Ingest issues** (cursor-based, with incremental cursor updates per page)
2. **Collect touched issues** - for each issue that passed cursor tuple filtering, record: `local_issue_id`, `issue_iid`, `issue_updated_at`, and `discussions_synced_for_updated_at` (from DB)
3. **Filter for discussion sync** - enqueue issues where `issue.updated_at > issues.discussions_synced_for_updated_at`. This prevents re-fetching discussions for issues that haven't changed, even with cursor rewind.
4. **Execute discussion sync** with bounded concurrency (`dependent_concurrency` from config)
5. **Update watermark** - after each issue's discussions are successfully ingested:

```sql
UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?
```

**Invariant:** A rerun MUST NOT refetch discussions for issues whose `updated_at` has not advanced, even with cursor rewind.
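The discussion-sync eligibility check is a small predicate. A sketch with a hypothetical helper name:

```rust
/// True when an issue's discussions must be (re)fetched: never synced, or
/// the issue's updated_at advanced past the last successful sync watermark.
/// (Illustrative; mirrors the filter in the ingestion code below.)
fn needs_discussion_sync(issue_updated_at: i64, synced_for_updated_at: Option<i64>) -> bool {
    match synced_for_updated_at {
        None => true,
        Some(watermark) => issue_updated_at > watermark,
    }
}

fn main() {
    assert!(needs_discussion_sync(1_000, None));         // never synced
    assert!(needs_discussion_sync(2_000, Some(1_000)));  // issue changed
    assert!(!needs_discussion_sync(1_000, Some(1_000))); // unchanged: skip, even after cursor rewind
}
```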
### Ingestion Logic

#### Runtime Strategy
**Decision:** Use a single-threaded Tokio runtime (`flavor = "current_thread"`) for CP1.

Rationale:

- `rusqlite::Connection` is `!Send`, which conflicts with multi-threaded runtimes
- A single-threaded runtime avoids `Send` bounds entirely
- Concurrency for discussion fetches uses `tokio::task::spawn_local` + `LocalSet`
- Keeps code simple; can upgrade to a channel-based DB writer in CP2 if needed
```rust
// src/main.rs
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // ...
}
```
#### Issue Ingestion
// src/ingestion/issues.rs
use futures::StreamExt;
use rusqlite::Connection;
use tracing::{debug, info};
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::issue::{extract_labels, transform_issue};
/// Result of issue ingestion.
#[derive(Debug, Default)]
pub struct IngestIssuesResult {
pub fetched: usize,
pub upserted: usize,
pub labels_created: usize,
/// Issues that need discussion sync (updated_at advanced)
pub issues_needing_discussion_sync: Vec<IssueForDiscussionSync>,
}
/// Info needed to sync discussions for an issue.
#[derive(Debug, Clone)]
pub struct IssueForDiscussionSync {
pub local_issue_id: i64,
pub iid: i64,
pub updated_at: i64,
}
/// Ingest issues for a project.
/// Returns list of issues that need discussion sync.
pub async fn ingest_issues(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64, // Local DB project ID
gitlab_project_id: i64, // GitLab project ID
) -> Result<IngestIssuesResult> {
let mut result = IngestIssuesResult::default();
// Get current cursor
let cursor = get_cursor(conn, project_id)?;
let cursor_updated_at = cursor.0;
let cursor_gitlab_id = cursor.1;
let mut last_updated_at: Option<i64> = None;
let mut last_gitlab_id: Option<i64> = None;
let mut issues_in_page: Vec<(i64, i64, i64)> = Vec::new(); // (local_id, iid, updated_at)
// Fetch issues with pagination
let mut stream = client.paginate_issues(
gitlab_project_id,
cursor_updated_at,
config.sync.cursor_rewind_seconds,
);
while let Some(issue_result) = stream.next().await {
let issue = issue_result?;
result.fetched += 1;
let issue_updated_at = crate::core::time::iso_to_ms(&issue.updated_at);
// Apply cursor filtering for tuple semantics
if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
if issue_updated_at < cursor_ts {
continue;
}
if issue_updated_at == cursor_ts && issue.id <= cursor_id {
continue;
}
}
// Begin transaction for this issue (atomicity + performance)
let tx = conn.unchecked_transaction()?;
// Store raw payload
let payload_id = store_payload(
&tx,
project_id,
"issue",
&issue.id.to_string(),
&issue,
config.storage.compress_raw_payloads,
)?;
// Transform and upsert issue
let normalized = transform_issue(&issue, project_id);
let changes = upsert_issue(&tx, &normalized, payload_id)?;
if changes > 0 {
result.upserted += 1;
}
// Get local issue ID for label linking
let local_issue_id = get_local_issue_id(&tx, normalized.gitlab_id)?;
// Clear existing label links (ensures removed labels are unlinked)
clear_issue_labels(&tx, local_issue_id)?;
// Extract and upsert labels (name-only for CP1)
let labels = extract_labels(&issue, project_id);
for label in &labels {
let created = upsert_label(&tx, label)?;
if created {
result.labels_created += 1;
}
// Link issue to label
let label_id = get_label_id(&tx, project_id, &label.name)?;
link_issue_label(&tx, local_issue_id, label_id)?;
}
tx.commit()?;
// Track for discussion sync eligibility
issues_in_page.push((local_issue_id, issue.iid, issue_updated_at));
// Track for cursor update
last_updated_at = Some(issue_updated_at);
last_gitlab_id = Some(issue.id);
// Incremental cursor update every 100 issues (page boundary)
// This ensures crashes don't cause massive refetch
if result.fetched % 100 == 0 {
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
}
}
}
// Final cursor update
if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
}
// Determine which issues need discussion sync (updated_at advanced)
for (local_issue_id, iid, updated_at) in issues_in_page {
let synced_at = get_discussions_synced_at(conn, local_issue_id)?;
if synced_at.is_none() || updated_at > synced_at.unwrap() {
result.issues_needing_discussion_sync.push(IssueForDiscussionSync {
local_issue_id,
iid,
updated_at,
});
}
}
info!(
project_id,
fetched = result.fetched,
upserted = result.upserted,
labels_created = result.labels_created,
need_discussion_sync = result.issues_needing_discussion_sync.len(),
"Issue ingestion complete"
);
Ok(result)
}
fn get_cursor(conn: &Connection, project_id: i64) -> Result<(Option<i64>, Option<i64>)> {
let mut stmt = conn.prepare(
"SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
WHERE project_id = ? AND resource_type = 'issues'"
)?;
let result = stmt.query_row([project_id], |row| {
Ok((row.get::<_, Option<i64>>(0)?, row.get::<_, Option<i64>>(1)?))
});
match result {
Ok(cursor) => Ok(cursor),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok((None, None)),
Err(e) => Err(e.into()),
}
}
fn get_discussions_synced_at(conn: &Connection, issue_id: i64) -> Result<Option<i64>> {
let result = conn.query_row(
"SELECT discussions_synced_for_updated_at FROM issues WHERE id = ?",
[issue_id],
|row| row.get(0),
);
match result {
Ok(ts) => Ok(ts),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn upsert_issue(
conn: &Connection,
issue: &crate::gitlab::transformers::issue::NormalizedIssue,
payload_id: Option<i64>,
) -> Result<usize> {
let changes = conn.execute(
"INSERT INTO issues (
gitlab_id, project_id, iid, title, description, state,
author_username, created_at, updated_at, last_seen_at, web_url, raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
ON CONFLICT(gitlab_id) DO UPDATE SET
title = excluded.title,
description = excluded.description,
state = excluded.state,
updated_at = excluded.updated_at,
last_seen_at = excluded.last_seen_at,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
issue.gitlab_id,
issue.project_id,
issue.iid,
issue.title,
issue.description,
issue.state,
issue.author_username,
issue.created_at,
issue.updated_at,
issue.last_seen_at,
issue.web_url,
payload_id,
],
)?;
Ok(changes)
}
fn get_local_issue_id(conn: &Connection, gitlab_id: i64) -> Result<i64> {
Ok(conn.query_row(
"SELECT id FROM issues WHERE gitlab_id = ?",
[gitlab_id],
|row| row.get(0),
)?)
}
fn clear_issue_labels(conn: &Connection, issue_id: i64) -> Result<()> {
conn.execute("DELETE FROM issue_labels WHERE issue_id = ?", [issue_id])?;
Ok(())
}
fn upsert_label(
conn: &Connection,
label: &crate::gitlab::transformers::issue::NormalizedLabel,
) -> Result<bool> {
// CP1: Name-only labels. Color/description columns remain NULL.
let changes = conn.execute(
"INSERT INTO labels (project_id, name)
VALUES (?1, ?2)
ON CONFLICT(project_id, name) DO NOTHING",
rusqlite::params![label.project_id, label.name],
)?;
Ok(changes > 0)
}
fn get_label_id(conn: &Connection, project_id: i64, name: &str) -> Result<i64> {
Ok(conn.query_row(
"SELECT id FROM labels WHERE project_id = ? AND name = ?",
rusqlite::params![project_id, name],
|row| row.get(0),
)?)
}
fn link_issue_label(conn: &Connection, issue_id: i64, label_id: i64) -> Result<()> {
conn.execute(
"INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)",
[issue_id, label_id],
)?;
Ok(())
}
fn update_cursor(
conn: &Connection,
project_id: i64,
resource_type: &str,
updated_at: i64,
gitlab_id: i64,
) -> Result<()> {
conn.execute(
"INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(project_id, resource_type) DO UPDATE SET
updated_at_cursor = excluded.updated_at_cursor,
tie_breaker_id = excluded.tie_breaker_id",
rusqlite::params![project_id, resource_type, updated_at, gitlab_id],
)?;
Ok(())
}
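The `(updated_at, gitlab_id)` tuple semantics applied inside the ingest loop can be isolated as a predicate (illustrative only; the real code inlines this filter):

```rust
/// Strictly-after comparison on the (updated_at, gitlab_id) cursor tuple.
/// Rows at or before the cursor are skipped; ties on the timestamp are
/// broken by the GitLab id, so reruns redo a bounded amount of work.
fn past_cursor(updated_at: i64, gitlab_id: i64, cursor: Option<(i64, i64)>) -> bool {
    match cursor {
        None => true, // first sync: take everything
        Some((cur_ts, cur_id)) => {
            updated_at > cur_ts || (updated_at == cur_ts && gitlab_id > cur_id)
        }
    }
}

fn main() {
    assert!(past_cursor(100, 7, None));
    assert!(!past_cursor(99, 7, Some((100, 5))));  // older timestamp: already ingested
    assert!(!past_cursor(100, 5, Some((100, 5)))); // the cursor row itself
    assert!(past_cursor(100, 6, Some((100, 5))));  // same timestamp, higher id
    assert!(past_cursor(101, 1, Some((100, 5))));  // newer timestamp
}
```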
#### Discussion Ingestion
// src/ingestion/discussions.rs
use futures::StreamExt;
use rusqlite::Connection;
use tracing::debug;
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{transform_discussion, transform_notes};
/// Result of discussion ingestion for a single issue.
#[derive(Debug, Default)]
pub struct IngestDiscussionsResult {
pub discussions_fetched: usize,
pub discussions_upserted: usize,
pub notes_upserted: usize,
pub system_notes_count: usize,
}
/// Ingest discussions for a single issue.
/// Called only when issue.updated_at > discussions_synced_for_updated_at.
pub async fn ingest_issue_discussions(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
issue_iid: i64,
local_issue_id: i64,
issue_updated_at: i64,
) -> Result<IngestDiscussionsResult> {
let mut result = IngestDiscussionsResult::default();
let mut stream = client.paginate_issue_discussions(gitlab_project_id, issue_iid);
while let Some(discussion_result) = stream.next().await {
let discussion = discussion_result?;
result.discussions_fetched += 1;
// Begin transaction for this discussion (atomicity + performance)
let tx = conn.unchecked_transaction()?;
// Store raw payload for discussion
let discussion_payload_id = store_payload(
&tx,
project_id,
"discussion",
&discussion.id,
&discussion,
config.storage.compress_raw_payloads,
)?;
// Transform and upsert discussion
let normalized = transform_discussion(&discussion, project_id, local_issue_id);
upsert_discussion(&tx, &normalized, discussion_payload_id)?;
result.discussions_upserted += 1;
// Get local discussion ID
let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;
// Transform and upsert notes
let notes = transform_notes(&discussion, project_id);
for note in &notes {
// Store raw payload for note
let gitlab_note = discussion.notes.iter().find(|n| n.id == note.gitlab_id);
let note_payload_id = if let Some(gn) = gitlab_note {
store_payload(
&tx,
project_id,
"note",
&note.gitlab_id.to_string(),
gn,
config.storage.compress_raw_payloads,
)?
} else {
None
};
upsert_note(&tx, local_discussion_id, note, note_payload_id)?;
result.notes_upserted += 1;
if note.is_system {
result.system_notes_count += 1;
}
}
tx.commit()?;
}
// Mark discussions as synced for this issue version
mark_discussions_synced(conn, local_issue_id, issue_updated_at)?;
debug!(
project_id,
issue_iid,
discussions = result.discussions_fetched,
notes = result.notes_upserted,
"Issue discussions ingested"
);
Ok(result)
}
fn upsert_discussion(
conn: &Connection,
discussion: &crate::gitlab::transformers::discussion::NormalizedDiscussion,
payload_id: Option<i64>,
) -> Result<()> {
conn.execute(
"INSERT INTO discussions (
gitlab_discussion_id, project_id, issue_id, noteable_type,
individual_note, first_note_at, last_note_at, last_seen_at,
resolvable, resolved, raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
first_note_at = excluded.first_note_at,
last_note_at = excluded.last_note_at,
last_seen_at = excluded.last_seen_at,
resolvable = excluded.resolvable,
resolved = excluded.resolved,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
discussion.gitlab_discussion_id,
discussion.project_id,
discussion.issue_id,
discussion.noteable_type,
discussion.individual_note as i32,
discussion.first_note_at,
discussion.last_note_at,
discussion.last_seen_at,
discussion.resolvable as i32,
discussion.resolved as i32,
payload_id,
],
)?;
Ok(())
}
fn get_local_discussion_id(conn: &Connection, project_id: i64, gitlab_id: &str) -> Result<i64> {
Ok(conn.query_row(
"SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
rusqlite::params![project_id, gitlab_id],
|row| row.get(0),
)?)
}
fn upsert_note(
conn: &Connection,
discussion_id: i64,
note: &crate::gitlab::transformers::discussion::NormalizedNote,
payload_id: Option<i64>,
) -> Result<()> {
conn.execute(
"INSERT INTO notes (
gitlab_id, discussion_id, project_id, note_type, is_system,
author_username, body, created_at, updated_at, last_seen_at,
position, resolvable, resolved, resolved_by, resolved_at,
raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
ON CONFLICT(gitlab_id) DO UPDATE SET
body = excluded.body,
updated_at = excluded.updated_at,
last_seen_at = excluded.last_seen_at,
resolved = excluded.resolved,
resolved_by = excluded.resolved_by,
resolved_at = excluded.resolved_at,
raw_payload_id = excluded.raw_payload_id",
rusqlite::params![
note.gitlab_id,
discussion_id,
note.project_id,
note.note_type,
note.is_system as i32,
note.author_username,
note.body,
note.created_at,
note.updated_at,
note.last_seen_at,
note.position,
note.resolvable as i32,
note.resolved as i32,
note.resolved_by,
note.resolved_at,
payload_id,
],
)?;
Ok(())
}
fn mark_discussions_synced(conn: &Connection, issue_id: i64, issue_updated_at: i64) -> Result<()> {
conn.execute(
"UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?",
rusqlite::params![issue_updated_at, issue_id],
)?;
Ok(())
}
CLI Commands
gi ingest --type=issues
Fetch and store all issues from configured projects.
Clap Definition:
// src/cli/mod.rs (addition to Commands enum)
#[derive(Subcommand)]
pub enum Commands {
// ... existing commands ...
/// Ingest data from GitLab
Ingest {
/// Resource type to ingest
#[arg(long, value_parser = ["issues", "merge_requests"])]
r#type: String,
/// Filter to single project
#[arg(long)]
project: Option<String>,
/// Override stale sync lock
#[arg(long)]
force: bool,
},
/// List entities
List {
/// Entity type to list
#[arg(value_parser = ["issues", "mrs"])]
entity: String,
/// Maximum results
#[arg(long, default_value = "20")]
limit: usize,
/// Filter by project path
#[arg(long)]
project: Option<String>,
/// Filter by state
#[arg(long, value_parser = ["opened", "closed", "all"])]
state: Option<String>,
},
/// Count entities
Count {
/// Entity type to count
#[arg(value_parser = ["issues", "mrs", "discussions", "notes"])]
entity: String,
/// Filter by noteable type
#[arg(long, value_parser = ["issue", "mr"])]
r#type: Option<String>,
},
/// Show entity details
Show {
/// Entity type
#[arg(value_parser = ["issue", "mr"])]
entity: String,
/// Entity IID
iid: i64,
/// Project path (required if ambiguous)
#[arg(long)]
project: Option<String>,
},
}
Output:
Ingesting issues...
group/project-one: 1,234 issues fetched, 45 new labels
Fetching discussions (312 issues with updates)...
group/project-one: 312 issues → 1,234 discussions, 5,678 notes
Total: 1,234 issues, 1,234 discussions, 5,678 notes (excluding 1,234 system notes)
Skipped discussion sync for 922 unchanged issues.
gi list issues
Output:
Issues (showing 20 of 3,801)
#1234 Authentication redesign opened @johndoe 3 days ago
#1233 Fix memory leak in cache closed @janedoe 5 days ago
#1232 Add dark mode support opened @bobsmith 1 week ago
...
gi count issues
Output:
Issues: 3,801
gi show issue <iid>
Output:
Issue #1234: Authentication redesign
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Project: group/project-one
State: opened
Author: @johndoe
Created: 2024-01-15
Updated: 2024-03-20
Labels: enhancement, auth
URL: https://gitlab.example.com/group/project-one/-/issues/1234
Description:
We need to redesign the authentication flow to support...
Discussions (5):
@janedoe (2024-01-16):
I agree we should move to JWT-based auth...
@johndoe (2024-01-16):
What about refresh token strategy?
@bobsmith (2024-01-17):
Have we considered OAuth2?
Automated Tests
Unit Tests
// tests/issue_transformer_tests.rs
#[cfg(test)]
mod tests {
use gi::gitlab::transformers::issue::*;
use gi::gitlab::types::*;
#[test]
fn transforms_gitlab_issue_to_normalized_schema() { /* ... */ }
#[test]
fn extracts_labels_from_issue_payload() { /* ... */ }
#[test]
fn handles_missing_optional_fields_gracefully() { /* ... */ }
#[test]
fn converts_iso_timestamps_to_ms_epoch() { /* ... */ }
#[test]
fn sets_last_seen_at_to_current_time() { /* ... */ }
}
// tests/discussion_transformer_tests.rs
#[cfg(test)]
mod tests {
use gi::gitlab::transformers::discussion::*;
#[test]
fn transforms_discussion_payload_to_normalized_schema() { /* ... */ }
#[test]
fn extracts_notes_array_from_discussion() { /* ... */ }
#[test]
fn sets_individual_note_flag_correctly() { /* ... */ }
#[test]
fn flags_system_notes_with_is_system_true() { /* ... */ }
#[test]
fn preserves_note_order_via_position_field() { /* ... */ }
#[test]
fn computes_first_note_at_and_last_note_at_correctly() { /* ... */ }
#[test]
fn computes_resolvable_and_resolved_status() { /* ... */ }
}
// tests/pagination_tests.rs
#[cfg(test)]
mod tests {
#[tokio::test]
async fn fetches_all_pages_when_multiple_exist() { /* ... */ }
#[tokio::test]
async fn respects_per_page_parameter() { /* ... */ }
#[tokio::test]
async fn follows_x_next_page_header_until_empty() { /* ... */ }
#[tokio::test]
async fn falls_back_to_empty_page_stop_if_headers_missing() { /* ... */ }
#[tokio::test]
async fn applies_cursor_rewind_for_tuple_semantics() { /* ... */ }
#[tokio::test]
async fn clamps_negative_rewind_to_zero() { /* ... */ }
}
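The looping behavior these pagination tests target can be sketched independently of HTTP. This is a simplified model, not the real client: the hypothetical `fetch_all_pages` takes a closure standing in for one request, which returns a page of items plus the parsed `x-next-page` header (`Some(n)` when GitLab names a next page, `None` when the header is empty or missing).

```rust
/// Hypothetical sketch of the page-following loop; the real client wraps
/// an async HTTP request instead of a closure.
fn fetch_all_pages<T, F>(mut fetch: F) -> Vec<T>
where
    F: FnMut(u32) -> (Vec<T>, Option<u32>),
{
    let mut items = Vec::new();
    let mut page = 1u32;
    loop {
        let (batch, next) = fetch(page);
        if batch.is_empty() {
            break; // fallback stop when pagination headers are missing entirely
        }
        items.extend(batch);
        match next {
            Some(n) => page = n, // follow the x-next-page header
            None => break,       // empty/absent header: no further pages
        }
    }
    items
}
```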
// tests/label_linkage_tests.rs
#[cfg(test)]
mod tests {
#[test]
fn clears_existing_labels_before_linking_new_set() { /* ... */ }
#[test]
fn removes_stale_label_links_on_issue_update() { /* ... */ }
#[test]
fn handles_issue_with_all_labels_removed() { /* ... */ }
#[test]
fn preserves_labels_that_still_exist() { /* ... */ }
}
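The clear-then-link behavior these tests pin down can be illustrated with an in-memory model of the junction table. This is a sketch with a hypothetical `relink_labels`, not the SQL the tool actually runs:

```rust
use std::collections::{HashMap, HashSet};

/// Models `clear_issue_labels()` followed by fresh inserts: the current
/// GitLab payload is the source of truth, so every upsert rebuilds the
/// issue's label links from scratch and stale links cannot accumulate.
fn relink_labels(
    junction: &mut HashMap<i64, HashSet<String>>,
    issue_id: i64,
    current_labels: &[&str],
) {
    let links = junction.entry(issue_id).or_default();
    links.clear(); // DELETE FROM issue_labels WHERE issue_id = ?
    for label in current_labels {
        links.insert((*label).to_string()); // INSERT one row per current label
    }
}
```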
// tests/discussion_watermark_tests.rs
#[cfg(test)]
mod tests {
#[tokio::test]
async fn skips_discussion_fetch_when_updated_at_unchanged() { /* ... */ }
#[tokio::test]
async fn fetches_discussions_when_updated_at_advanced() { /* ... */ }
#[tokio::test]
async fn updates_watermark_after_successful_discussion_sync() { /* ... */ }
#[tokio::test]
async fn does_not_update_watermark_on_discussion_sync_failure() { /* ... */ }
}
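Stripped to its core, the watermark rule these tests enforce is a single comparison. A sketch (hypothetical `needs_discussion_sync`; the real check reads `discussions_synced_for_updated_at` from the issues table):

```rust
/// Returns true when an issue's discussions must be (re)fetched: either
/// they were never synced, or the issue's updated_at has advanced past
/// the watermark recorded at the last successful discussion sync.
fn needs_discussion_sync(updated_at_ms: i64, synced_for_ms: Option<i64>) -> bool {
    match synced_for_ms {
        None => true, // never synced
        Some(watermark) => updated_at_ms > watermark,
    }
}
```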
Integration Tests
// tests/issue_ingestion_tests.rs
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use wiremock::{MockServer, Mock, ResponseTemplate};
use wiremock::matchers::{method, path_regex};
#[tokio::test]
async fn inserts_issues_into_database() { /* ... */ }
#[tokio::test]
async fn creates_labels_from_issue_payloads() { /* ... */ }
#[tokio::test]
async fn links_issues_to_labels_via_junction_table() { /* ... */ }
#[tokio::test]
async fn removes_stale_label_links_on_resync() { /* ... */ }
#[tokio::test]
async fn stores_raw_payload_for_each_issue() { /* ... */ }
#[tokio::test]
async fn stores_raw_payload_for_each_discussion() { /* ... */ }
#[tokio::test]
async fn updates_cursor_incrementally_per_page() { /* ... */ }
#[tokio::test]
async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }
#[tokio::test]
async fn handles_issues_with_no_labels() { /* ... */ }
#[tokio::test]
async fn upserts_existing_issues_on_refetch() { /* ... */ }
#[tokio::test]
async fn skips_discussion_refetch_for_unchanged_issues() { /* ... */ }
}
Manual Smoke Tests
| Command | Expected Output | Pass Criteria |
|---|---|---|
| `gi ingest --type=issues` | Progress bar, final count | Completes without error |
| `gi list issues --limit=10` | Table of 10 issues | Shows iid, title, state, author |
| `gi list issues --project=group/project-one` | Filtered list | Only shows issues from that project |
| `gi count issues` | `Issues: N` | Count matches GitLab UI |
| `gi show issue 123` | Issue detail view | Shows title, description, labels, discussions, URL |
| `gi show issue 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=issue` | `Issue Discussions: N` | Non-zero count |
| `gi count notes --type=issue` | `Issue Notes: N (excluding M system)` | Non-zero count |
| `gi sync-status` | Last sync time, cursor positions | Shows successful last run |
| `gi ingest --type=issues` (re-run) | `0 new issues` | Cursor prevents re-fetch |
| `gi ingest --type=issues` (re-run) | `Skipped discussion sync for N unchanged issues` | Watermark prevents refetch |
| `gi ingest --type=issues` (concurrent) | Lock error | Second run fails with clear message |
| Remove label from issue in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |
Data Integrity Checks
After successful ingestion, verify:
- `SELECT COUNT(*) FROM issues` matches GitLab issue count for configured projects
- Every issue has a corresponding `raw_payloads` row
- Every discussion has a corresponding `raw_payloads` row
- Labels in the `issue_labels` junction all exist in the `labels` table
- `issue_labels` count per issue matches GitLab UI label count
- `sync_cursors` has an entry for each `(project_id, 'issues')` pair
- Re-running `gi ingest --type=issues` fetches 0 new items (cursor is current)
- Re-running skips discussion sync for unchanged issues (watermark works)
- `SELECT COUNT(*) FROM discussions WHERE noteable_type = 'Issue'` is non-zero
- Every discussion has at least one note
- `individual_note = 1` discussions have exactly one note
- `SELECT COUNT(*) FROM notes WHERE is_system = 1` matches the system note count in CLI output
- After removing a label in GitLab and re-syncing, the link is removed from `issue_labels`
Definition of Done
Gate A: Issues Only (Must Pass First)
- `gi ingest --type=issues` fetches all issues from configured projects
- Issues stored with correct schema, including `last_seen_at`
- Cursor-based sync is resumable (re-run fetches only new/updated)
- Incremental cursor updates every 100 issues
- Raw payloads stored for each issue
- `gi list issues` and `gi count issues` work
Gate B: Labels Correct (Must Pass)
- Labels extracted and stored (name-only)
- Label links created correctly
- Stale label links removed on re-sync (verified with test)
- Label count per issue matches GitLab
Gate C: Dependent Discussion Sync (Must Pass)
- Discussions fetched for issues with `updated_at` advancement
- Notes stored with `is_system` flag correctly set
- Raw payloads stored for discussions and notes
- `discussions_synced_for_updated_at` watermark updated after sync
- Unchanged issues skip discussion refetch (verified with test)
- Bounded concurrency (`dependent_concurrency` respected)
Gate D: Resumability Proof (Must Pass)
- Kill mid-run, rerun; bounded redo (cursor progress preserved)
- No redundant discussion refetch after crash recovery
- Single-flight lock prevents concurrent runs
Final Gate (Must Pass)
- All unit tests pass (`cargo test`)
- All integration tests pass (mocked with wiremock)
- `cargo clippy` passes with no warnings
- `cargo fmt --check` passes
- Compiles with `--release`
Hardening (Optional Before CP2)
- Edge cases: issues with 0 labels, 0 discussions
- Large pagination (100+ pages)
- Rate limit handling under sustained load
- Live tests pass against real GitLab instance
- Performance: 1000+ issues ingested in <5 min
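The bounded-concurrency requirement in Gate C can be pictured with a synchronous stand-in. The real orchestrator would drive at most `dependent_concurrency` discussion fetches concurrently (e.g. via a buffered async stream); this sketch only shows the batching shape, with a hypothetical `run_batch` standing in for one wave of fetches:

```rust
/// Walks pending issues in windows of `dependent_concurrency`; each
/// window completes before the next starts, bounding in-flight work.
fn sync_in_batches<T>(pending: &[T], dependent_concurrency: usize, mut run_batch: impl FnMut(&[T])) {
    for batch in pending.chunks(dependent_concurrency.max(1)) {
        run_batch(batch);
    }
}
```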
Implementation Order
1. Runtime decision (5 min)
   - Confirm `#[tokio::main(flavor = "current_thread")]`
   - Add note about upgrade path for CP2 if needed
2. Cargo.toml updates (5 min)
   - Add `async-stream = "0.3"` and `futures = "0.3"`
3. Database migration (15 min)
   - `migrations/002_issues.sql` with `discussions_synced_for_updated_at` column
   - `raw_payload_id` on discussions table
   - Update `MIGRATIONS` const in `src/core/db.rs`
4. GitLab types (15 min)
   - Add types to `src/gitlab/types.rs` (no `labels_details`)
   - Test deserialization with fixtures
5. Transformers (25 min)
   - `src/gitlab/transformers/mod.rs`
   - `src/gitlab/transformers/issue.rs` (simplified NormalizedLabel)
   - `src/gitlab/transformers/discussion.rs`
   - Unit tests
6. GitLab client pagination (25 min)
   - Add `paginate_issues()` with underflow protection
   - Add `paginate_issue_discussions()`
   - Add `request_with_headers()` helper
7. Issue ingestion (45 min)
   - `src/ingestion/mod.rs`
   - `src/ingestion/issues.rs` with transaction batching, `clear_issue_labels()` before linking, incremental cursor updates, and returning `issues_needing_discussion_sync`
   - Unit + integration tests including label stale-link removal
8. Discussion ingestion (30 min)
   - `src/ingestion/discussions.rs` with transaction batching, `raw_payload_id` storage, and the `mark_discussions_synced()` watermark update
   - Integration tests including watermark behavior
9. Orchestrator (30 min)
   - `src/ingestion/orchestrator.rs`
   - Coordinates issue sync → filter for discussion needs → bounded discussion sync
   - Integration tests
10. CLI commands (45 min)
    - `gi ingest --type=issues`
    - `gi list issues`
    - `gi count issues|discussions|notes`
    - `gi show issue <iid>`
    - Enhanced `gi sync-status`
11. Final validation (20 min)
    - `cargo test` and `cargo clippy`
    - Gate A/B/C/D verification
    - Manual smoke tests
    - Data integrity checks
Risks & Mitigations
| Risk | Mitigation |
|---|---|
| GitLab rate limiting during large sync | Respect `Retry-After`, exponential backoff, configurable concurrency |
| Discussion API N+1 problem (thousands of calls) | `dependent_concurrency` config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large issues with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | `is_system` flag allows filtering |
| Label deduplication across projects | Unique constraint on `(project_id, name)` |
| Stale label links accumulate | `clear_issue_labels()` before linking ensures correctness |
| Async stream complexity | Use `async-stream` crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Single-threaded runtime (`current_thread`) avoids `Send` bounds |
| Crash causes massive refetch | Incremental cursor updates every 100 issues |
| Cursor rewind causes discussion refetch | Per-issue watermark (`discussions_synced_for_updated_at`) |
| Timestamp underflow on rewind | Clamp to 0 with `.max(0)` |
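The underflow mitigation in the last row is small enough to state exactly; a sketch, assuming millisecond-epoch cursors:

```rust
/// Rewinds a sync cursor by a safety window to catch items whose
/// timestamps arrived out of order, clamping at 0 so a young database
/// (cursor near the epoch) never underflows.
fn rewound_cursor(cursor_ms: i64, rewind_window_ms: i64) -> i64 {
    (cursor_ms - rewind_window_ms).max(0)
}
```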
API Call Estimation
For a project with 3,000 issues:
- Issue list: `ceil(3000 / 100) = 30` calls
- Issue discussions (first run): `3000 × 1.2 average pages = 3,600` calls
- Issue discussions (subsequent runs, 10% updates): `300 × 1.2 = 360` calls
- Total first run: ~3,630 calls per project
- Total subsequent run: ~390 calls per project (~90% savings from watermark)
At 10 requests/second:
- First run: ~6 minutes per project
- Subsequent run: ~40 seconds per project
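The arithmetic above can be reproduced directly. A worked example using the PRD's own planning assumptions (100 items per page, ~1.2 discussion pages per issue, 10% of issues updated between runs), not measurements:

```rust
/// Estimated (first_run, subsequent_run) API calls for one project.
fn estimated_calls(issues: u64, updated_ratio: f64) -> (u64, u64) {
    let per_page = 100;
    let disc_pages_per_issue = 1.2;
    let list_calls = (issues + per_page - 1) / per_page; // ceil(3000/100) = 30
    let first = list_calls + (issues as f64 * disc_pages_per_issue).round() as u64;
    let changed = (issues as f64 * updated_ratio).round() as u64; // 10% -> 300 issues
    let subsequent = list_calls + (changed as f64 * disc_pages_per_issue).round() as u64;
    (first, subsequent) // (3_630, 390) for the 3,000-issue example
}
```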
References
- SPEC.md - Full system specification
- checkpoint-0.md - Project setup PRD
- GitLab Issues API
- GitLab Discussions API
- async-stream crate - Async generators for Rust
- wiremock - HTTP mocking for tests