use rusqlite::Connection; use rusqlite::OptionalExtension; use tracing::{debug, instrument, warn}; use crate::core::error::Result; use crate::documents::{ DocumentData, SourceType, extract_discussion_document, extract_issue_document, extract_mr_document, }; use crate::ingestion::dirty_tracker::{clear_dirty, get_dirty_sources, record_dirty_error}; /// Result of a document regeneration run. #[derive(Debug, Default)] pub struct RegenerateResult { pub regenerated: usize, pub unchanged: usize, pub errored: usize, } /// Drain the dirty_sources queue, regenerating documents for each entry. /// /// Uses per-item error handling (fail-soft) and drains the queue completely /// via a bounded batch loop. Each dirty item is processed independently. /// /// `progress_callback` reports `(processed, estimated_total)` after each item. #[instrument( skip(conn, progress_callback), fields(items_processed, items_skipped, errors) )] pub fn regenerate_dirty_documents( conn: &Connection, progress_callback: Option<&dyn Fn(usize, usize)>, ) -> Result { let mut result = RegenerateResult::default(); // Estimated total for progress reporting. Recount each loop iteration // so the denominator grows if new items are enqueued during processing // (the queue can grow while we drain it). We use max() so the value // never shrinks — preventing the progress fraction from going backwards. let mut estimated_total: usize = 0; loop { let dirty = get_dirty_sources(conn)?; if dirty.is_empty() { break; } // Recount remaining + already-processed to get the true total. let remaining: usize = conn .query_row("SELECT COUNT(*) FROM dirty_sources", [], |row| row.get(0)) .unwrap_or(0_i64) as usize; let processed_so_far = result.regenerated + result.unchanged + result.errored; estimated_total = estimated_total.max(processed_so_far + remaining); for (source_type, source_id) in &dirty { match regenerate_one(conn, *source_type, *source_id) { Ok(changed) => { if changed { result.regenerated += 1; } else { result.unchanged += 1; } clear_dirty(conn, *source_type, *source_id)?; } Err(e) => { warn!( source_type = %source_type, source_id, error = %e, "Failed to regenerate document" ); record_dirty_error(conn, *source_type, *source_id, &e.to_string())?; result.errored += 1; } } let processed = result.regenerated + result.unchanged + result.errored; if let Some(cb) = progress_callback { cb(processed, estimated_total); } } } debug!( regenerated = result.regenerated, unchanged = result.unchanged, errored = result.errored, "Document regeneration complete" ); tracing::Span::current().record("items_processed", result.regenerated); tracing::Span::current().record("items_skipped", result.unchanged); tracing::Span::current().record("errors", result.errored); Ok(result) } /// Regenerate a single document. Returns true if content_hash changed. fn regenerate_one(conn: &Connection, source_type: SourceType, source_id: i64) -> Result { let doc = match source_type { SourceType::Issue => extract_issue_document(conn, source_id)?, SourceType::MergeRequest => extract_mr_document(conn, source_id)?, SourceType::Discussion => extract_discussion_document(conn, source_id)?, }; let Some(doc) = doc else { // Source was deleted — remove the document (cascade handles FTS/embeddings) delete_document(conn, source_type, source_id)?; return Ok(true); }; let existing_hash = get_existing_hash(conn, source_type, source_id)?; let changed = existing_hash.as_ref() != Some(&doc.content_hash); // Always upsert: labels/paths can change independently of content_hash upsert_document(conn, &doc)?; Ok(changed) } /// Get existing content hash for a document, if it exists. fn get_existing_hash( conn: &Connection, source_type: SourceType, source_id: i64, ) -> Result> { let mut stmt = conn .prepare("SELECT content_hash FROM documents WHERE source_type = ?1 AND source_id = ?2")?; let hash: Option = stmt .query_row(rusqlite::params![source_type.as_str(), source_id], |row| { row.get(0) }) .optional()?; Ok(hash) } /// Upsert a document with triple-hash write optimization. /// /// Wrapped in a SAVEPOINT to ensure atomicity of the multi-statement write /// (document row + labels + paths). Without this, a crash between statements /// could leave the document with a stale labels_hash but missing label rows. fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> { conn.execute_batch("SAVEPOINT upsert_doc")?; match upsert_document_inner(conn, doc) { Ok(()) => { conn.execute_batch("RELEASE upsert_doc")?; Ok(()) } Err(e) => { // ROLLBACK TO restores the savepoint but leaves it active. // RELEASE removes it so the connection is clean for the next call. let _ = conn.execute_batch("ROLLBACK TO upsert_doc; RELEASE upsert_doc"); Err(e) } } } fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<()> { // Check existing hashes before writing let existing: Option<(i64, String, String, String)> = conn .query_row( "SELECT id, content_hash, labels_hash, paths_hash FROM documents WHERE source_type = ?1 AND source_id = ?2", rusqlite::params![doc.source_type.as_str(), doc.source_id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)), ) .optional()?; // Fast path: skip ALL writes when nothing changed (prevents WAL churn) if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing && old_content_hash == &doc.content_hash && old_labels_hash == &doc.labels_hash && old_paths_hash == &doc.paths_hash { return Ok(()); } let labels_json = serde_json::to_string(&doc.labels).unwrap_or_else(|_| "[]".to_string()); // Upsert document row conn.execute( "INSERT INTO documents (source_type, source_id, project_id, author_username, label_names, labels_hash, paths_hash, created_at, updated_at, url, title, content_text, content_hash, is_truncated, truncated_reason) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15) ON CONFLICT(source_type, source_id) DO UPDATE SET author_username = excluded.author_username, label_names = excluded.label_names, labels_hash = excluded.labels_hash, paths_hash = excluded.paths_hash, updated_at = excluded.updated_at, url = excluded.url, title = excluded.title, content_text = excluded.content_text, content_hash = excluded.content_hash, is_truncated = excluded.is_truncated, truncated_reason = excluded.truncated_reason", rusqlite::params![ doc.source_type.as_str(), doc.source_id, doc.project_id, doc.author_username, labels_json, doc.labels_hash, doc.paths_hash, doc.created_at, doc.updated_at, doc.url, doc.title, doc.content_text, doc.content_hash, doc.is_truncated as i32, doc.truncated_reason, ], )?; // Get document ID let doc_id = match existing { Some((id, _, _, _)) => id, None => get_document_id(conn, doc.source_type, doc.source_id)?, }; // Only update labels if hash changed let labels_changed = match &existing { Some((_, _, old_hash, _)) => old_hash != &doc.labels_hash, None => true, }; if labels_changed { conn.execute( "DELETE FROM document_labels WHERE document_id = ?1", [doc_id], )?; for label in &doc.labels { conn.execute( "INSERT INTO document_labels (document_id, label_name) VALUES (?1, ?2)", rusqlite::params![doc_id, label], )?; } } // Only update paths if hash changed let paths_changed = match &existing { Some((_, _, _, old_hash)) => old_hash != &doc.paths_hash, None => true, }; if paths_changed { conn.execute( "DELETE FROM document_paths WHERE document_id = ?1", [doc_id], )?; for path in &doc.paths { conn.execute( "INSERT INTO document_paths (document_id, path) VALUES (?1, ?2)", rusqlite::params![doc_id, path], )?; } } Ok(()) } /// Delete a document by source identity. fn delete_document(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> { conn.execute( "DELETE FROM documents WHERE source_type = ?1 AND source_id = ?2", rusqlite::params![source_type.as_str(), source_id], )?; Ok(()) } /// Get document ID by source type and source ID. fn get_document_id(conn: &Connection, source_type: SourceType, source_id: i64) -> Result { let id: i64 = conn.query_row( "SELECT id FROM documents WHERE source_type = ?1 AND source_id = ?2", rusqlite::params![source_type.as_str(), source_id], |row| row.get(0), )?; Ok(id) } #[cfg(test)] mod tests { use super::*; use crate::ingestion::dirty_tracker::mark_dirty; fn setup_db() -> Connection { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch(" CREATE TABLE projects ( id INTEGER PRIMARY KEY, gitlab_project_id INTEGER UNIQUE NOT NULL, path_with_namespace TEXT NOT NULL, default_branch TEXT, web_url TEXT, created_at INTEGER, updated_at INTEGER, raw_payload_id INTEGER ); INSERT INTO projects (id, gitlab_project_id, path_with_namespace) VALUES (1, 100, 'group/project'); CREATE TABLE issues ( id INTEGER PRIMARY KEY, gitlab_id INTEGER UNIQUE NOT NULL, project_id INTEGER NOT NULL REFERENCES projects(id), iid INTEGER NOT NULL, title TEXT, description TEXT, state TEXT NOT NULL, author_username TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, last_seen_at INTEGER NOT NULL, discussions_synced_for_updated_at INTEGER, resource_events_synced_for_updated_at INTEGER, web_url TEXT, raw_payload_id INTEGER ); CREATE TABLE labels ( id INTEGER PRIMARY KEY, gitlab_id INTEGER, project_id INTEGER NOT NULL REFERENCES projects(id), name TEXT NOT NULL, color TEXT, description TEXT ); CREATE TABLE issue_labels ( issue_id INTEGER NOT NULL REFERENCES issues(id), label_id INTEGER NOT NULL REFERENCES labels(id), PRIMARY KEY(issue_id, label_id) ); CREATE TABLE documents ( id INTEGER PRIMARY KEY, source_type TEXT NOT NULL, source_id INTEGER NOT NULL, project_id INTEGER NOT NULL, author_username TEXT, label_names TEXT, created_at INTEGER, updated_at INTEGER, url TEXT, title TEXT, content_text TEXT NOT NULL, content_hash TEXT NOT NULL, labels_hash TEXT NOT NULL DEFAULT '', paths_hash TEXT NOT NULL DEFAULT '', is_truncated INTEGER NOT NULL DEFAULT 0, truncated_reason TEXT, UNIQUE(source_type, source_id) ); CREATE TABLE document_labels ( document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE, label_name TEXT NOT NULL, PRIMARY KEY(document_id, label_name) ); CREATE TABLE document_paths ( document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE, path TEXT NOT NULL, PRIMARY KEY(document_id, path) ); CREATE TABLE dirty_sources ( source_type TEXT NOT NULL, source_id INTEGER NOT NULL, queued_at INTEGER NOT NULL, attempt_count INTEGER NOT NULL DEFAULT 0, last_attempt_at INTEGER, last_error TEXT, next_attempt_at INTEGER, PRIMARY KEY(source_type, source_id) ); CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at); ").unwrap(); conn } #[test] fn test_regenerate_creates_document() { let conn = setup_db(); conn.execute( "INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test Issue', 'Description here', 'opened', 'alice', 1000, 2000, 3000)", [], ).unwrap(); mark_dirty(&conn, SourceType::Issue, 1).unwrap(); let result = regenerate_dirty_documents(&conn, None).unwrap(); assert_eq!(result.regenerated, 1); assert_eq!(result.unchanged, 0); assert_eq!(result.errored, 0); // Verify document was created let count: i64 = conn .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0)) .unwrap(); assert_eq!(count, 1); let content: String = conn .query_row("SELECT content_text FROM documents", [], |r| r.get(0)) .unwrap(); assert!(content.contains("[[Issue]] #42: Test Issue")); } #[test] fn test_regenerate_unchanged() { let conn = setup_db(); conn.execute( "INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'Desc', 'opened', 'alice', 1000, 2000, 3000)", [], ).unwrap(); // First regeneration creates the document mark_dirty(&conn, SourceType::Issue, 1).unwrap(); let r1 = regenerate_dirty_documents(&conn, None).unwrap(); assert_eq!(r1.regenerated, 1); // Second regeneration — same data, should be unchanged mark_dirty(&conn, SourceType::Issue, 1).unwrap(); let r2 = regenerate_dirty_documents(&conn, None).unwrap(); assert_eq!(r2.unchanged, 1); assert_eq!(r2.regenerated, 0); } #[test] fn test_regenerate_deleted_source() { let conn = setup_db(); conn.execute( "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)", [], ).unwrap(); mark_dirty(&conn, SourceType::Issue, 1).unwrap(); regenerate_dirty_documents(&conn, None).unwrap(); // Delete the issue and re-mark dirty conn.execute("PRAGMA foreign_keys = OFF", []).unwrap(); conn.execute("DELETE FROM issues WHERE id = 1", []).unwrap(); conn.execute("PRAGMA foreign_keys = ON", []).unwrap(); mark_dirty(&conn, SourceType::Issue, 1).unwrap(); let result = regenerate_dirty_documents(&conn, None).unwrap(); assert_eq!(result.regenerated, 1); // Deletion counts as "changed" let count: i64 = conn .query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0)) .unwrap(); assert_eq!(count, 0); } #[test] fn test_regenerate_drains_queue() { let conn = setup_db(); for i in 1..=10 { conn.execute( "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (?1, ?2, 1, ?1, 'Test', 'opened', 1000, 2000, 3000)", rusqlite::params![i, i * 10], ).unwrap(); mark_dirty(&conn, SourceType::Issue, i).unwrap(); } let result = regenerate_dirty_documents(&conn, None).unwrap(); assert_eq!(result.regenerated, 10); // Queue should be empty let dirty = get_dirty_sources(&conn).unwrap(); assert!(dirty.is_empty()); } #[test] fn test_triple_hash_fast_path() { let conn = setup_db(); conn.execute( "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)", [], ).unwrap(); conn.execute( "INSERT INTO labels (id, project_id, name) VALUES (1, 1, 'bug')", [], ) .unwrap(); conn.execute( "INSERT INTO issue_labels (issue_id, label_id) VALUES (1, 1)", [], ) .unwrap(); // First run creates document mark_dirty(&conn, SourceType::Issue, 1).unwrap(); regenerate_dirty_documents(&conn, None).unwrap(); // Second run — triple hash match, should skip ALL writes mark_dirty(&conn, SourceType::Issue, 1).unwrap(); let result = regenerate_dirty_documents(&conn, None).unwrap(); assert_eq!(result.unchanged, 1); // Labels should still be present (not deleted and re-inserted) let label_count: i64 = conn .query_row("SELECT COUNT(*) FROM document_labels", [], |r| r.get(0)) .unwrap(); assert_eq!(label_count, 1); } }