gitlore/docs/prd/checkpoint-1.md
2026-01-28 15:49:14 -05:00

Checkpoint 1: Issue Ingestion - PRD

Note: The project was renamed from "gitlab-inbox" to "gitlore" and the CLI from "gi" to "lore". References to "gi" in this document should be read as "lore".

Version: 2.0
Status: Ready for Implementation
Depends On: Checkpoint 0 (Project Setup)
Enables: Checkpoint 2 (MR Ingestion)


Overview

Objective

Ingest all issues, labels, and issue discussions from configured GitLab repositories with resumable cursor-based incremental sync. This checkpoint establishes the core data ingestion pattern that will be reused for MRs in Checkpoint 2.

Success Criteria

| Criterion | Validation |
| --- | --- |
| gi ingest --type=issues fetches all issues | gi count issues matches GitLab UI |
| Labels extracted from issue payloads (name-only) | labels table populated |
| Label linkage reflects current GitLab state | Removed labels are unlinked on re-sync |
| Issue discussions fetched per-issue (dependent sync) | Discussions and notes upserted for issues whose updated_at advanced |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged issues | Per-issue watermark prevents redundant fetches |
| Sync tracking records all runs | sync_runs table has complete audit trail |
| Single-flight lock prevents concurrent runs | Second sync fails with clear error |
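
This PRD does not prescribe how the single-flight lock is implemented; one minimal sketch uses an atomic lock file (all names here are illustrative, and a real implementation would also need stale-lock recovery after a crash):

```rust
use std::fs::{self, File, OpenOptions};
use std::io;
use std::path::Path;

/// Acquire the single-flight sync lock, failing fast if another run holds it.
fn acquire_sync_lock(path: &Path) -> io::Result<File> {
    // create_new(true) is atomic: it errors with AlreadyExists when the
    // lock file is already present, i.e. another sync is in flight.
    OpenOptions::new().write(true).create_new(true).open(path)
}

/// Release the lock when the run finishes (success or failure).
fn release_sync_lock(path: &Path) -> io::Result<()> {
    fs::remove_file(path)
}
```

A second invocation while the file exists gets an error immediately, satisfying the "second sync fails with clear error" criterion.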

Internal Gates

CP1 is validated incrementally via internal gates:

| Gate | Scope | Validation |
| --- | --- | --- |
| Gate A | Issues only | Cursor + upsert + raw payloads + list/count/show working |
| Gate B | Labels correct | Stale-link removal verified; label count matches GitLab |
| Gate C | Dependent discussion sync | Watermark prevents redundant refetch; concurrency bounded |
| Gate D | Resumability proof | Kill mid-run, rerun; confirm bounded redo and no redundant discussion refetch |

Deliverables

1. Project Structure Additions

Add the following to the existing Rust structure from Checkpoint 0:

gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── ingest.rs           # gi ingest --type=issues|merge_requests
│   │       ├── list.rs             # gi list issues|mrs
│   │       ├── count.rs            # gi count issues|mrs|discussions|notes
│   │       └── show.rs             # gi show issue|mr <iid>
│   ├── gitlab/
│   │   ├── types.rs                # Add GitLabIssue, GitLabDiscussion, GitLabNote
│   │   └── transformers/
│   │       ├── mod.rs
│   │       ├── issue.rs            # GitLab → normalized issue
│   │       └── discussion.rs       # GitLab → normalized discussion/notes
│   └── ingestion/
│       ├── mod.rs
│       ├── orchestrator.rs         # Coordinates issue + dependent discussion sync
│       ├── issues.rs               # Issue fetcher with pagination
│       └── discussions.rs          # Discussion fetcher (per-issue)
├── tests/
│   ├── issue_transformer_tests.rs
│   ├── discussion_transformer_tests.rs
│   ├── pagination_tests.rs
│   ├── issue_ingestion_tests.rs
│   ├── label_linkage_tests.rs      # Verifies stale link removal
│   ├── discussion_watermark_tests.rs
│   └── fixtures/
│       ├── gitlab_issue.json
│       ├── gitlab_issues_page.json
│       ├── gitlab_discussion.json
│       └── gitlab_discussions_page.json
└── migrations/
    └── 002_issues.sql

2. GitLab API Endpoints

Issues (Bulk Fetch):

GET /projects/:id/issues?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100

Issue Discussions (Per-Issue Fetch):

GET /projects/:id/issues/:iid/discussions?per_page=100&page=N

Required Query Parameters:

  • scope=all - Include all issues, not just authored by current user
  • state=all - Include closed issues (GitLab may default to open only)

MVP Note (Labels):

  • CP1 stores labels by name only for maximum compatibility and stability.
  • Label color/description ingestion is deferred (post-CP1) via Labels API if needed.
  • This avoids relying on optional/variant payload shapes that differ across GitLab versions.

Pagination:

  • Follow x-next-page header until empty/absent
  • Fall back to empty-page detection if headers missing (robustness)
  • Per-page maximum: 100
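
The header-plus-fallback rule above can be condensed into a pure decision function (a sketch; the name and signature are illustrative, not part of the codebase):

```rust
/// Decide the next page number from the x-next-page header, falling back to
/// empty-page detection when the header is missing or unparsable.
fn next_page(header: Option<u32>, items_on_page: usize, per_page: usize, current: u32) -> Option<u32> {
    match header {
        // Trust the header when the page actually contained items.
        Some(np) if items_on_page > 0 => Some(np),
        // Header absent but the page was full: assume more pages exist.
        None if items_on_page == per_page => Some(current + 1),
        // Empty page, or a short page without a header: stop.
        _ => None,
    }
}
```

Keeping this decision pure makes the stop condition easy to unit-test independently of the HTTP client.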

Database Schema

Migration 002_issues.sql

-- Issues table
CREATE TABLE issues (
  id INTEGER PRIMARY KEY,
  gitlab_id INTEGER UNIQUE NOT NULL,
  project_id INTEGER NOT NULL REFERENCES projects(id),
  iid INTEGER NOT NULL,
  title TEXT,
  description TEXT,
  state TEXT,                         -- 'opened' | 'closed'
  author_username TEXT,
  created_at INTEGER,                 -- ms epoch UTC
  updated_at INTEGER,                 -- ms epoch UTC
  last_seen_at INTEGER NOT NULL,      -- ms epoch UTC, updated on every upsert
  -- Prevents re-fetching discussions on cursor rewind / reruns unless issue changed.
  -- Set to issue.updated_at after successfully syncing all discussions for this issue.
  discussions_synced_for_updated_at INTEGER,
  web_url TEXT,
  raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_issues_project_updated ON issues(project_id, updated_at);
CREATE INDEX idx_issues_author ON issues(author_username);
CREATE INDEX idx_issues_discussions_sync ON issues(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_issues_project_iid ON issues(project_id, iid);

-- Labels (derived from issue payloads)
-- CP1: Name-only for stability. Color/description deferred to Labels API integration.
-- Uniqueness is (project_id, name) since gitlab_id isn't always available.
CREATE TABLE labels (
  id INTEGER PRIMARY KEY,
  gitlab_id INTEGER,                  -- optional (populated if Labels API used later)
  project_id INTEGER NOT NULL REFERENCES projects(id),
  name TEXT NOT NULL,
  color TEXT,                         -- nullable, populated later if needed
  description TEXT                    -- nullable, populated later if needed
);
CREATE UNIQUE INDEX uq_labels_project_name ON labels(project_id, name);
CREATE INDEX idx_labels_name ON labels(name);

-- Issue-Label junction
-- IMPORTANT: On issue update, DELETE existing links then INSERT current set.
-- This ensures removed labels are unlinked (not just added).
CREATE TABLE issue_labels (
  issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
  label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
  PRIMARY KEY(issue_id, label_id)
);
CREATE INDEX idx_issue_labels_label ON issue_labels(label_id);

-- Discussion threads for issues
CREATE TABLE discussions (
  id INTEGER PRIMARY KEY,
  gitlab_discussion_id TEXT NOT NULL, -- GitLab's string ID (e.g., "6a9c1750b37d...")
  project_id INTEGER NOT NULL REFERENCES projects(id),
  issue_id INTEGER REFERENCES issues(id),
  merge_request_id INTEGER,           -- logical FK to merge_requests, whose table is created in CP2 (SQLite cannot retrofit an FK constraint onto an existing column via ALTER TABLE)
  noteable_type TEXT NOT NULL,        -- 'Issue' | 'MergeRequest'
  individual_note INTEGER NOT NULL,   -- 1 = standalone comment, 0 = threaded
  first_note_at INTEGER,              -- ms epoch UTC, for ordering discussions
  last_note_at INTEGER,               -- ms epoch UTC, for "recently active" queries
  last_seen_at INTEGER NOT NULL,      -- ms epoch UTC, updated on every upsert
  resolvable INTEGER,                 -- MR discussions can be resolved
  resolved INTEGER,
  raw_payload_id INTEGER REFERENCES raw_payloads(id),
  CHECK (
    (noteable_type='Issue' AND issue_id IS NOT NULL AND merge_request_id IS NULL) OR
    (noteable_type='MergeRequest' AND merge_request_id IS NOT NULL AND issue_id IS NULL)
  )
);
CREATE UNIQUE INDEX uq_discussions_project_discussion_id ON discussions(project_id, gitlab_discussion_id);
CREATE INDEX idx_discussions_issue ON discussions(issue_id);
CREATE INDEX idx_discussions_mr ON discussions(merge_request_id);
CREATE INDEX idx_discussions_last_note ON discussions(last_note_at);

-- Notes belong to discussions (preserving thread context)
CREATE TABLE notes (
  id INTEGER PRIMARY KEY,
  gitlab_id INTEGER UNIQUE NOT NULL,
  discussion_id INTEGER NOT NULL REFERENCES discussions(id),
  project_id INTEGER NOT NULL REFERENCES projects(id),
  note_type TEXT,                     -- 'DiscussionNote' | 'DiffNote' | null
  is_system INTEGER NOT NULL DEFAULT 0, -- 1 for system notes (assignments, label changes)
  author_username TEXT,
  body TEXT,
  created_at INTEGER,                 -- ms epoch UTC
  updated_at INTEGER,                 -- ms epoch UTC
  last_seen_at INTEGER NOT NULL,      -- ms epoch UTC, updated on every upsert
  position INTEGER,                   -- derived from array order in API response (0-indexed)
  resolvable INTEGER,
  resolved INTEGER,
  resolved_by TEXT,
  resolved_at INTEGER,                -- ms epoch UTC
  -- DiffNote position metadata (populated for MR DiffNotes in CP2)
  position_old_path TEXT,
  position_new_path TEXT,
  position_old_line INTEGER,
  position_new_line INTEGER,
  raw_payload_id INTEGER REFERENCES raw_payloads(id)
);
CREATE INDEX idx_notes_discussion ON notes(discussion_id);
CREATE INDEX idx_notes_author ON notes(author_username);
CREATE INDEX idx_notes_system ON notes(is_system);

-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (2, strftime('%s', 'now') * 1000, 'Issues, labels, discussions, notes');

GitLab Types

Type Definitions

// src/gitlab/types.rs (additions)

use serde::Deserialize;

/// GitLab issue from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabIssue {
    pub id: i64,                      // GitLab global ID
    pub iid: i64,                     // Project-scoped issue number
    pub project_id: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,                // "opened" | "closed"
    pub created_at: String,           // ISO 8601
    pub updated_at: String,           // ISO 8601
    pub closed_at: Option<String>,
    pub author: GitLabAuthor,
    pub labels: Vec<String>,          // Array of label names (CP1 canonical)
    pub web_url: String,
    // NOTE: labels_details is intentionally NOT modeled for CP1.
    // The field name and shape varies across GitLab versions.
    // Color/description can be fetched via Labels API if needed later.
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
    pub id: i64,
    pub username: String,
    pub name: String,
}

/// GitLab discussion (thread of notes).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabDiscussion {
    pub id: String,                   // String ID like "6a9c1750b37d..."
    pub individual_note: bool,        // true = standalone comment
    pub notes: Vec<GitLabNote>,
}

/// GitLab note (comment).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNote {
    pub id: i64,
    #[serde(rename = "type")]
    pub note_type: Option<String>,    // "DiscussionNote" | "DiffNote" | null
    pub body: String,
    pub author: GitLabAuthor,
    pub created_at: String,           // ISO 8601
    pub updated_at: String,           // ISO 8601
    pub system: bool,                 // true for system-generated notes
    #[serde(default)]
    pub resolvable: bool,
    #[serde(default)]
    pub resolved: bool,
    pub resolved_by: Option<GitLabAuthor>,
    pub resolved_at: Option<String>,
    /// DiffNote specific (null for non-DiffNote)
    pub position: Option<GitLabNotePosition>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNotePosition {
    pub old_path: Option<String>,
    pub new_path: Option<String>,
    pub old_line: Option<i32>,
    pub new_line: Option<i32>,
}

Transformers

Issue Transformer

// src/gitlab/transformers/issue.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabIssue;

/// Normalized issue ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedIssue {
    pub gitlab_id: i64,
    pub project_id: i64,              // Local DB project ID
    pub iid: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,
    pub author_username: String,
    pub created_at: i64,              // ms epoch
    pub updated_at: i64,              // ms epoch
    pub last_seen_at: i64,            // ms epoch
    pub web_url: String,
}

/// Normalized label ready for database insertion.
/// CP1: Name-only for stability.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
    pub project_id: i64,
    pub name: String,
}

/// Transform GitLab issue to normalized schema.
pub fn transform_issue(gitlab_issue: &GitLabIssue, local_project_id: i64) -> NormalizedIssue {
    NormalizedIssue {
        gitlab_id: gitlab_issue.id,
        project_id: local_project_id,
        iid: gitlab_issue.iid,
        title: gitlab_issue.title.clone(),
        description: gitlab_issue.description.clone(),
        state: gitlab_issue.state.clone(),
        author_username: gitlab_issue.author.username.clone(),
        created_at: iso_to_ms(&gitlab_issue.created_at),
        updated_at: iso_to_ms(&gitlab_issue.updated_at),
        last_seen_at: now_ms(),
        web_url: gitlab_issue.web_url.clone(),
    }
}

/// Extract labels from GitLab issue (CP1: name-only).
pub fn extract_labels(gitlab_issue: &GitLabIssue, local_project_id: i64) -> Vec<NormalizedLabel> {
    gitlab_issue
        .labels
        .iter()
        .map(|name| NormalizedLabel {
            project_id: local_project_id,
            name: name.clone(),
        })
        .collect()
}

Discussion Transformer

// src/gitlab/transformers/discussion.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabDiscussion;

/// Normalized discussion ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedDiscussion {
    pub gitlab_discussion_id: String,
    pub project_id: i64,
    pub issue_id: i64,
    pub noteable_type: String,        // "Issue"
    pub individual_note: bool,
    pub first_note_at: Option<i64>,
    pub last_note_at: Option<i64>,
    pub last_seen_at: i64,
    pub resolvable: bool,
    pub resolved: bool,
}

/// Normalized note ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedNote {
    pub gitlab_id: i64,
    pub project_id: i64,
    pub note_type: Option<String>,
    pub is_system: bool,
    pub author_username: String,
    pub body: String,
    pub created_at: i64,
    pub updated_at: i64,
    pub last_seen_at: i64,
    pub position: i32,                // Array index in notes[]
    pub resolvable: bool,
    pub resolved: bool,
    pub resolved_by: Option<String>,
    pub resolved_at: Option<i64>,
}

/// Transform GitLab discussion to normalized schema.
pub fn transform_discussion(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
    local_issue_id: i64,
) -> NormalizedDiscussion {
    let note_times: Vec<i64> = gitlab_discussion
        .notes
        .iter()
        .map(|n| iso_to_ms(&n.created_at))
        .collect();

    // Check if any note is resolvable
    let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
    let resolved = resolvable
        && gitlab_discussion
            .notes
            .iter()
            .all(|n| !n.resolvable || n.resolved);

    NormalizedDiscussion {
        gitlab_discussion_id: gitlab_discussion.id.clone(),
        project_id: local_project_id,
        issue_id: local_issue_id,
        noteable_type: "Issue".to_string(),
        individual_note: gitlab_discussion.individual_note,
        first_note_at: note_times.iter().min().copied(),
        last_note_at: note_times.iter().max().copied(),
        last_seen_at: now_ms(),
        resolvable,
        resolved,
    }
}

/// Transform GitLab notes to normalized schema.
pub fn transform_notes(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
) -> Vec<NormalizedNote> {
    gitlab_discussion
        .notes
        .iter()
        .enumerate()
        .map(|(index, note)| NormalizedNote {
            gitlab_id: note.id,
            project_id: local_project_id,
            note_type: note.note_type.clone(),
            is_system: note.system,
            author_username: note.author.username.clone(),
            body: note.body.clone(),
            created_at: iso_to_ms(&note.created_at),
            updated_at: iso_to_ms(&note.updated_at),
            last_seen_at: now_ms(),
            position: index as i32,
            resolvable: note.resolvable,
            resolved: note.resolved,
            resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
            resolved_at: note.resolved_at.as_ref().map(|s| iso_to_ms(s)),
        })
        .collect()
}

GitLab Client Additions

Pagination with Async Streams

// src/gitlab/client.rs (additions)

use crate::gitlab::types::{GitLabDiscussion, GitLabIssue};
use reqwest::header::HeaderMap;
use std::pin::Pin;
use futures::Stream;

impl GitLabClient {
    /// Paginate through issues for a project.
    /// Returns a stream of issues that handles pagination automatically.
    pub fn paginate_issues(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let mut params = vec![
                    ("scope", "all".to_string()),
                    ("state", "all".to_string()),
                    ("order_by", "updated_at".to_string()),
                    ("sort", "asc".to_string()),
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                if let Some(ts) = updated_after {
                    // Apply cursor rewind for safety, clamping to 0 to avoid underflow
                    let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
                    let rewound = (ts - rewind_ms).max(0);
                    if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
                        params.push(("updated_after", dt.to_rfc3339()));
                    }
                    // If conversion fails (shouldn't happen with max(0)), omit the param
                    // and fetch all issues (safe fallback).
                }

                let (issues, headers) = self
                    .request_with_headers::<Vec<GitLabIssue>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/issues"),
                        &params,
                    )
                    .await?;

                for issue in issues.iter() {
                    yield issue.clone();
                }

                // Check for next page
                let next_page = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                match next_page {
                    Some(np) if !issues.is_empty() => page = np,
                    // Header missing: fall back to empty-page detection
                    // (a full page implies more may follow).
                    None if issues.len() == per_page as usize => page += 1,
                    _ => break,
                }
            }
        })
    }

    /// Paginate through discussions for an issue.
    pub fn paginate_issue_discussions(
        &self,
        gitlab_project_id: i64,
        issue_iid: i64,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let params = vec![
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                let (discussions, headers) = self
                    .request_with_headers::<Vec<GitLabDiscussion>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/issues/{issue_iid}/discussions"),
                        &params,
                    )
                    .await?;

                for discussion in discussions.iter() {
                    yield discussion.clone();
                }

                // Check for next page
                let next_page = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                match next_page {
                    Some(np) if !discussions.is_empty() => page = np,
                    // Header missing: fall back to empty-page detection
                    // (a full page implies more may follow).
                    None if discussions.len() == per_page as usize => page += 1,
                    _ => break,
                }
            }
        })
    }

    /// Make request and return response with headers for pagination.
    async fn request_with_headers<T: serde::de::DeserializeOwned>(
        &self,
        path: &str,
        params: &[(&str, String)],
    ) -> Result<(T, HeaderMap)> {
        self.rate_limiter.lock().await.acquire().await;

        let url = format!("{}{}", self.base_url, path);
        tracing::debug!(url = %url, "GitLab request");

        let response = self
            .client
            .get(&url)
            .header("PRIVATE-TOKEN", &self.token)
            .query(params)
            .send()
            .await
            .map_err(|e| GiError::GitLabNetworkError {
                base_url: self.base_url.clone(),
                source: Some(e),
            })?;

        let headers = response.headers().clone();
        let data = self.handle_response(response, path).await?;
        Ok((data, headers))
    }
}

Note: Requires adding async-stream and futures to Cargo.toml:

# Cargo.toml additions
async-stream = "0.3"
futures = "0.3"

Orchestration: Dependent Discussion Sync

Canonical Pattern (CP1)

When gi ingest --type=issues runs, it follows this orchestration:

  1. Ingest issues (cursor-based, with incremental cursor updates per page)

  2. Collect touched issues - For each issue that passed cursor tuple filtering, record:

    • local_issue_id
    • issue_iid
    • issue_updated_at
    • discussions_synced_for_updated_at (from DB)
  3. Filter for discussion sync - Enqueue issues where:

    issue.updated_at > issues.discussions_synced_for_updated_at
    

    This prevents re-fetching discussions for issues that haven't changed, even with cursor rewind.

  4. Execute discussion sync with bounded concurrency (dependent_concurrency from config)

  5. Update watermark - After each issue's discussions are successfully ingested:

    UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?
    

Invariant: A rerun MUST NOT refetch discussions for issues whose updated_at has not advanced, even with cursor rewind.
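
Steps 3 and 5 hinge on a single comparison; the eligibility check can be written as a standalone predicate (a sketch for illustration, not a function defined elsewhere in this document):

```rust
/// Step 3's filter: an issue needs discussion sync when its updated_at has
/// advanced past the per-issue watermark, or when no watermark exists yet.
fn needs_discussion_sync(issue_updated_at: i64, synced_for_updated_at: Option<i64>) -> bool {
    synced_for_updated_at.map_or(true, |w| issue_updated_at > w)
}
```

Because the watermark is only advanced in step 5 after all discussions succeed, a rerun after a crash re-enters the predicate with the old watermark and safely refetches that issue's discussions, while untouched issues are skipped, which is exactly the stated invariant.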


Ingestion Logic

Runtime Strategy

Decision: Use single-threaded Tokio runtime (flavor = "current_thread") for CP1.

Rationale:

  • rusqlite::Connection is !Send, which conflicts with multi-threaded runtimes
  • Single-threaded runtime avoids Send bounds entirely
  • Concurrency for discussion fetches uses tokio::task::spawn_local + LocalSet
  • Keeps code simple; can upgrade to channel-based DB writer in CP2 if needed

// src/main.rs
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // ...
}

Issue Ingestion

// src/ingestion/issues.rs

use futures::StreamExt;
use rusqlite::Connection;
use tracing::{debug, info};

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::issue::{extract_labels, transform_issue};

/// Result of issue ingestion.
#[derive(Debug, Default)]
pub struct IngestIssuesResult {
    pub fetched: usize,
    pub upserted: usize,
    pub labels_created: usize,
    /// Issues that need discussion sync (updated_at advanced)
    pub issues_needing_discussion_sync: Vec<IssueForDiscussionSync>,
}

/// Info needed to sync discussions for an issue.
#[derive(Debug, Clone)]
pub struct IssueForDiscussionSync {
    pub local_issue_id: i64,
    pub iid: i64,
    pub updated_at: i64,
}

/// Ingest issues for a project.
/// Returns list of issues that need discussion sync.
pub async fn ingest_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,           // Local DB project ID
    gitlab_project_id: i64,    // GitLab project ID
) -> Result<IngestIssuesResult> {
    let mut result = IngestIssuesResult::default();

    // Get current cursor
    let cursor = get_cursor(conn, project_id)?;
    let cursor_updated_at = cursor.0;
    let cursor_gitlab_id = cursor.1;

    let mut last_updated_at: Option<i64> = None;
    let mut last_gitlab_id: Option<i64> = None;
    let mut issues_in_page: Vec<(i64, i64, i64)> = Vec::new(); // (local_id, iid, updated_at)

    // Fetch issues with pagination
    let mut stream = client.paginate_issues(
        gitlab_project_id,
        cursor_updated_at,
        config.sync.cursor_rewind_seconds,
    );

    while let Some(issue_result) = stream.next().await {
        let issue = issue_result?;
        result.fetched += 1;

        let issue_updated_at = crate::core::time::iso_to_ms(&issue.updated_at);

        // Apply cursor filtering for tuple semantics
        if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
            if issue_updated_at < cursor_ts {
                continue;
            }
            if issue_updated_at == cursor_ts && issue.id <= cursor_id {
                continue;
            }
        }

        // Begin transaction for this issue (atomicity + performance)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload
        let payload_id = store_payload(
            &tx,
            project_id,
            "issue",
            &issue.id.to_string(),
            &issue,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert issue
        let normalized = transform_issue(&issue, project_id);
        let changes = upsert_issue(&tx, &normalized, payload_id)?;
        if changes > 0 {
            result.upserted += 1;
        }

        // Get local issue ID for label linking
        let local_issue_id = get_local_issue_id(&tx, normalized.gitlab_id)?;

        // Clear existing label links (ensures removed labels are unlinked)
        clear_issue_labels(&tx, local_issue_id)?;

        // Extract and upsert labels (name-only for CP1)
        let labels = extract_labels(&issue, project_id);
        for label in &labels {
            let created = upsert_label(&tx, label)?;
            if created {
                result.labels_created += 1;
            }

            // Link issue to label
            let label_id = get_label_id(&tx, project_id, &label.name)?;
            link_issue_label(&tx, local_issue_id, label_id)?;
        }

        tx.commit()?;

        // Track for discussion sync eligibility
        issues_in_page.push((local_issue_id, issue.iid, issue_updated_at));

        // Track for cursor update
        last_updated_at = Some(issue_updated_at);
        last_gitlab_id = Some(issue.id);

        // Incremental cursor update every 100 issues (page boundary)
        // This ensures crashes don't cause massive refetch
        if result.fetched % 100 == 0 {
            if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
                update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
            }
        }
    }

    // Final cursor update
    if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
        update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
    }

    // Determine which issues need discussion sync (updated_at advanced)
    for (local_issue_id, iid, updated_at) in issues_in_page {
        let synced_at = get_discussions_synced_at(conn, local_issue_id)?;
        if synced_at.map_or(true, |s| updated_at > s) {
            result.issues_needing_discussion_sync.push(IssueForDiscussionSync {
                local_issue_id,
                iid,
                updated_at,
            });
        }
    }

    info!(
        project_id,
        fetched = result.fetched,
        upserted = result.upserted,
        labels_created = result.labels_created,
        need_discussion_sync = result.issues_needing_discussion_sync.len(),
        "Issue ingestion complete"
    );

    Ok(result)
}

fn get_cursor(conn: &Connection, project_id: i64) -> Result<(Option<i64>, Option<i64>)> {
    let mut stmt = conn.prepare(
        "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
         WHERE project_id = ? AND resource_type = 'issues'"
    )?;

    let result = stmt.query_row([project_id], |row| {
        Ok((row.get::<_, Option<i64>>(0)?, row.get::<_, Option<i64>>(1)?))
    });

    match result {
        Ok(cursor) => Ok(cursor),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok((None, None)),
        Err(e) => Err(e.into()),
    }
}

fn get_discussions_synced_at(conn: &Connection, issue_id: i64) -> Result<Option<i64>> {
    let result = conn.query_row(
        "SELECT discussions_synced_for_updated_at FROM issues WHERE id = ?",
        [issue_id],
        |row| row.get(0),
    );

    match result {
        Ok(ts) => Ok(ts),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn upsert_issue(
    conn: &Connection,
    issue: &crate::gitlab::transformers::issue::NormalizedIssue,
    payload_id: Option<i64>,
) -> Result<usize> {
    let changes = conn.execute(
        "INSERT INTO issues (
            gitlab_id, project_id, iid, title, description, state,
            author_username, created_at, updated_at, last_seen_at, web_url, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            title = excluded.title,
            description = excluded.description,
            state = excluded.state,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            issue.gitlab_id,
            issue.project_id,
            issue.iid,
            issue.title,
            issue.description,
            issue.state,
            issue.author_username,
            issue.created_at,
            issue.updated_at,
            issue.last_seen_at,
            issue.web_url,
            payload_id,
        ],
    )?;
    Ok(changes)
}

fn get_local_issue_id(conn: &Connection, gitlab_id: i64) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM issues WHERE gitlab_id = ?",
        [gitlab_id],
        |row| row.get(0),
    )?)
}

fn clear_issue_labels(conn: &Connection, issue_id: i64) -> Result<()> {
    conn.execute("DELETE FROM issue_labels WHERE issue_id = ?", [issue_id])?;
    Ok(())
}

fn upsert_label(
    conn: &Connection,
    label: &crate::gitlab::transformers::issue::NormalizedLabel,
) -> Result<bool> {
    // CP1: Name-only labels. Color/description columns remain NULL.
    let changes = conn.execute(
        "INSERT INTO labels (project_id, name)
         VALUES (?1, ?2)
         ON CONFLICT(project_id, name) DO NOTHING",
        rusqlite::params![label.project_id, label.name],
    )?;
    Ok(changes > 0)
}

fn get_label_id(conn: &Connection, project_id: i64, name: &str) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM labels WHERE project_id = ? AND name = ?",
        rusqlite::params![project_id, name],
        |row| row.get(0),
    )?)
}

fn link_issue_label(conn: &Connection, issue_id: i64, label_id: i64) -> Result<()> {
    conn.execute(
        "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)",
        [issue_id, label_id],
    )?;
    Ok(())
}

fn update_cursor(
    conn: &Connection,
    project_id: i64,
    resource_type: &str,
    updated_at: i64,
    gitlab_id: i64,
) -> Result<()> {
    conn.execute(
        "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
         VALUES (?1, ?2, ?3, ?4)
         ON CONFLICT(project_id, resource_type) DO UPDATE SET
            updated_at_cursor = excluded.updated_at_cursor,
            tie_breaker_id = excluded.tie_breaker_id",
        rusqlite::params![project_id, resource_type, updated_at, gitlab_id],
    )?;
    Ok(())
}

Discussion Ingestion

// src/ingestion/discussions.rs

use futures::StreamExt;
use rusqlite::Connection;
use tracing::debug;

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{transform_discussion, transform_notes};

/// Result of discussion ingestion for a single issue.
#[derive(Debug, Default)]
pub struct IngestDiscussionsResult {
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub system_notes_count: usize,
}

/// Ingest discussions for a single issue.
/// Called only when issue.updated_at > discussions_synced_for_updated_at.
pub async fn ingest_issue_discussions(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    issue_iid: i64,
    local_issue_id: i64,
    issue_updated_at: i64,
) -> Result<IngestDiscussionsResult> {
    let mut result = IngestDiscussionsResult::default();

    let mut stream = client.paginate_issue_discussions(gitlab_project_id, issue_iid);

    while let Some(discussion_result) = stream.next().await {
        let discussion = discussion_result?;
        result.discussions_fetched += 1;

        // Begin transaction for this discussion (atomicity + performance)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload for discussion
        let discussion_payload_id = store_payload(
            &tx,
            project_id,
            "discussion",
            &discussion.id,
            &discussion,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert discussion
        let normalized = transform_discussion(&discussion, project_id, local_issue_id);
        upsert_discussion(&tx, &normalized, discussion_payload_id)?;
        result.discussions_upserted += 1;

        // Get local discussion ID
        let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;

        // Transform and upsert notes
        let notes = transform_notes(&discussion, project_id);
        for note in &notes {
            // Store raw payload for note
            let gitlab_note = discussion.notes.iter().find(|n| n.id == note.gitlab_id);
            let note_payload_id = if let Some(gn) = gitlab_note {
                store_payload(
                    &tx,
                    project_id,
                    "note",
                    &note.gitlab_id.to_string(),
                    gn,
                    config.storage.compress_raw_payloads,
                )?
            } else {
                None
            };

            upsert_note(&tx, local_discussion_id, note, note_payload_id)?;
            result.notes_upserted += 1;

            if note.is_system {
                result.system_notes_count += 1;
            }
        }

        tx.commit()?;
    }

    // Mark discussions as synced for this issue version
    mark_discussions_synced(conn, local_issue_id, issue_updated_at)?;

    debug!(
        project_id,
        issue_iid,
        discussions = result.discussions_fetched,
        notes = result.notes_upserted,
        "Issue discussions ingested"
    );

    Ok(result)
}

fn upsert_discussion(
    conn: &Connection,
    discussion: &crate::gitlab::transformers::discussion::NormalizedDiscussion,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO discussions (
            gitlab_discussion_id, project_id, issue_id, noteable_type,
            individual_note, first_note_at, last_note_at, last_seen_at,
            resolvable, resolved, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
        ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
            first_note_at = excluded.first_note_at,
            last_note_at = excluded.last_note_at,
            last_seen_at = excluded.last_seen_at,
            resolvable = excluded.resolvable,
            resolved = excluded.resolved,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            discussion.gitlab_discussion_id,
            discussion.project_id,
            discussion.issue_id,
            discussion.noteable_type,
            discussion.individual_note as i32,
            discussion.first_note_at,
            discussion.last_note_at,
            discussion.last_seen_at,
            discussion.resolvable as i32,
            discussion.resolved as i32,
            payload_id,
        ],
    )?;
    Ok(())
}

fn get_local_discussion_id(conn: &Connection, project_id: i64, gitlab_id: &str) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
        rusqlite::params![project_id, gitlab_id],
        |row| row.get(0),
    )?)
}

fn upsert_note(
    conn: &Connection,
    discussion_id: i64,
    note: &crate::gitlab::transformers::discussion::NormalizedNote,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO notes (
            gitlab_id, discussion_id, project_id, note_type, is_system,
            author_username, body, created_at, updated_at, last_seen_at,
            position, resolvable, resolved, resolved_by, resolved_at,
            raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            body = excluded.body,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            resolved = excluded.resolved,
            resolved_by = excluded.resolved_by,
            resolved_at = excluded.resolved_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            note.gitlab_id,
            discussion_id,
            note.project_id,
            note.note_type,
            note.is_system as i32,
            note.author_username,
            note.body,
            note.created_at,
            note.updated_at,
            note.last_seen_at,
            note.position,
            note.resolvable as i32,
            note.resolved as i32,
            note.resolved_by,
            note.resolved_at,
            payload_id,
        ],
    )?;
    Ok(())
}

fn mark_discussions_synced(conn: &Connection, issue_id: i64, issue_updated_at: i64) -> Result<()> {
    conn.execute(
        "UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?",
        rusqlite::params![issue_updated_at, issue_id],
    )?;
    Ok(())
}

CLI Commands

gi ingest --type=issues

Fetch and store all issues from configured projects.

Clap Definition:

// src/cli/mod.rs (addition to Commands enum)

#[derive(Subcommand)]
pub enum Commands {
    // ... existing commands ...

    /// Ingest data from GitLab
    Ingest {
        /// Resource type to ingest
        #[arg(long, value_parser = ["issues", "merge_requests"])]
        r#type: String,

        /// Filter to single project
        #[arg(long)]
        project: Option<String>,

        /// Override stale sync lock
        #[arg(long)]
        force: bool,
    },

    /// List entities
    List {
        /// Entity type to list
        #[arg(value_parser = ["issues", "mrs"])]
        entity: String,

        /// Maximum results
        #[arg(long, default_value = "20")]
        limit: usize,

        /// Filter by project path
        #[arg(long)]
        project: Option<String>,

        /// Filter by state
        #[arg(long, value_parser = ["opened", "closed", "all"])]
        state: Option<String>,
    },

    /// Count entities
    Count {
        /// Entity type to count
        #[arg(value_parser = ["issues", "mrs", "discussions", "notes"])]
        entity: String,

        /// Filter by noteable type
        #[arg(long, value_parser = ["issue", "mr"])]
        r#type: Option<String>,
    },

    /// Show entity details
    Show {
        /// Entity type
        #[arg(value_parser = ["issue", "mr"])]
        entity: String,

        /// Entity IID
        iid: i64,

        /// Project path (required if ambiguous)
        #[arg(long)]
        project: Option<String>,
    },
}

Output:

Ingesting issues...

  group/project-one: 1,234 issues fetched, 45 new labels

Fetching discussions (312 issues with updates)...

  group/project-one: 312 issues → 1,234 discussions, 5,678 notes

Total: 1,234 issues, 1,234 discussions, 5,678 notes (excluding 1,234 system notes)
Skipped discussion sync for 922 unchanged issues.

gi list issues

Output:

Issues (showing 20 of 3,801)

  #1234  Authentication redesign                       opened   @johndoe   3 days ago
  #1233  Fix memory leak in cache                      closed   @janedoe   5 days ago
  #1232  Add dark mode support                         opened   @bobsmith  1 week ago
  ...

gi count issues

Output:

Issues: 3,801

gi show issue <iid>

Output:

Issue #1234: Authentication redesign
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Project:  group/project-one
State:    opened
Author:   @johndoe
Created:  2024-01-15
Updated:  2024-03-20
Labels:   enhancement, auth
URL:      https://gitlab.example.com/group/project-one/-/issues/1234

Description:
  We need to redesign the authentication flow to support...

Discussions (5):

  @janedoe (2024-01-16):
    I agree we should move to JWT-based auth...

    @johndoe (2024-01-16):
      What about refresh token strategy?

  @bobsmith (2024-01-17):
    Have we considered OAuth2?

Automated Tests

Unit Tests

// tests/issue_transformer_tests.rs

#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::issue::*;
    use gi::gitlab::types::*;

    #[test]
    fn transforms_gitlab_issue_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_labels_from_issue_payload() { /* ... */ }

    #[test]
    fn handles_missing_optional_fields_gracefully() { /* ... */ }

    #[test]
    fn converts_iso_timestamps_to_ms_epoch() { /* ... */ }

    #[test]
    fn sets_last_seen_at_to_current_time() { /* ... */ }
}

// tests/discussion_transformer_tests.rs

#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::discussion::*;

    #[test]
    fn transforms_discussion_payload_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_notes_array_from_discussion() { /* ... */ }

    #[test]
    fn sets_individual_note_flag_correctly() { /* ... */ }

    #[test]
    fn flags_system_notes_with_is_system_true() { /* ... */ }

    #[test]
    fn preserves_note_order_via_position_field() { /* ... */ }

    #[test]
    fn computes_first_note_at_and_last_note_at_correctly() { /* ... */ }

    #[test]
    fn computes_resolvable_and_resolved_status() { /* ... */ }
}

// tests/pagination_tests.rs

#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn fetches_all_pages_when_multiple_exist() { /* ... */ }

    #[tokio::test]
    async fn respects_per_page_parameter() { /* ... */ }

    #[tokio::test]
    async fn follows_x_next_page_header_until_empty() { /* ... */ }

    #[tokio::test]
    async fn falls_back_to_empty_page_stop_if_headers_missing() { /* ... */ }

    #[tokio::test]
    async fn applies_cursor_rewind_for_tuple_semantics() { /* ... */ }

    #[tokio::test]
    async fn clamps_negative_rewind_to_zero() { /* ... */ }
}

// tests/label_linkage_tests.rs

#[cfg(test)]
mod tests {
    #[test]
    fn clears_existing_labels_before_linking_new_set() { /* ... */ }

    #[test]
    fn removes_stale_label_links_on_issue_update() { /* ... */ }

    #[test]
    fn handles_issue_with_all_labels_removed() { /* ... */ }

    #[test]
    fn preserves_labels_that_still_exist() { /* ... */ }
}

// tests/discussion_watermark_tests.rs

#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn skips_discussion_fetch_when_updated_at_unchanged() { /* ... */ }

    #[tokio::test]
    async fn fetches_discussions_when_updated_at_advanced() { /* ... */ }

    #[tokio::test]
    async fn updates_watermark_after_successful_discussion_sync() { /* ... */ }

    #[tokio::test]
    async fn does_not_update_watermark_on_discussion_sync_failure() { /* ... */ }
}

Integration Tests

// tests/issue_ingestion_tests.rs

#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path_regex};

    #[tokio::test]
    async fn inserts_issues_into_database() { /* ... */ }

    #[tokio::test]
    async fn creates_labels_from_issue_payloads() { /* ... */ }

    #[tokio::test]
    async fn links_issues_to_labels_via_junction_table() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_label_links_on_resync() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_issue() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_discussion() { /* ... */ }

    #[tokio::test]
    async fn updates_cursor_incrementally_per_page() { /* ... */ }

    #[tokio::test]
    async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }

    #[tokio::test]
    async fn handles_issues_with_no_labels() { /* ... */ }

    #[tokio::test]
    async fn upserts_existing_issues_on_refetch() { /* ... */ }

    #[tokio::test]
    async fn skips_discussion_refetch_for_unchanged_issues() { /* ... */ }
}

Manual Smoke Tests

| Command | Expected Output | Pass Criteria |
|---|---|---|
| gi ingest --type=issues | Progress bar, final count | Completes without error |
| gi list issues --limit=10 | Table of 10 issues | Shows iid, title, state, author |
| gi list issues --project=group/project-one | Filtered list | Only shows issues from that project |
| gi count issues | Issues: N | Count matches GitLab UI |
| gi show issue 123 | Issue detail view | Shows title, description, labels, discussions, URL |
| gi show issue 123 (ambiguous) | Prompt or error | Asks for --project clarification |
| gi count discussions --type=issue | Issue Discussions: N | Non-zero count |
| gi count notes --type=issue | Issue Notes: N (excluding M system) | Non-zero count |
| gi sync-status | Last sync time, cursor positions | Shows successful last run |
| gi ingest --type=issues (re-run) | 0 new issues | Cursor prevents re-fetch |
| gi ingest --type=issues (re-run) | Skipped discussion sync for N unchanged issues | Watermark prevents refetch |
| gi ingest --type=issues (concurrent) | Lock error | Second run fails with clear message |
| Remove label from issue in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |

Data Integrity Checks

After successful ingestion, verify:

  • SELECT COUNT(*) FROM issues matches GitLab issue count for configured projects
  • Every issue has a corresponding raw_payloads row
  • Every discussion has a corresponding raw_payloads row
  • Labels in issue_labels junction all exist in labels table
  • issue_labels count per issue matches GitLab UI label count
  • sync_cursors has entry for each (project_id, 'issues') pair
  • Re-running gi ingest --type=issues fetches 0 new items (cursor is current)
  • Re-running skips discussion sync for unchanged issues (watermark works)
  • SELECT COUNT(*) FROM discussions WHERE noteable_type='Issue' is non-zero
  • Every discussion has at least one note
  • individual_note = 1 discussions have exactly one note
  • SELECT COUNT(*) FROM notes WHERE is_system = 1 matches system note count in CLI output
  • After removing a label in GitLab and re-syncing, the link is removed from issue_labels

Definition of Done

Gate A: Issues Only (Must Pass First)

  • gi ingest --type=issues fetches all issues from configured projects
  • Issues stored with correct schema, including last_seen_at
  • Cursor-based sync is resumable (re-run fetches only new/updated)
  • Incremental cursor updates every 100 issues
  • Raw payloads stored for each issue
  • gi list issues and gi count issues work

Gate B: Labels Correct (Must Pass)

  • Labels extracted and stored (name-only)
  • Label links created correctly
  • Stale label links removed on re-sync (verified with test)
  • Label count per issue matches GitLab

Gate C: Dependent Discussion Sync (Must Pass)

  • Discussions fetched for issues with updated_at advancement
  • Notes stored with is_system flag correctly set
  • Raw payloads stored for discussions and notes
  • discussions_synced_for_updated_at watermark updated after sync
  • Unchanged issues skip discussion refetch (verified with test)
  • Bounded concurrency (dependent_concurrency respected)

Gate D: Resumability Proof (Must Pass)

  • Kill mid-run, rerun; bounded redo (cursor progress preserved)
  • No redundant discussion refetch after crash recovery
  • Single-flight lock prevents concurrent runs

Final Gate (Must Pass)

  • All unit tests pass (cargo test)
  • All integration tests pass (mocked with wiremock)
  • cargo clippy passes with no warnings
  • cargo fmt --check passes
  • Compiles with --release

Hardening (Optional Before CP2)

  • Edge cases: issues with 0 labels, 0 discussions
  • Large pagination (100+ pages)
  • Rate limit handling under sustained load
  • Live tests pass against real GitLab instance
  • Performance: 1000+ issues ingested in <5 min

Implementation Order

  1. Runtime decision (5 min)

    • Confirm #[tokio::main(flavor = "current_thread")]
    • Add note about upgrade path for CP2 if needed
  2. Cargo.toml updates (5 min)

    • Add async-stream = "0.3" and futures = "0.3"
  3. Database migration (15 min)

    • migrations/002_issues.sql with discussions_synced_for_updated_at column
    • raw_payload_id on discussions table
    • Update MIGRATIONS const in src/core/db.rs
  4. GitLab types (15 min)

    • Add types to src/gitlab/types.rs (no labels_details)
    • Test deserialization with fixtures
  5. Transformers (25 min)

    • src/gitlab/transformers/mod.rs
    • src/gitlab/transformers/issue.rs (simplified NormalizedLabel)
    • src/gitlab/transformers/discussion.rs
    • Unit tests
  6. GitLab client pagination (25 min)

    • Add paginate_issues() with underflow protection
    • Add paginate_issue_discussions()
    • Add request_with_headers() helper
  7. Issue ingestion (45 min)

    • src/ingestion/mod.rs
    • src/ingestion/issues.rs with:
      • Transaction batching
      • clear_issue_labels() before linking
      • Incremental cursor updates
      • Return issues_needing_discussion_sync
    • Unit + integration tests including label stale-link removal
  8. Discussion ingestion (30 min)

    • src/ingestion/discussions.rs with:
      • Transaction batching
      • raw_payload_id storage
      • mark_discussions_synced() watermark update
    • Integration tests including watermark behavior
  9. Orchestrator (30 min)

    • src/ingestion/orchestrator.rs
    • Coordinates issue sync → filter for discussion needs → bounded discussion sync
    • Integration tests
  10. CLI commands (45 min)

    • gi ingest --type=issues
    • gi list issues
    • gi count issues|discussions|notes
    • gi show issue <iid>
    • Enhanced gi sync-status
  11. Final validation (20 min)

    • cargo test
    • cargo clippy
    • Gate A/B/C/D verification
    • Manual smoke tests
    • Data integrity checks

Risks & Mitigations

| Risk | Mitigation |
|---|---|
| GitLab rate limiting during large sync | Respect Retry-After, exponential backoff, configurable concurrency |
| Discussion API N+1 problem (thousands of calls) | dependent_concurrency config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large issues with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | is_system flag allows filtering |
| Label deduplication across projects | Unique constraint on (project_id, name) |
| Stale label links accumulate | clear_issue_labels() before linking ensures correctness |
| Async stream complexity | Use async-stream crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Single-threaded runtime (current_thread) avoids Send bounds |
| Crash causes massive refetch | Incremental cursor updates every 100 issues |
| Cursor rewind causes discussion refetch | Per-issue watermark (discussions_synced_for_updated_at) |
| Timestamp underflow on rewind | Clamp to 0 with .max(0) |

API Call Estimation

For a project with 3,000 issues:

  • Issue list: ceil(3000/100) = 30 calls
  • Issue discussions (first run): 3000 × 1.2 average pages = 3,600 calls
  • Issue discussions (subsequent runs, 10% updates): 300 × 1.2 = 360 calls
  • Total first run: ~3,630 calls per project
  • Total subsequent run: ~390 calls per project (90% savings from watermark)

At 10 requests/second:

  • First run: ~6 minutes per project
  • Subsequent run: ~40 seconds per project

References