# Checkpoint 3: Search & Sync MVP

> **Status:** Planning
> **Prerequisite:** Checkpoints 0, 1, 2 complete (issues, MRs, discussions ingested)
> **Goal:** Deliver working semantic + lexical hybrid search with efficient incremental sync

This checkpoint consolidates SPEC.md checkpoints 3A, 3B, 4, and 5 into a unified implementation plan. The work is structured for parallel agent execution where dependencies allow.

All code integrates with existing `gitlab-inbox` infrastructure:

- Error handling via `GiError` and `ErrorCode` in `src/core/error.rs`
- CLI patterns matching `src/cli/commands/*.rs` (run functions, JSON/human output)
- Database via `rusqlite::Connection` with migrations in `migrations/`
- Config via `src/core/config.rs` (EmbeddingConfig already defined)
- Robot mode JSON with `{"ok": true, "data": {...}}` pattern

---

## Executive Summary

**Deliverables:**

1. Document generation from issues/MRs/discussions with FTS5 indexing
2. Ollama-powered embedding pipeline with sqlite-vec storage
3. Hybrid search (RRF-ranked vector + lexical) with rich filtering
4. Orchestrated `gi sync` command with incremental re-embedding

**Key Design Decisions:**

- Documents are the search unit (not raw entities)
- FTS5 works standalone when Ollama unavailable (graceful degradation)
- sqlite-vec `rowid = documents.id` for simple joins
- RRF ranking avoids score normalization complexity
- Queue-based discussion fetching isolates failures

---

## Phase 1: Schema Foundation

### 1.1 Documents Schema (Migration 007)

**File:** `migrations/007_documents.sql`

```sql
-- Unified searchable documents (derived from issues/MRs/discussions)
CREATE TABLE documents (
    id INTEGER PRIMARY KEY,
    source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
    source_id INTEGER NOT NULL,          -- local DB id in the source table
    project_id INTEGER NOT NULL REFERENCES projects(id),
    author_username TEXT,                -- for discussions: first note author
    label_names TEXT,                    -- JSON array (display/debug only)
    created_at INTEGER,                  -- ms epoch UTC
    updated_at INTEGER,                  -- ms epoch UTC
    url TEXT,
    title TEXT,                          -- null for discussions
    content_text TEXT NOT NULL,          -- canonical text for embedding/search
    content_hash TEXT NOT NULL,          -- SHA-256 for change detection
    is_truncated INTEGER NOT NULL DEFAULT 0,
    truncated_reason TEXT CHECK (
        truncated_reason IN ('token_limit_middle_drop','single_note_oversized','first_last_oversized')
        OR truncated_reason IS NULL
    ),
    UNIQUE(source_type, source_id)
);

CREATE INDEX idx_documents_project_updated ON documents(project_id, updated_at);
CREATE INDEX idx_documents_author ON documents(author_username);
-- No separate index on (source_type, source_id): the UNIQUE constraint above
-- already creates the automatic index used for source lookups.
CREATE INDEX idx_documents_hash ON documents(content_hash);

-- Fast label filtering (indexed exact-match)
CREATE TABLE document_labels (
    document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    label_name TEXT NOT NULL,
    PRIMARY KEY(document_id, label_name)
) WITHOUT ROWID;

CREATE INDEX idx_document_labels_label ON document_labels(label_name);

-- Fast path filtering (DiffNote file paths)
CREATE TABLE document_paths (
    document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    path TEXT NOT NULL,
    PRIMARY KEY(document_id, path)
) WITHOUT ROWID;

CREATE INDEX idx_document_paths_path ON document_paths(path);

-- Queue for incremental document regeneration (with retry tracking)
CREATE TABLE dirty_sources (
    source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
    source_id INTEGER NOT NULL,
    queued_at INTEGER NOT NULL,          -- ms epoch UTC
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_attempt_at INTEGER,
    last_error TEXT,
    PRIMARY KEY(source_type, source_id)
);

CREATE INDEX idx_dirty_sources_retry ON dirty_sources(attempt_count, last_attempt_at)
    WHERE last_error IS NOT NULL;

-- Resumable queue for dependent discussion fetching
CREATE TABLE pending_discussion_fetches (
    project_id INTEGER NOT NULL REFERENCES projects(id),
    noteable_type TEXT NOT NULL,         -- 'Issue' | 'MergeRequest'
    noteable_iid INTEGER NOT NULL,
    queued_at INTEGER NOT NULL,          -- ms epoch UTC
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_attempt_at INTEGER,
    last_error TEXT,
    PRIMARY KEY(project_id, noteable_type, noteable_iid)
);

CREATE INDEX idx_pending_discussions_retry ON pending_discussion_fetches(attempt_count, last_attempt_at)
    WHERE last_error IS NOT NULL;
```

**Acceptance Criteria:**

- [ ] Migration applies cleanly on fresh DB
- [ ] Migration applies cleanly after CP2 schema
- [ ] All foreign keys enforced
- [ ] Indexes created

---

### 1.2 FTS5 Index (Migration 008)

**File:** `migrations/008_fts5.sql`

```sql
-- Full-text search with porter stemmer and prefix indexes for type-ahead
CREATE VIRTUAL TABLE documents_fts USING fts5(
    title,
    content_text,
    content='documents',
    content_rowid='id',
    tokenize='porter unicode61',
    prefix='2 3 4'
);

-- Keep FTS in sync via triggers
CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
    INSERT INTO documents_fts(rowid, title, content_text)
    VALUES (new.id, new.title, new.content_text);
END;

CREATE TRIGGER documents_ad AFTER DELETE ON documents BEGIN
    INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
    VALUES('delete', old.id, old.title, old.content_text);
END;

-- Only rebuild FTS when searchable text actually changes (not metadata-only updates)
CREATE TRIGGER documents_au AFTER UPDATE ON documents
WHEN old.title IS NOT new.title OR old.content_text != new.content_text
BEGIN
    INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
    VALUES('delete', old.id, old.title, old.content_text);
    INSERT INTO documents_fts(rowid, title, content_text)
    VALUES (new.id, new.title, new.content_text);
END;
```

**Acceptance Criteria:**

- [ ] `documents_fts` created as virtual table
- [ ] Triggers fire on insert/update/delete
- [ ] Update trigger only fires when title or content_text changes (not metadata-only updates)
- [ ] FTS row count matches documents count after bulk insert
- [ ] Prefix search works for type-ahead UX

---

### 1.3 Embeddings Schema (Migration 009)

**File:** `migrations/009_embeddings.sql`

```sql
-- NOTE: sqlite-vec vec0 virtual tables cannot participate in FK cascades.
-- We must use an explicit trigger to delete orphan embeddings when documents
-- are deleted. See documents_embeddings_ad trigger below.
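-- Illustrative KNN query shape against this table (commentary, not part of
-- the migration; exact vec0 KNN syntax depends on the sqlite-vec version):
--
--   SELECT d.id, e.distance
--   FROM embeddings e
--   JOIN documents d ON d.id = e.rowid
--   WHERE e.embedding MATCH :query_vec AND k = :limit
--   ORDER BY e.distance;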
-- sqlite-vec virtual table for vector search
-- Storage rule: embeddings.rowid = documents.id
CREATE VIRTUAL TABLE embeddings USING vec0(
    embedding float[768]
);

-- Embedding provenance + change detection
CREATE TABLE embedding_metadata (
    document_id INTEGER PRIMARY KEY REFERENCES documents(id) ON DELETE CASCADE,
    model TEXT NOT NULL,                 -- 'nomic-embed-text'
    dims INTEGER NOT NULL,               -- 768
    content_hash TEXT NOT NULL,          -- copied from documents.content_hash
    created_at INTEGER NOT NULL,         -- ms epoch UTC
    last_error TEXT,                     -- error message from last failed attempt
    attempt_count INTEGER NOT NULL DEFAULT 0,
    last_attempt_at INTEGER              -- ms epoch UTC
);

CREATE INDEX idx_embedding_metadata_errors ON embedding_metadata(last_error)
    WHERE last_error IS NOT NULL;
CREATE INDEX idx_embedding_metadata_hash ON embedding_metadata(content_hash);

-- CRITICAL: Delete orphan embeddings when documents are deleted.
-- vec0 virtual tables don't support FK ON DELETE CASCADE, so we need this trigger.
-- embedding_metadata has ON DELETE CASCADE, so only vec0 needs explicit cleanup.
CREATE TRIGGER documents_embeddings_ad AFTER DELETE ON documents BEGIN
    DELETE FROM embeddings WHERE rowid = old.id;
END;
```

**Acceptance Criteria:**

- [ ] `embeddings` vec0 table created
- [ ] `embedding_metadata` tracks provenance
- [ ] Error tracking fields present for retry logic
- [ ] Orphan cleanup trigger fires on document deletion

**Dependencies:**

- Requires sqlite-vec extension loaded at runtime
- Extension loading already happens in `src/core/db.rs`
- [ ] Migration runner must load sqlite-vec *before* applying migrations (including on fresh DB)

---

## Phase 2: Document Generation

### 2.1 Document Module Structure

**New module:** `src/documents/`

```
src/documents/
├── mod.rs           # Module exports
├── extractor.rs     # Document extraction from entities
├── truncation.rs    # Note-boundary aware truncation
└── regenerator.rs   # Dirty source processing
```

**File:** `src/documents/mod.rs`

```rust
//! Document generation and management.
//!
//! Extracts searchable documents from issues, MRs, and discussions.

mod extractor;
mod regenerator;
mod truncation;

pub use extractor::{
    extract_discussion_document, extract_issue_document, extract_mr_document,
    DocumentData, SourceType,
};
// Note: extract_*_document() return Result<Option<DocumentData>>.
// None means the source entity was deleted from the database.
pub use regenerator::regenerate_dirty_documents;
pub use truncation::{truncate_content, TruncationReason, TruncationResult};
```

**Update `src/lib.rs`:**

```rust
pub mod documents; // Add to existing modules
```

---

### 2.2 Document Types

**File:** `src/documents/extractor.rs`

```rust
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};

use super::truncation::TruncationReason;

/// Source type for documents.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceType {
    Issue,
    MergeRequest,
    Discussion,
}

impl SourceType {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Issue => "issue",
            Self::MergeRequest => "merge_request",
            Self::Discussion => "discussion",
        }
    }
}

impl std::fmt::Display for SourceType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

/// Generated document ready for storage.
#[derive(Debug, Clone)]
pub struct DocumentData {
    pub source_type: SourceType,
    pub source_id: i64,
    pub project_id: i64,
    pub author_username: Option<String>,
    pub labels: Vec<String>,
    pub paths: Vec<String>, // DiffNote file paths
    pub created_at: i64,
    pub updated_at: i64,
    pub url: Option<String>,
    pub title: Option<String>,
    pub content_text: String,
    pub content_hash: String,
    pub is_truncated: bool,
    pub truncated_reason: Option<TruncationReason>,
}

/// Compute SHA-256 hash of content.
pub fn compute_content_hash(content: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(content.as_bytes());
    format!("{:x}", hasher.finalize())
}
```

**Document Formats:**

| Source | content_text |
|--------|-------------|
| Issue | `{title}\n\n{description}` |
| MR | `{title}\n\n{description}` |
| Discussion | Full thread with header (see below) |

**Discussion Document Format:**

```
[[Discussion]]
Issue #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234#note_12345
Labels: ["bug", "auth"]
Files: ["src/auth/login.ts"]

--- Thread ---

@johndoe (2024-03-15):
I think we should move to JWT-based auth...

@janedoe (2024-03-15):
Agreed. What about refresh token strategy?
```

**Acceptance Criteria:**

- [ ] Issue document: title + description concatenated
- [ ] MR document: title + description concatenated
- [ ] Discussion document: includes parent title, project, URL, labels, files, thread
- [ ] System notes (is_system=1) excluded from discussion content
- [ ] DiffNote file paths extracted to paths vector
- [ ] Labels extracted to labels vector
- [ ] SHA-256 hash computed from content_text

---

### 2.3 Truncation Logic

**File:** `src/documents/truncation.rs`

```rust
/// Maximum content length (~8,000 tokens at 4 chars/token estimate).
pub const MAX_CONTENT_CHARS: usize = 32_000;

/// Truncation result with metadata.
#[derive(Debug, Clone)]
pub struct TruncationResult {
    pub content: String,
    pub is_truncated: bool,
    pub reason: Option<TruncationReason>,
}

/// Reason for truncation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncationReason {
    TokenLimitMiddleDrop,
    SingleNoteOversized,
    FirstLastOversized,
}

impl TruncationReason {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::TokenLimitMiddleDrop => "token_limit_middle_drop",
            Self::SingleNoteOversized => "single_note_oversized",
            Self::FirstLastOversized => "first_last_oversized",
        }
    }
}

/// Truncate content at note boundaries.
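/// (Worked example, illustrative: a thread of 40 notes totalling ~50,000
/// chars keeps the leading and trailing notes whole and drops whole notes
/// from the middle until the 32,000-char budget fits;
/// reason = `token_limit_middle_drop`.)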
///
/// Rules:
/// - Max content: 32,000 characters
/// - Truncate at NOTE boundaries (never mid-note)
/// - Preserve first N notes and last M notes
/// - Drop from middle, insert marker
pub fn truncate_content(notes: &[NoteContent], max_chars: usize) -> TruncationResult {
    // Implementation handles edge cases per table below
    todo!()
}

/// Note content for truncation.
pub struct NoteContent {
    pub author: String,
    pub date: String,
    pub body: String,
}
```

**Edge Cases:**

| Scenario | Handling |
|----------|----------|
| Single note > 32000 chars | Truncate at char boundary, append `[truncated]`, reason = `single_note_oversized` |
| First + last note > 32000 | Keep only first note (truncated if needed), reason = `first_last_oversized` |
| Only one note | Truncate at char boundary if needed |

**Acceptance Criteria:**

- [ ] Notes never cut mid-content
- [ ] First and last notes preserved when possible
- [ ] Truncation marker `\n\n[... N notes omitted for length ...]\n\n` inserted
- [ ] Metadata fields set correctly
- [ ] Edge cases handled per table above

---

### 2.4 CLI: `gi generate-docs` (Incremental by Default)

**File:** `src/cli/commands/generate_docs.rs`

```rust
//! Generate documents command - create searchable documents from entities.
//!
//! By default, runs incrementally (processes only dirty_sources queue).
//! Use --full to regenerate all documents from scratch.

use rusqlite::Connection;
use serde::Serialize;

use crate::core::error::Result;
use crate::documents::{DocumentData, SourceType};
use crate::Config;

/// Result of document generation.
#[derive(Debug, Serialize)]
pub struct GenerateDocsResult {
    pub issues: usize,
    pub mrs: usize,
    pub discussions: usize,
    pub total: usize,
    pub truncated: usize,
    pub skipped: usize, // Unchanged documents
}

/// Run document generation (incremental by default).
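/// Typical invocations (illustrative): `gi generate-docs` (incremental),
/// `gi generate-docs --full`, `gi generate-docs --project group/project-one`.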
///
/// Incremental mode (default):
/// - Processes only items in dirty_sources queue
/// - Fast for routine syncs
///
/// Full mode (--full):
/// - Regenerates ALL documents from scratch
/// - Use when schema changes or after migration
pub fn run_generate_docs(
    config: &Config,
    full: bool,
    project_filter: Option<&str>,
) -> Result<GenerateDocsResult> {
    if full {
        // Full mode: regenerate everything inside a single transaction
        // 1. BEGIN IMMEDIATE transaction
        // 2. Query all issues, MRs, discussions
        // 3. For each: generate document, compute hash
        // 4. Upsert into `documents` table (FTS triggers auto-fire)
        // 5. Populate `document_labels` and `document_paths`
        // 6. Rebuild FTS: INSERT INTO documents_fts(documents_fts) VALUES('rebuild')
        // 7. COMMIT
        // 8. Return counts
        //
        // The FTS rebuild at step 6 ensures the index is consistent
        // after bulk operations. Wrapping in a transaction avoids
        // partial state if the process is interrupted.
    } else {
        // Incremental mode: process dirty_sources only
        // 1. Query dirty_sources (bounded by LIMIT)
        // 2. Regenerate only those documents
        // 3. Clear from dirty_sources after processing
    }
    todo!()
}

/// Print human-readable output.
pub fn print_generate_docs(result: &GenerateDocsResult) {
    println!("Document generation complete:");
    println!("  Issues:      {:>6} documents", result.issues);
    println!("  MRs:         {:>6} documents", result.mrs);
    println!("  Discussions: {:>6} documents", result.discussions);
    println!("  ─────────────────────");
    println!("  Total:       {:>6} documents", result.total);
    if result.truncated > 0 {
        println!("  Truncated:   {:>6}", result.truncated);
    }
    if result.skipped > 0 {
        println!("  Skipped:     {:>6} (unchanged)", result.skipped);
    }
}

/// Print JSON output for robot mode.
pub fn print_generate_docs_json(result: &GenerateDocsResult) {
    let output = serde_json::json!({
        "ok": true,
        "data": result
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration in `src/cli/mod.rs`:**

```rust
/// Generate-docs subcommand arguments.
#[derive(Args)]
pub struct GenerateDocsArgs {
    /// Regenerate ALL documents (not just dirty queue)
    #[arg(long)]
    full: bool,

    /// Only generate for specific project
    #[arg(long)]
    project: Option<String>,
}
```

**Acceptance Criteria:**

- [ ] Creates document for each issue
- [ ] Creates document for each MR
- [ ] Creates document for each discussion
- [ ] Default mode processes dirty_sources queue only (incremental)
- [ ] `--full` regenerates all documents from scratch
- [ ] Progress bar in human mode (via `indicatif`)
- [ ] JSON output in robot mode

---

## Phase 3: Lexical Search

### 3.1 Search Module Structure

**New module:** `src/search/`

```
src/search/
├── mod.rs       # Module exports
├── fts.rs       # FTS5 search
├── vector.rs    # Vector search (sqlite-vec)
├── hybrid.rs    # Combined hybrid search
├── rrf.rs       # RRF ranking algorithm
└── filters.rs   # Filter parsing and application
```

**File:** `src/search/mod.rs`

```rust
//! Search functionality for documents.
//!
//! Supports lexical (FTS5), semantic (vector), and hybrid search.

mod filters;
mod fts;
mod hybrid;
mod rrf;
mod vector;

pub use filters::{apply_filters, PathFilter, SearchFilters};
pub use fts::{search_fts, FtsResult};
pub use hybrid::{search_hybrid, HybridResult, SearchMode};
pub use rrf::{rank_rrf, RrfResult};
pub use vector::{search_vector, VectorResult};
```

---

### 3.2 FTS5 Search Function

**File:** `src/search/fts.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;

/// FTS search result.
#[derive(Debug, Clone)]
pub struct FtsResult {
    pub document_id: i64,
    pub rank: f64,       // BM25 score (lower = better match)
    pub snippet: String, // Context snippet around match
}

/// Search documents using FTS5.
///
/// Returns matching document IDs with BM25 rank scores and snippets.
/// Lower rank values indicate better matches.
/// Uses bm25() explicitly (not the `rank` alias) and snippet() for context.
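/// Query syntax is standard FTS5 (illustrative examples): `auth token`
/// (implicit AND), `"refresh token"` (phrase), `auth*` (prefix, served by the
/// prefix='2 3 4' indexes), `jwt OR oauth`.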
pub fn search_fts(
    conn: &Connection,
    query: &str,
    limit: usize,
) -> Result<Vec<FtsResult>> {
    if query.trim().is_empty() {
        return Ok(Vec::new());
    }

    let mut stmt = conn.prepare(
        "SELECT rowid, bm25(documents_fts),
                snippet(documents_fts, 1, '', '', '...', 64)
         FROM documents_fts
         WHERE documents_fts MATCH ?
         ORDER BY bm25(documents_fts)
         LIMIT ?",
    )?;

    let results = stmt
        .query_map([query, &limit.to_string()], |row| {
            Ok(FtsResult {
                document_id: row.get(0)?,
                rank: row.get(1)?,
                snippet: row.get(2)?,
            })
        })?
        .collect::<rusqlite::Result<Vec<_>>>()?;

    Ok(results)
}
```

**Acceptance Criteria:**

- [ ] Returns matching document IDs with BM25 rank
- [ ] Porter stemming works (search/searching match)
- [ ] Prefix search works (type-ahead UX)
- [ ] Empty query returns empty results
- [ ] Nonsense query returns empty results

---

### 3.3 Search Filters

**File:** `src/search/filters.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;
use crate::documents::SourceType;

/// Search filters applied post-retrieval.
#[derive(Debug, Clone, Default)]
pub struct SearchFilters {
    pub source_type: Option<SourceType>,
    pub author: Option<String>,
    pub project_id: Option<i64>,
    pub after: Option<i64>,  // ms epoch
    pub labels: Vec<String>, // AND logic
    pub path: Option<PathFilter>,
    pub limit: usize, // Default 20, max 100
}

/// Path filter with prefix or exact match.
#[derive(Debug, Clone)]
pub enum PathFilter {
    Prefix(String), // Trailing `/` -> LIKE 'path/%'
    Exact(String),  // No trailing `/` -> = 'path'
}

impl PathFilter {
    pub fn from_str(s: &str) -> Self {
        if s.ends_with('/') {
            Self::Prefix(s.to_string())
        } else {
            Self::Exact(s.to_string())
        }
    }
}

/// Apply filters to document IDs, returning filtered set.
///
/// IMPORTANT: Preserves ranking order from input document_ids.
/// Filters must not reorder results - maintain the RRF/search ranking.
pub fn apply_filters(
    conn: &Connection,
    document_ids: &[i64],
    filters: &SearchFilters,
) -> Result<Vec<i64>> {
    // Build dynamic WHERE clause based on filters
    // Multiple --label flags use AND logic
    // Path prefix vs exact match per PathFilter variant
    //
    // Implementation strategy to preserve ranking order:
    // 1. Accept document_ids as ordered list
    // 2. Build CTE with position
    // 3. JOIN with filters
    // 4. ORDER BY original position
    //
    // Example SQL pattern:
    // ```sql
    // WITH ranked_docs(doc_id, pos) AS (
    //     SELECT column1, ROW_NUMBER() OVER() as pos
    //     FROM (VALUES (?),(?),(?),...)
    // )
    // SELECT d.id
    // FROM documents d
    // JOIN ranked_docs rd ON d.id = rd.doc_id
    // WHERE d.source_type = ?
    //   AND EXISTS (
    //       SELECT 1 FROM document_labels dl
    //       WHERE dl.document_id = d.id AND dl.label_name = ?
    //   )
    // ORDER BY rd.pos
    // LIMIT ?
    // ```
    todo!()
}
```

**Supported filters:**

| Filter | SQL Column | Notes |
|--------|-----------|-------|
| `--type` | `source_type` | `issue`, `mr`, `discussion` |
| `--author` | `author_username` | Exact match |
| `--project` | `project_id` | Resolve path to ID |
| `--after` | `created_at` | `>= date` (ms epoch) |
| `--label` | `document_labels` | JOIN, multiple = AND |
| `--path` | `document_paths` | JOIN, trailing `/` = prefix |
| `--limit` | N/A | Default 20, max 100 |

**Acceptance Criteria:**

- [ ] Each filter correctly restricts results
- [ ] Multiple `--label` flags use AND logic
- [ ] Path prefix vs exact match works correctly
- [ ] Filters compose (all applied together)
- [ ] Ranking order preserved after filtering

---

### 3.4 CLI: `gi search --mode=lexical`

**File:** `src/cli/commands/search.rs`

```rust
//! Search command - find documents using lexical, semantic, or hybrid search.

use console::style;
use serde::Serialize;

use crate::core::error::Result;
use crate::core::time::ms_to_iso;
use crate::search::{search_hybrid, HybridResult, SearchFilters, SearchMode};
use crate::Config;

/// Search result for display.
#[derive(Debug, Serialize)]
pub struct SearchResultDisplay {
    pub document_id: i64,
    pub source_type: String,
    pub title: Option<String>,
    pub url: Option<String>,
    pub project_path: String,
    pub author: Option<String>,
    pub created_at: String, // ISO format
    pub updated_at: String, // ISO format
    pub score: f64,         // Normalized 0-1
    pub snippet: String,    // Context around match
    pub labels: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub explain: Option<ExplainData>,
}

/// Ranking explanation for --explain flag.
#[derive(Debug, Serialize)]
pub struct ExplainData {
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
    pub rrf_score: f64,
}

/// Search results response.
#[derive(Debug, Serialize)]
pub struct SearchResponse {
    pub query: String,
    pub mode: String,
    pub total_results: usize,
    pub results: Vec<SearchResultDisplay>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}

/// Run search command.
pub fn run_search(
    config: &Config,
    query: &str,
    mode: SearchMode,
    filters: SearchFilters,
    explain: bool,
) -> Result<SearchResponse> {
    // 1. Parse query and filters
    // 2. Execute search based on mode
    // 3. Apply post-retrieval filters
    // 4. Format and return results
    todo!()
}

/// Print human-readable search results.
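/// Output shape (illustrative; values borrowed from the discussion example
/// above, scores invented):
///
///   [1] Issue - Authentication redesign (0.87)
///       @johndoe · 2024-03-15 · group/project-one
///       "...move to JWT-based auth..."
///       https://gitlab.example.com/group/project-one/-/issues/234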
pub fn print_search_results(response: &SearchResponse, explain: bool) {
    println!(
        "Found {} results ({} search)\n",
        response.total_results, response.mode
    );

    for (i, result) in response.results.iter().enumerate() {
        let type_prefix = match result.source_type.as_str() {
            "merge_request" => "MR",
            "issue" => "Issue",
            "discussion" => "Discussion",
            // as_str() here keeps all arms at &str (a bare `&result.source_type`
            // would be a &String and fail to unify with the literal arms)
            _ => result.source_type.as_str(),
        };
        let title = result.title.as_deref().unwrap_or("(untitled)");

        println!(
            "[{}] {} - {} ({})",
            i + 1,
            style(type_prefix).cyan(),
            title,
            format!("{:.2}", result.score)
        );

        if explain {
            if let Some(exp) = &result.explain {
                let vec_str = exp
                    .vector_rank
                    .map(|r| format!("#{}", r))
                    .unwrap_or_else(|| "-".into());
                let fts_str = exp
                    .fts_rank
                    .map(|r| format!("#{}", r))
                    .unwrap_or_else(|| "-".into());
                println!(
                    "    Vector: {}, FTS: {}, RRF: {:.4}",
                    vec_str, fts_str, exp.rrf_score
                );
            }
        }

        if let Some(author) = &result.author {
            println!(
                "    @{} · {} · {}",
                author,
                &result.created_at[..10],
                result.project_path
            );
        }
        println!("    \"{}...\"", &result.snippet);
        if let Some(url) = &result.url {
            println!("    {}", style(url).dim());
        }
        println!();
    }
}

/// Print JSON search results for robot mode.
pub fn print_search_results_json(response: &SearchResponse, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": response,
        "meta": { "elapsed_ms": elapsed_ms }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration in `src/cli/mod.rs`:**

```rust
/// Search subcommand arguments.
#[derive(Args)]
pub struct SearchArgs {
    /// Search query
    query: String,

    /// Search mode
    #[arg(long, default_value = "hybrid")]
    mode: String, // "hybrid" | "lexical" | "semantic"

    /// Filter by source type
    #[arg(long, value_name = "TYPE")]
    r#type: Option<String>,

    /// Filter by author username
    #[arg(long)]
    author: Option<String>,

    /// Filter by project path
    #[arg(long)]
    project: Option<String>,

    /// Filter by creation date (after)
    #[arg(long)]
    after: Option<String>,

    /// Filter by label (can specify multiple)
    #[arg(long, action = clap::ArgAction::Append)]
    label: Vec<String>,

    /// Filter by file path
    #[arg(long)]
    path: Option<String>,

    /// Maximum results
    #[arg(long, default_value = "20")]
    limit: usize,

    /// Show ranking breakdown
    #[arg(long)]
    explain: bool,
}
```

**Acceptance Criteria:**

- [ ] Works without Ollama running
- [ ] All filters functional
- [ ] Human-readable output with snippets
- [ ] JSON output matches schema
- [ ] Empty results show helpful message
- [ ] "No data indexed" message if documents table empty

---

## Phase 4: Embedding Pipeline

### 4.1 Embedding Module Structure

**New module:** `src/embedding/`

```
src/embedding/
├── mod.rs              # Module exports
├── ollama.rs           # Ollama API client
├── pipeline.rs         # Batch embedding orchestration
└── change_detector.rs  # Detect documents needing re-embedding
```

**File:** `src/embedding/mod.rs`

```rust
//! Embedding generation and storage.
//!
//! Uses Ollama for embedding generation and sqlite-vec for storage.

mod change_detector;
mod ollama;
mod pipeline;

pub use change_detector::detect_embedding_changes;
pub use ollama::{check_ollama_health, OllamaClient, OllamaConfig};
pub use pipeline::{embed_documents, EmbedResult};
```

---

### 4.2 Ollama Client

**File:** `src/embedding/ollama.rs`

```rust
use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::core::error::{GiError, Result};

/// Ollama client configuration.
#[derive(Debug, Clone)]
pub struct OllamaConfig {
    pub base_url: String,  // "http://localhost:11434"
    pub model: String,     // "nomic-embed-text"
    pub timeout_secs: u64, // Request timeout
}

impl Default for OllamaConfig {
    fn default() -> Self {
        Self {
            base_url: "http://localhost:11434".into(),
            model: "nomic-embed-text".into(),
            timeout_secs: 60,
        }
    }
}

/// Ollama API client.
pub struct OllamaClient {
    client: Client,
    config: OllamaConfig,
}

/// Batch embed request.
#[derive(Serialize)]
struct EmbedRequest {
    model: String,
    input: Vec<String>,
}

/// Batch embed response.
#[derive(Deserialize)]
struct EmbedResponse {
    model: String,
    embeddings: Vec<Vec<f32>>,
}

/// Model info from /api/tags.
#[derive(Deserialize)]
struct TagsResponse {
    models: Vec<ModelInfo>,
}

#[derive(Deserialize)]
struct ModelInfo {
    name: String,
}

impl OllamaClient {
    pub fn new(config: OllamaConfig) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(config.timeout_secs))
            .build()
            .expect("Failed to create HTTP client");
        Self { client, config }
    }

    /// Check if Ollama is available and model is loaded.
    pub async fn health_check(&self) -> Result<()> {
        let url = format!("{}/api/tags", self.config.base_url);
        let response = self.client.get(&url).send().await.map_err(|e| {
            GiError::OllamaUnavailable {
                base_url: self.config.base_url.clone(),
                source: Some(e),
            }
        })?;

        let tags: TagsResponse = response.json().await?;
        let model_available = tags
            .models
            .iter()
            .any(|m| m.name.starts_with(&self.config.model));

        if !model_available {
            return Err(GiError::OllamaModelNotFound {
                model: self.config.model.clone(),
            });
        }
        Ok(())
    }

    /// Generate embeddings for a batch of texts.
    ///
    /// Returns 768-dimensional vectors for each input text.
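    ///
    /// Wire shape (Ollama `/api/embed`, abridged):
    ///   request:  {"model": "nomic-embed-text", "input": ["text a", "text b"]}
    ///   response: {"model": "nomic-embed-text", "embeddings": [[...], [...]]}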
    pub async fn embed_batch(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>> {
        let url = format!("{}/api/embed", self.config.base_url);
        let request = EmbedRequest {
            model: self.config.model.clone(),
            input: texts,
        };

        let response = self
            .client
            .post(&url)
            .json(&request)
            .send()
            .await
            .map_err(|e| GiError::OllamaUnavailable {
                base_url: self.config.base_url.clone(),
                source: Some(e),
            })?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(GiError::EmbeddingFailed {
                document_id: 0, // Batch failure
                reason: format!("HTTP {}: {}", status, body),
            });
        }

        let embed_response: EmbedResponse = response.json().await?;
        Ok(embed_response.embeddings)
    }
}

/// Quick health check without full client.
pub async fn check_ollama_health(base_url: &str) -> bool {
    let client = Client::new();
    client
        .get(format!("{}/api/tags", base_url))
        .send()
        .await
        .is_ok()
}
```

**Endpoints:**

| Endpoint | Purpose |
|----------|---------|
| `GET /api/tags` | Health check, verify model available |
| `POST /api/embed` | Batch embedding (preferred) |

**Acceptance Criteria:**

- [ ] Health check detects Ollama availability
- [ ] Batch embedding works with up to 32 texts
- [ ] Clear error messages for common failures

---

### 4.3 Error Handling Extensions

**File:** `src/core/error.rs` (extend existing)

Add to `ErrorCode`:

```rust
pub enum ErrorCode {
    // ... existing variants ...
    InvalidEnumValue,
    OllamaUnavailable,
    OllamaModelNotFound,
    EmbeddingFailed,
}

impl ErrorCode {
    pub fn exit_code(&self) -> i32 {
        match self {
            // ... existing mappings ...
            Self::InvalidEnumValue => 13,
            Self::OllamaUnavailable => 14,
            Self::OllamaModelNotFound => 15,
            Self::EmbeddingFailed => 16,
        }
    }
}

impl std::fmt::Display for ErrorCode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = match self {
            // ... existing mappings ...
            Self::InvalidEnumValue => "INVALID_ENUM_VALUE",
            Self::OllamaUnavailable => "OLLAMA_UNAVAILABLE",
            Self::OllamaModelNotFound => "OLLAMA_MODEL_NOT_FOUND",
            Self::EmbeddingFailed => "EMBEDDING_FAILED",
        };
        write!(f, "{code}")
    }
}
```

Add to `GiError`:

```rust
pub enum GiError {
    // ... existing variants ...

    #[error("Cannot connect to Ollama at {base_url}. Is it running?")]
    OllamaUnavailable {
        base_url: String,
        #[source]
        source: Option<reqwest::Error>,
    },

    #[error("Ollama model '{model}' not found. Run: ollama pull {model}")]
    OllamaModelNotFound { model: String },

    #[error("Embedding failed for document {document_id}: {reason}")]
    EmbeddingFailed { document_id: i64, reason: String },
}

impl GiError {
    pub fn code(&self) -> ErrorCode {
        match self {
            // ... existing mappings ...
            Self::OllamaUnavailable { .. } => ErrorCode::OllamaUnavailable,
            Self::OllamaModelNotFound { .. } => ErrorCode::OllamaModelNotFound,
            Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed,
        }
    }

    pub fn suggestion(&self) -> Option<&'static str> {
        match self {
            // ... existing mappings ...
            Self::OllamaUnavailable { .. } => Some("Start Ollama: ollama serve"),
            Self::OllamaModelNotFound { .. } => {
                Some("Pull the model: ollama pull nomic-embed-text")
            }
            Self::EmbeddingFailed { .. } => {
                Some("Check Ollama logs or retry with 'gi embed --retry-failed'")
            }
        }
    }
}
```

---

### 4.4 Embedding Pipeline

**File:** `src/embedding/pipeline.rs`

```rust
use indicatif::{ProgressBar, ProgressStyle};
use rusqlite::Connection;

use crate::core::error::Result;
use crate::embedding::OllamaClient;

/// Batch size for embedding requests.
const BATCH_SIZE: usize = 32;

/// SQLite page size for paging through pending documents.
const DB_PAGE_SIZE: usize = 500;

/// Which documents to embed.
#[derive(Debug, Clone, Copy)]
pub enum EmbedSelection {
    /// New or changed documents (default).
    Pending,
    /// Only previously failed documents.
    RetryFailed,
}

/// Result of embedding run.
#[derive(Debug, Default)]
pub struct EmbedResult {
    pub embedded: usize,
    pub failed: usize,
    pub skipped: usize,
}

/// Embed documents that need embedding.
///
/// Process:
/// 1. Page through documents needing embedding (DB_PAGE_SIZE at a time)
/// 2. Batch documents (32 per Ollama request)
/// 3. Fire concurrent HTTP requests via FuturesUnordered (capped by concurrency)
/// 4. Collect results and write to SQLite sequentially (rusqlite is !Send)
/// 5. On failure: record error with actual content_hash, continue with next batch
///
/// Architecture note: rusqlite::Connection is !Send, so all DB reads/writes
/// happen on the main thread. Only HTTP calls are concurrent.
pub async fn embed_documents(
    conn: &Connection,
    client: &OllamaClient,
    selection: EmbedSelection,
    concurrency: usize,
    progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<EmbedResult> {
    use futures::stream::{FuturesUnordered, StreamExt};

    let mut result = EmbedResult::default();
    let total_pending = count_pending_documents(conn)?;
    if total_pending == 0 {
        return Ok(result);
    }

    // Page through pending documents to avoid loading all into memory.
    // NOTE: with EmbedSelection::RetryFailed, documents that keep failing stay
    // selectable; the real implementation should stop after one pass.
    loop {
        let pending = find_pending_documents(conn, DB_PAGE_SIZE, selection)?;
        if pending.is_empty() {
            break;
        }

        // Launch concurrent HTTP requests, collect results
        let mut futures = FuturesUnordered::new();
        for batch in pending.chunks(BATCH_SIZE) {
            let texts: Vec<String> = batch.iter().map(|d| d.content.clone()).collect();
            let batch_meta: Vec<(i64, String)> = batch
                .iter()
                .map(|d| (d.id, d.content_hash.clone()))
                .collect();

            futures.push(async move {
                let embed_result = client.embed_batch(texts).await;
                (batch_meta, embed_result)
            });

            // Cap in-flight requests
            if futures.len() >= concurrency {
                if let Some((meta, res)) = futures.next().await {
                    collect_writes(conn, &meta, res, &mut result)?;
                }
            }
        }

        // Drain remaining futures
        while let Some((meta, res)) = futures.next().await {
            collect_writes(conn, &meta, res, &mut result)?;
        }

        if let Some(ref cb) = progress_callback {
            cb(result.embedded + result.failed, total_pending);
        }
    }

    Ok(result)
}

/// Collect embedding results and write to DB (sequential, on main thread).
fn collect_writes(
    conn: &Connection,
    batch_meta: &[(i64, String)],
    embed_result: Result<Vec<Vec<f32>>>,
    result: &mut EmbedResult,
) -> Result<()> {
    // Connection::transaction() requires &mut Connection; unchecked_transaction()
    // opens one through the shared reference used throughout this module.
    let tx = conn.unchecked_transaction()?;
    match embed_result {
        Ok(embeddings) => {
            for ((doc_id, hash), embedding) in batch_meta.iter().zip(embeddings.iter()) {
                store_embedding(&tx, *doc_id, embedding, hash)?;
                result.embedded += 1;
            }
        }
        Err(e) => {
            for (doc_id, hash) in batch_meta {
                record_embedding_error(&tx, *doc_id, hash, &e.to_string())?;
                result.failed += 1;
            }
        }
    }
    tx.commit()?;
    Ok(())
}

struct PendingDocument {
    id: i64,
    content: String,
    content_hash: String,
}

/// Count total pending documents (for progress reporting).
fn count_pending_documents(conn: &Connection) -> Result<usize> {
    let count: usize = conn.query_row(
        "SELECT COUNT(*)
         FROM documents d
         LEFT JOIN embedding_metadata em ON d.id = em.document_id
         WHERE em.document_id IS NULL OR em.content_hash != d.content_hash",
        [],
        |row| row.get(0),
    )?;
    Ok(count)
}

fn find_pending_documents(
    conn: &Connection,
    limit: usize,
    selection: EmbedSelection,
) -> Result<Vec<PendingDocument>> {
    let sql = match selection {
        EmbedSelection::Pending => {
            "SELECT d.id, d.content_text, d.content_hash
             FROM documents d
             LEFT JOIN embedding_metadata em ON d.id = em.document_id
             WHERE em.document_id IS NULL OR em.content_hash != d.content_hash
             LIMIT ?"
        }
        EmbedSelection::RetryFailed => {
            "SELECT d.id, d.content_text, d.content_hash
             FROM documents d
             JOIN embedding_metadata em ON d.id = em.document_id
             WHERE em.last_error IS NOT NULL
             LIMIT ?"
        }
    };

    let mut stmt = conn.prepare(sql)?;
    let docs = stmt
        .query_map([limit], |row| {
            Ok(PendingDocument {
                id: row.get(0)?,
                content: row.get(1)?,
                content_hash: row.get(2)?,
            })
        })?
.collect::, _>>()?; Ok(docs) } fn store_embedding( conn: &Connection, document_id: i64, embedding: &[f32], content_hash: &str, ) -> Result<()> { // Convert embedding to bytes for sqlite-vec // sqlite-vec expects raw little-endian bytes, not the array directly let embedding_bytes: Vec = embedding .iter() .flat_map(|f| f.to_le_bytes()) .collect(); // Store in sqlite-vec (rowid = document_id) conn.execute( "INSERT OR REPLACE INTO embeddings(rowid, embedding) VALUES (?, ?)", rusqlite::params![document_id, embedding_bytes], )?; // Update metadata let now = crate::core::time::now_ms(); conn.execute( "INSERT OR REPLACE INTO embedding_metadata (document_id, model, dims, content_hash, created_at, last_error, attempt_count, last_attempt_at) VALUES (?, 'nomic-embed-text', 768, ?, ?, NULL, 0, ?)", rusqlite::params![document_id, content_hash, now, now], )?; Ok(()) } fn record_embedding_error( conn: &Connection, document_id: i64, content_hash: &str, error: &str, ) -> Result<()> { let now = crate::core::time::now_ms(); conn.execute( "INSERT INTO embedding_metadata (document_id, model, dims, content_hash, created_at, last_error, attempt_count, last_attempt_at) VALUES (?, 'nomic-embed-text', 768, ?, ?, ?, 1, ?) ON CONFLICT(document_id) DO UPDATE SET last_error = excluded.last_error, attempt_count = attempt_count + 1, last_attempt_at = excluded.last_attempt_at", rusqlite::params![document_id, content_hash, now, error, now], )?; Ok(()) } ``` **Acceptance Criteria:** - [ ] New documents get embedded - [ ] Changed documents (hash mismatch) get re-embedded - [ ] Unchanged documents skipped - [ ] Failures recorded in `embedding_metadata.last_error` - [ ] Failures record actual content_hash (not empty string) - [ ] Writes batched in transactions for performance - [ ] Concurrency parameter respected - [ ] Progress reported during embedding --- ### 4.5 CLI: `gi embed` **File:** `src/cli/commands/embed.rs` ```rust //! Embed command - generate embeddings for documents. 
use indicatif::{ProgressBar, ProgressStyle};

use crate::core::error::Result;
use crate::embedding::{embed_documents, EmbedResult, EmbedSelection, OllamaClient, OllamaConfig};
use crate::Config;

/// Run embedding command.
pub async fn run_embed(config: &Config, retry_failed: bool) -> Result<EmbedResult> {
    let ollama_config = OllamaConfig {
        base_url: config.embedding.base_url.clone(),
        model: config.embedding.model.clone(),
        timeout_secs: 120,
    };
    let client = OllamaClient::new(ollama_config);

    // Health check
    client.health_check().await?;

    // Open the database (exact helper comes from existing infrastructure)
    let conn = open_connection(config)?;

    let selection = if retry_failed {
        EmbedSelection::RetryFailed
    } else {
        EmbedSelection::Pending
    };

    // Run embedding
    let result = embed_documents(
        &conn,
        &client,
        selection,
        config.embedding.concurrency as usize,
        None,
    )
    .await?;

    Ok(result)
}

/// Print human-readable output.
pub fn print_embed(result: &EmbedResult, elapsed_secs: u64) {
    println!("Embedding complete:");
    println!("  Embedded: {:>6} documents", result.embedded);
    println!("  Failed:   {:>6} documents", result.failed);
    println!("  Skipped:  {:>6} documents", result.skipped);
    println!("  Elapsed:  {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}

/// Print JSON output for robot mode.
pub fn print_embed_json(result: &EmbedResult, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": {
            "embedded": result.embedded,
            "failed": result.failed,
            "skipped": result.skipped
        },
        "meta": { "elapsed_ms": elapsed_ms }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration:**

```rust
/// Embed subcommand arguments.
#[derive(Args)]
pub struct EmbedArgs {
    /// Retry only previously failed documents
    #[arg(long)]
    retry_failed: bool,
}
```

**Acceptance Criteria:**
- [ ] Embeds documents without embeddings
- [ ] Re-embeds documents with changed hash
- [ ] `--retry-failed` only processes failed documents
- [ ] Progress bar with count
- [ ] Clear error if Ollama unavailable

---

### 4.6 CLI: `gi stats`

**File:** `src/cli/commands/stats.rs`

```rust
//! Stats command - display document and embedding statistics.
use rusqlite::Connection;
use serde::Serialize;

use crate::core::error::Result;
use crate::Config;

/// Document statistics.
#[derive(Debug, Serialize)]
pub struct Stats {
    pub documents: DocumentStats,
    pub embeddings: EmbeddingStats,
    pub fts: FtsStats,
}

#[derive(Debug, Serialize)]
pub struct DocumentStats {
    pub issues: usize,
    pub mrs: usize,
    pub discussions: usize,
    pub total: usize,
    pub truncated: usize,
}

#[derive(Debug, Serialize)]
pub struct EmbeddingStats {
    pub embedded: usize,
    pub pending: usize,
    pub failed: usize,
    pub coverage_pct: f64,
}

#[derive(Debug, Serialize)]
pub struct FtsStats {
    pub indexed: usize,
}

/// Integrity check result.
#[derive(Debug, Serialize)]
pub struct IntegrityCheck {
    pub documents_count: usize,
    pub fts_count: usize,
    pub embeddings_count: usize,
    pub metadata_count: usize,
    pub orphaned_embeddings: usize,
    pub hash_mismatches: usize,
    pub ok: bool,
}

/// Run stats command.
pub fn run_stats(config: &Config) -> Result<Stats> {
    // Query counts from database
    todo!()
}

/// Run integrity check (--check flag).
///
/// Verifies:
/// - documents count == documents_fts count
/// - embeddings.rowid all exist in documents.id
/// - embedding_metadata.content_hash == documents.content_hash
pub fn run_integrity_check(config: &Config) -> Result<IntegrityCheck> {
    // 1. Count documents
    // 2. Count FTS entries
    // 3. Find orphaned embeddings (no matching document)
    // 4. Find hash mismatches between embedding_metadata and documents
    // 5. Return check results
    todo!()
}

/// Print human-readable stats.
pub fn print_stats(stats: &Stats) {
    println!("Document Statistics:");
    println!("  Issues:      {:>6} documents", stats.documents.issues);
    println!("  MRs:         {:>6} documents", stats.documents.mrs);
    println!("  Discussions: {:>6} documents", stats.documents.discussions);
    println!("  Total:       {:>6} documents", stats.documents.total);
    if stats.documents.truncated > 0 {
        println!("  Truncated:   {:>6}", stats.documents.truncated);
    }
    println!();
    println!("Embedding Coverage:");
    println!("  Embedded: {:>6} ({:.1}%)", stats.embeddings.embedded, stats.embeddings.coverage_pct);
    println!("  Pending:  {:>6}", stats.embeddings.pending);
    println!("  Failed:   {:>6}", stats.embeddings.failed);
    println!();
    println!("FTS Index:");
    println!("  Indexed: {:>6} documents", stats.fts.indexed);
}

/// Print integrity check results.
pub fn print_integrity_check(check: &IntegrityCheck) {
    println!("Integrity Check:");
    println!("  Documents:   {:>6}", check.documents_count);
    println!("  FTS entries: {:>6}", check.fts_count);
    println!("  Embeddings:  {:>6}", check.embeddings_count);
    println!("  Metadata:    {:>6}", check.metadata_count);
    if check.orphaned_embeddings > 0 {
        println!("  Orphaned embeddings: {:>6} (WARN)", check.orphaned_embeddings);
    }
    if check.hash_mismatches > 0 {
        println!("  Hash mismatches:     {:>6} (WARN)", check.hash_mismatches);
    }
    println!();
    println!("  Status: {}", if check.ok { "OK" } else { "ISSUES FOUND" });
}

/// Print JSON stats for robot mode.
pub fn print_stats_json(stats: &Stats) {
    let output = serde_json::json!({
        "ok": true,
        "data": stats
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration:**

```rust
/// Stats subcommand arguments.
#[derive(Args)]
pub struct StatsArgs {
    /// Run integrity checks (document/FTS/embedding consistency)
    #[arg(long)]
    check: bool,
}
```

**Acceptance Criteria:**
- [ ] Shows document counts by type
- [ ] Shows embedding coverage
- [ ] Shows FTS index count
- [ ] Identifies truncated documents
- [ ] `--check` verifies document/FTS/embedding consistency
- [ ] JSON output for scripting

---

## Phase 5: Hybrid Search

### 5.1 Vector Search Function

**File:** `src/search/vector.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;

/// Vector search result.
#[derive(Debug, Clone)]
pub struct VectorResult {
    pub document_id: i64,
    pub distance: f64, // Lower = more similar
}

/// Search documents using vector similarity.
///
/// Uses sqlite-vec for efficient vector search.
/// Returns document IDs sorted by distance (lower = better match).
///
/// IMPORTANT: sqlite-vec KNN queries require:
/// - k parameter for number of results
/// - embedding passed as raw little-endian bytes
pub fn search_vector(
    conn: &Connection,
    query_embedding: &[f32],
    limit: usize,
) -> Result<Vec<VectorResult>> {
    // Convert embedding to bytes for sqlite-vec
    let embedding_bytes: Vec<u8> = query_embedding
        .iter()
        .flat_map(|f| f.to_le_bytes())
        .collect();

    let mut stmt = conn.prepare(
        "SELECT rowid, distance
         FROM embeddings
         WHERE embedding MATCH ? AND k = ?
         ORDER BY distance
         LIMIT ?",
    )?;

    let results = stmt
        .query_map(rusqlite::params![embedding_bytes, limit, limit], |row| {
            Ok(VectorResult {
                document_id: row.get(0)?,
                distance: row.get(1)?,
            })
        })?
        .collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(results)
}
```

**Acceptance Criteria:**
- [ ] Returns document IDs with distances
- [ ] Lower distance = better match
- [ ] Works with 768-dim vectors
- [ ] Uses k parameter for KNN query
- [ ] Embedding passed as bytes

---

### 5.2 RRF Ranking

**File:** `src/search/rrf.rs`

```rust
use std::collections::HashMap;

/// RRF ranking constant.
const RRF_K: f64 = 60.0;

/// RRF-ranked result.
#[derive(Debug, Clone)]
pub struct RrfResult {
    pub document_id: i64,
    pub rrf_score: f64,        // Raw RRF score
    pub normalized_score: f64, // Normalized to 0-1
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
}

/// Rank documents using Reciprocal Rank Fusion.
///
/// Algorithm:
///   RRF_score(d) = Σ 1 / (k + rank_i(d))
///
/// Where:
/// - k = 60 (tunable constant)
/// - rank_i(d) = rank of document d in retriever i (1-indexed)
/// - Sum over all retrievers where document appears
pub fn rank_rrf(
    vector_results: &[(i64, f64)], // (doc_id, distance)
    fts_results: &[(i64, f64)],    // (doc_id, bm25_score)
) -> Vec<RrfResult> {
    let mut scores: HashMap<i64, (f64, Option<usize>, Option<usize>)> = HashMap::new();

    // Add vector results (1-indexed ranks)
    for (rank, (doc_id, _)) in vector_results.iter().enumerate() {
        let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64);
        let entry = scores.entry(*doc_id).or_insert((0.0, None, None));
        entry.0 += rrf_contribution;
        entry.1 = Some(rank + 1);
    }

    // Add FTS results (1-indexed ranks)
    for (rank, (doc_id, _)) in fts_results.iter().enumerate() {
        let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64);
        let entry = scores.entry(*doc_id).or_insert((0.0, None, None));
        entry.0 += rrf_contribution;
        entry.2 = Some(rank + 1);
    }

    // Convert to results and sort by RRF score descending
    let mut results: Vec<_> = scores
        .into_iter()
        .map(|(doc_id, (rrf_score, vector_rank, fts_rank))| RrfResult {
            document_id: doc_id,
            rrf_score,
            normalized_score: 0.0, // Will be set below
            vector_rank,
            fts_rank,
        })
        .collect();
    results.sort_by(|a, b| b.rrf_score.partial_cmp(&a.rrf_score).unwrap());

    // Normalize scores to 0-1
    if let Some(max_score) = results.first().map(|r| r.rrf_score) {
        for result in &mut results {
            result.normalized_score = result.rrf_score / max_score;
        }
    }

    results
}
```

**Acceptance Criteria:**
- [ ] Documents in both lists score higher
- [ ] Documents in one list still included
- [ ] Normalized score = rrfScore / max(rrfScore)
- [ ] Raw RRF score available in `--explain` output

---

### 5.3 Adaptive Recall

**File:** `src/search/hybrid.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;
use crate::embedding::OllamaClient;
use crate::search::{SearchFilters, search_fts, search_vector, rank_rrf, RrfResult};

/// Base recall for unfiltered search.
const BASE_RECALL: usize = 50;

/// Expanded recall when filters are applied.
const FILTERED_RECALL: usize = 200;

/// Search mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
    Hybrid,   // Vector + FTS with RRF
    Lexical,  // FTS only
    Semantic, // Vector only
}

impl SearchMode {
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "hybrid" => Some(Self::Hybrid),
            "lexical" | "fts" => Some(Self::Lexical),
            "semantic" | "vector" => Some(Self::Semantic),
            _ => None,
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Hybrid => "hybrid",
            Self::Lexical => "lexical",
            Self::Semantic => "semantic",
        }
    }
}

/// Hybrid search result.
#[derive(Debug)]
pub struct HybridResult {
    pub document_id: i64,
    pub score: f64,
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
    pub rrf_score: f64,
}

/// Execute hybrid search.
///
/// Adaptive recall: expands topK when filters are applied to prevent
/// "no results" when relevant docs exist but would be filtered out.
///
/// IMPORTANT: All modes use RRF consistently to ensure rank fields
/// are populated correctly for --explain output.
pub async fn search_hybrid(
    conn: &Connection,
    client: Option<&OllamaClient>,
    query: &str,
    mode: SearchMode,
    filters: &SearchFilters,
) -> Result<(Vec<HybridResult>, Vec<String>)> {
    let mut warnings: Vec<String> = Vec::new();

    // Determine recall based on filters
    let top_k = if filters.has_any_filter() {
        FILTERED_RECALL
    } else {
        BASE_RECALL
    };

    match mode {
        SearchMode::Lexical => {
            // FTS only - use RRF with empty vector results for consistent ranking
            let fts_results = search_fts(conn, query, top_k)?;
            let fts_tuples: Vec<_> = fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
            let ranked = rank_rrf(&[], &fts_tuples);
            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
        SearchMode::Semantic => {
            // Vector only - requires client
            let client = client.ok_or_else(|| crate::core::error::GiError::OllamaUnavailable {
                base_url: "unknown".into(),
                source: None,
            })?;
            let query_embedding = client.embed_batch(vec![query.to_string()]).await?;
            let embedding = query_embedding.into_iter().next().unwrap();
            let vec_results = search_vector(conn, &embedding, top_k)?;

            // Use RRF with empty FTS results for consistent ranking
            let vec_tuples: Vec<_> = vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
            let ranked = rank_rrf(&vec_tuples, &[]);
            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
        SearchMode::Hybrid => {
            // Both retrievers with RRF fusion
            let fts_results = search_fts(conn, query, top_k)?;
            let vec_results = if let Some(client) = client {
                let query_embedding = client.embed_batch(vec![query.to_string()]).await?;
                let embedding = query_embedding.into_iter().next().unwrap();
                search_vector(conn, &embedding, top_k)?
            } else {
                // Graceful degradation: use FTS only
                warnings.push("Embedding service unavailable, using lexical search only".into());
                Vec::new()
            };

            // RRF fusion
            let vec_tuples: Vec<_> = vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
            let fts_tuples: Vec<_> = fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
            let ranked = rank_rrf(&vec_tuples, &fts_tuples);
            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
    }
}
```

**Acceptance Criteria:**
- [ ] Unfiltered search uses topK=50
- [ ] Any filter triggers topK=200
- [ ] Final results still limited by `--limit`

---

### 5.4 Graceful Degradation

When Ollama is unavailable during hybrid/semantic search:

1. Log warning: "Embedding service unavailable, using lexical search only"
2. Fall back to FTS-only search
3. Include warning in response

**Acceptance Criteria:**
- [ ] Default mode is hybrid
- [ ] `--mode=lexical` works without Ollama
- [ ] `--mode=semantic` requires Ollama
- [ ] Graceful degradation when Ollama down
- [ ] `--explain` shows rank breakdown
- [ ] All Phase 3 filters work in hybrid mode

---

## Phase 6: Sync Orchestration

### 6.1 Dirty Source Tracking

**File:** `src/ingestion/dirty_tracker.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::SourceType;

/// Maximum dirty sources to process per sync run.
const MAX_DIRTY_SOURCES_PER_RUN: usize = 500;

/// Mark a source as dirty (needs document regeneration).
///
/// Called during entity upsert operations.
/// Uses INSERT OR IGNORE to avoid duplicates.
pub fn mark_dirty(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "INSERT OR IGNORE INTO dirty_sources (source_type, source_id, queued_at)
         VALUES (?, ?, ?)",
        rusqlite::params![source_type.as_str(), source_id, now_ms()],
    )?;
    Ok(())
}

/// Get dirty sources ordered by queue time (bounded).
///
/// Limits results to prevent unbounded processing during large syncs.
pub fn get_dirty_sources(conn: &Connection) -> Result<Vec<(SourceType, i64)>> {
    let mut stmt = conn.prepare(
        "SELECT source_type, source_id FROM dirty_sources
         ORDER BY queued_at
         LIMIT ?",
    )?;
    let results = stmt
        .query_map([MAX_DIRTY_SOURCES_PER_RUN], |row| {
            let type_str: String = row.get(0)?;
            let source_type = match type_str.as_str() {
                "issue" => SourceType::Issue,
                "merge_request" => SourceType::MergeRequest,
                "discussion" => SourceType::Discussion,
                other => {
                    return Err(rusqlite::Error::FromSqlConversionFailure(
                        0,
                        rusqlite::types::Type::Text,
                        Box::new(std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            format!("invalid source_type: {other}"),
                        )),
                    ))
                }
            };
            Ok((source_type, row.get(1)?))
        })?
        .collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(results)
}

/// Clear dirty source after processing.
pub fn clear_dirty(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM dirty_sources WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
    )?;
    Ok(())
}
```

**Acceptance Criteria:**
- [ ] Upserted entities added to dirty_sources
- [ ] Duplicates ignored
- [ ] Queue cleared after document regeneration
- [ ] Processing bounded per run (max 500)

---

### 6.2 Pending Discussion Queue

**File:** `src/ingestion/discussion_queue.rs`

```rust
use rusqlite::Connection;

use crate::core::error::Result;
use crate::core::time::now_ms;

/// Noteable type for discussion fetching.
#[derive(Debug, Clone, Copy)]
pub enum NoteableType {
    Issue,
    MergeRequest,
}

impl NoteableType {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Issue => "Issue",
            Self::MergeRequest => "MergeRequest",
        }
    }
}

/// Pending discussion fetch entry.
pub struct PendingFetch {
    pub project_id: i64,
    pub noteable_type: NoteableType,
    pub noteable_iid: i64,
    pub attempt_count: i64,
}

/// Queue a discussion fetch for an entity.
pub fn queue_discussion_fetch(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
) -> Result<()> {
    conn.execute(
        "INSERT OR REPLACE INTO pending_discussion_fetches
         (project_id, noteable_type, noteable_iid, queued_at, attempt_count, last_attempt_at, last_error)
         VALUES (?, ?, ?, ?, 0, NULL, NULL)",
        rusqlite::params![project_id, noteable_type.as_str(), noteable_iid, now_ms()],
    )?;
    Ok(())
}

/// Get pending fetches with exponential backoff.
///
/// Only returns items that have waited long enough based on attempt_count.
/// Backoff formula: min_wait_ms = 1000 * 2^attempt_count (capped at 1 hour)
///
/// Limited to `max_items` to bound API calls per sync run.
pub fn get_pending_fetches(conn: &Connection, max_items: usize) -> Result<Vec<PendingFetch>> {
    let now = now_ms();
    let mut stmt = conn.prepare(
        "SELECT project_id, noteable_type, noteable_iid, attempt_count
         FROM pending_discussion_fetches
         WHERE last_attempt_at IS NULL
            OR (? - last_attempt_at) > MIN(3600000, 1000 * (1 << attempt_count))
         ORDER BY attempt_count ASC, queued_at ASC
         LIMIT ?",
    )?;
    let results = stmt
        .query_map(rusqlite::params![now, max_items], |row| {
            let type_str: String = row.get(1)?;
            let noteable_type = if type_str == "Issue" {
                NoteableType::Issue
            } else {
                NoteableType::MergeRequest
            };
            Ok(PendingFetch {
                project_id: row.get(0)?,
                noteable_type,
                noteable_iid: row.get(2)?,
                attempt_count: row.get(3)?,
            })
        })?
        .collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(results)
}

/// Mark fetch as successful and remove from queue.
pub fn complete_fetch(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM pending_discussion_fetches
         WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
        rusqlite::params![project_id, noteable_type.as_str(), noteable_iid],
    )?;
    Ok(())
}

/// Record fetch failure.
pub fn record_fetch_error(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
    error: &str,
) -> Result<()> {
    conn.execute(
        "UPDATE pending_discussion_fetches
         SET attempt_count = attempt_count + 1,
             last_attempt_at = ?,
             last_error = ?
         WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
        rusqlite::params![now_ms(), error, project_id, noteable_type.as_str(), noteable_iid],
    )?;
    Ok(())
}
```

**Acceptance Criteria:**
- [ ] Updated entities queued for discussion fetch
- [ ] Success removes from queue
- [ ] Failure increments attempt_count
- [ ] Processing bounded per run (max 100)
- [ ] Exponential backoff respects attempt_count

---

### 6.3 Document Regenerator

**File:** `src/documents/regenerator.rs`

```rust
use rusqlite::{Connection, OptionalExtension};

use crate::core::error::Result;
use crate::documents::{
    extract_issue_document, extract_mr_document, extract_discussion_document,
    DocumentData, SourceType,
};
use crate::ingestion::dirty_tracker::{get_dirty_sources, clear_dirty};

/// Result of regeneration run.
#[derive(Debug, Default)]
pub struct RegenerateResult {
    pub regenerated: usize,
    pub unchanged: usize,
    pub errored: usize,
}

/// Regenerate documents from dirty queue.
///
/// Process:
/// 1. Query dirty_sources ordered by queued_at
/// 2. For each: regenerate document, compute new hash
/// 3. ALWAYS upsert document (labels/paths may change even if content_hash unchanged)
/// 4. Track whether content_hash changed (for stats)
/// 5. Delete from dirty_sources (or record error on failure)
pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult> {
    let dirty = get_dirty_sources(conn)?;
    let mut result = RegenerateResult::default();

    for (source_type, source_id) in &dirty {
        match regenerate_one(conn, *source_type, *source_id) {
            Ok(changed) => {
                if changed {
                    result.regenerated += 1;
                } else {
                    result.unchanged += 1;
                }
                clear_dirty(conn, *source_type, *source_id)?;
            }
            Err(e) => {
                // Fail-soft: record error but continue processing remaining items
                record_dirty_error(conn, *source_type, *source_id, &e.to_string())?;
                result.errored += 1;
            }
        }
    }

    Ok(result)
}

/// Regenerate a single document. Returns true if content_hash changed.
///
/// If the source entity has been deleted, the corresponding document
/// is also deleted (cascade cleans up labels, paths, embeddings).
fn regenerate_one(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<bool> {
    // Extractors return Option: None means source entity was deleted
    let doc = match source_type {
        SourceType::Issue => extract_issue_document(conn, source_id)?,
        SourceType::MergeRequest => extract_mr_document(conn, source_id)?,
        SourceType::Discussion => extract_discussion_document(conn, source_id)?,
    };

    let Some(doc) = doc else {
        // Source was deleted — remove the document (cascade handles FTS/embeddings)
        delete_document(conn, source_type, source_id)?;
        return Ok(true);
    };

    let existing_hash = get_existing_hash(conn, source_type, source_id)?;
    let changed = existing_hash.as_ref() != Some(&doc.content_hash);

    // Always upsert: labels/paths can change independently of content_hash
    upsert_document(conn, &doc)?;
    Ok(changed)
}

/// Delete a document by source identity (cascade handles FTS trigger, labels, paths, embeddings).
fn delete_document(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM documents WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
    )?;
    Ok(())
}

/// Record a regeneration error on a dirty source for retry.
fn record_dirty_error(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
    error: &str,
) -> Result<()> {
    conn.execute(
        "UPDATE dirty_sources
         SET attempt_count = attempt_count + 1,
             last_attempt_at = ?,
             last_error = ?
         WHERE source_type = ? AND source_id = ?",
        rusqlite::params![crate::core::time::now_ms(), error, source_type.as_str(), source_id],
    )?;
    Ok(())
}

fn get_existing_hash(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<Option<String>> {
    // .optional() distinguishes "no row yet" from real query errors
    let hash: Option<String> = conn
        .query_row(
            "SELECT content_hash FROM documents WHERE source_type = ? AND source_id = ?",
            rusqlite::params![source_type.as_str(), source_id],
            |row| row.get(0),
        )
        .optional()?;
    Ok(hash)
}

fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
    // Upsert main document
    conn.execute(
        "INSERT INTO documents
         (source_type, source_id, project_id, author_username, label_names,
          created_at, updated_at, url, title, content_text, content_hash,
          is_truncated, truncated_reason)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(source_type, source_id) DO UPDATE SET
           author_username = excluded.author_username,
           label_names = excluded.label_names,
           updated_at = excluded.updated_at,
           url = excluded.url,
           title = excluded.title,
           content_text = excluded.content_text,
           content_hash = excluded.content_hash,
           is_truncated = excluded.is_truncated,
           truncated_reason = excluded.truncated_reason",
        rusqlite::params![
            doc.source_type.as_str(),
            doc.source_id,
            doc.project_id,
            doc.author_username,
            serde_json::to_string(&doc.labels)?,
            doc.created_at,
            doc.updated_at,
            doc.url,
            doc.title,
            doc.content_text,
            doc.content_hash,
            doc.is_truncated,
            doc.truncated_reason,
        ],
    )?;

    // Get inserted/updated document ID
    let doc_id = get_document_id(conn, doc.source_type, doc.source_id)?;

    // Update labels
    conn.execute("DELETE FROM document_labels WHERE document_id = ?", [doc_id])?;
    for label in &doc.labels {
        conn.execute(
            "INSERT INTO document_labels (document_id, label_name) VALUES (?, ?)",
            rusqlite::params![doc_id, label],
        )?;
    }

    // Update paths
    conn.execute("DELETE FROM document_paths WHERE document_id = ?", [doc_id])?;
    for path in &doc.paths {
        conn.execute(
            "INSERT INTO document_paths (document_id, path) VALUES (?, ?)",
            rusqlite::params![doc_id, path],
        )?;
    }

    Ok(())
}

fn get_document_id(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<i64> {
    let id: i64 = conn.query_row(
        "SELECT id FROM documents WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
        |row| row.get(0),
    )?;
    Ok(id)
}
```

**Acceptance Criteria:**
- [ ] Dirty sources get documents regenerated
- [ ] Hash comparison prevents unnecessary updates
- [ ] FTS triggers fire on document update
- [ ] Queue cleared after processing

---

### 6.4 CLI: `gi sync`

**File:** `src/cli/commands/sync.rs`

```rust
//! Sync command - orchestrate full sync pipeline.

use serde::Serialize;

use crate::core::error::Result;
use crate::Config;

/// Sync result summary.
#[derive(Debug, Serialize)]
pub struct SyncResult {
    pub issues_updated: usize,
    pub mrs_updated: usize,
    pub discussions_fetched: usize,
    pub documents_regenerated: usize,
    pub documents_embedded: usize,
}

/// Sync options.
#[derive(Debug, Default)]
pub struct SyncOptions {
    pub full: bool,     // Reset cursors, fetch everything
    pub force: bool,    // Override stale lock
    pub no_embed: bool, // Skip embedding step
    pub no_docs: bool,  // Skip document regeneration
}

/// Run sync orchestration.
///
/// Steps:
/// 1. Acquire app lock with heartbeat
/// 2. Ingest delta (issues, MRs) based on cursors
/// 3. Process pending_discussion_fetches queue (bounded)
/// 4. Apply rolling backfill window (configurable, default 14 days)
/// 5. Regenerate documents from dirty_sources
/// 6. Embed documents with changed content_hash
/// 7. Release lock, record sync_run
pub async fn run_sync(config: &Config, options: SyncOptions) -> Result<SyncResult> {
    // Implementation uses existing ingestion orchestrator
    // and new document/embedding pipelines
    todo!()
}

/// Print human-readable sync output.
pub fn print_sync(result: &SyncResult, elapsed_secs: u64) {
    println!("Sync complete:");
    println!("  Issues updated:        {:>6}", result.issues_updated);
    println!("  MRs updated:           {:>6}", result.mrs_updated);
    println!("  Discussions fetched:   {:>6}", result.discussions_fetched);
    println!("  Documents regenerated: {:>6}", result.documents_regenerated);
    println!("  Documents embedded:    {:>6}", result.documents_embedded);
    println!("  Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}

/// Print JSON sync output for robot mode.
pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": result,
        "meta": { "elapsed_ms": elapsed_ms }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
```

**CLI integration:**

```rust
/// Sync subcommand arguments.
#[derive(Args)]
pub struct SyncArgs {
    /// Reset cursors, fetch everything
    #[arg(long)]
    full: bool,

    /// Override stale lock
    #[arg(long)]
    force: bool,

    /// Skip embedding step
    #[arg(long)]
    no_embed: bool,

    /// Skip document regeneration
    #[arg(long)]
    no_docs: bool,
}
```

**Acceptance Criteria:**
- [ ] Orchestrates full sync pipeline
- [ ] Respects app lock
- [ ] `--full` resets cursors
- [ ] `--no-embed` skips embedding
- [ ] `--no-docs` skips document regeneration
- [ ] Progress reporting in human mode
- [ ] JSON summary in robot mode

---

## Testing Strategy

### Unit Tests

| Module | Test File | Coverage |
|--------|-----------|----------|
| Document extractor | `src/documents/extractor.rs` (mod tests) | Issue/MR/discussion extraction |
| Truncation | `src/documents/truncation.rs` (mod tests) | All edge cases |
| RRF ranking | `src/search/rrf.rs` (mod tests) | Score computation, merging |
| Content hash | `src/documents/extractor.rs` (mod tests) | Deterministic hashing |

### Integration Tests

| Feature | Test File | Coverage |
|---------|-----------|----------|
| FTS search | `tests/fts_search.rs` | Stemming, empty results |
| Embedding storage | `tests/embedding.rs` | sqlite-vec operations |
| Hybrid search | `tests/hybrid_search.rs` | Combined retrieval |
| Sync orchestration | `tests/sync.rs` | Full pipeline |

### Golden Query Suite

**File:** `tests/fixtures/golden_queries.json`

```json
[
  {
    "query": "authentication redesign",
    "expected_urls": [".../-/issues/234", ".../-/merge_requests/847"],
    "min_results": 1,
    "max_rank": 10
  }
]
```

Each query must have at least one expected URL in the top 10 results.
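The RRF unit test listed above can be sketched as a standalone check. This is a self-contained reimplementation of the k = 60 formula for test purposes (the `rrf_scores` helper here is illustrative, not the module's actual API):

```rust
use std::collections::HashMap;

const RRF_K: f64 = 60.0;

// Standalone RRF: sum 1/(k + rank) over each retriever list (ranks are 1-indexed).
fn rrf_scores(lists: &[&[i64]]) -> HashMap<i64, f64> {
    let mut scores = HashMap::new();
    for list in lists {
        for (i, doc) in list.iter().enumerate() {
            *scores.entry(*doc).or_insert(0.0) += 1.0 / (RRF_K + (i + 1) as f64);
        }
    }
    scores
}

fn main() {
    // doc 1 appears in both retrievers; docs 2 and 3 appear in only one each
    let vector: &[i64] = &[1, 2]; // vector ranking
    let fts: &[i64] = &[3, 1];    // FTS ranking
    let scores = rrf_scores(&[vector, fts]);

    // doc 1 = 1/61 + 1/62 (both lists); doc 3 = 1/61 (rank 1); doc 2 = 1/62 (rank 2)
    assert!(scores[&1] > scores[&3]);
    assert!(scores[&3] > scores[&2]);
    println!("{scores:?}");
}
```

This exercises the first two acceptance criteria of §5.2: a document in both lists outranks any single-list document, and single-list documents are still included.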
---

## CLI Smoke Tests

| Command | Expected | Pass Criteria |
|---------|----------|---------------|
| `gi generate-docs` | Progress, count | Completes, count > 0 |
| `gi generate-docs` (re-run) | 0 regenerated | Hash comparison works |
| `gi embed` | Progress, count | Completes, count matches docs |
| `gi embed` (re-run) | 0 embedded | Skips unchanged |
| `gi stats` | Coverage stats | Shows 100% after embed |
| `gi search "auth" --mode=lexical` | Results | Works without Ollama |
| `gi search "auth"` | Hybrid results | Vector + FTS combined |
| `gi search "auth" --explain` | Rank breakdown | Shows vector/FTS/RRF |
| `gi search "auth" --type=mr` | Filtered results | Only MRs |
| `gi search "auth" --label=bug` | Filtered results | Only labeled docs |
| `gi search "nonexistent123"` | No results | Graceful empty state |
| `gi sync` | Full pipeline | All steps complete |
| `gi sync --no-embed` | Skip embedding | Docs generated, not embedded |

---

## Data Integrity Checks

- [ ] `documents` count = issues + MRs + discussions
- [ ] `documents_fts` count = `documents` count
- [ ] `embeddings` count = `documents` count (after full embed)
- [ ] `embedding_metadata.content_hash` = `documents.content_hash` for all rows
- [ ] All `document_labels` reference valid documents
- [ ] All `document_paths` reference valid documents
- [ ] No orphaned embeddings (embeddings.rowid without matching documents.id)
- [ ] Discussion documents exclude system notes
- [ ] Discussion documents include parent title

---

## Success Criteria

Checkpoint 3 is complete when:

1. **Lexical search works without Ollama**
   - `gi search "query" --mode=lexical` returns relevant results
   - All filters functional
2. **Semantic search works with Ollama**
   - `gi embed` completes successfully
   - `gi search "query"` returns semantically relevant results
   - `--explain` shows ranking breakdown
3. **Hybrid search combines both**
   - Documents appearing in both retrievers rank higher
   - Graceful degradation when Ollama unavailable
4. **Incremental sync is efficient**
   - `gi sync` only processes changed entities
   - Re-embedding only happens for changed documents
   - Progress visible during long syncs
5. **Data integrity maintained**
   - All counts match between tables
   - No orphaned records
   - Hashes consistent
6. **Tests pass**
   - Unit tests for core algorithms
   - Integration tests for pipelines
   - Golden queries return expected results
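As a final integrity aid, the little-endian byte encoding shared by `store_embedding` and `search_vector` can be round-trip checked in isolation. This is a standalone sketch of that conversion only; sqlite-vec itself is not involved, and the helper names are illustrative:

```rust
// Same encoding store_embedding uses: 4 little-endian bytes per f32.
fn to_le_bytes(embedding: &[f32]) -> Vec<u8> {
    embedding.iter().flat_map(|f| f.to_le_bytes()).collect()
}

// Inverse conversion, useful for debugging stored vectors.
fn from_le_bytes(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect()
}

fn main() {
    let v = vec![0.25f32, -1.5, 3.0e-7];
    let bytes = to_le_bytes(&v);
    assert_eq!(bytes.len(), v.len() * 4); // a 768-dim vector encodes to 3072 bytes
    assert_eq!(from_le_bytes(&bytes), v); // exact round trip, no precision loss
    println!("round trip ok: {} bytes", bytes.len());
}
```

If the `--check` integrity pass ever reports suspicious embeddings, decoding the stored blob this way is a quick sanity check that the byte layout matches what was written.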