# Checkpoint 3: Search & Sync MVP

> **Status:** Planning
> **Prerequisite:** Checkpoints 0, 1, 2 complete (issues, MRs, discussions ingested)
> **Goal:** Deliver working semantic + lexical hybrid search with efficient incremental sync

This checkpoint consolidates SPEC.md checkpoints 3A, 3B, 4, and 5 into a unified implementation plan. The work is structured for parallel agent execution where dependencies allow.

All code integrates with existing `gitlore` infrastructure:

- Error handling via `GiError` and `ErrorCode` in `src/core/error.rs`
- CLI patterns matching `src/cli/commands/*.rs` (run functions, JSON/human output)
- Database via `rusqlite::Connection` with migrations in `migrations/`
- Config via `src/core/config.rs` (EmbeddingConfig already defined)
- Robot mode JSON with `{"ok": true, "data": {...}}` pattern

---

## Executive Summary (Gated Milestones)

This checkpoint ships in three gates to reduce integration risk. Each gate is independently verifiable and shippable:

**Gate A (Lexical MVP):** documents + FTS + filters + `lore search --mode=lexical` + `lore stats`

**Gate B (Hybrid MVP):** embeddings + vector + RRF fusion + graceful degradation

**Gate C (Sync MVP):** `lore sync` orchestration + queues/backoff + integrity check/repair

**Deliverables:**

**Gate A**

1. Document generation from issues/MRs/discussions with FTS5 indexing
2. Lexical search + filters + snippets + `lore stats`

**Gate B**

3. Ollama-powered embedding pipeline with sqlite-vec storage
4. Hybrid search (RRF-ranked vector + lexical) with rich filtering + graceful degradation

**Gate C**

5. Orchestrated `lore sync` command with incremental doc regen + re-embedding
6.
Integrity checks + repair paths for FTS/embeddings consistency

**Key Design Decisions:**

- Documents are the search unit (not raw entities)
- FTS5 works standalone when Ollama is unavailable (graceful degradation)
- sqlite-vec `rowid = documents.id` for simple joins
- RRF ranking avoids score normalization complexity
- Queue-based discussion fetching isolates failures
- FTS5 query sanitization prevents syntax errors from user input
- Exponential backoff on all queues prevents hot-loop retries
- Transient embed failures trigger graceful degradation (not hard errors)

---

## Phase 1: Schema Foundation

### 1.1 Documents Schema (Migration 007)

**File:** `migrations/007_documents.sql`

```sql
-- Unified searchable documents (derived from issues/MRs/discussions)
CREATE TABLE documents (
    id INTEGER PRIMARY KEY,
    source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
    source_id INTEGER NOT NULL,           -- local DB id in the source table
    project_id INTEGER NOT NULL REFERENCES projects(id),
    author_username TEXT,                 -- for discussions: first note author
    label_names TEXT,                     -- JSON array (display/debug only)
    created_at INTEGER,                   -- ms epoch UTC
    updated_at INTEGER,                   -- ms epoch UTC
    url TEXT,
    title TEXT,                           -- null for discussions
    content_text TEXT NOT NULL,           -- canonical text for embedding/search
    content_hash TEXT NOT NULL,           -- SHA-256 for change detection
    labels_hash TEXT NOT NULL DEFAULT '', -- SHA-256 over sorted labels (write optimization)
    paths_hash TEXT NOT NULL DEFAULT '',  -- SHA-256 over sorted paths (write optimization)
    is_truncated INTEGER NOT NULL DEFAULT 0,
    truncated_reason TEXT CHECK (
        truncated_reason IN ('token_limit_middle_drop','single_note_oversized','first_last_oversized')
        OR truncated_reason IS NULL
    ),
    UNIQUE(source_type, source_id)
);

CREATE INDEX idx_documents_project_updated ON documents(project_id, updated_at);
CREATE INDEX idx_documents_author ON documents(author_username);
CREATE INDEX idx_documents_source ON documents(source_type, source_id);
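-- Illustrative only, not part of this migration: the UNIQUE(source_type, source_id)
-- key above is what lets the regenerator write documents idempotently via an
-- upsert. A sketch (column list abbreviated; the real writer binds every column,
-- and the WHERE clause skips rewrites when content is unchanged):
--
--   INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash)
--   VALUES (?, ?, ?, ?, ?)
--   ON CONFLICT(source_type, source_id) DO UPDATE SET
--       content_text = excluded.content_text,
--       content_hash = excluded.content_hash
--   WHERE excluded.content_hash != documents.content_hash;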
CREATE INDEX idx_documents_hash ON documents(content_hash);

-- Fast label filtering (indexed exact-match)
CREATE TABLE document_labels (
    document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    label_name TEXT NOT NULL,
    PRIMARY KEY(document_id, label_name)
) WITHOUT ROWID;
CREATE INDEX idx_document_labels_label ON document_labels(label_name);

-- Fast path filtering (DiffNote file paths)
CREATE TABLE document_paths (
    document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    path TEXT NOT NULL,
    PRIMARY KEY(document_id, path)
) WITHOUT ROWID;
CREATE INDEX idx_document_paths_path ON document_paths(path);

-- Queue for incremental document regeneration (with retry tracking)
-- Uses next_attempt_at for index-friendly backoff queries
CREATE TABLE dirty_sources (
    source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
    source_id INTEGER NOT NULL,
    queued_at INTEGER NOT NULL,   -- ms epoch UTC
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_attempt_at INTEGER,
    last_error TEXT,
    next_attempt_at INTEGER,      -- ms epoch UTC; NULL means ready immediately
    PRIMARY KEY(source_type, source_id)
);
CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at);

-- Resumable queue for dependent discussion fetching
-- Uses next_attempt_at for index-friendly backoff queries
CREATE TABLE pending_discussion_fetches (
    project_id INTEGER NOT NULL REFERENCES projects(id),
    noteable_type TEXT NOT NULL,  -- 'Issue' | 'MergeRequest'
    noteable_iid INTEGER NOT NULL,
    queued_at INTEGER NOT NULL,   -- ms epoch UTC
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_attempt_at INTEGER,
    last_error TEXT,
    next_attempt_at INTEGER,      -- ms epoch UTC; NULL means ready immediately
    PRIMARY KEY(project_id, noteable_type, noteable_iid)
);
CREATE INDEX idx_pending_discussions_next_attempt ON pending_discussion_fetches(next_attempt_at);
```

**Acceptance Criteria:**

- [ ] Migration applies cleanly on fresh DB
- [ ] Migration applies cleanly after CP2 schema
- [ ] All foreign keys enforced
- [ ] Indexes created
- [ ] `labels_hash` and `paths_hash` columns present for write optimization
- [ ] `next_attempt_at` indexed for efficient backoff queries

---

### 1.2 FTS5 Index (Migration 008)

**File:** `migrations/008_fts5.sql`

```sql
-- Full-text search with porter stemmer and prefix indexes for type-ahead
CREATE VIRTUAL TABLE documents_fts USING fts5(
    title,
    content_text,
    content='documents',
    content_rowid='id',
    tokenize='porter unicode61',
    prefix='2 3 4'
);

-- Keep FTS in sync via triggers.
-- IMPORTANT: COALESCE(title, '') ensures the FTS5 external-content table never
-- receives NULL values, which can cause inconsistencies with delete operations.
-- FTS5 delete requires an exact match of the original values; NULL != NULL in SQL,
-- so a NULL title on insert would make the delete trigger fail silently.
CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
    INSERT INTO documents_fts(rowid, title, content_text)
    VALUES (new.id, COALESCE(new.title, ''), new.content_text);
END;

CREATE TRIGGER documents_ad AFTER DELETE ON documents BEGIN
    INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
    VALUES('delete', old.id, COALESCE(old.title, ''), old.content_text);
END;

-- Only rebuild FTS when searchable text actually changes (not metadata-only updates)
CREATE TRIGGER documents_au AFTER UPDATE ON documents
WHEN old.title IS NOT new.title OR old.content_text != new.content_text
BEGIN
    INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
    VALUES('delete', old.id, COALESCE(old.title, ''), old.content_text);
    INSERT INTO documents_fts(rowid, title, content_text)
    VALUES (new.id, COALESCE(new.title, ''), new.content_text);
END;
```

**Acceptance Criteria:**

- [ ] `documents_fts` created as virtual table
- [ ] Triggers fire on insert/update/delete
- [ ] Update trigger only fires when title or content_text changes (not metadata-only updates)
- [ ] FTS row count matches documents count after bulk insert
- [ ] Prefix search works for type-ahead UX

---

### 1.3 Embeddings Schema (Migration 009)

**File:** `migrations/009_embeddings.sql`

```sql
-- NOTE: sqlite-vec vec0 virtual tables cannot participate in FK cascades.
-- We must use an explicit trigger to delete orphan embeddings when documents
-- are deleted. See the documents_embeddings_ad trigger below.

-- sqlite-vec virtual table for vector search
-- Storage rule: embeddings.rowid = documents.id
CREATE VIRTUAL TABLE embeddings USING vec0(
    embedding float[768]
);

-- Embedding provenance + change detection
CREATE TABLE embedding_metadata (
    document_id INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    model TEXT NOT NULL,        -- 'nomic-embed-text'
    dims INTEGER NOT NULL,      -- 768
    content_hash TEXT NOT NULL, -- copied from documents.content_hash
    created_at INTEGER NOT NULL, -- ms epoch UTC
    last_error TEXT,            -- error message from last failed attempt
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_attempt_at INTEGER     -- ms epoch UTC
);
CREATE INDEX idx_embedding_metadata_errors ON embedding_metadata(last_error)
    WHERE last_error IS NOT NULL;
CREATE INDEX idx_embedding_metadata_hash ON embedding_metadata(content_hash);

-- CRITICAL: Delete orphan embeddings when documents are deleted.
-- vec0 virtual tables don't support FK ON DELETE CASCADE, so we need this trigger.
-- embedding_metadata has ON DELETE CASCADE, so only vec0 needs explicit cleanup
CREATE TRIGGER documents_embeddings_ad AFTER DELETE ON documents BEGIN
    DELETE FROM embeddings WHERE rowid = old.id;
END;
```

**Acceptance Criteria:**

- [ ] `embeddings` vec0 table created
- [ ] `embedding_metadata` tracks provenance
- [ ] Error tracking fields present for retry logic
- [ ] Orphan cleanup trigger fires on document deletion

**Dependencies:**

- Requires sqlite-vec extension loaded at runtime
- Extension loading already happens in `src/core/db.rs`
- [ ] Migration runner must load sqlite-vec *before* applying migrations (including on fresh DB)

---

## Phase 2: Document Generation

### 2.1 Document Module Structure

**New module:** `src/documents/`

```
src/documents/
├── mod.rs          # Module exports
├── extractor.rs    # Document extraction from entities
├── truncation.rs   # Note-boundary aware truncation
└── regenerator.rs  # Dirty source processing
```

**File:** `src/documents/mod.rs`

```rust
//! Document generation and management.
//!
//! Extracts searchable documents from issues, MRs, and discussions.

mod extractor;
mod regenerator;
mod truncation;

pub use extractor::{
    extract_discussion_document, extract_issue_document, extract_mr_document,
    DocumentData, SourceType,
};
// Note: extract_*_document() return Result<Option<DocumentData>>.
// None means the source entity was deleted from the database.
pub use regenerator::regenerate_dirty_documents;
pub use truncation::{truncate_content, TruncationResult};
```

**Update `src/lib.rs`:**

```rust
pub mod documents; // Add to existing modules
```

---

### 2.2 Document Types

**File:** `src/documents/extractor.rs`

```rust
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};

/// Source type for documents.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceType {
    Issue,
    MergeRequest,
    Discussion,
}

impl SourceType {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Issue => "issue",
            Self::MergeRequest => "merge_request",
            Self::Discussion => "discussion",
        }
    }

    /// Parse from CLI input, accepting common aliases.
    ///
    /// Accepts: "issue", "mr", "merge_request", "discussion"
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "issue" | "issues" => Some(Self::Issue),
            "mr" | "mrs" | "merge_request" | "merge_requests" => Some(Self::MergeRequest),
            "discussion" | "discussions" => Some(Self::Discussion),
            _ => None,
        }
    }
}

impl std::fmt::Display for SourceType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

/// Generated document ready for storage.
#[derive(Debug, Clone)]
pub struct DocumentData {
    pub source_type: SourceType,
    pub source_id: i64,
    pub project_id: i64,
    pub author_username: Option<String>,
    pub labels: Vec<String>,
    pub paths: Vec<String>,  // DiffNote file paths
    pub labels_hash: String, // SHA-256 over sorted labels (write optimization)
    pub paths_hash: String,  // SHA-256 over sorted paths (write optimization)
    pub created_at: i64,
    pub updated_at: i64,
    pub url: Option<String>,
    pub title: Option<String>,
    pub content_text: String,
    pub content_hash: String,
    pub is_truncated: bool,
    pub truncated_reason: Option<String>,
}

/// Compute SHA-256 hash of content.
pub fn compute_content_hash(content: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(content.as_bytes());
    format!("{:x}", hasher.finalize())
}

/// Compute SHA-256 hash over a sorted list of strings.
/// Used for labels_hash and paths_hash to detect changes efficiently.
pub fn compute_list_hash(items: &[String]) -> String {
    let mut sorted = items.to_vec();
    sorted.sort();
    let joined = sorted.join("\n");
    compute_content_hash(&joined)
}
```

**Document Formats:**

All document types use a consistent header format for better search relevance and context:

| Source | content_text |
|--------|--------------|
| Issue | Structured header + description (see below) |
| MR | Structured header + description (see below) |
| Discussion | Full thread with header (see below) |

**Issue Document Format:**

```
[[Issue]] #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234
Labels: ["bug", "auth"]
State: opened
Author: @johndoe

--- Description ---
We need to modernize our authentication system...
```

**MR Document Format:**

```
[[MergeRequest]] !456: Implement JWT authentication
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/merge_requests/456
Labels: ["feature", "auth"]
State: opened
Author: @johndoe
Source: feature/jwt-auth -> main

--- Description ---
This MR implements JWT-based authentication as discussed in #234...
```

**Discussion Document Format:**

```
[[Discussion]] Issue #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234#note_12345
Labels: ["bug", "auth"]
Files: ["src/auth/login.ts"]

--- Thread ---
@johndoe (2024-03-15):
I think we should move to JWT-based auth...

@janedoe (2024-03-15):
Agreed. What about refresh token strategy?
```

**Acceptance Criteria:**

- [ ] Issue document: structured header with `[[Issue]]` prefix, project, URL, labels, state, author, then description
- [ ] MR document: structured header with `[[MergeRequest]]` prefix, project, URL, labels, state, author, branches, then description
- [ ] Discussion document: includes parent type+title, project, URL, labels, files, then thread
- [ ] System notes (is_system=1) excluded from discussion content
- [ ] DiffNote file paths extracted to paths vector
- [ ] Labels extracted to labels vector
- [ ] SHA-256 hash computed from content_text
- [ ] Headers use consistent separator lines (`--- Description ---`, `--- Thread ---`)

---

### 2.3 Truncation Logic

**File:** `src/documents/truncation.rs`

```rust
/// Maximum content length (~8,000 tokens at a 4 chars/token estimate).
pub const MAX_CONTENT_CHARS: usize = 32_000;

/// Truncation result with metadata.
#[derive(Debug, Clone)]
pub struct TruncationResult {
    pub content: String,
    pub is_truncated: bool,
    pub reason: Option<TruncationReason>,
}

/// Reason for truncation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncationReason {
    TokenLimitMiddleDrop,
    SingleNoteOversized,
    FirstLastOversized,
}

impl TruncationReason {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::TokenLimitMiddleDrop => "token_limit_middle_drop",
            Self::SingleNoteOversized => "single_note_oversized",
            Self::FirstLastOversized => "first_last_oversized",
        }
    }
}

/// Truncate content at note boundaries.
///
/// Rules:
/// - Max content: 32,000 characters
/// - Truncate at NOTE boundaries (never mid-note)
/// - Preserve first N notes and last M notes
/// - Drop from the middle, insert a marker
pub fn truncate_content(notes: &[NoteContent], max_chars: usize) -> TruncationResult {
    // Implementation handles edge cases per the table below
    todo!()
}

/// Note content for truncation.
pub struct NoteContent {
    pub author: String,
    pub date: String,
    pub body: String,
}
```

**Edge Cases:**

| Scenario | Handling |
|----------|----------|
| Single note > 32,000 chars | Truncate at char boundary, append `[truncated]`, reason = `single_note_oversized` |
| First + last note > 32,000 | Keep only first note (truncated if needed), reason = `first_last_oversized` |
| Only one note | Truncate at char boundary if needed |

**Acceptance Criteria:**

- [ ] Notes never cut mid-content
- [ ] First and last notes preserved when possible
- [ ] Truncation marker `\n\n[... N notes omitted for length ...]\n\n` inserted
- [ ] Metadata fields set correctly
- [ ] Edge cases handled per the table above

---

### 2.4 CLI: `lore generate-docs` (Incremental by Default)

**File:** `src/cli/commands/generate_docs.rs`

```rust
//! Generate documents command - create searchable documents from entities.
//!
//! By default, runs incrementally (processes only the dirty_sources queue).
//! Use --full to regenerate all documents from scratch.

use rusqlite::Connection;
use serde::Serialize;

use crate::core::error::Result;
use crate::documents::{DocumentData, SourceType};
use crate::Config;

/// Result of document generation.
#[derive(Debug, Serialize)]
pub struct GenerateDocsResult {
    pub issues: usize,
    pub mrs: usize,
    pub discussions: usize,
    pub total: usize,
    pub truncated: usize,
    pub skipped: usize, // Unchanged documents
}

/// Chunk size for --full mode dirty queue seeding.
/// Balances throughput against WAL file growth and memory pressure.
const FULL_MODE_CHUNK_SIZE: usize = 2000;

/// Run document generation (incremental by default).
///
/// IMPORTANT: Both modes use the same regenerator codepath to avoid
/// logic divergence in label/path hashing, deletion semantics, and
/// write-optimization behavior. The only difference is how dirty_sources
/// /// Incremental mode (default): /// - Processes only items already in dirty_sources queue /// - Fast for routine syncs /// /// Full mode (--full): /// - Seeds dirty_sources with ALL source entities in chunks /// - Drains through the same regenerator pipeline /// - Uses keyset pagination (WHERE id > last_id) to avoid OFFSET degradation /// - Final FTS optimize after all chunks complete /// - Use when schema changes or after migration pub fn run_generate_docs( config: &Config, full: bool, project_filter: Option<&str>, ) -> Result { let conn = open_db(config)?; if full { // Full mode: seed dirty_sources with all source entities, then drain. // Uses keyset pagination to avoid O(n²) OFFSET degradation on large tables. // // Seeding is chunked to bound WAL growth: // 1. For each source type (issues, MRs, discussions): // a. Query next chunk WHERE id > last_id ORDER BY id LIMIT chunk_size // b. INSERT OR IGNORE each into dirty_sources // c. Advance last_id = chunk.last().id // d. Loop until chunk is empty // 2. Drain dirty_sources through regenerator (same as incremental) // 3. Final FTS optimize (not full rebuild — triggers handle consistency) // // Benefits of unified codepath: // - No divergence in label/path hash behavior // - No divergence in deletion semantics // - No divergence in write-optimization logic (labels_hash, paths_hash) // - FTS triggers fire identically in both modes // Seed issues let mut last_id: i64 = 0; loop { let chunk = query_issue_ids_after(&conn, project_filter, FULL_MODE_CHUNK_SIZE, last_id)?; if chunk.is_empty() { break; } let tx = conn.transaction()?; for id in &chunk { mark_dirty(&tx, SourceType::Issue, *id)?; } tx.commit()?; last_id = *chunk.last().unwrap(); } // Similar keyset-paginated seeding for MRs and discussions... 
        // Report: seeding complete, now regenerating
    }

    // Both modes: drain dirty_sources through the regenerator
    let regen = regenerate_dirty_documents(&conn)?;

    if full {
        // FTS optimize after bulk operations (compacts index segments)
        conn.execute(
            "INSERT INTO documents_fts(documents_fts) VALUES('optimize')",
            [],
        )?;
    }

    // Map regen -> GenerateDocsResult stats
    todo!()
}

/// Print human-readable output.
pub fn print_generate_docs(result: &GenerateDocsResult) {
    println!("Document generation complete:");
    println!("  Issues:      {:>6} documents", result.issues);
    println!("  MRs:         {:>6} documents", result.mrs);
    println!("  Discussions: {:>6} documents", result.discussions);
    println!("  ─────────────────────");
    println!("  Total:       {:>6} documents", result.total);
    if result.truncated > 0 {
        println!("  Truncated:   {:>6}", result.truncated);
    }
    if result.skipped > 0 {
        println!("  Skipped:     {:>6} (unchanged)", result.skipped);
    }
}

/// Print JSON output for robot mode.
pub fn print_generate_docs_json(result: &GenerateDocsResult) {
    let output = serde_json::json!({
        "ok": true,
        "data": result
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration in `src/cli/mod.rs`:**

```rust
/// Generate-docs subcommand arguments.
#[derive(Args)]
pub struct GenerateDocsArgs {
    /// Regenerate ALL documents (not just the dirty queue)
    #[arg(long)]
    full: bool,

    /// Only generate for a specific project
    #[arg(long)]
    project: Option<String>,
}
```

**Acceptance Criteria:**

- [ ] Creates a document for each issue
- [ ] Creates a document for each MR
- [ ] Creates a document for each discussion
- [ ] Default mode processes the dirty_sources queue only (incremental)
- [ ] `--full` regenerates all documents from scratch
- [ ] `--full` uses chunked transactions (2k docs/tx) to bound WAL growth
- [ ] Final FTS optimize after all chunks complete
- [ ] Progress bar in human mode (via `indicatif`)
- [ ] JSON output in robot mode

---

## Phase 3: Lexical Search

### 3.1 Search Module Structure

**New module:** `src/search/`

```
src/search/
├── mod.rs      # Module exports
├── fts.rs      # FTS5 search
├── vector.rs   # Vector search (sqlite-vec)
├── hybrid.rs   # Combined hybrid search
├── rrf.rs      # Reciprocal rank fusion
└── filters.rs  # Filter parsing and application
```

**File:** `src/search/mod.rs`

```rust
//! Search functionality for documents.
//!
//! Supports lexical (FTS5), semantic (vector), and hybrid search.

mod filters;
mod fts;
mod hybrid;
mod rrf;
mod vector;

pub use filters::{SearchFilters, PathFilter, apply_filters};
pub use fts::{
    search_fts, to_fts_query, FtsResult, FtsQueryMode,
    generate_fallback_snippet, get_result_snippet,
};
pub use hybrid::{search_hybrid, HybridResult, SearchMode};
pub use rrf::{rank_rrf, RrfResult};
pub use vector::{search_vector, VectorResult};
```

---

### 3.2 FTS5 Search Function

**File:** `src/search/fts.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;

/// FTS search result.
#[derive(Debug, Clone)]
pub struct FtsResult {
    pub document_id: i64,
    pub rank: f64,       // BM25 score (lower = better match)
    pub snippet: String, // Context snippet around the match
}

/// Generate fallback snippet for semantic-only results.
///
/// When FTS snippets aren't available (semantic-only mode), this generates
/// a context snippet by truncating the document content. Useful for displaying
/// search results without FTS hits.
///
/// Args:
///   content_text: Full document content
///   max_chars: Maximum snippet length (default 200)
///
/// Returns a truncated string with an ellipsis if truncated.
pub fn generate_fallback_snippet(content_text: &str, max_chars: usize) -> String {
    let trimmed = content_text.trim();
    if trimmed.len() <= max_chars {
        return trimmed.to_string();
    }

    // Back up to a char boundary first so slicing can't panic on multi-byte
    // UTF-8 content, then find a word boundary to avoid cutting mid-word.
    let mut cut = max_chars;
    while !trimmed.is_char_boundary(cut) {
        cut -= 1;
    }
    let truncation_point = trimmed[..cut]
        .rfind(|c: char| c.is_whitespace())
        .unwrap_or(cut);

    format!("{}...", &trimmed[..truncation_point])
}

/// Get snippet for a search result, preferring FTS when available.
///
/// Priority:
/// 1. FTS snippet (if the document matched the FTS query)
/// 2. Fallback: truncated content_text
pub fn get_result_snippet(
    fts_snippet: Option<&str>,
    content_text: &str,
) -> String {
    match fts_snippet {
        Some(snippet) if !snippet.is_empty() => snippet.to_string(),
        _ => generate_fallback_snippet(content_text, 200),
    }
}

/// FTS query parsing mode.
#[derive(Debug, Clone, Copy, Default)]
pub enum FtsQueryMode {
    /// Safe parsing (default): escapes dangerous syntax but preserves
    /// a trailing `*` for obvious prefix queries (type-ahead UX).
    #[default]
    Safe,
    /// Raw mode: passes user MATCH syntax through unchanged.
    /// Use with caution - invalid syntax will cause FTS5 errors.
    Raw,
}

/// Convert user query to FTS5-safe MATCH expression.
///
/// FTS5 MATCH syntax has special characters that cause errors if passed raw:
/// - `-` (NOT operator)
/// - `"` (phrase quotes)
/// - `:` (column filter)
/// - `*` (prefix)
/// - `AND`, `OR`, `NOT` (operators)
///
/// Strategy for Safe mode:
/// - Wrap each whitespace-delimited token in double quotes
/// - Escape internal quotes by doubling them
/// - PRESERVE a trailing `*` for simple prefix queries (alphanumeric tokens)
/// - This forces FTS5 to treat tokens as literals while allowing type-ahead
///
/// Raw mode passes the query through unchanged for power users who want
/// full FTS5 syntax (phrase queries, column scopes, boolean operators).
///
/// Examples (Safe mode):
/// - "auth error"  -> `"auth" "error"` (implicit AND)
/// - "auth*"       -> `"auth"*` (prefix preserved!)
/// - "jwt_token*"  -> `"jwt_token"*` (prefix preserved!)
/// - "C++"         -> `"C++"` (special chars preserved, no prefix)
/// - "don't panic" -> `"don't" "panic"` (apostrophe preserved)
/// - "-DWITH_SSL"  -> `"-DWITH_SSL"` (leading dash neutralized)
pub fn to_fts_query(raw: &str, mode: FtsQueryMode) -> String {
    if matches!(mode, FtsQueryMode::Raw) {
        return raw.trim().to_string();
    }

    raw.split_whitespace()
        .map(|token| {
            let t = token.trim();
            if t.is_empty() {
                return "\"\"".to_string();
            }

            // Detect simple prefix queries: alphanumeric/underscore followed by *
            // e.g., "auth*", "jwt_token*", "user123*"
            let is_prefix = t.ends_with('*')
                && t.len() > 1
                && t[..t.len() - 1]
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_');

            // Escape internal double quotes by doubling them
            let escaped = t.replace('"', "\"\"");

            if is_prefix {
                // Strip the trailing *, quote the core, then re-add *
                let core = &escaped[..escaped.len() - 1];
                format!("\"{}\"*", core)
            } else {
                format!("\"{}\"", escaped)
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}

/// Search documents using FTS5.
///
/// Returns matching document IDs with BM25 rank scores and snippets.
/// Lower rank values indicate better matches.
/// Uses bm25() explicitly (not the `rank` alias) and snippet() for context.
///
/// IMPORTANT: User input is sanitized via `to_fts_query()` to prevent
/// FTS5 syntax errors from special characters while preserving prefix search.
pub fn search_fts(
    conn: &Connection,
    query: &str,
    limit: usize,
    mode: FtsQueryMode,
) -> Result<Vec<FtsResult>> {
    if query.trim().is_empty() {
        return Ok(Vec::new());
    }

    let safe_query = to_fts_query(query, mode);

    let mut stmt = conn.prepare(
        "SELECT rowid, bm25(documents_fts),
                snippet(documents_fts, 1, '', '', '...', 64)
         FROM documents_fts
         WHERE documents_fts MATCH ?
         ORDER BY bm25(documents_fts)
         LIMIT ?",
    )?;

    let results = stmt
        .query_map(rusqlite::params![safe_query, limit as i64], |row| {
            Ok(FtsResult {
                document_id: row.get(0)?,
                rank: row.get(1)?,
                snippet: row.get(2)?,
            })
        })?
        .collect::<Result<Vec<_>, _>>()?;

    Ok(results)
}
```

**Acceptance Criteria:**

- [ ] Returns matching document IDs with BM25 rank
- [ ] Porter stemming works (search/searching match)
- [ ] Prefix search works (type-ahead UX): `auth*` returns results starting with "auth"
- [ ] Empty query returns empty results
- [ ] Nonsense query returns empty results
- [ ] Special characters in the query don't cause FTS5 syntax errors (`-`, `"`, `:`, `*`)
- [ ] Query `"-DWITH_SSL"` returns results (not treated as NOT operator)
- [ ] Query `C++` returns results (special chars preserved)
- [ ] Safe mode preserves a trailing `*` on alphanumeric tokens
- [ ] Raw mode (`--fts-mode=raw`) passes the query through unchanged

---

### 3.3 Search Filters

**File:** `src/search/filters.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;
use crate::documents::SourceType;

/// Maximum allowed limit for search results.
const MAX_SEARCH_LIMIT: usize = 100;

/// Default limit for search results.
const DEFAULT_SEARCH_LIMIT: usize = 20;

/// Search filters applied post-retrieval.
#[derive(Debug, Clone, Default)]
pub struct SearchFilters {
    pub source_type: Option<SourceType>,
    pub author: Option<String>,
    pub project_id: Option<i64>,
    pub after: Option<i64>,         // ms epoch (created_at >=)
    pub updated_after: Option<i64>, // ms epoch (updated_at >=)
    pub labels: Vec<String>,        // AND logic
    pub path: Option<PathFilter>,
    pub limit: usize,               // Default 20, max 100
}

impl SearchFilters {
    /// Check if any filter is set (used for adaptive recall).
    pub fn has_any_filter(&self) -> bool {
        self.source_type.is_some()
            || self.author.is_some()
            || self.project_id.is_some()
            || self.after.is_some()
            || self.updated_after.is_some()
            || !self.labels.is_empty()
            || self.path.is_some()
    }

    /// Clamp limit to the valid range [1, MAX_SEARCH_LIMIT].
    pub fn clamp_limit(&self) -> usize {
        if self.limit == 0 {
            DEFAULT_SEARCH_LIMIT
        } else {
            self.limit.min(MAX_SEARCH_LIMIT)
        }
    }
}

/// Path filter with prefix or exact match.
#[derive(Debug, Clone)]
pub enum PathFilter {
    Prefix(String), // Trailing `/` -> LIKE 'path/%'
    Exact(String),  // No trailing `/` -> = 'path'
}

impl PathFilter {
    pub fn from_str(s: &str) -> Self {
        if s.ends_with('/') {
            Self::Prefix(s.to_string())
        } else {
            Self::Exact(s.to_string())
        }
    }
}

/// Apply filters to document IDs, returning the filtered set.
///
/// IMPORTANT: Preserves ranking order from the input document_ids.
/// Filters must not reorder results - maintain the RRF/search ranking.
///
/// Uses the JSON1 extension for efficient ordered ID passing:
/// - Passes document_ids as a JSON array: `[1,2,3,...]`
/// - Uses `json_each()` to expand into rows with `key` as position
/// - JOINs with the documents table and applies filters
/// - Orders by original position to preserve ranking
pub fn apply_filters(
    conn: &Connection,
    document_ids: &[i64],
    filters: &SearchFilters,
) -> Result<Vec<i64>> {
    if document_ids.is_empty() {
        return Ok(Vec::new());
    }

    // Build JSON array of document IDs
    let ids_json = serde_json::to_string(document_ids)?;

    // Build dynamic WHERE clauses
    let mut conditions: Vec<String> = Vec::new();
    let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    // Always bind the JSON array first
    params.push(Box::new(ids_json));

    if let Some(ref source_type) = filters.source_type {
        conditions.push("d.source_type = ?".into());
        params.push(Box::new(source_type.as_str().to_string()));
    }
    if let Some(ref author) = filters.author {
        conditions.push("d.author_username = ?".into());
        params.push(Box::new(author.clone()));
    }
    if let Some(project_id) = filters.project_id {
        conditions.push("d.project_id = ?".into());
        params.push(Box::new(project_id));
    }
    if let Some(after) = filters.after {
        conditions.push("d.created_at >= ?".into());
        params.push(Box::new(after));
    }
    if let Some(updated_after) = filters.updated_after {
        conditions.push("d.updated_at >= ?".into());
        params.push(Box::new(updated_after));
    }

    // Labels: AND logic - all labels must be present
    for label in &filters.labels {
        conditions.push(
            "EXISTS (SELECT 1 FROM document_labels dl
              WHERE dl.document_id = d.id AND dl.label_name = ?)".into(),
        );
        params.push(Box::new(label.clone()));
    }

    // Path filter
    if let Some(ref path_filter) = filters.path {
        match path_filter {
            PathFilter::Exact(path) => {
                conditions.push(
                    "EXISTS (SELECT 1 FROM document_paths dp
                      WHERE dp.document_id = d.id AND dp.path = ?)".into(),
                );
                params.push(Box::new(path.clone()));
            }
            PathFilter::Prefix(prefix) => {
                // IMPORTANT: Must use the ESCAPE clause for backslash escaping
                // to work in SQLite LIKE
                conditions.push(
                    "EXISTS (SELECT 1 FROM document_paths dp
                      WHERE dp.document_id = d.id AND dp.path LIKE ? ESCAPE '\\')".into(),
                );
                // Escape the escape character itself first, then the LIKE
                // wildcards, and add the trailing %
                let like_pattern = format!(
                    "{}%",
                    prefix
                        .replace('\\', "\\\\")
                        .replace('%', "\\%")
                        .replace('_', "\\_")
                );
                params.push(Box::new(like_pattern));
            }
        }
    }

    let where_clause = if conditions.is_empty() {
        String::new()
    } else {
        format!("AND {}", conditions.join(" AND "))
    };

    let limit = filters.clamp_limit();

    // SQL using JSON1 for ordered ID passing.
    // json_each() returns rows with `key` (0-indexed position) and `value` (the ID).
    let sql = format!(
        r#"
        SELECT d.id
        FROM json_each(?) AS j
        JOIN documents d ON d.id = j.value
        WHERE 1=1 {}
        ORDER BY j.key
        LIMIT ?
        "#,
        where_clause
    );
    params.push(Box::new(limit as i64));

    let mut stmt = conn.prepare(&sql)?;
    let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();

    let results = stmt
        .query_map(params_refs.as_slice(), |row| row.get(0))?
        .collect::<Result<Vec<_>, _>>()?;

    Ok(results)
}
```

**Supported filters:**

| Filter | SQL Column | Notes |
|--------|-----------|-------|
| `--type` | `source_type` | `issue`, `mr`, `discussion` |
| `--author` | `author_username` | Exact match |
| `--project` | `project_id` | Resolve path to ID |
| `--after` | `created_at` | `>= date` (ms epoch) |
| `--updated-after` | `updated_at` | `>= date` (ms epoch), common triage filter |
| `--label` | `document_labels` | JOIN, multiple = AND |
| `--path` | `document_paths` | JOIN, trailing `/` = prefix |
| `--limit` | N/A | Default 20, max 100 |

**Acceptance Criteria:**

- [ ] Each filter correctly restricts results
- [ ] Multiple `--label` flags use AND logic
- [ ] Path prefix vs exact match works correctly
- [ ] `--updated-after` filters on updated_at (not created_at)
- [ ] Filters compose (all applied together)
- [ ] Ranking order preserved after filtering (ORDER BY position)
- [ ] Limit clamped to the valid range [1, 100]
- [ ] Default limit is 20 when not specified
- [ ] JSON1 `json_each()`
correctly expands document IDs --- ### 3.4 CLI: `lore search --mode=lexical` **File:** `src/cli/commands/search.rs` ```rust //! Search command - find documents using lexical, semantic, or hybrid search. use console::style; use serde::Serialize; use crate::core::error::Result; use crate::core::time::ms_to_iso; use crate::search::{SearchFilters, SearchMode, search_fts, search_vector, rank_rrf, RrfResult}; use crate::Config; /// Search result for display. #[derive(Debug, Serialize)] pub struct SearchResultDisplay { pub document_id: i64, pub source_type: String, pub title: Option<String>, pub url: Option<String>, pub project_path: String, pub author: Option<String>, pub created_at: String, // ISO format pub updated_at: String, // ISO format pub score: f64, // Normalized 0-1 pub snippet: String, // Context around match pub labels: Vec<String>, #[serde(skip_serializing_if = "Option::is_none")] pub explain: Option<ExplainData>, } /// Ranking explanation for --explain flag. #[derive(Debug, Serialize)] pub struct ExplainData { pub vector_rank: Option<usize>, pub fts_rank: Option<usize>, pub rrf_score: f64, } /// Search results response. #[derive(Debug, Serialize)] pub struct SearchResponse { pub query: String, pub mode: String, pub total_results: usize, pub results: Vec<SearchResultDisplay>, #[serde(skip_serializing_if = "Vec::is_empty")] pub warnings: Vec<String>, } /// Run search command. pub fn run_search( config: &Config, query: &str, mode: SearchMode, filters: SearchFilters, explain: bool, ) -> Result<SearchResponse> { // 1. Parse query and filters // 2. Execute search based on mode -> ranked doc_ids (+ explain ranks) // 3. Apply post-retrieval filters preserving ranking order // 4. HYDRATE in one DB round-trip (see hydration query below): // - documents fields (title, url, created_at, updated_at, content_text) // - project_path via JOIN projects // - labels aggregated via json_group_array // - paths aggregated via json_group_array (optional) // 5.
Attach snippet: // - prefer FTS snippet when doc hit FTS // - fallback: truncated content_text via generate_fallback_snippet() // 6. For --mode=semantic with 0% embedding coverage: // return early with actionable error message (distinct from "Ollama down") todo!() } /// Hydration query: fetch all display fields for ranked doc IDs in a single round-trip. /// /// Uses json_each(?) to preserve ranking order from the search pipeline. /// Aggregates labels and paths inline to avoid N+1 queries. /// /// ```sql /// SELECT d.id, d.source_type, d.title, d.url, d.author_username, /// d.created_at, d.updated_at, d.content_text, /// p.path AS project_path, /// (SELECT json_group_array(dl.label_name) /// FROM document_labels dl WHERE dl.document_id = d.id) AS labels, /// (SELECT json_group_array(dp.path) /// FROM document_paths dp WHERE dp.document_id = d.id) AS paths /// FROM json_each(?) AS j /// JOIN documents d ON d.id = j.value /// JOIN projects p ON p.id = d.project_id /// ORDER BY j.key /// ``` /// /// This single query replaces what would otherwise be: /// - 1 query per document for metadata /// - 1 query per document for labels /// - 1 query per document for paths /// For 20 results, that's 60 queries reduced to 1. fn hydrate_results( conn: &rusqlite::Connection, doc_ids: &[i64], ) -> Result<Vec<SearchResultDisplay>> { todo!() } /// Print human-readable search results.
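// ---------------------------------------------------------------------
// Aside: a std-only sketch of the "safe" FTS query mode referenced by
// --fts-mode (illustrative only; `sanitize_fts_query_safe` is a
// hypothetical name — the real sanitizer lives in the FTS module).
// Each token is double-quoted so FTS5 operators (AND, OR, NEAR, -, ^)
// are treated as plain text, while a trailing `*` stays outside the
// quotes to preserve prefix queries ("bar"* is valid FTS5 syntax).
fn sanitize_fts_query_safe(query: &str) -> String {
    query
        .split_whitespace()
        .map(|token| {
            let (stem, prefix) = match token.strip_suffix('*') {
                Some(stem) if !stem.is_empty() => (stem, "*"),
                _ => (token, ""),
            };
            // Double embedded quotes per FTS5 string-escaping rules
            format!("\"{}\"{}", stem.replace('"', "\"\""), prefix)
        })
        .collect::<Vec<_>>()
        .join(" ")
}
// e.g. `deadlock retry*` becomes `"deadlock" "retry"*`
// ---------------------------------------------------------------------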
pub fn print_search_results(response: &SearchResponse, explain: bool) { println!( "Found {} results ({} search)\n", response.total_results, response.mode ); for (i, result) in response.results.iter().enumerate() { let type_prefix = match result.source_type.as_str() { "merge_request" => "MR", "issue" => "Issue", "discussion" => "Discussion", _ => &result.source_type, }; let title = result.title.as_deref().unwrap_or("(untitled)"); println!( "[{}] {} - {} ({})", i + 1, style(type_prefix).cyan(), title, format!("{:.2}", result.score) ); if explain { if let Some(exp) = &result.explain { let vec_str = exp.vector_rank.map(|r| format!("#{}", r)).unwrap_or_else(|| "-".into()); let fts_str = exp.fts_rank.map(|r| format!("#{}", r)).unwrap_or_else(|| "-".into()); println!( " Vector: {}, FTS: {}, RRF: {:.4}", vec_str, fts_str, exp.rrf_score ); } } if let Some(author) = &result.author { println!( " @{} · {} · {}", author, &result.created_at[..10], result.project_path ); } println!(" \"{}...\"", &result.snippet); if let Some(url) = &result.url { println!(" {}", style(url).dim()); } println!(); } } /// Print JSON search results for robot mode. pub fn print_search_results_json(response: &SearchResponse, elapsed_ms: u64) { let output = serde_json::json!({ "ok": true, "data": response, "meta": { "elapsed_ms": elapsed_ms } }); println!("{}", serde_json::to_string_pretty(&output).unwrap()); } ``` **CLI integration in `src/cli/mod.rs`:** ```rust /// Search subcommand arguments. 
#[derive(Args)] pub struct SearchArgs { /// Search query query: String, /// Search mode #[arg(long, default_value = "hybrid")] mode: String, // "hybrid" | "lexical" | "semantic" /// Filter by source type #[arg(long, value_name = "TYPE")] r#type: Option<String>, /// Filter by author username #[arg(long)] author: Option<String>, /// Filter by project path #[arg(long)] project: Option<String>, /// Filter by creation date (after) #[arg(long)] after: Option<String>, /// Filter by updated date (recently active items) #[arg(long)] updated_after: Option<String>, /// Filter by label (can specify multiple) #[arg(long, action = clap::ArgAction::Append)] label: Vec<String>, /// Filter by file path #[arg(long)] path: Option<String>, /// Maximum results #[arg(long, default_value = "20")] limit: usize, /// Show ranking breakdown #[arg(long)] explain: bool, /// FTS query mode: "safe" (default) or "raw" /// - safe: Escapes special chars but preserves `*` for prefix queries /// - raw: Pass FTS5 MATCH syntax through unchanged (advanced) #[arg(long, default_value = "safe")] fts_mode: String, // "safe" | "raw" } ``` **Acceptance Criteria:** - [ ] Works without Ollama running - [ ] All filters functional (including `--updated-after`) - [ ] Human-readable output with snippets - [ ] Semantic-only results get fallback snippets from content_text - [ ] Results hydrated in single DB round-trip (no N+1 queries) - [ ] JSON output matches schema - [ ] Empty results show helpful message - [ ] "No data indexed" message if documents table empty - [ ] `--mode=semantic` with 0% embedding coverage returns actionable error (distinct from "Ollama unavailable" — tells user to run `lore embed` first) - [ ] `--fts-mode=safe` (default) preserves prefix `*` while escaping special chars - [ ] `--fts-mode=raw` passes FTS5 MATCH syntax through unchanged --- ## Phase 4: Embedding Pipeline ### 4.1 Embedding Module Structure **New module:** `src/embedding/` ``` src/embedding/ ├── mod.rs # Module exports ├── ollama.rs # Ollama API client ├── pipeline.rs # Batch
embedding orchestration └── change_detector.rs # Detect documents needing re-embedding ``` **File:** `src/embedding/mod.rs` ```rust //! Embedding generation and storage. //! //! Uses Ollama for embedding generation and sqlite-vec for storage. mod change_detector; mod ollama; mod pipeline; pub use change_detector::detect_embedding_changes; pub use ollama::{OllamaClient, OllamaConfig, check_ollama_health}; pub use pipeline::{embed_documents, EmbedResult, EmbedSelection}; ``` --- ### 4.2 Ollama Client **File:** `src/embedding/ollama.rs` ```rust use reqwest::Client; use serde::{Deserialize, Serialize}; use crate::core::error::{GiError, Result}; /// Ollama client configuration. #[derive(Debug, Clone)] pub struct OllamaConfig { pub base_url: String, // "http://localhost:11434" pub model: String, // "nomic-embed-text" pub timeout_secs: u64, // Request timeout } impl Default for OllamaConfig { fn default() -> Self { Self { base_url: "http://localhost:11434".into(), model: "nomic-embed-text".into(), timeout_secs: 60, } } } /// Ollama API client. pub struct OllamaClient { client: Client, config: OllamaConfig, } /// Batch embed request. #[derive(Serialize)] struct EmbedRequest { model: String, input: Vec<String>, } /// Batch embed response. #[derive(Deserialize)] struct EmbedResponse { model: String, embeddings: Vec<Vec<f32>>, } /// Model info from /api/tags. #[derive(Deserialize)] struct TagsResponse { models: Vec<ModelInfo>, } #[derive(Deserialize)] struct ModelInfo { name: String, } impl OllamaClient { pub fn new(config: OllamaConfig) -> Self { let client = Client::builder() .timeout(std::time::Duration::from_secs(config.timeout_secs)) .build() .expect("Failed to create HTTP client"); Self { client, config } } /// Check if Ollama is available and model is loaded.
pub async fn health_check(&self) -> Result<()> { let url = format!("{}/api/tags", self.config.base_url); let response = self.client.get(&url).send().await.map_err(|e| { GiError::OllamaUnavailable { base_url: self.config.base_url.clone(), source: Some(e), } })?; let tags: TagsResponse = response.json().await?; let model_available = tags.models.iter().any(|m| m.name.starts_with(&self.config.model)); if !model_available { return Err(GiError::OllamaModelNotFound { model: self.config.model.clone(), }); } Ok(()) } /// Generate embeddings for a batch of texts. /// /// Returns 768-dimensional vectors for each input text. pub async fn embed_batch(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>> { let url = format!("{}/api/embed", self.config.base_url); let request = EmbedRequest { model: self.config.model.clone(), input: texts, }; let response = self.client .post(&url) .json(&request) .send() .await .map_err(|e| GiError::OllamaUnavailable { base_url: self.config.base_url.clone(), source: Some(e), })?; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); return Err(GiError::EmbeddingFailed { document_id: 0, // Batch failure reason: format!("HTTP {}: {}", status, body), }); } let embed_response: EmbedResponse = response.json().await?; Ok(embed_response.embeddings) } } /// Quick health check without full client.
pub async fn check_ollama_health(base_url: &str) -> bool { let client = Client::new(); client .get(format!("{}/api/tags", base_url)) .send() .await .is_ok() } ``` **Endpoints:** | Endpoint | Purpose | |----------|---------| | `GET /api/tags` | Health check, verify model available | | `POST /api/embed` | Batch embedding (preferred) | **Acceptance Criteria:** - [ ] Health check detects Ollama availability - [ ] Batch embedding works with up to 32 texts - [ ] Clear error messages for common failures --- ### 4.3 Error Handling Extensions **File:** `src/core/error.rs` (extend existing) Add to `ErrorCode`: ```rust pub enum ErrorCode { // ... existing variants ... InvalidEnumValue, OllamaUnavailable, OllamaModelNotFound, EmbeddingFailed, } impl ErrorCode { pub fn exit_code(&self) -> i32 { match self { // ... existing mappings ... Self::InvalidEnumValue => 13, Self::OllamaUnavailable => 14, Self::OllamaModelNotFound => 15, Self::EmbeddingFailed => 16, } } } impl std::fmt::Display for ErrorCode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let code = match self { // ... existing mappings ... Self::InvalidEnumValue => "INVALID_ENUM_VALUE", Self::OllamaUnavailable => "OLLAMA_UNAVAILABLE", Self::OllamaModelNotFound => "OLLAMA_MODEL_NOT_FOUND", Self::EmbeddingFailed => "EMBEDDING_FAILED", }; write!(f, "{code}") } } ``` Add to `GiError`: ```rust pub enum GiError { // ... existing variants ... #[error("Cannot connect to Ollama at {base_url}. Is it running?")] OllamaUnavailable { base_url: String, #[source] source: Option<reqwest::Error>, }, #[error("Ollama model '{model}' not found. Run: ollama pull {model}")] OllamaModelNotFound { model: String }, #[error("Embedding failed for document {document_id}: {reason}")] EmbeddingFailed { document_id: i64, reason: String }, } impl GiError { pub fn code(&self) -> ErrorCode { match self { // ... existing mappings ... Self::OllamaUnavailable { .. } => ErrorCode::OllamaUnavailable, Self::OllamaModelNotFound { ..
} => ErrorCode::OllamaModelNotFound, Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed, } } pub fn suggestion(&self) -> Option<&'static str> { match self { // ... existing mappings ... Self::OllamaUnavailable { .. } => Some("Start Ollama: ollama serve"), Self::OllamaModelNotFound { .. } => Some("Pull the model: ollama pull nomic-embed-text"), Self::EmbeddingFailed { .. } => Some("Check Ollama logs or retry with 'lore embed --retry-failed'"), } } } ``` --- ### 4.4 Embedding Pipeline **File:** `src/embedding/pipeline.rs` ```rust use indicatif::{ProgressBar, ProgressStyle}; use rusqlite::Connection; use crate::core::error::Result; use crate::embedding::OllamaClient; /// Batch size for embedding requests. const BATCH_SIZE: usize = 32; /// SQLite page size for paging through pending documents. /// Uses keyset paging (id > last_id) to avoid rescanning previously-processed rows. const DB_PAGE_SIZE: usize = 500; /// Expected embedding dimensions for nomic-embed-text model. /// IMPORTANT: Validates against this to prevent silent corruption. const EXPECTED_DIMS: usize = 768; /// Which documents to embed. #[derive(Debug, Clone, Copy)] pub enum EmbedSelection { /// New or changed documents (default). Pending, /// Only previously failed documents. RetryFailed, } /// Result of embedding run. #[derive(Debug, Default)] pub struct EmbedResult { pub embedded: usize, pub failed: usize, pub skipped: usize, } /// Embed documents that need embedding. /// /// Process: /// 1. Select documents needing embeddings: /// - Pending: missing embedding_metadata row OR content_hash mismatch /// - RetryFailed: embedding_metadata.last_error IS NOT NULL /// 2. Page through candidates using keyset pagination (id > last_id) /// to avoid rescanning already-processed rows /// 3. Batch texts -> Ollama `/api/embed` with concurrent HTTP requests /// 4. Write embeddings + embedding_metadata in per-batch transactions /// 5.
Failed batches record `last_error` in embedding_metadata /// (excluded from Pending selection; retried via RetryFailed) /// 6. Progress reported as (embedded + failed) vs total_pending pub async fn embed_documents( conn: &Connection, client: &OllamaClient, selection: EmbedSelection, concurrency: usize, progress_callback: Option<Box<dyn Fn(usize, usize)>>, ) -> Result<EmbedResult> { use futures::stream::{FuturesUnordered, StreamExt}; let mut result = EmbedResult::default(); let total_pending = count_pending_documents(conn, selection)?; if total_pending == 0 { return Ok(result); } // Page through pending documents using keyset pagination to avoid // both memory pressure and OFFSET performance degradation. let mut last_id: i64 = 0; loop { let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, selection)?; if pending.is_empty() { break; } // Launch concurrent HTTP requests, collect results let mut futures = FuturesUnordered::new(); for batch in pending.chunks(BATCH_SIZE) { let texts: Vec<String> = batch.iter().map(|d| d.content.clone()).collect(); let batch_meta: Vec<(i64, String)> = batch .iter() .map(|d| (d.id, d.content_hash.clone())) .collect(); futures.push(async move { let embed_result = client.embed_batch(texts).await; (batch_meta, embed_result) }); // Cap in-flight requests if futures.len() >= concurrency { if let Some((meta, res)) = futures.next().await { collect_writes(conn, &meta, res, &mut result)?; } } } // Drain remaining futures while let Some((meta, res)) = futures.next().await { collect_writes(conn, &meta, res, &mut result)?; } // Advance keyset cursor for next page if let Some(last) = pending.last() { last_id = last.id; } if let Some(ref cb) = progress_callback { cb(result.embedded + result.failed, total_pending); } } Ok(result) } /// Collect embedding results and write to DB (sequential, on main thread). /// /// IMPORTANT: Validates embedding dimensions to prevent silent corruption.
/// If model returns wrong dimensions (e.g., different model configured), /// the document is marked as failed rather than storing corrupt data. fn collect_writes( conn: &Connection, batch_meta: &[(i64, String)], embed_result: Result<Vec<Vec<f32>>>, result: &mut EmbedResult, ) -> Result<()> { // `unchecked_transaction` works through a shared borrow; writes here are // sequential on one connection, so this is safe. let tx = conn.unchecked_transaction()?; match embed_result { Ok(embeddings) => { for ((doc_id, hash), embedding) in batch_meta.iter().zip(embeddings.iter()) { // Validate dimensions to prevent silent corruption if embedding.len() != EXPECTED_DIMS { record_embedding_error( &tx, *doc_id, hash, &format!( "embedding dimension mismatch: got {}, expected {}", embedding.len(), EXPECTED_DIMS ), )?; result.failed += 1; continue; } store_embedding(&tx, *doc_id, embedding, hash)?; result.embedded += 1; } } Err(e) => { for (doc_id, hash) in batch_meta { record_embedding_error(&tx, *doc_id, hash, &e.to_string())?; result.failed += 1; } } } tx.commit()?; Ok(()) } struct PendingDocument { id: i64, content: String, content_hash: String, } /// Count total pending documents (for progress reporting). fn count_pending_documents(conn: &Connection, selection: EmbedSelection) -> Result<usize> { let sql = match selection { EmbedSelection::Pending => "SELECT COUNT(*) FROM documents d LEFT JOIN embedding_metadata em ON d.id = em.document_id WHERE em.document_id IS NULL OR em.content_hash != d.content_hash", EmbedSelection::RetryFailed => "SELECT COUNT(*) FROM documents d JOIN embedding_metadata em ON d.id = em.document_id WHERE em.last_error IS NOT NULL", }; let count: usize = conn.query_row(sql, [], |row| row.get(0))?; Ok(count) } /// Find pending documents for embedding using keyset pagination. /// /// IMPORTANT: Uses keyset pagination (d.id > last_id) instead of OFFSET. /// OFFSET degrades O(n²) on large result sets because SQLite must scan /// and discard all rows before the offset. Keyset pagination is O(1) per page /// since the index seek goes directly to the starting row.
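// ---------------------------------------------------------------------
// Aside: a std-only illustration of the keyset-paging loop described
// above. The hypothetical in-memory `fetch_page` stands in for the SQL
// query; it assumes `rows` is sorted ascending (matching ORDER BY d.id).
// Each page seeks past `last_id` rather than re-scanning skipped rows,
// so per-page cost stays flat as the cursor advances.
fn fetch_page(rows: &[i64], last_id: i64, limit: usize) -> Vec<i64> {
    rows.iter().copied().filter(|&id| id > last_id).take(limit).collect()
}
// Walking pages of 2 over ids [1, 2, 5, 8, 9] yields [1,2], [5,8], [9];
// the cursor is always the last id of the previous page, never an OFFSET.
// ---------------------------------------------------------------------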
fn find_pending_documents( conn: &Connection, limit: usize, last_id: i64, selection: EmbedSelection, ) -> Result<Vec<PendingDocument>> { let sql = match selection { EmbedSelection::Pending => "SELECT d.id, d.content_text, d.content_hash FROM documents d LEFT JOIN embedding_metadata em ON d.id = em.document_id WHERE (em.document_id IS NULL OR em.content_hash != d.content_hash) AND d.id > ? ORDER BY d.id LIMIT ?", EmbedSelection::RetryFailed => "SELECT d.id, d.content_text, d.content_hash FROM documents d JOIN embedding_metadata em ON d.id = em.document_id WHERE em.last_error IS NOT NULL AND d.id > ? ORDER BY d.id LIMIT ?", }; let mut stmt = conn.prepare(sql)?; let docs = stmt .query_map(rusqlite::params![last_id, limit as i64], |row| { Ok(PendingDocument { id: row.get(0)?, content: row.get(1)?, content_hash: row.get(2)?, }) })? .collect::<Result<Vec<_>, _>>()?; Ok(docs) } fn store_embedding( tx: &rusqlite::Transaction, document_id: i64, embedding: &[f32], content_hash: &str, ) -> Result<()> { // Convert embedding to bytes for sqlite-vec // sqlite-vec expects raw little-endian bytes, not the array directly let embedding_bytes: Vec<u8> = embedding .iter() .flat_map(|f| f.to_le_bytes()) .collect(); // Store in sqlite-vec (rowid = document_id) tx.execute( "INSERT OR REPLACE INTO embeddings(rowid, embedding) VALUES (?, ?)", rusqlite::params![document_id, embedding_bytes], )?; // Update metadata let now = crate::core::time::now_ms(); tx.execute( "INSERT OR REPLACE INTO embedding_metadata (document_id, model, dims, content_hash, created_at, last_error, attempt_count, last_attempt_at) VALUES (?, 'nomic-embed-text', 768, ?, ?, NULL, 0, ?)", rusqlite::params![document_id, content_hash, now, now], )?; Ok(()) } fn record_embedding_error( tx: &rusqlite::Transaction, document_id: i64, content_hash: &str, error: &str, ) -> Result<()> { let now = crate::core::time::now_ms(); tx.execute( "INSERT INTO embedding_metadata (document_id, model, dims, content_hash, created_at, last_error, attempt_count, last_attempt_at) VALUES
(?, 'nomic-embed-text', 768, ?, ?, ?, 1, ?) ON CONFLICT(document_id) DO UPDATE SET last_error = excluded.last_error, attempt_count = attempt_count + 1, last_attempt_at = excluded.last_attempt_at", rusqlite::params![document_id, content_hash, now, error, now], )?; Ok(()) } ``` **Acceptance Criteria:** - [ ] New documents get embedded - [ ] Changed documents (hash mismatch) get re-embedded - [ ] Unchanged documents skipped - [ ] Failures recorded in `embedding_metadata.last_error` - [ ] Failures record actual content_hash (not empty string) - [ ] Writes batched in transactions for performance - [ ] Concurrency parameter respected - [ ] Progress reported during embedding - [ ] Deterministic `ORDER BY d.id` ensures consistent paging - [ ] `EmbedSelection` parameter controls pending vs retry-failed mode --- ### 4.5 CLI: `lore embed` **File:** `src/cli/commands/embed.rs` ```rust //! Embed command - generate embeddings for documents. use indicatif::{ProgressBar, ProgressStyle}; use serde::Serialize; use crate::core::error::Result; use crate::embedding::{embed_documents, EmbedResult, OllamaClient, OllamaConfig}; use crate::Config; /// Run embedding command. pub async fn run_embed( config: &Config, retry_failed: bool, ) -> Result<EmbedResult> { use crate::core::db::open_database; use crate::embedding::EmbedSelection; let ollama_config = OllamaConfig { base_url: config.embedding.base_url.clone(), model: config.embedding.model.clone(), timeout_secs: 120, }; let client = OllamaClient::new(ollama_config); // Health check client.health_check().await?; // Open database connection let conn = open_database(config)?; // Determine selection mode let selection = if retry_failed { EmbedSelection::RetryFailed } else { EmbedSelection::Pending }; // Run embedding let result = embed_documents( &conn, &client, selection, config.embedding.concurrency as usize, None, ).await?; Ok(result) } /// Print human-readable output.
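// ---------------------------------------------------------------------
// Aside: a std-only illustration of the raw little-endian encoding that
// store_embedding() and search_vector() use when handing vectors to
// sqlite-vec (`embedding_to_le_bytes` is a hypothetical helper name;
// the pipeline inlines this conversion).
fn embedding_to_le_bytes(embedding: &[f32]) -> Vec<u8> {
    embedding.iter().flat_map(|f| f.to_le_bytes()).collect()
}
// A 768-dim f32 vector serializes to 768 * 4 = 3072 bytes.
// ---------------------------------------------------------------------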
pub fn print_embed(result: &EmbedResult, elapsed_secs: u64) { println!("Embedding complete:"); println!(" Embedded: {:>6} documents", result.embedded); println!(" Failed: {:>6} documents", result.failed); println!(" Skipped: {:>6} documents", result.skipped); println!(" Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60); } /// Print JSON output for robot mode. pub fn print_embed_json(result: &EmbedResult, elapsed_ms: u64) { let output = serde_json::json!({ "ok": true, "data": { "embedded": result.embedded, "failed": result.failed, "skipped": result.skipped }, "meta": { "elapsed_ms": elapsed_ms } }); println!("{}", serde_json::to_string_pretty(&output).unwrap()); } ``` **CLI integration:** ```rust /// Embed subcommand arguments. #[derive(Args)] pub struct EmbedArgs { /// Retry only previously failed documents #[arg(long)] retry_failed: bool, } ``` **Acceptance Criteria:** - [ ] Embeds documents without embeddings - [ ] Re-embeds documents with changed hash - [ ] `--retry-failed` only processes failed documents - [ ] Progress bar with count - [ ] Clear error if Ollama unavailable --- ### 4.6 CLI: `lore stats` **File:** `src/cli/commands/stats.rs` ```rust //! Stats command - display document and embedding statistics. use rusqlite::Connection; use serde::Serialize; use crate::core::error::Result; use crate::Config; /// Document statistics. #[derive(Debug, Serialize)] pub struct Stats { pub documents: DocumentStats, pub embeddings: EmbeddingStats, pub fts: FtsStats, pub queues: QueueStats, } #[derive(Debug, Serialize)] pub struct DocumentStats { pub issues: usize, pub mrs: usize, pub discussions: usize, pub total: usize, pub truncated: usize, } #[derive(Debug, Serialize)] pub struct EmbeddingStats { pub embedded: usize, pub pending: usize, pub failed: usize, pub coverage_pct: f64, } #[derive(Debug, Serialize)] pub struct FtsStats { pub indexed: usize, } /// Queue statistics for observability. 
/// /// Exposes internal queue depths so operators can detect backlogs /// and failing items that need manual intervention. #[derive(Debug, Serialize)] pub struct QueueStats { /// Items in dirty_sources queue (pending document regeneration) pub dirty_sources: usize, /// Items in dirty_sources with last_error set (failing regeneration) pub dirty_sources_failed: usize, /// Items in pending_discussion_fetches queue pub pending_discussion_fetches: usize, /// Items in pending_discussion_fetches with last_error set pub pending_discussion_fetches_failed: usize, } /// Integrity check result. #[derive(Debug, Serialize)] pub struct IntegrityCheck { pub documents_count: usize, pub fts_count: usize, pub embeddings_count: usize, pub metadata_count: usize, pub orphaned_embeddings: usize, pub hash_mismatches: usize, pub ok: bool, } /// Run stats command. pub fn run_stats(config: &Config) -> Result<Stats> { // Query counts from database todo!() } /// Run integrity check (--check flag). /// /// Verifies: /// - documents count == documents_fts count /// - embeddings.rowid all exist in documents.id /// - embedding_metadata.content_hash == documents.content_hash pub fn run_integrity_check(config: &Config) -> Result<IntegrityCheck> { // 1. Count documents // 2. Count FTS entries // 3. Find orphaned embeddings (no matching document) // 4. Find hash mismatches between embedding_metadata and documents // 5. Return check results todo!() } /// Repair result from --repair flag. #[derive(Debug, Serialize)] pub struct RepairResult { pub orphaned_embeddings_deleted: usize, pub stale_embeddings_cleared: usize, pub missing_fts_repopulated: usize, } /// Repair issues found by integrity check (--repair flag). /// /// Fixes: /// - Deletes orphaned embeddings (embedding_metadata rows with no matching document) /// - Clears stale embedding_metadata (hash mismatch) so they get re-embedded /// - Rebuilds FTS index from scratch (correct-by-construction) /// /// NOTE: FTS repair uses `rebuild` rather than partial row insertion.
/// With `content='documents'` (external-content FTS), partial repopulation /// via INSERT of missing rows is fragile — if the external content table /// and FTS content diverge in any way, partial fixes can leave the index /// in an inconsistent state. A full rebuild is slower but guaranteed correct. pub fn run_repair(config: &Config) -> Result<RepairResult> { let conn = open_database(config)?; // Delete orphaned embeddings (no matching document) let orphaned_deleted = conn.execute( "DELETE FROM embedding_metadata WHERE document_id NOT IN (SELECT id FROM documents)", [], )?; // Also delete from embeddings virtual table (sqlite-vec) conn.execute( "DELETE FROM embeddings WHERE rowid NOT IN (SELECT id FROM documents)", [], )?; // Clear stale embedding_metadata (hash mismatch) - will be re-embedded let stale_cleared = conn.execute( "DELETE FROM embedding_metadata WHERE (document_id, content_hash) NOT IN ( SELECT id, content_hash FROM documents )", [], )?; // Rebuild FTS index from scratch — correct-by-construction. // This re-reads all rows from the external content table (documents) // and rebuilds the index. Slower than partial fix but guaranteed consistent. conn.execute( "INSERT INTO documents_fts(documents_fts) VALUES('rebuild')", [], )?; let fts_rebuilt = 1; // rebuild is all-or-nothing Ok(RepairResult { orphaned_embeddings_deleted: orphaned_deleted, stale_embeddings_cleared: stale_cleared, missing_fts_repopulated: fts_rebuilt, }) } /// Print human-readable stats.
pub fn print_stats(stats: &Stats) { println!("Document Statistics:"); println!(" Issues: {:>6} documents", stats.documents.issues); println!(" MRs: {:>6} documents", stats.documents.mrs); println!(" Discussions: {:>6} documents", stats.documents.discussions); println!(" Total: {:>6} documents", stats.documents.total); if stats.documents.truncated > 0 { println!(" Truncated: {:>6}", stats.documents.truncated); } println!(); println!("Embedding Coverage:"); println!(" Embedded: {:>6} ({:.1}%)", stats.embeddings.embedded, stats.embeddings.coverage_pct); println!(" Pending: {:>6}", stats.embeddings.pending); println!(" Failed: {:>6}", stats.embeddings.failed); println!(); println!("FTS Index:"); println!(" Indexed: {:>6} documents", stats.fts.indexed); println!(); println!("Queue Depths:"); println!(" Dirty sources: {:>6} ({} failed)", stats.queues.dirty_sources, stats.queues.dirty_sources_failed ); println!(" Discussion fetches:{:>6} ({} failed)", stats.queues.pending_discussion_fetches, stats.queues.pending_discussion_fetches_failed ); } /// Print integrity check results. pub fn print_integrity_check(check: &IntegrityCheck) { println!("Integrity Check:"); println!(" Documents: {:>6}", check.documents_count); println!(" FTS entries: {:>6}", check.fts_count); println!(" Embeddings: {:>6}", check.embeddings_count); println!(" Metadata: {:>6}", check.metadata_count); if check.orphaned_embeddings > 0 { println!(" Orphaned embeddings: {:>6} (WARN)", check.orphaned_embeddings); } if check.hash_mismatches > 0 { println!(" Hash mismatches: {:>6} (WARN)", check.hash_mismatches); } println!(); println!(" Status: {}", if check.ok { "OK" } else { "ISSUES FOUND" }); } /// Print JSON stats for robot mode. pub fn print_stats_json(stats: &Stats) { let output = serde_json::json!({ "ok": true, "data": stats }); println!("{}", serde_json::to_string_pretty(&output).unwrap()); } /// Print repair results.
pub fn print_repair_result(result: &RepairResult) { println!("Repair Results:"); println!(" Orphaned embeddings deleted: {}", result.orphaned_embeddings_deleted); println!(" Stale embeddings cleared: {}", result.stale_embeddings_cleared); println!(" Missing FTS repopulated: {}", result.missing_fts_repopulated); println!(); let total = result.orphaned_embeddings_deleted + result.stale_embeddings_cleared + result.missing_fts_repopulated; if total == 0 { println!(" No issues found to repair."); } else { println!(" Fixed {} issues.", total); } } ``` **CLI integration:** ```rust /// Stats subcommand arguments. #[derive(Args)] pub struct StatsArgs { /// Run integrity checks (document/FTS/embedding consistency) #[arg(long)] check: bool, /// Repair issues found by --check (deletes orphaned embeddings, clears stale metadata) #[arg(long, requires = "check")] repair: bool, } ``` **Acceptance Criteria:** - [ ] Shows document counts by type - [ ] Shows embedding coverage - [ ] Shows FTS index count - [ ] Identifies truncated documents - [ ] Shows queue depths (dirty_sources, pending_discussion_fetches) - [ ] Shows failed item counts for each queue - [ ] `--check` verifies document/FTS/embedding consistency - [ ] `--repair` fixes orphaned embeddings, stale metadata, missing FTS entries - [ ] JSON output for scripting --- ## Phase 5: Hybrid Search ### 5.1 Vector Search Function **File:** `src/search/vector.rs` ```rust use rusqlite::Connection; use crate::core::error::Result; /// Vector search result. #[derive(Debug, Clone)] pub struct VectorResult { pub document_id: i64, pub distance: f64, // Lower = more similar } /// Search documents using vector similarity. /// /// Uses sqlite-vec for efficient vector search. /// Returns document IDs sorted by distance (lower = better match). 
/// /// IMPORTANT: sqlite-vec KNN queries require: /// - k parameter for number of results /// - embedding passed as raw little-endian bytes pub fn search_vector( conn: &Connection, query_embedding: &[f32], limit: usize, ) -> Result<Vec<VectorResult>> { // Convert embedding to bytes for sqlite-vec let embedding_bytes: Vec<u8> = query_embedding .iter() .flat_map(|f| f.to_le_bytes()) .collect(); let mut stmt = conn.prepare( "SELECT rowid, distance FROM embeddings WHERE embedding MATCH ? AND k = ? ORDER BY distance LIMIT ?" )?; let results = stmt .query_map(rusqlite::params![embedding_bytes, limit, limit], |row| { Ok(VectorResult { document_id: row.get(0)?, distance: row.get(1)?, }) })? .collect::<Result<Vec<_>, _>>()?; Ok(results) } ``` **Acceptance Criteria:** - [ ] Returns document IDs with distances - [ ] Lower distance = better match - [ ] Works with 768-dim vectors - [ ] Uses k parameter for KNN query - [ ] Embedding passed as bytes --- ### 5.2 RRF Ranking **File:** `src/search/rrf.rs` ```rust use std::collections::HashMap; /// RRF ranking constant. const RRF_K: f64 = 60.0; /// RRF-ranked result. #[derive(Debug, Clone)] pub struct RrfResult { pub document_id: i64, pub rrf_score: f64, // Raw RRF score pub normalized_score: f64, // Normalized to 0-1 pub vector_rank: Option<usize>, pub fts_rank: Option<usize>, } /// Rank documents using Reciprocal Rank Fusion.
/// /// Algorithm: /// RRF_score(d) = Σ 1 / (k + rank_i(d)) /// /// Where: /// - k = 60 (tunable constant) /// - rank_i(d) = rank of document d in retriever i (1-indexed) /// - Sum over all retrievers where document appears pub fn rank_rrf( vector_results: &[(i64, f64)], // (doc_id, distance) fts_results: &[(i64, f64)], // (doc_id, bm25_score) ) -> Vec<RrfResult> { let mut scores: HashMap<i64, (f64, Option<usize>, Option<usize>)> = HashMap::new(); // Add vector results (1-indexed ranks) for (rank, (doc_id, _)) in vector_results.iter().enumerate() { let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64); let entry = scores.entry(*doc_id).or_insert((0.0, None, None)); entry.0 += rrf_contribution; entry.1 = Some(rank + 1); } // Add FTS results (1-indexed ranks) for (rank, (doc_id, _)) in fts_results.iter().enumerate() { let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64); let entry = scores.entry(*doc_id).or_insert((0.0, None, None)); entry.0 += rrf_contribution; entry.2 = Some(rank + 1); } // Convert to results and sort by RRF score descending let mut results: Vec<_> = scores .into_iter() .map(|(doc_id, (rrf_score, vector_rank, fts_rank))| { RrfResult { document_id: doc_id, rrf_score, normalized_score: 0.0, // Will be set below vector_rank, fts_rank, } }) .collect(); results.sort_by(|a, b| b.rrf_score.total_cmp(&a.rrf_score)); // Normalize scores to 0-1 if let Some(max_score) = results.first().map(|r| r.rrf_score) { for result in &mut results { result.normalized_score = result.rrf_score / max_score; } } results } ``` **Acceptance Criteria:** - [ ] Documents in both lists score higher - [ ] Documents in one list still included - [ ] Normalized score = rrfScore / max(rrfScore) - [ ] Raw RRF score available in `--explain` output --- ### 5.3 Adaptive Recall **File:** `src/search/hybrid.rs` ```rust use rusqlite::Connection; use crate::core::error::Result; use crate::embedding::OllamaClient; use crate::search::{SearchFilters, search_fts, search_vector, rank_rrf, RrfResult,
FtsQueryMode};

/// Minimum base recall for unfiltered search.
const BASE_RECALL_MIN: usize = 50;

/// Minimum recall when filters are applied.
const FILTERED_RECALL_MIN: usize = 200;

/// Maximum recall to prevent excessive resource usage.
const RECALL_CAP: usize = 1500;

/// Search mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
    Hybrid,   // Vector + FTS with RRF
    Lexical,  // FTS only
    Semantic, // Vector only
}

impl SearchMode {
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "hybrid" => Some(Self::Hybrid),
            "lexical" | "fts" => Some(Self::Lexical),
            "semantic" | "vector" => Some(Self::Semantic),
            _ => None,
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Hybrid => "hybrid",
            Self::Lexical => "lexical",
            Self::Semantic => "semantic",
        }
    }
}

/// Hybrid search result.
#[derive(Debug)]
pub struct HybridResult {
    pub document_id: i64,
    pub score: f64,
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
    pub rrf_score: f64,
}

/// Execute hybrid search.
///
/// Adaptive recall: expands topK proportionally to the requested limit and filter
/// restrictiveness to prevent "no results" when relevant docs would be filtered out.
///
/// Formula:
/// - Unfiltered: max(50, limit * 10), capped at 1500
/// - Filtered: max(200, limit * 50), capped at 1500
///
/// IMPORTANT: All modes use RRF consistently to ensure rank fields
/// are populated correctly for --explain output.
pub async fn search_hybrid(
    conn: &Connection,
    client: Option<&OllamaClient>,
    ollama_base_url: Option<&str>, // For actionable error messages
    query: &str,
    mode: SearchMode,
    filters: &SearchFilters,
    fts_mode: FtsQueryMode,
) -> Result<(Vec<HybridResult>, Vec<String>)> {
    let mut warnings: Vec<String> = Vec::new();

    // Adaptive recall: proportional to requested limit and filter count
    let requested = filters.clamp_limit();
    let top_k = if filters.has_any_filter() {
        (requested * 50).max(FILTERED_RECALL_MIN).min(RECALL_CAP)
    } else {
        (requested * 10).max(BASE_RECALL_MIN).min(RECALL_CAP)
    };

    match mode {
        SearchMode::Lexical => {
            // FTS only - use RRF with empty vector results for consistent ranking
            let fts_results = search_fts(conn, query, top_k, fts_mode)?;
            let fts_tuples: Vec<_> =
                fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
            let ranked = rank_rrf(&[], &fts_tuples);
            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
        SearchMode::Semantic => {
            // Vector only - requires client
            let client = client.ok_or_else(|| {
                crate::core::error::GiError::OllamaUnavailable {
                    base_url: ollama_base_url.unwrap_or("http://localhost:11434").into(),
                    source: None,
                }
            })?;
            let query_embedding = client.embed_batch(vec![query.to_string()]).await?;
            let embedding = query_embedding.into_iter().next().unwrap();
            let vec_results = search_vector(conn, &embedding, top_k)?;

            // Use RRF with empty FTS results for consistent ranking
            let vec_tuples: Vec<_> =
                vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
            let ranked = rank_rrf(&vec_tuples, &[]);
            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
        SearchMode::Hybrid => {
            // Both retrievers with RRF fusion
            let fts_results = search_fts(conn, query, top_k, fts_mode)?;

            // Attempt vector search with graceful degradation on any failure
            let vec_results = match client {
                Some(client) => {
                    // Try to embed query; gracefully degrade on transient failures
                    match client.embed_batch(vec![query.to_string()]).await {
                        Ok(embeddings) => {
                            let embedding = embeddings.into_iter().next().unwrap();
                            search_vector(conn, &embedding, top_k)?
                        }
                        Err(e) => {
                            // Transient failure (network, timeout, rate limit, etc.)
                            // Log and fall back to FTS-only rather than failing the search
                            tracing::warn!("Vector search failed, falling back to lexical: {}", e);
                            warnings.push(format!(
                                "Vector search unavailable ({}), using lexical search only",
                                e
                            ));
                            Vec::new()
                        }
                    }
                }
                None => {
                    // No client configured
                    warnings.push("Embedding service unavailable, using lexical search only".into());
                    Vec::new()
                }
            };

            // RRF fusion
            let vec_tuples: Vec<_> =
                vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
            let fts_tuples: Vec<_> =
                fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
            let ranked = rank_rrf(&vec_tuples, &fts_tuples);
            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
    }
}
```

**Acceptance Criteria:**

- [ ] Unfiltered search uses topK=max(50, limit*10), capped at 1500
- [ ] Filtered search uses topK=max(200, limit*50), capped at 1500
- [ ] Final results still limited by `--limit`
- [ ] Adaptive recall prevents "no results" under heavy filtering

---

### 5.4 Graceful Degradation

When Ollama is unavailable during hybrid/semantic search:

1. Log warning: "Embedding service unavailable, using lexical search only"
2. Fall back to FTS-only search
3. Include warning in response

**Acceptance Criteria:**

- [ ] Default mode is hybrid
- [ ] `--mode=lexical` works without Ollama
- [ ] `--mode=semantic` requires Ollama
- [ ] Graceful degradation when Ollama down
- [ ] `--explain` shows rank breakdown
- [ ] All Phase 3 filters work in hybrid mode

---

## Phase 6: Sync Orchestration

### 6.1 Dirty Source Tracking

**File:** `src/ingestion/dirty_tracker.rs`

```rust
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::SourceType;

/// Maximum dirty sources to process per sync run.
const MAX_DIRTY_SOURCES_PER_RUN: usize = 500;

/// Mark a source as dirty (needs document regeneration).
///
/// Called during entity upsert operations.
/// Uses INSERT OR IGNORE to avoid duplicates.
pub fn mark_dirty(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "INSERT OR IGNORE INTO dirty_sources (source_type, source_id, queued_at)
         VALUES (?, ?, ?)",
        rusqlite::params![source_type.as_str(), source_id, now_ms()],
    )?;
    Ok(())
}

/// Get dirty sources ready for processing.
///
/// Uses `next_attempt_at` for efficient, index-friendly backoff queries.
/// Items with NULL `next_attempt_at` are ready immediately (first attempt).
/// Items with `next_attempt_at <= now` have waited long enough after failure.
///
/// Benefits over SQL bitshift calculation:
/// - No overflow risk from large attempt_count values
/// - Index-friendly: `WHERE next_attempt_at <= ?`
/// - Jitter can be added in Rust when computing next_attempt_at
///
/// This prevents hot-loop retries when a source consistently fails
/// to generate a document (e.g., malformed data, missing references).
pub fn get_dirty_sources(conn: &Connection) -> Result<Vec<(SourceType, i64)>> {
    let now = now_ms();
    let mut stmt = conn.prepare(
        "SELECT source_type, source_id FROM dirty_sources
         WHERE next_attempt_at IS NULL OR next_attempt_at <= ?
         ORDER BY attempt_count ASC, queued_at ASC
         LIMIT ?"
    )?;

    let results = stmt
        .query_map(rusqlite::params![now, MAX_DIRTY_SOURCES_PER_RUN], |row| {
            let type_str: String = row.get(0)?;
            let source_type = match type_str.as_str() {
                "issue" => SourceType::Issue,
                "merge_request" => SourceType::MergeRequest,
                "discussion" => SourceType::Discussion,
                other => {
                    return Err(rusqlite::Error::FromSqlConversionFailure(
                        0,
                        rusqlite::types::Type::Text,
                        Box::new(std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            format!("invalid source_type: {other}"),
                        )),
                    ))
                }
            };
            Ok((source_type, row.get(1)?))
        })?
        .collect::<rusqlite::Result<Vec<_>>>()?;

    Ok(results)
}

/// Clear dirty source after processing.
pub fn clear_dirty(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM dirty_sources WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
    )?;
    Ok(())
}
```

**Acceptance Criteria:**

- [ ] Upserted entities added to dirty_sources
- [ ] Duplicates ignored
- [ ] Queue cleared after document regeneration
- [ ] Processing bounded per run (max 500)
- [ ] Exponential backoff uses `next_attempt_at` (index-friendly, no overflow)
- [ ] Backoff computed with jitter to prevent thundering herd
- [ ] Failed items prioritized lower than fresh items (ORDER BY attempt_count ASC)

---

### 6.2 Pending Discussion Queue

**File:** `src/ingestion/discussion_queue.rs`

```rust
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;

/// Noteable type for discussion fetching.
#[derive(Debug, Clone, Copy)]
pub enum NoteableType {
    Issue,
    MergeRequest,
}

impl NoteableType {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Issue => "Issue",
            Self::MergeRequest => "MergeRequest",
        }
    }
}

/// Pending discussion fetch entry.
pub struct PendingFetch {
    pub project_id: i64,
    pub noteable_type: NoteableType,
    pub noteable_iid: i64,
    pub attempt_count: i64,
}

/// Queue a discussion fetch for an entity.
pub fn queue_discussion_fetch(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
) -> Result<()> {
    conn.execute(
        "INSERT OR REPLACE INTO pending_discussion_fetches
         (project_id, noteable_type, noteable_iid, queued_at,
          attempt_count, last_attempt_at, last_error)
         VALUES (?, ?, ?, ?, 0, NULL, NULL)",
        rusqlite::params![project_id, noteable_type.as_str(), noteable_iid, now_ms()],
    )?;
    Ok(())
}

/// Get pending fetches ready for processing.
///
/// Uses `next_attempt_at` for efficient, index-friendly backoff queries.
/// Items with NULL `next_attempt_at` are ready immediately (first attempt).
/// Items with `next_attempt_at <= now` have waited long enough after failure.
///
/// Benefits over SQL bitshift calculation:
/// - No overflow risk from large attempt_count values
/// - Index-friendly: `WHERE next_attempt_at <= ?`
/// - Jitter can be added in Rust when computing next_attempt_at
///
/// Limited to `max_items` to bound API calls per sync run.
pub fn get_pending_fetches(conn: &Connection, max_items: usize) -> Result<Vec<PendingFetch>> {
    let now = now_ms();
    let mut stmt = conn.prepare(
        "SELECT project_id, noteable_type, noteable_iid, attempt_count
         FROM pending_discussion_fetches
         WHERE next_attempt_at IS NULL OR next_attempt_at <= ?
         ORDER BY attempt_count ASC, queued_at ASC
         LIMIT ?",
    )?;

    let results = stmt
        .query_map(rusqlite::params![now, max_items], |row| {
            let type_str: String = row.get(1)?;
            let noteable_type = if type_str == "Issue" {
                NoteableType::Issue
            } else {
                NoteableType::MergeRequest
            };
            Ok(PendingFetch {
                project_id: row.get(0)?,
                noteable_type,
                noteable_iid: row.get(2)?,
                attempt_count: row.get(3)?,
            })
        })?
        .collect::<rusqlite::Result<Vec<_>>>()?;

    Ok(results)
}

/// Mark fetch as successful and remove from queue.
pub fn complete_fetch(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM pending_discussion_fetches
         WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
        rusqlite::params![project_id, noteable_type.as_str(), noteable_iid],
    )?;
    Ok(())
}

/// Record fetch failure and compute next retry time.
///
/// Computes `next_attempt_at` using exponential backoff with jitter:
/// - Base delay: 1000ms * 2^attempt_count
/// - Cap: 1 hour (3600000ms)
/// - Jitter: ±10% to prevent thundering herd
pub fn record_fetch_error(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
    error: &str,
    current_attempt: i64,
) -> Result<()> {
    let now = now_ms();
    let next_attempt = crate::core::backoff::compute_next_attempt_at(now, current_attempt + 1);
    conn.execute(
        "UPDATE pending_discussion_fetches
         SET attempt_count = attempt_count + 1,
             last_attempt_at = ?,
             last_error = ?,
             next_attempt_at = ?
         WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
        rusqlite::params![now, error, next_attempt, project_id, noteable_type.as_str(), noteable_iid],
    )?;
    Ok(())
}

// NOTE: Backoff computation uses the shared utility in `src/core/backoff.rs`.
// See Phase 6.X below for the shared implementation.
```

**Acceptance Criteria:**

- [ ] Updated entities queued for discussion fetch
- [ ] Success removes from queue
- [ ] Failure increments attempt_count and sets next_attempt_at
- [ ] Processing bounded per run (max 100)
- [ ] Exponential backoff uses `next_attempt_at` (index-friendly, no overflow)
- [ ] Backoff computed with jitter to prevent thundering herd

---

### 6.X Shared Backoff Utility

**File:** `src/core/backoff.rs`

Single implementation of exponential backoff with jitter, used by both the `dirty_sources` and `pending_discussion_fetches` queues. It lives in `src/core/` because it is a cross-cutting concern used by multiple modules.

```rust
use rand::Rng;

/// Compute next_attempt_at with exponential backoff and jitter.
///
/// Formula: now + min(3600000, 1000 * 2^attempt_count) * (0.9 to 1.1)
/// - Capped at 1 hour to prevent runaway delays
/// - ±10% jitter prevents synchronized retries after outages
///
/// Used by:
/// - `dirty_sources` retry scheduling (document regeneration failures)
/// - `pending_discussion_fetches` retry scheduling (API fetch failures)
///
/// Having one implementation prevents subtle divergence between queues
/// (e.g., different caps or jitter ranges).
pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 {
    // Cap attempt_count to prevent overflow (the 1-hour cap is hit long before 2^30)
    let capped_attempts = attempt_count.min(30) as u32;
    let base_delay_ms = 1000_i64.saturating_mul(1_i64 << capped_attempts);
    let capped_delay_ms = base_delay_ms.min(3_600_000); // 1 hour cap

    // Add ±10% jitter
    let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1);
    let delay_with_jitter = (capped_delay_ms as f64 * jitter_factor) as i64;

    now + delay_with_jitter
}
```

**Update `src/core/mod.rs`:**

```rust
pub mod backoff; // Add to existing modules
```

**Acceptance Criteria:**

- [ ] Single implementation shared by both queue retry paths
- [ ] Cap at 1 hour prevents runaway delays
- [ ] Jitter prevents thundering herd after outage recovery
- [ ] Unit tests verify backoff curve and cap behavior

---

### 6.3 Document Regenerator

**File:** `src/documents/regenerator.rs`

```rust
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::{
    extract_issue_document, extract_mr_document, extract_discussion_document,
    DocumentData, SourceType,
};
use crate::ingestion::dirty_tracker::{get_dirty_sources, clear_dirty};

/// Result of regeneration run.
#[derive(Debug, Default)]
pub struct RegenerateResult {
    pub regenerated: usize,
    pub unchanged: usize,
    pub errored: usize,
}

/// Regenerate documents from dirty queue.
///
/// Process:
/// 1. Query dirty_sources ordered by queued_at
/// 2. For each: regenerate document, compute new hash
/// 3. ALWAYS upsert document (labels/paths may change even if content_hash unchanged)
/// 4. Track whether content_hash changed (for stats)
/// 5. Delete from dirty_sources (or record error on failure)
pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult> {
    let dirty = get_dirty_sources(conn)?;
    let mut result = RegenerateResult::default();

    for (source_type, source_id) in &dirty {
        match regenerate_one(conn, *source_type, *source_id) {
            Ok(changed) => {
                if changed {
                    result.regenerated += 1;
                } else {
                    result.unchanged += 1;
                }
                clear_dirty(conn, *source_type, *source_id)?;
            }
            Err(e) => {
                // Fail-soft: record error but continue processing remaining items
                record_dirty_error(conn, *source_type, *source_id, &e.to_string())?;
                result.errored += 1;
            }
        }
    }

    Ok(result)
}

/// Regenerate a single document. Returns true if content_hash changed.
///
/// If the source entity has been deleted, the corresponding document
/// is also deleted (cascade cleans up labels, paths, embeddings).
fn regenerate_one(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<bool> {
    // Extractors return Option<DocumentData>: None means the source entity was deleted
    let doc = match source_type {
        SourceType::Issue => extract_issue_document(conn, source_id)?,
        SourceType::MergeRequest => extract_mr_document(conn, source_id)?,
        SourceType::Discussion => extract_discussion_document(conn, source_id)?,
    };

    let Some(doc) = doc else {
        // Source was deleted — remove the document (cascade handles FTS/embeddings)
        delete_document(conn, source_type, source_id)?;
        return Ok(true);
    };

    let existing_hash = get_existing_hash(conn, source_type, source_id)?;
    let changed = existing_hash.as_ref() != Some(&doc.content_hash);

    // Always upsert: labels/paths can change independently of content_hash
    upsert_document(conn, &doc)?;

    Ok(changed)
}

/// Delete a document by source identity (cascade handles FTS trigger, labels, paths, embeddings).
fn delete_document(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM documents WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
    )?;
    Ok(())
}

/// Record a regeneration error on a dirty source for retry.
///
/// IMPORTANT: Sets `next_attempt_at` using the shared backoff utility.
/// Without this, failed items would retry every run (hot-loop), defeating
/// the backoff design documented in the schema.
fn record_dirty_error(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
    error: &str,
) -> Result<()> {
    let now = now_ms();

    // Read current attempt_count from DB to compute backoff
    let attempt_count: i64 = conn.query_row(
        "SELECT attempt_count FROM dirty_sources WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
        |row| row.get(0),
    )?;

    // Use shared backoff utility (same as pending_discussion_fetches)
    let next_attempt_at = crate::core::backoff::compute_next_attempt_at(now, attempt_count + 1);

    conn.execute(
        "UPDATE dirty_sources
         SET attempt_count = attempt_count + 1,
             last_attempt_at = ?,
             last_error = ?,
             next_attempt_at = ?
         WHERE source_type = ? AND source_id = ?",
        rusqlite::params![now, error, next_attempt_at, source_type.as_str(), source_id],
    )?;
    Ok(())
}

/// Get existing content hash for a document, if it exists.
///
/// IMPORTANT: Uses `optional()` to distinguish between:
/// - No row found -> Ok(None)
/// - Row found -> Ok(Some(hash))
/// - DB error -> Err(...)
///
/// Using `.ok()` would hide real DB errors (disk I/O, corruption, etc.)
/// which should propagate up for proper error handling.
fn get_existing_hash(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<Option<String>> {
    use rusqlite::OptionalExtension;

    let mut stmt = conn.prepare(
        "SELECT content_hash FROM documents WHERE source_type = ? AND source_id = ?",
    )?;
    let hash: Option<String> = stmt
        .query_row(rusqlite::params![source_type.as_str(), source_id], |row| row.get(0))
        .optional()?;
    Ok(hash)
}

fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
    use rusqlite::OptionalExtension;

    // Check existing hashes before upserting (for write optimization)
    let existing: Option<(i64, String, String)> = conn
        .query_row(
            "SELECT id, labels_hash, paths_hash FROM documents
             WHERE source_type = ? AND source_id = ?",
            rusqlite::params![doc.source_type.as_str(), doc.source_id],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        .optional()?;

    // Upsert main document (includes labels_hash, paths_hash)
    conn.execute(
        "INSERT INTO documents
         (source_type, source_id, project_id, author_username, label_names,
          labels_hash, paths_hash, created_at, updated_at, url, title,
          content_text, content_hash, is_truncated, truncated_reason)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(source_type, source_id) DO UPDATE SET
             author_username = excluded.author_username,
             label_names = excluded.label_names,
             labels_hash = excluded.labels_hash,
             paths_hash = excluded.paths_hash,
             updated_at = excluded.updated_at,
             url = excluded.url,
             title = excluded.title,
             content_text = excluded.content_text,
             content_hash = excluded.content_hash,
             is_truncated = excluded.is_truncated,
             truncated_reason = excluded.truncated_reason",
        rusqlite::params![
            doc.source_type.as_str(),
            doc.source_id,
            doc.project_id,
            doc.author_username,
            serde_json::to_string(&doc.labels)?,
            doc.labels_hash,
            doc.paths_hash,
            doc.created_at,
            doc.updated_at,
            doc.url,
            doc.title,
            doc.content_text,
            doc.content_hash,
            doc.is_truncated,
            doc.truncated_reason,
        ],
    )?;

    // Get document ID (either existing or newly inserted)
    let doc_id = match existing {
        Some((id, _, _)) => id,
        None => get_document_id(conn, doc.source_type, doc.source_id)?,
    };

    // Only update labels if hash changed (reduces write amplification)
    let labels_changed = match &existing {
        Some((_, old_hash, _)) => old_hash !=
            &doc.labels_hash,
        None => true, // New document, must insert
    };

    if labels_changed {
        conn.execute(
            "DELETE FROM document_labels WHERE document_id = ?",
            [doc_id],
        )?;
        for label in &doc.labels {
            conn.execute(
                "INSERT INTO document_labels (document_id, label_name) VALUES (?, ?)",
                rusqlite::params![doc_id, label],
            )?;
        }
    }

    // Only update paths if hash changed (reduces write amplification)
    let paths_changed = match &existing {
        Some((_, _, old_hash)) => old_hash != &doc.paths_hash,
        None => true, // New document, must insert
    };

    if paths_changed {
        conn.execute(
            "DELETE FROM document_paths WHERE document_id = ?",
            [doc_id],
        )?;
        for path in &doc.paths {
            conn.execute(
                "INSERT INTO document_paths (document_id, path) VALUES (?, ?)",
                rusqlite::params![doc_id, path],
            )?;
        }
    }

    Ok(())
}

fn get_document_id(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<i64> {
    let id: i64 = conn.query_row(
        "SELECT id FROM documents WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
        |row| row.get(0),
    )?;
    Ok(id)
}
```

**Acceptance Criteria:**

- [ ] Dirty sources get documents regenerated
- [ ] Hash comparison prevents unnecessary updates
- [ ] FTS triggers fire on document update
- [ ] Queue cleared after processing

---

### 6.4 CLI: `lore sync`

**File:** `src/cli/commands/sync.rs`

```rust
//! Sync command - orchestrate full sync pipeline.

use serde::Serialize;
use crate::core::error::Result;
use crate::Config;

/// Sync result summary.
#[derive(Debug, Serialize)]
pub struct SyncResult {
    pub issues_updated: usize,
    pub mrs_updated: usize,
    pub discussions_fetched: usize,
    pub documents_regenerated: usize,
    pub documents_embedded: usize,
}

/// Sync options.
#[derive(Debug, Default)]
pub struct SyncOptions {
    pub full: bool,     // Reset cursors, fetch everything
    pub force: bool,    // Override stale lock
    pub no_embed: bool, // Skip embedding step
    pub no_docs: bool,  // Skip document regeneration
}

/// Run sync orchestration.
///
/// Steps:
/// 1. Acquire app lock with heartbeat
/// 2. Ingest delta (issues, MRs) based on cursors
/// 3. Process pending_discussion_fetches queue (bounded)
/// 4. Apply rolling backfill window (configurable, default 14 days)
/// 5. Regenerate documents from dirty_sources
/// 6. Embed documents with changed content_hash
/// 7. Release lock, record sync_run
pub async fn run_sync(config: &Config, options: SyncOptions) -> Result<SyncResult> {
    // Implementation uses existing ingestion orchestrator
    // and new document/embedding pipelines
    todo!()
}

/// Print human-readable sync output.
pub fn print_sync(result: &SyncResult, elapsed_secs: u64) {
    println!("Sync complete:");
    println!("  Issues updated:        {:>6}", result.issues_updated);
    println!("  MRs updated:           {:>6}", result.mrs_updated);
    println!("  Discussions fetched:   {:>6}", result.discussions_fetched);
    println!("  Documents regenerated: {:>6}", result.documents_regenerated);
    println!("  Documents embedded:    {:>6}", result.documents_embedded);
    println!("  Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}

/// Print JSON output for robot mode.
pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": result,
        "meta": { "elapsed_ms": elapsed_ms }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration:**

```rust
/// Sync subcommand arguments.
#[derive(Args)]
pub struct SyncArgs {
    /// Reset cursors, fetch everything
    #[arg(long)]
    full: bool,

    /// Override stale lock
    #[arg(long)]
    force: bool,

    /// Skip embedding step
    #[arg(long)]
    no_embed: bool,

    /// Skip document regeneration
    #[arg(long)]
    no_docs: bool,
}
```

**Acceptance Criteria:**

- [ ] Orchestrates full sync pipeline
- [ ] Respects app lock
- [ ] `--full` resets cursors
- [ ] `--no-embed` skips embedding
- [ ] `--no-docs` skips document regeneration
- [ ] Progress reporting in human mode
- [ ] JSON summary in robot mode

---

## Testing Strategy

### Unit Tests

| Module | Test File | Coverage |
|--------|-----------|----------|
| Document extractor | `src/documents/extractor.rs` (mod tests) | Issue/MR/discussion extraction, consistent headers |
| Truncation | `src/documents/truncation.rs` (mod tests) | All edge cases |
| RRF ranking | `src/search/rrf.rs` (mod tests) | Score computation, merging |
| Content hash | `src/documents/extractor.rs` (mod tests) | Deterministic hashing |
| FTS query sanitization | `src/search/fts.rs` (mod tests) | `to_fts_query()` edge cases: `-`, `"`, `:`, `*`, `C++` |
| SourceType parsing | `src/documents/extractor.rs` (mod tests) | `parse()` accepts aliases: `mr`, `mrs`, `issue`, etc. |
| SearchFilters | `src/search/filters.rs` (mod tests) | `has_any_filter()`, `clamp_limit()` |
| Backoff logic | `src/core/backoff.rs` (mod tests) | Shared exponential backoff curve, cap, jitter |
| Hydration | `src/cli/commands/search.rs` (mod tests) | Single round-trip, label/path aggregation |

### Integration Tests

| Feature | Test File | Coverage |
|---------|-----------|----------|
| FTS search | `tests/fts_search.rs` | Stemming, empty results |
| Embedding storage | `tests/embedding.rs` | sqlite-vec operations |
| Hybrid search | `tests/hybrid_search.rs` | Combined retrieval |
| Sync orchestration | `tests/sync.rs` | Full pipeline |

### Golden Query Suite

**File:** `tests/fixtures/golden_queries.json`

```json
[
  {
    "query": "authentication redesign",
    "expected_urls": [".../-/issues/234", ".../-/merge_requests/847"],
    "min_results": 1,
    "max_rank": 10
  }
]
```

Each query must have at least one expected URL in its top 10 results.

---

## CLI Smoke Tests

| Command | Expected | Pass Criteria |
|---------|----------|---------------|
| `lore generate-docs` | Progress, count | Completes, count > 0 |
| `lore generate-docs` (re-run) | 0 regenerated | Hash comparison works |
| `lore embed` | Progress, count | Completes, count matches docs |
| `lore embed` (re-run) | 0 embedded | Skips unchanged |
| `lore embed --retry-failed` | Processes failed | Only failed docs processed |
| `lore stats` | Coverage stats | Shows 100% after embed |
| `lore stats` | Queue depths | Shows dirty_sources and pending_discussion_fetches counts |
| `lore search "auth" --mode=lexical` | Results | Works without Ollama |
| `lore search "auth"` | Hybrid results | Vector + FTS combined |
| `lore search "auth"` (Ollama down) | FTS results + warning | Graceful degradation, warning in response |
| `lore search "auth" --explain` | Rank breakdown | Shows vector/FTS/RRF |
| `lore search "auth" --type=mr` | Filtered results | Only MRs |
| `lore search "auth" --type=mrs` | Filtered results | Alias works |
| `lore search "auth" --label=bug` | Filtered results | Only labeled docs |
| `lore search "-DWITH_SSL"` | Results | Leading dash doesn't cause FTS error |
| `lore search 'C++'` | Results | Special chars in query work |
| `lore search "auth" --updated-after 2024-01-01` | Filtered results | Only recently updated docs |
| `lore search "nonexistent123"` | No results | Graceful empty state |
| `lore search "auth" --mode=semantic` (no embeddings) | Actionable error | Tells user to run `lore embed` first |
| `lore sync` | Full pipeline | All steps complete |
| `lore sync --no-embed` | Skip embedding | Docs generated, not embedded |
| `lore generate-docs --full` | Progress, count | Keyset pagination completes without OFFSET degradation |
| `lore stats --check --repair` | Repair results | FTS rebuilt, orphans cleaned |

---

## Data Integrity Checks

- [ ] `documents` count = issues + MRs + discussions
- [ ] `documents_fts` count = `documents` count
- [ ] `embeddings` count = `documents` count (after full embed)
- [ ] `embedding_metadata.content_hash` = `documents.content_hash` for all rows
- [ ] All `document_labels` reference valid documents
- [ ] All `document_paths` reference valid documents
- [ ] No orphaned embeddings (embeddings.rowid without matching documents.id)
- [ ] Discussion documents exclude system notes
- [ ] Discussion documents include parent title
- [ ] All `dirty_sources` entries reference existing source entities
- [ ] All `pending_discussion_fetches` entries reference existing projects
- [ ] `attempt_count` >= 0 for all queue entries (never negative)
- [ ] `last_attempt_at` is NULL when `attempt_count` = 0

---

## Success Criteria

Checkpoint 3 is complete when all three gates pass:

### Gate A: Lexical MVP

1. **Lexical search works without Ollama**
   - `lore search "query" --mode=lexical` returns relevant results
   - All filters functional (including `--updated-after`)
   - FTS5 syntax errors prevented by query sanitization
   - Special characters in queries work correctly (`-DWITH_SSL`, `C++`)
   - Search results hydrated in a single DB round-trip (no N+1)

2. **Document generation is correct**
   - Full and incremental modes use the same regenerator codepath
   - `--full` uses keyset pagination (no OFFSET degradation)
   - FTS triggers use COALESCE for NULL-safe operation

### Gate B: Hybrid MVP

3. **Semantic search works with Ollama**
   - `lore embed` completes successfully
   - `lore search "query"` returns semantically relevant results
   - `--explain` shows ranking breakdown
   - `--mode=semantic` with 0% embedding coverage returns an actionable error

4. **Hybrid search combines both**
   - Documents appearing in both retrievers rank higher
   - Graceful degradation when Ollama unavailable (falls back to FTS)
   - Transient embed failures don't fail the entire search
   - Warning message included in response on degradation
   - Embedding pipeline uses keyset pagination for consistent paging

### Gate C: Sync MVP

5. **Incremental sync is efficient**
   - `lore sync` only processes changed entities
   - Re-embedding only happens for changed documents
   - Progress visible during long syncs
   - Queue backoff actually prevents hot-loop retries (both queues set `next_attempt_at`)
   - Shared backoff utility ensures consistent behavior across queues

6. **Data integrity maintained**
   - All counts match between tables
   - No orphaned records
   - Hashes consistent
   - `get_existing_hash()` properly distinguishes "not found" from DB errors
   - `--repair` uses FTS `rebuild` for correct-by-construction repair

7. **Observability**
   - `lore stats` shows queue depths and failed item counts
   - Failed items visible for operator intervention
   - Deterministic ordering ensures consistent paging

8. **Tests pass**
   - Unit tests for core algorithms (including FTS sanitization, shared backoff, hydration)
   - Integration tests for pipelines
   - Golden queries return expected results
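The Gate B criterion "documents appearing in both retrievers rank higher" follows directly from the RRF formula in Phase 5.2, and can be sanity-checked with a tiny standalone sketch. The `rrf` helper and sample ranks below are illustrative only, not part of the `gitlore` codebase:

```rust
/// Sum 1/(k + rank) over the retrievers in which the document appears.
/// Illustrative helper, mirroring the formula in `rank_rrf` (k = 60).
fn rrf(k: f64, ranks: &[Option<usize>]) -> f64 {
    ranks.iter().flatten().map(|&r| 1.0 / (k + r as f64)).sum()
}

fn main() {
    // Hypothetical doc A: rank 1 in the vector list, rank 3 in the FTS list.
    // Hypothetical doc B: rank 2 in the vector list only.
    let doc_a = rrf(60.0, &[Some(1), Some(3)]); // 1/61 + 1/63
    let doc_b = rrf(60.0, &[Some(2), None]);    // 1/62
    assert!(doc_a > doc_b, "fusion should reward appearing in both lists");
    println!("doc_a = {doc_a:.5}, doc_b = {doc_b:.5}");
}
```

Even though doc B outranks doc A's FTS position in the vector list alone, doc A's two small contributions (1/61 + 1/63) beat doc B's single one (1/62), which is exactly the property the golden-query and unit tests above assert.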