Taylor Eernisse 9b63671df9 docs: Update documentation for search pipeline and Phase A spec
- README.md: Add hybrid search and robot mode to feature list. Update
  quick start to use new noun-first CLI syntax (lore issues, lore mrs,
  lore search). Add embedding configuration section. Update command
  examples throughout.

- AGENTS.md: Update robot mode examples to new CLI syntax. Add search,
  sync, stats, and generate-docs commands to the robot mode reference.
  Update flag conventions (-n for limit, -s for state, -J for JSON).

- docs/prd/checkpoint-3.md: Major expansion with gated milestone
  structure (Gate A: lexical, Gate B: hybrid, Gate C: sync). Add
  prerequisite rename note, code sample conventions, chunking strategy
  details, and sqlite-vec rowid encoding scheme. Clarify that Gate A
  requires only SQLite + FTS5 with no sqlite-vec dependency.

- docs/phase-a-spec.md: New detailed specification for Gate A (lexical
  search MVP) covering document schema, FTS5 configuration, dirty
  queue mechanics, CLI interface, and acceptance criteria.

- docs/api-efficiency-findings.md: Analysis of GitLab API pagination
  behavior and efficiency observations from production sync runs.
  Documents the missing x-next-page header issue and heuristic fix.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 15:47:33 -05:00


Checkpoint 3: Search & Sync MVP

Status: Planning
Prerequisite: Checkpoints 0, 1, 2 complete (issues, MRs, discussions ingested)
Goal: Deliver working semantic + lexical hybrid search with efficient incremental sync

This checkpoint consolidates SPEC.md checkpoints 3A, 3B, 4, and 5 into a unified implementation plan. The work is structured for parallel agent execution where dependencies allow.

All code integrates with existing gitlore infrastructure:

  • Error handling via LoreError and ErrorCode in src/core/error.rs
  • CLI patterns matching src/cli/commands/*.rs (run functions, JSON/human output)
  • Database via rusqlite::Connection with migrations in migrations/
  • Database connections via create_connection(db_path) in src/core/db.rs
  • Config via src/core/config.rs (EmbeddingConfig already defined)
  • Robot mode JSON with {"ok": true, "data": {...}} pattern

Code Sample Convention: Rust code samples in this PRD omit use imports for brevity. Implementers should add the appropriate imports (use crate::..., use rusqlite::..., etc.) based on the types and functions referenced in each sample.

Prerequisite Rename: Before or during Gate A, rename GiError to LoreError across the codebase (16 files). Update the src/core/error.rs enum name, the src/core/mod.rs re-export, the src/lib.rs re-export, and all use statements. This is a mechanical find-and-replace.


Executive Summary (Gated Milestones)

This checkpoint ships in three gates to reduce integration risk. Each gate is independently verifiable and shippable:

Gate A (Lexical MVP): documents + FTS + filters + lore search --mode=lexical + lore stats (no sqlite-vec dependency)
Gate B (Hybrid MVP): sqlite-vec extension + embeddings + vector search + RRF fusion + graceful degradation
Gate C (Sync MVP): lore sync orchestration + queues/backoff + integrity check/repair

Deliverables:

Gate A

  1. Document generation from issues/MRs/discussions with FTS5 indexing
  2. Lexical search + filters + snippets + lore stats

Gate B

  3. Ollama-powered embedding pipeline with sqlite-vec storage
  4. Hybrid search (RRF-ranked vector + lexical) with rich filtering + graceful degradation

Gate C

  5. Orchestrated lore sync command with incremental doc regen + re-embedding
  6. Integrity checks + repair paths for FTS/embeddings consistency

Key Design Decisions:

  • Documents are the search unit (not raw entities)
  • FTS5 works standalone when Ollama unavailable (graceful degradation)
  • Gate A requires only SQLite + FTS5; sqlite-vec is introduced in Gate B (reduces env risk)
  • Documents store full untruncated text with a hard safety cap (2MB) for pathological outliers
  • Embedding pipeline chunks long documents at embed time (overlap-aware paragraph splitting)
  • sqlite-vec rowid = doc_id * 1000 + chunk_index encodes chunks; vector search deduplicates by doc_id
  • RRF ranking avoids score normalization complexity
  • Queue-based discussion fetching isolates failures
  • FTS5 query sanitization prevents syntax errors from user input
  • Exponential backoff on all queues prevents hot-loop retries
  • Transient embed failures trigger graceful degradation (not hard errors)
  • lore sync is the unified command (ingest + generate-docs + embed); individual commands exist for recovery
  • Dirty source tracking: mark ALL upserted entities inside ingestion transactions via mark_dirty_tx(&Transaction)
  • Queue draining: generate-docs and sync drain queues completely using bounded batch loops (not single-pass ceilings)
  • Discussion sweep uses CTE to capture stale IDs before cascading deletes
  • Unchanged documents are fully skipped (content_hash + labels_hash + paths_hash triple-check)

Phase 1: Schema Foundation

1.1 Documents Schema (Migration 007)

File: migrations/007_documents.sql

-- Unified searchable documents (derived from issues/MRs/discussions)
CREATE TABLE documents (
  id INTEGER PRIMARY KEY,
  source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
  source_id INTEGER NOT NULL,    -- local DB id in the source table
  project_id INTEGER NOT NULL REFERENCES projects(id),
  author_username TEXT,          -- for discussions: first note author
  label_names TEXT,              -- JSON array (display/debug only)
  created_at INTEGER,            -- ms epoch UTC
  updated_at INTEGER,            -- ms epoch UTC
  url TEXT,
  title TEXT,                    -- null for discussions
  content_text TEXT NOT NULL,    -- canonical text for embedding/search
  content_hash TEXT NOT NULL,    -- SHA-256 for change detection
  labels_hash TEXT NOT NULL DEFAULT '',  -- SHA-256 over sorted labels (write optimization)
  paths_hash TEXT NOT NULL DEFAULT '',   -- SHA-256 over sorted paths (write optimization)
  is_truncated INTEGER NOT NULL DEFAULT 0,
  truncated_reason TEXT CHECK (
    truncated_reason IN (
      'token_limit_middle_drop','single_note_oversized','first_last_oversized',
      'hard_cap_oversized'
    )
    OR truncated_reason IS NULL
  ),
  UNIQUE(source_type, source_id)
);

CREATE INDEX idx_documents_project_updated ON documents(project_id, updated_at);
CREATE INDEX idx_documents_author ON documents(author_username);
CREATE INDEX idx_documents_source ON documents(source_type, source_id);
CREATE INDEX idx_documents_hash ON documents(content_hash);

-- Fast label filtering (indexed exact-match)
CREATE TABLE document_labels (
  document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
  label_name TEXT NOT NULL,
  PRIMARY KEY(document_id, label_name)
) WITHOUT ROWID;
CREATE INDEX idx_document_labels_label ON document_labels(label_name);

-- Fast path filtering (DiffNote file paths)
CREATE TABLE document_paths (
  document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
  path TEXT NOT NULL,
  PRIMARY KEY(document_id, path)
) WITHOUT ROWID;
CREATE INDEX idx_document_paths_path ON document_paths(path);

-- Queue for incremental document regeneration (with retry tracking)
-- Uses next_attempt_at for index-friendly backoff queries
CREATE TABLE dirty_sources (
  source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
  source_id INTEGER NOT NULL,
  queued_at INTEGER NOT NULL,    -- ms epoch UTC
  attempt_count INTEGER NOT NULL DEFAULT 0,
  last_attempt_at INTEGER,
  last_error TEXT,
  next_attempt_at INTEGER,       -- ms epoch UTC; NULL means ready immediately
  PRIMARY KEY(source_type, source_id)
);
CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at);

-- Resumable queue for dependent discussion fetching
-- Uses next_attempt_at for index-friendly backoff queries
CREATE TABLE pending_discussion_fetches (
  project_id INTEGER NOT NULL REFERENCES projects(id),
  noteable_type TEXT NOT NULL,            -- 'Issue' | 'MergeRequest'
  noteable_iid INTEGER NOT NULL,
  queued_at INTEGER NOT NULL,             -- ms epoch UTC
  attempt_count INTEGER NOT NULL DEFAULT 0,
  last_attempt_at INTEGER,
  last_error TEXT,
  next_attempt_at INTEGER,                -- ms epoch UTC; NULL means ready immediately
  PRIMARY KEY(project_id, noteable_type, noteable_iid)
);
CREATE INDEX idx_pending_discussions_next_attempt ON pending_discussion_fetches(next_attempt_at);
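Both queues are drained with the same index-friendly pattern. A sketch of the ready-batch claim query (the ?1 parameter is the current time in ms; the batch size of 100 is an illustrative assumption, not a spec value):

```sql
-- Claim the next batch of ready dirty sources (sketch).
-- NULL next_attempt_at means "ready immediately"; otherwise the backoff
-- window must have elapsed. idx_dirty_sources_next_attempt serves the
-- next_attempt_at predicate.
SELECT source_type, source_id, attempt_count
FROM dirty_sources
WHERE next_attempt_at IS NULL OR next_attempt_at <= ?1  -- now_ms
ORDER BY queued_at
LIMIT 100;
```

The same shape works for pending_discussion_fetches via idx_pending_discussions_next_attempt.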

Acceptance Criteria:

  • Migration applies cleanly on fresh DB
  • Migration applies cleanly after CP2 schema
  • All foreign keys enforced
  • Indexes created
  • labels_hash and paths_hash columns present for write optimization
  • next_attempt_at indexed for efficient backoff queries

1.2 FTS5 Index (Migration 008)

File: migrations/008_fts5.sql

-- Full-text search with porter stemmer and prefix indexes for type-ahead
CREATE VIRTUAL TABLE documents_fts USING fts5(
  title,
  content_text,
  content='documents',
  content_rowid='id',
  tokenize='porter unicode61',
  prefix='2 3 4'
);

-- Keep FTS in sync via triggers.
-- IMPORTANT: COALESCE(title, '') ensures FTS5 external-content table never
-- receives NULL values, which can cause inconsistencies with delete operations.
-- FTS5 delete requires exact match of original values; NULL != NULL in SQL,
-- so a NULL title on insert would make the delete trigger fail silently.
CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
  INSERT INTO documents_fts(rowid, title, content_text)
  VALUES (new.id, COALESCE(new.title, ''), new.content_text);
END;

CREATE TRIGGER documents_ad AFTER DELETE ON documents BEGIN
  INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
  VALUES('delete', old.id, COALESCE(old.title, ''), old.content_text);
END;

-- Only rebuild FTS when searchable text actually changes (not metadata-only updates)
CREATE TRIGGER documents_au AFTER UPDATE ON documents
WHEN old.title IS NOT new.title OR old.content_text != new.content_text
BEGIN
  INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
  VALUES('delete', old.id, COALESCE(old.title, ''), old.content_text);
  INSERT INTO documents_fts(rowid, title, content_text)
  VALUES (new.id, COALESCE(new.title, ''), new.content_text);
END;

Acceptance Criteria:

  • documents_fts created as virtual table
  • Triggers fire on insert/update/delete
  • Update trigger only fires when title or content_text changes (not metadata-only updates)
  • FTS row count matches documents count after bulk insert
  • Prefix search works for type-ahead UX

1.3 Embeddings Schema (Migration 009) — Gate B Only

File: migrations/009_embeddings.sql

Gate Boundary: This migration is intentionally part of Gate B. Gate A ships with migrations 007 + 008 only, so it has zero dependency on sqlite-vec being present or loadable. The migration runner must load sqlite-vec before applying this migration, but Gate A can run without it.

-- NOTE: sqlite-vec vec0 virtual tables cannot participate in FK cascades.
-- We must use an explicit trigger to delete orphan embeddings when documents
-- are deleted. See documents_embeddings_ad trigger below.

-- sqlite-vec virtual table for vector search
-- Storage rule: embeddings.rowid = document_id * 1000 + chunk_index
-- This encodes (document_id, chunk_index) into a single integer rowid.
-- Supports up to 1000 chunks per document (32M chars at 32k/chunk).
CREATE VIRTUAL TABLE embeddings USING vec0(
  embedding float[768]
);

-- Embedding provenance + change detection (one row per chunk)
-- NOTE: Two hash columns serve different purposes:
--   document_hash: SHA-256 of full documents.content_text (staleness detection)
--   chunk_hash: SHA-256 of this individual chunk's text (debug/provenance)
-- Pending detection uses document_hash (not chunk_hash) because staleness is
-- a document-level condition: if the document changed, ALL chunks need re-embedding.
CREATE TABLE embedding_metadata (
  document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
  chunk_index INTEGER NOT NULL DEFAULT 0,   -- 0-indexed position within document
  model TEXT NOT NULL,           -- 'nomic-embed-text'
  dims INTEGER NOT NULL,         -- 768
  document_hash TEXT NOT NULL,   -- SHA-256 of full documents.content_text (staleness)
  chunk_hash TEXT NOT NULL,      -- SHA-256 of this chunk's text (provenance)
  created_at INTEGER NOT NULL,   -- ms epoch UTC
  last_error TEXT,               -- error message from last failed attempt
  attempt_count INTEGER NOT NULL DEFAULT 0,
  last_attempt_at INTEGER,       -- ms epoch UTC
  PRIMARY KEY(document_id, chunk_index)
);

CREATE INDEX idx_embedding_metadata_errors
  ON embedding_metadata(last_error) WHERE last_error IS NOT NULL;
CREATE INDEX idx_embedding_metadata_doc ON embedding_metadata(document_id);

-- CRITICAL: Delete ALL chunk embeddings when a document is deleted.
-- vec0 virtual tables don't support FK ON DELETE CASCADE, so we need this trigger.
-- embedding_metadata has ON DELETE CASCADE, so only vec0 needs explicit cleanup.
-- Range: [document_id * 1000, document_id * 1000 + 999]
CREATE TRIGGER documents_embeddings_ad AFTER DELETE ON documents BEGIN
  DELETE FROM embeddings
    WHERE rowid >= old.id * 1000
      AND rowid < (old.id + 1) * 1000;
END;

Chunking Constants:

Constant                 Value      Rationale
CHUNK_MAX_CHARS          32,000     ~8k tokens at 4 chars/token; fits the nomic-embed-text context window
CHUNK_OVERLAP_CHARS      500        ~125 tokens of overlap preserves context at chunk boundaries
CHUNK_ROWID_MULTIPLIER   1,000      Encodes (doc_id, chunk_index) into a single rowid; supports up to 1000 chunks/doc
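The rowid scheme can be sketched as a pair of pure functions (function names here are illustrative, not taken from the codebase):

```rust
const CHUNK_ROWID_MULTIPLIER: i64 = 1_000;

/// Encode (document_id, chunk_index) into a single vec0 rowid.
fn encode_rowid(doc_id: i64, chunk_index: i64) -> i64 {
    debug_assert!((0..CHUNK_ROWID_MULTIPLIER).contains(&chunk_index));
    doc_id * CHUNK_ROWID_MULTIPLIER + chunk_index
}

/// Decode a vec0 rowid back into (document_id, chunk_index).
/// Vector search deduplicates hits by the decoded document_id.
fn decode_rowid(rowid: i64) -> (i64, i64) {
    (rowid / CHUNK_ROWID_MULTIPLIER, rowid % CHUNK_ROWID_MULTIPLIER)
}
```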

Acceptance Criteria:

  • embeddings vec0 table created
  • embedding_metadata tracks provenance per chunk (composite PK: document_id, chunk_index)
  • Error tracking fields present for retry logic
  • Orphan cleanup trigger deletes ALL chunks (range deletion) on document deletion
  • Documents under 32k chars produce exactly 1 chunk (chunk_index=0)
  • Long documents split at paragraph boundaries with overlap
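A minimal sketch of the embed-time chunker, assuming paragraphs split on blank lines; it ignores the UTF-8 boundary handling and the oversized-single-paragraph hard split that the real pipeline needs:

```rust
/// Split text into chunks at paragraph boundaries, carrying `overlap`
/// trailing characters of each chunk into the next for boundary context.
/// Sketch only: assumes overlap lands on a char boundary.
fn chunk_text(text: &str, max_chars: usize, overlap: usize) -> Vec<String> {
    if text.len() <= max_chars {
        return vec![text.to_string()];
    }
    let mut chunks: Vec<String> = Vec::new();
    let mut current = String::new();
    for para in text.split("\n\n") {
        // Would this paragraph (plus separator) overflow the current chunk?
        if !current.is_empty() && current.len() + 2 + para.len() > max_chars {
            // Seed the next chunk with trailing overlap for context.
            let tail = current[current.len().saturating_sub(overlap)..].to_string();
            chunks.push(current);
            current = tail;
        }
        if !current.is_empty() {
            current.push_str("\n\n");
        }
        current.push_str(para);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```

With CHUNK_MAX_CHARS and CHUNK_OVERLAP_CHARS from the table above, any document under 32k chars yields exactly one chunk (chunk_index=0), satisfying the acceptance criteria.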

Dependencies:

  • Requires sqlite-vec extension loaded at runtime
  • Extension loading already happens in src/core/db.rs
  • Migration runner must load sqlite-vec before applying migrations (including on fresh DB)
  • Add rand crate to Cargo.toml (used by backoff jitter in src/core/backoff.rs)
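The backoff schedule in src/core/backoff.rs could take the following shape (shown without the rand jitter so it is deterministic; the constants are assumptions, not spec values):

```rust
const BACKOFF_BASE_MS: u64 = 1_000;      // first retry after ~1s (assumed)
const BACKOFF_MAX_MS: u64 = 3_600_000;   // cap at 1 hour (assumed)

/// Exponential backoff: base * 2^attempt, capped. The production version
/// adds random jitter (via the rand crate) to avoid thundering herds and
/// writes now_ms + delay into the queue row's next_attempt_at.
fn backoff_delay_ms(attempt_count: u32) -> u64 {
    BACKOFF_BASE_MS
        .saturating_mul(1u64 << attempt_count.min(30))
        .min(BACKOFF_MAX_MS)
}
```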

Phase 2: Document Generation

2.1 Document Module Structure

New module: src/documents/

src/documents/
├── mod.rs           # Module exports
├── extractor.rs     # Document extraction from entities
├── truncation.rs    # Note-boundary aware truncation
└── regenerator.rs   # Dirty source processing

File: src/documents/mod.rs

//! Document generation and management.
//!
//! Extracts searchable documents from issues, MRs, and discussions.

mod extractor;
mod regenerator;
mod truncation;

pub use extractor::{
    extract_discussion_document, extract_issue_document, extract_mr_document,
    DocumentData, SourceType,
};
// Note: extract_*_document() return Result<Option<DocumentData>>
// None means the source entity was deleted from the database
pub use regenerator::regenerate_dirty_documents;
pub use truncation::{truncate_content, TruncationResult};

Update src/lib.rs:

pub mod documents;  // Add to existing modules

2.2 Document Types

File: src/documents/extractor.rs

use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};

/// Source type for documents.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceType {
    Issue,
    MergeRequest,
    Discussion,
}

impl SourceType {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Issue => "issue",
            Self::MergeRequest => "merge_request",
            Self::Discussion => "discussion",
        }
    }

    /// Parse from CLI input, accepting common aliases.
    ///
    /// Accepts: "issue", "mr", "merge_request", "discussion"
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "issue" | "issues" => Some(Self::Issue),
            "mr" | "mrs" | "merge_request" | "merge_requests" => Some(Self::MergeRequest),
            "discussion" | "discussions" => Some(Self::Discussion),
            _ => None,
        }
    }
}

impl std::fmt::Display for SourceType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

/// Generated document ready for storage.
#[derive(Debug, Clone)]
pub struct DocumentData {
    pub source_type: SourceType,
    pub source_id: i64,
    pub project_id: i64,
    pub author_username: Option<String>,
    pub labels: Vec<String>,
    pub paths: Vec<String>,  // DiffNote file paths
    pub labels_hash: String, // SHA-256 over sorted labels (write optimization)
    pub paths_hash: String,  // SHA-256 over sorted paths (write optimization)
    pub created_at: i64,
    pub updated_at: i64,
    pub url: Option<String>,
    pub title: Option<String>,
    pub content_text: String,
    pub content_hash: String,
    pub is_truncated: bool,
    pub truncated_reason: Option<String>,
}

/// Compute SHA-256 hash of content.
pub fn compute_content_hash(content: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(content.as_bytes());
    format!("{:x}", hasher.finalize())
}

/// Compute SHA-256 hash over a sorted list of strings.
/// Used for labels_hash and paths_hash to detect changes efficiently.
pub fn compute_list_hash(items: &[String]) -> String {
    let mut sorted = items.to_vec();
    sorted.sort();
    let joined = sorted.join("\n");
    compute_content_hash(&joined)
}

Document Formats:

All document types use consistent header format for better search relevance and context:

Source       content_text
Issue        Structured header + description (see below)
MR           Structured header + description (see below)
Discussion   Full thread with header (see below)

Issue Document Format:

[[Issue]] #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234
Labels: ["bug", "auth"]
State: opened
Author: @johndoe

--- Description ---

We need to modernize our authentication system...

MR Document Format:

[[MergeRequest]] !456: Implement JWT authentication
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/merge_requests/456
Labels: ["feature", "auth"]
State: opened
Author: @johndoe
Source: feature/jwt-auth -> main

--- Description ---

This MR implements JWT-based authentication as discussed in #234...

Discussion Document Format:

[[Discussion]] Issue #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234#note_12345
Labels: ["bug", "auth"]
Files: ["src/auth/login.ts"]

--- Thread ---

@johndoe (2024-03-15):
I think we should move to JWT-based auth...

@janedoe (2024-03-15):
Agreed. What about refresh token strategy?

Extraction Queries:

Each extract_*_document() function queries the existing ingestion tables and assembles a DocumentData. Functions return Result<Option<DocumentData>>; None means the source entity was deleted.

Issue extraction (extract_issue_document):

-- Main entity
SELECT i.id, i.iid, i.title, i.description, i.state, i.author_username,
       i.created_at, i.updated_at, i.web_url,
       p.path_with_namespace, p.id AS project_id
FROM issues i
JOIN projects p ON p.id = i.project_id
WHERE i.id = ?

-- Labels (via junction table)
SELECT l.name
FROM issue_labels il
JOIN labels l ON l.id = il.label_id
WHERE il.issue_id = ?
ORDER BY l.name

MR extraction (extract_mr_document):

-- Main entity
SELECT m.id, m.iid, m.title, m.description, m.state, m.author_username,
       m.source_branch, m.target_branch,
       m.created_at, m.updated_at, m.web_url,
       p.path_with_namespace, p.id AS project_id
FROM merge_requests m
JOIN projects p ON p.id = m.project_id
WHERE m.id = ?

-- Labels (via junction table)
SELECT l.name
FROM mr_labels ml
JOIN labels l ON l.id = ml.label_id
WHERE ml.merge_request_id = ?
ORDER BY l.name

Discussion extraction (extract_discussion_document):

-- Discussion with parent entity info
SELECT d.id, d.noteable_type, d.issue_id, d.merge_request_id,
       p.path_with_namespace, p.id AS project_id
FROM discussions d
JOIN projects p ON p.id = d.project_id
WHERE d.id = ?

-- Parent issue (when noteable_type = 'Issue')
SELECT i.iid, i.title, i.web_url
FROM issues i WHERE i.id = ?

-- Parent MR (when noteable_type = 'MergeRequest')
SELECT m.iid, m.title, m.web_url
FROM merge_requests m WHERE m.id = ?

-- Parent labels (issue or MR labels depending on noteable_type)
-- Use issue_labels or mr_labels junction table accordingly

-- Non-system notes in thread order
SELECT n.author_username, n.body, n.created_at, n.gitlab_id,
       n.note_type, n.position_old_path, n.position_new_path
FROM notes n
WHERE n.discussion_id = ? AND n.is_system = 0
ORDER BY n.created_at ASC, n.id ASC

Discussion URL construction: built from the parent entity's web_url plus #note_{first_note.gitlab_id}:

let url = match first_note_gitlab_id {
    Some(gid) => Some(format!("{}#note_{}", parent_web_url, gid)),
    None => Some(parent_web_url.to_string()),
};

DiffNote path extraction: Collect both position_old_path and position_new_path from all non-system notes in the discussion thread. Deduplicated automatically by document_paths PK:

let mut paths: Vec<String> = Vec::new();
for note in &notes {
    if let Some(ref old_path) = note.position_old_path {
        paths.push(old_path.clone());
    }
    if let Some(ref new_path) = note.position_new_path {
        paths.push(new_path.clone());
    }
}
paths.sort();
paths.dedup();

Discussion author: First non-system note's author_username.

Acceptance Criteria:

  • Issue document: structured header with [[Issue]] prefix, project, URL, labels, state, author, then description
  • MR document: structured header with [[MergeRequest]] prefix, project, URL, labels, state, author, branches, then description
  • Discussion document: includes parent type+title, project, URL, labels, files, then thread
  • System notes (is_system=1) excluded from discussion content
  • DiffNote file paths extracted to paths vector
  • Labels extracted to labels vector
  • SHA-256 hash computed from content_text
  • Headers use consistent separator lines (--- Description ---, --- Thread ---)

2.3 Truncation Logic

Scope: Truncation applies to:

  1. Discussion documents — note-boundary truncation at 32,000 chars (multi-note threads)
  2. All document types — a hard safety cap at 2,000,000 chars (~2MB) to prevent pathological outliers (pasted log files, vendor lockfiles, base64 blobs) from bloating the DB, causing OOM in embedding chunking, or degrading FTS indexing performance.

Normal issue/MR documents remain untruncated. The safety cap triggers only for extreme content sizes that indicate non-prose content. The embedding pipeline handles long (but non-pathological) documents via chunking at embed time (see Phase 4.4).

File: src/documents/truncation.rs

/// Maximum content length for discussion threads (~8,000 tokens at 4 chars/token estimate).
/// NOTE: This only applies to discussion documents. Issue/MR documents store full text
/// (subject to MAX_DOCUMENT_CHARS_HARD safety cap below).
/// The embedding pipeline chunks long documents at embed time (see pipeline.rs).
pub const MAX_DISCUSSION_CHARS: usize = 32_000;

/// Hard safety cap for ALL document types (~2MB).
/// Only triggers on pathological content (pasted logs, vendor lockfiles, base64 blobs).
/// Normal issue/MR documents are never this large. This prevents:
/// - DB bloat from single extreme documents
/// - OOM in embedding chunking pipeline
/// - FTS indexing degradation from megabyte-scale content
pub const MAX_DOCUMENT_CHARS_HARD: usize = 2_000_000;

/// Truncation result with metadata.
#[derive(Debug, Clone)]
pub struct TruncationResult {
    pub content: String,
    pub is_truncated: bool,
    pub reason: Option<TruncationReason>,
}

/// Reason for truncation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncationReason {
    TokenLimitMiddleDrop,
    SingleNoteOversized,
    FirstLastOversized,
    HardCapOversized,  // Safety cap for pathological content (any doc type)
}

impl TruncationReason {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::TokenLimitMiddleDrop => "token_limit_middle_drop",
            Self::SingleNoteOversized => "single_note_oversized",
            Self::FirstLastOversized => "first_last_oversized",
            Self::HardCapOversized => "hard_cap_oversized",
        }
    }
}

/// Truncate discussion thread at note boundaries.
///
/// Rules:
/// - Max content: 32,000 characters
/// - Truncate at NOTE boundaries (never mid-note)
/// - Preserve first N notes and last M notes
/// - Drop from middle, insert marker
///
/// NOT used for issue/MR documents — those store full text.
pub fn truncate_discussion(notes: &[NoteContent], max_chars: usize) -> TruncationResult {
    // Implementation handles edge cases per table below
    todo!()
}

/// Note content for truncation.
pub struct NoteContent {
    pub author: String,
    pub date: String,
    pub body: String,
}

Edge Cases:

Scenario                           Handling
Single note > 32,000 chars         Truncate at a UTF-8-safe char boundary via truncate_utf8(), append [truncated]; reason = single_note_oversized
First + last note > 32,000 chars   Keep only the first note (truncated if needed); reason = first_last_oversized
Only one note                      Truncate at a char boundary if needed
Any doc type > 2,000,000 chars     Truncate at a UTF-8-safe boundary; reason = hard_cap_oversized (applies to issues/MRs/discussions)
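The middle-drop strategy can be sketched as a greedy front/back keep (the per-note truncation and the oversized edge cases from the table above are omitted; names are illustrative):

```rust
/// Keep whole notes from the front and back of a thread until the budget
/// is exhausted, dropping the middle. Returns the joined content and the
/// number of omitted notes. Notes are assumed pre-rendered to strings.
fn middle_drop(notes: &[String], max_chars: usize) -> (String, usize) {
    let total: usize = notes.iter().map(|n| n.len()).sum();
    if total <= max_chars {
        return (notes.join("\n\n"), 0);
    }
    let (mut head, mut tail, mut used) = (0usize, 0usize, 0usize);
    // Alternate front/back so both ends of the thread survive.
    while head + tail < notes.len() {
        let candidate = if head <= tail {
            &notes[head]
        } else {
            &notes[notes.len() - 1 - tail]
        };
        if used + candidate.len() > max_chars {
            break;
        }
        used += candidate.len();
        if head <= tail { head += 1 } else { tail += 1 }
    }
    let omitted = notes.len() - head - tail;
    let marker = format!("\n\n[... {omitted} notes omitted for length ...]\n\n");
    let front = notes[..head].join("\n\n");
    let back = notes[notes.len() - tail..].join("\n\n");
    (format!("{front}{marker}{back}"), omitted)
}
```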

Acceptance Criteria:

  • Discussion note-boundary truncation works correctly
  • Issue/MR documents store full text unless exceeding 2MB hard safety cap
  • Hard cap truncation records hard_cap_oversized reason for provenance
  • First and last notes preserved when possible
  • Truncation marker \n\n[... N notes omitted for length ...]\n\n inserted
  • Metadata fields set correctly
  • Edge cases handled per table above
  • All byte-indexed truncation uses truncate_utf8() to avoid panics on multi-byte codepoints
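The truncate_utf8() helper referenced above could look like this (a sketch; its exact location and signature in the codebase are assumptions):

```rust
/// Truncate `s` to at most `max_bytes` bytes without splitting a UTF-8
/// codepoint. Slicing at a non-boundary index panics in Rust, so walk
/// back to the nearest char boundary first.
fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
```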

2.4 CLI: lore generate-docs (Incremental by Default)

File: src/cli/commands/generate_docs.rs

//! Generate documents command - create searchable documents from entities.
//!
//! By default, runs incrementally (processes only dirty_sources queue).
//! Use --full to regenerate all documents from scratch.

use rusqlite::Connection;
use serde::Serialize;

use crate::core::error::Result;
use crate::documents::{DocumentData, SourceType};
use crate::Config;

/// Result of document generation.
#[derive(Debug, Serialize)]
pub struct GenerateDocsResult {
    pub issues: usize,
    pub mrs: usize,
    pub discussions: usize,
    pub total: usize,
    pub truncated: usize,
    pub skipped: usize,  // Unchanged documents
}

/// Chunk size for --full mode dirty queue seeding.
/// Balances throughput against WAL file growth and memory pressure.
const FULL_MODE_CHUNK_SIZE: usize = 2000;

/// Run document generation (incremental by default).
///
/// IMPORTANT: Both modes use the same regenerator codepath to avoid
/// logic divergence in label/path hashing, deletion semantics, and
/// write-optimization behavior. The only difference is how dirty_sources
/// gets populated.
///
/// Incremental mode (default):
/// - Processes only items already in dirty_sources queue
/// - Fast for routine syncs
///
/// Full mode (--full):
/// - Seeds dirty_sources with ALL source entities in chunks
/// - Drains through the same regenerator pipeline
/// - Uses keyset pagination (WHERE id > last_id) to avoid OFFSET degradation
/// - Final FTS optimize after all chunks complete
/// - Use when schema changes or after migration
pub fn run_generate_docs(
    config: &Config,
    full: bool,
    project_filter: Option<&str>,
) -> Result<GenerateDocsResult> {
    // mut: transaction() takes &mut self during full-mode seeding
    let mut conn = create_connection(&config.storage.db_path)?;

    if full {
        // Full mode: seed dirty_sources with all source entities, then drain.
        // Uses keyset pagination to avoid O(n²) OFFSET degradation on large tables.
        //
        // Seeding is chunked to bound WAL growth:
        // 1. For each source type (issues, MRs, discussions):
        //    a. Query next chunk WHERE id > last_id ORDER BY id LIMIT chunk_size
        //    b. INSERT OR IGNORE each into dirty_sources
        //    c. Advance last_id = chunk.last().id
        //    d. Loop until chunk is empty
        // 2. Drain dirty_sources through regenerator (same as incremental)
        // 3. Final FTS optimize (not full rebuild — triggers handle consistency)
        //
        // Benefits of unified codepath:
        // - No divergence in label/path hash behavior
        // - No divergence in deletion semantics
        // - No divergence in write-optimization logic (labels_hash, paths_hash)
        // - FTS triggers fire identically in both modes

        // Seed issues using set-based INSERT...SELECT for throughput.
        // Avoids N individual INSERT calls per chunk; single statement per page.
        //
        // The page's upper bound is resolved BEFORE the insert so the keyset
        // cursor advances even when every row in the page already exists in
        // dirty_sources: ON CONFLICT DO NOTHING reports 0 changes for
        // conflicting rows, so "0 inserted" cannot be used as the stop signal.
        let mut last_id: i64 = 0;
        loop {
            let tx = conn.transaction()?;
            // Upper bound of the next page (keyset pagination).
            let max_id: Option<i64> = tx.query_row(
                "SELECT MAX(id) FROM (
                   SELECT id FROM issues WHERE id > ? ORDER BY id LIMIT ?
                 )",
                rusqlite::params![last_id, FULL_MODE_CHUNK_SIZE as i64],
                |row| row.get(0),
            )?;
            let Some(max_id) = max_id else {
                tx.commit()?;
                break; // no rows left to seed
            };
            // Remaining dirty_sources columns fall back to their defaults.
            tx.execute(
                "INSERT INTO dirty_sources (source_type, source_id, queued_at)
                 SELECT 'issue', id, ?
                 FROM issues
                 WHERE id > ? AND id <= ?
                 ON CONFLICT(source_type, source_id) DO NOTHING",
                rusqlite::params![crate::core::time::now_ms(), last_id, max_id],
            )?;
            tx.commit()?;
            last_id = max_id;
        }

        // Similar set-based keyset-paginated seeding for MRs and discussions...

        // Report: seeding complete, now regenerating
    }

    // Both modes: drain dirty_sources through the regenerator
    let regen = regenerate_dirty_documents(&conn)?;

    if full {
        // FTS optimize after bulk operations (compacts index segments)
        conn.execute(
            "INSERT INTO documents_fts(documents_fts) VALUES('optimize')",
            [],
        )?;
    }

    // Map regen -> GenerateDocsResult stats
    todo!()
}

/// Print human-readable output.
pub fn print_generate_docs(result: &GenerateDocsResult) {
    println!("Document generation complete:");
    println!("  Issues:      {:>6} documents", result.issues);
    println!("  MRs:         {:>6} documents", result.mrs);
    println!("  Discussions: {:>6} documents", result.discussions);
    println!("  ─────────────────────");
    println!("  Total:       {:>6} documents", result.total);
    if result.truncated > 0 {
        println!("  Truncated:   {:>6}", result.truncated);
    }
    if result.skipped > 0 {
        println!("  Skipped:     {:>6} (unchanged)", result.skipped);
    }
}

/// Print JSON output for robot mode.
pub fn print_generate_docs_json(result: &GenerateDocsResult) {
    let output = serde_json::json!({
        "ok": true,
        "data": result
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}

CLI integration in src/cli/mod.rs:

/// Generate-docs subcommand arguments.
#[derive(Args)]
pub struct GenerateDocsArgs {
    /// Regenerate ALL documents (not just dirty queue)
    #[arg(long)]
    full: bool,

    /// Only generate for specific project
    #[arg(long)]
    project: Option<String>,
}

Acceptance Criteria:

  • Creates document for each issue
  • Creates document for each MR
  • Creates document for each discussion
  • Default mode processes dirty_sources queue only (incremental)
  • --full regenerates all documents from scratch
  • --full uses chunked transactions (2k docs/tx) to bound WAL growth
  • Final FTS optimize after all chunks complete (rebuild reserved for --repair)
  • Progress bar in human mode (via indicatif)
  • JSON output in robot mode

Progress bar note: The existing codebase uses indicatif for progress bars and tracing-indicatif to integrate progress bars with tracing log output. New commands should follow the same pattern — create a ProgressBar with ProgressStyle::default_bar() and use tracing for structured logging. See existing ingestion commands for reference.


3.1 Search Module Structure

New module: src/search/

src/search/
├── mod.rs       # Module exports
├── fts.rs       # FTS5 search
├── vector.rs    # Vector search (sqlite-vec)
├── hybrid.rs    # Combined hybrid search
├── rrf.rs       # Reciprocal Rank Fusion ranking
└── filters.rs   # Filter parsing and application

File: src/search/mod.rs

//! Search functionality for documents.
//!
//! Supports lexical (FTS5), semantic (vector), and hybrid search.

mod filters;
mod fts;
mod hybrid;
mod rrf;
mod vector;

pub use filters::{SearchFilters, PathFilter, apply_filters};
pub use fts::{search_fts, to_fts_query, FtsResult, FtsQueryMode, generate_fallback_snippet, get_result_snippet};
pub use hybrid::{search_hybrid, HybridResult, SearchMode};
pub use rrf::{rank_rrf, RrfResult};
pub use vector::{search_vector, VectorResult};
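
The rrf module fuses the FTS and vector rank lists via Reciprocal Rank Fusion, where a document's fused score is the sum of 1/(k + rank) over each list it appears in. A minimal std-only sketch of the technique; the function signature and the conventional damping constant k = 60 here are illustrative assumptions, not the rrf.rs API:

```rust
use std::collections::HashMap;

/// Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank_in_list),
/// using 1-based ranks. Documents near the top of multiple lists win.
fn rank_rrf(lists: &[Vec<i64>], k: f64) -> Vec<(i64, f64)> {
    let mut scores: HashMap<i64, f64> = HashMap::new();
    for list in lists {
        for (rank, doc_id) in list.iter().enumerate() {
            // enumerate() is 0-based; +1.0 converts to a 1-based rank.
            *scores.entry(*doc_id).or_insert(0.0) += 1.0 / (k + rank as f64 + 1.0);
        }
    }
    let mut out: Vec<(i64, f64)> = scores.into_iter().collect();
    // Higher fused score = better; sort descending.
    out.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
    out
}

fn main() {
    // Doc 2 is near the top of both lists, so it fuses to first place.
    let vector_hits = vec![1, 2, 3];
    let fts_hits = vec![2, 4, 1];
    let fused = rank_rrf(&[vector_hits, fts_hits], 60.0);
    assert_eq!(fused[0].0, 2);
}
```

Because RRF only consumes rank positions, it sidesteps the problem of normalizing BM25 scores against cosine distances.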

3.2 FTS5 Search Function

File: src/search/fts.rs

use rusqlite::Connection;
use crate::core::error::Result;

/// FTS search result.
#[derive(Debug, Clone)]
pub struct FtsResult {
    pub document_id: i64,
    pub rank: f64,     // BM25 score (lower = better match)
    pub snippet: String, // Context snippet around match
}

/// Generate fallback snippet for semantic-only results.
///
/// When FTS snippets aren't available (semantic-only mode), this generates
/// a context snippet by truncating the document content. Useful for displaying
/// search results without FTS hits.
///
/// Args:
///   content_text: Full document content
///   max_bytes: Maximum snippet length in bytes (default 200)
///
/// Returns a truncated string with ellipsis if truncated.
///
/// IMPORTANT: Uses `truncate_utf8()` to avoid panicking on multi-byte UTF-8
/// codepoint boundaries. Rust `&str` slicing is byte-indexed, so
/// `trimmed[..max_bytes]` would panic if `max_bytes` fell mid-codepoint.
pub fn generate_fallback_snippet(content_text: &str, max_bytes: usize) -> String {
    let trimmed = content_text.trim();
    if trimmed.len() <= max_bytes {
        return trimmed.to_string();
    }

    // Truncate at a valid UTF-8 char boundary first
    let safe = truncate_utf8(trimmed, max_bytes);

    // Then find a word boundary to avoid cutting mid-word
    let truncation_point = safe
        .rfind(|c: char| c.is_whitespace())
        .unwrap_or(safe.len());

    format!("{}...", &safe[..truncation_point])
}

/// Truncate a string to at most `max_bytes` bytes on a valid UTF-8 char boundary.
///
/// Rust &str slicing panics if the index falls inside a multi-byte codepoint.
/// This helper walks backward from max_bytes to find the nearest char boundary.
/// Used by snippet generation and discussion truncation to prevent panics on
/// content containing emoji, CJK characters, or other multi-byte UTF-8 text.
fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut cut = max_bytes;
    while cut > 0 && !s.is_char_boundary(cut) {
        cut -= 1;
    }
    &s[..cut]
}

/// Get snippet for search result, preferring FTS when available.
///
/// Priority:
/// 1. FTS snippet (if document matched FTS query)
/// 2. Fallback: truncated content_text
pub fn get_result_snippet(
    fts_snippet: Option<&str>,
    content_text: &str,
) -> String {
    match fts_snippet {
        Some(snippet) if !snippet.is_empty() => snippet.to_string(),
        _ => generate_fallback_snippet(content_text, 200),
    }
}

/// FTS query parsing mode.
#[derive(Debug, Clone, Copy, Default)]
pub enum FtsQueryMode {
    /// Safe parsing (default): escapes dangerous syntax but preserves
    /// trailing `*` for obvious prefix queries (type-ahead UX).
    #[default]
    Safe,
    /// Raw mode: passes user MATCH syntax through unchanged.
    /// Use with caution - invalid syntax will cause FTS5 errors.
    Raw,
}

/// Convert user query to FTS5-safe MATCH expression.
///
/// FTS5 MATCH syntax has special characters that cause errors if passed raw:
/// - `-` (NOT operator)
/// - `"` (phrase quotes)
/// - `:` (column filter)
/// - `*` (prefix)
/// - `AND`, `OR`, `NOT` (operators)
///
/// Strategy for Safe mode:
/// - Wrap each whitespace-delimited token in double quotes
/// - Escape internal quotes by doubling them
/// - PRESERVE trailing `*` for simple prefix queries (alphanumeric tokens)
/// - This forces FTS5 to treat tokens as literals while allowing type-ahead
///
/// Raw mode passes the query through unchanged for power users who want
/// full FTS5 syntax (phrase queries, column scopes, boolean operators).
///
/// Examples (Safe mode):
/// - "auth error" -> `"auth" "error"` (implicit AND)
/// - "auth*" -> `"auth"*` (prefix preserved!)
/// - "jwt_token*" -> `"jwt_token"*` (prefix preserved!)
/// - "C++" -> `"C++"` (special chars preserved, no prefix)
/// - "don't panic" -> `"don't" "panic"` (apostrophe preserved)
/// - "-DWITH_SSL" -> `"-DWITH_SSL"` (leading dash neutralized)
pub fn to_fts_query(raw: &str, mode: FtsQueryMode) -> String {
    if matches!(mode, FtsQueryMode::Raw) {
        return raw.trim().to_string();
    }
    
    raw.split_whitespace()
        .map(|token| {
            let t = token.trim();
            if t.is_empty() {
                return "\"\"".to_string();
            }
            
            // Detect simple prefix queries: alphanumeric/underscore followed by *
            // e.g., "auth*", "jwt_token*", "user123*"
            let is_prefix = t.ends_with('*')
                && t.len() > 1
                && t[..t.len() - 1]
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_');
            
            // Escape internal double quotes by doubling them
            let escaped = t.replace('"', "\"\"");
            
            if is_prefix {
                // Strip trailing *, quote the core, then re-add *
                let core = &escaped[..escaped.len() - 1];
                format!("\"{}\"*", core)
            } else {
                format!("\"{}\"", escaped)
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}

/// Search documents using FTS5.
///
/// Returns matching document IDs with BM25 rank scores and snippets.
/// Lower rank values indicate better matches.
/// Uses bm25() explicitly (not the `rank` alias) and snippet() for context.
///
/// IMPORTANT: User input is sanitized via `to_fts_query()` to prevent
/// FTS5 syntax errors from special characters while preserving prefix search.
pub fn search_fts(
    conn: &Connection,
    query: &str,
    limit: usize,
    mode: FtsQueryMode,
) -> Result<Vec<FtsResult>> {
    if query.trim().is_empty() {
        return Ok(Vec::new());
    }

    let safe_query = to_fts_query(query, mode);

    let mut stmt = conn.prepare(
        "SELECT rowid,
                bm25(documents_fts),
                -- NOTE: Column index 1 = content_text (0=title, 1=content_text).
                -- If FTS column order changes, this index must be updated.
                snippet(documents_fts, 1, '<mark>', '</mark>', '...', 64)
         FROM documents_fts
         WHERE documents_fts MATCH ?
         ORDER BY bm25(documents_fts)
         LIMIT ?"
    )?;

    let results = stmt
        .query_map(rusqlite::params![safe_query, limit as i64], |row| {
            Ok(FtsResult {
                document_id: row.get(0)?,
                rank: row.get(1)?,
                snippet: row.get(2)?,
            })
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    Ok(results)
}
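
The UTF-8 boundary handling is the part of the snippet logic most likely to regress. A std-only sketch of the fallback-snippet behavior described above (the helper bodies are restated here so the example is self-contained):

```rust
/// Back up from max_bytes to the nearest valid UTF-8 char boundary.
fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut cut = max_bytes;
    while cut > 0 && !s.is_char_boundary(cut) {
        cut -= 1;
    }
    &s[..cut]
}

/// Truncate content to a byte budget, then trim back to a word boundary.
fn generate_fallback_snippet(content_text: &str, max_bytes: usize) -> String {
    let trimmed = content_text.trim();
    if trimmed.len() <= max_bytes {
        return trimmed.to_string();
    }
    let safe = truncate_utf8(trimmed, max_bytes);
    let truncation_point = safe
        .rfind(|c: char| c.is_whitespace())
        .unwrap_or(safe.len());
    format!("{}...", &safe[..truncation_point])
}

fn main() {
    // 'é' is 2 bytes; a naive &s[..2] would panic, so we back up to byte 1.
    assert_eq!(truncate_utf8("héllo", 2), "h");
    // Short content passes through untouched.
    assert_eq!(generate_fallback_snippet("short text", 200), "short text");
    // Long content is cut at a word boundary and gets an ellipsis.
    assert_eq!(generate_fallback_snippet("alpha beta gamma delta", 12), "alpha beta...");
}
```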

Acceptance Criteria:

  • Returns matching document IDs with BM25 rank
  • Porter stemming works (search/searching match)
  • Prefix search works (type-ahead UX): auth* returns results starting with "auth"
  • Empty query returns empty results
  • Nonsense query returns empty results
  • Special characters in query don't cause FTS5 syntax errors (-, ", :, *)
  • Query "-DWITH_SSL" returns results (not treated as NOT operator)
  • Query C++ returns results (special chars preserved)
  • Safe mode preserves trailing * on alphanumeric tokens
  • Raw mode (--fts-mode=raw) passes query through unchanged
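
The Safe-mode transformations listed in the criteria can be checked mechanically. This std-only sketch restates the Safe-mode tokenizing logic from the fts.rs listing (Raw mode omitted) and asserts the documented examples:

```rust
/// Safe-mode query sanitizer: quote each token as a literal, doubling any
/// internal quotes, but preserve a trailing * on simple alphanumeric tokens.
fn to_fts_query_safe(raw: &str) -> String {
    raw.split_whitespace()
        .map(|t| {
            // Simple prefix query: alphanumeric/underscore core followed by *
            let is_prefix = t.ends_with('*')
                && t.len() > 1
                && t[..t.len() - 1]
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_');
            // Escape internal double quotes by doubling them
            let escaped = t.replace('"', "\"\"");
            if is_prefix {
                // Strip trailing *, quote the core, re-add * outside the quotes
                format!("\"{}\"*", &escaped[..escaped.len() - 1])
            } else {
                format!("\"{}\"", escaped)
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}

fn main() {
    assert_eq!(to_fts_query_safe("auth error"), "\"auth\" \"error\"");
    assert_eq!(to_fts_query_safe("auth*"), "\"auth\"*");
    assert_eq!(to_fts_query_safe("jwt_token*"), "\"jwt_token\"*");
    assert_eq!(to_fts_query_safe("C++"), "\"C++\"");
    assert_eq!(to_fts_query_safe("-DWITH_SSL"), "\"-DWITH_SSL\"");
    // Internal quotes are doubled so FTS5 treats them as literal characters.
    assert_eq!(to_fts_query_safe("say \"hi\""), "\"say\" \"\"\"hi\"\"\"");
}
```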

3.3 Search Filters

File: src/search/filters.rs

use rusqlite::Connection;
use crate::core::error::Result;
use crate::documents::SourceType;

/// Maximum allowed limit for search results.
const MAX_SEARCH_LIMIT: usize = 100;

/// Default limit for search results.
const DEFAULT_SEARCH_LIMIT: usize = 20;

/// Search filters applied post-retrieval.
#[derive(Debug, Clone, Default)]
pub struct SearchFilters {
    pub source_type: Option<SourceType>,
    pub author: Option<String>,
    pub project_id: Option<i64>,
    pub after: Option<i64>,           // ms epoch (created_at >=)
    pub updated_after: Option<i64>,   // ms epoch (updated_at >=)
    pub labels: Vec<String>,          // AND logic
    pub path: Option<PathFilter>,
    pub limit: usize,                 // Default 20, max 100
}

impl SearchFilters {
    /// Check if any filter is set (used for adaptive recall).
    pub fn has_any_filter(&self) -> bool {
        self.source_type.is_some()
            || self.author.is_some()
            || self.project_id.is_some()
            || self.after.is_some()
            || self.updated_after.is_some()
            || !self.labels.is_empty()
            || self.path.is_some()
    }

    /// Clamp limit to valid range [1, MAX_SEARCH_LIMIT].
    pub fn clamp_limit(&self) -> usize {
        if self.limit == 0 {
            DEFAULT_SEARCH_LIMIT
        } else {
            self.limit.min(MAX_SEARCH_LIMIT)
        }
    }
}

/// Path filter with prefix or exact match.
#[derive(Debug, Clone)]
pub enum PathFilter {
    Prefix(String),  // Trailing `/` -> LIKE 'path/%'
    Exact(String),   // No trailing `/` -> = 'path'
}

impl PathFilter {
    pub fn from_str(s: &str) -> Self {
        if s.ends_with('/') {
            Self::Prefix(s.to_string())
        } else {
            Self::Exact(s.to_string())
        }
    }
}

/// Apply filters to document IDs, returning filtered set.
///
/// IMPORTANT: Preserves ranking order from input document_ids.
/// Filters must not reorder results - maintain the RRF/search ranking.
///
/// Uses JSON1 extension for efficient ordered ID passing:
/// - Passes document_ids as JSON array: `[1,2,3,...]`
/// - Uses `json_each()` to expand into rows with `key` as position
/// - JOINs with documents table and applies filters
/// - Orders by original position to preserve ranking
pub fn apply_filters(
    conn: &Connection,
    document_ids: &[i64],
    filters: &SearchFilters,
) -> Result<Vec<i64>> {
    if document_ids.is_empty() {
        return Ok(Vec::new());
    }

    // Build JSON array of document IDs
    let ids_json = serde_json::to_string(document_ids)?;

    // Build dynamic WHERE clauses
    let mut conditions: Vec<String> = Vec::new();
    let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    // Always bind the JSON array first
    params.push(Box::new(ids_json));

    if let Some(ref source_type) = filters.source_type {
        conditions.push("d.source_type = ?".into());
        params.push(Box::new(source_type.as_str().to_string()));
    }

    if let Some(ref author) = filters.author {
        conditions.push("d.author_username = ?".into());
        params.push(Box::new(author.clone()));
    }

    if let Some(project_id) = filters.project_id {
        conditions.push("d.project_id = ?".into());
        params.push(Box::new(project_id));
    }

    if let Some(after) = filters.after {
        conditions.push("d.created_at >= ?".into());
        params.push(Box::new(after));
    }

    if let Some(updated_after) = filters.updated_after {
        conditions.push("d.updated_at >= ?".into());
        params.push(Box::new(updated_after));
    }

    // Labels: AND logic - all labels must be present
    for label in &filters.labels {
        conditions.push(
            "EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name = ?)".into()
        );
        params.push(Box::new(label.clone()));
    }

    // Path filter
    if let Some(ref path_filter) = filters.path {
        match path_filter {
            PathFilter::Exact(path) => {
                conditions.push(
                    "EXISTS (SELECT 1 FROM document_paths dp WHERE dp.document_id = d.id AND dp.path = ?)".into()
                );
                params.push(Box::new(path.clone()));
            }
            PathFilter::Prefix(prefix) => {
                // IMPORTANT: Must use ESCAPE clause for backslash escaping to work in SQLite LIKE
                conditions.push(
                    "EXISTS (SELECT 1 FROM document_paths dp WHERE dp.document_id = d.id AND dp.path LIKE ? ESCAPE '\\')".into()
                );
                // Escape the ESCAPE character itself first, then the LIKE
                // wildcards, then append the trailing %
                let like_pattern = format!(
                    "{}%",
                    prefix
                        .replace('\\', "\\\\")
                        .replace('%', "\\%")
                        .replace('_', "\\_")
                );
                params.push(Box::new(like_pattern));
            }
        }
    }

    let where_clause = if conditions.is_empty() {
        String::new()
    } else {
        format!("AND {}", conditions.join(" AND "))
    };

    let limit = filters.clamp_limit();

    // SQL using JSON1 for ordered ID passing
    // json_each() returns rows with `key` (0-indexed position) and `value` (the ID)
    let sql = format!(
        r#"
        SELECT d.id
        FROM json_each(?) AS j
        JOIN documents d ON d.id = j.value
        WHERE 1=1 {}
        ORDER BY j.key
        LIMIT ?
        "#,
        where_clause
    );

    params.push(Box::new(limit as i64));

    let mut stmt = conn.prepare(&sql)?;
    let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();

    let results = stmt
        .query_map(params_refs.as_slice(), |row| row.get(0))?
        .collect::<std::result::Result<Vec<i64>, _>>()?;

    Ok(results)
}
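
Two small behaviors above are easy to get wrong: the trailing-slash convention for PathFilter and the LIKE-pattern escaping (the backslash is the ESCAPE character, so it must be escaped before `%` and `_`). A std-only sketch restating both rules:

```rust
#[derive(Debug, PartialEq)]
enum PathFilter {
    Prefix(String), // trailing '/' -> LIKE 'path/%'
    Exact(String),  // otherwise exact equality on document_paths.path
}

/// Trailing slash selects prefix semantics; anything else is an exact match.
fn path_filter_from_str(s: &str) -> PathFilter {
    if s.ends_with('/') {
        PathFilter::Prefix(s.to_string())
    } else {
        PathFilter::Exact(s.to_string())
    }
}

/// Build a LIKE pattern for a prefix filter. The backslash (the ESCAPE
/// character) is escaped first, then the LIKE wildcards % and _.
fn like_prefix_pattern(prefix: &str) -> String {
    format!(
        "{}%",
        prefix
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_")
    )
}

fn main() {
    assert_eq!(
        path_filter_from_str("src/auth/"),
        PathFilter::Prefix("src/auth/".into())
    );
    assert_eq!(
        path_filter_from_str("src/auth/jwt.rs"),
        PathFilter::Exact("src/auth/jwt.rs".into())
    );
    // '_' in a real path must not act as a single-character wildcard.
    assert_eq!(like_prefix_pattern("docs/user_guide/"), "docs/user\\_guide/%");
}
```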

Supported filters:

| Filter | SQL column | Notes |
|---|---|---|
| `--type` | `source_type` | `issue`, `mr`, `discussion` |
| `--author` | `author_username` | Exact match |
| `--project` | `project_id` | Resolve path to ID (see resolution logic below) |
| `--after` | `created_at >=` | Date, parsed via `time::parse_since()` (accepts `7d`, `2w`, `YYYY-MM-DD`) |
| `--updated-after` | `updated_at >=` | Date, parsed via `time::parse_since()`; common triage filter |
| `--label` | `document_labels` | JOIN, multiple flags = AND |
| `--path` | `document_paths` | JOIN, trailing `/` = prefix match |
| `--limit` | N/A | Default 20, max 100 |

Project Resolution Logic (--project):

The --project flag accepts a string and resolves it to a project_id using cascading match:

  1. Exact match on projects.path_with_namespace (e.g., group/project)
  2. Case-insensitive exact match (e.g., Group/Project matches group/project)
  3. Suffix match (e.g., project-name matches group/project-name) — only if unambiguous (1 result)
  4. No match or ambiguous — return error listing available projects:
Error: Project 'auth-service' not found.

Available projects:
  backend/auth-service
  frontend/auth-service-ui
  infra/auth-proxy

Hint: Use the full path, e.g., --project=backend/auth-service

Uses the existing LoreError::Ambiguous variant when multiple suffix matches are found.
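
The cascade can be sketched std-only over an in-memory project list. `resolve_project` and the `Resolution` enum here are hypothetical names for illustration; the real implementation would query the `projects` table and map the ambiguous case onto `LoreError::Ambiguous`:

```rust
#[derive(Debug, PartialEq)]
enum Resolution {
    Found(i64),
    NotFound,
    Ambiguous(Vec<String>),
}

/// Cascading match: exact -> case-insensitive exact -> unambiguous suffix.
fn resolve_project(projects: &[(i64, &str)], query: &str) -> Resolution {
    // 1. Exact match on path_with_namespace
    if let Some((id, _)) = projects.iter().find(|(_, p)| *p == query) {
        return Resolution::Found(*id);
    }
    // 2. Case-insensitive exact match
    let q_lower = query.to_lowercase();
    if let Some((id, _)) = projects.iter().find(|(_, p)| p.to_lowercase() == q_lower) {
        return Resolution::Found(*id);
    }
    // 3. Suffix match on the last path segment, only if exactly one hit
    let suffix: Vec<_> = projects
        .iter()
        .filter(|(_, p)| p.rsplit('/').next() == Some(query))
        .collect();
    match suffix.as_slice() {
        [(id, _)] => Resolution::Found(*id),
        [] => Resolution::NotFound,
        many => Resolution::Ambiguous(many.iter().map(|(_, p)| p.to_string()).collect()),
    }
}

fn main() {
    let projects = [
        (1, "backend/auth-service"),
        (2, "frontend/auth-service-ui"),
        (3, "infra/auth-proxy"),
    ];
    assert_eq!(resolve_project(&projects, "backend/auth-service"), Resolution::Found(1));
    assert_eq!(resolve_project(&projects, "Backend/Auth-Service"), Resolution::Found(1));
    assert_eq!(resolve_project(&projects, "auth-proxy"), Resolution::Found(3));
    assert_eq!(resolve_project(&projects, "nope"), Resolution::NotFound);
}
```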

Acceptance Criteria:

  • Each filter correctly restricts results
  • Multiple --label flags use AND logic
  • Path prefix vs exact match works correctly
  • --updated-after filters on updated_at (not created_at)
  • --after and --updated-after accept relative (7d, 2w) and absolute (YYYY-MM-DD) formats via time::parse_since()
  • --project resolves via exact → case-insensitive → suffix match
  • --project with no match or ambiguous match shows available projects list
  • Filters compose (all applied together)
  • Ranking order preserved after filtering (ORDER BY position)
  • Limit clamped to valid range [1, 100]
  • Default limit is 20 when not specified
  • JSON1 json_each() correctly expands document IDs

3.4 CLI: lore search --mode=lexical

File: src/cli/commands/search.rs

//! Search command - find documents using lexical, semantic, or hybrid search.

use console::style;
use serde::Serialize;

use crate::core::error::Result;
use crate::core::time::ms_to_iso;
use crate::search::{SearchFilters, SearchMode, search_fts, search_vector, rank_rrf, RrfResult};
use crate::Config;

/// Search result for display.
#[derive(Debug, Serialize)]
pub struct SearchResultDisplay {
    pub document_id: i64,
    pub source_type: String,
    pub title: Option<String>,
    pub url: Option<String>,
    pub project_path: String,
    pub author: Option<String>,
    pub created_at: String,  // ISO format
    pub updated_at: String,  // ISO format
    pub score: f64,          // Normalized 0-1
    pub snippet: String,     // Context around match
    pub labels: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub explain: Option<ExplainData>,
}

/// Ranking explanation for --explain flag.
#[derive(Debug, Serialize)]
pub struct ExplainData {
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
    pub rrf_score: f64,
}

/// Search results response.
#[derive(Debug, Serialize)]
pub struct SearchResponse {
    pub query: String,
    pub mode: String,
    pub total_results: usize,
    pub results: Vec<SearchResultDisplay>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}

/// Run search command.
pub fn run_search(
    config: &Config,
    query: &str,
    mode: SearchMode,
    filters: SearchFilters,
    explain: bool,
) -> Result<SearchResponse> {
    // 1. Parse query and filters
    // 2. Execute search based on mode -> ranked doc_ids (+ explain ranks)
    // 3. Apply post-retrieval filters preserving ranking order
    // 4. HYDRATE in one DB round-trip (see hydration query below):
    //    - documents fields (title, url, created_at, updated_at, content_text)
    //    - project_path via JOIN projects
    //    - labels aggregated via json_group_array
    //    - paths aggregated via json_group_array (optional)
    // 5. Attach snippet:
    //    - prefer FTS snippet when doc hit FTS
    //    - fallback: truncated content_text via generate_fallback_snippet()
    // 6. For --mode=semantic with 0% embedding coverage:
    //    return Err(LoreError::EmbeddingsNotBuilt)
    //    This is distinct from "Ollama unavailable" — tells user to run `lore embed` first
    todo!()
}

/// Hydration query: fetch all display fields for ranked doc IDs in a single round-trip.
///
/// Uses json_each(?) to preserve ranking order from the search pipeline.
/// Aggregates labels and paths inline to avoid N+1 queries.
///
/// ```sql
/// SELECT d.id, d.source_type, d.title, d.url, d.author_username,
///        d.created_at, d.updated_at, d.content_text,
///        p.path_with_namespace AS project_path,
///        (SELECT json_group_array(dl.label_name)
///         FROM document_labels dl WHERE dl.document_id = d.id) AS labels,
///        (SELECT json_group_array(dp.path)
///         FROM document_paths dp WHERE dp.document_id = d.id) AS paths
/// FROM json_each(?) AS j
/// JOIN documents d ON d.id = j.value
/// JOIN projects p ON p.id = d.project_id
/// ORDER BY j.key
/// ```
///
/// This single query replaces what would otherwise be:
/// - 1 query per document for metadata
/// - 1 query per document for labels
/// - 1 query per document for paths
/// For 20 results, that's 60 queries reduced to 1.
fn hydrate_results(
    conn: &Connection,
    doc_ids: &[i64],
) -> Result<Vec<HydratedDocument>> {
    todo!()
}

/// Print human-readable search results.
pub fn print_search_results(response: &SearchResponse, explain: bool) {
    println!(
        "Found {} results ({} search)\n",
        response.total_results,
        response.mode
    );

    for (i, result) in response.results.iter().enumerate() {
        let type_prefix = match result.source_type.as_str() {
            // Accept both the stored value and the short CLI alias.
            "mr" | "merge_request" => "MR",
            "issue" => "Issue",
            "discussion" => "Discussion",
            _ => &result.source_type,
        };

        let title = result.title.as_deref().unwrap_or("(untitled)");
        println!(
            "[{}] {} - {} ({})",
            i + 1,
            style(type_prefix).cyan(),
            title,
            format!("{:.2}", result.score)
        );

        if explain {
            if let Some(exp) = &result.explain {
                let vec_str = exp.vector_rank.map(|r| format!("#{}", r)).unwrap_or_else(|| "-".into());
                let fts_str = exp.fts_rank.map(|r| format!("#{}", r)).unwrap_or_else(|| "-".into());
                println!(
                    "    Vector: {}, FTS: {}, RRF: {:.4}",
                    vec_str, fts_str, exp.rrf_score
                );
            }
        }

        if let Some(author) = &result.author {
            println!(
                "    @{} · {} · {}",
                author, &result.created_at[..10], result.project_path
            );
        }

        // Snippets carry their own ellipsis markers (FTS '...' or the fallback's "...").
        println!("    \"{}\"", &result.snippet);

        if let Some(url) = &result.url {
            println!("    {}", style(url).dim());
        }
        println!();
    }
}

/// Print JSON search results for robot mode.
pub fn print_search_results_json(response: &SearchResponse, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": response,
        "meta": {
            "elapsed_ms": elapsed_ms
        }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}

CLI integration in src/cli/mod.rs:

/// Search subcommand arguments.
#[derive(Args)]
pub struct SearchArgs {
    /// Search query
    query: String,

    /// Search mode
    #[arg(long, default_value = "hybrid")]
    mode: String,  // "hybrid" | "lexical" | "semantic"

    /// Filter by source type
    #[arg(long, value_name = "TYPE")]
    r#type: Option<String>,

    /// Filter by author username
    #[arg(long)]
    author: Option<String>,

    /// Filter by project path
    #[arg(long)]
    project: Option<String>,

    /// Filter by creation date (after)
    #[arg(long)]
    after: Option<String>,

    /// Filter by updated date (recently active items)
    #[arg(long)]
    updated_after: Option<String>,

    /// Filter by label (can specify multiple)
    #[arg(long, action = clap::ArgAction::Append)]
    label: Vec<String>,

    /// Filter by file path
    #[arg(long)]
    path: Option<String>,

    /// Maximum results
    #[arg(long, default_value = "20")]
    limit: usize,

    /// Show ranking breakdown
    #[arg(long)]
    explain: bool,

    /// FTS query mode: "safe" (default) or "raw"
    /// - safe: Escapes special chars but preserves `*` for prefix queries
    /// - raw: Pass FTS5 MATCH syntax through unchanged (advanced)
    #[arg(long, default_value = "safe")]
    fts_mode: String,  // "safe" | "raw"
}
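
The `mode` and `fts_mode` flags arrive as plain strings and need mapping onto enums before dispatch. A minimal sketch of that mapping, assuming the `SearchMode` variants exported from `search::hybrid`; the `FromStr` impl and error type here are illustrative, not the codebase's:

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq, Clone, Copy)]
enum SearchMode {
    Hybrid,
    Lexical,
    Semantic,
}

impl FromStr for SearchMode {
    type Err = String;

    /// Case-insensitive parse so "HYBRID" and "hybrid" both work.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "hybrid" => Ok(Self::Hybrid),
            "lexical" => Ok(Self::Lexical),
            "semantic" => Ok(Self::Semantic),
            other => Err(format!(
                "invalid mode '{other}' (expected hybrid, lexical, or semantic)"
            )),
        }
    }
}

fn main() {
    assert_eq!("hybrid".parse::<SearchMode>(), Ok(SearchMode::Hybrid));
    assert_eq!("LEXICAL".parse::<SearchMode>(), Ok(SearchMode::Lexical));
    assert!("fuzzy".parse::<SearchMode>().is_err());
}
```

With a `FromStr` impl in place, clap can parse the flag directly into the enum (via `value_parser`) instead of carrying a `String` through the args struct.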

Acceptance Criteria:

  • Works without Ollama running
  • All filters functional (including --updated-after)
  • Human-readable output with snippets
  • Semantic-only results get fallback snippets from content_text
  • Results hydrated in single DB round-trip (no N+1 queries)
  • JSON output matches schema
  • Empty results show helpful message
  • "No data indexed" message if documents table empty
  • --mode=semantic with 0% embedding coverage returns actionable error (distinct from "Ollama unavailable" — tells user to run lore embed first)
  • --fts-mode=safe (default) preserves prefix * while escaping special chars
  • --fts-mode=raw passes FTS5 MATCH syntax through unchanged

Phase 4: Embedding Pipeline

4.1 Embedding Module Structure

New module: src/embedding/

src/embedding/
├── mod.rs              # Module exports
├── chunk_ids.rs        # Shared (document_id, chunk_index) rowid encoding
├── ollama.rs           # Ollama API client
├── pipeline.rs         # Batch embedding orchestration
└── change_detector.rs  # Detect documents needing re-embedding

File: src/embedding/mod.rs

//! Embedding generation and storage.
//!
//! Uses Ollama for embedding generation and sqlite-vec for storage.

mod change_detector;
mod chunk_ids;
mod ollama;
mod pipeline;

pub use change_detector::detect_embedding_changes;
pub use chunk_ids::{encode_rowid, decode_rowid, CHUNK_ROWID_MULTIPLIER};
pub use ollama::{OllamaClient, OllamaConfig, check_ollama_health};
pub use pipeline::{embed_documents, EmbedResult};

4.1b Chunk ID Encoding (Shared Module)

File: src/embedding/chunk_ids.rs

//! Shared chunk ID encoding for embedding rowids.
//!
//! Encoding: rowid = document_id * CHUNK_ROWID_MULTIPLIER + chunk_index
//! This encodes (document_id, chunk_index) into a single vec0 rowid.
//! Supports up to 1000 chunks per document.
//!
//! Used by:
//! - `embedding::pipeline` (when storing embeddings)
//! - `search::vector` (when decoding search results)
//! - `cli::commands::stats` (when detecting orphaned embeddings)
//!
//! Living in its own module prevents coupling between pipeline and search.

/// Multiplier for encoding (document_id, chunk_index) into a single vec0 rowid.
/// Supports up to 1000 chunks per document.
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;

/// Encode (document_id, chunk_index) into a vec0 rowid.
pub fn encode_rowid(document_id: i64, chunk_index: usize) -> i64 {
    document_id * CHUNK_ROWID_MULTIPLIER + chunk_index as i64
}

/// Decode a vec0 rowid into (document_id, chunk_index).
pub fn decode_rowid(rowid: i64) -> (i64, usize) {
    (rowid / CHUNK_ROWID_MULTIPLIER, (rowid % CHUNK_ROWID_MULTIPLIER) as usize)
}
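
The encoding round-trips exactly as long as `chunk_index < 1000`. A quick std-only check of the scheme above:

```rust
const CHUNK_ROWID_MULTIPLIER: i64 = 1000;

fn encode_rowid(document_id: i64, chunk_index: usize) -> i64 {
    document_id * CHUNK_ROWID_MULTIPLIER + chunk_index as i64
}

fn decode_rowid(rowid: i64) -> (i64, usize) {
    (rowid / CHUNK_ROWID_MULTIPLIER, (rowid % CHUNK_ROWID_MULTIPLIER) as usize)
}

fn main() {
    // Document 42, chunk 7 -> rowid 42007, and back.
    assert_eq!(encode_rowid(42, 7), 42_007);
    assert_eq!(decode_rowid(42_007), (42, 7));
    // Round-trip holds across the full valid chunk range.
    for chunk in [0usize, 1, 999] {
        assert_eq!(decode_rowid(encode_rowid(7, chunk)), (7, chunk));
    }
}
```

A `chunk_index >= 1000` would silently collide with the next document's rowid space, which is why the pipeline must cap chunks per document at the multiplier.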

4.2 Ollama Client

File: src/embedding/ollama.rs

use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::core::error::{LoreError, Result};

/// Ollama client configuration.
#[derive(Debug, Clone)]
pub struct OllamaConfig {
    pub base_url: String,      // "http://localhost:11434"
    pub model: String,         // "nomic-embed-text"
    pub timeout_secs: u64,     // Request timeout
}

impl Default for OllamaConfig {
    fn default() -> Self {
        Self {
            base_url: "http://localhost:11434".into(),
            model: "nomic-embed-text".into(),
            timeout_secs: 60,
        }
    }
}

/// Ollama API client.
pub struct OllamaClient {
    client: Client,
    config: OllamaConfig,
}

/// Batch embed request.
#[derive(Serialize)]
struct EmbedRequest {
    model: String,
    input: Vec<String>,
}

/// Batch embed response. The API also echoes the model name; that field is
/// omitted here because serde ignores unknown fields and an unread struct
/// field would trigger a dead_code warning.
#[derive(Deserialize)]
struct EmbedResponse {
    embeddings: Vec<Vec<f32>>,
}

/// Model info from /api/tags.
#[derive(Deserialize)]
struct TagsResponse {
    models: Vec<ModelInfo>,
}

#[derive(Deserialize)]
struct ModelInfo {
    name: String,
}

impl OllamaClient {
    pub fn new(config: OllamaConfig) -> Self {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(config.timeout_secs))
            .build()
            .expect("Failed to create HTTP client");

        Self { client, config }
    }

    /// Check if Ollama is available and model is loaded.
    pub async fn health_check(&self) -> Result<()> {
        let url = format!("{}/api/tags", self.config.base_url);

        let response = self.client.get(&url).send().await.map_err(|e| {
            LoreError::OllamaUnavailable {
                base_url: self.config.base_url.clone(),
                source: Some(e),
            }
        })?;

        // Surface non-2xx responses as a connection problem rather than a
        // confusing JSON decode error.
        let tags: TagsResponse = response
            .error_for_status()
            .map_err(|e| LoreError::OllamaUnavailable {
                base_url: self.config.base_url.clone(),
                source: Some(e),
            })?
            .json()
            .await?;

        let model_available = tags.models.iter().any(|m| m.name.starts_with(&self.config.model));

        if !model_available {
            return Err(LoreError::OllamaModelNotFound {
                model: self.config.model.clone(),
            });
        }

        Ok(())
    }

    /// Generate embeddings for a batch of texts.
    ///
    /// Returns 768-dimensional vectors for each input text.
    pub async fn embed_batch(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>> {
        let url = format!("{}/api/embed", self.config.base_url);

        let request = EmbedRequest {
            model: self.config.model.clone(),
            input: texts,
        };

        let response = self.client
            .post(&url)
            .json(&request)
            .send()
            .await
            .map_err(|e| LoreError::OllamaUnavailable {
                base_url: self.config.base_url.clone(),
                source: Some(e),
            })?;

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            return Err(LoreError::EmbeddingFailed {
                document_id: 0, // 0 = whole-batch failure, not attributable to one document
                reason: format!("HTTP {}: {}", status, body),
            });
        }

        let embed_response: EmbedResponse = response.json().await?;
        Ok(embed_response.embeddings)
    }
}

/// Quick health check without full client.
pub async fn check_ollama_health(base_url: &str) -> bool {
    let client = Client::new();
    client
        .get(format!("{}/api/tags", base_url))
        .send()
        .await
        .map(|r| r.status().is_success())
        .unwrap_or(false)
}

Endpoints:

| Endpoint | Purpose |
|---|---|
| `GET /api/tags` | Health check; verify model is available |
| `POST /api/embed` | Batch embedding (preferred) |

Acceptance Criteria:

  • Health check detects Ollama availability
  • Batch embedding works with up to 32 texts
  • Clear error messages for common failures

4.3 Error Handling Extensions

File: src/core/error.rs (extend existing)

Add to ErrorCode (existing exit codes 1-13 are taken):

pub enum ErrorCode {
    // ... existing variants (InternalError=1 through TransformError=13) ...
    OllamaUnavailable,
    OllamaModelNotFound,
    EmbeddingFailed,
}

impl ErrorCode {
    pub fn exit_code(&self) -> i32 {
        match self {
            // ... existing mappings ...
            Self::OllamaUnavailable => 14,
            Self::OllamaModelNotFound => 15,
            Self::EmbeddingFailed => 16,
        }
    }
}

impl std::fmt::Display for ErrorCode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = match self {
            // ... existing mappings ...
            Self::OllamaUnavailable => "OLLAMA_UNAVAILABLE",
            Self::OllamaModelNotFound => "OLLAMA_MODEL_NOT_FOUND",
            Self::EmbeddingFailed => "EMBEDDING_FAILED",
        };
        write!(f, "{code}")
    }
}

Add to LoreError:

pub enum LoreError {
    // ... existing variants (including Http(reqwest::Error) for generic HTTP errors) ...

    /// Ollama-specific connection failure. Use this instead of Http for Ollama errors
    /// because it includes the base_url for actionable error messages.
    /// The embedding pipeline should map Ollama HTTP errors to this variant, not Http.
    #[error("Cannot connect to Ollama at {base_url}. Is it running?")]
    OllamaUnavailable {
        base_url: String,
        #[source]
        source: Option<reqwest::Error>,
    },

    #[error("Ollama model '{model}' not found. Run: ollama pull {model}")]
    OllamaModelNotFound { model: String },

    #[error("Embedding failed for document {document_id}: {reason}")]
    EmbeddingFailed { document_id: i64, reason: String },

    #[error("No embeddings found. Run: lore embed")]
    EmbeddingsNotBuilt,
}

impl LoreError {
    pub fn code(&self) -> ErrorCode {
        match self {
            // ... existing mappings ...
            Self::OllamaUnavailable { .. } => ErrorCode::OllamaUnavailable,
            Self::OllamaModelNotFound { .. } => ErrorCode::OllamaModelNotFound,
            Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed,
            Self::EmbeddingsNotBuilt => ErrorCode::EmbeddingFailed,
        }
    }

    pub fn suggestion(&self) -> Option<&'static str> {
        match self {
            // ... existing mappings ...
            Self::OllamaUnavailable { .. } => Some("Start Ollama: ollama serve"),
            Self::OllamaModelNotFound { .. } => Some("Pull the model: ollama pull nomic-embed-text"),
            Self::EmbeddingFailed { .. } => Some("Check Ollama logs or retry with 'lore embed --retry-failed'"),
            Self::EmbeddingsNotBuilt => Some("Generate embeddings first: lore embed"),
        }
    }
}

4.4 Embedding Pipeline

File: src/embedding/pipeline.rs

use rusqlite::Connection;

use crate::core::error::Result;
use crate::embedding::OllamaClient;

/// Batch size for embedding requests (texts per Ollama API call).
const BATCH_SIZE: usize = 32;

/// SQLite page size for paging through pending documents.
/// Uses keyset paging (id > last_id) to avoid rescanning previously-processed rows.
const DB_PAGE_SIZE: usize = 500;

/// Expected embedding dimensions for nomic-embed-text model.
/// IMPORTANT: Validates against this to prevent silent corruption.
const EXPECTED_DIMS: usize = 768;

/// Maximum characters per chunk for embedding.
/// nomic-embed-text context window is ~8192 tokens.
/// At ~4 chars/token, 32k chars is a safe upper bound.
const CHUNK_MAX_CHARS: usize = 32_000;

/// Overlap between chunks to preserve context at boundaries.
/// 500 chars (~125 tokens) provides enough context for sentence boundaries.
const CHUNK_OVERLAP_CHARS: usize = 500;

// NOTE: encode_rowid, decode_rowid, and CHUNK_ROWID_MULTIPLIER are imported
// from `crate::embedding::chunk_ids` (shared module, see 4.1b).
// This prevents coupling between pipeline.rs and search/vector.rs.
use crate::embedding::chunk_ids::{encode_rowid, CHUNK_ROWID_MULTIPLIER};

/// Split document text into overlapping chunks at paragraph boundaries.
///
/// Rules:
/// - Each chunk <= CHUNK_MAX_CHARS
/// - Split at paragraph boundaries (\n\n) when possible
/// - Fall back to sentence boundaries (. ! ?) then word boundaries
/// - Overlap of CHUNK_OVERLAP_CHARS between consecutive chunks
/// - Single documents under CHUNK_MAX_CHARS produce exactly 1 chunk (index 0)
///
/// Returns: Vec<(chunk_index, chunk_text)>
fn split_into_chunks(content: &str) -> Vec<(usize, String)> {
    if content.len() <= CHUNK_MAX_CHARS {
        return vec![(0, content.to_string())];
    }
    // Split at paragraph boundaries with overlap
    todo!()
}

/// Which documents to embed.
#[derive(Debug, Clone, Copy)]
pub enum EmbedSelection {
    /// New or changed documents (default).
    Pending,
    /// Only previously failed documents.
    RetryFailed,
}

/// Result of an embedding run.
///
/// NOTE: `embedded` and `failed` count chunks (a long document may produce
/// several chunks per the splitting rules above); `skipped` counts documents
/// that were already up to date.
#[derive(Debug, Default)]
pub struct EmbedResult {
    pub embedded: usize,
    pub failed: usize,
    pub skipped: usize,
}

/// Embed documents that need embedding.
///
/// Process:
/// 1. Select documents needing embeddings:
///    - Pending: missing embedding_metadata row OR content_hash mismatch
///    - RetryFailed: embedding_metadata.last_error IS NOT NULL
/// 2. Page through candidates using keyset pagination (id > last_id)
///    to avoid rescanning already-processed rows
/// 3. For each document: split content into chunks via `split_into_chunks()`
///    - Documents <= 32k chars produce 1 chunk (common case, no overhead)
///    - Longer documents split at paragraph boundaries with 500-char overlap
/// 4. Clear existing chunks for changed documents (`clear_document_embeddings()`)
/// 5. Batch chunk texts -> Ollama `/api/embed` with concurrent HTTP requests
///    (concurrency = max in-flight HTTP requests to Ollama)
/// 6. Write chunk embeddings + embedding_metadata in per-batch transactions
///    - rowid = document_id * 1000 + chunk_index
/// 7. Failed batches record `last_error` in embedding_metadata
///    (excluded from Pending selection; retried via RetryFailed)
/// 8. Progress reported as (embedded + failed) vs total_pending
pub async fn embed_documents(
    conn: &Connection,
    client: &OllamaClient,
    selection: EmbedSelection,
    concurrency: usize,
    progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<EmbedResult> {
    use futures::stream::{FuturesUnordered, StreamExt};

    let mut result = EmbedResult::default();
    let total_pending = count_pending_documents(conn, selection)?;

    if total_pending == 0 {
        return Ok(result);
    }

    // Page through pending documents using keyset pagination to avoid
    // both memory pressure and OFFSET performance degradation.
    let mut last_id: i64 = 0;
    loop {
        let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, selection)?;
        if pending.is_empty() {
            break;
        }

        // Launch concurrent HTTP requests, collect results
        let mut futures = FuturesUnordered::new();

        // Build ChunkWork items: split each document into chunks, clear old embeddings
        let mut work: Vec<ChunkWork> = Vec::new();
        {
            let tx = conn.unchecked_transaction()?; // Connection::transaction() needs &mut
            for doc in &pending {
                clear_document_embeddings(&tx, doc.id)?;
                for (chunk_index, chunk_text) in split_into_chunks(&doc.content) {
                    work.push(ChunkWork {
                        doc_id: doc.id,
                        chunk_index,
                        doc_hash: doc.content_hash.clone(),
                        chunk_hash: crate::documents::compute_content_hash(&chunk_text),
                        text: chunk_text,
                    });
                }
            }
            tx.commit()?;
        }

        // Batch ChunkWork texts into Ollama API calls
        for batch in work.chunks(BATCH_SIZE) {
            let texts: Vec<String> = batch.iter().map(|w| w.text.clone()).collect();
            let batch_meta: Vec<ChunkWork> = batch.to_vec();

            futures.push(async move {
                let embed_result = client.embed_batch(texts).await;
                (batch_meta, embed_result)
            });

            // Cap in-flight requests
            if futures.len() >= concurrency {
                if let Some((meta, res)) = futures.next().await {
                    collect_writes(conn, &meta, res, &mut result)?;
                }
            }
        }

        // Drain remaining futures
        while let Some((meta, res)) = futures.next().await {
            collect_writes(conn, &meta, res, &mut result)?;
        }

        // Advance keyset cursor for next page
        if let Some(last) = pending.last() {
            last_id = last.id;
        }

        if let Some(ref cb) = progress_callback {
            cb(result.embedded + result.failed, total_pending);
        }
    }

    Ok(result)
}

/// Chunk work item prepared for embedding.
/// Derives Clone because batches are copied into async tasks (`batch.to_vec()`).
#[derive(Clone)]
struct ChunkWork {
    doc_id: i64,
    chunk_index: usize,
    doc_hash: String,     // SHA-256 of full document content (staleness detection)
    chunk_hash: String,   // SHA-256 of this chunk's text (provenance)
    text: String,
}

/// Collect embedding results and write to DB (sequential, on main thread).
///
/// IMPORTANT: Validates embedding dimensions to prevent silent corruption.
/// If model returns wrong dimensions (e.g., different model configured),
/// the document is marked as failed rather than storing corrupt data.
///
/// NOTE: batch_meta contains ChunkWork items (not raw documents) because the
/// caller must split documents into chunks before batching for Ollama /api/embed.
/// Each ChunkWork carries both doc_hash (for staleness) and chunk_hash (for provenance).
fn collect_writes(
    conn: &Connection,
    batch_meta: &[ChunkWork],
    embed_result: Result<Vec<Vec<f32>>>,
    result: &mut EmbedResult,
) -> Result<()> {
    let tx = conn.unchecked_transaction()?; // Connection::transaction() needs &mut
    match embed_result {
        Ok(embeddings) => {
            for (chunk, embedding) in batch_meta.iter().zip(embeddings.iter()) {
                // Validate dimensions to prevent silent corruption
                if embedding.len() != EXPECTED_DIMS {
                    record_embedding_error(
                        &tx,
                        chunk.doc_id,
                        chunk.chunk_index,
                        &chunk.doc_hash,
                        &chunk.chunk_hash,
                        &format!(
                            "embedding dimension mismatch: got {}, expected {}",
                            embedding.len(),
                            EXPECTED_DIMS
                        ),
                    )?;
                    result.failed += 1;
                    continue;
                }
                store_embedding(&tx, chunk.doc_id, chunk.chunk_index, embedding, &chunk.doc_hash, &chunk.chunk_hash)?;
                result.embedded += 1;
            }
        }
        Err(e) => {
            for chunk in batch_meta {
                record_embedding_error(&tx, chunk.doc_id, chunk.chunk_index, &chunk.doc_hash, &chunk.chunk_hash, &e.to_string())?;
                result.failed += 1;
            }
        }
    }
    tx.commit()?;
    Ok(())
}

struct PendingDocument {
    id: i64,
    content: String,
    content_hash: String,
}

/// Count total pending documents (for progress reporting).
///
/// NOTE: With chunked embeddings, we detect pending at the document level.
/// A document needs embedding if:
/// - No embedding_metadata rows exist for it (new document)
/// - Its document_hash changed (chunk_index=0 row has mismatched hash)
/// This avoids counting individual chunks as separate pending items.
///
/// IMPORTANT: Uses `document_hash` (not `chunk_hash`) because staleness is
/// a document-level condition. See migration 009 comment for details.
fn count_pending_documents(conn: &Connection, selection: EmbedSelection) -> Result<usize> {
    let sql = match selection {
        EmbedSelection::Pending =>
            "SELECT COUNT(*)
             FROM documents d
             LEFT JOIN embedding_metadata em ON d.id = em.document_id AND em.chunk_index = 0
             WHERE em.document_id IS NULL
                OR em.document_hash != d.content_hash",
        EmbedSelection::RetryFailed =>
            "SELECT COUNT(DISTINCT d.id)
             FROM documents d
             JOIN embedding_metadata em ON d.id = em.document_id
             WHERE em.last_error IS NOT NULL",
    };
    let count: usize = conn.query_row(sql, [], |row| row.get(0))?;
    Ok(count)
}

/// Find pending documents for embedding using keyset pagination.
///
/// IMPORTANT: Uses keyset pagination (d.id > last_id) instead of OFFSET.
/// OFFSET degrades O(n²) on large result sets because SQLite must scan
/// and discard all rows before the offset. Keyset pagination is O(1) per page
/// since the index seek goes directly to the starting row.
fn find_pending_documents(
    conn: &Connection,
    limit: usize,
    last_id: i64,
    selection: EmbedSelection,
) -> Result<Vec<PendingDocument>> {
    // NOTE: Finds documents needing embedding at the document level.
    // The caller is responsible for chunking content_text via split_into_chunks()
    // and calling clear_document_embeddings() before storing new chunks.
    // Uses document_hash (not chunk_hash) for staleness detection.
    let sql = match selection {
        EmbedSelection::Pending =>
            "SELECT d.id, d.content_text, d.content_hash
             FROM documents d
             LEFT JOIN embedding_metadata em ON d.id = em.document_id AND em.chunk_index = 0
             WHERE (em.document_id IS NULL
                OR em.document_hash != d.content_hash)
               AND d.id > ?
             ORDER BY d.id
             LIMIT ?",
        EmbedSelection::RetryFailed =>
            "SELECT DISTINCT d.id, d.content_text, d.content_hash
             FROM documents d
             JOIN embedding_metadata em ON d.id = em.document_id
             WHERE em.last_error IS NOT NULL
               AND d.id > ?
             ORDER BY d.id
             LIMIT ?",
    };
    let mut stmt = conn.prepare(sql)?;

    let docs = stmt
        .query_map(rusqlite::params![last_id, limit as i64], |row| {
            Ok(PendingDocument {
                id: row.get(0)?,
                content: row.get(1)?,
                content_hash: row.get(2)?,
            })
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    Ok(docs)
}

/// Clear all existing chunk embeddings for a document before re-embedding.
/// Called when content_hash changes (chunk count may differ).
fn clear_document_embeddings(tx: &rusqlite::Transaction, document_id: i64) -> Result<()> {
    // Delete vec0 rows (range covers all possible chunk indices)
    tx.execute(
        "DELETE FROM embeddings WHERE rowid >= ? AND rowid < ?",
        rusqlite::params![
            document_id * CHUNK_ROWID_MULTIPLIER,
            (document_id + 1) * CHUNK_ROWID_MULTIPLIER,
        ],
    )?;
    // Delete metadata rows
    tx.execute(
        "DELETE FROM embedding_metadata WHERE document_id = ?",
        rusqlite::params![document_id],
    )?;
    Ok(())
}

fn store_embedding(
    tx: &rusqlite::Transaction,
    document_id: i64,
    chunk_index: usize,
    embedding: &[f32],
    doc_hash: &str,
    chunk_hash: &str,
) -> Result<()> {
    let rowid = encode_rowid(document_id, chunk_index);

    // Convert embedding to bytes for sqlite-vec
    // sqlite-vec expects raw little-endian bytes, not the array directly
    let embedding_bytes: Vec<u8> = embedding
        .iter()
        .flat_map(|f| f.to_le_bytes())
        .collect();

    // Store in sqlite-vec (rowid encodes doc_id + chunk_index)
    tx.execute(
        "INSERT OR REPLACE INTO embeddings(rowid, embedding) VALUES (?, ?)",
        rusqlite::params![rowid, embedding_bytes],
    )?;

    // Update metadata (per-chunk) with both hashes:
    // - document_hash for staleness detection (compared to documents.content_hash)
    // - chunk_hash for provenance/debug
    let now = crate::core::time::now_ms();
    tx.execute(
        "INSERT OR REPLACE INTO embedding_metadata
         (document_id, chunk_index, model, dims, document_hash, chunk_hash, created_at, last_error, attempt_count, last_attempt_at)
         VALUES (?, ?, 'nomic-embed-text', 768, ?, ?, ?, NULL, 0, ?)",
        rusqlite::params![document_id, chunk_index, doc_hash, chunk_hash, now, now],
    )?;

    Ok(())
}

fn record_embedding_error(
    tx: &rusqlite::Transaction,
    document_id: i64,
    chunk_index: usize,
    doc_hash: &str,
    chunk_hash: &str,
    error: &str,
) -> Result<()> {
    let now = crate::core::time::now_ms();
    tx.execute(
        "INSERT INTO embedding_metadata
         (document_id, chunk_index, model, dims, document_hash, chunk_hash, created_at, last_error, attempt_count, last_attempt_at)
         VALUES (?, ?, 'nomic-embed-text', 768, ?, ?, ?, ?, 1, ?)
         ON CONFLICT(document_id, chunk_index) DO UPDATE SET
           last_error = excluded.last_error,
           attempt_count = attempt_count + 1,
           last_attempt_at = excluded.last_attempt_at",
        rusqlite::params![document_id, chunk_index, doc_hash, chunk_hash, now, error, now],
    )?;

    Ok(())
}

Acceptance Criteria:

  • New documents get embedded (all chunks)
  • Changed documents (hash mismatch) get re-embedded: existing chunks cleared first via clear_document_embeddings()
  • Documents under 32k chars produce exactly 1 chunk (chunk_index=0)
  • Long documents split at paragraph boundaries with 500-char overlap
  • Unchanged documents skipped
  • Failures recorded in embedding_metadata.last_error per chunk
  • Failures record actual content_hash (not empty string)
  • Writes batched in transactions for performance
  • concurrency parameter controls max concurrent HTTP requests to Ollama API
  • Progress reported during embedding
  • Deterministic ORDER BY d.id ensures consistent paging
  • EmbedSelection parameter controls pending vs retry-failed mode

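The chunking rules above can be sketched as follows. This is a simplified, byte-offset illustration, not the final `split_into_chunks`: the size constants are passed as parameters so the example is testable at small scale, it assumes ASCII-safe input (a real implementation must respect UTF-8 char boundaries), and it omits the sentence/word-boundary fallbacks.

```rust
/// Simplified sketch: split `content` into overlapping chunks, preferring
/// paragraph boundaries ("\n\n"). Byte-offset based for brevity.
fn split_into_chunks(content: &str, max_chars: usize, overlap: usize) -> Vec<(usize, String)> {
    if content.len() <= max_chars {
        return vec![(0, content.to_string())]; // common case: one chunk
    }
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < content.len() {
        let window_end = (start + max_chars).min(content.len());
        // Prefer the last paragraph boundary in the window, but only if it
        // lies past the midpoint (avoids degenerate tiny chunks).
        let split = if window_end < content.len() {
            match content[start..window_end].rfind("\n\n") {
                Some(p) if p + 2 > max_chars / 2 => start + p + 2,
                _ => window_end,
            }
        } else {
            window_end
        };
        chunks.push((chunks.len(), content[start..split].to_string()));
        if split >= content.len() {
            break;
        }
        // Step back by `overlap` bytes to preserve context across the
        // boundary, but always make forward progress.
        start = split.saturating_sub(overlap).max(start + 1);
    }
    chunks
}
```

With the real constants (`CHUNK_MAX_CHARS = 32_000`, `CHUNK_OVERLAP_CHARS = 500`), most issue/MR documents take the early-return path and produce exactly one chunk.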
4.5 CLI: lore embed

File: src/cli/commands/embed.rs

//! Embed command - generate embeddings for documents.

use indicatif::{ProgressBar, ProgressStyle};
use serde::Serialize;

use crate::core::error::Result;
use crate::embedding::{embed_documents, EmbedResult, OllamaClient, OllamaConfig};
use crate::Config;

/// Run embedding command.
pub async fn run_embed(
    config: &Config,
    retry_failed: bool,
) -> Result<EmbedResult> {
    use crate::core::db::create_connection;
    use crate::embedding::pipeline::EmbedSelection;

    let ollama_config = OllamaConfig {
        base_url: config.embedding.base_url.clone(),
        model: config.embedding.model.clone(),
        timeout_secs: 120,
    };

    let client = OllamaClient::new(ollama_config);

    // Health check
    client.health_check().await?;

    // Open database connection
    let conn = create_connection(&config.storage.db_path)?;

    // Determine selection mode
    let selection = if retry_failed {
        EmbedSelection::RetryFailed
    } else {
        EmbedSelection::Pending
    };

    // Run embedding
    let result = embed_documents(
        &conn,
        &client,
        selection,
        config.embedding.concurrency as usize,
        None,
    ).await?;

    Ok(result)
}

/// Print human-readable output.
pub fn print_embed(result: &EmbedResult, elapsed_secs: u64) {
    println!("Embedding complete:");
    println!("  Embedded: {:>6} documents", result.embedded);
    println!("  Failed:   {:>6} documents", result.failed);
    println!("  Skipped:  {:>6} documents", result.skipped);
    println!("  Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}

/// Print JSON output for robot mode.
pub fn print_embed_json(result: &EmbedResult, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": {
            "embedded": result.embedded,
            "failed": result.failed,
            "skipped": result.skipped
        },
        "meta": {
            "elapsed_ms": elapsed_ms
        }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}

CLI integration:

/// Embed subcommand arguments.
#[derive(Args)]
pub struct EmbedArgs {
    /// Retry only previously failed documents
    #[arg(long)]
    retry_failed: bool,
}

Acceptance Criteria:

  • Embeds documents without embeddings
  • Re-embeds documents with changed hash
  • --retry-failed only processes failed documents
  • Progress bar with count
  • Clear error if Ollama unavailable

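Both `store_embedding` (4.4) and the vector search (5.1) pass vectors to sqlite-vec as raw little-endian f32 bytes. A minimal round-trip sketch of that conversion; the helper names here are illustrative, not part of the codebase:

```rust
/// Encode an f32 vector in the raw little-endian byte layout sqlite-vec expects.
fn embedding_to_bytes(embedding: &[f32]) -> Vec<u8> {
    embedding.iter().flat_map(|f| f.to_le_bytes()).collect()
}

/// Decode the byte layout back into f32s (useful in tests and debugging).
fn bytes_to_embedding(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4) // 4 bytes per f32
        .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]]))
        .collect()
}
```

For a 768-dim nomic-embed-text vector this yields exactly 3072 bytes per chunk row.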
4.6 CLI: lore stats

File: src/cli/commands/stats.rs

//! Stats command - display document and embedding statistics.

use rusqlite::Connection;
use serde::Serialize;

use crate::core::db::create_connection;
use crate::core::error::Result;
use crate::Config;

/// Document statistics.
#[derive(Debug, Serialize)]
pub struct Stats {
    pub documents: DocumentStats,
    pub embeddings: EmbeddingStats,
    pub fts: FtsStats,
    pub queues: QueueStats,
}

#[derive(Debug, Serialize)]
pub struct DocumentStats {
    pub issues: usize,
    pub mrs: usize,
    pub discussions: usize,
    pub total: usize,
    pub truncated: usize,
}

#[derive(Debug, Serialize)]
pub struct EmbeddingStats {
    /// Documents with at least one embedding (chunk_index=0 exists in embedding_metadata)
    pub embedded: usize,
    pub pending: usize,
    pub failed: usize,
    /// embedded / total_documents * 100 (document-level, not chunk-level)
    pub coverage_pct: f64,
    /// Total chunks across all embedded documents
    pub total_chunks: usize,
}

#[derive(Debug, Serialize)]
pub struct FtsStats {
    pub indexed: usize,
}

/// Queue statistics for observability.
///
/// Exposes internal queue depths so operators can detect backlogs
/// and failing items that need manual intervention.
#[derive(Debug, Serialize)]
pub struct QueueStats {
    /// Items in dirty_sources queue (pending document regeneration)
    pub dirty_sources: usize,
    /// Items in dirty_sources with last_error set (failing regeneration)
    pub dirty_sources_failed: usize,
    /// Items in pending_discussion_fetches queue
    pub pending_discussion_fetches: usize,
    /// Items in pending_discussion_fetches with last_error set
    pub pending_discussion_fetches_failed: usize,
}

/// Integrity check result.
#[derive(Debug, Serialize)]
pub struct IntegrityCheck {
    pub documents_count: usize,
    pub fts_count: usize,
    pub embeddings_count: usize,
    pub metadata_count: usize,
    pub orphaned_embeddings: usize,
    pub hash_mismatches: usize,
    pub ok: bool,
}

/// Run stats command.
pub fn run_stats(config: &Config) -> Result<Stats> {
    // Query counts from database
    todo!()
}

/// Run integrity check (--check flag).
///
/// Verifies:
/// - documents count == documents_fts count
/// - All embeddings.rowid decode to valid document IDs (rowid / 1000 exists in documents.id)
/// - embedding_metadata.document_hash == documents.content_hash for chunk_index=0 rows
pub fn run_integrity_check(config: &Config) -> Result<IntegrityCheck> {
    // 1. Count documents
    // 2. Count FTS entries
    // 3. Find orphaned embeddings (rowid / 1000 not in documents.id)
    // 4. Find hash mismatches between embedding_metadata (chunk_index=0) and documents
    // 5. Return check results
    todo!()
}

/// Repair result from --repair flag.
#[derive(Debug, Serialize)]
pub struct RepairResult {
    pub orphaned_embeddings_deleted: usize,
    pub stale_embeddings_cleared: usize,
    pub missing_fts_repopulated: usize,
}

/// Repair issues found by integrity check (--repair flag).
///
/// Fixes:
/// - Deletes orphaned embeddings (embedding_metadata rows with no matching document)
/// - Clears stale embedding_metadata (hash mismatch) so they get re-embedded
/// - Rebuilds FTS index from scratch (correct-by-construction)
///
/// NOTE: FTS repair uses `rebuild` rather than partial row insertion.
/// With `content='documents'` (external-content FTS), partial repopulation
/// via INSERT of missing rows is fragile — if the external content table
/// and FTS content diverge in any way, partial fixes can leave the index
/// in an inconsistent state. A full rebuild is slower but guaranteed correct.
pub fn run_repair(config: &Config) -> Result<RepairResult> {
    let conn = create_connection(&config.storage.db_path)?;

    // Delete orphaned embedding_metadata (no matching document)
    let orphaned_deleted = conn.execute(
        "DELETE FROM embedding_metadata
         WHERE document_id NOT IN (SELECT id FROM documents)",
        [],
    )?;

    // Also delete orphaned chunks from embeddings virtual table (sqlite-vec).
    // Chunked rowids: doc_id * 1000 + chunk_index. Orphan = rowid / 1000 not in documents.
    conn.execute(
        "DELETE FROM embeddings
         WHERE rowid / 1000 NOT IN (SELECT id FROM documents)",
        [],
    )?;

    // Clear stale embeddings (document-level hash mismatch) - will be re-embedded.
    // Uses document_hash (not chunk_hash) because staleness is document-level:
    // if the document changed, ALL chunks for that document are stale.
    // Must also delete corresponding vec0 rows (range deletion for all chunks).
    let stale_doc_ids: Vec<i64> = {
        let mut stmt = conn.prepare(
            "SELECT DISTINCT d.id
             FROM documents d
             JOIN embedding_metadata em
               ON em.document_id = d.id AND em.chunk_index = 0
             WHERE em.document_hash != d.content_hash"
        )?;
        stmt.query_map([], |r| r.get(0))?
            .collect::<std::result::Result<Vec<_>, _>>()?
    };
    let mut stale_cleared: usize = 0;
    for doc_id in &stale_doc_ids {
        stale_cleared += conn.execute(
            "DELETE FROM embedding_metadata WHERE document_id = ?",
            [doc_id],
        )?;
        // Also clean up vec0 rows (range covers all chunk indices for this doc)
        conn.execute(
            "DELETE FROM embeddings WHERE rowid >= ? AND rowid < ?",
            rusqlite::params![doc_id * 1000, (doc_id + 1) * 1000],
        )?;
    }

    // Rebuild FTS index from scratch — correct-by-construction.
    // This re-reads all rows from the external content table (documents)
    // and rebuilds the index. Slower than partial fix but guaranteed consistent.
    conn.execute(
        "INSERT INTO documents_fts(documents_fts) VALUES('rebuild')",
        [],
    )?;
    let fts_rebuilt = 1; // rebuild is all-or-nothing

    Ok(RepairResult {
        orphaned_embeddings_deleted: orphaned_deleted,
        stale_embeddings_cleared: stale_cleared,
        missing_fts_repopulated: fts_rebuilt,
    })
}

/// Print human-readable stats.
pub fn print_stats(stats: &Stats) {
    println!("Document Statistics:");
    println!("  Issues:      {:>6} documents", stats.documents.issues);
    println!("  MRs:         {:>6} documents", stats.documents.mrs);
    println!("  Discussions: {:>6} documents", stats.documents.discussions);
    println!("  Total:       {:>6} documents", stats.documents.total);
    if stats.documents.truncated > 0 {
        println!("  Truncated:   {:>6}", stats.documents.truncated);
    }
    println!();
    println!("Embedding Coverage:");
    println!("  Embedded: {:>6} ({:.1}%)", stats.embeddings.embedded, stats.embeddings.coverage_pct);
    println!("  Pending:  {:>6}", stats.embeddings.pending);
    println!("  Failed:   {:>6}", stats.embeddings.failed);
    println!();
    println!("FTS Index:");
    println!("  Indexed:  {:>6} documents", stats.fts.indexed);
    println!();
    println!("Queue Depths:");
    println!("  Dirty sources:     {:>6} ({} failed)",
        stats.queues.dirty_sources,
        stats.queues.dirty_sources_failed
    );
    println!("  Discussion fetches:{:>6} ({} failed)",
        stats.queues.pending_discussion_fetches,
        stats.queues.pending_discussion_fetches_failed
    );
}

/// Print integrity check results.
pub fn print_integrity_check(check: &IntegrityCheck) {
    println!("Integrity Check:");
    println!("  Documents:      {:>6}", check.documents_count);
    println!("  FTS entries:    {:>6}", check.fts_count);
    println!("  Embeddings:     {:>6}", check.embeddings_count);
    println!("  Metadata:       {:>6}", check.metadata_count);
    if check.orphaned_embeddings > 0 {
        println!("  Orphaned embeddings: {:>6} (WARN)", check.orphaned_embeddings);
    }
    if check.hash_mismatches > 0 {
        println!("  Hash mismatches:     {:>6} (WARN)", check.hash_mismatches);
    }
    println!();
    println!("  Status: {}", if check.ok { "OK" } else { "ISSUES FOUND" });
}

/// Print JSON stats for robot mode.
pub fn print_stats_json(stats: &Stats) {
    let output = serde_json::json!({
        "ok": true,
        "data": stats
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}

/// Print repair results.
pub fn print_repair_result(result: &RepairResult) {
    println!("Repair Results:");
    println!("  Orphaned embeddings deleted: {}", result.orphaned_embeddings_deleted);
    println!("  Stale embeddings cleared:    {}", result.stale_embeddings_cleared);
    println!("  Missing FTS repopulated:     {}", result.missing_fts_repopulated);
    println!();
    let total = result.orphaned_embeddings_deleted
        + result.stale_embeddings_cleared
        + result.missing_fts_repopulated;
    if total == 0 {
        println!("  No issues found to repair.");
    } else {
        println!("  Fixed {} issues.", total);
    }
}


CLI integration:

/// Stats subcommand arguments.
#[derive(Args)]
pub struct StatsArgs {
    /// Run integrity checks (document/FTS/embedding consistency)
    #[arg(long)]
    check: bool,

    /// Repair issues found by --check (deletes orphaned embeddings, clears stale metadata)
    #[arg(long, requires = "check")]
    repair: bool,
}

Acceptance Criteria:

  • Shows document counts by type
  • Shows embedding coverage
  • Shows FTS index count
  • Identifies truncated documents
  • Shows queue depths (dirty_sources, pending_discussion_fetches)
  • Shows failed item counts for each queue
  • --check verifies document/FTS/embedding consistency
  • --repair fixes orphaned embeddings, stale metadata, missing FTS entries
  • JSON output for scripting

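One detail worth pinning down for `EmbeddingStats.coverage_pct`: guard the empty-database case so coverage reports 0.0 rather than NaN when there are no documents. A sketch of the intended computation (the helper name is illustrative):

```rust
/// Document-level embedding coverage as a percentage.
/// Returns 0.0 for an empty database instead of NaN.
fn coverage_pct(embedded: usize, total_documents: usize) -> f64 {
    if total_documents == 0 {
        0.0
    } else {
        embedded as f64 / total_documents as f64 * 100.0
    }
}
```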
5.1 Vector Search Function

File: src/search/vector.rs

use rusqlite::Connection;
use crate::core::error::Result;

/// Vector search result.
#[derive(Debug, Clone)]
pub struct VectorResult {
    pub document_id: i64,
    pub distance: f64,  // Lower = more similar
}

/// Search documents using vector similarity.
///
/// Returns document-level results by taking the best (lowest distance)
/// chunk match per document. This deduplicates across chunks so the
/// caller sees one result per document.
///
/// IMPORTANT: sqlite-vec KNN queries require:
/// - k parameter for number of results
/// - embedding passed as raw little-endian bytes
///
/// With chunked embeddings, rowid = document_id * 1000 + chunk_index.
/// We over-fetch chunks (3x limit) to ensure enough unique documents after dedup.
pub fn search_vector(
    conn: &Connection,
    query_embedding: &[f32],
    limit: usize,
) -> Result<Vec<VectorResult>> {
    use crate::embedding::decode_rowid;

    // Convert embedding to bytes for sqlite-vec
    let embedding_bytes: Vec<u8> = query_embedding
        .iter()
        .flat_map(|f| f.to_le_bytes())
        .collect();

    // Over-fetch to ensure enough unique documents after dedup.
    // With avg ~1-2 chunks per doc, 3x is conservative.
    // Explicit i64 type for sqlite-vec k parameter binding.
    let chunk_limit: i64 = (limit * 3) as i64;

    let mut stmt = conn.prepare(
        "SELECT rowid, distance
         FROM embeddings
         WHERE embedding MATCH ? AND k = ?
         ORDER BY distance"
    )?;

    let chunk_results: Vec<(i64, f64)> = stmt
        .query_map(rusqlite::params![embedding_bytes, chunk_limit], |row| {
            Ok((row.get::<_, i64>(0)?, row.get::<_, f64>(1)?))
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    // Deduplicate: keep best (lowest distance) chunk per document
    let mut best_by_doc: std::collections::HashMap<i64, f64> =
        std::collections::HashMap::new();
    for (rowid, distance) in &chunk_results {
        let (doc_id, _chunk_idx) = decode_rowid(*rowid);
        best_by_doc
            .entry(doc_id)
            .and_modify(|d| { if *distance < *d { *d = *distance; } })
            .or_insert(*distance);
    }

    // Sort by distance and take top `limit`
    let mut results: Vec<VectorResult> = best_by_doc
        .into_iter()
        .map(|(doc_id, dist)| VectorResult {
            document_id: doc_id,
            distance: dist,
        })
        .collect();
    results.sort_by(|a, b| a.distance.total_cmp(&b.distance));
    results.truncate(limit);

    Ok(results)
}

Acceptance Criteria:

  • Returns document IDs with distances (deduplicated across chunks)
  • Best chunk distance used per document
  • Lower distance = better match
  • Works with 768-dim vectors
  • Uses k parameter for KNN query
  • Over-fetches chunks (3x limit) to handle dedup without missing documents
  • Embedding passed as bytes

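The dedup step in `search_vector` is independent of SQLite and can be illustrated in isolation. Assuming the rowid scheme from 4.1b (`rowid = document_id * 1000 + chunk_index`), this sketch decodes chunk-level KNN hits and keeps the best (lowest) distance per document; `dedup_best_chunk` is an illustrative name, not an existing function:

```rust
use std::collections::HashMap;

const CHUNK_ROWID_MULTIPLIER: i64 = 1000;

/// Mirror of the chunk_ids helper: rowid -> (document_id, chunk_index).
fn decode_rowid(rowid: i64) -> (i64, usize) {
    (
        rowid / CHUNK_ROWID_MULTIPLIER,
        (rowid % CHUNK_ROWID_MULTIPLIER) as usize,
    )
}

/// Collapse chunk-level hits (rowid, distance) to one best hit per document.
fn dedup_best_chunk(chunk_hits: &[(i64, f64)], limit: usize) -> Vec<(i64, f64)> {
    let mut best: HashMap<i64, f64> = HashMap::new();
    for &(rowid, distance) in chunk_hits {
        let (doc_id, _chunk) = decode_rowid(rowid);
        best.entry(doc_id)
            .and_modify(|d| *d = d.min(distance))
            .or_insert(distance);
    }
    let mut out: Vec<(i64, f64)> = best.into_iter().collect();
    out.sort_by(|a, b| a.1.total_cmp(&b.1)); // lower distance = better match
    out.truncate(limit);
    out
}
```

This is why the query over-fetches 3x the limit: if several top chunks belong to the same document, dedup would otherwise return fewer than `limit` unique documents.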
5.2 RRF Ranking

File: src/search/rrf.rs

use std::collections::HashMap;

/// RRF ranking constant.
const RRF_K: f64 = 60.0;

/// RRF-ranked result.
#[derive(Debug, Clone)]
pub struct RrfResult {
    pub document_id: i64,
    pub rrf_score: f64,         // Raw RRF score
    pub normalized_score: f64,  // Normalized to 0-1
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
}

/// Rank documents using Reciprocal Rank Fusion.
///
/// Algorithm:
/// RRF_score(d) = Σ 1 / (k + rank_i(d))
///
/// Where:
/// - k = 60 (tunable constant)
/// - rank_i(d) = rank of document d in retriever i (1-indexed)
/// - Sum over all retrievers where document appears
pub fn rank_rrf(
    vector_results: &[(i64, f64)],  // (doc_id, distance)
    fts_results: &[(i64, f64)],     // (doc_id, bm25_score)
) -> Vec<RrfResult> {
    let mut scores: HashMap<i64, (f64, Option<usize>, Option<usize>)> = HashMap::new();

    // Add vector results (1-indexed ranks)
    for (rank, (doc_id, _)) in vector_results.iter().enumerate() {
        let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64);
        let entry = scores.entry(*doc_id).or_insert((0.0, None, None));
        entry.0 += rrf_contribution;
        entry.1 = Some(rank + 1);
    }

    // Add FTS results (1-indexed ranks)
    for (rank, (doc_id, _)) in fts_results.iter().enumerate() {
        let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64);
        let entry = scores.entry(*doc_id).or_insert((0.0, None, None));
        entry.0 += rrf_contribution;
        entry.2 = Some(rank + 1);
    }

    // Convert to results and sort by RRF score descending
    let mut results: Vec<_> = scores
        .into_iter()
        .map(|(doc_id, (rrf_score, vector_rank, fts_rank))| {
            RrfResult {
                document_id: doc_id,
                rrf_score,
                normalized_score: 0.0, // Will be set below
                vector_rank,
                fts_rank,
            }
        })
        .collect();

    results.sort_by(|a, b| b.rrf_score.total_cmp(&a.rrf_score));

    // Normalize scores to 0-1
    if let Some(max_score) = results.first().map(|r| r.rrf_score) {
        for result in &mut results {
            result.normalized_score = result.rrf_score / max_score;
        }
    }

    results
}

Acceptance Criteria:

  • Documents in both lists score higher
  • Documents in one list still included
  • Normalized score = rrf_score / max(rrf_score)
  • Raw RRF score available in --explain output
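The fusion arithmetic is easy to check by hand with k = 60. A document ranked 1st by the vector retriever and 3rd by FTS scores 1/61 + 1/63 ≈ 0.0323, while a document ranked 2nd by only one retriever scores 1/62 ≈ 0.0161, so appearing in both lists dominates even a better single rank. A minimal sketch of the per-retriever contribution:

```rust
/// RRF contribution of one retriever for a 1-indexed rank, with k = 60.
/// Matches the per-rank term inside rank_rrf.
fn rrf_contribution(rank: usize) -> f64 {
    1.0 / (60.0 + rank as f64)
}
```

The constant k = 60 damps the gap between adjacent ranks: ranks 1 and 2 differ by less than 2% of their contribution, so the fusion rewards agreement between retrievers more than exact rank position within one retriever.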

5.3 Adaptive Recall

File: src/search/hybrid.rs

use rusqlite::Connection;

use crate::core::error::Result;
use crate::embedding::OllamaClient;
use crate::search::{SearchFilters, search_fts, search_vector, rank_rrf, FtsQueryMode};

/// Minimum base recall for unfiltered search.
const BASE_RECALL_MIN: usize = 50;

/// Minimum recall when filters are applied.
const FILTERED_RECALL_MIN: usize = 200;

/// Maximum recall to prevent excessive resource usage.
const RECALL_CAP: usize = 1500;

/// Search mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
    Hybrid,    // Vector + FTS with RRF
    Lexical,   // FTS only
    Semantic,  // Vector only
}

impl SearchMode {
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "hybrid" => Some(Self::Hybrid),
            "lexical" | "fts" => Some(Self::Lexical),
            "semantic" | "vector" => Some(Self::Semantic),
            _ => None,
        }
    }

    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Hybrid => "hybrid",
            Self::Lexical => "lexical",
            Self::Semantic => "semantic",
        }
    }
}

/// Hybrid search result.
#[derive(Debug)]
pub struct HybridResult {
    pub document_id: i64,
    pub score: f64,
    pub vector_rank: Option<usize>,
    pub fts_rank: Option<usize>,
    pub rrf_score: f64,
}

/// Execute hybrid search.
///
/// Adaptive recall: expands topK proportionally to requested limit and filter
/// restrictiveness to prevent "no results" when relevant docs would be filtered out.
///
/// Formula:
/// - Unfiltered: max(50, limit * 10), capped at 1500
/// - Filtered: max(200, limit * 50), capped at 1500
///
/// IMPORTANT: All modes use RRF consistently to ensure rank fields
/// are populated correctly for --explain output.
pub async fn search_hybrid(
    conn: &Connection,
    client: Option<&OllamaClient>,
    ollama_base_url: Option<&str>,  // For actionable error messages
    query: &str,
    mode: SearchMode,
    filters: &SearchFilters,
    fts_mode: FtsQueryMode,
) -> Result<(Vec<HybridResult>, Vec<String>)> {
    let mut warnings: Vec<String> = Vec::new();

    // Adaptive recall: proportional to requested limit and filter count
    let requested = filters.clamp_limit();
    let top_k = if filters.has_any_filter() {
        (requested * 50).max(FILTERED_RECALL_MIN).min(RECALL_CAP)
    } else {
        (requested * 10).max(BASE_RECALL_MIN).min(RECALL_CAP)
    };

    match mode {
        SearchMode::Lexical => {
            // FTS only - use RRF with empty vector results for consistent ranking
            let fts_results = search_fts(conn, query, top_k, fts_mode)?;

            let fts_tuples: Vec<_> = fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
            let ranked = rank_rrf(&[], &fts_tuples);

            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
        SearchMode::Semantic => {
            // Vector only - requires client
            let client = client.ok_or_else(|| crate::core::error::LoreError::OllamaUnavailable {
                base_url: ollama_base_url.unwrap_or("http://localhost:11434").into(),
                source: None,
            })?;

            let query_embedding = client.embed_batch(vec![query.to_string()]).await?;
            let embedding = query_embedding.into_iter().next().unwrap();

            let vec_results = search_vector(conn, &embedding, top_k)?;

            // Use RRF with empty FTS results for consistent ranking
            let vec_tuples: Vec<_> = vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
            let ranked = rank_rrf(&vec_tuples, &[]);

            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
        SearchMode::Hybrid => {
            // Both retrievers with RRF fusion
            let fts_results = search_fts(conn, query, top_k, fts_mode)?;

            // Attempt vector search with graceful degradation on any failure
            let vec_results = match client {
                Some(client) => {
                    // Try to embed query; gracefully degrade on transient failures
                    match client.embed_batch(vec![query.to_string()]).await {
                        Ok(embeddings) => {
                            let embedding = embeddings.into_iter().next().unwrap();
                            search_vector(conn, &embedding, top_k)?
                        }
                        Err(e) => {
                            // Transient failure (network, timeout, rate limit, etc.)
                            // Log and fall back to FTS-only rather than failing the search
                            tracing::warn!("Vector search failed, falling back to lexical: {}", e);
                            warnings.push(format!(
                                "Vector search unavailable ({}), using lexical search only",
                                e
                            ));
                            Vec::new()
                        }
                    }
                }
                None => {
                    // No client configured
                    warnings.push("Embedding service unavailable, using lexical search only".into());
                    Vec::new()
                }
            };

            // RRF fusion
            let vec_tuples: Vec<_> = vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
            let fts_tuples: Vec<_> = fts_results.iter().map(|r| (r.document_id, r.rank)).collect();

            let ranked = rank_rrf(&vec_tuples, &fts_tuples);

            let results = ranked
                .into_iter()
                .map(|r| HybridResult {
                    document_id: r.document_id,
                    score: r.normalized_score,
                    vector_rank: r.vector_rank,
                    fts_rank: r.fts_rank,
                    rrf_score: r.rrf_score,
                })
                .collect();
            Ok((results, warnings))
        }
    }
}

Acceptance Criteria:

  • Unfiltered search uses topK=max(50, limit*10), capped at 1500
  • Filtered search uses topK=max(200, limit*50), capped at 1500
  • Final results still limited by --limit
  • Adaptive recall prevents "no results" under heavy filtering
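The recall formula above is a pure function of the requested limit and whether any filter is active, so it can be sketched and checked in isolation (constants as defined in hybrid.rs):

```rust
const BASE_RECALL_MIN: usize = 50;
const FILTERED_RECALL_MIN: usize = 200;
const RECALL_CAP: usize = 1500;

/// Mirrors the top_k computation in search_hybrid: widen the candidate
/// pool when filters will discard many candidates post-retrieval.
fn adaptive_top_k(limit: usize, filtered: bool) -> usize {
    if filtered {
        (limit * 50).max(FILTERED_RECALL_MIN).min(RECALL_CAP)
    } else {
        (limit * 10).max(BASE_RECALL_MIN).min(RECALL_CAP)
    }
}
```

For the default limit of 5, an unfiltered search retrieves 50 candidates while a filtered one retrieves 250; large limits saturate at the 1500 cap either way.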

5.4 Graceful Degradation

When Ollama unavailable during hybrid/semantic search:

  1. Log warning: "Embedding service unavailable, using lexical search only"
  2. Fall back to FTS-only search
  3. Include warning in response

Acceptance Criteria:

  • Default mode is hybrid
  • --mode=lexical works without Ollama
  • --mode=semantic requires Ollama
  • Graceful degradation when Ollama down
  • --explain shows rank breakdown
  • All Phase 3 filters work in hybrid mode

Phase 6: Sync Orchestration

6.1 Dirty Source Tracking

File: src/ingestion/dirty_tracker.rs

use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::SourceType;

/// Batch size when draining dirty_sources queue.
/// We drain repeatedly in a loop until no ready items remain,
/// so this is a batch size (not a hard ceiling per invocation).
/// This bounds WAL growth and memory per iteration while ensuring
/// `generate-docs` and `sync` converge in a single invocation.
const DIRTY_SOURCES_BATCH_SIZE: usize = 500;

/// Mark a source as dirty (needs document regeneration) — transactional variant.
///
/// Called during entity upsert operations INSIDE ingestion transactions.
/// The `_tx` suffix enforces the invariant that dirty marking happens
/// atomically with the entity upsert — callers cannot accidentally use
/// a bare Connection and break atomicity.
///
/// IMPORTANT: Uses upsert with backoff reset, NOT `INSERT OR IGNORE`.
/// `INSERT OR IGNORE` would silently drop re-queues for entities that are
/// already in the queue with a long `next_attempt_at` due to previous failures.
/// This means fresh updates would wait behind stale backoff windows — exactly
/// the opposite of what we want for incremental sync.
///
/// The ON CONFLICT clause resets all backoff/error state so the entity is
/// immediately eligible for processing on the next regeneration pass.
pub fn mark_dirty_tx(
    tx: &rusqlite::Transaction<'_>,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    tx.execute(
        "INSERT INTO dirty_sources
         (source_type, source_id, queued_at, attempt_count, last_attempt_at, last_error, next_attempt_at)
         VALUES (?, ?, ?, 0, NULL, NULL, NULL)
         ON CONFLICT(source_type, source_id) DO UPDATE SET
           queued_at        = excluded.queued_at,
           attempt_count    = 0,
           last_attempt_at  = NULL,
           last_error       = NULL,
           next_attempt_at  = NULL",
        rusqlite::params![source_type.as_str(), source_id, now_ms()],
    )?;
    Ok(())
}

/// Convenience wrapper for non-transactional contexts (e.g., CLI `--full` seeding).
/// Wraps a single mark_dirty call in its own transaction.
pub fn mark_dirty(conn: &mut Connection, source_type: SourceType, source_id: i64) -> Result<()> {
    let tx = conn.transaction()?;
    mark_dirty_tx(&tx, source_type, source_id)?;
    tx.commit()?;
    Ok(())
}

/// Get dirty sources ready for processing.
///
/// Uses `next_attempt_at` for efficient, index-friendly backoff queries.
/// Items with NULL `next_attempt_at` are ready immediately (first attempt).
/// Items with `next_attempt_at <= now` have waited long enough after failure.
///
/// Benefits over SQL bitshift calculation:
/// - No overflow risk from large attempt_count values
/// - Index-friendly: `WHERE next_attempt_at <= ?`
/// - Jitter can be added in Rust when computing next_attempt_at
///
/// This prevents hot-loop retries when a source consistently fails
/// to generate a document (e.g., malformed data, missing references).
pub fn get_dirty_sources(conn: &Connection) -> Result<Vec<(SourceType, i64)>> {
    let now = now_ms();

    let mut stmt = conn.prepare(
        "SELECT source_type, source_id
         FROM dirty_sources
         WHERE next_attempt_at IS NULL OR next_attempt_at <= ?
         ORDER BY attempt_count ASC, queued_at ASC
         LIMIT ?"
    )?;

    let results = stmt
        .query_map(rusqlite::params![now, DIRTY_SOURCES_BATCH_SIZE], |row| {
            let type_str: String = row.get(0)?;
            let source_type = match type_str.as_str() {
                "issue" => SourceType::Issue,
                "merge_request" => SourceType::MergeRequest,
                "discussion" => SourceType::Discussion,
                other => return Err(rusqlite::Error::FromSqlConversionFailure(
                    0,
                    rusqlite::types::Type::Text,
                    Box::new(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!("invalid source_type: {other}"),
                    )),
                )),
            };
            Ok((source_type, row.get(1)?))
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    Ok(results)
}

/// Clear dirty source after processing.
pub fn clear_dirty(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM dirty_sources WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
    )?;
    Ok(())
}

Integration Points — Where mark_dirty_tx() is Called:

All calls use mark_dirty_tx(&tx, ...) inside the existing ingestion transaction for atomic consistency. The _tx suffix on the function signature enforces this at the API level — callers cannot accidentally use a bare Connection and break atomicity.

Mark ALL upserted entities (not just changed ones). The regenerator's hash comparison handles "unchanged" detection cheaply — this avoids needing change detection in ingestion.

  • src/ingestion/issues.rs (issue upsert loop): SourceType::Issue, marked after each issue INSERT/UPDATE within the batch transaction
  • src/ingestion/merge_requests.rs (MR upsert loop): SourceType::MergeRequest, marked after each MR INSERT/UPDATE within the batch transaction
  • src/ingestion/discussions.rs (issue discussion insert): SourceType::Discussion, marked after each discussion INSERT within the full-refresh transaction
  • src/ingestion/mr_discussions.rs (MR discussion upsert): SourceType::Discussion, marked after each discussion upsert in the write phase

Discussion Sweep Cleanup:

When the MR discussion sweep deletes stale discussions (last_seen_at < run_start_time), also delete the corresponding document rows directly. The ON DELETE CASCADE on document_labels/document_paths and the documents_embeddings_ad trigger handle all downstream cleanup.

-- In src/ingestion/mr_discussions.rs, during sweep phase.
-- Both statements run inside the existing sweep transaction, documents
-- first: the subquery must resolve stale discussion IDs while those
-- discussion rows still exist. (A single WITH clause cannot span two
-- statements in SQLite, so each DELETE repeats the staleness predicate.
-- Alternatively, capture the stale IDs into a Vec<i64> in Rust first
-- and bind the same ID set to both DELETEs.)

-- Step 1: delete orphaned documents (source_id still resolves here)
DELETE FROM documents
  WHERE source_type = 'discussion'
    AND source_id IN (
      SELECT id FROM discussions
      WHERE merge_request_id = ? AND last_seen_at < ?
    );

-- Step 2: delete the stale discussions themselves
DELETE FROM discussions
  WHERE merge_request_id = ? AND last_seen_at < ?;

Acceptance Criteria:

  • Upserted entities added to dirty_sources inside ingestion transactions via mark_dirty_tx(&tx, ...)
  • mark_dirty_tx enforces transactional usage at API level (takes &Transaction, not &Connection)
  • ALL upserted entities marked dirty (not just changed ones)
  • Re-queue resets backoff state (upsert with ON CONFLICT, not INSERT OR IGNORE)
  • Queue cleared after document regeneration
  • Queue fully drained via loop (bounded batch size, not hard ceiling per invocation)
  • Exponential backoff uses next_attempt_at (index-friendly, no overflow)
  • Backoff computed with jitter to prevent thundering herd
  • Failed items prioritized lower than fresh items (ORDER BY attempt_count ASC)
  • Discussion sweep deletes orphaned documents before stale discussions (or captures stale IDs in Rust first), all within one transaction

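The upsert-vs-ignore distinction above can be modeled in plain Rust with the queue as a map (a hypothetical simplification with no SQLite involved, just to show why `INSERT OR IGNORE` is wrong here):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct QueueEntry {
    attempt_count: i64,
    next_attempt_at: Option<i64>,
}

/// ON CONFLICT DO UPDATE semantics: re-queue resets backoff state,
/// so a fresh update becomes immediately eligible for processing.
fn mark_dirty_upsert(q: &mut HashMap<i64, QueueEntry>, id: i64) {
    q.insert(id, QueueEntry { attempt_count: 0, next_attempt_at: None });
}

/// INSERT OR IGNORE semantics: the re-queue is silently dropped and the
/// entry keeps waiting out its stale backoff window.
fn mark_dirty_ignore(q: &mut HashMap<i64, QueueEntry>, id: i64) {
    q.entry(id)
        .or_insert(QueueEntry { attempt_count: 0, next_attempt_at: None });
}
```

Given an entry stuck at attempt_count 5 with a far-future next_attempt_at, the upsert variant clears both fields on re-queue; the ignore variant leaves them untouched, which is exactly the stale-backoff failure mode the section warns about.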
6.2 Pending Discussion Queue

File: src/ingestion/discussion_queue.rs

use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;

/// Noteable type for discussion fetching.
#[derive(Debug, Clone, Copy)]
pub enum NoteableType {
    Issue,
    MergeRequest,
}

impl NoteableType {
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Issue => "Issue",
            Self::MergeRequest => "MergeRequest",
        }
    }
}

/// Pending discussion fetch entry.
pub struct PendingFetch {
    pub project_id: i64,
    pub noteable_type: NoteableType,
    pub noteable_iid: i64,
    pub attempt_count: i64,
}

/// Queue a discussion fetch for an entity.
///
/// Uses ON CONFLICT DO UPDATE (not INSERT OR REPLACE) for consistency with
/// dirty_sources and to avoid DELETE+INSERT semantics that can trigger
/// unexpected FK behavior and reset fields unintentionally.
pub fn queue_discussion_fetch(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
) -> Result<()> {
    conn.execute(
        "INSERT INTO pending_discussion_fetches
         (project_id, noteable_type, noteable_iid, queued_at, attempt_count, last_attempt_at, last_error, next_attempt_at)
         VALUES (?, ?, ?, ?, 0, NULL, NULL, NULL)
         ON CONFLICT(project_id, noteable_type, noteable_iid) DO UPDATE SET
           queued_at        = excluded.queued_at,
           attempt_count    = 0,
           last_attempt_at  = NULL,
           last_error       = NULL,
           next_attempt_at  = NULL",
        rusqlite::params![project_id, noteable_type.as_str(), noteable_iid, now_ms()],
    )?;
    Ok(())
}

/// Get pending fetches ready for processing.
///
/// Uses `next_attempt_at` for efficient, index-friendly backoff queries.
/// Items with NULL `next_attempt_at` are ready immediately (first attempt).
/// Items with `next_attempt_at <= now` have waited long enough after failure.
///
/// Benefits over SQL bitshift calculation:
/// - No overflow risk from large attempt_count values
/// - Index-friendly: `WHERE next_attempt_at <= ?`
/// - Jitter can be added in Rust when computing next_attempt_at
///
/// Limited to `max_items` to bound API calls per sync run.
pub fn get_pending_fetches(conn: &Connection, max_items: usize) -> Result<Vec<PendingFetch>> {
    let now = now_ms();

    let mut stmt = conn.prepare(
        "SELECT project_id, noteable_type, noteable_iid, attempt_count
         FROM pending_discussion_fetches
         WHERE next_attempt_at IS NULL OR next_attempt_at <= ?
         ORDER BY attempt_count ASC, queued_at ASC
         LIMIT ?"
    )?;

    let results = stmt
        .query_map(rusqlite::params![now, max_items], |row| {
            let type_str: String = row.get(1)?;
            let noteable_type = match type_str.as_str() {
                "Issue" => NoteableType::Issue,
                "MergeRequest" => NoteableType::MergeRequest,
                other => return Err(rusqlite::Error::FromSqlConversionFailure(
                    1,
                    rusqlite::types::Type::Text,
                    Box::new(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!("invalid noteable_type: {other}"),
                    )),
                )),
            };
            Ok(PendingFetch {
                project_id: row.get(0)?,
                noteable_type,
                noteable_iid: row.get(2)?,
                attempt_count: row.get(3)?,
            })
        })?
        .collect::<std::result::Result<Vec<_>, _>>()?;

    Ok(results)
}

/// Mark fetch as successful and remove from queue.
pub fn complete_fetch(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM pending_discussion_fetches
         WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
        rusqlite::params![project_id, noteable_type.as_str(), noteable_iid],
    )?;
    Ok(())
}

/// Record fetch failure and compute next retry time.
///
/// Computes `next_attempt_at` using exponential backoff with jitter:
/// - Base delay: 1000ms * 2^attempt_count
/// - Cap: 1 hour (3600000ms)
/// - Jitter: ±10% to prevent thundering herd
pub fn record_fetch_error(
    conn: &Connection,
    project_id: i64,
    noteable_type: NoteableType,
    noteable_iid: i64,
    error: &str,
    current_attempt: i64,
) -> Result<()> {
    let now = now_ms();
    let next_attempt = crate::core::backoff::compute_next_attempt_at(now, current_attempt + 1);

    conn.execute(
        "UPDATE pending_discussion_fetches
         SET attempt_count = attempt_count + 1,
             last_attempt_at = ?,
             last_error = ?,
             next_attempt_at = ?
         WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
        rusqlite::params![now, error, next_attempt, project_id, noteable_type.as_str(), noteable_iid],
    )?;
    Ok(())
}

// NOTE: Backoff computation uses the shared utility in `src/core/backoff.rs`.
// See Phase 6.X below for the shared implementation.

Acceptance Criteria:

  • Updated entities queued for discussion fetch
  • Queue uses ON CONFLICT DO UPDATE (not INSERT OR REPLACE) consistent with dirty_sources
  • Re-queue resets backoff state immediately (same semantics as dirty_sources)
  • Success removes from queue
  • Failure increments attempt_count and sets next_attempt_at
  • Processing bounded per run (max 100)
  • Exponential backoff uses next_attempt_at (index-friendly, no overflow)
  • Backoff computed with jitter to prevent thundering herd

6.X Shared Backoff Utility

File: src/core/backoff.rs

Single implementation of exponential backoff with jitter, used by both the dirty_sources and pending_discussion_fetches queues. It lives in src/core/ because it is a cross-cutting concern used by multiple modules.

use rand::Rng;

/// Compute next_attempt_at with exponential backoff and jitter.
///
/// Formula: now + min(3600000, 1000 * 2^attempt_count) * (0.9 to 1.1)
/// - Capped at 1 hour to prevent runaway delays
/// - ±10% jitter prevents synchronized retries after outages
///
/// Used by:
/// - `dirty_sources` retry scheduling (document regeneration failures)
/// - `pending_discussion_fetches` retry scheduling (API fetch failures)
///
/// Having one implementation prevents subtle divergence between queues
/// (e.g., different caps or jitter ranges).
pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 {
    // Cap the shift to prevent i64 overflow; the 1-hour delay cap is
    // reached long before attempt 30 anyway (1000 * 2^12 ms ≈ 68 min).
    let capped_attempts = attempt_count.min(30) as u32;
    let base_delay_ms = 1000_i64.saturating_mul(1 << capped_attempts);
    let capped_delay_ms = base_delay_ms.min(3_600_000); // 1 hour cap

    // Add ±10% jitter
    let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1);
    let delay_with_jitter = (capped_delay_ms as f64 * jitter_factor) as i64;

    now + delay_with_jitter
}

Update src/core/mod.rs:

pub mod backoff;  // Add to existing modules

Acceptance Criteria:

  • Single implementation shared by both queue retry paths
  • Cap at 1 hour prevents runaway delays
  • Jitter prevents thundering herd after outage recovery
  • Unit tests verify backoff curve and cap behavior
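With the jitter term removed, the curve is deterministic and easy to unit test. A sketch of the delay component only (the real function adds the ±10% jitter and the `now` offset):

```rust
/// Delay portion of compute_next_attempt_at, before jitter is applied.
fn backoff_delay_ms(attempt_count: i64) -> i64 {
    // Cap the shift to prevent i64 overflow.
    let capped_attempts = attempt_count.min(30) as u32;
    let base = 1000_i64.saturating_mul(1_i64 << capped_attempts);
    base.min(3_600_000) // 1 hour cap
}
```

The delay doubles each attempt (1s, 2s, 4s, ...) and saturates at the 1-hour cap around attempt 12, so even a permanently failing item costs at most one retry per hour.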

6.3 Document Regenerator

File: src/documents/regenerator.rs

use rusqlite::Connection;

use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::{
    extract_issue_document, extract_mr_document, extract_discussion_document,
    DocumentData, SourceType,
};
use crate::ingestion::dirty_tracker::{get_dirty_sources, clear_dirty};

/// Result of regeneration run.
#[derive(Debug, Default)]
pub struct RegenerateResult {
    pub regenerated: usize,
    pub unchanged: usize,
    pub errored: usize,
}

/// Regenerate documents from dirty queue.
///
/// Process:
/// 1. Query dirty_sources in bounded batches until queue is drained
/// 2. For each: regenerate document, compute new hash
/// 3. ALWAYS upsert document (labels/paths may change even if content_hash unchanged)
/// 4. Track whether content_hash changed (for stats)
/// 5. Delete from dirty_sources (or record error on failure)
///
/// IMPORTANT: Each dirty item is wrapped in its own transaction. This ensures
/// that a crash or interrupt cannot leave partial writes (e.g., document row
/// updated but document_labels only partially repopulated). The queue clear
/// or error record happens inside the same transaction, guaranteeing atomicity.
///
/// NOTE: Drains the queue completely in a loop (not single-pass). This ensures
/// `lore generate-docs` and `lore sync` converge in one invocation even when
/// the queue has more items than the batch size.
pub fn regenerate_dirty_documents(conn: &mut Connection) -> Result<RegenerateResult> {
    let mut result = RegenerateResult::default();

    loop {
        let dirty = get_dirty_sources(conn)?;
        if dirty.is_empty() {
            break;
        }

        for (source_type, source_id) in &dirty {
            let tx = conn.transaction()?;
            match regenerate_one_tx(&tx, *source_type, *source_id) {
                Ok(changed) => {
                    if changed {
                        result.regenerated += 1;
                    } else {
                        result.unchanged += 1;
                    }
                    // clear_dirty takes &Connection; Transaction derefs to
                    // Connection, so this runs inside the open transaction.
                    clear_dirty(&tx, *source_type, *source_id)?;
                    tx.commit()?;
                }
                Err(e) => {
                    // Fail-soft: record error inside same transaction, then commit
                    record_dirty_error_tx(&tx, *source_type, *source_id, &e.to_string())?;
                    tx.commit()?;
                    result.errored += 1;
                }
            }
        }
    }

    Ok(result)
}

// NOTE: regenerate_one_tx, record_dirty_error_tx, upsert_document_tx, and
// delete_document_tx take &Transaction instead of &Connection. This ensures
// all writes for a single dirty item are atomic. The _tx suffix
// distinguishes them from non-transactional helpers. The connection is
// &mut because rusqlite's Connection::transaction takes &mut self.

/// Regenerate a single document. Returns true if content_hash changed.
///
/// If the source entity has been deleted, the corresponding document
/// is also deleted (cascade cleans up labels, paths, embeddings).
fn regenerate_one_tx(
    conn: &rusqlite::Transaction<'_>,
    source_type: SourceType,
    source_id: i64,
) -> Result<bool> {
    // Extractors return Option: None means source entity was deleted.
    // &Transaction derefs to &Connection, so Connection-based helpers work here.
    let doc = match source_type {
        SourceType::Issue => extract_issue_document(conn, source_id)?,
        SourceType::MergeRequest => extract_mr_document(conn, source_id)?,
        SourceType::Discussion => extract_discussion_document(conn, source_id)?,
    };

    let Some(doc) = doc else {
        // Source was deleted — remove the document (cascade handles FTS/embeddings)
        delete_document_tx(conn, source_type, source_id)?;
        return Ok(true);
    };

    let existing_hash = get_existing_hash(conn, source_type, source_id)?;
    let changed = existing_hash.as_ref() != Some(&doc.content_hash);

    // Always upsert: labels/paths can change independently of content_hash
    upsert_document_tx(conn, &doc)?;

    Ok(changed)
}

/// Delete a document by source identity (cascade handles FTS trigger, labels, paths, embeddings).
fn delete_document_tx(
    conn: &rusqlite::Transaction<'_>,
    source_type: SourceType,
    source_id: i64,
) -> Result<()> {
    conn.execute(
        "DELETE FROM documents WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
    )?;
    Ok(())
}

/// Record a regeneration error on a dirty source for retry.
///
/// IMPORTANT: Sets `next_attempt_at` using the shared backoff utility.
/// Without this, failed items would retry every run (hot-loop), defeating
/// the backoff design documented in the schema.
fn record_dirty_error_tx(
    conn: &rusqlite::Transaction<'_>,
    source_type: SourceType,
    source_id: i64,
    error: &str,
) -> Result<()> {
    let now = now_ms();

    // Read current attempt_count from DB to compute backoff
    let attempt_count: i64 = conn.query_row(
        "SELECT attempt_count FROM dirty_sources WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
        |row| row.get(0),
    )?;

    // Use shared backoff utility (same as pending_discussion_fetches)
    let next_attempt_at = crate::core::backoff::compute_next_attempt_at(now, attempt_count + 1);

    conn.execute(
        "UPDATE dirty_sources
         SET attempt_count = attempt_count + 1,
             last_attempt_at = ?,
             last_error = ?,
             next_attempt_at = ?
         WHERE source_type = ? AND source_id = ?",
        rusqlite::params![now, error, next_attempt_at, source_type.as_str(), source_id],
    )?;
    Ok(())
}

/// Get existing content hash for a document, if it exists.
///
/// IMPORTANT: Uses `optional()` to distinguish between:
/// - No row found -> Ok(None)
/// - Row found -> Ok(Some(hash))
/// - DB error -> Err(...)
///
/// Using `.ok()` would hide real DB errors (disk I/O, corruption, etc.)
/// which should propagate up for proper error handling.
fn get_existing_hash(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<Option<String>> {
    use rusqlite::OptionalExtension;

    let mut stmt = conn.prepare(
        "SELECT content_hash FROM documents WHERE source_type = ? AND source_id = ?"
    )?;

    let hash: Option<String> = stmt
        .query_row(rusqlite::params![source_type.as_str(), source_id], |row| row.get(0))
        .optional()?;

    Ok(hash)
}

fn upsert_document_tx(conn: &rusqlite::Transaction<'_>, doc: &DocumentData) -> Result<()> {
    use rusqlite::OptionalExtension;

    // Check existing hashes before upserting (for write optimization)
    let existing: Option<(i64, String, String, String)> = conn
        .query_row(
            "SELECT id, content_hash, labels_hash, paths_hash FROM documents
             WHERE source_type = ? AND source_id = ?",
            rusqlite::params![doc.source_type.as_str(), doc.source_id],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
        )
        .optional()?;

    // Fast path: skip ALL writes when nothing changed.
    // This prevents WAL churn on the "mark ALL upserted entities dirty" strategy,
    // where most entities will be unchanged on incremental syncs.
    if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing {
        if old_content_hash == &doc.content_hash
            && old_labels_hash == &doc.labels_hash
            && old_paths_hash == &doc.paths_hash
        {
            return Ok(());
        }
    }

    // Upsert main document (includes labels_hash, paths_hash)
    conn.execute(
        "INSERT INTO documents
         (source_type, source_id, project_id, author_username, label_names,
          labels_hash, paths_hash,
          created_at, updated_at, url, title, content_text, content_hash,
          is_truncated, truncated_reason)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(source_type, source_id) DO UPDATE SET
           author_username = excluded.author_username,
           label_names = excluded.label_names,
           labels_hash = excluded.labels_hash,
           paths_hash = excluded.paths_hash,
           updated_at = excluded.updated_at,
           url = excluded.url,
           title = excluded.title,
           content_text = excluded.content_text,
           content_hash = excluded.content_hash,
           is_truncated = excluded.is_truncated,
           truncated_reason = excluded.truncated_reason",
        rusqlite::params![
            doc.source_type.as_str(),
            doc.source_id,
            doc.project_id,
            doc.author_username,
            serde_json::to_string(&doc.labels)?,
            doc.labels_hash,
            doc.paths_hash,
            doc.created_at,
            doc.updated_at,
            doc.url,
            doc.title,
            doc.content_text,
            doc.content_hash,
            doc.is_truncated,
            doc.truncated_reason,
        ],
    )?;

    // Get document ID (either existing or newly inserted)
    let doc_id = match existing {
        Some((id, _, _, _)) => id,
        None => get_document_id(conn, doc.source_type, doc.source_id)?,
    };

    // Only update labels if hash changed (reduces write amplification)
    let labels_changed = match &existing {
        Some((_, _, old_hash, _)) => old_hash != &doc.labels_hash,
        None => true, // New document, must insert
    };
    if labels_changed {
        conn.execute(
            "DELETE FROM document_labels WHERE document_id = ?",
            [doc_id],
        )?;
        for label in &doc.labels {
            conn.execute(
                "INSERT INTO document_labels (document_id, label_name) VALUES (?, ?)",
                rusqlite::params![doc_id, label],
            )?;
        }
    }

    // Only update paths if hash changed (reduces write amplification)
    let paths_changed = match &existing {
        Some((_, _, _, old_hash)) => old_hash != &doc.paths_hash,
        None => true, // New document, must insert
    };
    if paths_changed {
        conn.execute(
            "DELETE FROM document_paths WHERE document_id = ?",
            [doc_id],
        )?;
        for path in &doc.paths {
            conn.execute(
                "INSERT INTO document_paths (document_id, path) VALUES (?, ?)",
                rusqlite::params![doc_id, path],
            )?;
        }
    }

    Ok(())
}

fn get_document_id(
    conn: &Connection,
    source_type: SourceType,
    source_id: i64,
) -> Result<i64> {
    let id: i64 = conn.query_row(
        "SELECT id FROM documents WHERE source_type = ? AND source_id = ?",
        rusqlite::params![source_type.as_str(), source_id],
        |row| row.get(0),
    )?;
    Ok(id)
}
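The triple-hash fast path above only works if the hashes themselves are deterministic: labels and paths arrive from the API in no guaranteed order, so they should be sorted before hashing. A minimal sketch of that idea, using FNV-1a purely for illustration (the hash function and helper names here are hypothetical, not gitlore's actual implementation):

```rust
// FNV-1a 64-bit hash, chosen here only because it needs no dependencies.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Hash a label set order-independently by sorting before hashing,
/// so API ordering changes never cause spurious document rewrites.
fn labels_hash(labels: &[&str]) -> String {
    let mut sorted: Vec<&str> = labels.to_vec();
    sorted.sort_unstable();
    // 0x1f (unit separator) is unlikely to appear inside a label name.
    format!("{:016x}", fnv1a(sorted.join("\u{1f}").as_bytes()))
}
```

The same sorting rule would apply to paths_hash; content_hash hashes the rendered document text directly.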

Acceptance Criteria:

  • Dirty sources get documents regenerated
  • Queue fully drained via loop (not single-pass ceiling)
  • Each dirty item processed in its own transaction (crash-safe labels/paths writes)
  • Triple-hash fast path (content_hash + labels_hash + paths_hash) skips all writes when unchanged
  • Hash comparison prevents unnecessary updates when only some hashes changed
  • FTS triggers fire on document update
  • Queue cleared after processing (inside same transaction as document write)
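The "fully drained via loop" criterion means the regenerator keeps taking bounded batches until the queue is empty, rather than processing one batch and stopping. A sketch of that control flow, with a `Vec` standing in for the dirty_sources table (names are illustrative):

```rust
/// Drain a queue in bounded batches until empty. Returns total items processed.
/// In the real pipeline each batch item would be processed in its own
/// transaction; here `process` is just a callback on the batch.
fn drain_queue<T>(queue: &mut Vec<T>, batch_size: usize, mut process: impl FnMut(&[T])) -> usize {
    let mut total = 0;
    loop {
        let n = batch_size.min(queue.len());
        if n == 0 {
            break; // queue fully drained
        }
        let batch: Vec<T> = queue.drain(..n).collect();
        process(&batch);
        total += batch.len();
    }
    total
}
```

A single-pass version (one batch, then return) would leave a 250-item queue with 150 items still pending; the loop above clears it completely.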

6.4 CLI: lore sync

File: src/cli/commands/sync.rs

//! Sync command - orchestrate full sync pipeline.

use serde::Serialize;

use crate::core::error::Result;
use crate::Config;

/// Sync result summary.
#[derive(Debug, Serialize)]
pub struct SyncResult {
    pub issues_updated: usize,
    pub mrs_updated: usize,
    pub discussions_fetched: usize,
    pub documents_regenerated: usize,
    pub documents_embedded: usize,
}

/// Sync options.
#[derive(Debug, Default)]
pub struct SyncOptions {
    pub full: bool,       // Reset cursors, fetch everything
    pub force: bool,      // Override stale lock
    pub no_embed: bool,   // Skip embedding step
    pub no_docs: bool,    // Skip document regeneration
}

/// Run sync orchestration — the unified pipeline command.
///
/// `lore sync` is the primary command for keeping the search index up to date.
/// It wraps the existing ingestion orchestrator and adds document generation
/// and embedding as post-ingestion steps. Replaces `lore ingest` as the
/// recommended user-facing command for all data flow.
///
/// Individual commands (`lore generate-docs`, `lore embed`) still exist for
/// manual recovery and debugging use cases.
///
/// Steps (sequential):
/// 1. Acquire app lock with heartbeat (via existing `src/core/lock.rs`)
/// 2. Ingest delta: fetch issues + MRs via cursor-based sync
///    (calls existing ingestion orchestrator in `src/ingestion/orchestrator.rs`)
///    - Each upserted entity marked dirty via mark_dirty_tx(&tx) inside ingestion transaction
/// 3. Process pending_discussion_fetches queue (bounded)
///    - Discussion sweep uses CTE to capture stale IDs, then cascading deletes
/// 4. Regenerate documents from dirty_sources queue
///    (unless --no-docs)
/// 5. Embed documents with changed content_hash
///    (unless --no-embed; skipped gracefully if Ollama unavailable)
/// 6. Release lock, record sync_run
///
/// NOTE: Rolling backfill window removed — the existing cursor + watermark
/// design handles old issues with resumed activity. GitLab updates
/// `updated_at` when new comments are added, so the cursor naturally
/// picks up old issues that receive new activity.
pub async fn run_sync(config: &Config, options: SyncOptions) -> Result<SyncResult> {
    // Implementation wraps existing ingestion orchestrator
    // and chains document generation + embedding pipelines
    todo!()
}

/// Print human-readable sync output.
pub fn print_sync(result: &SyncResult, elapsed_secs: u64) {
    println!("Sync complete:");
    println!("  Issues updated:        {:>6}", result.issues_updated);
    println!("  MRs updated:           {:>6}", result.mrs_updated);
    println!("  Discussions fetched:   {:>6}", result.discussions_fetched);
    println!("  Documents regenerated: {:>6}", result.documents_regenerated);
    println!("  Documents embedded:    {:>6}", result.documents_embedded);
    println!("  Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}

/// Print JSON output for robot mode.
pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64) {
    let output = serde_json::json!({
        "ok": true,
        "data": result,
        "meta": {
            "elapsed_ms": elapsed_ms
        }
    });
    println!("{}", serde_json::to_string_pretty(&output).unwrap());
}

CLI integration:

/// Sync subcommand arguments.
#[derive(Args)]
pub struct SyncArgs {
    /// Reset cursors, fetch everything
    #[arg(long)]
    full: bool,

    /// Override stale lock
    #[arg(long)]
    force: bool,

    /// Skip embedding step
    #[arg(long)]
    no_embed: bool,

    /// Skip document regeneration
    #[arg(long)]
    no_docs: bool,
}

Acceptance Criteria:

  • Orchestrates full sync pipeline
  • Respects app lock
  • --full resets cursors
  • --no-embed skips embedding
  • --no-docs skips document regeneration
  • Progress reporting in human mode
  • JSON summary in robot mode

Testing Strategy

Unit Tests

| Module | Test File | Coverage |
| --- | --- | --- |
| Document extractor | src/documents/extractor.rs (mod tests) | Issue/MR/discussion extraction, consistent headers |
| Truncation | src/documents/truncation.rs (mod tests) | All edge cases |
| RRF ranking | src/search/rrf.rs (mod tests) | Score computation, merging |
| Content hash | src/documents/extractor.rs (mod tests) | Deterministic hashing |
| FTS query sanitization | src/search/fts.rs (mod tests) | to_fts_query() edge cases: -, ", :, *, C++ |
| SourceType parsing | src/documents/extractor.rs (mod tests) | parse() accepts aliases: mr, mrs, issue, etc. |
| SearchFilters | src/search/filters.rs (mod tests) | has_any_filter(), clamp_limit() |
| Backoff logic | src/core/backoff.rs (mod tests) | Shared exponential backoff curve, cap, jitter |
| Hydration | src/cli/commands/search.rs (mod tests) | Single round-trip, label/path aggregation |

Integration Tests

| Feature | Test File | Coverage |
| --- | --- | --- |
| FTS search | tests/fts_search.rs | Stemming, empty results |
| Embedding storage | tests/embedding.rs | sqlite-vec operations |
| Hybrid search | tests/hybrid_search.rs | Combined retrieval |
| Sync orchestration | tests/sync.rs | Full pipeline |

Golden Query Suite

File: tests/fixtures/golden_queries.json

[
  {
    "query": "authentication redesign",
    "expected_urls": [".../-/issues/234", ".../-/merge_requests/847"],
    "min_results": 1,
    "max_rank": 10
  }
]

Each query must have at least one expected URL in top 10 results.

Test fixture note: Golden query tests require a seeded SQLite database with known documents, FTS entries, and pre-computed embeddings. The test setup should insert deterministic fixture data (not call Ollama) so golden tests run in CI without external dependencies. Use fixed embedding vectors that produce known similarity distances.
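The "fixed embedding vectors with known similarity distances" idea works because similarity between hand-picked vectors can be computed in advance. A sketch using cosine similarity for illustration (the actual metric is whatever sqlite-vec is configured with):

```rust
/// Cosine similarity between two equal-length vectors.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    dot / (na * nb)
}
```

Seeding the test database with a query vector near one document's vector and far from another's makes the expected ranking deterministic, with no call to Ollama at test time.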


CLI Smoke Tests

| Command | Expected | Pass Criteria |
| --- | --- | --- |
| lore generate-docs | Progress, count | Completes, count > 0 |
| lore generate-docs (re-run) | 0 regenerated | Hash comparison works |
| lore embed | Progress, count | Completes, count matches docs |
| lore embed (re-run) | 0 embedded | Skips unchanged |
| lore embed --retry-failed | Processes failed | Only failed docs processed |
| lore stats | Coverage stats | Shows 100% after embed |
| lore stats | Queue depths | Shows dirty_sources and pending_discussion_fetches counts |
| lore search "auth" --mode=lexical | Results | Works without Ollama |
| lore search "auth" | Hybrid results | Vector + FTS combined |
| lore search "auth" (Ollama down) | FTS results + warning | Graceful degradation, warning in response |
| lore search "auth" --explain | Rank breakdown | Shows vector/FTS/RRF |
| lore search "auth" --type=mr | Filtered results | Only MRs |
| lore search "auth" --type=mrs | Filtered results | Alias works |
| lore search "auth" --label=bug | Filtered results | Only labeled docs |
| lore search "-DWITH_SSL" | Results | Leading dash doesn't cause FTS error |
| lore search 'C++' | Results | Special chars in query work |
| lore search "auth" --updated-after 2024-01-01 | Filtered results | Only recently updated docs |
| lore search "nonexistent123" | No results | Graceful empty state |
| lore search "auth" --mode=semantic (no embeddings) | Actionable error | Tells user to run lore embed first |
| lore sync | Full pipeline | All steps complete |
| lore sync --no-embed | Skip embedding | Docs generated, not embedded |
| lore generate-docs --full | Progress, count | Keyset pagination completes without OFFSET degradation |
| lore stats --check --repair | Repair results | FTS rebuilt, orphans cleaned |

Data Integrity Checks

  • documents count = issues + MRs + discussions
  • documents_fts count = documents count
  • embedding_metadata has at least one row per document (after full embed)
  • All embeddings.rowid / 1000 map to valid documents.id (SQLite integer division truncates toward zero, which matches floor for these non-negative rowids)
  • embedding_metadata.document_hash = documents.content_hash for all chunk_index=0 rows
  • All document_labels reference valid documents
  • All document_paths reference valid documents
  • No orphaned embeddings (embeddings.rowid / 1000 without matching documents.id)
  • Discussion documents exclude system notes
  • Discussion documents include parent title
  • All dirty_sources entries reference existing source entities
  • All pending_discussion_fetches entries reference existing projects
  • attempt_count >= 0 for all queue entries (never negative)
  • last_attempt_at is NULL when attempt_count = 0
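The rowid / 1000 checks above all rest on the chunked rowid encoding scheme: rowid = document_id * 1000 + chunk_index. The same arithmetic in Rust, assuming chunk indices below 1000 and non-negative rowids:

```rust
/// Encode a chunk's sqlite-vec rowid from its document id and chunk index.
fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
    debug_assert!((0..1000).contains(&chunk_index));
    document_id * 1000 + chunk_index
}

/// Recover the document id; truncating division equals floor for
/// non-negative rowids, so every chunk of a document maps back to it.
fn decode_document_id(rowid: i64) -> i64 {
    rowid / 1000
}

/// Recover the chunk index within the document.
fn decode_chunk_index(rowid: i64) -> i64 {
    rowid % 1000
}
```

This is also why orphan cleanup can be range-based: all chunks of document N live in rowids [N * 1000, N * 1000 + 999].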

Success Criteria

Checkpoint 3 is complete when all three gates pass:

Prerequisite: GiError → LoreError rename completed across codebase (16 files).

Gate A: Lexical MVP (no sqlite-vec dependency)

  1. Lexical search works without Ollama

    • lore search "query" --mode=lexical returns relevant results
    • All filters functional (including --updated-after via time::parse_since())
    • FTS5 syntax errors prevented by query sanitization
    • Special characters in queries work correctly (-DWITH_SSL, C++)
    • Search results hydrated in single DB round-trip (no N+1)
    • Project filter uses cascading resolution (exact → case-insensitive → suffix → error with suggestions)
  2. Document generation is correct

    • Full and incremental modes use the same regenerator codepath
    • --full uses set-based INSERT...SELECT with keyset pagination (no OFFSET degradation, no N INSERTs)
    • FTS triggers use COALESCE for NULL-safe operation
    • Extraction queries pull all relevant metadata (labels, DiffNote old+new paths, discussion URLs)
    • Discussion URLs reconstructed from parent web_url + #note_{first_note.gitlab_id}
    • Hard safety cap (2MB) prevents pathological content from bloating DB
    • Triple-hash fast path skips all writes when content, labels, and paths are unchanged
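One common way to meet the sanitization criterion is to phrase-quote every whitespace-separated token, so FTS5 operators (-, :, *) are treated as literal text and embedded double quotes are escaped by doubling. A sketch of that approach (the real to_fts_query() in src/search/fts.rs may differ):

```rust
/// Turn raw user input into a safe FTS5 MATCH expression by wrapping
/// each token in double quotes (an FTS5 phrase), doubling any embedded
/// quote characters. Multiple quoted tokens form an implicit AND.
fn to_fts_query(input: &str) -> String {
    input
        .split_whitespace()
        .map(|token| format!("\"{}\"", token.replace('"', "\"\"")))
        .collect::<Vec<_>>()
        .join(" ")
}
```

Without quoting, a query like -DWITH_SSL would be parsed as an FTS5 NOT operator and raise a syntax error instead of matching the literal flag.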

Gate B: Hybrid MVP

  1. Semantic search works with Ollama

    • Gate B introduces sqlite-vec extension and migration 009
    • lore embed completes successfully
    • Chunked embeddings: long documents split at paragraph boundaries with overlap
    • lore search "query" returns semantically relevant results
    • --explain shows ranking breakdown
    • --mode=semantic with 0% embedding coverage returns LoreError::EmbeddingsNotBuilt (distinct from OllamaUnavailable — tells user to run lore embed first)
  2. Hybrid search combines both

    • Documents appearing in both retrievers rank higher
    • Vector search over-fetches 3x and deduplicates by document_id (best chunk per doc)
    • Graceful degradation when Ollama unavailable (falls back to FTS)
    • Transient embed failures don't fail the entire search
    • Warning message included in response on degradation
    • Embedding pipeline uses keyset pagination for consistent paging

Gate C: Sync MVP

  1. Incremental sync is efficient

    • lore sync is the unified command (replaces lore ingest + lore generate-docs + lore embed)
    • Individual commands still exist for recovery/debugging
    • Re-embedding only happens for changed documents (content_hash comparison)
    • Progress visible during long syncs (via indicatif)
    • Queue backoff actually prevents hot-loop retries (both queues set next_attempt_at)
    • Shared backoff utility ensures consistent behavior across queues
    • Dirty source marking uses mark_dirty_tx(&tx, ...) inside ingestion transactions (enforced at API level)
    • Re-queuing resets backoff state (upsert, not INSERT OR IGNORE) so fresh updates aren't stuck behind stale backoff
    • Both queues (dirty_sources, pending_discussion_fetches) use ON CONFLICT DO UPDATE consistently
    • Queue draining is complete (bounded batch loop, not single-pass ceiling)
    • Discussion sweep uses CTE to capture stale IDs before cascading deletes
  2. Data integrity maintained

    • All counts match between tables
    • No orphaned records (embedding orphan check uses rowid / 1000 for the chunked scheme — truncating division matches floor for non-negative rowids)
    • Hashes consistent (embedding_metadata.document_hash matches documents.content_hash)
    • get_existing_hash() properly distinguishes "not found" from DB errors
    • --repair uses FTS rebuild for correct-by-construction repair
    • --repair orphan deletion uses range-based cleanup for chunked embeddings
    • --repair stale detection uses document_hash (not chunk_hash) for document-level staleness
    • Per-item transactions prevent partial label/path writes on crash
    • Triple-hash fast path prevents WAL churn from unchanged documents
  3. Observability

    • lore stats shows queue depths and failed item counts
    • lore stats embedding coverage is document-level (not chunk-level) with total chunk count
    • Failed items visible for operator intervention
    • Deterministic ordering ensures consistent paging
  4. Tests pass

    • Unit tests for core algorithms (including FTS sanitization, shared backoff, hydration)
    • Integration tests for pipelines
    • Golden queries return expected results (seeded DB with fixed embedding vectors, no Ollama dependency)
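The shared backoff criterion in Gate C implies a single curve both queues call into when computing next_attempt_at. A minimal sketch of the exponential curve with a cap, omitting the jitter the spec also calls for (parameter names here are illustrative, not src/core/backoff.rs's actual API):

```rust
/// Exponential backoff delay in seconds: base * 2^attempt_count, capped.
/// Saturates rather than overflowing for very large attempt counts.
fn backoff_secs(attempt_count: u32, base_secs: u64, cap_secs: u64) -> u64 {
    // checked_shl returns None once the shift reaches 64 bits.
    let factor = 1u64.checked_shl(attempt_count).unwrap_or(u64::MAX);
    base_secs.saturating_mul(factor).min(cap_secs)
}
```

Setting next_attempt_at = now + backoff_secs(attempt_count, ...) on each failure is what prevents the hot-loop retries the acceptance criteria call out; re-queuing via upsert resets attempt_count so fresh updates start back at the base delay.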