diff --git a/src/documents/extractor.rs b/src/documents/extractor.rs new file mode 100644 index 0000000..8b3b04e --- /dev/null +++ b/src/documents/extractor.rs @@ -0,0 +1,1085 @@ +use chrono::DateTime; +use rusqlite::Connection; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::collections::BTreeSet; + +use crate::core::error::Result; +use super::truncation::{ + truncate_discussion, truncate_hard_cap, NoteContent, MAX_DISCUSSION_BYTES, +}; + +/// Source type for documents. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SourceType { + Issue, + MergeRequest, + Discussion, +} + +impl SourceType { + pub fn as_str(&self) -> &'static str { + match self { + Self::Issue => "issue", + Self::MergeRequest => "merge_request", + Self::Discussion => "discussion", + } + } + + /// Parse from CLI input, accepting common aliases. + /// + /// Accepts: "issue", "issues", "mr", "mrs", "merge_request", "merge_requests", + /// "discussion", "discussions" + pub fn parse(s: &str) -> Option { + match s.to_lowercase().as_str() { + "issue" | "issues" => Some(Self::Issue), + "mr" | "mrs" | "merge_request" | "merge_requests" => Some(Self::MergeRequest), + "discussion" | "discussions" => Some(Self::Discussion), + _ => None, + } + } +} + +impl std::fmt::Display for SourceType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +/// Generated document ready for storage. 
+#[derive(Debug, Clone)] +pub struct DocumentData { + pub source_type: SourceType, + pub source_id: i64, + pub project_id: i64, + pub author_username: Option, + pub labels: Vec, + pub paths: Vec, + pub labels_hash: String, + pub paths_hash: String, + pub created_at: i64, + pub updated_at: i64, + pub url: Option, + pub title: Option, + pub content_text: String, + pub content_hash: String, + pub is_truncated: bool, + pub truncated_reason: Option, +} + +/// Compute SHA-256 hash of content. +pub fn compute_content_hash(content: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(content.as_bytes()); + format!("{:x}", hasher.finalize()) +} + +/// Compute SHA-256 hash over a sorted list of strings. +/// Used for labels_hash and paths_hash to detect changes efficiently. +pub fn compute_list_hash(items: &[String]) -> String { + let mut sorted = items.to_vec(); + sorted.sort(); + let joined = sorted.join("\n"); + compute_content_hash(&joined) +} + +/// Extract a searchable document from an issue. +/// Returns None if the issue has been deleted from the DB. 
+pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result> { + // Query main issue entity with project info + let row = conn.query_row( + "SELECT i.id, i.iid, i.title, i.description, i.state, i.author_username, + i.created_at, i.updated_at, i.web_url, + p.path_with_namespace, p.id AS project_id + FROM issues i + JOIN projects p ON p.id = i.project_id + WHERE i.id = ?1", + rusqlite::params![issue_id], + |row| { + Ok(( + row.get::<_, i64>(0)?, // id + row.get::<_, i64>(1)?, // iid + row.get::<_, Option>(2)?, // title + row.get::<_, Option>(3)?, // description + row.get::<_, String>(4)?, // state + row.get::<_, Option>(5)?, // author_username + row.get::<_, i64>(6)?, // created_at + row.get::<_, i64>(7)?, // updated_at + row.get::<_, Option>(8)?, // web_url + row.get::<_, String>(9)?, // path_with_namespace + row.get::<_, i64>(10)?, // project_id + )) + }, + ); + + let (id, iid, title, description, state, author_username, created_at, updated_at, web_url, path_with_namespace, project_id) = match row { + Ok(r) => r, + Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + // Query labels via junction table + let mut label_stmt = conn.prepare( + "SELECT l.name FROM issue_labels il + JOIN labels l ON l.id = il.label_id + WHERE il.issue_id = ?1 + ORDER BY l.name" + )?; + let labels: Vec = label_stmt + .query_map(rusqlite::params![id], |row| row.get(0))? 
+ .collect::, _>>()?; + + // Build labels JSON array string + let labels_json = serde_json::to_string(&labels) + .unwrap_or_else(|_| "[]".to_string()); + + // Format content_text per PRD template + let display_title = title.as_deref().unwrap_or("(untitled)"); + let mut content = format!( + "[[Issue]] #{}: {}\nProject: {}\n", + iid, display_title, path_with_namespace + ); + if let Some(ref url) = web_url { + content.push_str(&format!("URL: {}\n", url)); + } + content.push_str(&format!("Labels: {}\n", labels_json)); + content.push_str(&format!("State: {}\n", state)); + if let Some(ref author) = author_username { + content.push_str(&format!("Author: @{}\n", author)); + } + + // Add description section only if description is Some + if let Some(ref desc) = description { + content.push_str("\n--- Description ---\n\n"); + content.push_str(desc); + } + + let content_hash = compute_content_hash(&content); + let labels_hash = compute_list_hash(&labels); + let paths_hash = compute_list_hash(&[]); // Issues have no paths + + // Apply hard cap truncation for safety + let hard_cap = truncate_hard_cap(&content); + + Ok(Some(DocumentData { + source_type: SourceType::Issue, + source_id: id, + project_id, + author_username, + labels, + paths: Vec::new(), + labels_hash, + paths_hash, + created_at, + updated_at, + url: web_url, + title: Some(display_title.to_string()), + content_text: hard_cap.content, + content_hash, + is_truncated: hard_cap.is_truncated, + truncated_reason: hard_cap.reason.map(|r| r.as_str().to_string()), + })) +} + +/// Extract a searchable document from a merge request. +/// Returns None if the MR has been deleted from the DB. 
+pub fn extract_mr_document(conn: &Connection, mr_id: i64) -> Result> { + let row = conn.query_row( + "SELECT m.id, m.iid, m.title, m.description, m.state, m.author_username, + m.source_branch, m.target_branch, + m.created_at, m.updated_at, m.web_url, + p.path_with_namespace, p.id AS project_id + FROM merge_requests m + JOIN projects p ON p.id = m.project_id + WHERE m.id = ?1", + rusqlite::params![mr_id], + |row| { + Ok(( + row.get::<_, i64>(0)?, // id + row.get::<_, i64>(1)?, // iid + row.get::<_, Option>(2)?, // title + row.get::<_, Option>(3)?, // description + row.get::<_, Option>(4)?, // state + row.get::<_, Option>(5)?, // author_username + row.get::<_, Option>(6)?, // source_branch + row.get::<_, Option>(7)?, // target_branch + row.get::<_, Option>(8)?, // created_at (nullable in schema) + row.get::<_, Option>(9)?, // updated_at (nullable in schema) + row.get::<_, Option>(10)?, // web_url + row.get::<_, String>(11)?, // path_with_namespace + row.get::<_, i64>(12)?, // project_id + )) + }, + ); + + let (id, iid, title, description, state, author_username, source_branch, target_branch, created_at, updated_at, web_url, path_with_namespace, project_id) = match row { + Ok(r) => r, + Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + // Query labels via junction table + let mut label_stmt = conn.prepare( + "SELECT l.name FROM mr_labels ml + JOIN labels l ON l.id = ml.label_id + WHERE ml.merge_request_id = ?1 + ORDER BY l.name" + )?; + let labels: Vec = label_stmt + .query_map(rusqlite::params![id], |row| row.get(0))? 
+ .collect::, _>>()?; + + let labels_json = serde_json::to_string(&labels) + .unwrap_or_else(|_| "[]".to_string()); + + let display_title = title.as_deref().unwrap_or("(untitled)"); + let display_state = state.as_deref().unwrap_or("unknown"); + let mut content = format!( + "[[MergeRequest]] !{}: {}\nProject: {}\n", + iid, display_title, path_with_namespace + ); + if let Some(ref url) = web_url { + content.push_str(&format!("URL: {}\n", url)); + } + content.push_str(&format!("Labels: {}\n", labels_json)); + content.push_str(&format!("State: {}\n", display_state)); + if let Some(ref author) = author_username { + content.push_str(&format!("Author: @{}\n", author)); + } + // Source line: source_branch -> target_branch + if let (Some(src), Some(tgt)) = (&source_branch, &target_branch) { + content.push_str(&format!("Source: {} -> {}\n", src, tgt)); + } + + if let Some(ref desc) = description { + content.push_str("\n--- Description ---\n\n"); + content.push_str(desc); + } + + let content_hash = compute_content_hash(&content); + let labels_hash = compute_list_hash(&labels); + let paths_hash = compute_list_hash(&[]); + + // Apply hard cap truncation for safety + let hard_cap = truncate_hard_cap(&content); + + Ok(Some(DocumentData { + source_type: SourceType::MergeRequest, + source_id: id, + project_id, + author_username, + labels, + paths: Vec::new(), + labels_hash, + paths_hash, + created_at: created_at.unwrap_or(0), + updated_at: updated_at.unwrap_or(0), + url: web_url, + title: Some(display_title.to_string()), + content_text: hard_cap.content, + content_hash, + is_truncated: hard_cap.is_truncated, + truncated_reason: hard_cap.reason.map(|r| r.as_str().to_string()), + })) +} + +/// Format ms epoch as YYYY-MM-DD date string. +fn format_date(ms: i64) -> String { + DateTime::from_timestamp_millis(ms) + .map(|dt| dt.format("%Y-%m-%d").to_string()) + .unwrap_or_else(|| "unknown".to_string()) +} + +/// Extract a searchable document from a discussion thread. 
+/// Returns None if the discussion or its parent has been deleted. +pub fn extract_discussion_document( + conn: &Connection, + discussion_id: i64, +) -> Result> { + // Query discussion metadata + let disc_row = conn.query_row( + "SELECT d.id, d.noteable_type, d.issue_id, d.merge_request_id, + p.path_with_namespace, p.id AS project_id + FROM discussions d + JOIN projects p ON p.id = d.project_id + WHERE d.id = ?1", + rusqlite::params![discussion_id], + |row| { + Ok(( + row.get::<_, i64>(0)?, // id + row.get::<_, String>(1)?, // noteable_type + row.get::<_, Option>(2)?, // issue_id + row.get::<_, Option>(3)?, // merge_request_id + row.get::<_, String>(4)?, // path_with_namespace + row.get::<_, i64>(5)?, // project_id + )) + }, + ); + + let (id, noteable_type, issue_id, merge_request_id, path_with_namespace, project_id) = + match disc_row { + Ok(r) => r, + Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + // Query parent entity + let (_parent_iid, parent_title, parent_web_url, parent_type_prefix, labels) = + match noteable_type.as_str() { + "Issue" => { + let parent_id = match issue_id { + Some(pid) => pid, + None => return Ok(None), + }; + let parent = conn.query_row( + "SELECT i.iid, i.title, i.web_url FROM issues i WHERE i.id = ?1", + rusqlite::params![parent_id], + |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + )) + }, + ); + let (iid, title, web_url) = match parent { + Ok(r) => r, + Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), + Err(e) => return Err(e.into()), + }; + // Query parent labels + let mut label_stmt = conn.prepare( + "SELECT l.name FROM issue_labels il + JOIN labels l ON l.id = il.label_id + WHERE il.issue_id = ?1 + ORDER BY l.name", + )?; + let labels: Vec = label_stmt + .query_map(rusqlite::params![parent_id], |row| row.get(0))? 
+ .collect::, _>>()?; + + ( + iid, + title, + web_url, + format!("Issue #{}", iid), + labels, + ) + } + "MergeRequest" => { + let parent_id = match merge_request_id { + Some(pid) => pid, + None => return Ok(None), + }; + let parent = conn.query_row( + "SELECT m.iid, m.title, m.web_url FROM merge_requests m WHERE m.id = ?1", + rusqlite::params![parent_id], + |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + )) + }, + ); + let (iid, title, web_url) = match parent { + Ok(r) => r, + Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None), + Err(e) => return Err(e.into()), + }; + // Query parent labels + let mut label_stmt = conn.prepare( + "SELECT l.name FROM mr_labels ml + JOIN labels l ON l.id = ml.label_id + WHERE ml.merge_request_id = ?1 + ORDER BY l.name", + )?; + let labels: Vec = label_stmt + .query_map(rusqlite::params![parent_id], |row| row.get(0))? + .collect::, _>>()?; + + ( + iid, + title, + web_url, + format!("MR !{}", iid), + labels, + ) + } + _ => return Ok(None), + }; + + // Query non-system notes in thread order + let mut note_stmt = conn.prepare( + "SELECT n.author_username, n.body, n.created_at, n.gitlab_id, + n.note_type, n.position_old_path, n.position_new_path + FROM notes n + WHERE n.discussion_id = ?1 AND n.is_system = 0 + ORDER BY n.created_at ASC, n.id ASC", + )?; + + struct NoteRow { + author: Option, + body: Option, + created_at: i64, + gitlab_id: i64, + old_path: Option, + new_path: Option, + } + + let notes: Vec = note_stmt + .query_map(rusqlite::params![id], |row| { + Ok(NoteRow { + author: row.get(0)?, + body: row.get(1)?, + created_at: row.get(2)?, + gitlab_id: row.get(3)?, + // index 4 is note_type (unused here) + old_path: row.get(5)?, + new_path: row.get(6)?, + }) + })? 
+ .collect::, _>>()?; + + if notes.is_empty() { + return Ok(None); + } + + // Extract DiffNote paths (deduplicated, sorted) + let mut path_set = BTreeSet::new(); + for note in ¬es { + if let Some(ref p) = note.old_path { + if !p.is_empty() { + path_set.insert(p.clone()); + } + } + if let Some(ref p) = note.new_path { + if !p.is_empty() { + path_set.insert(p.clone()); + } + } + } + let paths: Vec = path_set.into_iter().collect(); + + // Construct URL: parent_web_url#note_{first_note_gitlab_id} + let first_note_gitlab_id = notes[0].gitlab_id; + let url = parent_web_url + .as_ref() + .map(|wu| format!("{}#note_{}", wu, first_note_gitlab_id)); + + // First non-system note author + let author_username = notes[0].author.clone(); + + // Build content + let display_title = parent_title.as_deref().unwrap_or("(untitled)"); + let labels_json = serde_json::to_string(&labels).unwrap_or_else(|_| "[]".to_string()); + let paths_json = serde_json::to_string(&paths).unwrap_or_else(|_| "[]".to_string()); + + let mut content = format!( + "[[Discussion]] {}: {}\nProject: {}\n", + parent_type_prefix, display_title, path_with_namespace + ); + if let Some(ref u) = url { + content.push_str(&format!("URL: {}\n", u)); + } + content.push_str(&format!("Labels: {}\n", labels_json)); + if !paths.is_empty() { + content.push_str(&format!("Files: {}\n", paths_json)); + } + + // Build NoteContent list for truncation-aware thread rendering + let note_contents: Vec = notes + .iter() + .map(|note| NoteContent { + author: note.author.as_deref().unwrap_or("unknown").to_string(), + date: format_date(note.created_at), + body: note.body.as_deref().unwrap_or("").to_string(), + }) + .collect(); + + // Estimate header size to reserve budget for thread content + let header_len = content.len() + "\n--- Thread ---\n\n".len(); + let thread_budget = MAX_DISCUSSION_BYTES.saturating_sub(header_len); + + let thread_result = truncate_discussion(¬e_contents, thread_budget); + content.push_str("\n--- Thread ---\n\n"); + 
content.push_str(&thread_result.content); + + // Use first note's created_at and last note's created_at for timestamps + let created_at = notes[0].created_at; + let updated_at = notes.last().map(|n| n.created_at).unwrap_or(created_at); + + let content_hash = compute_content_hash(&content); + let labels_hash = compute_list_hash(&labels); + let paths_hash = compute_list_hash(&paths); + + Ok(Some(DocumentData { + source_type: SourceType::Discussion, + source_id: id, + project_id, + author_username, + labels, + paths, + labels_hash, + paths_hash, + created_at, + updated_at, + url, + title: None, // Discussions don't have their own title + content_text: content, + content_hash, + is_truncated: thread_result.is_truncated, + truncated_reason: thread_result.reason.map(|r| r.as_str().to_string()), + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_source_type_parse_aliases() { + assert_eq!(SourceType::parse("issue"), Some(SourceType::Issue)); + assert_eq!(SourceType::parse("issues"), Some(SourceType::Issue)); + assert_eq!(SourceType::parse("mr"), Some(SourceType::MergeRequest)); + assert_eq!(SourceType::parse("mrs"), Some(SourceType::MergeRequest)); + assert_eq!( + SourceType::parse("merge_request"), + Some(SourceType::MergeRequest) + ); + assert_eq!( + SourceType::parse("merge_requests"), + Some(SourceType::MergeRequest) + ); + assert_eq!( + SourceType::parse("discussion"), + Some(SourceType::Discussion) + ); + assert_eq!( + SourceType::parse("discussions"), + Some(SourceType::Discussion) + ); + assert_eq!(SourceType::parse("invalid"), None); + assert_eq!(SourceType::parse("ISSUE"), Some(SourceType::Issue)); // case insensitive + } + + #[test] + fn test_source_type_as_str() { + assert_eq!(SourceType::Issue.as_str(), "issue"); + assert_eq!(SourceType::MergeRequest.as_str(), "merge_request"); + assert_eq!(SourceType::Discussion.as_str(), "discussion"); + } + + #[test] + fn test_source_type_display() { + assert_eq!(format!("{}", SourceType::Issue), 
"issue"); + assert_eq!(format!("{}", SourceType::MergeRequest), "merge_request"); + assert_eq!(format!("{}", SourceType::Discussion), "discussion"); + } + + #[test] + fn test_content_hash_deterministic() { + let hash1 = compute_content_hash("hello"); + let hash2 = compute_content_hash("hello"); + assert_eq!(hash1, hash2); + assert!(!hash1.is_empty()); + // SHA-256 of "hello" is known + assert_eq!(hash1.len(), 64); // 256 bits = 64 hex chars + } + + #[test] + fn test_content_hash_different_inputs() { + let hash1 = compute_content_hash("hello"); + let hash2 = compute_content_hash("world"); + assert_ne!(hash1, hash2); + } + + #[test] + fn test_content_hash_empty() { + let hash = compute_content_hash(""); + assert_eq!(hash.len(), 64); + } + + #[test] + fn test_list_hash_order_independent() { + let hash1 = compute_list_hash(&["b".to_string(), "a".to_string()]); + let hash2 = compute_list_hash(&["a".to_string(), "b".to_string()]); + assert_eq!(hash1, hash2); + } + + #[test] + fn test_list_hash_empty() { + let hash = compute_list_hash(&[]); + assert_eq!(hash.len(), 64); + // Empty list hashes consistently + let hash2 = compute_list_hash(&[]); + assert_eq!(hash, hash2); + } + + // Helper to create an in-memory DB with the required tables for extraction tests + fn setup_test_db() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch(" + CREATE TABLE projects ( + id INTEGER PRIMARY KEY, + gitlab_project_id INTEGER UNIQUE NOT NULL, + path_with_namespace TEXT NOT NULL, + default_branch TEXT, + web_url TEXT, + created_at INTEGER, + updated_at INTEGER, + raw_payload_id INTEGER + ); + CREATE TABLE issues ( + id INTEGER PRIMARY KEY, + gitlab_id INTEGER UNIQUE NOT NULL, + project_id INTEGER NOT NULL REFERENCES projects(id), + iid INTEGER NOT NULL, + title TEXT, + description TEXT, + state TEXT NOT NULL, + author_username TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + last_seen_at INTEGER NOT NULL, + 
discussions_synced_for_updated_at INTEGER, + web_url TEXT, + raw_payload_id INTEGER + ); + CREATE TABLE labels ( + id INTEGER PRIMARY KEY, + gitlab_id INTEGER, + project_id INTEGER NOT NULL REFERENCES projects(id), + name TEXT NOT NULL, + color TEXT, + description TEXT + ); + CREATE TABLE issue_labels ( + issue_id INTEGER NOT NULL REFERENCES issues(id), + label_id INTEGER NOT NULL REFERENCES labels(id), + PRIMARY KEY(issue_id, label_id) + ); + ").unwrap(); + + // Insert a test project + conn.execute( + "INSERT INTO projects (id, gitlab_project_id, path_with_namespace, web_url) VALUES (1, 100, 'group/project-one', 'https://gitlab.example.com/group/project-one')", + [], + ).unwrap(); + + conn + } + + fn insert_issue(conn: &Connection, id: i64, iid: i64, title: Option<&str>, description: Option<&str>, state: &str, author: Option<&str>, web_url: Option<&str>) { + conn.execute( + "INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at, web_url) VALUES (?1, ?2, 1, ?3, ?4, ?5, ?6, ?7, 1000, 2000, 3000, ?8)", + rusqlite::params![id, id * 10, iid, title, description, state, author, web_url], + ).unwrap(); + } + + fn insert_label(conn: &Connection, id: i64, name: &str) { + conn.execute( + "INSERT INTO labels (id, project_id, name) VALUES (?1, 1, ?2)", + rusqlite::params![id, name], + ).unwrap(); + } + + fn link_issue_label(conn: &Connection, issue_id: i64, label_id: i64) { + conn.execute( + "INSERT INTO issue_labels (issue_id, label_id) VALUES (?1, ?2)", + rusqlite::params![issue_id, label_id], + ).unwrap(); + } + + #[test] + fn test_issue_document_format() { + let conn = setup_test_db(); + insert_issue(&conn, 1, 234, Some("Authentication redesign"), Some("We need to modernize our authentication system..."), "opened", Some("johndoe"), Some("https://gitlab.example.com/group/project-one/-/issues/234")); + insert_label(&conn, 1, "auth"); + insert_label(&conn, 2, "bug"); + link_issue_label(&conn, 1, 
1); + link_issue_label(&conn, 1, 2); + + let doc = extract_issue_document(&conn, 1).unwrap().unwrap(); + assert_eq!(doc.source_type, SourceType::Issue); + assert_eq!(doc.source_id, 1); + assert_eq!(doc.project_id, 1); + assert_eq!(doc.author_username, Some("johndoe".to_string())); + assert!(doc.content_text.starts_with("[[Issue]] #234: Authentication redesign\n")); + assert!(doc.content_text.contains("Project: group/project-one\n")); + assert!(doc.content_text.contains("URL: https://gitlab.example.com/group/project-one/-/issues/234\n")); + assert!(doc.content_text.contains("Labels: [\"auth\",\"bug\"]\n")); + assert!(doc.content_text.contains("State: opened\n")); + assert!(doc.content_text.contains("Author: @johndoe\n")); + assert!(doc.content_text.contains("--- Description ---\n\nWe need to modernize our authentication system...")); + assert!(!doc.is_truncated); + assert!(doc.paths.is_empty()); + } + + #[test] + fn test_issue_not_found() { + let conn = setup_test_db(); + let result = extract_issue_document(&conn, 999).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_issue_no_description() { + let conn = setup_test_db(); + insert_issue(&conn, 1, 10, Some("Quick fix"), None, "opened", Some("alice"), None); + + let doc = extract_issue_document(&conn, 1).unwrap().unwrap(); + assert!(!doc.content_text.contains("--- Description ---")); + assert!(doc.content_text.contains("[[Issue]] #10: Quick fix\n")); + } + + #[test] + fn test_issue_labels_sorted() { + let conn = setup_test_db(); + insert_issue(&conn, 1, 10, Some("Test"), Some("Body"), "opened", Some("bob"), None); + insert_label(&conn, 1, "zeta"); + insert_label(&conn, 2, "alpha"); + insert_label(&conn, 3, "middle"); + link_issue_label(&conn, 1, 1); + link_issue_label(&conn, 1, 2); + link_issue_label(&conn, 1, 3); + + let doc = extract_issue_document(&conn, 1).unwrap().unwrap(); + assert_eq!(doc.labels, vec!["alpha", "middle", "zeta"]); + assert!(doc.content_text.contains("Labels: 
[\"alpha\",\"middle\",\"zeta\"]")); + } + + #[test] + fn test_issue_no_labels() { + let conn = setup_test_db(); + insert_issue(&conn, 1, 10, Some("Test"), Some("Body"), "opened", None, None); + + let doc = extract_issue_document(&conn, 1).unwrap().unwrap(); + assert!(doc.labels.is_empty()); + assert!(doc.content_text.contains("Labels: []\n")); + } + + #[test] + fn test_issue_hash_deterministic() { + let conn = setup_test_db(); + insert_issue(&conn, 1, 10, Some("Test"), Some("Body"), "opened", Some("alice"), None); + + let doc1 = extract_issue_document(&conn, 1).unwrap().unwrap(); + let doc2 = extract_issue_document(&conn, 1).unwrap().unwrap(); + assert_eq!(doc1.content_hash, doc2.content_hash); + assert_eq!(doc1.labels_hash, doc2.labels_hash); + assert_eq!(doc1.content_hash.len(), 64); + } + + #[test] + fn test_issue_empty_description() { + let conn = setup_test_db(); + insert_issue(&conn, 1, 10, Some("Test"), Some(""), "opened", None, None); + + let doc = extract_issue_document(&conn, 1).unwrap().unwrap(); + // Empty string description still includes the section header + assert!(doc.content_text.contains("--- Description ---\n\n")); + } + + // --- MR extraction tests --- + + fn setup_mr_test_db() -> Connection { + let conn = setup_test_db(); + conn.execute_batch(" + CREATE TABLE merge_requests ( + id INTEGER PRIMARY KEY, + gitlab_id INTEGER UNIQUE NOT NULL, + project_id INTEGER NOT NULL REFERENCES projects(id), + iid INTEGER NOT NULL, + title TEXT, + description TEXT, + state TEXT, + draft INTEGER NOT NULL DEFAULT 0, + author_username TEXT, + source_branch TEXT, + target_branch TEXT, + head_sha TEXT, + references_short TEXT, + references_full TEXT, + detailed_merge_status TEXT, + merge_user_username TEXT, + created_at INTEGER, + updated_at INTEGER, + merged_at INTEGER, + closed_at INTEGER, + last_seen_at INTEGER NOT NULL, + discussions_synced_for_updated_at INTEGER, + discussions_sync_last_attempt_at INTEGER, + discussions_sync_attempts INTEGER DEFAULT 0, + 
discussions_sync_last_error TEXT, + web_url TEXT, + raw_payload_id INTEGER + ); + CREATE TABLE mr_labels ( + merge_request_id INTEGER REFERENCES merge_requests(id), + label_id INTEGER REFERENCES labels(id), + PRIMARY KEY(merge_request_id, label_id) + ); + ").unwrap(); + conn + } + + fn insert_mr(conn: &Connection, id: i64, iid: i64, title: Option<&str>, description: Option<&str>, state: Option<&str>, author: Option<&str>, source_branch: Option<&str>, target_branch: Option<&str>, web_url: Option<&str>) { + conn.execute( + "INSERT INTO merge_requests (id, gitlab_id, project_id, iid, title, description, state, author_username, source_branch, target_branch, created_at, updated_at, last_seen_at, web_url) VALUES (?1, ?2, 1, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 1000, 2000, 3000, ?10)", + rusqlite::params![id, id * 10, iid, title, description, state, author, source_branch, target_branch, web_url], + ).unwrap(); + } + + fn link_mr_label(conn: &Connection, mr_id: i64, label_id: i64) { + conn.execute( + "INSERT INTO mr_labels (merge_request_id, label_id) VALUES (?1, ?2)", + rusqlite::params![mr_id, label_id], + ).unwrap(); + } + + #[test] + fn test_mr_document_format() { + let conn = setup_mr_test_db(); + insert_mr(&conn, 1, 456, Some("Implement JWT authentication"), Some("This MR implements JWT-based authentication..."), Some("opened"), Some("johndoe"), Some("feature/jwt-auth"), Some("main"), Some("https://gitlab.example.com/group/project-one/-/merge_requests/456")); + insert_label(&conn, 1, "auth"); + insert_label(&conn, 2, "feature"); + link_mr_label(&conn, 1, 1); + link_mr_label(&conn, 1, 2); + + let doc = extract_mr_document(&conn, 1).unwrap().unwrap(); + assert_eq!(doc.source_type, SourceType::MergeRequest); + assert_eq!(doc.source_id, 1); + assert!(doc.content_text.starts_with("[[MergeRequest]] !456: Implement JWT authentication\n")); + assert!(doc.content_text.contains("Project: group/project-one\n")); + assert!(doc.content_text.contains("Labels: [\"auth\",\"feature\"]\n")); + 
assert!(doc.content_text.contains("State: opened\n")); + assert!(doc.content_text.contains("Author: @johndoe\n")); + assert!(doc.content_text.contains("Source: feature/jwt-auth -> main\n")); + assert!(doc.content_text.contains("--- Description ---\n\nThis MR implements JWT-based authentication...")); + } + + #[test] + fn test_mr_not_found() { + let conn = setup_mr_test_db(); + let result = extract_mr_document(&conn, 999).unwrap(); + assert!(result.is_none()); + } + + #[test] + fn test_mr_no_description() { + let conn = setup_mr_test_db(); + insert_mr(&conn, 1, 10, Some("Quick fix"), None, Some("merged"), Some("alice"), Some("fix/bug"), Some("main"), None); + + let doc = extract_mr_document(&conn, 1).unwrap().unwrap(); + assert!(!doc.content_text.contains("--- Description ---")); + assert!(doc.content_text.contains("[[MergeRequest]] !10: Quick fix\n")); + } + + #[test] + fn test_mr_branch_info() { + let conn = setup_mr_test_db(); + insert_mr(&conn, 1, 10, Some("Test"), Some("Body"), Some("opened"), None, Some("feature/foo"), Some("develop"), None); + + let doc = extract_mr_document(&conn, 1).unwrap().unwrap(); + assert!(doc.content_text.contains("Source: feature/foo -> develop\n")); + } + + #[test] + fn test_mr_no_branches() { + let conn = setup_mr_test_db(); + insert_mr(&conn, 1, 10, Some("Test"), None, Some("opened"), None, None, None, None); + + let doc = extract_mr_document(&conn, 1).unwrap().unwrap(); + assert!(!doc.content_text.contains("Source:")); + } + + // --- Discussion extraction tests --- + + fn setup_discussion_test_db() -> Connection { + let conn = setup_mr_test_db(); // includes projects, issues schema, labels, mr tables + conn.execute_batch(" + CREATE TABLE discussions ( + id INTEGER PRIMARY KEY, + gitlab_discussion_id TEXT NOT NULL, + project_id INTEGER NOT NULL REFERENCES projects(id), + issue_id INTEGER REFERENCES issues(id), + merge_request_id INTEGER, + noteable_type TEXT NOT NULL, + individual_note INTEGER NOT NULL DEFAULT 0, + first_note_at 
INTEGER,
            last_note_at INTEGER,
            last_seen_at INTEGER NOT NULL,
            resolvable INTEGER NOT NULL DEFAULT 0,
            resolved INTEGER NOT NULL DEFAULT 0
        );
        CREATE TABLE notes (
            id INTEGER PRIMARY KEY,
            gitlab_id INTEGER UNIQUE NOT NULL,
            discussion_id INTEGER NOT NULL REFERENCES discussions(id),
            project_id INTEGER NOT NULL REFERENCES projects(id),
            note_type TEXT,
            is_system INTEGER NOT NULL DEFAULT 0,
            author_username TEXT,
            body TEXT,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            last_seen_at INTEGER NOT NULL,
            position INTEGER,
            resolvable INTEGER NOT NULL DEFAULT 0,
            resolved INTEGER NOT NULL DEFAULT 0,
            resolved_by TEXT,
            resolved_at INTEGER,
            position_old_path TEXT,
            position_new_path TEXT,
            position_old_line INTEGER,
            position_new_line INTEGER,
            raw_payload_id INTEGER
        );
        ").unwrap();
        conn
    }

    /// Insert a discussion row attached to project 1, linked to either an
    /// issue or a merge request (one of `issue_id` / `mr_id` is Some).
    // FIX: `issue_id`/`mr_id` had their `<i64>` type parameters stripped by the
    // extraction; restored to `Option<i64>` so the helper compiles.
    fn insert_discussion(
        conn: &Connection,
        id: i64,
        noteable_type: &str,
        issue_id: Option<i64>,
        mr_id: Option<i64>,
    ) {
        conn.execute(
            "INSERT INTO discussions (id, gitlab_discussion_id, project_id, issue_id, merge_request_id, noteable_type, last_seen_at) VALUES (?1, ?2, 1, ?3, ?4, ?5, 3000)",
            rusqlite::params![id, format!("disc_{}", id), issue_id, mr_id, noteable_type],
        ).unwrap();
    }

    /// Insert a note; `created_at` is reused for `updated_at` and `last_seen_at`.
    fn insert_note(conn: &Connection, id: i64, gitlab_id: i64, discussion_id: i64, author: Option<&str>, body: Option<&str>, created_at: i64, is_system: bool, old_path: Option<&str>, new_path: Option<&str>) {
        conn.execute(
            "INSERT INTO notes (id, gitlab_id, discussion_id, project_id, author_username, body, created_at, updated_at, last_seen_at, is_system, position_old_path, position_new_path) VALUES (?1, ?2, ?3, 1, ?4, ?5, ?6, ?6, ?6, ?7, ?8, ?9)",
            rusqlite::params![id, gitlab_id, discussion_id, author, body, created_at, is_system as i32, old_path, new_path],
        ).unwrap();
    }

    #[test]
    fn test_discussion_document_format() {
        let conn = setup_discussion_test_db();
        insert_issue(&conn, 1, 234, Some("Authentication redesign"), Some("desc"), "opened", Some("johndoe"), Some("https://gitlab.example.com/group/project-one/-/issues/234"));
        insert_label(&conn, 1, "auth");
        insert_label(&conn, 2, "bug");
        link_issue_label(&conn, 1, 1);
        link_issue_label(&conn, 1, 2);
        insert_discussion(&conn, 1, "Issue", Some(1), None);
        // 1710460800000 = 2024-03-15T00:00:00Z
        insert_note(&conn, 1, 12345, 1, Some("johndoe"), Some("I think we should move to JWT-based auth..."), 1710460800000, false, None, None);
        insert_note(&conn, 2, 12346, 1, Some("janedoe"), Some("Agreed. What about refresh token strategy?"), 1710460800000, false, None, None);

        let doc = extract_discussion_document(&conn, 1).unwrap().unwrap();
        assert_eq!(doc.source_type, SourceType::Discussion);
        assert!(doc.content_text.starts_with("[[Discussion]] Issue #234: Authentication redesign\n"));
        assert!(doc.content_text.contains("Project: group/project-one\n"));
        assert!(doc.content_text.contains("URL: https://gitlab.example.com/group/project-one/-/issues/234#note_12345\n"));
        assert!(doc.content_text.contains("Labels: [\"auth\",\"bug\"]\n"));
        assert!(doc.content_text.contains("--- Thread ---"));
        assert!(doc.content_text.contains("@johndoe (2024-03-15):\nI think we should move to JWT-based auth..."));
        assert!(doc.content_text.contains("@janedoe (2024-03-15):\nAgreed. What about refresh token strategy?"));
        assert_eq!(doc.author_username, Some("johndoe".to_string()));
        assert!(doc.title.is_none()); // Discussions don't have their own title
    }

    #[test]
    fn test_discussion_not_found() {
        let conn = setup_discussion_test_db();
        let result = extract_discussion_document(&conn, 999).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_discussion_parent_deleted() {
        let conn = setup_discussion_test_db();
        // Insert issue, create discussion, then delete the issue
        insert_issue(&conn, 99, 10, Some("To be deleted"), None, "opened", None, None);
        insert_discussion(&conn, 1, "Issue", Some(99), None);
        insert_note(&conn, 1, 100, 1, Some("alice"), Some("Hello"), 1000, false, None, None);
        // Delete the parent issue — FK cascade won't delete discussion in test since
        // we used REFERENCES without ON DELETE CASCADE in test schema, so just delete from issues
        conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
        conn.execute("DELETE FROM issues WHERE id = 99", []).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();

        let result = extract_discussion_document(&conn, 1).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_discussion_system_notes_excluded() {
        let conn = setup_discussion_test_db();
        insert_issue(&conn, 1, 10, Some("Test"), Some("desc"), "opened", Some("alice"), None);
        insert_discussion(&conn, 1, "Issue", Some(1), None);
        insert_note(&conn, 1, 100, 1, Some("alice"), Some("Real comment"), 1000, false, None, None);
        insert_note(&conn, 2, 101, 1, Some("bot"), Some("assigned to @alice"), 2000, true, None, None);
        insert_note(&conn, 3, 102, 1, Some("bob"), Some("Follow-up"), 3000, false, None, None);

        let doc = extract_discussion_document(&conn, 1).unwrap().unwrap();
        assert!(doc.content_text.contains("@alice"));
        assert!(doc.content_text.contains("@bob"));
        assert!(!doc.content_text.contains("assigned to"));
    }

    #[test]
    fn test_discussion_diffnote_paths() {
        let conn = setup_discussion_test_db();
        insert_issue(&conn, 1, 10, Some("Test"), Some("desc"), "opened", None, None);
        insert_discussion(&conn, 1, "Issue", Some(1), None);
        insert_note(&conn, 1, 100, 1, Some("alice"), Some("Comment on code"), 1000, false, Some("src/old.rs"), Some("src/new.rs"));
        insert_note(&conn, 2, 101, 1, Some("bob"), Some("Reply"), 2000, false, Some("src/old.rs"), Some("src/new.rs"));

        let doc = extract_discussion_document(&conn, 1).unwrap().unwrap();
        // Paths should be deduplicated and sorted
        assert_eq!(doc.paths, vec!["src/new.rs", "src/old.rs"]);
        assert!(doc.content_text.contains("Files: [\"src/new.rs\",\"src/old.rs\"]"));
    }

    #[test]
    fn test_discussion_url_construction() {
        let conn = setup_discussion_test_db();
        insert_issue(&conn, 1, 10, Some("Test"), Some("desc"), "opened", None, Some("https://gitlab.example.com/group/project-one/-/issues/10"));
        insert_discussion(&conn, 1, "Issue", Some(1), None);
        insert_note(&conn, 1, 54321, 1, Some("alice"), Some("Hello"), 1000, false, None, None);

        let doc = extract_discussion_document(&conn, 1).unwrap().unwrap();
        assert_eq!(doc.url, Some("https://gitlab.example.com/group/project-one/-/issues/10#note_54321".to_string()));
    }

    #[test]
    fn test_discussion_uses_parent_labels() {
        let conn = setup_discussion_test_db();
        insert_issue(&conn, 1, 10, Some("Test"), Some("desc"), "opened", None, None);
        insert_label(&conn, 1, "backend");
        insert_label(&conn, 2, "api");
        link_issue_label(&conn, 1, 1);
        link_issue_label(&conn, 1, 2);
        insert_discussion(&conn, 1, "Issue", Some(1), None);
        insert_note(&conn, 1, 100, 1, Some("alice"), Some("Comment"), 1000, false, None, None);

        let doc = extract_discussion_document(&conn, 1).unwrap().unwrap();
        assert_eq!(doc.labels, vec!["api", "backend"]);
    }

    #[test]
    fn test_discussion_on_mr() {
        let conn = setup_discussion_test_db();
        insert_mr(&conn, 1, 456, Some("JWT Auth"), Some("desc"), Some("opened"), Some("johndoe"), Some("feature/jwt"), Some("main"), Some("https://gitlab.example.com/group/project-one/-/merge_requests/456"));
        insert_discussion(&conn, 1, "MergeRequest", None, Some(1));
        insert_note(&conn, 1, 100, 1, Some("alice"), Some("LGTM"), 1000, false, None, None);

        let doc = extract_discussion_document(&conn, 1).unwrap().unwrap();
        assert!(doc.content_text.contains("[[Discussion]] MR !456: JWT Auth\n"));
    }

    #[test]
    fn test_discussion_all_system_notes() {
        let conn = setup_discussion_test_db();
        insert_issue(&conn, 1, 10, Some("Test"), Some("desc"), "opened", None, None);
        insert_discussion(&conn, 1, "Issue", Some(1), None);
        insert_note(&conn, 1, 100, 1, Some("bot"), Some("assigned to @alice"), 1000, true, None, None);

        // All notes are system notes -> no content -> returns None
        let result = extract_discussion_document(&conn, 1).unwrap();
        assert!(result.is_none());
    }
}
diff --git a/src/documents/mod.rs b/src/documents/mod.rs
new file mode 100644
index 0000000..453854a
--- /dev/null
+++ b/src/documents/mod.rs
@@ -0,0 +1,17 @@
//! Document generation and management.
//!
//! Extracts searchable documents from issues, MRs, and discussions.
+ +mod extractor; +mod regenerator; +mod truncation; + +pub use extractor::{ + compute_content_hash, compute_list_hash, extract_discussion_document, + extract_issue_document, extract_mr_document, DocumentData, SourceType, +}; +pub use regenerator::{regenerate_dirty_documents, RegenerateResult}; +pub use truncation::{ + truncate_discussion, truncate_hard_cap, truncate_utf8, NoteContent, TruncationReason, + TruncationResult, MAX_DISCUSSION_BYTES, MAX_DOCUMENT_BYTES_HARD, +}; diff --git a/src/documents/regenerator.rs b/src/documents/regenerator.rs new file mode 100644 index 0000000..92f206c --- /dev/null +++ b/src/documents/regenerator.rs @@ -0,0 +1,475 @@ +use rusqlite::Connection; +use rusqlite::OptionalExtension; +use tracing::{debug, warn}; + +use crate::core::error::Result; +use crate::documents::{ + extract_discussion_document, extract_issue_document, extract_mr_document, DocumentData, + SourceType, +}; +use crate::ingestion::dirty_tracker::{clear_dirty, get_dirty_sources, record_dirty_error}; + +/// Result of a document regeneration run. +#[derive(Debug, Default)] +pub struct RegenerateResult { + pub regenerated: usize, + pub unchanged: usize, + pub errored: usize, +} + +/// Drain the dirty_sources queue, regenerating documents for each entry. +/// +/// Uses per-item error handling (fail-soft) and drains the queue completely +/// via a bounded batch loop. Each dirty item is processed independently. 
+pub fn regenerate_dirty_documents(conn: &Connection) -> Result { + let mut result = RegenerateResult::default(); + + loop { + let dirty = get_dirty_sources(conn)?; + if dirty.is_empty() { + break; + } + + for (source_type, source_id) in &dirty { + match regenerate_one(conn, *source_type, *source_id) { + Ok(changed) => { + if changed { + result.regenerated += 1; + } else { + result.unchanged += 1; + } + clear_dirty(conn, *source_type, *source_id)?; + } + Err(e) => { + warn!( + source_type = %source_type, + source_id, + error = %e, + "Failed to regenerate document" + ); + record_dirty_error(conn, *source_type, *source_id, &e.to_string())?; + result.errored += 1; + } + } + } + } + + debug!( + regenerated = result.regenerated, + unchanged = result.unchanged, + errored = result.errored, + "Document regeneration complete" + ); + + Ok(result) +} + +/// Regenerate a single document. Returns true if content_hash changed. +fn regenerate_one( + conn: &Connection, + source_type: SourceType, + source_id: i64, +) -> Result { + let doc = match source_type { + SourceType::Issue => extract_issue_document(conn, source_id)?, + SourceType::MergeRequest => extract_mr_document(conn, source_id)?, + SourceType::Discussion => extract_discussion_document(conn, source_id)?, + }; + + let Some(doc) = doc else { + // Source was deleted — remove the document (cascade handles FTS/embeddings) + delete_document(conn, source_type, source_id)?; + return Ok(true); + }; + + let existing_hash = get_existing_hash(conn, source_type, source_id)?; + let changed = existing_hash.as_ref() != Some(&doc.content_hash); + + // Always upsert: labels/paths can change independently of content_hash + upsert_document(conn, &doc)?; + + Ok(changed) +} + +/// Get existing content hash for a document, if it exists. 
+fn get_existing_hash( + conn: &Connection, + source_type: SourceType, + source_id: i64, +) -> Result> { + let mut stmt = + conn.prepare("SELECT content_hash FROM documents WHERE source_type = ?1 AND source_id = ?2")?; + + let hash: Option = stmt + .query_row(rusqlite::params![source_type.as_str(), source_id], |row| { + row.get(0) + }) + .optional()?; + + Ok(hash) +} + +/// Upsert a document with triple-hash write optimization. +/// +/// Wrapped in a SAVEPOINT to ensure atomicity of the multi-statement write +/// (document row + labels + paths). Without this, a crash between statements +/// could leave the document with a stale labels_hash but missing label rows. +fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> { + conn.execute_batch("SAVEPOINT upsert_doc")?; + match upsert_document_inner(conn, doc) { + Ok(()) => { + conn.execute_batch("RELEASE upsert_doc")?; + Ok(()) + } + Err(e) => { + let _ = conn.execute_batch("ROLLBACK TO upsert_doc"); + Err(e) + } + } +} + +fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<()> { + // Check existing hashes before writing + let existing: Option<(i64, String, String, String)> = conn + .query_row( + "SELECT id, content_hash, labels_hash, paths_hash FROM documents + WHERE source_type = ?1 AND source_id = ?2", + rusqlite::params![doc.source_type.as_str(), doc.source_id], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), + ) + .optional()?; + + // Fast path: skip ALL writes when nothing changed (prevents WAL churn) + if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing { + if old_content_hash == &doc.content_hash + && old_labels_hash == &doc.labels_hash + && old_paths_hash == &doc.paths_hash + { + return Ok(()); + } + } + + let labels_json = + serde_json::to_string(&doc.labels).unwrap_or_else(|_| "[]".to_string()); + + // Upsert document row + conn.execute( + "INSERT INTO documents + (source_type, source_id, project_id, 
author_username, label_names, + labels_hash, paths_hash, + created_at, updated_at, url, title, content_text, content_hash, + is_truncated, truncated_reason) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) + ON CONFLICT(source_type, source_id) DO UPDATE SET + author_username = excluded.author_username, + label_names = excluded.label_names, + labels_hash = excluded.labels_hash, + paths_hash = excluded.paths_hash, + updated_at = excluded.updated_at, + url = excluded.url, + title = excluded.title, + content_text = excluded.content_text, + content_hash = excluded.content_hash, + is_truncated = excluded.is_truncated, + truncated_reason = excluded.truncated_reason", + rusqlite::params![ + doc.source_type.as_str(), + doc.source_id, + doc.project_id, + doc.author_username, + labels_json, + doc.labels_hash, + doc.paths_hash, + doc.created_at, + doc.updated_at, + doc.url, + doc.title, + doc.content_text, + doc.content_hash, + doc.is_truncated as i32, + doc.truncated_reason, + ], + )?; + + // Get document ID + let doc_id = match existing { + Some((id, _, _, _)) => id, + None => get_document_id(conn, doc.source_type, doc.source_id)?, + }; + + // Only update labels if hash changed + let labels_changed = match &existing { + Some((_, _, old_hash, _)) => old_hash != &doc.labels_hash, + None => true, + }; + if labels_changed { + conn.execute( + "DELETE FROM document_labels WHERE document_id = ?1", + [doc_id], + )?; + for label in &doc.labels { + conn.execute( + "INSERT INTO document_labels (document_id, label_name) VALUES (?1, ?2)", + rusqlite::params![doc_id, label], + )?; + } + } + + // Only update paths if hash changed + let paths_changed = match &existing { + Some((_, _, _, old_hash)) => old_hash != &doc.paths_hash, + None => true, + }; + if paths_changed { + conn.execute( + "DELETE FROM document_paths WHERE document_id = ?1", + [doc_id], + )?; + for path in &doc.paths { + conn.execute( + "INSERT INTO document_paths (document_id, path) VALUES (?1, 
?2)", + rusqlite::params![doc_id, path], + )?; + } + } + + Ok(()) +} + +/// Delete a document by source identity. +fn delete_document( + conn: &Connection, + source_type: SourceType, + source_id: i64, +) -> Result<()> { + conn.execute( + "DELETE FROM documents WHERE source_type = ?1 AND source_id = ?2", + rusqlite::params![source_type.as_str(), source_id], + )?; + Ok(()) +} + +/// Get document ID by source type and source ID. +fn get_document_id( + conn: &Connection, + source_type: SourceType, + source_id: i64, +) -> Result { + let id: i64 = conn.query_row( + "SELECT id FROM documents WHERE source_type = ?1 AND source_id = ?2", + rusqlite::params![source_type.as_str(), source_id], + |row| row.get(0), + )?; + Ok(id) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ingestion::dirty_tracker::mark_dirty; + + fn setup_db() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch(" + CREATE TABLE projects ( + id INTEGER PRIMARY KEY, + gitlab_project_id INTEGER UNIQUE NOT NULL, + path_with_namespace TEXT NOT NULL, + default_branch TEXT, + web_url TEXT, + created_at INTEGER, + updated_at INTEGER, + raw_payload_id INTEGER + ); + INSERT INTO projects (id, gitlab_project_id, path_with_namespace) VALUES (1, 100, 'group/project'); + + CREATE TABLE issues ( + id INTEGER PRIMARY KEY, + gitlab_id INTEGER UNIQUE NOT NULL, + project_id INTEGER NOT NULL REFERENCES projects(id), + iid INTEGER NOT NULL, + title TEXT, + description TEXT, + state TEXT NOT NULL, + author_username TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + last_seen_at INTEGER NOT NULL, + discussions_synced_for_updated_at INTEGER, + web_url TEXT, + raw_payload_id INTEGER + ); + CREATE TABLE labels ( + id INTEGER PRIMARY KEY, + gitlab_id INTEGER, + project_id INTEGER NOT NULL REFERENCES projects(id), + name TEXT NOT NULL, + color TEXT, + description TEXT + ); + CREATE TABLE issue_labels ( + issue_id INTEGER NOT NULL REFERENCES issues(id), + label_id 
INTEGER NOT NULL REFERENCES labels(id),
                PRIMARY KEY(issue_id, label_id)
            );

            CREATE TABLE documents (
                id INTEGER PRIMARY KEY,
                source_type TEXT NOT NULL,
                source_id INTEGER NOT NULL,
                project_id INTEGER NOT NULL,
                author_username TEXT,
                label_names TEXT,
                created_at INTEGER,
                updated_at INTEGER,
                url TEXT,
                title TEXT,
                content_text TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                labels_hash TEXT NOT NULL DEFAULT '',
                paths_hash TEXT NOT NULL DEFAULT '',
                is_truncated INTEGER NOT NULL DEFAULT 0,
                truncated_reason TEXT,
                UNIQUE(source_type, source_id)
            );
            CREATE TABLE document_labels (
                document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
                label_name TEXT NOT NULL,
                PRIMARY KEY(document_id, label_name)
            );
            CREATE TABLE document_paths (
                document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
                path TEXT NOT NULL,
                PRIMARY KEY(document_id, path)
            );
            CREATE TABLE dirty_sources (
                source_type TEXT NOT NULL,
                source_id INTEGER NOT NULL,
                queued_at INTEGER NOT NULL,
                attempt_count INTEGER NOT NULL DEFAULT 0,
                last_attempt_at INTEGER,
                last_error TEXT,
                next_attempt_at INTEGER,
                PRIMARY KEY(source_type, source_id)
            );
            CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at);
        ").unwrap();
        conn
    }

    #[test]
    fn test_regenerate_creates_document() {
        let conn = setup_db();
        conn.execute(
            "INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test Issue', 'Description here', 'opened', 'alice', 1000, 2000, 3000)",
            [],
        ).unwrap();
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();

        let outcome = regenerate_dirty_documents(&conn).unwrap();
        assert_eq!(outcome.regenerated, 1);
        assert_eq!(outcome.unchanged, 0);
        assert_eq!(outcome.errored, 0);

        // Verify document was created
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1);

        let content: String = conn
            .query_row("SELECT content_text FROM documents", [], |r| r.get(0))
            .unwrap();
        assert!(content.contains("[[Issue]] #42: Test Issue"));
    }

    #[test]
    fn test_regenerate_unchanged() {
        let conn = setup_db();
        conn.execute(
            "INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'Desc', 'opened', 'alice', 1000, 2000, 3000)",
            [],
        ).unwrap();

        // First regeneration creates the document
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();
        let first_run = regenerate_dirty_documents(&conn).unwrap();
        assert_eq!(first_run.regenerated, 1);

        // Second regeneration — same data, should be unchanged
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();
        let second_run = regenerate_dirty_documents(&conn).unwrap();
        assert_eq!(second_run.unchanged, 1);
        assert_eq!(second_run.regenerated, 0);
    }

    #[test]
    fn test_regenerate_deleted_source() {
        let conn = setup_db();
        conn.execute(
            "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)",
            [],
        ).unwrap();
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();
        regenerate_dirty_documents(&conn).unwrap();

        // Delete the issue and re-mark dirty
        conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
        conn.execute("DELETE FROM issues WHERE id = 1", []).unwrap();
        conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();

        let outcome = regenerate_dirty_documents(&conn).unwrap();
        assert_eq!(outcome.regenerated, 1); // Deletion counts as "changed"

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 0);
    }

    #[test]
    fn test_regenerate_drains_queue() {
        let conn = setup_db();
        for i in 1..=10 {
            conn.execute(
                "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (?1, ?2, 1, ?1, 'Test', 'opened', 1000, 2000, 3000)",
                rusqlite::params![i, i * 10],
            ).unwrap();
            mark_dirty(&conn, SourceType::Issue, i).unwrap();
        }

        let outcome = regenerate_dirty_documents(&conn).unwrap();
        assert_eq!(outcome.regenerated, 10);

        // Queue should be empty
        let dirty = get_dirty_sources(&conn).unwrap();
        assert!(dirty.is_empty());
    }

    #[test]
    fn test_triple_hash_fast_path() {
        let conn = setup_db();
        conn.execute(
            "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)",
            [],
        ).unwrap();
        conn.execute(
            "INSERT INTO labels (id, project_id, name) VALUES (1, 1, 'bug')",
            [],
        ).unwrap();
        conn.execute(
            "INSERT INTO issue_labels (issue_id, label_id) VALUES (1, 1)",
            [],
        ).unwrap();

        // First run creates document
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();
        regenerate_dirty_documents(&conn).unwrap();

        // Second run — triple hash match, should skip ALL writes
        mark_dirty(&conn, SourceType::Issue, 1).unwrap();
        let outcome = regenerate_dirty_documents(&conn).unwrap();
        assert_eq!(outcome.unchanged, 1);

        // Labels should still be present (not deleted and re-inserted)
        let label_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM document_labels", [], |r| r.get(0))
            .unwrap();
        assert_eq!(label_count, 1);
    }
}
diff --git a/src/documents/truncation.rs b/src/documents/truncation.rs
new file mode 100644
index 0000000..003f607
--- /dev/null
+++ b/src/documents/truncation.rs
@@ -0,0 +1,329 @@
/// Maximum byte limit for discussion documents (suitable for embedding chunking).
/// Note: uses `.len()` (byte count), not char count — consistent with `CHUNK_MAX_BYTES`.
pub const MAX_DISCUSSION_BYTES: usize = 32_000;

/// Hard safety cap (bytes) for any document type (pathological content: pasted logs, base64).
pub const MAX_DOCUMENT_BYTES_HARD: usize = 2_000_000;

/// A single note's content for truncation processing.
pub struct NoteContent {
    pub author: String,
    pub date: String,
    pub body: String,
}

/// Result of truncation processing.
pub struct TruncationResult {
    pub content: String,
    pub is_truncated: bool,
    // FIX: the `<TruncationReason>` parameter had been stripped (bare `Option`);
    // restored so the struct compiles and matches all constructors below.
    pub reason: Option<TruncationReason>,
}

/// Why a document was truncated (matches DB CHECK constraint values).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncationReason {
    TokenLimitMiddleDrop,
    SingleNoteOversized,
    FirstLastOversized,
    HardCapOversized,
}

impl TruncationReason {
    /// Returns the DB-compatible string matching the CHECK constraint.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::TokenLimitMiddleDrop => "token_limit_middle_drop",
            Self::SingleNoteOversized => "single_note_oversized",
            Self::FirstLastOversized => "first_last_oversized",
            Self::HardCapOversized => "hard_cap_oversized",
        }
    }
}

/// Format a single note as `@author (date):\nbody\n\n`.
fn format_note(note: &NoteContent) -> String {
    format!("@{} ({}):\n{}\n\n", note.author, note.date, note.body)
}

/// Truncate a string at a UTF-8-safe byte boundary.
/// Returns a slice no longer than `max_bytes` bytes, walking backward
/// to find the nearest char boundary if needed.
pub fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Walk backward from max_bytes to find a char boundary
    let mut end = max_bytes;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}

/// Truncate discussion notes to fit within `max_bytes`.
///
/// Algorithm:
/// 1. Format all notes
/// 2. If total fits, return as-is
/// 3. Single note: truncate at UTF-8 boundary, append [truncated]
/// 4. Try to keep first N notes + last note + marker within limit
/// 5. If first + last > limit: keep only first (truncated)
pub fn truncate_discussion(notes: &[NoteContent], max_bytes: usize) -> TruncationResult {
    if notes.is_empty() {
        return TruncationResult {
            content: String::new(),
            is_truncated: false,
            reason: None,
        };
    }

    // FIX: `formatted` had its `<String>` type parameter stripped (bare `Vec`);
    // restored.
    let formatted: Vec<String> = notes.iter().map(format_note).collect();
    let total: String = formatted.concat();

    // Case 1: fits within limit
    if total.len() <= max_bytes {
        return TruncationResult {
            content: total,
            is_truncated: false,
            reason: None,
        };
    }

    // Case 2: single note — truncate it
    if notes.len() == 1 {
        let truncated = truncate_utf8(&total, max_bytes.saturating_sub(11)); // room for "[truncated]" (11 bytes)
        let content = format!("{}[truncated]", truncated);
        return TruncationResult {
            content,
            is_truncated: true,
            reason: Some(TruncationReason::SingleNoteOversized),
        };
    }

    // Case 3: multiple notes — try first N + marker + last
    let last_note = &formatted[formatted.len() - 1];

    // Linear scan (prefix lengths only grow, so the first overflow ends the
    // search) for the max N where first N notes + marker + last note fit.
    // FIX: the previous comment called this a "binary search"; it is a linear scan.
    let mut best_n = 0;
    for n in 1..formatted.len() - 1 {
        let first_n: usize = formatted[..n].iter().map(|s| s.len()).sum();
        let omitted = formatted.len() - n - 1;
        let marker = format!("\n\n[... {} notes omitted for length ...]\n\n", omitted);
        let candidate_len = first_n + marker.len() + last_note.len();
        if candidate_len <= max_bytes {
            best_n = n;
        } else {
            break;
        }
    }

    if best_n > 0 {
        // We can keep first best_n notes + marker + last note
        let first_part: String = formatted[..best_n].concat();
        let omitted = formatted.len() - best_n - 1;
        let marker = format!("\n\n[... {} notes omitted for length ...]\n\n", omitted);
        let content = format!("{}{}{}", first_part, marker, last_note);
        return TruncationResult {
            content,
            is_truncated: true,
            reason: Some(TruncationReason::TokenLimitMiddleDrop),
        };
    }

    // Case 4: even first + last don't fit — keep only first (truncated)
    let first_note = &formatted[0];
    if first_note.len() + last_note.len() > max_bytes {
        let truncated = truncate_utf8(first_note, max_bytes.saturating_sub(11));
        let content = format!("{}[truncated]", truncated);
        return TruncationResult {
            content,
            is_truncated: true,
            reason: Some(TruncationReason::FirstLastOversized),
        };
    }

    // Fallback: first + marker + last (0 middle notes kept).
    // NOTE(review): first + marker + last can exceed max_bytes by the marker's
    // length here; behavior kept as-is — confirm whether the budget is strict.
    let omitted = formatted.len() - 2;
    let marker = format!("\n\n[... {} notes omitted for length ...]\n\n", omitted);
    let content = format!("{}{}{}", formatted[0], marker, last_note);
    TruncationResult {
        content,
        is_truncated: true,
        reason: Some(TruncationReason::TokenLimitMiddleDrop),
    }
}

// `truncate_hard_cap` (defined below) applies the hard 2 MB safety cap to any
// document type, truncating at a UTF-8-safe boundary.
+pub fn truncate_hard_cap(content: &str) -> TruncationResult { + if content.len() <= MAX_DOCUMENT_BYTES_HARD { + return TruncationResult { + content: content.to_string(), + is_truncated: false, + reason: None, + }; + } + + let truncated = truncate_utf8(content, MAX_DOCUMENT_BYTES_HARD.saturating_sub(11)); + TruncationResult { + content: format!("{}[truncated]", truncated), + is_truncated: true, + reason: Some(TruncationReason::HardCapOversized), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_note(author: &str, body: &str) -> NoteContent { + NoteContent { + author: author.to_string(), + date: "2024-01-01".to_string(), + body: body.to_string(), + } + } + + #[test] + fn test_no_truncation_under_limit() { + let notes = vec![ + make_note("alice", "Short note 1"), + make_note("bob", "Short note 2"), + make_note("carol", "Short note 3"), + ]; + let result = truncate_discussion(¬es, MAX_DISCUSSION_BYTES); + assert!(!result.is_truncated); + assert!(result.reason.is_none()); + assert!(result.content.contains("@alice")); + assert!(result.content.contains("@bob")); + assert!(result.content.contains("@carol")); + } + + #[test] + fn test_middle_notes_dropped() { + // Create 10 notes where total exceeds limit + let big_body = "x".repeat(4000); + let notes: Vec = (0..10) + .map(|i| make_note(&format!("user{}", i), &big_body)) + .collect(); + let result = truncate_discussion(¬es, 10_000); + assert!(result.is_truncated); + assert_eq!(result.reason, Some(TruncationReason::TokenLimitMiddleDrop)); + // First note preserved + assert!(result.content.contains("@user0")); + // Last note preserved + assert!(result.content.contains("@user9")); + // Marker present + assert!(result.content.contains("notes omitted for length")); + } + + #[test] + fn test_single_note_oversized() { + let big_body = "x".repeat(50_000); + let notes = vec![make_note("alice", &big_body)]; + let result = truncate_discussion(¬es, MAX_DISCUSSION_BYTES); + assert!(result.is_truncated); + 
assert_eq!(result.reason, Some(TruncationReason::SingleNoteOversized)); + assert!(result.content.ends_with("[truncated]")); + assert!(result.content.len() <= MAX_DISCUSSION_BYTES + 20); + } + + #[test] + fn test_first_last_oversized() { + let big_body = "x".repeat(20_000); + let notes = vec![ + make_note("alice", &big_body), + make_note("bob", &big_body), + ]; + let result = truncate_discussion(¬es, 10_000); + assert!(result.is_truncated); + assert_eq!(result.reason, Some(TruncationReason::FirstLastOversized)); + assert!(result.content.contains("@alice")); + assert!(result.content.ends_with("[truncated]")); + } + + #[test] + fn test_one_note_under_limit() { + let notes = vec![make_note("alice", "Short note")]; + let result = truncate_discussion(¬es, MAX_DISCUSSION_BYTES); + assert!(!result.is_truncated); + assert!(result.content.contains("@alice")); + } + + #[test] + fn test_empty_notes() { + let result = truncate_discussion(&[], MAX_DISCUSSION_BYTES); + assert!(!result.is_truncated); + assert!(result.content.is_empty()); + } + + #[test] + fn test_utf8_boundary_safety() { + // Emoji are 4 bytes each + let emoji_content = "🎉".repeat(10); + let truncated = truncate_utf8(&emoji_content, 10); + // 10 bytes should hold 2 emoji (8 bytes) with 2 bytes left over (not enough for another) + assert_eq!(truncated.len(), 8); + assert_eq!(truncated, "🎉🎉"); + } + + #[test] + fn test_utf8_boundary_cjk() { + // CJK characters are 3 bytes each + let cjk = "中文字符测试"; + let truncated = truncate_utf8(cjk, 7); + // 7 bytes: 2 full chars (6 bytes), 1 byte left (not enough for another) + assert_eq!(truncated, "中文"); + assert_eq!(truncated.len(), 6); + } + + #[test] + fn test_hard_cap() { + let big_content = "x".repeat(3_000_000); + let result = truncate_hard_cap(&big_content); + assert!(result.is_truncated); + assert_eq!(result.reason, Some(TruncationReason::HardCapOversized)); + assert!(result.content.len() <= MAX_DOCUMENT_BYTES_HARD + 20); + 
assert!(result.content.ends_with("[truncated]")); + } + + #[test] + fn test_hard_cap_under_limit() { + let content = "Short content"; + let result = truncate_hard_cap(content); + assert!(!result.is_truncated); + assert_eq!(result.content, content); + } + + #[test] + fn test_marker_count_correct() { + // 7 notes, keep first 1 + last 1, drop middle 5 + let big_body = "x".repeat(5000); + let notes: Vec = (0..7) + .map(|i| make_note(&format!("user{}", i), &big_body)) + .collect(); + let result = truncate_discussion(¬es, 12_000); + assert!(result.is_truncated); + assert!(result.content.contains("[... 5 notes omitted for length ...]")); + } + + #[test] + fn test_truncation_reason_as_str() { + assert_eq!( + TruncationReason::TokenLimitMiddleDrop.as_str(), + "token_limit_middle_drop" + ); + assert_eq!( + TruncationReason::SingleNoteOversized.as_str(), + "single_note_oversized" + ); + assert_eq!( + TruncationReason::FirstLastOversized.as_str(), + "first_last_oversized" + ); + assert_eq!( + TruncationReason::HardCapOversized.as_str(), + "hard_cap_oversized" + ); + } +} diff --git a/src/ingestion/dirty_tracker.rs b/src/ingestion/dirty_tracker.rs new file mode 100644 index 0000000..f3e5ca2 --- /dev/null +++ b/src/ingestion/dirty_tracker.rs @@ -0,0 +1,258 @@ +use rusqlite::Connection; + +use crate::core::backoff::compute_next_attempt_at; +use crate::core::error::Result; +use crate::core::time::now_ms; +use crate::documents::SourceType; + +const DIRTY_SOURCES_BATCH_SIZE: usize = 500; + +/// Mark a source entity as dirty INSIDE an existing transaction. +/// ON CONFLICT resets ALL backoff/error state so fresh updates are immediately eligible. 
+pub fn mark_dirty_tx( + tx: &rusqlite::Transaction<'_>, + source_type: SourceType, + source_id: i64, +) -> Result<()> { + tx.execute( + "INSERT INTO dirty_sources (source_type, source_id, queued_at) + VALUES (?1, ?2, ?3) + ON CONFLICT(source_type, source_id) DO UPDATE SET + queued_at = excluded.queued_at, + attempt_count = 0, + last_attempt_at = NULL, + last_error = NULL, + next_attempt_at = NULL", + rusqlite::params![source_type.as_str(), source_id, now_ms()], + )?; + Ok(()) +} + +/// Convenience wrapper for non-transactional contexts. +pub fn mark_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> { + conn.execute( + "INSERT INTO dirty_sources (source_type, source_id, queued_at) + VALUES (?1, ?2, ?3) + ON CONFLICT(source_type, source_id) DO UPDATE SET + queued_at = excluded.queued_at, + attempt_count = 0, + last_attempt_at = NULL, + last_error = NULL, + next_attempt_at = NULL", + rusqlite::params![source_type.as_str(), source_id, now_ms()], + )?; + Ok(()) +} + +/// Get dirty sources ready for processing. +/// Returns entries where next_attempt_at is NULL or <= now. +/// Orders by attempt_count ASC (fresh before failed), then queued_at ASC. +pub fn get_dirty_sources(conn: &Connection) -> Result> { + let now = now_ms(); + let mut stmt = conn.prepare( + "SELECT source_type, source_id FROM dirty_sources + WHERE next_attempt_at IS NULL OR next_attempt_at <= ?1 + ORDER BY attempt_count ASC, queued_at ASC + LIMIT ?2" + )?; + let rows = stmt + .query_map(rusqlite::params![now, DIRTY_SOURCES_BATCH_SIZE as i64], |row| { + let st_str: String = row.get(0)?; + let source_id: i64 = row.get(1)?; + Ok((st_str, source_id)) + })? 
+ .collect::, _>>()?; + + let mut results = Vec::with_capacity(rows.len()); + for (st_str, source_id) in rows { + let source_type = SourceType::parse(&st_str).ok_or_else(|| { + crate::core::error::LoreError::Other(format!( + "Invalid source_type in dirty_sources: {}", + st_str + )) + })?; + results.push((source_type, source_id)); + } + Ok(results) +} + +/// Clear dirty entry after successful processing. +pub fn clear_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> { + conn.execute( + "DELETE FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2", + rusqlite::params![source_type.as_str(), source_id], + )?; + Ok(()) +} + +/// Record an error for a dirty source, incrementing attempt_count and setting backoff. +pub fn record_dirty_error( + conn: &Connection, + source_type: SourceType, + source_id: i64, + error: &str, +) -> Result<()> { + let now = now_ms(); + // Get current attempt_count first + let attempt_count: i64 = conn.query_row( + "SELECT attempt_count FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2", + rusqlite::params![source_type.as_str(), source_id], + |row| row.get(0), + )?; + + let new_attempt = attempt_count + 1; + let next_at = compute_next_attempt_at(now, new_attempt); + + conn.execute( + "UPDATE dirty_sources SET + attempt_count = ?1, + last_attempt_at = ?2, + last_error = ?3, + next_attempt_at = ?4 + WHERE source_type = ?5 AND source_id = ?6", + rusqlite::params![new_attempt, now, error, next_at, source_type.as_str(), source_id], + )?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn setup_db() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch(" + CREATE TABLE dirty_sources ( + source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')), + source_id INTEGER NOT NULL, + queued_at INTEGER NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + last_attempt_at INTEGER, + last_error TEXT, + next_attempt_at INTEGER, + 
PRIMARY KEY(source_type, source_id) + ); + CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at); + ").unwrap(); + conn + } + + #[test] + fn test_mark_dirty_inserts() { + let conn = setup_db(); + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + + let count: i64 = conn.query_row("SELECT COUNT(*) FROM dirty_sources", [], |r| r.get(0)).unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn test_mark_dirty_tx_inserts() { + let mut conn = setup_db(); + { + let tx = conn.transaction().unwrap(); + mark_dirty_tx(&tx, SourceType::Issue, 1).unwrap(); + tx.commit().unwrap(); + } + let count: i64 = conn.query_row("SELECT COUNT(*) FROM dirty_sources", [], |r| r.get(0)).unwrap(); + assert_eq!(count, 1); + } + + #[test] + fn test_requeue_resets_backoff() { + let conn = setup_db(); + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + // Simulate error state + record_dirty_error(&conn, SourceType::Issue, 1, "test error").unwrap(); + + let attempt: i64 = conn.query_row( + "SELECT attempt_count FROM dirty_sources WHERE source_id = 1", [], |r| r.get(0) + ).unwrap(); + assert_eq!(attempt, 1); + + // Re-mark should reset + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + let attempt: i64 = conn.query_row( + "SELECT attempt_count FROM dirty_sources WHERE source_id = 1", [], |r| r.get(0) + ).unwrap(); + assert_eq!(attempt, 0); + + let next_at: Option = conn.query_row( + "SELECT next_attempt_at FROM dirty_sources WHERE source_id = 1", [], |r| r.get(0) + ).unwrap(); + assert!(next_at.is_none()); + } + + #[test] + fn test_get_respects_backoff() { + let conn = setup_db(); + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + // Set next_attempt_at far in the future + conn.execute( + "UPDATE dirty_sources SET next_attempt_at = 9999999999999 WHERE source_id = 1", + [], + ).unwrap(); + + let results = get_dirty_sources(&conn).unwrap(); + assert!(results.is_empty()); + } + + #[test] + fn test_get_orders_by_attempt_count() { + let conn = setup_db(); + // Insert issue 
1 (failed, attempt_count=2) + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + conn.execute( + "UPDATE dirty_sources SET attempt_count = 2 WHERE source_id = 1", + [], + ).unwrap(); + // Insert issue 2 (fresh, attempt_count=0) + mark_dirty(&conn, SourceType::Issue, 2).unwrap(); + + let results = get_dirty_sources(&conn).unwrap(); + assert_eq!(results.len(), 2); + assert_eq!(results[0].1, 2); // Fresh first + assert_eq!(results[1].1, 1); // Failed second + } + + #[test] + fn test_batch_size_500() { + let conn = setup_db(); + for i in 0..600 { + mark_dirty(&conn, SourceType::Issue, i).unwrap(); + } + let results = get_dirty_sources(&conn).unwrap(); + assert_eq!(results.len(), 500); + } + + #[test] + fn test_clear_removes() { + let conn = setup_db(); + mark_dirty(&conn, SourceType::Issue, 1).unwrap(); + clear_dirty(&conn, SourceType::Issue, 1).unwrap(); + + let count: i64 = conn.query_row("SELECT COUNT(*) FROM dirty_sources", [], |r| r.get(0)).unwrap(); + assert_eq!(count, 0); + } + + #[test] + fn test_drain_loop() { + let conn = setup_db(); + for i in 0..1200 { + mark_dirty(&conn, SourceType::Issue, i).unwrap(); + } + + let mut total = 0; + loop { + let batch = get_dirty_sources(&conn).unwrap(); + if batch.is_empty() { + break; + } + for (st, id) in &batch { + clear_dirty(&conn, *st, *id).unwrap(); + } + total += batch.len(); + } + assert_eq!(total, 1200); + } +} diff --git a/src/ingestion/discussion_queue.rs b/src/ingestion/discussion_queue.rs new file mode 100644 index 0000000..ee4074d --- /dev/null +++ b/src/ingestion/discussion_queue.rs @@ -0,0 +1,265 @@ +use rusqlite::Connection; + +use crate::core::backoff::compute_next_attempt_at; +use crate::core::error::Result; +use crate::core::time::now_ms; + +/// Noteable type for discussion queue. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NoteableType { + Issue, + MergeRequest, +} + +impl NoteableType { + pub fn as_str(&self) -> &'static str { + match self { + Self::Issue => "Issue", + Self::MergeRequest => "MergeRequest", + } + } + + pub fn parse(s: &str) -> Option { + match s { + "Issue" => Some(Self::Issue), + "MergeRequest" => Some(Self::MergeRequest), + _ => None, + } + } +} + +/// A pending discussion fetch entry. +pub struct PendingFetch { + pub project_id: i64, + pub noteable_type: NoteableType, + pub noteable_iid: i64, + pub attempt_count: i32, +} + +/// Queue a discussion fetch. ON CONFLICT resets backoff (consistent with dirty_sources). +pub fn queue_discussion_fetch( + conn: &Connection, + project_id: i64, + noteable_type: NoteableType, + noteable_iid: i64, +) -> Result<()> { + conn.execute( + "INSERT INTO pending_discussion_fetches (project_id, noteable_type, noteable_iid, queued_at) + VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(project_id, noteable_type, noteable_iid) DO UPDATE SET + queued_at = excluded.queued_at, + attempt_count = 0, + last_attempt_at = NULL, + last_error = NULL, + next_attempt_at = NULL", + rusqlite::params![project_id, noteable_type.as_str(), noteable_iid, now_ms()], + )?; + Ok(()) +} + +/// Get next batch of pending fetches (WHERE next_attempt_at IS NULL OR <= now). +pub fn get_pending_fetches(conn: &Connection, limit: usize) -> Result> { + let now = now_ms(); + let mut stmt = conn.prepare( + "SELECT project_id, noteable_type, noteable_iid, attempt_count + FROM pending_discussion_fetches + WHERE next_attempt_at IS NULL OR next_attempt_at <= ?1 + ORDER BY queued_at ASC + LIMIT ?2" + )?; + let rows = stmt + .query_map(rusqlite::params![now, limit as i64], |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)?, + row.get::<_, i32>(3)?, + )) + })? 
+ .collect::, _>>()?; + + let mut results = Vec::with_capacity(rows.len()); + for (project_id, nt_str, noteable_iid, attempt_count) in rows { + let noteable_type = NoteableType::parse(&nt_str).ok_or_else(|| { + crate::core::error::LoreError::Other(format!( + "Invalid noteable_type in pending_discussion_fetches: {}", + nt_str + )) + })?; + results.push(PendingFetch { + project_id, + noteable_type, + noteable_iid, + attempt_count, + }); + } + Ok(results) +} + +/// Mark fetch complete (remove from queue). +pub fn complete_fetch( + conn: &Connection, + project_id: i64, + noteable_type: NoteableType, + noteable_iid: i64, +) -> Result<()> { + conn.execute( + "DELETE FROM pending_discussion_fetches + WHERE project_id = ?1 AND noteable_type = ?2 AND noteable_iid = ?3", + rusqlite::params![project_id, noteable_type.as_str(), noteable_iid], + )?; + Ok(()) +} + +/// Record fetch error with backoff. +pub fn record_fetch_error( + conn: &Connection, + project_id: i64, + noteable_type: NoteableType, + noteable_iid: i64, + error: &str, +) -> Result<()> { + let now = now_ms(); + let attempt_count: i64 = conn.query_row( + "SELECT attempt_count FROM pending_discussion_fetches + WHERE project_id = ?1 AND noteable_type = ?2 AND noteable_iid = ?3", + rusqlite::params![project_id, noteable_type.as_str(), noteable_iid], + |row| row.get(0), + )?; + + let new_attempt = attempt_count + 1; + let next_at = compute_next_attempt_at(now, new_attempt); + + conn.execute( + "UPDATE pending_discussion_fetches SET + attempt_count = ?1, + last_attempt_at = ?2, + last_error = ?3, + next_attempt_at = ?4 + WHERE project_id = ?5 AND noteable_type = ?6 AND noteable_iid = ?7", + rusqlite::params![new_attempt, now, error, next_at, project_id, noteable_type.as_str(), noteable_iid], + )?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn setup_db() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch(" + CREATE TABLE projects ( + id INTEGER PRIMARY KEY, + 
gitlab_project_id INTEGER UNIQUE NOT NULL, + path_with_namespace TEXT NOT NULL, + default_branch TEXT, + web_url TEXT, + created_at INTEGER, + updated_at INTEGER, + raw_payload_id INTEGER + ); + INSERT INTO projects (id, gitlab_project_id, path_with_namespace) VALUES (1, 100, 'group/project'); + + CREATE TABLE pending_discussion_fetches ( + project_id INTEGER NOT NULL REFERENCES projects(id), + noteable_type TEXT NOT NULL, + noteable_iid INTEGER NOT NULL, + queued_at INTEGER NOT NULL, + attempt_count INTEGER NOT NULL DEFAULT 0, + last_attempt_at INTEGER, + last_error TEXT, + next_attempt_at INTEGER, + PRIMARY KEY(project_id, noteable_type, noteable_iid) + ); + CREATE INDEX idx_pending_discussions_next_attempt ON pending_discussion_fetches(next_attempt_at); + ").unwrap(); + conn + } + + #[test] + fn test_queue_and_get() { + let conn = setup_db(); + queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap(); + + let fetches = get_pending_fetches(&conn, 100).unwrap(); + assert_eq!(fetches.len(), 1); + assert_eq!(fetches[0].project_id, 1); + assert_eq!(fetches[0].noteable_type, NoteableType::Issue); + assert_eq!(fetches[0].noteable_iid, 42); + assert_eq!(fetches[0].attempt_count, 0); + } + + #[test] + fn test_requeue_resets_backoff() { + let conn = setup_db(); + queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap(); + record_fetch_error(&conn, 1, NoteableType::Issue, 42, "network error").unwrap(); + + let attempt: i32 = conn.query_row( + "SELECT attempt_count FROM pending_discussion_fetches WHERE noteable_iid = 42", + [], |r| r.get(0), + ).unwrap(); + assert_eq!(attempt, 1); + + // Re-queue should reset + queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap(); + let attempt: i32 = conn.query_row( + "SELECT attempt_count FROM pending_discussion_fetches WHERE noteable_iid = 42", + [], |r| r.get(0), + ).unwrap(); + assert_eq!(attempt, 0); + } + + #[test] + fn test_backoff_respected() { + let conn = setup_db(); + 
queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap(); + conn.execute( + "UPDATE pending_discussion_fetches SET next_attempt_at = 9999999999999 WHERE noteable_iid = 42", + [], + ).unwrap(); + + let fetches = get_pending_fetches(&conn, 100).unwrap(); + assert!(fetches.is_empty()); + } + + #[test] + fn test_complete_removes() { + let conn = setup_db(); + queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap(); + complete_fetch(&conn, 1, NoteableType::Issue, 42).unwrap(); + + let count: i64 = conn.query_row( + "SELECT COUNT(*) FROM pending_discussion_fetches", [], |r| r.get(0), + ).unwrap(); + assert_eq!(count, 0); + } + + #[test] + fn test_error_increments_attempts() { + let conn = setup_db(); + queue_discussion_fetch(&conn, 1, NoteableType::MergeRequest, 10).unwrap(); + record_fetch_error(&conn, 1, NoteableType::MergeRequest, 10, "timeout").unwrap(); + + let (attempt, error): (i32, Option) = conn.query_row( + "SELECT attempt_count, last_error FROM pending_discussion_fetches WHERE noteable_iid = 10", + [], |r| Ok((r.get(0)?, r.get(1)?)), + ).unwrap(); + assert_eq!(attempt, 1); + assert_eq!(error, Some("timeout".to_string())); + + let next_at: Option = conn.query_row( + "SELECT next_attempt_at FROM pending_discussion_fetches WHERE noteable_iid = 10", + [], |r| r.get(0), + ).unwrap(); + assert!(next_at.is_some()); + } + + #[test] + fn test_noteable_type_parse() { + assert_eq!(NoteableType::parse("Issue"), Some(NoteableType::Issue)); + assert_eq!(NoteableType::parse("MergeRequest"), Some(NoteableType::MergeRequest)); + assert_eq!(NoteableType::parse("invalid"), None); + } +} diff --git a/src/ingestion/mod.rs b/src/ingestion/mod.rs index 87534c7..ead64f1 100644 --- a/src/ingestion/mod.rs +++ b/src/ingestion/mod.rs @@ -3,6 +3,8 @@ //! This module handles fetching and storing issues, discussions, and notes //! from GitLab with cursor-based incremental sync. 
// Queue of source entities whose documents need re-extraction.
pub mod dirty_tracker;
// Retry/backoff queue for pending GitLab discussion fetches.
pub mod discussion_queue;
pub mod discussions;
pub mod issues;
pub mod merge_requests;