# Checkpoint 1: Issue Ingestion - PRD

**Version:** 2.0
**Status:** Ready for Implementation
**Depends On:** Checkpoint 0 (Project Setup)
**Enables:** Checkpoint 2 (MR Ingestion)

---

## Overview

### Objective

Ingest all issues, labels, and issue discussions from configured GitLab repositories with resumable cursor-based incremental sync. This checkpoint establishes the core data ingestion pattern that will be reused for MRs in Checkpoint 2.

### Success Criteria

| Criterion | Validation |
|-----------|------------|
| `gi ingest --type=issues` fetches all issues | `gi count issues` matches GitLab UI |
| Labels extracted from issue payloads (name-only) | `labels` table populated |
| Label linkage reflects current GitLab state | Removed labels are unlinked on re-sync |
| Issue discussions fetched per-issue (dependent sync) | For issues whose `updated_at` advanced, discussions and notes upserted |
| Cursor-based sync is resumable | Re-running fetches 0 new items |
| Discussion sync skips unchanged issues | Per-issue watermark prevents redundant fetches |
| Sync tracking records all runs | `sync_runs` table has complete audit trail |
| Single-flight lock prevents concurrent runs | Second sync fails with clear error |

---

## Internal Gates

CP1 is validated incrementally via internal gates:

| Gate | Scope | Validation |
|------|-------|------------|
| **Gate A** | Issues only | Cursor + upsert + raw payloads + list/count/show working |
| **Gate B** | Labels correct | Stale-link removal verified; label count matches GitLab |
| **Gate C** | Dependent discussion sync | Watermark prevents redundant refetch; concurrency bounded |
| **Gate D** | Resumability proof | Kill mid-run, rerun; confirm bounded redo and no redundant discussion refetch |

---

## Deliverables

### 1. Project Structure Additions

Add the following to the existing Rust structure from Checkpoint 0:

```
gitlab-inbox/
├── src/
│   ├── cli/
│   │   └── commands/
│   │       ├── ingest.rs        # gi ingest --type=issues|merge_requests
│   │       ├── list.rs          # gi list issues|mrs
│   │       ├── count.rs         # gi count issues|mrs|discussions|notes
│   │       └── show.rs          # gi show issue|mr
│   ├── gitlab/
│   │   ├── types.rs             # Add GitLabIssue, GitLabDiscussion, GitLabNote
│   │   └── transformers/
│   │       ├── mod.rs
│   │       ├── issue.rs         # GitLab → normalized issue
│   │       └── discussion.rs    # GitLab → normalized discussion/notes
│   └── ingestion/
│       ├── mod.rs
│       ├── orchestrator.rs      # Coordinates issue + dependent discussion sync
│       ├── issues.rs            # Issue fetcher with pagination
│       └── discussions.rs       # Discussion fetcher (per-issue)
├── tests/
│   ├── issue_transformer_tests.rs
│   ├── discussion_transformer_tests.rs
│   ├── pagination_tests.rs
│   ├── issue_ingestion_tests.rs
│   ├── label_linkage_tests.rs   # Verifies stale link removal
│   ├── discussion_watermark_tests.rs
│   └── fixtures/
│       ├── gitlab_issue.json
│       ├── gitlab_issues_page.json
│       ├── gitlab_discussion.json
│       └── gitlab_discussions_page.json
└── migrations/
    └── 002_issues.sql
```

### 2. GitLab API Endpoints

**Issues (Bulk Fetch):**

```
GET /projects/:id/issues?scope=all&state=all&updated_after=X&order_by=updated_at&sort=asc&per_page=100
```

**Issue Discussions (Per-Issue Fetch):**

```
GET /projects/:id/issues/:iid/discussions?per_page=100&page=N
```

**Required Query Parameters:**

- `scope=all` - Include all issues, not just those authored by the current user
- `state=all` - Include closed issues (GitLab may default to open only)

**MVP Note (Labels):**

- CP1 stores labels by **name only** for maximum compatibility and stability.
- Label color/description ingestion is deferred (post-CP1) via the Labels API if needed.
- This avoids relying on optional/variant payload shapes that differ across GitLab versions.
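The `order_by=updated_at&sort=asc` ordering is what makes the sync cursor resumable: together with the GitLab id as a tie-breaker, each fetched item can be compared against a `(updated_at, gitlab_id)` cursor tuple, and only strictly newer items are processed. A minimal sketch of that comparison (the function name `passes_cursor` is illustrative, not part of the codebase; the real filtering lives in the ingestion loop):

```rust
/// Sketch of cursor-tuple filtering: an item is "new" only if its
/// (updated_at_ms, gitlab_id) tuple is strictly greater than the stored
/// cursor. Rust compares tuples lexicographically, which matches the
/// "same timestamp → tie-break on id" semantics the sync relies on.
fn passes_cursor(updated_at_ms: i64, gitlab_id: i64, cursor: Option<(i64, i64)>) -> bool {
    match cursor {
        None => true, // first sync: everything is new
        Some(c) => (updated_at_ms, gitlab_id) > c,
    }
}

fn main() {
    let cursor = Some((1_700_000_000_000, 42));
    // Older timestamp: filtered out, even if cursor rewind re-fetches it.
    assert!(!passes_cursor(1_699_999_999_000, 99, cursor));
    // Same timestamp, id at or below the tie-breaker: already seen.
    assert!(!passes_cursor(1_700_000_000_000, 42, cursor));
    // Same timestamp, higher id: new.
    assert!(passes_cursor(1_700_000_000_000, 43, cursor));
    // Newer timestamp: new regardless of id.
    assert!(passes_cursor(1_700_000_000_001, 1, cursor));
}
```

This is also why a cursor rewind is safe: re-fetched items from the rewind window fail the tuple comparison and are skipped rather than reprocessed.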
**Pagination:**

- Follow the `x-next-page` header until it is empty/absent
- Fall back to empty-page detection if headers are missing (robustness)
- Per-page maximum: 100

---

## Database Schema

### Migration 002_issues.sql

```sql
-- Issues table
CREATE TABLE issues (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    project_id INTEGER NOT NULL REFERENCES projects(id),
    iid INTEGER NOT NULL,
    title TEXT,
    description TEXT,
    state TEXT,                                -- 'opened' | 'closed'
    author_username TEXT,
    created_at INTEGER,                        -- ms epoch UTC
    updated_at INTEGER,                        -- ms epoch UTC
    last_seen_at INTEGER NOT NULL,             -- ms epoch UTC, updated on every upsert
    -- Prevents re-fetching discussions on cursor rewind / reruns unless issue changed.
    -- Set to issue.updated_at after successfully syncing all discussions for this issue.
    discussions_synced_for_updated_at INTEGER,
    web_url TEXT,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);

CREATE INDEX idx_issues_project_updated ON issues(project_id, updated_at);
CREATE INDEX idx_issues_author ON issues(author_username);
CREATE INDEX idx_issues_discussions_sync ON issues(project_id, discussions_synced_for_updated_at);
CREATE UNIQUE INDEX uq_issues_project_iid ON issues(project_id, iid);

-- Labels (derived from issue payloads)
-- CP1: Name-only for stability. Color/description deferred to Labels API integration.
-- Uniqueness is (project_id, name) since gitlab_id isn't always available.
CREATE TABLE labels (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER,                         -- optional (populated if Labels API used later)
    project_id INTEGER NOT NULL REFERENCES projects(id),
    name TEXT NOT NULL,
    color TEXT,                                -- nullable, populated later if needed
    description TEXT                           -- nullable, populated later if needed
);

CREATE UNIQUE INDEX uq_labels_project_name ON labels(project_id, name);
CREATE INDEX idx_labels_name ON labels(name);

-- Issue-Label junction
-- IMPORTANT: On issue update, DELETE existing links then INSERT current set.
-- This ensures removed labels are unlinked (not just added).
CREATE TABLE issue_labels (
    issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
    label_id INTEGER REFERENCES labels(id) ON DELETE CASCADE,
    PRIMARY KEY(issue_id, label_id)
);

CREATE INDEX idx_issue_labels_label ON issue_labels(label_id);

-- Discussion threads for issues
CREATE TABLE discussions (
    id INTEGER PRIMARY KEY,
    gitlab_discussion_id TEXT NOT NULL,        -- GitLab's string ID (e.g., "6a9c1750b37d...")
    project_id INTEGER NOT NULL REFERENCES projects(id),
    issue_id INTEGER REFERENCES issues(id),
    merge_request_id INTEGER,                  -- FK added in CP2 via ALTER TABLE
    noteable_type TEXT NOT NULL,               -- 'Issue' | 'MergeRequest'
    individual_note INTEGER NOT NULL,          -- 1 = standalone comment, 0 = threaded
    first_note_at INTEGER,                     -- ms epoch UTC, for ordering discussions
    last_note_at INTEGER,                      -- ms epoch UTC, for "recently active" queries
    last_seen_at INTEGER NOT NULL,             -- ms epoch UTC, updated on every upsert
    resolvable INTEGER,                        -- MR discussions can be resolved
    resolved INTEGER,
    raw_payload_id INTEGER REFERENCES raw_payloads(id),
    CHECK (
        (noteable_type='Issue' AND issue_id IS NOT NULL AND merge_request_id IS NULL) OR
        (noteable_type='MergeRequest' AND merge_request_id IS NOT NULL AND issue_id IS NULL)
    )
);

CREATE UNIQUE INDEX uq_discussions_project_discussion_id ON discussions(project_id, gitlab_discussion_id);
CREATE INDEX idx_discussions_issue ON discussions(issue_id);
CREATE INDEX idx_discussions_mr ON discussions(merge_request_id);
CREATE INDEX idx_discussions_last_note ON discussions(last_note_at);

-- Notes belong to discussions (preserving thread context)
CREATE TABLE notes (
    id INTEGER PRIMARY KEY,
    gitlab_id INTEGER UNIQUE NOT NULL,
    discussion_id INTEGER NOT NULL REFERENCES discussions(id),
    project_id INTEGER NOT NULL REFERENCES projects(id),
    note_type TEXT,                            -- 'DiscussionNote' | 'DiffNote' | null
    is_system INTEGER NOT NULL DEFAULT 0,      -- 1 for system notes (assignments, label changes)
    author_username TEXT,
    body TEXT,
    created_at INTEGER,                        -- ms epoch UTC
    updated_at INTEGER,                        -- ms epoch UTC
    last_seen_at INTEGER NOT NULL,             -- ms epoch UTC, updated on every upsert
    position INTEGER,                          -- derived from array order in API response (0-indexed)
    resolvable INTEGER,
    resolved INTEGER,
    resolved_by TEXT,
    resolved_at INTEGER,                       -- ms epoch UTC
    -- DiffNote position metadata (populated for MR DiffNotes in CP2)
    position_old_path TEXT,
    position_new_path TEXT,
    position_old_line INTEGER,
    position_new_line INTEGER,
    raw_payload_id INTEGER REFERENCES raw_payloads(id)
);

CREATE INDEX idx_notes_discussion ON notes(discussion_id);
CREATE INDEX idx_notes_author ON notes(author_username);
CREATE INDEX idx_notes_system ON notes(is_system);

-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (2, strftime('%s', 'now') * 1000, 'Issues, labels, discussions, notes');
```

---

## GitLab Types

### Type Definitions

```rust
// src/gitlab/types.rs (additions)

use serde::Deserialize;

/// GitLab issue from the API.
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabIssue {
    pub id: i64,                       // GitLab global ID
    pub iid: i64,                      // Project-scoped issue number
    pub project_id: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,                 // "opened" | "closed"
    pub created_at: String,            // ISO 8601
    pub updated_at: String,            // ISO 8601
    pub closed_at: Option<String>,
    pub author: GitLabAuthor,
    pub labels: Vec<String>,           // Array of label names (CP1 canonical)
    pub web_url: String,
    // NOTE: labels_details is intentionally NOT modeled for CP1.
    // The field name and shape vary across GitLab versions.
    // Color/description can be fetched via Labels API if needed later.
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabAuthor {
    pub id: i64,
    pub username: String,
    pub name: String,
}

/// GitLab discussion (thread of notes).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabDiscussion {
    pub id: String,                    // String ID like "6a9c1750b37d..."
    pub individual_note: bool,         // true = standalone comment
    pub notes: Vec<GitLabNote>,
}

/// GitLab note (comment).
#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNote {
    pub id: i64,
    #[serde(rename = "type")]
    pub note_type: Option<String>,     // "DiscussionNote" | "DiffNote" | null
    pub body: String,
    pub author: GitLabAuthor,
    pub created_at: String,            // ISO 8601
    pub updated_at: String,            // ISO 8601
    pub system: bool,                  // true for system-generated notes
    #[serde(default)]
    pub resolvable: bool,
    #[serde(default)]
    pub resolved: bool,
    pub resolved_by: Option<GitLabAuthor>,
    pub resolved_at: Option<String>,
    /// DiffNote specific (null for non-DiffNote)
    pub position: Option<GitLabNotePosition>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct GitLabNotePosition {
    pub old_path: Option<String>,
    pub new_path: Option<String>,
    pub old_line: Option<i64>,
    pub new_line: Option<i64>,
}
```

---

## Transformers

### Issue Transformer

```rust
// src/gitlab/transformers/issue.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabIssue;

/// Normalized issue ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedIssue {
    pub gitlab_id: i64,
    pub project_id: i64,               // Local DB project ID
    pub iid: i64,
    pub title: String,
    pub description: Option<String>,
    pub state: String,
    pub author_username: String,
    pub created_at: i64,               // ms epoch
    pub updated_at: i64,               // ms epoch
    pub last_seen_at: i64,             // ms epoch
    pub web_url: String,
}

/// Normalized label ready for database insertion.
/// CP1: Name-only for stability.
#[derive(Debug, Clone)]
pub struct NormalizedLabel {
    pub project_id: i64,
    pub name: String,
}

/// Transform GitLab issue to normalized schema.
pub fn transform_issue(gitlab_issue: &GitLabIssue, local_project_id: i64) -> NormalizedIssue {
    NormalizedIssue {
        gitlab_id: gitlab_issue.id,
        project_id: local_project_id,
        iid: gitlab_issue.iid,
        title: gitlab_issue.title.clone(),
        description: gitlab_issue.description.clone(),
        state: gitlab_issue.state.clone(),
        author_username: gitlab_issue.author.username.clone(),
        created_at: iso_to_ms(&gitlab_issue.created_at),
        updated_at: iso_to_ms(&gitlab_issue.updated_at),
        last_seen_at: now_ms(),
        web_url: gitlab_issue.web_url.clone(),
    }
}

/// Extract labels from GitLab issue (CP1: name-only).
pub fn extract_labels(gitlab_issue: &GitLabIssue, local_project_id: i64) -> Vec<NormalizedLabel> {
    gitlab_issue
        .labels
        .iter()
        .map(|name| NormalizedLabel {
            project_id: local_project_id,
            name: name.clone(),
        })
        .collect()
}
```

### Discussion Transformer

```rust
// src/gitlab/transformers/discussion.rs

use crate::core::time::{iso_to_ms, now_ms};
use crate::gitlab::types::GitLabDiscussion;

/// Normalized discussion ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedDiscussion {
    pub gitlab_discussion_id: String,
    pub project_id: i64,
    pub issue_id: i64,
    pub noteable_type: String,         // "Issue"
    pub individual_note: bool,
    pub first_note_at: Option<i64>,
    pub last_note_at: Option<i64>,
    pub last_seen_at: i64,
    pub resolvable: bool,
    pub resolved: bool,
}

/// Normalized note ready for database insertion.
#[derive(Debug, Clone)]
pub struct NormalizedNote {
    pub gitlab_id: i64,
    pub project_id: i64,
    pub note_type: Option<String>,
    pub is_system: bool,
    pub author_username: String,
    pub body: String,
    pub created_at: i64,
    pub updated_at: i64,
    pub last_seen_at: i64,
    pub position: i32,                 // Array index in notes[]
    pub resolvable: bool,
    pub resolved: bool,
    pub resolved_by: Option<String>,
    pub resolved_at: Option<i64>,
}

/// Transform GitLab discussion to normalized schema.
pub fn transform_discussion(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
    local_issue_id: i64,
) -> NormalizedDiscussion {
    let note_times: Vec<i64> = gitlab_discussion
        .notes
        .iter()
        .map(|n| iso_to_ms(&n.created_at))
        .collect();

    // Check if any note is resolvable
    let resolvable = gitlab_discussion.notes.iter().any(|n| n.resolvable);
    let resolved = resolvable
        && gitlab_discussion
            .notes
            .iter()
            .all(|n| !n.resolvable || n.resolved);

    NormalizedDiscussion {
        gitlab_discussion_id: gitlab_discussion.id.clone(),
        project_id: local_project_id,
        issue_id: local_issue_id,
        noteable_type: "Issue".to_string(),
        individual_note: gitlab_discussion.individual_note,
        first_note_at: note_times.iter().min().copied(),
        last_note_at: note_times.iter().max().copied(),
        last_seen_at: now_ms(),
        resolvable,
        resolved,
    }
}

/// Transform GitLab notes to normalized schema.
pub fn transform_notes(
    gitlab_discussion: &GitLabDiscussion,
    local_project_id: i64,
) -> Vec<NormalizedNote> {
    gitlab_discussion
        .notes
        .iter()
        .enumerate()
        .map(|(index, note)| NormalizedNote {
            gitlab_id: note.id,
            project_id: local_project_id,
            note_type: note.note_type.clone(),
            is_system: note.system,
            author_username: note.author.username.clone(),
            body: note.body.clone(),
            created_at: iso_to_ms(&note.created_at),
            updated_at: iso_to_ms(&note.updated_at),
            last_seen_at: now_ms(),
            position: index as i32,
            resolvable: note.resolvable,
            resolved: note.resolved,
            resolved_by: note.resolved_by.as_ref().map(|a| a.username.clone()),
            resolved_at: note.resolved_at.as_ref().map(|s| iso_to_ms(s)),
        })
        .collect()
}
```

---

## GitLab Client Additions

### Pagination with Async Streams

```rust
// src/gitlab/client.rs (additions)

use crate::gitlab::types::{GitLabDiscussion, GitLabIssue};
use reqwest::header::HeaderMap;
use std::pin::Pin;
use futures::Stream;

impl GitLabClient {
    /// Paginate through issues for a project.
    /// Returns a stream of issues that handles pagination automatically.
    pub fn paginate_issues(
        &self,
        gitlab_project_id: i64,
        updated_after: Option<i64>,
        cursor_rewind_seconds: u32,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabIssue>> + Send + '_>> {
        Box::pin(async_stream::try_stream! {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let mut params = vec![
                    ("scope", "all".to_string()),
                    ("state", "all".to_string()),
                    ("order_by", "updated_at".to_string()),
                    ("sort", "asc".to_string()),
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                if let Some(ts) = updated_after {
                    // Apply cursor rewind for safety, clamping to 0 to avoid underflow
                    let rewind_ms = (cursor_rewind_seconds as i64) * 1000;
                    let rewound = (ts - rewind_ms).max(0);
                    if let Some(dt) = chrono::DateTime::from_timestamp_millis(rewound) {
                        params.push(("updated_after", dt.to_rfc3339()));
                    }
                    // If conversion fails (shouldn't happen with max(0)), omit the param
                    // and fetch all issues (safe fallback).
                }

                let (issues, headers) = self
                    .request_with_headers::<Vec<GitLabIssue>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/issues"),
                        &params,
                    )
                    .await?;

                for issue in issues.iter() {
                    yield issue.clone();
                }

                // Check for next page
                let next_page = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                match next_page {
                    Some(np) if !issues.is_empty() => page = np,
                    _ => break,
                }
            }
        })
    }

    /// Paginate through discussions for an issue.
    pub fn paginate_issue_discussions(
        &self,
        gitlab_project_id: i64,
        issue_iid: i64,
    ) -> Pin<Box<dyn Stream<Item = Result<GitLabDiscussion>> + Send + '_>> {
        Box::pin(async_stream::try_stream!
        {
            let mut page = 1u32;
            let per_page = 100u32;

            loop {
                let params = vec![
                    ("per_page", per_page.to_string()),
                    ("page", page.to_string()),
                ];

                let (discussions, headers) = self
                    .request_with_headers::<Vec<GitLabDiscussion>>(
                        &format!("/api/v4/projects/{gitlab_project_id}/issues/{issue_iid}/discussions"),
                        &params,
                    )
                    .await?;

                for discussion in discussions.iter() {
                    yield discussion.clone();
                }

                // Check for next page
                let next_page = headers
                    .get("x-next-page")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u32>().ok());

                match next_page {
                    Some(np) if !discussions.is_empty() => page = np,
                    _ => break,
                }
            }
        })
    }

    /// Make request and return response with headers for pagination.
    async fn request_with_headers<T: serde::de::DeserializeOwned>(
        &self,
        path: &str,
        params: &[(&str, String)],
    ) -> Result<(T, HeaderMap)> {
        self.rate_limiter.lock().await.acquire().await;

        let url = format!("{}{}", self.base_url, path);
        tracing::debug!(url = %url, "GitLab request");

        let response = self
            .client
            .get(&url)
            .header("PRIVATE-TOKEN", &self.token)
            .query(params)
            .send()
            .await
            .map_err(|e| GiError::GitLabNetworkError {
                base_url: self.base_url.clone(),
                source: Some(e),
            })?;

        let headers = response.headers().clone();
        let data = self.handle_response(response, path).await?;
        Ok((data, headers))
    }
}
```

**Note:** Requires adding `async-stream` and `futures` to Cargo.toml:

```toml
# Cargo.toml additions
async-stream = "0.3"
futures = "0.3"
```

---

## Orchestration: Dependent Discussion Sync

### Canonical Pattern (CP1)

When `gi ingest --type=issues` runs, it follows this orchestration:

1. **Ingest issues** (cursor-based, with incremental cursor updates per page)
2. **Collect touched issues** - For each issue that passed cursor tuple filtering, record:
   - `local_issue_id`
   - `issue_iid`
   - `issue_updated_at`
   - `discussions_synced_for_updated_at` (from DB)
3. **Filter for discussion sync** - Enqueue issues where:
   ```
   issue.updated_at > issues.discussions_synced_for_updated_at
   ```
   This prevents re-fetching discussions for issues that haven't changed, even with cursor rewind.
4. **Execute discussion sync** with bounded concurrency (`dependent_concurrency` from config)
5. **Update watermark** - After each issue's discussions are successfully ingested:
   ```sql
   UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?
   ```

**Invariant:** A rerun MUST NOT refetch discussions for issues whose `updated_at` has not advanced, even with cursor rewind.

---

## Ingestion Logic

### Runtime Strategy

**Decision:** Use a single-threaded Tokio runtime (`flavor = "current_thread"`) for CP1.

**Rationale:**

- `rusqlite::Connection` is `!Send`, which conflicts with multi-threaded runtimes
- A single-threaded runtime avoids `Send` bounds entirely
- Concurrency for discussion fetches uses `tokio::task::spawn_local` + `LocalSet`
- Keeps code simple; can upgrade to a channel-based DB writer in CP2 if needed

```rust
// src/main.rs
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // ...
}
```

### Issue Ingestion

```rust
// src/ingestion/issues.rs

use futures::StreamExt;
use rusqlite::Connection;
use tracing::{debug, info};

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::core::time::now_ms;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::issue::{extract_labels, transform_issue};

/// Result of issue ingestion.
#[derive(Debug, Default)]
pub struct IngestIssuesResult {
    pub fetched: usize,
    pub upserted: usize,
    pub labels_created: usize,
    /// Issues that need discussion sync (updated_at advanced)
    pub issues_needing_discussion_sync: Vec<IssueForDiscussionSync>,
}

/// Info needed to sync discussions for an issue.
#[derive(Debug, Clone)]
pub struct IssueForDiscussionSync {
    pub local_issue_id: i64,
    pub iid: i64,
    pub updated_at: i64,
}

/// Ingest issues for a project.
/// Returns the list of issues that need discussion sync.
pub async fn ingest_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,          // Local DB project ID
    gitlab_project_id: i64,   // GitLab project ID
) -> Result<IngestIssuesResult> {
    let mut result = IngestIssuesResult::default();

    // Get current cursor
    let cursor = get_cursor(conn, project_id)?;
    let cursor_updated_at = cursor.0;
    let cursor_gitlab_id = cursor.1;

    let mut last_updated_at: Option<i64> = None;
    let mut last_gitlab_id: Option<i64> = None;
    let mut issues_in_page: Vec<(i64, i64, i64)> = Vec::new(); // (local_id, iid, updated_at)

    // Fetch issues with pagination
    let mut stream = client.paginate_issues(
        gitlab_project_id,
        cursor_updated_at,
        config.sync.cursor_rewind_seconds,
    );

    while let Some(issue_result) = stream.next().await {
        let issue = issue_result?;
        result.fetched += 1;

        let issue_updated_at = crate::core::time::iso_to_ms(&issue.updated_at);

        // Apply cursor filtering for tuple semantics
        if let (Some(cursor_ts), Some(cursor_id)) = (cursor_updated_at, cursor_gitlab_id) {
            if issue_updated_at < cursor_ts {
                continue;
            }
            if issue_updated_at == cursor_ts && issue.id <= cursor_id {
                continue;
            }
        }

        // Begin transaction for this issue (atomicity + performance)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload
        let payload_id = store_payload(
            &tx,
            project_id,
            "issue",
            &issue.id.to_string(),
            &issue,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert issue
        let normalized = transform_issue(&issue, project_id);
        let changes = upsert_issue(&tx, &normalized, payload_id)?;
        if changes > 0 {
            result.upserted += 1;
        }

        // Get local issue ID for label linking
        let local_issue_id = get_local_issue_id(&tx, normalized.gitlab_id)?;

        // Clear existing label links (ensures removed labels are unlinked)
        clear_issue_labels(&tx, local_issue_id)?;

        // Extract and upsert labels (name-only for CP1)
        let labels = extract_labels(&issue, project_id);
        for label in &labels {
            let created = upsert_label(&tx, label)?;
            if created {
                result.labels_created += 1;
            }

            // Link issue to label
            let label_id = get_label_id(&tx, project_id, &label.name)?;
            link_issue_label(&tx, local_issue_id, label_id)?;
        }

        tx.commit()?;

        // Track for discussion sync eligibility
        issues_in_page.push((local_issue_id, issue.iid, issue_updated_at));

        // Track for cursor update
        last_updated_at = Some(issue_updated_at);
        last_gitlab_id = Some(issue.id);

        // Incremental cursor update every 100 issues (page boundary)
        // This ensures crashes don't cause massive refetch
        if result.fetched % 100 == 0 {
            if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
                update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
            }
        }
    }

    // Final cursor update
    if let (Some(updated_at), Some(gitlab_id)) = (last_updated_at, last_gitlab_id) {
        update_cursor(conn, project_id, "issues", updated_at, gitlab_id)?;
    }

    // Determine which issues need discussion sync (updated_at advanced)
    for (local_issue_id, iid, updated_at) in issues_in_page {
        let synced_at = get_discussions_synced_at(conn, local_issue_id)?;
        if synced_at.is_none() || updated_at > synced_at.unwrap() {
            result.issues_needing_discussion_sync.push(IssueForDiscussionSync {
                local_issue_id,
                iid,
                updated_at,
            });
        }
    }

    info!(
        project_id,
        fetched = result.fetched,
        upserted = result.upserted,
        labels_created = result.labels_created,
        need_discussion_sync = result.issues_needing_discussion_sync.len(),
        "Issue ingestion complete"
    );

    Ok(result)
}

fn get_cursor(conn: &Connection, project_id: i64) -> Result<(Option<i64>, Option<i64>)> {
    let mut stmt = conn.prepare(
        "SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
         WHERE project_id = ? AND resource_type = 'issues'",
    )?;

    let result = stmt.query_row([project_id], |row| {
        Ok((row.get::<_, Option<i64>>(0)?, row.get::<_, Option<i64>>(1)?))
    });

    match result {
        Ok(cursor) => Ok(cursor),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok((None, None)),
        Err(e) => Err(e.into()),
    }
}

fn get_discussions_synced_at(conn: &Connection, issue_id: i64) -> Result<Option<i64>> {
    let result = conn.query_row(
        "SELECT discussions_synced_for_updated_at FROM issues WHERE id = ?",
        [issue_id],
        |row| row.get(0),
    );

    match result {
        Ok(ts) => Ok(ts),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn upsert_issue(
    conn: &Connection,
    issue: &crate::gitlab::transformers::issue::NormalizedIssue,
    payload_id: Option<i64>,
) -> Result<usize> {
    let changes = conn.execute(
        "INSERT INTO issues (
            gitlab_id, project_id, iid, title, description, state,
            author_username, created_at, updated_at, last_seen_at,
            web_url, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            title = excluded.title,
            description = excluded.description,
            state = excluded.state,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            issue.gitlab_id,
            issue.project_id,
            issue.iid,
            issue.title,
            issue.description,
            issue.state,
            issue.author_username,
            issue.created_at,
            issue.updated_at,
            issue.last_seen_at,
            issue.web_url,
            payload_id,
        ],
    )?;
    Ok(changes)
}

fn get_local_issue_id(conn: &Connection, gitlab_id: i64) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM issues WHERE gitlab_id = ?",
        [gitlab_id],
        |row| row.get(0),
    )?)
}

fn clear_issue_labels(conn: &Connection, issue_id: i64) -> Result<()> {
    conn.execute("DELETE FROM issue_labels WHERE issue_id = ?", [issue_id])?;
    Ok(())
}

fn upsert_label(
    conn: &Connection,
    label: &crate::gitlab::transformers::issue::NormalizedLabel,
) -> Result<bool> {
    // CP1: Name-only labels. Color/description columns remain NULL.
    let changes = conn.execute(
        "INSERT INTO labels (project_id, name) VALUES (?1, ?2)
         ON CONFLICT(project_id, name) DO NOTHING",
        rusqlite::params![label.project_id, label.name],
    )?;
    Ok(changes > 0)
}

fn get_label_id(conn: &Connection, project_id: i64, name: &str) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM labels WHERE project_id = ? AND name = ?",
        rusqlite::params![project_id, name],
        |row| row.get(0),
    )?)
}

fn link_issue_label(conn: &Connection, issue_id: i64, label_id: i64) -> Result<()> {
    conn.execute(
        "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)",
        [issue_id, label_id],
    )?;
    Ok(())
}

fn update_cursor(
    conn: &Connection,
    project_id: i64,
    resource_type: &str,
    updated_at: i64,
    gitlab_id: i64,
) -> Result<()> {
    conn.execute(
        "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
         VALUES (?1, ?2, ?3, ?4)
         ON CONFLICT(project_id, resource_type) DO UPDATE SET
            updated_at_cursor = excluded.updated_at_cursor,
            tie_breaker_id = excluded.tie_breaker_id",
        rusqlite::params![project_id, resource_type, updated_at, gitlab_id],
    )?;
    Ok(())
}
```

### Discussion Ingestion

```rust
// src/ingestion/discussions.rs

use futures::StreamExt;
use rusqlite::Connection;
use tracing::debug;

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::payloads::store_payload;
use crate::gitlab::client::GitLabClient;
use crate::gitlab::transformers::discussion::{transform_discussion, transform_notes};

/// Result of discussion ingestion for a single issue.
#[derive(Debug, Default)]
pub struct IngestDiscussionsResult {
    pub discussions_fetched: usize,
    pub discussions_upserted: usize,
    pub notes_upserted: usize,
    pub system_notes_count: usize,
}

/// Ingest discussions for a single issue.
/// Called only when issue.updated_at > discussions_synced_for_updated_at.
pub async fn ingest_issue_discussions(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    issue_iid: i64,
    local_issue_id: i64,
    issue_updated_at: i64,
) -> Result<IngestDiscussionsResult> {
    let mut result = IngestDiscussionsResult::default();

    let mut stream = client.paginate_issue_discussions(gitlab_project_id, issue_iid);

    while let Some(discussion_result) = stream.next().await {
        let discussion = discussion_result?;
        result.discussions_fetched += 1;

        // Begin transaction for this discussion (atomicity + performance)
        let tx = conn.unchecked_transaction()?;

        // Store raw payload for discussion
        let discussion_payload_id = store_payload(
            &tx,
            project_id,
            "discussion",
            &discussion.id,
            &discussion,
            config.storage.compress_raw_payloads,
        )?;

        // Transform and upsert discussion
        let normalized = transform_discussion(&discussion, project_id, local_issue_id);
        upsert_discussion(&tx, &normalized, discussion_payload_id)?;
        result.discussions_upserted += 1;

        // Get local discussion ID
        let local_discussion_id = get_local_discussion_id(&tx, project_id, &discussion.id)?;

        // Transform and upsert notes
        let notes = transform_notes(&discussion, project_id);
        for note in &notes {
            // Store raw payload for note
            let gitlab_note = discussion.notes.iter().find(|n| n.id == note.gitlab_id);
            let note_payload_id = if let Some(gn) = gitlab_note {
                store_payload(
                    &tx,
                    project_id,
                    "note",
                    &note.gitlab_id.to_string(),
                    gn,
                    config.storage.compress_raw_payloads,
                )?
            } else {
                None
            };

            upsert_note(&tx, local_discussion_id, note, note_payload_id)?;
            result.notes_upserted += 1;

            if note.is_system {
                result.system_notes_count += 1;
            }
        }

        tx.commit()?;
    }

    // Mark discussions as synced for this issue version
    mark_discussions_synced(conn, local_issue_id, issue_updated_at)?;

    debug!(
        project_id,
        issue_iid,
        discussions = result.discussions_fetched,
        notes = result.notes_upserted,
        "Issue discussions ingested"
    );

    Ok(result)
}

fn upsert_discussion(
    conn: &Connection,
    discussion: &crate::gitlab::transformers::discussion::NormalizedDiscussion,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO discussions (
            gitlab_discussion_id, project_id, issue_id, noteable_type,
            individual_note, first_note_at, last_note_at, last_seen_at,
            resolvable, resolved, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
        ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
            first_note_at = excluded.first_note_at,
            last_note_at = excluded.last_note_at,
            last_seen_at = excluded.last_seen_at,
            resolvable = excluded.resolvable,
            resolved = excluded.resolved,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            discussion.gitlab_discussion_id,
            discussion.project_id,
            discussion.issue_id,
            discussion.noteable_type,
            discussion.individual_note as i32,
            discussion.first_note_at,
            discussion.last_note_at,
            discussion.last_seen_at,
            discussion.resolvable as i32,
            discussion.resolved as i32,
            payload_id,
        ],
    )?;
    Ok(())
}

fn get_local_discussion_id(conn: &Connection, project_id: i64, gitlab_id: &str) -> Result<i64> {
    Ok(conn.query_row(
        "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
        rusqlite::params![project_id, gitlab_id],
        |row| row.get(0),
    )?)
}

fn upsert_note(
    conn: &Connection,
    discussion_id: i64,
    note: &crate::gitlab::transformers::discussion::NormalizedNote,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO notes (
            gitlab_id, discussion_id, project_id, note_type, is_system,
            author_username, body, created_at, updated_at, last_seen_at,
            position, resolvable, resolved, resolved_by, resolved_at,
            raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            body = excluded.body,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            resolved = excluded.resolved,
            resolved_by = excluded.resolved_by,
            resolved_at = excluded.resolved_at,
            raw_payload_id = excluded.raw_payload_id",
        rusqlite::params![
            note.gitlab_id,
            discussion_id,
            note.project_id,
            note.note_type,
            note.is_system as i32,
            note.author_username,
            note.body,
            note.created_at,
            note.updated_at,
            note.last_seen_at,
            note.position,
            note.resolvable as i32,
            note.resolved as i32,
            note.resolved_by,
            note.resolved_at,
            payload_id,
        ],
    )?;
    Ok(())
}

fn mark_discussions_synced(conn: &Connection, issue_id: i64, issue_updated_at: i64) -> Result<()> {
    conn.execute(
        "UPDATE issues SET discussions_synced_for_updated_at = ? WHERE id = ?",
        rusqlite::params![issue_updated_at, issue_id],
    )?;
    Ok(())
}
```

---

## CLI Commands

### `gi ingest --type=issues`

Fetch and store all issues from configured projects.

**Clap Definition:**

```rust
// src/cli/mod.rs (addition to Commands enum)
#[derive(Subcommand)]
pub enum Commands {
    // ... existing commands ...
    /// Ingest data from GitLab
    Ingest {
        /// Resource type to ingest
        #[arg(long, value_parser = ["issues", "merge_requests"])]
        r#type: String,

        /// Filter to a single project
        #[arg(long)]
        project: Option<String>,

        /// Override a stale sync lock
        #[arg(long)]
        force: bool,
    },

    /// List entities
    List {
        /// Entity type to list
        #[arg(value_parser = ["issues", "mrs"])]
        entity: String,

        /// Maximum results
        #[arg(long, default_value = "20")]
        limit: usize,

        /// Filter by project path
        #[arg(long)]
        project: Option<String>,

        /// Filter by state
        #[arg(long, value_parser = ["opened", "closed", "all"])]
        state: Option<String>,
    },

    /// Count entities
    Count {
        /// Entity type to count
        #[arg(value_parser = ["issues", "mrs", "discussions", "notes"])]
        entity: String,

        /// Filter by noteable type
        #[arg(long, value_parser = ["issue", "mr"])]
        r#type: Option<String>,
    },

    /// Show entity details
    Show {
        /// Entity type
        #[arg(value_parser = ["issue", "mr"])]
        entity: String,

        /// Entity IID
        iid: i64,

        /// Project path (required if ambiguous)
        #[arg(long)]
        project: Option<String>,
    },
}
```

**Output:**

```
Ingesting issues...
  group/project-one: 1,234 issues fetched, 45 new labels

Fetching discussions (312 issues with updates)...
  group/project-one: 312 issues → 1,234 discussions, 5,678 notes

Total: 1,234 issues, 1,234 discussions, 5,678 notes (excluding 1,234 system notes)
Skipped discussion sync for 922 unchanged issues.
```

### `gi list issues`

**Output:**

```
Issues (showing 20 of 3,801)

#1234  Authentication redesign    opened  @johndoe   3 days ago
#1233  Fix memory leak in cache   closed  @janedoe   5 days ago
#1232  Add dark mode support      opened  @bobsmith  1 week ago
...
```

### `gi count issues`

**Output:**

```
Issues: 3,801
```

### `gi show issue <iid>`

**Output:**

```
Issue #1234: Authentication redesign
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Project:  group/project-one
State:    opened
Author:   @johndoe
Created:  2024-01-15
Updated:  2024-03-20
Labels:   enhancement, auth
URL:      https://gitlab.example.com/group/project-one/-/issues/1234

Description:
We need to redesign the authentication flow to support...

Discussions (5):
  @janedoe (2024-01-16): I agree we should move to JWT-based auth...
  @johndoe (2024-01-16): What about refresh token strategy?
  @bobsmith (2024-01-17): Have we considered OAuth2?
```

---

## Automated Tests

### Unit Tests

```rust
// tests/issue_transformer_tests.rs
#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::issue::*;
    use gi::gitlab::types::*;

    #[test]
    fn transforms_gitlab_issue_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_labels_from_issue_payload() { /* ... */ }

    #[test]
    fn handles_missing_optional_fields_gracefully() { /* ... */ }

    #[test]
    fn converts_iso_timestamps_to_ms_epoch() { /* ... */ }

    #[test]
    fn sets_last_seen_at_to_current_time() { /* ... */ }
}
```

```rust
// tests/discussion_transformer_tests.rs
#[cfg(test)]
mod tests {
    use gi::gitlab::transformers::discussion::*;

    #[test]
    fn transforms_discussion_payload_to_normalized_schema() { /* ... */ }

    #[test]
    fn extracts_notes_array_from_discussion() { /* ... */ }

    #[test]
    fn sets_individual_note_flag_correctly() { /* ... */ }

    #[test]
    fn flags_system_notes_with_is_system_true() { /* ... */ }

    #[test]
    fn preserves_note_order_via_position_field() { /* ... */ }

    #[test]
    fn computes_first_note_at_and_last_note_at_correctly() { /* ... */ }

    #[test]
    fn computes_resolvable_and_resolved_status() { /* ... */ }
}
```

```rust
// tests/pagination_tests.rs
#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn fetches_all_pages_when_multiple_exist() { /* ... */ }

    #[tokio::test]
    async fn respects_per_page_parameter() { /* ...
*/ }

    #[tokio::test]
    async fn follows_x_next_page_header_until_empty() { /* ... */ }

    #[tokio::test]
    async fn falls_back_to_empty_page_stop_if_headers_missing() { /* ... */ }

    #[tokio::test]
    async fn applies_cursor_rewind_for_tuple_semantics() { /* ... */ }

    #[tokio::test]
    async fn clamps_negative_rewind_to_zero() { /* ... */ }
}
```

```rust
// tests/label_linkage_tests.rs
#[cfg(test)]
mod tests {
    #[test]
    fn clears_existing_labels_before_linking_new_set() { /* ... */ }

    #[test]
    fn removes_stale_label_links_on_issue_update() { /* ... */ }

    #[test]
    fn handles_issue_with_all_labels_removed() { /* ... */ }

    #[test]
    fn preserves_labels_that_still_exist() { /* ... */ }
}
```

```rust
// tests/discussion_watermark_tests.rs
#[cfg(test)]
mod tests {
    #[tokio::test]
    async fn skips_discussion_fetch_when_updated_at_unchanged() { /* ... */ }

    #[tokio::test]
    async fn fetches_discussions_when_updated_at_advanced() { /* ... */ }

    #[tokio::test]
    async fn updates_watermark_after_successful_discussion_sync() { /* ... */ }

    #[tokio::test]
    async fn does_not_update_watermark_on_discussion_sync_failure() { /* ... */ }
}
```

### Integration Tests

```rust
// tests/issue_ingestion_tests.rs
#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use wiremock::{MockServer, Mock, ResponseTemplate};
    use wiremock::matchers::{method, path_regex};

    #[tokio::test]
    async fn inserts_issues_into_database() { /* ... */ }

    #[tokio::test]
    async fn creates_labels_from_issue_payloads() { /* ... */ }

    #[tokio::test]
    async fn links_issues_to_labels_via_junction_table() { /* ... */ }

    #[tokio::test]
    async fn removes_stale_label_links_on_resync() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_issue() { /* ... */ }

    #[tokio::test]
    async fn stores_raw_payload_for_each_discussion() { /* ... */ }

    #[tokio::test]
    async fn updates_cursor_incrementally_per_page() { /* ... */ }

    #[tokio::test]
    async fn resumes_from_cursor_on_subsequent_runs() { /* ... */ }

    #[tokio::test]
    async fn handles_issues_with_no_labels() { /* ...
*/ }

    #[tokio::test]
    async fn upserts_existing_issues_on_refetch() { /* ... */ }

    #[tokio::test]
    async fn skips_discussion_refetch_for_unchanged_issues() { /* ... */ }
}
```

---

## Manual Smoke Tests

| Command | Expected Output | Pass Criteria |
|---------|-----------------|---------------|
| `gi ingest --type=issues` | Progress bar, final count | Completes without error |
| `gi list issues --limit=10` | Table of 10 issues | Shows iid, title, state, author |
| `gi list issues --project=group/project-one` | Filtered list | Only shows issues from that project |
| `gi count issues` | `Issues: N` | Count matches GitLab UI |
| `gi show issue 123` | Issue detail view | Shows title, description, labels, discussions, URL |
| `gi show issue 123` (ambiguous) | Prompt or error | Asks for `--project` clarification |
| `gi count discussions --type=issue` | `Issue Discussions: N` | Non-zero count |
| `gi count notes --type=issue` | `Issue Notes: N (excluding M system)` | Non-zero count |
| `gi sync-status` | Last sync time, cursor positions | Shows successful last run |
| `gi ingest --type=issues` (re-run) | `0 new issues` | Cursor prevents re-fetch |
| `gi ingest --type=issues` (re-run) | `Skipped discussion sync for N unchanged issues` | Watermark prevents refetch |
| `gi ingest --type=issues` (concurrent) | Lock error | Second run fails with clear message |
| Remove label from issue in GitLab, re-sync | Label link removed | Junction table reflects GitLab state |

---

## Data Integrity Checks

After successful ingestion, verify:

- [ ] `SELECT COUNT(*) FROM issues` matches the GitLab issue count for configured projects
- [ ] Every issue has a corresponding `raw_payloads` row
- [ ] Every discussion has a corresponding `raw_payloads` row
- [ ] Labels in the `issue_labels` junction all exist in the `labels` table
- [ ] `issue_labels` count per issue matches the GitLab UI label count
- [ ] `sync_cursors` has an entry for each `(project_id, 'issues')` pair
- [ ] Re-running `gi ingest --type=issues`
fetches 0 new items (cursor is current)
- [ ] Re-running skips discussion sync for unchanged issues (watermark works)
- [ ] `SELECT COUNT(*) FROM discussions WHERE noteable_type='Issue'` is non-zero
- [ ] Every discussion has at least one note
- [ ] `individual_note = 1` discussions have exactly one note
- [ ] `SELECT COUNT(*) FROM notes WHERE is_system = 1` matches the system note count in CLI output
- [ ] After removing a label in GitLab and re-syncing, the link is removed from `issue_labels`

---

## Definition of Done

### Gate A: Issues Only (Must Pass First)

- [ ] `gi ingest --type=issues` fetches all issues from configured projects
- [ ] Issues stored with correct schema, including `last_seen_at`
- [ ] Cursor-based sync is resumable (re-run fetches only new/updated issues)
- [ ] Cursor updated incrementally every 100 issues
- [ ] Raw payloads stored for each issue
- [ ] `gi list issues` and `gi count issues` work

### Gate B: Labels Correct (Must Pass)

- [ ] Labels extracted and stored (name-only)
- [ ] Label links created correctly
- [ ] **Stale label links removed on re-sync** (verified with a test)
- [ ] Label count per issue matches GitLab

### Gate C: Dependent Discussion Sync (Must Pass)

- [ ] Discussions fetched for issues whose `updated_at` advanced
- [ ] Notes stored with the `is_system` flag correctly set
- [ ] Raw payloads stored for discussions and notes
- [ ] `discussions_synced_for_updated_at` watermark updated after sync
- [ ] **Unchanged issues skip discussion refetch** (verified with a test)
- [ ] Bounded concurrency (`dependent_concurrency` respected)

### Gate D: Resumability Proof (Must Pass)

- [ ] Kill mid-run, then rerun; bounded redo (cursor progress preserved)
- [ ] No redundant discussion refetch after crash recovery
- [ ] Single-flight lock prevents concurrent runs

### Final Gate (Must Pass)

- [ ] All unit tests pass (`cargo test`)
- [ ] All integration tests pass (mocked with wiremock)
- [ ] `cargo clippy` passes with no warnings
- [ ] `cargo fmt --check`
passes
- [ ] Compiles with `--release`

### Hardening (Optional Before CP2)

- [ ] Edge cases: issues with 0 labels, 0 discussions
- [ ] Large pagination (100+ pages)
- [ ] Rate-limit handling under sustained load
- [ ] Live tests pass against a real GitLab instance
- [ ] Performance: 1,000+ issues ingested in under 5 minutes

---

## Implementation Order

1. **Runtime decision** (5 min)
   - Confirm `#[tokio::main(flavor = "current_thread")]`
   - Note the upgrade path for CP2 if needed
2. **Cargo.toml updates** (5 min)
   - Add `async-stream = "0.3"` and `futures = "0.3"`
3. **Database migration** (15 min)
   - `migrations/002_issues.sql` with the `discussions_synced_for_updated_at` column
   - `raw_payload_id` on the discussions table
   - Update the `MIGRATIONS` const in `src/core/db.rs`
4. **GitLab types** (15 min)
   - Add types to `src/gitlab/types.rs` (no `labels_details`)
   - Test deserialization with fixtures
5. **Transformers** (25 min)
   - `src/gitlab/transformers/mod.rs`
   - `src/gitlab/transformers/issue.rs` (simplified `NormalizedLabel`)
   - `src/gitlab/transformers/discussion.rs`
   - Unit tests
6. **GitLab client pagination** (25 min)
   - Add `paginate_issues()` with underflow protection
   - Add `paginate_issue_discussions()`
   - Add a `request_with_headers()` helper
7. **Issue ingestion** (45 min)
   - `src/ingestion/mod.rs`
   - `src/ingestion/issues.rs` with:
     - Transaction batching
     - `clear_issue_labels()` before linking
     - Incremental cursor updates
     - Return of `issues_needing_discussion_sync`
   - Unit + integration tests, including label stale-link removal
8. **Discussion ingestion** (30 min)
   - `src/ingestion/discussions.rs` with:
     - Transaction batching
     - `raw_payload_id` storage
     - `mark_discussions_synced()` watermark update
   - Integration tests, including watermark behavior
9. **Orchestrator** (30 min)
   - `src/ingestion/orchestrator.rs`
   - Coordinates issue sync → filter for discussion needs → bounded discussion sync
   - Integration tests
10.
    **CLI commands** (45 min)
    - `gi ingest --type=issues`
    - `gi list issues`
    - `gi count issues|discussions|notes`
    - `gi show issue <iid>`
    - Enhanced `gi sync-status`
11. **Final validation** (20 min)
    - `cargo test`
    - `cargo clippy`
    - Gate A/B/C/D verification
    - Manual smoke tests
    - Data integrity checks

---

## Risks & Mitigations

| Risk | Mitigation |
|------|------------|
| GitLab rate limiting during large sync | Respect `Retry-After`, exponential backoff, configurable concurrency |
| Discussion API N+1 problem (thousands of calls) | `dependent_concurrency` config limits parallel requests; watermark prevents refetch |
| Cursor drift if GitLab timestamp behavior changes | Rolling backfill window catches missed items |
| Large issues with 100+ discussions | Paginate discussions, bound memory usage |
| System notes pollute data | `is_system` flag allows filtering |
| Label deduplication across projects | Unique constraint on `(project_id, name)` |
| Stale label links accumulate | `clear_issue_labels()` before linking ensures correctness |
| Async stream complexity | Use the `async-stream` crate for ergonomic generators |
| rusqlite + async runtime Send/locking pitfalls | Single-threaded runtime (`current_thread`) avoids Send bounds |
| Crash causes massive refetch | Incremental cursor updates every 100 issues |
| Cursor rewind causes discussion refetch | Per-issue watermark (`discussions_synced_for_updated_at`) |
| Timestamp underflow on rewind | Clamp to 0 with `.max(0)` |

---

## API Call Estimation

For a project with 3,000 issues:

- Issue list: `ceil(3000/100) = 30` calls
- Issue discussions (first run): `3000 × 1.2 average pages = 3,600` calls
- Issue discussions (subsequent runs, 10% updated): `300 × 1.2 = 360` calls
- **Total first run: ~3,630 calls per project**
- **Total subsequent run: ~390 calls per project** (90% savings from the watermark)

At 10 requests/second:

- First run: ~6 minutes per project
- Subsequent run: ~40 seconds per project

---

## References
- [SPEC.md](../../SPEC.md) - Full system specification
- [checkpoint-0.md](checkpoint-0.md) - Project setup PRD
- [GitLab Issues API](https://docs.gitlab.com/ee/api/issues.html)
- [GitLab Discussions API](https://docs.gitlab.com/ee/api/discussions.html)
- [async-stream crate](https://docs.rs/async-stream) - Async generators for Rust
- [wiremock](https://docs.rs/wiremock) - HTTP mocking for tests
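---

For reference, the Gate C per-issue watermark decision described above can be sketched as follows. This is a minimal illustration, not the actual CP1 API: the struct and function names are hypothetical, and only the comparison logic is taken from the spec (refetch discussions only when an issue's `updated_at` has advanced past its stored `discussions_synced_for_updated_at`).

```rust
/// Hypothetical row shape; timestamps are millisecond-epoch values,
/// matching the schema used elsewhere in this PRD.
struct IssueRow {
    updated_at: i64,
    /// NULL (None) until the first successful discussion sync.
    discussions_synced_for_updated_at: Option<i64>,
}

/// Returns true when the issue's discussions must be (re)fetched:
/// either never synced, or the issue changed since the last sync.
fn needs_discussion_sync(issue: &IssueRow) -> bool {
    match issue.discussions_synced_for_updated_at {
        None => true,
        Some(watermark) => issue.updated_at > watermark,
    }
}

fn main() {
    let unchanged = IssueRow { updated_at: 100, discussions_synced_for_updated_at: Some(100) };
    // An unchanged issue is skipped, which is what bounds the N+1 cost.
    println!("skip: {}", !needs_discussion_sync(&unchanged));
}
```

Because the watermark is written only after a successful discussion sync (`mark_discussions_synced`), a crash between issue upsert and discussion fetch leaves the predicate true, so the rerun picks the issue up again.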
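The "timestamp underflow on rewind" mitigation listed in Risks & Mitigations (clamp to 0 with `.max(0)`) can be sketched like this; the function name and the idea of a fixed rewind window are illustrative, not the actual CP1 implementation:

```rust
/// Illustrative sketch: when resuming, rewind the cursor by a small overlap
/// window so items whose `updated_at` ties straddle a page boundary are not
/// missed; `.max(0)` clamps the result so an early cursor never goes negative.
fn rewind_cursor(cursor_ms: i64, rewind_window_ms: i64) -> i64 {
    (cursor_ms - rewind_window_ms).max(0)
}

fn main() {
    println!("{}", rewind_cursor(10_000, 2_000)); // normal rewind
    println!("{}", rewind_cursor(1_000, 5_000)); // clamped at epoch
}
```

The redundant refetch this rewind causes for issues is cheap (they upsert idempotently), and the per-issue watermark prevents it from cascading into discussion refetches.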