Checkpoint 3: Search & Sync MVP
Status: Planning
Prerequisite: Checkpoints 0, 1, 2 complete (issues, MRs, discussions ingested)
Goal: Deliver working semantic + lexical hybrid search with efficient incremental sync
This checkpoint consolidates SPEC.md checkpoints 3A, 3B, 4, and 5 into a unified implementation plan. The work is structured for parallel agent execution where dependencies allow.
All code integrates with existing gitlore infrastructure:
- Error handling via `LoreError` and `ErrorCode` in `src/core/error.rs`
- CLI patterns matching `src/cli/commands/*.rs` (run functions, JSON/human output)
- Database via `rusqlite::Connection` with migrations in `migrations/`
- Database connections via `create_connection(db_path)` in `src/core/db.rs`
- Config via `src/core/config.rs` (`EmbeddingConfig` already defined)
- Robot mode JSON with the `{"ok": true, "data": {...}}` pattern
Code Sample Convention: Rust code samples in this PRD omit use imports for brevity.
Implementers should add the appropriate imports (use crate::..., use rusqlite::..., etc.)
based on the types and functions referenced in each sample.
Prerequisite Rename: Before or during Gate A, rename GiError → LoreError across the
codebase (16 files). Update src/core/error.rs enum name, src/core/mod.rs re-export,
src/lib.rs re-export, and all use statements. This is a mechanical find-and-replace.
Executive Summary (Gated Milestones)
This checkpoint ships in three gates to reduce integration risk. Each gate is independently verifiable and shippable:
Gate A (Lexical MVP): documents + FTS + filters + lore search --mode=lexical + lore stats (no sqlite-vec dependency)
Gate B (Hybrid MVP): sqlite-vec extension + embeddings + vector + RRF fusion + graceful degradation
Gate C (Sync MVP): lore sync orchestration + queues/backoff + integrity check/repair
Deliverables:
Gate A
1. Document generation from issues/MRs/discussions with FTS5 indexing
2. Lexical search + filters + snippets + `lore stats`

Gate B
3. Ollama-powered embedding pipeline with sqlite-vec storage
4. Hybrid search (RRF-ranked vector + lexical) with rich filtering + graceful degradation
Gate C
5. Orchestrated lore sync command with incremental doc regen + re-embedding
6. Integrity checks + repair paths for FTS/embeddings consistency
Key Design Decisions:
- Documents are the search unit (not raw entities)
- FTS5 works standalone when Ollama unavailable (graceful degradation)
- Gate A requires only SQLite + FTS5; sqlite-vec is introduced in Gate B (reduces env risk)
- Documents store full untruncated text with a hard safety cap (2MB) for pathological outliers
- Embedding pipeline chunks long documents at embed time (overlap-aware paragraph splitting)
- sqlite-vec `rowid = doc_id * 1000 + chunk_index` encodes chunks; vector search deduplicates by doc_id
- RRF ranking avoids score normalization complexity
- Queue-based discussion fetching isolates failures
- FTS5 query sanitization prevents syntax errors from user input
- Exponential backoff on all queues prevents hot-loop retries
- Transient embed failures trigger graceful degradation (not hard errors)
- `lore sync` is the unified command (ingest + generate-docs + embed); individual commands exist for recovery
- Dirty source tracking: mark ALL upserted entities inside ingestion transactions via `mark_dirty_tx(&Transaction)`
- Queue draining: `generate-docs` and `sync` drain queues completely using bounded batch loops (not single-pass ceilings)
- Discussion sweep uses CTE to capture stale IDs before cascading deletes
- Unchanged documents are fully skipped (content_hash + labels_hash + paths_hash triple-check)
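To make the rowid scheme above concrete, here is a minimal encode/decode sketch (the names are illustrative, not shipped code; `CHUNK_ROWID_MULTIPLIER` comes from the chunking constants in Phase 1.3):

```rust
/// Rowid encoding for the embeddings vec0 table, per the design decision
/// `rowid = doc_id * 1000 + chunk_index`. Sketch only.
const CHUNK_ROWID_MULTIPLIER: i64 = 1_000;

fn encode_chunk_rowid(doc_id: i64, chunk_index: i64) -> i64 {
    // Holds only while chunk_index stays below the multiplier (max 1000 chunks/doc).
    debug_assert!(chunk_index < CHUNK_ROWID_MULTIPLIER);
    doc_id * CHUNK_ROWID_MULTIPLIER + chunk_index
}

/// Vector search deduplicates chunk hits by mapping each rowid back to its doc.
fn decode_doc_id(rowid: i64) -> i64 {
    rowid / CHUNK_ROWID_MULTIPLIER
}
```

This is also why the orphan-cleanup trigger in migration 009 can delete a document's chunks with a simple rowid range scan.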
Phase 1: Schema Foundation
1.1 Documents Schema (Migration 007)
File: migrations/007_documents.sql
-- Unified searchable documents (derived from issues/MRs/discussions)
CREATE TABLE documents (
id INTEGER PRIMARY KEY,
source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
source_id INTEGER NOT NULL, -- local DB id in the source table
project_id INTEGER NOT NULL REFERENCES projects(id),
author_username TEXT, -- for discussions: first note author
label_names TEXT, -- JSON array (display/debug only)
created_at INTEGER, -- ms epoch UTC
updated_at INTEGER, -- ms epoch UTC
url TEXT,
title TEXT, -- null for discussions
content_text TEXT NOT NULL, -- canonical text for embedding/search
content_hash TEXT NOT NULL, -- SHA-256 for change detection
labels_hash TEXT NOT NULL DEFAULT '', -- SHA-256 over sorted labels (write optimization)
paths_hash TEXT NOT NULL DEFAULT '', -- SHA-256 over sorted paths (write optimization)
is_truncated INTEGER NOT NULL DEFAULT 0,
truncated_reason TEXT CHECK (
truncated_reason IN (
'token_limit_middle_drop','single_note_oversized','first_last_oversized',
'hard_cap_oversized'
)
OR truncated_reason IS NULL
),
UNIQUE(source_type, source_id)
);
CREATE INDEX idx_documents_project_updated ON documents(project_id, updated_at);
CREATE INDEX idx_documents_author ON documents(author_username);
CREATE INDEX idx_documents_source ON documents(source_type, source_id);
CREATE INDEX idx_documents_hash ON documents(content_hash);
-- Fast label filtering (indexed exact-match)
CREATE TABLE document_labels (
document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
label_name TEXT NOT NULL,
PRIMARY KEY(document_id, label_name)
) WITHOUT ROWID;
CREATE INDEX idx_document_labels_label ON document_labels(label_name);
-- Fast path filtering (DiffNote file paths)
CREATE TABLE document_paths (
document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
path TEXT NOT NULL,
PRIMARY KEY(document_id, path)
) WITHOUT ROWID;
CREATE INDEX idx_document_paths_path ON document_paths(path);
-- Queue for incremental document regeneration (with retry tracking)
-- Uses next_attempt_at for index-friendly backoff queries
CREATE TABLE dirty_sources (
source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
source_id INTEGER NOT NULL,
queued_at INTEGER NOT NULL, -- ms epoch UTC
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
last_error TEXT,
next_attempt_at INTEGER, -- ms epoch UTC; NULL means ready immediately
PRIMARY KEY(source_type, source_id)
);
CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at);
-- Resumable queue for dependent discussion fetching
-- Uses next_attempt_at for index-friendly backoff queries
CREATE TABLE pending_discussion_fetches (
project_id INTEGER NOT NULL REFERENCES projects(id),
noteable_type TEXT NOT NULL, -- 'Issue' | 'MergeRequest'
noteable_iid INTEGER NOT NULL,
queued_at INTEGER NOT NULL, -- ms epoch UTC
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
last_error TEXT,
next_attempt_at INTEGER, -- ms epoch UTC; NULL means ready immediately
PRIMARY KEY(project_id, noteable_type, noteable_iid)
);
CREATE INDEX idx_pending_discussions_next_attempt ON pending_discussion_fetches(next_attempt_at);
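Both queues share the `next_attempt_at` backoff pattern. A std-only sketch of the scheduling arithmetic follows; the constants and function are assumptions, not the shipped `src/core/backoff.rs`, and jitter is passed in so the sketch stays deterministic (the real code would draw it from the `rand` crate):

```rust
/// Assumed backoff parameters (illustrative only).
const BACKOFF_BASE_MS: i64 = 1_000;
const BACKOFF_MAX_MS: i64 = 3_600_000; // 1 hour ceiling

/// Compute the next_attempt_at value for a failed queue row:
/// exponential delay, capped, plus caller-supplied jitter.
fn schedule_next_attempt(now_ms: i64, attempt_count: u32, jitter_ms: i64) -> i64 {
    let exp = attempt_count.min(22); // keep the shift well inside i64 range
    let delay = BACKOFF_BASE_MS
        .saturating_mul(1i64 << exp)
        .min(BACKOFF_MAX_MS);
    now_ms + delay + jitter_ms
}
```

A worker would then claim ready rows with `WHERE next_attempt_at IS NULL OR next_attempt_at <= :now`, which the `idx_*_next_attempt` indexes above serve efficiently.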
Acceptance Criteria:
- Migration applies cleanly on fresh DB
- Migration applies cleanly after CP2 schema
- All foreign keys enforced
- Indexes created
- `labels_hash` and `paths_hash` columns present for write optimization
- `next_attempt_at` indexed for efficient backoff queries
1.2 FTS5 Index (Migration 008)
File: migrations/008_fts5.sql
-- Full-text search with porter stemmer and prefix indexes for type-ahead
CREATE VIRTUAL TABLE documents_fts USING fts5(
title,
content_text,
content='documents',
content_rowid='id',
tokenize='porter unicode61',
prefix='2 3 4'
);
-- Keep FTS in sync via triggers.
-- IMPORTANT: COALESCE(title, '') ensures FTS5 external-content table never
-- receives NULL values, which can cause inconsistencies with delete operations.
-- FTS5 delete requires exact match of original values; NULL != NULL in SQL,
-- so a NULL title on insert would make the delete trigger fail silently.
CREATE TRIGGER documents_ai AFTER INSERT ON documents BEGIN
INSERT INTO documents_fts(rowid, title, content_text)
VALUES (new.id, COALESCE(new.title, ''), new.content_text);
END;
CREATE TRIGGER documents_ad AFTER DELETE ON documents BEGIN
INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
VALUES('delete', old.id, COALESCE(old.title, ''), old.content_text);
END;
-- Only rebuild FTS when searchable text actually changes (not metadata-only updates)
CREATE TRIGGER documents_au AFTER UPDATE ON documents
WHEN old.title IS NOT new.title OR old.content_text != new.content_text
BEGIN
INSERT INTO documents_fts(documents_fts, rowid, title, content_text)
VALUES('delete', old.id, COALESCE(old.title, ''), old.content_text);
INSERT INTO documents_fts(rowid, title, content_text)
VALUES (new.id, COALESCE(new.title, ''), new.content_text);
END;
Acceptance Criteria:
- `documents_fts` created as virtual table
- Triggers fire on insert/update/delete
- Update trigger only fires when title or content_text changes (not metadata-only updates)
- FTS row count matches documents count after bulk insert
- Prefix search works for type-ahead UX
1.3 Embeddings Schema (Migration 009) — Gate B Only
File: migrations/009_embeddings.sql
Gate Boundary: This migration is intentionally part of Gate B. Gate A ships with migrations 007 + 008 only, so it has zero dependency on sqlite-vec being present or loadable. The migration runner must load sqlite-vec before applying this migration, but Gate A can run without it.
-- NOTE: sqlite-vec vec0 virtual tables cannot participate in FK cascades.
-- We must use an explicit trigger to delete orphan embeddings when documents
-- are deleted. See documents_embeddings_ad trigger below.
-- sqlite-vec virtual table for vector search
-- Storage rule: embeddings.rowid = document_id * 1000 + chunk_index
-- This encodes (document_id, chunk_index) into a single integer rowid.
-- Supports up to 1000 chunks per document (32M chars at 32k/chunk).
CREATE VIRTUAL TABLE embeddings USING vec0(
embedding float[768]
);
-- Embedding provenance + change detection (one row per chunk)
-- NOTE: Two hash columns serve different purposes:
-- document_hash: SHA-256 of full documents.content_text (staleness detection)
-- chunk_hash: SHA-256 of this individual chunk's text (debug/provenance)
-- Pending detection uses document_hash (not chunk_hash) because staleness is
-- a document-level condition: if the document changed, ALL chunks need re-embedding.
CREATE TABLE embedding_metadata (
document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL DEFAULT 0, -- 0-indexed position within document
model TEXT NOT NULL, -- 'nomic-embed-text'
dims INTEGER NOT NULL, -- 768
document_hash TEXT NOT NULL, -- SHA-256 of full documents.content_text (staleness)
chunk_hash TEXT NOT NULL, -- SHA-256 of this chunk's text (provenance)
created_at INTEGER NOT NULL, -- ms epoch UTC
last_error TEXT, -- error message from last failed attempt
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER, -- ms epoch UTC
PRIMARY KEY(document_id, chunk_index)
);
CREATE INDEX idx_embedding_metadata_errors
ON embedding_metadata(last_error) WHERE last_error IS NOT NULL;
CREATE INDEX idx_embedding_metadata_doc ON embedding_metadata(document_id);
-- CRITICAL: Delete ALL chunk embeddings when a document is deleted.
-- vec0 virtual tables don't support FK ON DELETE CASCADE, so we need this trigger.
-- embedding_metadata has ON DELETE CASCADE, so only vec0 needs explicit cleanup.
-- Range: [document_id * 1000, document_id * 1000 + 999]
CREATE TRIGGER documents_embeddings_ad AFTER DELETE ON documents BEGIN
DELETE FROM embeddings
WHERE rowid >= old.id * 1000
AND rowid < (old.id + 1) * 1000;
END;
Chunking Constants:
| Constant | Value | Rationale |
|---|---|---|
| `CHUNK_MAX_CHARS` | 32,000 | ~8k tokens at 4 chars/token, fits nomic-embed-text context window |
| `CHUNK_OVERLAP_CHARS` | 500 | ~125 tokens overlap preserves context at chunk boundaries |
| `CHUNK_ROWID_MULTIPLIER` | 1,000 | Encodes (doc_id, chunk_index) into single rowid; supports up to 1000 chunks/doc |
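The embed-time chunking these constants parameterize can be sketched as follows. This is a std-only sketch of overlap-aware paragraph splitting, not the shipped pipeline: it assumes each paragraph fits within `max_chars`, whereas the real pipeline must also hard-split single oversized paragraphs.

```rust
/// Split text into chunks of at most `max_chars`, packing whole paragraphs
/// (blank-line separated) and carrying the last `overlap_chars` of each chunk
/// into the next to preserve context at chunk boundaries.
fn chunk_text(text: &str, max_chars: usize, overlap_chars: usize) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut current = String::new();
    for para in text.split("\n\n").filter(|p| !p.trim().is_empty()) {
        if !current.is_empty() && current.len() + 2 + para.len() > max_chars {
            // Carry the tail of this chunk into the next for boundary context,
            // walking forward to a valid UTF-8 char boundary before slicing.
            let mut tail_start = current.len().saturating_sub(overlap_chars);
            while tail_start < current.len() && !current.is_char_boundary(tail_start) {
                tail_start += 1;
            }
            let tail = current[tail_start..].to_string();
            chunks.push(std::mem::take(&mut current));
            current = tail;
        }
        if !current.is_empty() {
            current.push_str("\n\n");
        }
        current.push_str(para);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```

With the production constants (32,000 / 500), a document under 32k chars yields exactly one chunk, matching the acceptance criteria below.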
Acceptance Criteria:
- `embeddings` vec0 table created
- `embedding_metadata` tracks provenance per chunk (composite PK: document_id, chunk_index)
- Error tracking fields present for retry logic
- Orphan cleanup trigger deletes ALL chunks (range deletion) on document deletion
- Documents under 32k chars produce exactly 1 chunk (chunk_index=0)
- Long documents split at paragraph boundaries with overlap
Dependencies:
- Requires sqlite-vec extension loaded at runtime
- Extension loading already happens in `src/core/db.rs`
- Migration runner must load sqlite-vec before applying migrations (including on fresh DB)
- Add the `rand` crate to `Cargo.toml` (used by backoff jitter in `src/core/backoff.rs`)
Phase 2: Document Generation
2.1 Document Module Structure
New module: src/documents/
src/documents/
├── mod.rs # Module exports
├── extractor.rs # Document extraction from entities
├── truncation.rs # Note-boundary aware truncation
└── regenerator.rs # Dirty source processing
File: src/documents/mod.rs
//! Document generation and management.
//!
//! Extracts searchable documents from issues, MRs, and discussions.
mod extractor;
mod regenerator;
mod truncation;
pub use extractor::{
extract_discussion_document, extract_issue_document, extract_mr_document,
DocumentData, SourceType,
};
// Note: extract_*_document() return Result<Option<DocumentData>>
// None means the source entity was deleted from the database
pub use regenerator::regenerate_dirty_documents;
pub use truncation::{truncate_discussion, TruncationReason, TruncationResult};
Update src/lib.rs:
pub mod documents; // Add to existing modules
2.2 Document Types
File: src/documents/extractor.rs
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
/// Source type for documents.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceType {
Issue,
MergeRequest,
Discussion,
}
impl SourceType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Issue => "issue",
Self::MergeRequest => "merge_request",
Self::Discussion => "discussion",
}
}
/// Parse from CLI input, accepting common aliases.
///
/// Accepts: "issue", "mr", "merge_request", "discussion"
pub fn parse(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"issue" | "issues" => Some(Self::Issue),
"mr" | "mrs" | "merge_request" | "merge_requests" => Some(Self::MergeRequest),
"discussion" | "discussions" => Some(Self::Discussion),
_ => None,
}
}
}
impl std::fmt::Display for SourceType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
/// Generated document ready for storage.
#[derive(Debug, Clone)]
pub struct DocumentData {
pub source_type: SourceType,
pub source_id: i64,
pub project_id: i64,
pub author_username: Option<String>,
pub labels: Vec<String>,
pub paths: Vec<String>, // DiffNote file paths
pub labels_hash: String, // SHA-256 over sorted labels (write optimization)
pub paths_hash: String, // SHA-256 over sorted paths (write optimization)
pub created_at: i64,
pub updated_at: i64,
pub url: Option<String>,
pub title: Option<String>,
pub content_text: String,
pub content_hash: String,
pub is_truncated: bool,
pub truncated_reason: Option<String>,
}
/// Compute SHA-256 hash of content.
pub fn compute_content_hash(content: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(content.as_bytes());
format!("{:x}", hasher.finalize())
}
/// Compute SHA-256 hash over a sorted list of strings.
/// Used for labels_hash and paths_hash to detect changes efficiently.
pub fn compute_list_hash(items: &[String]) -> String {
let mut sorted = items.to_vec();
sorted.sort();
let joined = sorted.join("\n");
compute_content_hash(&joined)
}
Document Formats:
All document types use consistent header format for better search relevance and context:
| Source | content_text |
|---|---|
| Issue | Structured header + description (see below) |
| MR | Structured header + description (see below) |
| Discussion | Full thread with header (see below) |
Issue Document Format:
[[Issue]] #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234
Labels: ["bug", "auth"]
State: opened
Author: @johndoe
--- Description ---
We need to modernize our authentication system...
MR Document Format:
[[MergeRequest]] !456: Implement JWT authentication
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/merge_requests/456
Labels: ["feature", "auth"]
State: opened
Author: @johndoe
Source: feature/jwt-auth -> main
--- Description ---
This MR implements JWT-based authentication as discussed in #234...
Discussion Document Format:
[[Discussion]] Issue #234: Authentication redesign
Project: group/project-one
URL: https://gitlab.example.com/group/project-one/-/issues/234#note_12345
Labels: ["bug", "auth"]
Files: ["src/auth/login.ts"]
--- Thread ---
@johndoe (2024-03-15):
I think we should move to JWT-based auth...
@janedoe (2024-03-15):
Agreed. What about refresh token strategy?
Extraction Queries:
Each extract_*_document() function queries the existing ingestion tables and assembles a DocumentData.
Functions return Result<Option<DocumentData>> — None means the source entity was deleted.
Issue extraction (extract_issue_document):
-- Main entity
SELECT i.id, i.iid, i.title, i.description, i.state, i.author_username,
i.created_at, i.updated_at, i.web_url,
p.path_with_namespace, p.id AS project_id
FROM issues i
JOIN projects p ON p.id = i.project_id
WHERE i.id = ?
-- Labels (via junction table)
SELECT l.name
FROM issue_labels il
JOIN labels l ON l.id = il.label_id
WHERE il.issue_id = ?
ORDER BY l.name
MR extraction (extract_mr_document):
-- Main entity
SELECT m.id, m.iid, m.title, m.description, m.state, m.author_username,
m.source_branch, m.target_branch,
m.created_at, m.updated_at, m.web_url,
p.path_with_namespace, p.id AS project_id
FROM merge_requests m
JOIN projects p ON p.id = m.project_id
WHERE m.id = ?
-- Labels (via junction table)
SELECT l.name
FROM mr_labels ml
JOIN labels l ON l.id = ml.label_id
WHERE ml.merge_request_id = ?
ORDER BY l.name
Discussion extraction (extract_discussion_document):
-- Discussion with parent entity info
SELECT d.id, d.noteable_type, d.issue_id, d.merge_request_id,
p.path_with_namespace, p.id AS project_id
FROM discussions d
JOIN projects p ON p.id = d.project_id
WHERE d.id = ?
-- Parent issue (when noteable_type = 'Issue')
SELECT i.iid, i.title, i.web_url
FROM issues i WHERE i.id = ?
-- Parent MR (when noteable_type = 'MergeRequest')
SELECT m.iid, m.title, m.web_url
FROM merge_requests m WHERE m.id = ?
-- Parent labels (issue or MR labels depending on noteable_type)
-- Use issue_labels or mr_labels junction table accordingly
-- Non-system notes in thread order
SELECT n.author_username, n.body, n.created_at, n.gitlab_id,
n.note_type, n.position_old_path, n.position_new_path
FROM notes n
WHERE n.discussion_id = ? AND n.is_system = 0
ORDER BY n.created_at ASC, n.id ASC
Discussion URL construction:
Constructed from parent entity's web_url + #note_{first_note.gitlab_id}:
let url = match first_note_gitlab_id {
Some(gid) => Some(format!("{}#note_{}", parent_web_url, gid)),
None => Some(parent_web_url.to_string()),
};
DiffNote path extraction:
Collect both position_old_path and position_new_path from all non-system notes
in the discussion thread. Deduplicated automatically by document_paths PK:
let mut paths: Vec<String> = Vec::new();
for note in &notes {
if let Some(ref old_path) = note.position_old_path {
paths.push(old_path.clone());
}
if let Some(ref new_path) = note.position_new_path {
paths.push(new_path.clone());
}
}
paths.sort();
paths.dedup();
Discussion author: First non-system note's author_username.
Acceptance Criteria:
- Issue document: structured header with `[[Issue]]` prefix, project, URL, labels, state, author, then description
- MR document: structured header with `[[MergeRequest]]` prefix, project, URL, labels, state, author, branches, then description
- Discussion document: includes parent type+title, project, URL, labels, files, then thread
- System notes (is_system=1) excluded from discussion content
- DiffNote file paths extracted to paths vector
- Labels extracted to labels vector
- SHA-256 hash computed from content_text
- Headers use consistent separator lines (`--- Description ---`, `--- Thread ---`)
2.3 Truncation Logic
Scope: Truncation applies to:
- Discussion documents — note-boundary truncation at 32,000 chars (multi-note threads)
- All document types — a hard safety cap at 2,000,000 chars (~2MB) to prevent pathological outliers (pasted log files, vendor lockfiles, base64 blobs) from bloating the DB, causing OOM in embedding chunking, or degrading FTS indexing performance.
Normal issue/MR documents remain untruncated. The safety cap triggers only for extreme content sizes that indicate non-prose content. The embedding pipeline handles long (but non-pathological) documents via chunking at embed time (see Phase 4.4).
File: src/documents/truncation.rs
/// Maximum content length for discussion threads (~8,000 tokens at 4 chars/token estimate).
/// NOTE: This only applies to discussion documents. Issue/MR documents store full text
/// (subject to MAX_DOCUMENT_CHARS_HARD safety cap below).
/// The embedding pipeline chunks long documents at embed time (see pipeline.rs).
pub const MAX_DISCUSSION_CHARS: usize = 32_000;
/// Hard safety cap for ALL document types (~2MB).
/// Only triggers on pathological content (pasted logs, vendor lockfiles, base64 blobs).
/// Normal issue/MR documents are never this large. This prevents:
/// - DB bloat from single extreme documents
/// - OOM in embedding chunking pipeline
/// - FTS indexing degradation from megabyte-scale content
pub const MAX_DOCUMENT_CHARS_HARD: usize = 2_000_000;
/// Truncation result with metadata.
#[derive(Debug, Clone)]
pub struct TruncationResult {
pub content: String,
pub is_truncated: bool,
pub reason: Option<TruncationReason>,
}
/// Reason for truncation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncationReason {
TokenLimitMiddleDrop,
SingleNoteOversized,
FirstLastOversized,
HardCapOversized, // Safety cap for pathological content (any doc type)
}
impl TruncationReason {
pub fn as_str(&self) -> &'static str {
match self {
Self::TokenLimitMiddleDrop => "token_limit_middle_drop",
Self::SingleNoteOversized => "single_note_oversized",
Self::FirstLastOversized => "first_last_oversized",
Self::HardCapOversized => "hard_cap_oversized",
}
}
}
/// Truncate discussion thread at note boundaries.
///
/// Rules:
/// - Max content: 32,000 characters
/// - Truncate at NOTE boundaries (never mid-note)
/// - Preserve first N notes and last M notes
/// - Drop from middle, insert marker
///
/// NOT used for issue/MR documents — those store full text.
pub fn truncate_discussion(notes: &[NoteContent], max_chars: usize) -> TruncationResult {
// Implementation handles edge cases per table below
todo!()
}
/// Note content for truncation.
pub struct NoteContent {
pub author: String,
pub date: String,
pub body: String,
}
Edge Cases:
| Scenario | Handling |
|---|---|
| Single note > 32000 chars | Truncate at UTF-8-safe char boundary via truncate_utf8(), append [truncated], reason = single_note_oversized |
| First + last note > 32000 | Keep only first note (truncated if needed), reason = first_last_oversized |
| Only one note | Truncate at char boundary if needed |
| Any doc type > 2,000,000 chars | Truncate at UTF-8-safe boundary, reason = hard_cap_oversized (applies to issues/MRs/discussions) |
Acceptance Criteria:
- Discussion note-boundary truncation works correctly
- Issue/MR documents store full text unless exceeding 2MB hard safety cap
- Hard cap truncation records `hard_cap_oversized` reason for provenance
- First and last notes preserved when possible
- Truncation marker `\n\n[... N notes omitted for length ...]\n\n` inserted
- Metadata fields set correctly
- Edge cases handled per table above
- All byte-indexed truncation uses `truncate_utf8()` to avoid panics on multi-byte codepoints
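The middle-drop note selection behind these criteria can be sketched as follows. This is a hedged sketch, not the shipped `truncate_discussion`: it operates on already-formatted note strings, always keeps the first note, keeps as many trailing notes as the budget allows, and reports how many middle notes were dropped (the caller would splice in the `[... N notes omitted for length ...]` marker).

```rust
/// Select which whole notes to keep under a character budget.
/// Returns (kept notes, number of middle notes omitted).
fn select_notes(notes: &[String], max_chars: usize) -> (Vec<String>, usize) {
    let total: usize = notes.iter().map(|n| n.len()).sum();
    if total <= max_chars || notes.is_empty() {
        return (notes.to_vec(), 0); // nothing omitted
    }
    // Always keep the first note, then take trailing notes while they fit.
    let mut budget = max_chars.saturating_sub(notes[0].len());
    let mut tail: Vec<String> = Vec::new();
    for note in notes.iter().skip(1).rev() {
        if note.len() > budget {
            break; // keep notes whole: never cut mid-note at this stage
        }
        budget -= note.len();
        tail.push(note.clone());
    }
    tail.reverse();
    let omitted = notes.len() - 1 - tail.len();
    let mut kept = vec![notes[0].clone()];
    kept.extend(tail);
    (kept, omitted)
}
```

The oversized-single-note and hard-cap cases from the edge-case table would then fall through to `truncate_utf8()`-based character truncation.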
2.4 CLI: lore generate-docs (Incremental by Default)
File: src/cli/commands/generate_docs.rs
//! Generate documents command - create searchable documents from entities.
//!
//! By default, runs incrementally (processes only dirty_sources queue).
//! Use --full to regenerate all documents from scratch.
use rusqlite::Connection;
use serde::Serialize;
use crate::core::error::Result;
use crate::documents::{DocumentData, SourceType};
use crate::Config;
/// Result of document generation.
#[derive(Debug, Serialize)]
pub struct GenerateDocsResult {
pub issues: usize,
pub mrs: usize,
pub discussions: usize,
pub total: usize,
pub truncated: usize,
pub skipped: usize, // Unchanged documents
}
/// Chunk size for --full mode dirty queue seeding.
/// Balances throughput against WAL file growth and memory pressure.
const FULL_MODE_CHUNK_SIZE: usize = 2000;
/// Run document generation (incremental by default).
///
/// IMPORTANT: Both modes use the same regenerator codepath to avoid
/// logic divergence in label/path hashing, deletion semantics, and
/// write-optimization behavior. The only difference is how dirty_sources
/// gets populated.
///
/// Incremental mode (default):
/// - Processes only items already in dirty_sources queue
/// - Fast for routine syncs
///
/// Full mode (--full):
/// - Seeds dirty_sources with ALL source entities in chunks
/// - Drains through the same regenerator pipeline
/// - Uses keyset pagination (WHERE id > last_id) to avoid OFFSET degradation
/// - Final FTS optimize after all chunks complete
/// - Use when schema changes or after migration
pub fn run_generate_docs(
config: &Config,
full: bool,
project_filter: Option<&str>,
) -> Result<GenerateDocsResult> {
let mut conn = create_connection(&config.storage.db_path)?;
if full {
// Full mode: seed dirty_sources with all source entities, then drain.
// Uses keyset pagination to avoid O(n²) OFFSET degradation on large tables.
//
// Seeding is chunked to bound WAL growth:
// 1. For each source type (issues, MRs, discussions):
// a. Query next chunk WHERE id > last_id ORDER BY id LIMIT chunk_size
// b. INSERT OR IGNORE each into dirty_sources
// c. Advance last_id = chunk.last().id
// d. Loop until chunk is empty
// 2. Drain dirty_sources through regenerator (same as incremental)
// 3. Final FTS optimize (not full rebuild — triggers handle consistency)
//
// Benefits of unified codepath:
// - No divergence in label/path hash behavior
// - No divergence in deletion semantics
// - No divergence in write-optimization logic (labels_hash, paths_hash)
// - FTS triggers fire identically in both modes
// Seed issues using set-based INSERT...SELECT for throughput.
// Avoids N individual INSERT calls per chunk; single statement per page.
//
// NOTE: the chunk's upper bound is computed BEFORE the insert so the keyset
// cursor still advances when every row in the chunk already exists in
// dirty_sources (ON CONFLICT DO NOTHING makes the inserted-row count
// unreliable as a termination signal).
let mut last_id: i64 = 0;
loop {
    let tx = conn.transaction()?;
    let chunk_max: Option<i64> = tx.query_row(
        "SELECT MAX(id) FROM (
            SELECT id FROM issues WHERE id > ? ORDER BY id LIMIT ?
        )",
        rusqlite::params![last_id, FULL_MODE_CHUNK_SIZE as i64],
        |row| row.get(0),
    )?;
    let Some(chunk_max) = chunk_max else {
        tx.commit()?;
        break; // no rows left past the cursor
    };
    tx.execute(
        "INSERT INTO dirty_sources
            (source_type, source_id, queued_at, attempt_count, last_attempt_at, last_error, next_attempt_at)
         SELECT 'issue', id, ?, 0, NULL, NULL, NULL
         FROM issues
         WHERE id > ? AND id <= ?
         ON CONFLICT(source_type, source_id) DO NOTHING",
        rusqlite::params![crate::core::time::now_ms(), last_id, chunk_max],
    )?;
    last_id = chunk_max;
    tx.commit()?;
}
// Similar set-based keyset-paginated seeding for MRs and discussions...
// Report: seeding complete, now regenerating
}
// Both modes: drain dirty_sources through the regenerator
let regen = regenerate_dirty_documents(&conn)?;
if full {
// FTS optimize after bulk operations (compacts index segments)
conn.execute(
"INSERT INTO documents_fts(documents_fts) VALUES('optimize')",
[],
)?;
}
// Map regen -> GenerateDocsResult stats
todo!()
}
/// Print human-readable output.
pub fn print_generate_docs(result: &GenerateDocsResult) {
println!("Document generation complete:");
println!(" Issues: {:>6} documents", result.issues);
println!(" MRs: {:>6} documents", result.mrs);
println!(" Discussions: {:>6} documents", result.discussions);
println!(" ─────────────────────");
println!(" Total: {:>6} documents", result.total);
if result.truncated > 0 {
println!(" Truncated: {:>6}", result.truncated);
}
if result.skipped > 0 {
println!(" Skipped: {:>6} (unchanged)", result.skipped);
}
}
/// Print JSON output for robot mode.
pub fn print_generate_docs_json(result: &GenerateDocsResult) {
let output = serde_json::json!({
"ok": true,
"data": result
});
println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
CLI integration in src/cli/mod.rs:
/// Generate-docs subcommand arguments.
#[derive(Args)]
pub struct GenerateDocsArgs {
/// Regenerate ALL documents (not just dirty queue)
#[arg(long)]
full: bool,
/// Only generate for specific project
#[arg(long)]
project: Option<String>,
}
Acceptance Criteria:
- Creates document for each issue
- Creates document for each MR
- Creates document for each discussion
- Default mode processes dirty_sources queue only (incremental)
- `--full` regenerates all documents from scratch
- `--full` uses chunked transactions (2k docs/tx) to bound WAL growth
- Final FTS optimize after all chunks complete (rebuild reserved for --repair)
- Progress bar in human mode (via `indicatif`)
- JSON output in robot mode
Progress bar note: The existing codebase uses indicatif for progress bars and
tracing-indicatif to integrate progress bars with tracing log output. New commands
should follow the same pattern — create a ProgressBar with ProgressStyle::default_bar()
and use tracing for structured logging. See existing ingestion commands for reference.
Phase 3: Lexical Search
3.1 Search Module Structure
New module: src/search/
src/search/
├── mod.rs # Module exports
├── fts.rs # FTS5 search
├── vector.rs # Vector search (sqlite-vec)
├── hybrid.rs # Combined hybrid search
└── filters.rs # Filter parsing and application
File: src/search/mod.rs
//! Search functionality for documents.
//!
//! Supports lexical (FTS5), semantic (vector), and hybrid search.
mod filters;
mod fts;
mod hybrid;
mod rrf;
mod vector;
pub use filters::{SearchFilters, PathFilter, apply_filters};
pub use fts::{search_fts, to_fts_query, FtsResult, FtsQueryMode, generate_fallback_snippet, get_result_snippet};
pub use hybrid::{search_hybrid, HybridResult, SearchMode};
pub use rrf::{rank_rrf, RrfResult};
pub use vector::{search_vector, VectorResult};
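`fts.rs` exports `to_fts_query` to implement the query-sanitization design decision. Its exact behavior is not specified in this chunk, but one plausible shape (an assumption, not the shipped code) is to quote every token so user punctuation cannot produce MATCH syntax errors:

```rust
/// Assumed sanitizer shape: treat user input as plain terms. Each
/// whitespace-separated token is double-quoted, with embedded quotes doubled
/// per FTS5 string rules, so operators like NEAR/AND/NOT and stray
/// parentheses are matched literally instead of parsed as query syntax.
fn sanitize_fts_query(input: &str) -> String {
    input
        .split_whitespace()
        .map(|tok| format!("\"{}\"", tok.replace('"', "\"\"")))
        .collect::<Vec<_>>()
        .join(" ")
}
```

A quoted-terms approach trades away explicit boolean operators for predictability; if advanced syntax is wanted later, `FtsQueryMode` (see 3.2) is the natural place to gate it.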
3.2 FTS5 Search Function
File: src/search/fts.rs
use rusqlite::Connection;
use crate::core::error::Result;
/// FTS search result.
#[derive(Debug, Clone)]
pub struct FtsResult {
pub document_id: i64,
pub rank: f64, // BM25 score (lower = better match)
pub snippet: String, // Context snippet around match
}
/// Generate fallback snippet for semantic-only results.
///
/// When FTS snippets aren't available (semantic-only mode), this generates
/// a context snippet by truncating the document content. Useful for displaying
/// search results without FTS hits.
///
/// Args:
/// content_text: Full document content
/// max_chars: Maximum snippet length (default 200)
///
/// Returns a truncated string with ellipsis if truncated.
///
/// IMPORTANT: Uses `truncate_utf8()` to avoid panicking on multi-byte UTF-8
/// codepoint boundaries. Rust `&str` slicing is byte-indexed, so
/// `trimmed[..max_chars]` will panic if `max_chars` falls mid-codepoint.
pub fn generate_fallback_snippet(content_text: &str, max_chars: usize) -> String {
let trimmed = content_text.trim();
if trimmed.len() <= max_chars {
return trimmed.to_string();
}
// Truncate at a valid UTF-8 char boundary first
let safe = truncate_utf8(trimmed, max_chars);
// Then find word boundary to avoid cutting mid-word
let truncation_point = safe
.rfind(|c: char| c.is_whitespace())
.unwrap_or(safe.len());
format!("{}...", &safe[..truncation_point])
}
/// Truncate a string to at most `max_bytes` bytes on a valid UTF-8 char boundary.
///
/// Rust &str slicing panics if the index falls inside a multi-byte codepoint.
/// This helper walks backward from max_bytes to find the nearest char boundary.
/// Used by snippet generation and discussion truncation to prevent panics on
/// content containing emoji, CJK characters, or other multi-byte UTF-8 text.
fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
if s.len() <= max_bytes {
return s;
}
let mut cut = max_bytes;
while cut > 0 && !s.is_char_boundary(cut) {
cut -= 1;
}
&s[..cut]
}
/// Get snippet for search result, preferring FTS when available.
///
/// Priority:
/// 1. FTS snippet (if document matched FTS query)
/// 2. Fallback: truncated content_text
pub fn get_result_snippet(
fts_snippet: Option<&str>,
content_text: &str,
) -> String {
match fts_snippet {
Some(snippet) if !snippet.is_empty() => snippet.to_string(),
_ => generate_fallback_snippet(content_text, 200),
}
}
/// FTS query parsing mode.
#[derive(Debug, Clone, Copy, Default)]
pub enum FtsQueryMode {
/// Safe parsing (default): escapes dangerous syntax but preserves
/// trailing `*` for obvious prefix queries (type-ahead UX).
#[default]
Safe,
/// Raw mode: passes user MATCH syntax through unchanged.
/// Use with caution - invalid syntax will cause FTS5 errors.
Raw,
}
/// Convert user query to FTS5-safe MATCH expression.
///
/// FTS5 MATCH syntax has special characters that cause errors if passed raw:
/// - `-` (NOT operator)
/// - `"` (phrase quotes)
/// - `:` (column filter)
/// - `*` (prefix)
/// - `AND`, `OR`, `NOT` (operators)
///
/// Strategy for Safe mode:
/// - Wrap each whitespace-delimited token in double quotes
/// - Escape internal quotes by doubling them
/// - PRESERVE trailing `*` for simple prefix queries (alphanumeric tokens)
/// - This forces FTS5 to treat tokens as literals while allowing type-ahead
///
/// Raw mode passes the query through unchanged for power users who want
/// full FTS5 syntax (phrase queries, column scopes, boolean operators).
///
/// Examples (Safe mode):
/// - "auth error" -> `"auth" "error"` (implicit AND)
/// - "auth*" -> `"auth"*` (prefix preserved!)
/// - "jwt_token*" -> `"jwt_token"*` (prefix preserved!)
/// - "C++" -> `"C++"` (special chars preserved, no prefix)
/// - "don't panic" -> `"don't" "panic"` (apostrophe preserved)
/// - "-DWITH_SSL" -> `"-DWITH_SSL"` (leading dash neutralized)
pub fn to_fts_query(raw: &str, mode: FtsQueryMode) -> String {
if matches!(mode, FtsQueryMode::Raw) {
return raw.trim().to_string();
}
raw.split_whitespace()
.map(|token| {
let t = token.trim();
if t.is_empty() {
return "\"\"".to_string();
}
// Detect simple prefix queries: alphanumeric/underscore followed by *
// e.g., "auth*", "jwt_token*", "user123*"
let is_prefix = t.ends_with('*')
&& t.len() > 1
&& t[..t.len() - 1]
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_');
// Escape internal double quotes by doubling them
let escaped = t.replace('"', "\"\"");
if is_prefix {
// Strip trailing *, quote the core, then re-add *
let core = &escaped[..escaped.len() - 1];
format!("\"{}\"*", core)
} else {
format!("\"{}\"", escaped)
}
})
.collect::<Vec<_>>()
.join(" ")
}
/// Search documents using FTS5.
///
/// Returns matching document IDs with BM25 rank scores and snippets.
/// Lower rank values indicate better matches.
/// Uses bm25() explicitly (not the `rank` alias) and snippet() for context.
///
/// IMPORTANT: User input is sanitized via `to_fts_query()` to prevent
/// FTS5 syntax errors from special characters while preserving prefix search.
pub fn search_fts(
conn: &Connection,
query: &str,
limit: usize,
mode: FtsQueryMode,
) -> Result<Vec<FtsResult>> {
if query.trim().is_empty() {
return Ok(Vec::new());
}
let safe_query = to_fts_query(query, mode);
let mut stmt = conn.prepare(
"SELECT rowid,
bm25(documents_fts),
-- NOTE: Column index 1 = content_text (0=title, 1=content_text).
-- If FTS column order changes, this index must be updated.
snippet(documents_fts, 1, '<mark>', '</mark>', '...', 64)
FROM documents_fts
WHERE documents_fts MATCH ?
ORDER BY bm25(documents_fts)
LIMIT ?"
)?;
let results = stmt
.query_map(rusqlite::params![safe_query, limit as i64], |row| {
Ok(FtsResult {
document_id: row.get(0)?,
rank: row.get(1)?,
snippet: row.get(2)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(results)
}
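The Safe-mode examples in the doc comment above can be spot-checked directly. Below is a condensed, self-contained copy of the Safe-mode tokenization (Raw mode and the `FtsQueryMode` enum omitted for brevity):

```rust
/// Safe-mode tokenizer: quote every whitespace-delimited token, doubling
/// internal quotes, but keep a trailing `*` on plain alphanumeric tokens
/// so prefix (type-ahead) queries still work.
fn to_fts_query_safe(raw: &str) -> String {
    raw.split_whitespace()
        .map(|t| {
            let is_prefix = t.ends_with('*')
                && t.len() > 1
                && t[..t.len() - 1]
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_');
            let escaped = t.replace('"', "\"\"");
            if is_prefix {
                // Strip the `*`, quote the core, re-add the `*` outside
                format!("\"{}\"*", &escaped[..escaped.len() - 1])
            } else {
                format!("\"{}\"", escaped)
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}
```

Quoting each token forces FTS5 to treat it as a literal string, which is what neutralizes `-`, `:`, and bare operators while leaving the quoted-prefix form `"auth"*` valid FTS5 syntax.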
Acceptance Criteria:
- Returns matching document IDs with BM25 rank
- Porter stemming works (search/searching match)
- Prefix search works (type-ahead UX): `auth*` returns results starting with "auth"
- Empty query returns empty results
- Nonsense query returns empty results
- Special characters in query don't cause FTS5 syntax errors (`-`, `"`, `:`, `*`)
- Query `"-DWITH_SSL"` returns results (not treated as NOT operator)
- Query `C++` returns results (special chars preserved)
- Safe mode preserves trailing `*` on alphanumeric tokens
- Raw mode (`--fts-mode=raw`) passes query through unchanged
3.3 Search Filters
File: src/search/filters.rs
use rusqlite::Connection;
use crate::core::error::Result;
use crate::documents::SourceType;
/// Maximum allowed limit for search results.
const MAX_SEARCH_LIMIT: usize = 100;
/// Default limit for search results.
const DEFAULT_SEARCH_LIMIT: usize = 20;
/// Search filters applied post-retrieval.
#[derive(Debug, Clone, Default)]
pub struct SearchFilters {
pub source_type: Option<SourceType>,
pub author: Option<String>,
pub project_id: Option<i64>,
pub after: Option<i64>, // ms epoch (created_at >=)
pub updated_after: Option<i64>, // ms epoch (updated_at >=)
pub labels: Vec<String>, // AND logic
pub path: Option<PathFilter>,
pub limit: usize, // Default 20, max 100
}
impl SearchFilters {
/// Check if any filter is set (used for adaptive recall).
pub fn has_any_filter(&self) -> bool {
self.source_type.is_some()
|| self.author.is_some()
|| self.project_id.is_some()
|| self.after.is_some()
|| self.updated_after.is_some()
|| !self.labels.is_empty()
|| self.path.is_some()
}
/// Clamp limit to valid range [1, MAX_SEARCH_LIMIT].
pub fn clamp_limit(&self) -> usize {
if self.limit == 0 {
DEFAULT_SEARCH_LIMIT
} else {
self.limit.min(MAX_SEARCH_LIMIT)
}
}
}
/// Path filter with prefix or exact match.
#[derive(Debug, Clone)]
pub enum PathFilter {
Prefix(String), // Trailing `/` -> LIKE 'path/%'
Exact(String), // No trailing `/` -> = 'path'
}
impl PathFilter {
pub fn from_str(s: &str) -> Self {
if s.ends_with('/') {
Self::Prefix(s.to_string())
} else {
Self::Exact(s.to_string())
}
}
}
/// Apply filters to document IDs, returning filtered set.
///
/// IMPORTANT: Preserves ranking order from input document_ids.
/// Filters must not reorder results - maintain the RRF/search ranking.
///
/// Uses JSON1 extension for efficient ordered ID passing:
/// - Passes document_ids as JSON array: `[1,2,3,...]`
/// - Uses `json_each()` to expand into rows with `key` as position
/// - JOINs with documents table and applies filters
/// - Orders by original position to preserve ranking
pub fn apply_filters(
conn: &Connection,
document_ids: &[i64],
filters: &SearchFilters,
) -> Result<Vec<i64>> {
if document_ids.is_empty() {
return Ok(Vec::new());
}
// Build JSON array of document IDs
let ids_json = serde_json::to_string(document_ids)?;
// Build dynamic WHERE clauses
let mut conditions: Vec<String> = Vec::new();
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
// Always bind the JSON array first
params.push(Box::new(ids_json));
if let Some(ref source_type) = filters.source_type {
conditions.push("d.source_type = ?".into());
params.push(Box::new(source_type.as_str().to_string()));
}
if let Some(ref author) = filters.author {
conditions.push("d.author_username = ?".into());
params.push(Box::new(author.clone()));
}
if let Some(project_id) = filters.project_id {
conditions.push("d.project_id = ?".into());
params.push(Box::new(project_id));
}
if let Some(after) = filters.after {
conditions.push("d.created_at >= ?".into());
params.push(Box::new(after));
}
if let Some(updated_after) = filters.updated_after {
conditions.push("d.updated_at >= ?".into());
params.push(Box::new(updated_after));
}
// Labels: AND logic - all labels must be present
for label in &filters.labels {
conditions.push(
"EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name = ?)".into()
);
params.push(Box::new(label.clone()));
}
// Path filter
if let Some(ref path_filter) = filters.path {
match path_filter {
PathFilter::Exact(path) => {
conditions.push(
"EXISTS (SELECT 1 FROM document_paths dp WHERE dp.document_id = d.id AND dp.path = ?)".into()
);
params.push(Box::new(path.clone()));
}
PathFilter::Prefix(prefix) => {
// IMPORTANT: Must use ESCAPE clause for backslash escaping to work in SQLite LIKE
conditions.push(
"EXISTS (SELECT 1 FROM document_paths dp WHERE dp.document_id = d.id AND dp.path LIKE ? ESCAPE '\\')".into()
);
// Escape the escape character itself first, then the LIKE wildcards,
// then add the trailing %
let like_pattern = format!(
    "{}%",
    prefix
        .replace('\\', "\\\\")
        .replace('%', "\\%")
        .replace('_', "\\_")
);
params.push(Box::new(like_pattern));
}
}
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!("AND {}", conditions.join(" AND "))
};
let limit = filters.clamp_limit();
// SQL using JSON1 for ordered ID passing
// json_each() returns rows with `key` (0-indexed position) and `value` (the ID)
let sql = format!(
r#"
SELECT d.id
FROM json_each(?) AS j
JOIN documents d ON d.id = j.value
WHERE 1=1 {}
ORDER BY j.key
LIMIT ?
"#,
where_clause
);
params.push(Box::new(limit as i64));
let mut stmt = conn.prepare(&sql)?;
let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let results = stmt
.query_map(params_refs.as_slice(), |row| row.get(0))?
.collect::<std::result::Result<Vec<i64>, _>>()?;
Ok(results)
}
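The LIKE-pattern escaping for prefix paths can be isolated into a tiny helper (hypothetical name; note the backslash must be escaped first so user input cannot smuggle wildcards past the `ESCAPE '\'` clause):

```rust
/// Build a SQLite LIKE prefix pattern for use with `ESCAPE '\'`.
/// Escapes `\` first, then the `%` and `_` wildcards, then appends `%`.
fn like_prefix_pattern(prefix: &str) -> String {
    format!(
        "{}%",
        prefix
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_")
    )
}
```

Without this escaping, a path filter like `src/my_mod/` would match any character in place of the `_`, silently widening the filter.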
Supported filters:
| Filter | SQL Column | Notes |
|---|---|---|
| `--type` | `source_type` | `issue`, `mr`, `discussion` |
| `--author` | `author_username` | Exact match |
| `--project` | `project_id` | Resolve path to ID (see resolution logic below) |
| `--after` | `created_at` | `>=` date; parsed via `time::parse_since()` (accepts `7d`, `2w`, `YYYY-MM-DD`) |
| `--updated-after` | `updated_at` | `>=` date; parsed via `time::parse_since()`, common triage filter |
| `--label` | `document_labels` | JOIN, multiple = AND |
| `--path` | `document_paths` | JOIN, trailing `/` = prefix |
| `--limit` | N/A | Default 20, max 100 |
Project Resolution Logic (`--project`):
The `--project` flag accepts a string and resolves it to a `project_id` using a cascading match:
1. Exact match on `projects.path_with_namespace` (e.g., `group/project`)
2. Case-insensitive exact match (e.g., `Group/Project` matches `group/project`)
3. Suffix match (e.g., `project-name` matches `group/project-name`), only if unambiguous (1 result)
4. No match or ambiguous: return an error listing the available projects:
```
Error: Project 'auth-service' not found.
Available projects:
  backend/auth-service
  frontend/auth-service-ui
  infra/auth-proxy
Hint: Use the full path, e.g., --project=backend/auth-service
```
Uses the existing `LoreError::Ambiguous` variant when multiple suffix matches are found.
Acceptance Criteria:
- Each filter correctly restricts results
- Multiple `--label` flags use AND logic
- Path prefix vs exact match works correctly
- `--updated-after` filters on `updated_at` (not `created_at`)
- `--after` and `--updated-after` accept relative (`7d`, `2w`) and absolute (`YYYY-MM-DD`) formats via `time::parse_since()`
- `--project` resolves via exact, then case-insensitive, then suffix match
- `--project` with no match or an ambiguous match shows the available projects list
- Filters compose (all applied together)
- Ranking order preserved after filtering (ORDER BY position)
- Limit clamped to valid range [1, 100]
- Default limit is 20 when not specified
- JSON1 `json_each()` correctly expands document IDs
3.4 CLI: lore search --mode=lexical
File: src/cli/commands/search.rs
//! Search command - find documents using lexical, semantic, or hybrid search.
use console::style;
use serde::Serialize;
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::ms_to_iso;
use crate::search::{SearchFilters, SearchMode, search_fts, search_vector, rank_rrf, RrfResult};
use crate::Config;
/// Search result for display.
#[derive(Debug, Serialize)]
pub struct SearchResultDisplay {
pub document_id: i64,
pub source_type: String,
pub title: Option<String>,
pub url: Option<String>,
pub project_path: String,
pub author: Option<String>,
pub created_at: String, // ISO format
pub updated_at: String, // ISO format
pub score: f64, // Normalized 0-1
pub snippet: String, // Context around match
pub labels: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub explain: Option<ExplainData>,
}
/// Ranking explanation for --explain flag.
#[derive(Debug, Serialize)]
pub struct ExplainData {
pub vector_rank: Option<usize>,
pub fts_rank: Option<usize>,
pub rrf_score: f64,
}
/// Search results response.
#[derive(Debug, Serialize)]
pub struct SearchResponse {
pub query: String,
pub mode: String,
pub total_results: usize,
pub results: Vec<SearchResultDisplay>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub warnings: Vec<String>,
}
/// Run search command.
pub fn run_search(
config: &Config,
query: &str,
mode: SearchMode,
filters: SearchFilters,
explain: bool,
) -> Result<SearchResponse> {
// 1. Parse query and filters
// 2. Execute search based on mode -> ranked doc_ids (+ explain ranks)
// 3. Apply post-retrieval filters preserving ranking order
// 4. HYDRATE in one DB round-trip (see hydration query below):
// - documents fields (title, url, created_at, updated_at, content_text)
// - project_path via JOIN projects
// - labels aggregated via json_group_array
// - paths aggregated via json_group_array (optional)
// 5. Attach snippet:
// - prefer FTS snippet when doc hit FTS
// - fallback: truncated content_text via generate_fallback_snippet()
// 6. For --mode=semantic with 0% embedding coverage:
// return Err(LoreError::EmbeddingsNotBuilt)
// This is distinct from "Ollama unavailable" — tells user to run `lore embed` first
todo!()
}
/// Hydration query: fetch all display fields for ranked doc IDs in a single round-trip.
///
/// Uses json_each(?) to preserve ranking order from the search pipeline.
/// Aggregates labels and paths inline to avoid N+1 queries.
///
/// ```sql
/// SELECT d.id, d.source_type, d.title, d.url, d.author_username,
/// d.created_at, d.updated_at, d.content_text,
/// p.path_with_namespace AS project_path,
/// (SELECT json_group_array(dl.label_name)
/// FROM document_labels dl WHERE dl.document_id = d.id) AS labels,
/// (SELECT json_group_array(dp.path)
/// FROM document_paths dp WHERE dp.document_id = d.id) AS paths
/// FROM json_each(?) AS j
/// JOIN documents d ON d.id = j.value
/// JOIN projects p ON p.id = d.project_id
/// ORDER BY j.key
/// ```
///
/// This single query replaces what would otherwise be:
/// - 1 query per document for metadata
/// - 1 query per document for labels
/// - 1 query per document for paths
/// For 20 results, that's 60 queries reduced to 1.
fn hydrate_results(
conn: &Connection,
doc_ids: &[i64],
) -> Result<Vec<HydratedDocument>> {
todo!()
}
/// Print human-readable search results.
pub fn print_search_results(response: &SearchResponse, explain: bool) {
println!(
"Found {} results ({} search)\n",
response.total_results,
response.mode
);
for (i, result) in response.results.iter().enumerate() {
let type_prefix = match result.source_type.as_str() {
"merge_request" => "MR",
"issue" => "Issue",
"discussion" => "Discussion",
_ => &result.source_type,
};
let title = result.title.as_deref().unwrap_or("(untitled)");
println!(
"[{}] {} - {} ({})",
i + 1,
style(type_prefix).cyan(),
title,
format!("{:.2}", result.score)
);
if explain {
if let Some(exp) = &result.explain {
let vec_str = exp.vector_rank.map(|r| format!("#{}", r)).unwrap_or_else(|| "-".into());
let fts_str = exp.fts_rank.map(|r| format!("#{}", r)).unwrap_or_else(|| "-".into());
println!(
" Vector: {}, FTS: {}, RRF: {:.4}",
vec_str, fts_str, exp.rrf_score
);
}
}
if let Some(author) = &result.author {
println!(
" @{} · {} · {}",
author, &result.created_at[..10], result.project_path
);
}
// Snippets already carry their own ellipses (from snippet() or the fallback)
println!(" \"{}\"", &result.snippet);
if let Some(url) = &result.url {
println!(" {}", style(url).dim());
}
println!();
}
}
/// Print JSON search results for robot mode.
pub fn print_search_results_json(response: &SearchResponse, elapsed_ms: u64) {
let output = serde_json::json!({
"ok": true,
"data": response,
"meta": {
"elapsed_ms": elapsed_ms
}
});
println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
CLI integration in src/cli/mod.rs:
/// Search subcommand arguments.
#[derive(Args)]
pub struct SearchArgs {
/// Search query
query: String,
/// Search mode
#[arg(long, default_value = "hybrid")]
mode: String, // "hybrid" | "lexical" | "semantic"
/// Filter by source type
#[arg(long, value_name = "TYPE")]
r#type: Option<String>,
/// Filter by author username
#[arg(long)]
author: Option<String>,
/// Filter by project path
#[arg(long)]
project: Option<String>,
/// Filter by creation date (after)
#[arg(long)]
after: Option<String>,
/// Filter by updated date (recently active items)
#[arg(long)]
updated_after: Option<String>,
/// Filter by label (can specify multiple)
#[arg(long, action = clap::ArgAction::Append)]
label: Vec<String>,
/// Filter by file path
#[arg(long)]
path: Option<String>,
/// Maximum results
#[arg(long, default_value = "20")]
limit: usize,
/// Show ranking breakdown
#[arg(long)]
explain: bool,
/// FTS query mode: "safe" (default) or "raw"
/// - safe: Escapes special chars but preserves `*` for prefix queries
/// - raw: Pass FTS5 MATCH syntax through unchanged (advanced)
#[arg(long, default_value = "safe")]
fts_mode: String, // "safe" | "raw"
}
Acceptance Criteria:
- Works without Ollama running
- All filters functional (including `--updated-after`)
- Human-readable output with snippets
- Semantic-only results get fallback snippets from `content_text`
- Results hydrated in a single DB round-trip (no N+1 queries)
- JSON output matches schema
- Empty results show a helpful message
- "No data indexed" message if documents table empty
- `--mode=semantic` with 0% embedding coverage returns an actionable error (distinct from "Ollama unavailable"; tells the user to run `lore embed` first)
- `--fts-mode=safe` (default) preserves prefix `*` while escaping special chars
- `--fts-mode=raw` passes FTS5 MATCH syntax through unchanged
Phase 4: Embedding Pipeline
4.1 Embedding Module Structure
New module: src/embedding/
src/embedding/
├── mod.rs # Module exports
├── ollama.rs # Ollama API client
├── pipeline.rs # Batch embedding orchestration
├── chunk_ids.rs # Chunk rowid encoding (shared)
└── change_detector.rs # Detect documents needing re-embedding
File: src/embedding/mod.rs
//! Embedding generation and storage.
//!
//! Uses Ollama for embedding generation and sqlite-vec for storage.
mod change_detector;
mod chunk_ids;
mod ollama;
mod pipeline;
pub use change_detector::detect_embedding_changes;
pub use chunk_ids::{encode_rowid, decode_rowid, CHUNK_ROWID_MULTIPLIER};
pub use ollama::{OllamaClient, OllamaConfig, check_ollama_health};
pub use pipeline::{embed_documents, EmbedResult};
4.1b Chunk ID Encoding (Shared Module)
File: src/embedding/chunk_ids.rs
//! Shared chunk ID encoding for embedding rowids.
//!
//! Encoding: rowid = document_id * CHUNK_ROWID_MULTIPLIER + chunk_index
//! This encodes (document_id, chunk_index) into a single vec0 rowid.
//! Supports up to 1000 chunks per document.
//!
//! Used by:
//! - `embedding::pipeline` (when storing embeddings)
//! - `search::vector` (when decoding search results)
//! - `cli::commands::stats` (when detecting orphaned embeddings)
//!
//! Living in its own module prevents coupling between pipeline and search.
/// Multiplier for encoding (document_id, chunk_index) into a single vec0 rowid.
/// Supports up to 1000 chunks per document.
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;
/// Encode (document_id, chunk_index) into a vec0 rowid.
pub fn encode_rowid(document_id: i64, chunk_index: usize) -> i64 {
document_id * CHUNK_ROWID_MULTIPLIER + chunk_index as i64
}
/// Decode a vec0 rowid into (document_id, chunk_index).
pub fn decode_rowid(rowid: i64) -> (i64, usize) {
(rowid / CHUNK_ROWID_MULTIPLIER, (rowid % CHUNK_ROWID_MULTIPLIER) as usize)
}
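Worked example of the encoding: document 42, chunk 7 maps to rowid 42007, and decoding recovers the pair. The scheme is lossless as long as `chunk_index < 1000` and `document_id < i64::MAX / 1000` (both comfortable bounds here). A quick roundtrip check using the same definitions:

```rust
const CHUNK_ROWID_MULTIPLIER: i64 = 1000;

/// Encode (document_id, chunk_index) into a single vec0 rowid.
fn encode_rowid(document_id: i64, chunk_index: usize) -> i64 {
    document_id * CHUNK_ROWID_MULTIPLIER + chunk_index as i64
}

/// Decode a vec0 rowid back into (document_id, chunk_index).
fn decode_rowid(rowid: i64) -> (i64, usize) {
    (
        rowid / CHUNK_ROWID_MULTIPLIER,
        (rowid % CHUNK_ROWID_MULTIPLIER) as usize,
    )
}
```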
4.2 Ollama Client
File: src/embedding/ollama.rs
use reqwest::Client;
use serde::{Deserialize, Serialize};
use crate::core::error::{LoreError, Result};
/// Ollama client configuration.
#[derive(Debug, Clone)]
pub struct OllamaConfig {
pub base_url: String, // "http://localhost:11434"
pub model: String, // "nomic-embed-text"
pub timeout_secs: u64, // Request timeout
}
impl Default for OllamaConfig {
fn default() -> Self {
Self {
base_url: "http://localhost:11434".into(),
model: "nomic-embed-text".into(),
timeout_secs: 60,
}
}
}
/// Ollama API client.
pub struct OllamaClient {
client: Client,
config: OllamaConfig,
}
/// Batch embed request.
#[derive(Serialize)]
struct EmbedRequest {
model: String,
input: Vec<String>,
}
/// Batch embed response.
#[derive(Deserialize)]
struct EmbedResponse {
model: String,
embeddings: Vec<Vec<f32>>,
}
/// Model info from /api/tags.
#[derive(Deserialize)]
struct TagsResponse {
models: Vec<ModelInfo>,
}
#[derive(Deserialize)]
struct ModelInfo {
name: String,
}
impl OllamaClient {
pub fn new(config: OllamaConfig) -> Self {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(config.timeout_secs))
.build()
.expect("Failed to create HTTP client");
Self { client, config }
}
/// Check if Ollama is available and model is loaded.
pub async fn health_check(&self) -> Result<()> {
let url = format!("{}/api/tags", self.config.base_url);
let response = self.client.get(&url).send().await.map_err(|e| {
LoreError::OllamaUnavailable {
base_url: self.config.base_url.clone(),
source: Some(e),
}
})?;
let tags: TagsResponse = response.json().await?;
let model_available = tags.models.iter().any(|m| m.name.starts_with(&self.config.model));
if !model_available {
return Err(LoreError::OllamaModelNotFound {
model: self.config.model.clone(),
});
}
Ok(())
}
/// Generate embeddings for a batch of texts.
///
/// Returns 768-dimensional vectors for each input text.
pub async fn embed_batch(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>> {
let url = format!("{}/api/embed", self.config.base_url);
let request = EmbedRequest {
model: self.config.model.clone(),
input: texts,
};
let response = self.client
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| LoreError::OllamaUnavailable {
base_url: self.config.base_url.clone(),
source: Some(e),
})?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(LoreError::EmbeddingFailed {
document_id: 0, // Batch failure
reason: format!("HTTP {}: {}", status, body),
});
}
let embed_response: EmbedResponse = response.json().await?;
Ok(embed_response.embeddings)
}
}
/// Quick health check without full client.
///
/// Checks HTTP status, not just reachability: a proxy or wrong port can
/// return an error page that would otherwise count as "healthy".
pub async fn check_ollama_health(base_url: &str) -> bool {
    let client = Client::new();
    client
        .get(format!("{}/api/tags", base_url))
        .send()
        .await
        .map(|r| r.status().is_success())
        .unwrap_or(false)
}
Endpoints:
| Endpoint | Purpose |
|---|---|
| `GET /api/tags` | Health check, verify model available |
| `POST /api/embed` | Batch embedding (preferred) |
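For reference, the request and response bodies implied by the `EmbedRequest`/`EmbedResponse` structs above look like this (values illustrative; real `nomic-embed-text` vectors are 768-dimensional, truncated here for brevity):

Request (`POST /api/embed`):

```json
{"model": "nomic-embed-text", "input": ["first text", "second text"]}
```

Response:

```json
{"model": "nomic-embed-text", "embeddings": [[0.013, -0.021], [0.007, 0.094]]}
```

The `embeddings` array is position-aligned with `input`, which is what lets `collect_writes` zip batch metadata against results.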
Acceptance Criteria:
- Health check detects Ollama availability
- Batch embedding works with up to 32 texts
- Clear error messages for common failures
4.3 Error Handling Extensions
File: src/core/error.rs (extend existing)
Add to ErrorCode (existing exit codes 1-13 are taken):
pub enum ErrorCode {
// ... existing variants (InternalError=1 through TransformError=13) ...
OllamaUnavailable,
OllamaModelNotFound,
EmbeddingFailed,
}
impl ErrorCode {
pub fn exit_code(&self) -> i32 {
match self {
// ... existing mappings ...
Self::OllamaUnavailable => 14,
Self::OllamaModelNotFound => 15,
Self::EmbeddingFailed => 16,
}
}
}
impl std::fmt::Display for ErrorCode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let code = match self {
// ... existing mappings ...
Self::OllamaUnavailable => "OLLAMA_UNAVAILABLE",
Self::OllamaModelNotFound => "OLLAMA_MODEL_NOT_FOUND",
Self::EmbeddingFailed => "EMBEDDING_FAILED",
};
write!(f, "{code}")
}
}
Add to LoreError:
pub enum LoreError {
// ... existing variants (including Http(reqwest::Error) for generic HTTP errors) ...
/// Ollama-specific connection failure. Use this instead of Http for Ollama errors
/// because it includes the base_url for actionable error messages.
/// The embedding pipeline should map Ollama HTTP errors to this variant, not Http.
#[error("Cannot connect to Ollama at {base_url}. Is it running?")]
OllamaUnavailable {
base_url: String,
#[source]
source: Option<reqwest::Error>,
},
#[error("Ollama model '{model}' not found. Run: ollama pull {model}")]
OllamaModelNotFound { model: String },
#[error("Embedding failed for document {document_id}: {reason}")]
EmbeddingFailed { document_id: i64, reason: String },
#[error("No embeddings found. Run: lore embed")]
EmbeddingsNotBuilt,
}
impl LoreError {
pub fn code(&self) -> ErrorCode {
match self {
// ... existing mappings ...
Self::OllamaUnavailable { .. } => ErrorCode::OllamaUnavailable,
Self::OllamaModelNotFound { .. } => ErrorCode::OllamaModelNotFound,
Self::EmbeddingFailed { .. } => ErrorCode::EmbeddingFailed,
Self::EmbeddingsNotBuilt => ErrorCode::EmbeddingFailed,
}
}
pub fn suggestion(&self) -> Option<&'static str> {
match self {
// ... existing mappings ...
Self::OllamaUnavailable { .. } => Some("Start Ollama: ollama serve"),
// Suggestions are &'static str, so the model name can't be interpolated
// here; the error's Display already includes it.
Self::OllamaModelNotFound { .. } => Some("Pull the model: ollama pull nomic-embed-text"),
Self::EmbeddingFailed { .. } => Some("Check Ollama logs or retry with 'lore embed --retry-failed'"),
Self::EmbeddingsNotBuilt => Some("Generate embeddings first: lore embed"),
}
}
}
4.4 Embedding Pipeline
File: src/embedding/pipeline.rs
use indicatif::{ProgressBar, ProgressStyle};
use rusqlite::Connection;
use crate::core::error::Result;
use crate::embedding::OllamaClient;
/// Batch size for embedding requests (texts per Ollama API call).
const BATCH_SIZE: usize = 32;
/// SQLite page size for paging through pending documents.
/// Uses keyset paging (id > last_id) to avoid rescanning previously-processed rows.
const DB_PAGE_SIZE: usize = 500;
/// Expected embedding dimensions for nomic-embed-text model.
/// IMPORTANT: Validates against this to prevent silent corruption.
const EXPECTED_DIMS: usize = 768;
/// Maximum characters per chunk for embedding.
/// nomic-embed-text context window is ~8192 tokens.
/// At ~4 chars/token, 32k chars is a safe upper bound.
const CHUNK_MAX_CHARS: usize = 32_000;
/// Overlap between chunks to preserve context at boundaries.
/// 500 chars (~125 tokens) provides enough context for sentence boundaries.
const CHUNK_OVERLAP_CHARS: usize = 500;
// NOTE: encode_rowid, decode_rowid, and CHUNK_ROWID_MULTIPLIER are imported
// from `crate::embedding::chunk_ids` (shared module, see 4.1b).
// This prevents coupling between pipeline.rs and search/vector.rs.
use crate::embedding::chunk_ids::{encode_rowid, CHUNK_ROWID_MULTIPLIER};
/// Split document text into overlapping chunks at paragraph boundaries.
///
/// Rules:
/// - Each chunk <= CHUNK_MAX_CHARS
/// - Split at paragraph boundaries (\n\n) when possible
/// - Fall back to sentence boundaries (. ! ?) then word boundaries
/// - Overlap of CHUNK_OVERLAP_CHARS between consecutive chunks
/// - Single documents under CHUNK_MAX_CHARS produce exactly 1 chunk (index 0)
///
/// Returns: Vec<(chunk_index, chunk_text)>
fn split_into_chunks(content: &str) -> Vec<(usize, String)> {
if content.len() <= CHUNK_MAX_CHARS {
return vec![(0, content.to_string())];
}
// Split at paragraph boundaries with overlap
todo!()
}
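A possible shape for the chunking rules above (illustrative sketch only: greedy paragraph packing with tail overlap; the sentence- and word-boundary fallbacks for a single oversized paragraph are elided):

```rust
const CHUNK_MAX_CHARS: usize = 32_000;
const CHUNK_OVERLAP_CHARS: usize = 500;

/// Greedy chunker: pack whole paragraphs until the character budget is hit,
/// then start the next chunk seeded with the tail of the previous one.
fn split_into_chunks(content: &str) -> Vec<(usize, String)> {
    if content.len() <= CHUNK_MAX_CHARS {
        return vec![(0, content.to_string())];
    }
    let mut chunks: Vec<String> = Vec::new();
    let mut current = String::new();
    for para in content.split("\n\n") {
        if !current.is_empty() && current.len() + 2 + para.len() > CHUNK_MAX_CHARS {
            // Seed the next chunk with the last CHUNK_OVERLAP_CHARS of this
            // one, nudged forward to a valid UTF-8 char boundary.
            let mut cut = current.len().saturating_sub(CHUNK_OVERLAP_CHARS);
            while !current.is_char_boundary(cut) {
                cut += 1;
            }
            let overlap = current[cut..].to_string();
            chunks.push(std::mem::take(&mut current));
            current = overlap;
        }
        if !current.is_empty() {
            current.push_str("\n\n");
        }
        current.push_str(para);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks.into_iter().enumerate().collect()
}
```

Documents under the budget take the early-return path and produce exactly one chunk, so the common case pays no chunking overhead.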
/// Which documents to embed.
#[derive(Debug, Clone, Copy)]
pub enum EmbedSelection {
/// New or changed documents (default).
Pending,
/// Only previously failed documents.
RetryFailed,
}
/// Result of embedding run.
#[derive(Debug, Default)]
pub struct EmbedResult {
pub embedded: usize,
pub failed: usize,
pub skipped: usize,
}
/// Embed documents that need embedding.
///
/// Process:
/// 1. Select documents needing embeddings:
/// - Pending: missing embedding_metadata row OR content_hash mismatch
/// - RetryFailed: embedding_metadata.last_error IS NOT NULL
/// 2. Page through candidates using keyset pagination (id > last_id)
/// to avoid rescanning already-processed rows
/// 3. For each document: split content into chunks via `split_into_chunks()`
/// - Documents <= 32k chars produce 1 chunk (common case, no overhead)
/// - Longer documents split at paragraph boundaries with 500-char overlap
/// 4. Clear existing chunks for changed documents (`clear_document_embeddings()`)
/// 5. Batch chunk texts -> Ollama `/api/embed` with concurrent HTTP requests
/// (concurrency = max in-flight HTTP requests to Ollama)
/// 6. Write chunk embeddings + embedding_metadata in per-batch transactions
/// - rowid = document_id * 1000 + chunk_index
/// 7. Failed batches record `last_error` in embedding_metadata
/// (excluded from Pending selection; retried via RetryFailed)
/// 8. Progress reported as (embedded + failed) vs total_pending
pub async fn embed_documents(
conn: &Connection,
client: &OllamaClient,
selection: EmbedSelection,
concurrency: usize,
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
) -> Result<EmbedResult> {
use futures::stream::{FuturesUnordered, StreamExt};
let mut result = EmbedResult::default();
let total_pending = count_pending_documents(conn, selection)?;
if total_pending == 0 {
return Ok(result);
}
// Page through pending documents using keyset pagination to avoid
// both memory pressure and OFFSET performance degradation.
let mut last_id: i64 = 0;
loop {
let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, selection)?;
if pending.is_empty() {
break;
}
// Launch concurrent HTTP requests, collect results
let mut futures = FuturesUnordered::new();
// Build ChunkWork items: split each document into chunks, clear old embeddings
let mut work: Vec<ChunkWork> = Vec::new();
{
// `Connection::transaction()` needs `&mut self`; all DB access here is
// single-threaded, so `unchecked_transaction()` on `&Connection` is safe.
let tx = conn.unchecked_transaction()?;
for doc in &pending {
clear_document_embeddings(&tx, doc.id)?;
for (chunk_index, chunk_text) in split_into_chunks(&doc.content) {
work.push(ChunkWork {
doc_id: doc.id,
chunk_index,
doc_hash: doc.content_hash.clone(),
chunk_hash: crate::documents::compute_content_hash(&chunk_text),
text: chunk_text,
});
}
}
tx.commit()?;
}
// Batch ChunkWork texts into Ollama API calls
for batch in work.chunks(BATCH_SIZE) {
let texts: Vec<String> = batch.iter().map(|w| w.text.clone()).collect();
let batch_meta: Vec<ChunkWork> = batch.to_vec();
futures.push(async move {
let embed_result = client.embed_batch(texts).await;
(batch_meta, embed_result)
});
// Cap in-flight requests
if futures.len() >= concurrency {
if let Some((meta, res)) = futures.next().await {
collect_writes(conn, &meta, res, &mut result)?;
}
}
}
// Drain remaining futures
while let Some((meta, res)) = futures.next().await {
collect_writes(conn, &meta, res, &mut result)?;
}
// Advance keyset cursor for next page
if let Some(last) = pending.last() {
last_id = last.id;
}
if let Some(ref cb) = progress_callback {
cb(result.embedded + result.failed, total_pending);
}
}
Ok(result)
}
/// Chunk work item prepared for embedding.
// Clone is required: each batch is copied into its async task via `batch.to_vec()`.
#[derive(Clone)]
struct ChunkWork {
doc_id: i64,
chunk_index: usize,
doc_hash: String, // SHA-256 of full document content (staleness detection)
chunk_hash: String, // SHA-256 of this chunk's text (provenance)
text: String,
}
/// Collect embedding results and write to DB (sequential, on main thread).
///
/// IMPORTANT: Validates embedding dimensions to prevent silent corruption.
/// If model returns wrong dimensions (e.g., different model configured),
/// the document is marked as failed rather than storing corrupt data.
///
/// NOTE: batch_meta contains ChunkWork items (not raw documents) because the
/// caller must split documents into chunks before batching for Ollama /api/embed.
/// Each ChunkWork carries both doc_hash (for staleness) and chunk_hash (for provenance).
fn collect_writes(
conn: &Connection,
batch_meta: &[ChunkWork],
embed_result: Result<Vec<Vec<f32>>>,
result: &mut EmbedResult,
) -> Result<()> {
let tx = conn.unchecked_transaction()?;
match embed_result {
Ok(embeddings) => {
for (chunk, embedding) in batch_meta.iter().zip(embeddings.iter()) {
// Validate dimensions to prevent silent corruption
if embedding.len() != EXPECTED_DIMS {
record_embedding_error(
&tx,
chunk.doc_id,
chunk.chunk_index,
&chunk.doc_hash,
&chunk.chunk_hash,
&format!(
"embedding dimension mismatch: got {}, expected {}",
embedding.len(),
EXPECTED_DIMS
),
)?;
result.failed += 1;
continue;
}
store_embedding(&tx, chunk.doc_id, chunk.chunk_index, embedding, &chunk.doc_hash, &chunk.chunk_hash)?;
result.embedded += 1;
}
}
Err(e) => {
for chunk in batch_meta {
record_embedding_error(&tx, chunk.doc_id, chunk.chunk_index, &chunk.doc_hash, &chunk.chunk_hash, &e.to_string())?;
result.failed += 1;
}
}
}
tx.commit()?;
Ok(())
}
struct PendingDocument {
id: i64,
content: String,
content_hash: String,
}
/// Count total pending documents (for progress reporting).
///
/// NOTE: With chunked embeddings, we detect pending at the document level.
/// A document needs embedding if:
/// - No embedding_metadata rows exist for it (new document)
/// - Its document_hash changed (chunk_index=0 row has mismatched hash)
/// This avoids counting individual chunks as separate pending items.
///
/// IMPORTANT: Uses `document_hash` (not `chunk_hash`) because staleness is
/// a document-level condition. See migration 009 comment for details.
fn count_pending_documents(conn: &Connection, selection: EmbedSelection) -> Result<usize> {
let sql = match selection {
EmbedSelection::Pending =>
"SELECT COUNT(*)
FROM documents d
LEFT JOIN embedding_metadata em ON d.id = em.document_id AND em.chunk_index = 0
WHERE em.document_id IS NULL
OR em.document_hash != d.content_hash",
EmbedSelection::RetryFailed =>
"SELECT COUNT(DISTINCT d.id)
FROM documents d
JOIN embedding_metadata em ON d.id = em.document_id
WHERE em.last_error IS NOT NULL",
};
let count: usize = conn.query_row(sql, [], |row| row.get(0))?;
Ok(count)
}
/// Find pending documents for embedding using keyset pagination.
///
/// IMPORTANT: Uses keyset pagination (d.id > last_id) instead of OFFSET.
/// OFFSET degrades O(n²) on large result sets because SQLite must scan
/// and discard all rows before the offset. Keyset pagination is O(1) per page
/// since the index seek goes directly to the starting row.
fn find_pending_documents(
conn: &Connection,
limit: usize,
last_id: i64,
selection: EmbedSelection,
) -> Result<Vec<PendingDocument>> {
// NOTE: Finds documents needing embedding at the document level.
// The caller is responsible for chunking content_text via split_into_chunks()
// and calling clear_document_embeddings() before storing new chunks.
// Uses document_hash (not chunk_hash) for staleness detection.
let sql = match selection {
EmbedSelection::Pending =>
"SELECT d.id, d.content_text, d.content_hash
FROM documents d
LEFT JOIN embedding_metadata em ON d.id = em.document_id AND em.chunk_index = 0
WHERE (em.document_id IS NULL
OR em.document_hash != d.content_hash)
AND d.id > ?
ORDER BY d.id
LIMIT ?",
EmbedSelection::RetryFailed =>
"SELECT DISTINCT d.id, d.content_text, d.content_hash
FROM documents d
JOIN embedding_metadata em ON d.id = em.document_id
WHERE em.last_error IS NOT NULL
AND d.id > ?
ORDER BY d.id
LIMIT ?",
};
let mut stmt = conn.prepare(sql)?;
let docs = stmt
.query_map(rusqlite::params![last_id, limit as i64], |row| {
Ok(PendingDocument {
id: row.get(0)?,
content: row.get(1)?,
content_hash: row.get(2)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(docs)
}
/// Clear all existing chunk embeddings for a document before re-embedding.
/// Called when content_hash changes (chunk count may differ).
fn clear_document_embeddings(tx: &rusqlite::Transaction, document_id: i64) -> Result<()> {
// Delete vec0 rows (range covers all possible chunk indices)
tx.execute(
"DELETE FROM embeddings WHERE rowid >= ? AND rowid < ?",
rusqlite::params![
document_id * CHUNK_ROWID_MULTIPLIER,
(document_id + 1) * CHUNK_ROWID_MULTIPLIER,
],
)?;
// Delete metadata rows
tx.execute(
"DELETE FROM embedding_metadata WHERE document_id = ?",
rusqlite::params![document_id],
)?;
Ok(())
}
fn store_embedding(
tx: &rusqlite::Transaction,
document_id: i64,
chunk_index: usize,
embedding: &[f32],
doc_hash: &str,
chunk_hash: &str,
) -> Result<()> {
let rowid = encode_rowid(document_id, chunk_index);
// Convert embedding to bytes for sqlite-vec
// sqlite-vec expects raw little-endian bytes, not the array directly
let embedding_bytes: Vec<u8> = embedding
.iter()
.flat_map(|f| f.to_le_bytes())
.collect();
// Store in sqlite-vec (rowid encodes doc_id + chunk_index)
tx.execute(
"INSERT OR REPLACE INTO embeddings(rowid, embedding) VALUES (?, ?)",
rusqlite::params![rowid, embedding_bytes],
)?;
// Update metadata (per-chunk) with both hashes:
// - document_hash for staleness detection (compared to documents.content_hash)
// - chunk_hash for provenance/debug
let now = crate::core::time::now_ms();
tx.execute(
"INSERT OR REPLACE INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash, created_at, last_error, attempt_count, last_attempt_at)
VALUES (?, ?, 'nomic-embed-text', 768, ?, ?, ?, NULL, 0, ?)",
rusqlite::params![document_id, chunk_index, doc_hash, chunk_hash, now, now],
)?;
Ok(())
}
fn record_embedding_error(
tx: &rusqlite::Transaction,
document_id: i64,
chunk_index: usize,
doc_hash: &str,
chunk_hash: &str,
error: &str,
) -> Result<()> {
let now = crate::core::time::now_ms();
tx.execute(
"INSERT INTO embedding_metadata
(document_id, chunk_index, model, dims, document_hash, chunk_hash, created_at, last_error, attempt_count, last_attempt_at)
VALUES (?, ?, 'nomic-embed-text', 768, ?, ?, ?, ?, 1, ?)
ON CONFLICT(document_id, chunk_index) DO UPDATE SET
last_error = excluded.last_error,
attempt_count = attempt_count + 1,
last_attempt_at = excluded.last_attempt_at",
rusqlite::params![document_id, chunk_index, doc_hash, chunk_hash, now, error, now],
)?;
Ok(())
}
Acceptance Criteria:
- New documents get embedded (all chunks)
- Changed documents (hash mismatch) get re-embedded: existing chunks cleared first via `clear_document_embeddings()`
- Documents under 32k chars produce exactly 1 chunk (chunk_index=0)
- Long documents split at paragraph boundaries with 500-char overlap
- Unchanged documents skipped
- Failures recorded in `embedding_metadata.last_error` per chunk
- Failures record actual content_hash (not empty string)
- Writes batched in transactions for performance
- `concurrency` parameter controls max concurrent HTTP requests to Ollama API
- Progress reported during embedding
- Deterministic `ORDER BY d.id` ensures consistent paging
- `EmbedSelection` parameter controls pending vs retry-failed mode
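The criteria above depend on `split_into_chunks` from the chunking section. A minimal paragraph-boundary sketch under the stated 32k-char / 500-char-overlap parameters (byte length approximates char count for the threshold, and an oversized single paragraph is not re-split here):

```rust
const MAX_CHUNK_CHARS: usize = 32_000;
const CHUNK_OVERLAP_CHARS: usize = 500;

/// Split content at paragraph boundaries, carrying a 500-char overlap
/// tail into the next chunk. Short documents yield exactly one chunk.
/// NOTE: a single paragraph longer than the threshold is not re-split.
fn split_into_chunks(content: &str) -> Vec<(usize, String)> {
    if content.len() <= MAX_CHUNK_CHARS {
        return vec![(0, content.to_string())];
    }
    let mut chunks: Vec<String> = Vec::new();
    let mut current = String::new();
    for para in content.split("\n\n") {
        if !current.is_empty() && current.len() + para.len() + 2 > MAX_CHUNK_CHARS {
            // Keep the last 500 chars of the finished chunk as overlap
            let tail_start = current.chars().count().saturating_sub(CHUNK_OVERLAP_CHARS);
            let overlap: String = current.chars().skip(tail_start).collect();
            chunks.push(std::mem::replace(&mut current, overlap));
        }
        if !current.is_empty() {
            current.push_str("\n\n");
        }
        current.push_str(para);
    }
    chunks.push(current);
    chunks.into_iter().enumerate().collect()
}
```

The `(usize, String)` return shape matches the `for (chunk_index, chunk_text) in split_into_chunks(...)` loop in `embed_documents`.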
### 4.5 CLI: `lore embed`

File: `src/cli/commands/embed.rs`
//! Embed command - generate embeddings for documents.
use indicatif::{ProgressBar, ProgressStyle};
use serde::Serialize;
use crate::core::error::Result;
use crate::embedding::{embed_documents, EmbedResult, OllamaClient, OllamaConfig};
use crate::Config;
/// Run embedding command.
pub async fn run_embed(
config: &Config,
retry_failed: bool,
) -> Result<EmbedResult> {
use crate::core::db::create_connection;
use crate::embedding::pipeline::EmbedSelection;
let ollama_config = OllamaConfig {
base_url: config.embedding.base_url.clone(),
model: config.embedding.model.clone(),
timeout_secs: 120,
};
let client = OllamaClient::new(ollama_config);
// Health check
client.health_check().await?;
// Open database connection
let conn = create_connection(&config.storage.db_path)?;
// Determine selection mode
let selection = if retry_failed {
EmbedSelection::RetryFailed
} else {
EmbedSelection::Pending
};
// Run embedding, driving the progress bar from the pipeline callback
let pb = ProgressBar::new(0);
pb.set_style(ProgressStyle::with_template("{pos}/{len} documents").unwrap());
let pb_for_cb = pb.clone();
let result = embed_documents(
&conn,
&client,
selection,
config.embedding.concurrency as usize,
Some(Box::new(move |done, total| {
pb_for_cb.set_length(total as u64);
pb_for_cb.set_position(done as u64);
})),
).await?;
pb.finish_and_clear();
Ok(result)
}
/// Print human-readable output.
pub fn print_embed(result: &EmbedResult, elapsed_secs: u64) {
println!("Embedding complete:");
println!(" Embedded: {:>6} documents", result.embedded);
println!(" Failed: {:>6} documents", result.failed);
println!(" Skipped: {:>6} documents", result.skipped);
println!(" Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}
/// Print JSON output for robot mode.
pub fn print_embed_json(result: &EmbedResult, elapsed_ms: u64) {
let output = serde_json::json!({
"ok": true,
"data": {
"embedded": result.embedded,
"failed": result.failed,
"skipped": result.skipped
},
"meta": {
"elapsed_ms": elapsed_ms
}
});
println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
**CLI integration:**
/// Embed subcommand arguments.
#[derive(Args)]
pub struct EmbedArgs {
/// Retry only previously failed documents
#[arg(long)]
retry_failed: bool,
}
Acceptance Criteria:
- Embeds documents without embeddings
- Re-embeds documents with changed hash
- `--retry-failed` only processes failed documents
- Progress bar with count
- Clear error if Ollama unavailable
### 4.6 CLI: `lore stats`

File: `src/cli/commands/stats.rs`
//! Stats command - display document and embedding statistics.
use rusqlite::Connection;
use serde::Serialize;
use crate::core::error::Result;
use crate::Config;
/// Document statistics.
#[derive(Debug, Serialize)]
pub struct Stats {
pub documents: DocumentStats,
pub embeddings: EmbeddingStats,
pub fts: FtsStats,
pub queues: QueueStats,
}
#[derive(Debug, Serialize)]
pub struct DocumentStats {
pub issues: usize,
pub mrs: usize,
pub discussions: usize,
pub total: usize,
pub truncated: usize,
}
#[derive(Debug, Serialize)]
pub struct EmbeddingStats {
/// Documents with at least one embedding (chunk_index=0 exists in embedding_metadata)
pub embedded: usize,
pub pending: usize,
pub failed: usize,
/// embedded / total_documents * 100 (document-level, not chunk-level)
pub coverage_pct: f64,
/// Total chunks across all embedded documents
pub total_chunks: usize,
}
#[derive(Debug, Serialize)]
pub struct FtsStats {
pub indexed: usize,
}
/// Queue statistics for observability.
///
/// Exposes internal queue depths so operators can detect backlogs
/// and failing items that need manual intervention.
#[derive(Debug, Serialize)]
pub struct QueueStats {
/// Items in dirty_sources queue (pending document regeneration)
pub dirty_sources: usize,
/// Items in dirty_sources with last_error set (failing regeneration)
pub dirty_sources_failed: usize,
/// Items in pending_discussion_fetches queue
pub pending_discussion_fetches: usize,
/// Items in pending_discussion_fetches with last_error set
pub pending_discussion_fetches_failed: usize,
}
/// Integrity check result.
#[derive(Debug, Serialize)]
pub struct IntegrityCheck {
pub documents_count: usize,
pub fts_count: usize,
pub embeddings_count: usize,
pub metadata_count: usize,
pub orphaned_embeddings: usize,
pub hash_mismatches: usize,
pub ok: bool,
}
/// Run stats command.
pub fn run_stats(config: &Config) -> Result<Stats> {
// Query counts from database
todo!()
}
/// Run integrity check (--check flag).
///
/// Verifies:
/// - documents count == documents_fts count
/// - All embeddings.rowid decode to valid document IDs (rowid / 1000 exists in documents.id)
/// - embedding_metadata.document_hash == documents.content_hash for chunk_index=0 rows
pub fn run_integrity_check(config: &Config) -> Result<IntegrityCheck> {
// 1. Count documents
// 2. Count FTS entries
// 3. Find orphaned embeddings (rowid / 1000 not in documents.id)
// 4. Find hash mismatches between embedding_metadata (chunk_index=0) and documents
// 5. Return check results
todo!()
}
/// Repair result from --repair flag.
#[derive(Debug, Serialize)]
pub struct RepairResult {
pub orphaned_embeddings_deleted: usize,
pub stale_embeddings_cleared: usize,
pub missing_fts_repopulated: usize,
}
/// Repair issues found by integrity check (--repair flag).
///
/// Fixes:
/// - Deletes orphaned embeddings (embedding_metadata rows with no matching document)
/// - Clears stale embedding_metadata (hash mismatch) so they get re-embedded
/// - Rebuilds FTS index from scratch (correct-by-construction)
///
/// NOTE: FTS repair uses `rebuild` rather than partial row insertion.
/// With `content='documents'` (external-content FTS), partial repopulation
/// via INSERT of missing rows is fragile — if the external content table
/// and FTS content diverge in any way, partial fixes can leave the index
/// in an inconsistent state. A full rebuild is slower but guaranteed correct.
pub fn run_repair(config: &Config) -> Result<RepairResult> {
use crate::core::db::create_connection;
let conn = create_connection(&config.storage.db_path)?;
// Delete orphaned embedding_metadata (no matching document)
let orphaned_deleted = conn.execute(
"DELETE FROM embedding_metadata
WHERE document_id NOT IN (SELECT id FROM documents)",
[],
)?;
// Also delete orphaned chunks from embeddings virtual table (sqlite-vec).
// Chunked rowids: doc_id * 1000 + chunk_index. Orphan = rowid / 1000 not in documents.
conn.execute(
"DELETE FROM embeddings
WHERE rowid / 1000 NOT IN (SELECT id FROM documents)",
[],
)?;
// Clear stale embeddings (document-level hash mismatch) - will be re-embedded.
// Uses document_hash (not chunk_hash) because staleness is document-level:
// if the document changed, ALL chunks for that document are stale.
// Must also delete corresponding vec0 rows (range deletion for all chunks).
let stale_doc_ids: Vec<i64> = {
let mut stmt = conn.prepare(
"SELECT DISTINCT d.id
FROM documents d
JOIN embedding_metadata em
ON em.document_id = d.id AND em.chunk_index = 0
WHERE em.document_hash != d.content_hash"
)?;
stmt.query_map([], |r| r.get(0))?
.collect::<std::result::Result<Vec<_>, _>>()?
};
let mut stale_cleared: usize = 0;
for doc_id in &stale_doc_ids {
stale_cleared += conn.execute(
"DELETE FROM embedding_metadata WHERE document_id = ?",
[doc_id],
)? as usize;
// Also clean up vec0 rows (range covers all chunk indices for this doc)
conn.execute(
"DELETE FROM embeddings WHERE rowid >= ? AND rowid < ?",
rusqlite::params![doc_id * 1000, (doc_id + 1) * 1000],
)?;
}
// Rebuild FTS index from scratch — correct-by-construction.
// This re-reads all rows from the external content table (documents)
// and rebuilds the index. Slower than partial fix but guaranteed consistent.
conn.execute(
"INSERT INTO documents_fts(documents_fts) VALUES('rebuild')",
[],
)?;
let fts_rebuilt = 1; // rebuild is all-or-nothing
Ok(RepairResult {
orphaned_embeddings_deleted: orphaned_deleted,
stale_embeddings_cleared: stale_cleared,
missing_fts_repopulated: fts_rebuilt,
})
}
/// Print human-readable stats.
pub fn print_stats(stats: &Stats) {
println!("Document Statistics:");
println!(" Issues: {:>6} documents", stats.documents.issues);
println!(" MRs: {:>6} documents", stats.documents.mrs);
println!(" Discussions: {:>6} documents", stats.documents.discussions);
println!(" Total: {:>6} documents", stats.documents.total);
if stats.documents.truncated > 0 {
println!(" Truncated: {:>6}", stats.documents.truncated);
}
println!();
println!("Embedding Coverage:");
println!(" Embedded: {:>6} ({:.1}%)", stats.embeddings.embedded, stats.embeddings.coverage_pct);
println!(" Pending: {:>6}", stats.embeddings.pending);
println!(" Failed: {:>6}", stats.embeddings.failed);
println!();
println!("FTS Index:");
println!(" Indexed: {:>6} documents", stats.fts.indexed);
println!();
println!("Queue Depths:");
println!(" Dirty sources: {:>6} ({} failed)",
stats.queues.dirty_sources,
stats.queues.dirty_sources_failed
);
println!(" Discussion fetches:{:>6} ({} failed)",
stats.queues.pending_discussion_fetches,
stats.queues.pending_discussion_fetches_failed
);
}
/// Print integrity check results.
pub fn print_integrity_check(check: &IntegrityCheck) {
println!("Integrity Check:");
println!(" Documents: {:>6}", check.documents_count);
println!(" FTS entries: {:>6}", check.fts_count);
println!(" Embeddings: {:>6}", check.embeddings_count);
println!(" Metadata: {:>6}", check.metadata_count);
if check.orphaned_embeddings > 0 {
println!(" Orphaned embeddings: {:>6} (WARN)", check.orphaned_embeddings);
}
if check.hash_mismatches > 0 {
println!(" Hash mismatches: {:>6} (WARN)", check.hash_mismatches);
}
println!();
println!(" Status: {}", if check.ok { "OK" } else { "ISSUES FOUND" });
}
/// Print JSON stats for robot mode.
pub fn print_stats_json(stats: &Stats) {
let output = serde_json::json!({
"ok": true,
"data": stats
});
println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
/// Print repair results.
pub fn print_repair_result(result: &RepairResult) {
    println!("Repair Results:");
    println!("  Orphaned embeddings deleted: {}", result.orphaned_embeddings_deleted);
    println!("  Stale embeddings cleared:    {}", result.stale_embeddings_cleared);
    println!("  Missing FTS repopulated:     {}", result.missing_fts_repopulated);
    println!();
    let total = result.orphaned_embeddings_deleted
        + result.stale_embeddings_cleared
        + result.missing_fts_repopulated;
    if total == 0 {
        println!("  No issues found to repair.");
    } else {
        println!("  Fixed {} issues.", total);
    }
}
**CLI integration:**
```rust
/// Stats subcommand arguments.
#[derive(Args)]
pub struct StatsArgs {
/// Run integrity checks (document/FTS/embedding consistency)
#[arg(long)]
check: bool,
/// Repair issues found by --check (deletes orphaned embeddings, clears stale metadata)
#[arg(long, requires = "check")]
repair: bool,
}
```

Acceptance Criteria:
- Shows document counts by type
- Shows embedding coverage
- Shows FTS index count
- Identifies truncated documents
- Shows queue depths (dirty_sources, pending_discussion_fetches)
- Shows failed item counts for each queue
- `--check` verifies document/FTS/embedding consistency
- `--repair` fixes orphaned embeddings, stale metadata, missing FTS entries
- JSON output for scripting
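Two edge cases matter for `coverage_pct`: it is document-level (not chunk-level), and an empty database must not divide by zero. A trivial sketch of the computation:

```rust
/// Document-level embedding coverage percentage.
/// Guards the empty-database case rather than returning NaN.
fn coverage_pct(embedded: usize, total: usize) -> f64 {
    if total == 0 {
        0.0
    } else {
        embedded as f64 / total as f64 * 100.0
    }
}
```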
## Phase 5: Hybrid Search

### 5.1 Vector Search Function

File: `src/search/vector.rs`
use rusqlite::Connection;
use crate::core::error::Result;
/// Vector search result.
#[derive(Debug, Clone)]
pub struct VectorResult {
pub document_id: i64,
pub distance: f64, // Lower = more similar
}
/// Search documents using vector similarity.
///
/// Returns document-level results by taking the best (lowest distance)
/// chunk match per document. This deduplicates across chunks so the
/// caller sees one result per document.
///
/// IMPORTANT: sqlite-vec KNN queries require:
/// - k parameter for number of results
/// - embedding passed as raw little-endian bytes
///
/// With chunked embeddings, rowid = document_id * 1000 + chunk_index.
/// We over-fetch chunks (3x limit) to ensure enough unique documents after dedup.
pub fn search_vector(
conn: &Connection,
query_embedding: &[f32],
limit: usize,
) -> Result<Vec<VectorResult>> {
use crate::embedding::decode_rowid;
// Convert embedding to bytes for sqlite-vec
let embedding_bytes: Vec<u8> = query_embedding
.iter()
.flat_map(|f| f.to_le_bytes())
.collect();
// Over-fetch to ensure enough unique documents after dedup.
// With avg ~1-2 chunks per doc, 3x is conservative.
// Explicit i64 type for sqlite-vec k parameter binding.
let chunk_limit: i64 = (limit * 3) as i64;
let mut stmt = conn.prepare(
"SELECT rowid, distance
FROM embeddings
WHERE embedding MATCH ? AND k = ?
ORDER BY distance"
)?;
let chunk_results: Vec<(i64, f64)> = stmt
.query_map(rusqlite::params![embedding_bytes, chunk_limit], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, f64>(1)?))
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
// Deduplicate: keep best (lowest distance) chunk per document
let mut best_by_doc: std::collections::HashMap<i64, f64> =
std::collections::HashMap::new();
for (rowid, distance) in &chunk_results {
let (doc_id, _chunk_idx) = decode_rowid(*rowid);
best_by_doc
.entry(doc_id)
.and_modify(|d| { if *distance < *d { *d = *distance; } })
.or_insert(*distance);
}
// Sort by distance and take top `limit`
let mut results: Vec<VectorResult> = best_by_doc
.into_iter()
.map(|(doc_id, dist)| VectorResult {
document_id: doc_id,
distance: dist,
})
.collect();
results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap());
results.truncate(limit);
Ok(results)
}
Acceptance Criteria:
- Returns document IDs with distances (deduplicated across chunks)
- Best chunk distance used per document
- Lower distance = better match
- Works with 768-dim vectors
- Uses k parameter for KNN query
- Over-fetches chunks (3x limit) to handle dedup without missing documents
- Embedding passed as bytes
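Both `store_embedding` and `search_vector` serialize vectors as raw little-endian `f32` bytes for sqlite-vec. The conversion round-trips like this (helper names are illustrative; the pipeline inlines the forward direction):

```rust
/// Serialize an f32 vector to the raw little-endian byte layout
/// sqlite-vec expects for vec0 columns and MATCH parameters.
fn vec_to_le_bytes(v: &[f32]) -> Vec<u8> {
    v.iter().flat_map(|f| f.to_le_bytes()).collect()
}

/// Inverse conversion, useful in tests and debugging tools.
fn vec_from_le_bytes(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect()
}
```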
### 5.2 RRF Ranking

File: `src/search/rrf.rs`
use std::collections::HashMap;
/// RRF ranking constant.
const RRF_K: f64 = 60.0;
/// RRF-ranked result.
#[derive(Debug, Clone)]
pub struct RrfResult {
pub document_id: i64,
pub rrf_score: f64, // Raw RRF score
pub normalized_score: f64, // Normalized to 0-1
pub vector_rank: Option<usize>,
pub fts_rank: Option<usize>,
}
/// Rank documents using Reciprocal Rank Fusion.
///
/// Algorithm:
/// RRF_score(d) = Σ 1 / (k + rank_i(d))
///
/// Where:
/// - k = 60 (tunable constant)
/// - rank_i(d) = rank of document d in retriever i (1-indexed)
/// - Sum over all retrievers where document appears
pub fn rank_rrf(
vector_results: &[(i64, f64)], // (doc_id, distance)
fts_results: &[(i64, f64)], // (doc_id, bm25_score)
) -> Vec<RrfResult> {
let mut scores: HashMap<i64, (f64, Option<usize>, Option<usize>)> = HashMap::new();
// Add vector results (1-indexed ranks)
for (rank, (doc_id, _)) in vector_results.iter().enumerate() {
let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64);
let entry = scores.entry(*doc_id).or_insert((0.0, None, None));
entry.0 += rrf_contribution;
entry.1 = Some(rank + 1);
}
// Add FTS results (1-indexed ranks)
for (rank, (doc_id, _)) in fts_results.iter().enumerate() {
let rrf_contribution = 1.0 / (RRF_K + (rank + 1) as f64);
let entry = scores.entry(*doc_id).or_insert((0.0, None, None));
entry.0 += rrf_contribution;
entry.2 = Some(rank + 1);
}
// Convert to results and sort by RRF score descending
let mut results: Vec<_> = scores
.into_iter()
.map(|(doc_id, (rrf_score, vector_rank, fts_rank))| {
RrfResult {
document_id: doc_id,
rrf_score,
normalized_score: 0.0, // Will be set below
vector_rank,
fts_rank,
}
})
.collect();
results.sort_by(|a, b| b.rrf_score.partial_cmp(&a.rrf_score).unwrap());
// Normalize scores to 0-1
if let Some(max_score) = results.first().map(|r| r.rrf_score) {
for result in &mut results {
result.normalized_score = result.rrf_score / max_score;
}
}
results
}
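To make the fusion concrete, a worked example with k = 60: a document ranked 1st by vector and 3rd by FTS scores 1/61 + 1/63 ≈ 0.0323, while a document ranked 1st by only one retriever scores 1/61 ≈ 0.0164, so agreement between retrievers wins. As a sketch:

```rust
const RRF_K: f64 = 60.0;

/// One retriever's contribution to a document's RRF score (1-indexed rank).
fn rrf_contribution(rank: usize) -> f64 {
    1.0 / (RRF_K + rank as f64)
}

/// Fused score for a document given its optional rank in each retriever.
fn rrf_score(vector_rank: Option<usize>, fts_rank: Option<usize>) -> f64 {
    vector_rank.map_or(0.0, rrf_contribution) + fts_rank.map_or(0.0, rrf_contribution)
}
```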
Acceptance Criteria:
- Documents in both lists score higher
- Documents in one list still included
- Normalized score = rrfScore / max(rrfScore)
- Raw RRF score available in `--explain` output
### 5.3 Adaptive Recall

File: `src/search/hybrid.rs`
use rusqlite::Connection;
use crate::core::error::Result;
use crate::embedding::OllamaClient;
// NOTE: SearchMode is defined in this file, so it must not also be imported.
use crate::search::{SearchFilters, search_fts, search_vector, rank_rrf, FtsQueryMode};
/// Minimum base recall for unfiltered search.
const BASE_RECALL_MIN: usize = 50;
/// Minimum recall when filters are applied.
const FILTERED_RECALL_MIN: usize = 200;
/// Maximum recall to prevent excessive resource usage.
const RECALL_CAP: usize = 1500;
/// Search mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchMode {
Hybrid, // Vector + FTS with RRF
Lexical, // FTS only
Semantic, // Vector only
}
impl SearchMode {
pub fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"hybrid" => Some(Self::Hybrid),
"lexical" | "fts" => Some(Self::Lexical),
"semantic" | "vector" => Some(Self::Semantic),
_ => None,
}
}
pub fn as_str(&self) -> &'static str {
match self {
Self::Hybrid => "hybrid",
Self::Lexical => "lexical",
Self::Semantic => "semantic",
}
}
}
/// Hybrid search result.
#[derive(Debug)]
pub struct HybridResult {
pub document_id: i64,
pub score: f64,
pub vector_rank: Option<usize>,
pub fts_rank: Option<usize>,
pub rrf_score: f64,
}
/// Execute hybrid search.
///
/// Adaptive recall: expands topK proportionally to requested limit and filter
/// restrictiveness to prevent "no results" when relevant docs would be filtered out.
///
/// Formula:
/// - Unfiltered: max(50, limit * 10), capped at 1500
/// - Filtered: max(200, limit * 50), capped at 1500
///
/// IMPORTANT: All modes use RRF consistently to ensure rank fields
/// are populated correctly for --explain output.
pub async fn search_hybrid(
conn: &Connection,
client: Option<&OllamaClient>,
ollama_base_url: Option<&str>, // For actionable error messages
query: &str,
mode: SearchMode,
filters: &SearchFilters,
fts_mode: FtsQueryMode,
) -> Result<(Vec<HybridResult>, Vec<String>)> {
let mut warnings: Vec<String> = Vec::new();
// Adaptive recall: proportional to requested limit and filter count
let requested = filters.clamp_limit();
let top_k = if filters.has_any_filter() {
(requested * 50).max(FILTERED_RECALL_MIN).min(RECALL_CAP)
} else {
(requested * 10).max(BASE_RECALL_MIN).min(RECALL_CAP)
};
match mode {
SearchMode::Lexical => {
// FTS only - use RRF with empty vector results for consistent ranking
let fts_results = search_fts(conn, query, top_k, fts_mode)?;
let fts_tuples: Vec<_> = fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
let ranked = rank_rrf(&[], &fts_tuples);
let results = ranked
.into_iter()
.map(|r| HybridResult {
document_id: r.document_id,
score: r.normalized_score,
vector_rank: r.vector_rank,
fts_rank: r.fts_rank,
rrf_score: r.rrf_score,
})
.collect();
Ok((results, warnings))
}
SearchMode::Semantic => {
// Vector only - requires client
let client = client.ok_or_else(|| crate::core::error::LoreError::OllamaUnavailable {
base_url: ollama_base_url.unwrap_or("http://localhost:11434").into(),
source: None,
})?;
let query_embedding = client.embed_batch(vec![query.to_string()]).await?;
let embedding = query_embedding.into_iter().next().unwrap();
let vec_results = search_vector(conn, &embedding, top_k)?;
// Use RRF with empty FTS results for consistent ranking
let vec_tuples: Vec<_> = vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
let ranked = rank_rrf(&vec_tuples, &[]);
let results = ranked
.into_iter()
.map(|r| HybridResult {
document_id: r.document_id,
score: r.normalized_score,
vector_rank: r.vector_rank,
fts_rank: r.fts_rank,
rrf_score: r.rrf_score,
})
.collect();
Ok((results, warnings))
}
SearchMode::Hybrid => {
// Both retrievers with RRF fusion
let fts_results = search_fts(conn, query, top_k, fts_mode)?;
// Attempt vector search with graceful degradation on any failure
let vec_results = match client {
Some(client) => {
// Try to embed query; gracefully degrade on transient failures
match client.embed_batch(vec![query.to_string()]).await {
Ok(embeddings) => {
let embedding = embeddings.into_iter().next().unwrap();
search_vector(conn, &embedding, top_k)?
}
Err(e) => {
// Transient failure (network, timeout, rate limit, etc.)
// Log and fall back to FTS-only rather than failing the search
tracing::warn!("Vector search failed, falling back to lexical: {}", e);
warnings.push(format!(
"Vector search unavailable ({}), using lexical search only",
e
));
Vec::new()
}
}
}
None => {
// No client configured
warnings.push("Embedding service unavailable, using lexical search only".into());
Vec::new()
}
};
// RRF fusion
let vec_tuples: Vec<_> = vec_results.iter().map(|r| (r.document_id, r.distance)).collect();
let fts_tuples: Vec<_> = fts_results.iter().map(|r| (r.document_id, r.rank)).collect();
let ranked = rank_rrf(&vec_tuples, &fts_tuples);
let results = ranked
.into_iter()
.map(|r| HybridResult {
document_id: r.document_id,
score: r.normalized_score,
vector_rank: r.vector_rank,
fts_rank: r.fts_rank,
rrf_score: r.rrf_score,
})
.collect();
Ok((results, warnings))
}
}
}
Acceptance Criteria:
- Unfiltered search uses topK=max(50, limit*10), capped at 1500
- Filtered search uses topK=max(200, limit*50), capped at 1500
- Final results still limited by `--limit`
- Adaptive recall prevents "no results" under heavy filtering
### 5.4 Graceful Degradation
When Ollama unavailable during hybrid/semantic search:
- Log warning: "Embedding service unavailable, using lexical search only"
- Fall back to FTS-only search
- Include warning in response
Acceptance Criteria:
- Default mode is hybrid
- `--mode=lexical` works without Ollama
- `--mode=semantic` requires Ollama
- Graceful degradation when Ollama down
- `--explain` shows rank breakdown
- All Phase 3 filters work in hybrid mode
## Phase 6: Sync Orchestration

### 6.1 Dirty Source Tracking

File: `src/ingestion/dirty_tracker.rs`
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::SourceType;
/// Batch size when draining dirty_sources queue.
/// We drain repeatedly in a loop until no ready items remain,
/// so this is a batch size (not a hard ceiling per invocation).
/// This bounds WAL growth and memory per iteration while ensuring
/// `generate-docs` and `sync` converge in a single invocation.
const DIRTY_SOURCES_BATCH_SIZE: usize = 500;
/// Mark a source as dirty (needs document regeneration) — transactional variant.
///
/// Called during entity upsert operations INSIDE ingestion transactions.
/// The `_tx` suffix enforces the invariant that dirty marking happens
/// atomically with the entity upsert — callers cannot accidentally use
/// a bare Connection and break atomicity.
///
/// IMPORTANT: Uses upsert with backoff reset, NOT `INSERT OR IGNORE`.
/// `INSERT OR IGNORE` would silently drop re-queues for entities that are
/// already in the queue with a long `next_attempt_at` due to previous failures.
/// This means fresh updates would wait behind stale backoff windows — exactly
/// the opposite of what we want for incremental sync.
///
/// The ON CONFLICT clause resets all backoff/error state so the entity is
/// immediately eligible for processing on the next regeneration pass.
pub fn mark_dirty_tx(
tx: &rusqlite::Transaction<'_>,
source_type: SourceType,
source_id: i64,
) -> Result<()> {
tx.execute(
"INSERT INTO dirty_sources
(source_type, source_id, queued_at, attempt_count, last_attempt_at, last_error, next_attempt_at)
VALUES (?, ?, ?, 0, NULL, NULL, NULL)
ON CONFLICT(source_type, source_id) DO UPDATE SET
queued_at = excluded.queued_at,
attempt_count = 0,
last_attempt_at = NULL,
last_error = NULL,
next_attempt_at = NULL",
rusqlite::params![source_type.as_str(), source_id, now_ms()],
)?;
Ok(())
}
/// Convenience wrapper for non-transactional contexts (e.g., CLI `--full` seeding).
/// Wraps a single mark_dirty call in its own transaction.
pub fn mark_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> {
let tx = conn.unchecked_transaction()?;
mark_dirty_tx(&tx, source_type, source_id)?;
tx.commit()?;
Ok(())
}
/// Get dirty sources ready for processing.
///
/// Uses `next_attempt_at` for efficient, index-friendly backoff queries.
/// Items with NULL `next_attempt_at` are ready immediately (first attempt).
/// Items with `next_attempt_at <= now` have waited long enough after failure.
///
/// Benefits over SQL bitshift calculation:
/// - No overflow risk from large attempt_count values
/// - Index-friendly: `WHERE next_attempt_at <= ?`
/// - Jitter can be added in Rust when computing next_attempt_at
///
/// This prevents hot-loop retries when a source consistently fails
/// to generate a document (e.g., malformed data, missing references).
pub fn get_dirty_sources(conn: &Connection) -> Result<Vec<(SourceType, i64)>> {
let now = now_ms();
let mut stmt = conn.prepare(
"SELECT source_type, source_id
FROM dirty_sources
WHERE next_attempt_at IS NULL OR next_attempt_at <= ?
ORDER BY attempt_count ASC, queued_at ASC
LIMIT ?"
)?;
let results = stmt
.query_map(rusqlite::params![now, DIRTY_SOURCES_BATCH_SIZE], |row| {
let type_str: String = row.get(0)?;
let source_type = match type_str.as_str() {
"issue" => SourceType::Issue,
"merge_request" => SourceType::MergeRequest,
"discussion" => SourceType::Discussion,
other => return Err(rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid source_type: {other}"),
)),
)),
};
Ok((source_type, row.get(1)?))
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(results)
}
/// Clear dirty source after processing.
pub fn clear_dirty(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<()> {
conn.execute(
"DELETE FROM dirty_sources WHERE source_type = ? AND source_id = ?",
rusqlite::params![source_type.as_str(), source_id],
)?;
Ok(())
}
Integration Points — Where mark_dirty_tx() is Called:
All calls use mark_dirty_tx(&tx, ...) inside the existing ingestion transaction for
atomic consistency. The _tx suffix on the function signature enforces this at the API level —
callers cannot accidentally use a bare Connection and break atomicity.
Mark ALL upserted entities (not just changed ones). The regenerator's hash comparison handles "unchanged" detection cheaply — this avoids needing change detection in ingestion.
| Location | Source Type | Call | When |
|---|---|---|---|
| `src/ingestion/issues.rs` — issue upsert loop | `SourceType::Issue` | `mark_dirty_tx(&tx, ...)` | After each issue INSERT/UPDATE within the batch transaction |
| `src/ingestion/merge_requests.rs` — MR upsert loop | `SourceType::MergeRequest` | `mark_dirty_tx(&tx, ...)` | After each MR INSERT/UPDATE within the batch transaction |
| `src/ingestion/discussions.rs` — issue discussion insert | `SourceType::Discussion` | `mark_dirty_tx(&tx, ...)` | After each discussion INSERT within the full-refresh transaction |
| `src/ingestion/mr_discussions.rs` — MR discussion upsert | `SourceType::Discussion` | `mark_dirty_tx(&tx, ...)` | After each discussion upsert in the write phase |
Discussion Sweep Cleanup:
When the MR discussion sweep deletes stale discussions (last_seen_at < run_start_time),
also delete the corresponding document rows directly. The ON DELETE CASCADE on
document_labels/document_paths and the documents_embeddings_ad trigger handle
all downstream cleanup.
-- In src/ingestion/mr_discussions.rs, during sweep phase.
-- NOTE: a WITH clause attaches to exactly one statement in SQLite, so the
-- stale-set CTE cannot be shared across two DELETEs. Run both statements
-- inside the sweep's transaction; the predicate then selects the same rows
-- for each statement.

-- Step 1: delete orphaned documents (must happen while the discussion rows
-- still exist so source_id resolves)
WITH stale AS (
    SELECT id FROM discussions
    WHERE merge_request_id = ? AND last_seen_at < ?
)
DELETE FROM documents
WHERE source_type = 'discussion' AND source_id IN (SELECT id FROM stale);

-- Step 2: delete the stale discussions themselves (same predicate)
DELETE FROM discussions
WHERE merge_request_id = ? AND last_seen_at < ?;

-- Most defensive alternative: capture the stale IDs in Rust first, then
-- issue both DELETEs against the captured list:
--   let stale_ids: Vec<i64> = /* SELECT id FROM discussions WHERE ... */;
--   DELETE FROM documents WHERE source_type = 'discussion' AND source_id IN (...);
--   DELETE FROM discussions WHERE id IN (...);
Acceptance Criteria:
- Upserted entities added to dirty_sources inside ingestion transactions via `mark_dirty_tx(&tx, ...)`
- `mark_dirty_tx` enforces transactional usage at the API level (takes `&Transaction`, not `&Connection`)
- ALL upserted entities marked dirty (not just changed ones)
- Re-queue resets backoff state (upsert with ON CONFLICT, not INSERT OR IGNORE)
- Queue cleared after document regeneration
- Queue fully drained via loop (bounded batch size, not hard ceiling per invocation)
- Exponential backoff uses `next_attempt_at` (index-friendly, no overflow)
- Backoff computed with jitter to prevent thundering herd
- Failed items prioritized lower than fresh items (ORDER BY attempt_count ASC)
- Discussion sweep uses CTE (or Rust-side ID capture) to atomically identify stale IDs before cascading deletes
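The readiness predicate behind `get_dirty_sources` (`next_attempt_at IS NULL OR next_attempt_at <= ?`) is small enough to check in isolation. A minimal sketch, not project code; `is_ready` is a hypothetical helper mirroring the SQL WHERE clause:

```rust
/// Hypothetical mirror of the SQL readiness predicate:
/// NULL next_attempt_at means ready now (first attempt);
/// otherwise the item is ready once its backoff window expires.
fn is_ready(next_attempt_at: Option<i64>, now: i64) -> bool {
    match next_attempt_at {
        None => true,
        Some(t) => t <= now,
    }
}

fn main() {
    assert!(is_ready(None, 1_000)); // first attempt: immediately eligible
    assert!(is_ready(Some(900), 1_000)); // backoff expired
    assert!(is_ready(Some(1_000), 1_000)); // boundary: <= is inclusive
    assert!(!is_ready(Some(2_000), 1_000)); // still backing off
    println!("readiness ok");
}
```

The inclusive boundary matters: an item whose `next_attempt_at` equals the current timestamp is processed rather than deferred one more poll.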
6.2 Pending Discussion Queue
File: src/ingestion/discussion_queue.rs
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;
/// Noteable type for discussion fetching.
#[derive(Debug, Clone, Copy)]
pub enum NoteableType {
Issue,
MergeRequest,
}
impl NoteableType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Issue => "Issue",
Self::MergeRequest => "MergeRequest",
}
}
}
/// Pending discussion fetch entry.
pub struct PendingFetch {
pub project_id: i64,
pub noteable_type: NoteableType,
pub noteable_iid: i64,
pub attempt_count: i64,
}
/// Queue a discussion fetch for an entity.
///
/// Uses ON CONFLICT DO UPDATE (not INSERT OR REPLACE) for consistency with
/// dirty_sources and to avoid DELETE+INSERT semantics that can trigger
/// unexpected FK behavior and reset fields unintentionally.
pub fn queue_discussion_fetch(
conn: &Connection,
project_id: i64,
noteable_type: NoteableType,
noteable_iid: i64,
) -> Result<()> {
conn.execute(
"INSERT INTO pending_discussion_fetches
(project_id, noteable_type, noteable_iid, queued_at, attempt_count, last_attempt_at, last_error, next_attempt_at)
VALUES (?, ?, ?, ?, 0, NULL, NULL, NULL)
ON CONFLICT(project_id, noteable_type, noteable_iid) DO UPDATE SET
queued_at = excluded.queued_at,
attempt_count = 0,
last_attempt_at = NULL,
last_error = NULL,
next_attempt_at = NULL",
rusqlite::params![project_id, noteable_type.as_str(), noteable_iid, now_ms()],
)?;
Ok(())
}
/// Get pending fetches ready for processing.
///
/// Uses `next_attempt_at` for efficient, index-friendly backoff queries.
/// Items with NULL `next_attempt_at` are ready immediately (first attempt).
/// Items with `next_attempt_at <= now` have waited long enough after failure.
///
/// Benefits over SQL bitshift calculation:
/// - No overflow risk from large attempt_count values
/// - Index-friendly: `WHERE next_attempt_at <= ?`
/// - Jitter can be added in Rust when computing next_attempt_at
///
/// Limited to `max_items` to bound API calls per sync run.
pub fn get_pending_fetches(conn: &Connection, max_items: usize) -> Result<Vec<PendingFetch>> {
let now = now_ms();
let mut stmt = conn.prepare(
"SELECT project_id, noteable_type, noteable_iid, attempt_count
FROM pending_discussion_fetches
WHERE next_attempt_at IS NULL OR next_attempt_at <= ?
ORDER BY attempt_count ASC, queued_at ASC
LIMIT ?"
)?;
let results = stmt
.query_map(rusqlite::params![now, max_items], |row| {
let type_str: String = row.get(1)?;
let noteable_type = match type_str.as_str() {
    // Strict parse, consistent with dirty_tracker: an unknown value is a
    // data error, not a silent fallback to MergeRequest.
    "Issue" => NoteableType::Issue,
    "MergeRequest" => NoteableType::MergeRequest,
    other => return Err(rusqlite::Error::FromSqlConversionFailure(
        1,
        rusqlite::types::Type::Text,
        Box::new(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("invalid noteable_type: {other}"),
        )),
    )),
};
Ok(PendingFetch {
project_id: row.get(0)?,
noteable_type,
noteable_iid: row.get(2)?,
attempt_count: row.get(3)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(results)
}
/// Mark fetch as successful and remove from queue.
pub fn complete_fetch(
conn: &Connection,
project_id: i64,
noteable_type: NoteableType,
noteable_iid: i64,
) -> Result<()> {
conn.execute(
"DELETE FROM pending_discussion_fetches
WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
rusqlite::params![project_id, noteable_type.as_str(), noteable_iid],
)?;
Ok(())
}
/// Record fetch failure and compute next retry time.
///
/// Computes `next_attempt_at` using exponential backoff with jitter:
/// - Base delay: 1000ms * 2^attempt_count
/// - Cap: 1 hour (3600000ms)
/// - Jitter: ±10% to prevent thundering herd
pub fn record_fetch_error(
conn: &Connection,
project_id: i64,
noteable_type: NoteableType,
noteable_iid: i64,
error: &str,
current_attempt: i64,
) -> Result<()> {
let now = now_ms();
let next_attempt = crate::core::backoff::compute_next_attempt_at(now, current_attempt + 1);
conn.execute(
"UPDATE pending_discussion_fetches
SET attempt_count = attempt_count + 1,
last_attempt_at = ?,
last_error = ?,
next_attempt_at = ?
WHERE project_id = ? AND noteable_type = ? AND noteable_iid = ?",
rusqlite::params![now, error, next_attempt, project_id, noteable_type.as_str(), noteable_iid],
)?;
Ok(())
}
// NOTE: Backoff computation uses the shared utility in `src/core/backoff.rs`.
// See Phase 6.X below for the shared implementation.
Acceptance Criteria:
- Updated entities queued for discussion fetch
- Queue uses ON CONFLICT DO UPDATE (not INSERT OR REPLACE) consistent with dirty_sources
- Re-queue resets backoff state immediately (same semantics as dirty_sources)
- Success removes from queue
- Failure increments attempt_count and sets next_attempt_at
- Processing bounded per run (max 100)
- Exponential backoff uses `next_attempt_at` (index-friendly, no overflow)
- Backoff computed with jitter to prevent thundering herd
6.X Shared Backoff Utility
File: src/core/backoff.rs
Single implementation of exponential backoff with jitter, used by both
dirty_sources and pending_discussion_fetches queues. Living in src/core/
because it's a cross-cutting concern used by multiple modules.
use rand::Rng;
/// Compute next_attempt_at with exponential backoff and jitter.
///
/// Formula: now + min(3600000, 1000 * 2^attempt_count) * (0.9 to 1.1)
/// - Capped at 1 hour to prevent runaway delays
/// - ±10% jitter prevents synchronized retries after outages
///
/// Used by:
/// - `dirty_sources` retry scheduling (document regeneration failures)
/// - `pending_discussion_fetches` retry scheduling (API fetch failures)
///
/// Having one implementation prevents subtle divergence between queues
/// (e.g., different caps or jitter ranges).
pub fn compute_next_attempt_at(now: i64, attempt_count: i64) -> i64 {
// Cap attempt_count to prevent overflow (2^30 > 1 hour anyway)
let capped_attempts = attempt_count.min(30) as u32;
let base_delay_ms = 1000_i64.saturating_mul(1 << capped_attempts);
let capped_delay_ms = base_delay_ms.min(3_600_000); // 1 hour cap
// Add ±10% jitter
let jitter_factor = rand::thread_rng().gen_range(0.9..=1.1);
let delay_with_jitter = (capped_delay_ms as f64 * jitter_factor) as i64;
now + delay_with_jitter
}
Update src/core/mod.rs:
pub mod backoff; // Add to existing modules
Acceptance Criteria:
- Single implementation shared by both queue retry paths
- Cap at 1 hour prevents runaway delays
- Jitter prevents thundering herd after outage recovery
- Unit tests verify backoff curve and cap behavior
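The un-jittered part of the curve is easy to sanity-check. A deterministic sketch of the delay math, with jitter omitted; `delay_ms` is a hypothetical mirror of the core formula in `compute_next_attempt_at`:

```rust
/// Hypothetical mirror of compute_next_attempt_at's delay formula,
/// without jitter: min(1 hour, 1000ms * 2^attempt_count).
fn delay_ms(attempt_count: i64) -> i64 {
    let capped = attempt_count.min(30) as u32; // overflow guard
    1000_i64.saturating_mul(1 << capped).min(3_600_000)
}

fn main() {
    assert_eq!(delay_ms(0), 1_000); // first retry: 1s
    assert_eq!(delay_ms(5), 32_000); // 32s
    assert_eq!(delay_ms(12), 3_600_000); // 1000 * 2^12 > 1h, so the cap kicks in
    assert_eq!(delay_ms(100), 3_600_000); // cap holds for huge attempt counts
    println!("backoff curve ok");
}
```

Note the cap is reached by attempt 12, so the `min(30)` overflow guard never affects observable delays; it only keeps the shift well-defined.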
6.3 Document Regenerator
File: src/documents/regenerator.rs
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::{
    extract_issue_document, extract_mr_document, extract_discussion_document,
    DocumentData, SourceType,
};
use crate::ingestion::dirty_tracker::{get_dirty_sources, clear_dirty};
/// Result of regeneration run.
#[derive(Debug, Default)]
pub struct RegenerateResult {
pub regenerated: usize,
pub unchanged: usize,
pub errored: usize,
}
/// Regenerate documents from dirty queue.
///
/// Process:
/// 1. Query dirty_sources in bounded batches until queue is drained
/// 2. For each: regenerate document, compute new hash
/// 3. ALWAYS upsert document (labels/paths may change even if content_hash unchanged)
/// 4. Track whether content_hash changed (for stats)
/// 5. Delete from dirty_sources (or record error on failure)
///
/// IMPORTANT: Each dirty item is wrapped in its own transaction. This ensures
/// that a crash or interrupt cannot leave partial writes (e.g., document row
/// updated but document_labels only partially repopulated). The queue clear
/// or error record happens inside the same transaction, guaranteeing atomicity.
///
/// NOTE: Drains the queue completely in a loop (not single-pass). This ensures
/// `lore generate-docs` and `lore sync` converge in one invocation even when
/// the queue has more items than the batch size.
pub fn regenerate_dirty_documents(conn: &mut Connection) -> Result<RegenerateResult> {
    let mut result = RegenerateResult::default();
    loop {
        let dirty = get_dirty_sources(conn)?;
        if dirty.is_empty() { break; }
        for (source_type, source_id) in &dirty {
            // `transaction()` requires `&mut Connection`; each dirty item gets
            // its own transaction so a crash cannot leave partial writes.
            let tx = conn.transaction()?;
            match regenerate_one(&tx, *source_type, *source_id) {
                Ok(changed) => {
                    if changed {
                        result.regenerated += 1;
                    } else {
                        result.unchanged += 1;
                    }
                    clear_dirty(&tx, *source_type, *source_id)?;
                    tx.commit()?;
                }
                Err(e) => {
                    // Fail-soft: record error inside same transaction, then commit
                    record_dirty_error(&tx, *source_type, *source_id, &e.to_string())?;
                    tx.commit()?;
                    result.errored += 1;
                }
            }
        }
    } // end drain loop
    Ok(result)
}
// NOTE: The helpers below (`regenerate_one`, `record_dirty_error`,
// `upsert_document`, `delete_document`) take `&Connection`. Because
// `rusqlite::Transaction` derefs to `Connection`, passing `&tx` keeps all
// writes for a single dirty item atomic without needing a parallel set of
// `_tx` signatures; `clear_dirty` from the dirty tracker is called the same way.
/// Regenerate a single document. Returns true if content_hash changed.
///
/// If the source entity has been deleted, the corresponding document
/// is also deleted (cascade cleans up labels, paths, embeddings).
fn regenerate_one(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<bool> {
// Extractors return Option: None means source entity was deleted
let doc = match source_type {
SourceType::Issue => extract_issue_document(conn, source_id)?,
SourceType::MergeRequest => extract_mr_document(conn, source_id)?,
SourceType::Discussion => extract_discussion_document(conn, source_id)?,
};
let Some(doc) = doc else {
// Source was deleted — remove the document (cascade handles FTS/embeddings)
delete_document(conn, source_type, source_id)?;
return Ok(true);
};
let existing_hash = get_existing_hash(conn, source_type, source_id)?;
let changed = existing_hash.as_ref() != Some(&doc.content_hash);
// Always upsert: labels/paths can change independently of content_hash
upsert_document(conn, &doc)?;
Ok(changed)
}
/// Delete a document by source identity (cascade handles FTS trigger, labels, paths, embeddings).
fn delete_document(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<()> {
conn.execute(
"DELETE FROM documents WHERE source_type = ? AND source_id = ?",
rusqlite::params![source_type.as_str(), source_id],
)?;
Ok(())
}
/// Record a regeneration error on a dirty source for retry.
///
/// IMPORTANT: Sets `next_attempt_at` using the shared backoff utility.
/// Without this, failed items would retry every run (hot-loop), defeating
/// the backoff design documented in the schema.
fn record_dirty_error(
conn: &Connection,
source_type: SourceType,
source_id: i64,
error: &str,
) -> Result<()> {
let now = now_ms();
// Read current attempt_count from DB to compute backoff
let attempt_count: i64 = conn.query_row(
"SELECT attempt_count FROM dirty_sources WHERE source_type = ? AND source_id = ?",
rusqlite::params![source_type.as_str(), source_id],
|row| row.get(0),
)?;
// Use shared backoff utility (same as pending_discussion_fetches)
let next_attempt_at = crate::core::backoff::compute_next_attempt_at(now, attempt_count + 1);
conn.execute(
"UPDATE dirty_sources
SET attempt_count = attempt_count + 1,
last_attempt_at = ?,
last_error = ?,
next_attempt_at = ?
WHERE source_type = ? AND source_id = ?",
rusqlite::params![now, error, next_attempt_at, source_type.as_str(), source_id],
)?;
Ok(())
}
/// Get existing content hash for a document, if it exists.
///
/// IMPORTANT: Uses `optional()` to distinguish between:
/// - No row found -> Ok(None)
/// - Row found -> Ok(Some(hash))
/// - DB error -> Err(...)
///
/// Using `.ok()` would hide real DB errors (disk I/O, corruption, etc.)
/// which should propagate up for proper error handling.
fn get_existing_hash(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<Option<String>> {
use rusqlite::OptionalExtension;
let mut stmt = conn.prepare(
"SELECT content_hash FROM documents WHERE source_type = ? AND source_id = ?"
)?;
let hash: Option<String> = stmt
.query_row(rusqlite::params![source_type.as_str(), source_id], |row| row.get(0))
.optional()?;
Ok(hash)
}
fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
use rusqlite::OptionalExtension;
// Check existing hashes before upserting (for write optimization)
let existing: Option<(i64, String, String, String)> = conn
.query_row(
"SELECT id, content_hash, labels_hash, paths_hash FROM documents
WHERE source_type = ? AND source_id = ?",
rusqlite::params![doc.source_type.as_str(), doc.source_id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.optional()?;
// Fast path: skip ALL writes when nothing changed.
// This prevents WAL churn on the "mark ALL upserted entities dirty" strategy,
// where most entities will be unchanged on incremental syncs.
if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing {
if old_content_hash == &doc.content_hash
&& old_labels_hash == &doc.labels_hash
&& old_paths_hash == &doc.paths_hash
{
return Ok(());
}
}
// Upsert main document (includes labels_hash, paths_hash)
conn.execute(
"INSERT INTO documents
(source_type, source_id, project_id, author_username, label_names,
labels_hash, paths_hash,
created_at, updated_at, url, title, content_text, content_hash,
is_truncated, truncated_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(source_type, source_id) DO UPDATE SET
author_username = excluded.author_username,
label_names = excluded.label_names,
labels_hash = excluded.labels_hash,
paths_hash = excluded.paths_hash,
updated_at = excluded.updated_at,
url = excluded.url,
title = excluded.title,
content_text = excluded.content_text,
content_hash = excluded.content_hash,
is_truncated = excluded.is_truncated,
truncated_reason = excluded.truncated_reason",
rusqlite::params![
doc.source_type.as_str(),
doc.source_id,
doc.project_id,
doc.author_username,
serde_json::to_string(&doc.labels)?,
doc.labels_hash,
doc.paths_hash,
doc.created_at,
doc.updated_at,
doc.url,
doc.title,
doc.content_text,
doc.content_hash,
doc.is_truncated,
doc.truncated_reason,
],
)?;
// Get document ID (either existing or newly inserted)
let doc_id = match existing {
Some((id, _, _, _)) => id,
None => get_document_id(conn, doc.source_type, doc.source_id)?,
};
// Only update labels if hash changed (reduces write amplification)
let labels_changed = match &existing {
Some((_, _, old_hash, _)) => old_hash != &doc.labels_hash,
None => true, // New document, must insert
};
if labels_changed {
conn.execute(
"DELETE FROM document_labels WHERE document_id = ?",
[doc_id],
)?;
for label in &doc.labels {
conn.execute(
"INSERT INTO document_labels (document_id, label_name) VALUES (?, ?)",
rusqlite::params![doc_id, label],
)?;
}
}
// Only update paths if hash changed (reduces write amplification)
let paths_changed = match &existing {
Some((_, _, _, old_hash)) => old_hash != &doc.paths_hash,
None => true, // New document, must insert
};
if paths_changed {
conn.execute(
"DELETE FROM document_paths WHERE document_id = ?",
[doc_id],
)?;
for path in &doc.paths {
conn.execute(
"INSERT INTO document_paths (document_id, path) VALUES (?, ?)",
rusqlite::params![doc_id, path],
)?;
}
}
Ok(())
}
fn get_document_id(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<i64> {
let id: i64 = conn.query_row(
"SELECT id FROM documents WHERE source_type = ? AND source_id = ?",
rusqlite::params![source_type.as_str(), source_id],
|row| row.get(0),
)?;
Ok(id)
}
Acceptance Criteria:
- Dirty sources get documents regenerated
- Queue fully drained via loop (not single-pass ceiling)
- Each dirty item processed in its own transaction (crash-safe labels/paths writes)
- Triple-hash fast path (content_hash + labels_hash + paths_hash) skips all writes when unchanged
- Hash comparison prevents unnecessary updates when only some hashes changed
- FTS triggers fire on document update
- Queue cleared after processing (inside same transaction as document write)
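The triple-hash fast path reduces to one comparison: skip every write only when all three hashes match. A sketch of that decision as a pure function; `skip_all_writes` is a hypothetical helper over the `(content, labels, paths)` hash triple from `upsert_document`:

```rust
/// Hypothetical helper: true only when an existing row is present AND all
/// three hashes (content, labels, paths) are unchanged.
fn skip_all_writes(existing: Option<(&str, &str, &str)>, new: (&str, &str, &str)) -> bool {
    matches!(existing, Some(old) if old == new)
}

fn main() {
    let new = ("c1", "l1", "p1");
    assert!(skip_all_writes(Some(("c1", "l1", "p1")), new)); // nothing changed: skip
    assert!(!skip_all_writes(Some(("c1", "l2", "p1")), new)); // labels changed: write
    assert!(!skip_all_writes(Some(("c2", "l1", "p1")), new)); // content changed: write
    assert!(!skip_all_writes(None, new)); // new document: must insert
    println!("fast path ok");
}
```

Any single differing hash forces the document upsert, but the per-collection hashes still gate the label/path rewrites individually, as shown in `upsert_document`.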
6.4 CLI: lore sync
File: src/cli/commands/sync.rs
//! Sync command - orchestrate full sync pipeline.
use serde::Serialize;
use crate::core::error::Result;
use crate::Config;
/// Sync result summary.
#[derive(Debug, Serialize)]
pub struct SyncResult {
pub issues_updated: usize,
pub mrs_updated: usize,
pub discussions_fetched: usize,
pub documents_regenerated: usize,
pub documents_embedded: usize,
}
/// Sync options.
#[derive(Debug, Default)]
pub struct SyncOptions {
pub full: bool, // Reset cursors, fetch everything
pub force: bool, // Override stale lock
pub no_embed: bool, // Skip embedding step
pub no_docs: bool, // Skip document regeneration
}
/// Run sync orchestration — the unified pipeline command.
///
/// `lore sync` is the primary command for keeping the search index up to date.
/// It wraps the existing ingestion orchestrator and adds document generation
/// and embedding as post-ingestion steps. Replaces `lore ingest` as the
/// recommended user-facing command for all data flow.
///
/// Individual commands (`lore generate-docs`, `lore embed`) still exist for
/// manual recovery and debugging use cases.
///
/// Steps (sequential):
/// 1. Acquire app lock with heartbeat (via existing `src/core/lock.rs`)
/// 2. Ingest delta: fetch issues + MRs via cursor-based sync
/// (calls existing ingestion orchestrator in `src/ingestion/orchestrator.rs`)
/// - Each upserted entity marked dirty via mark_dirty_tx(&tx) inside ingestion transaction
/// 3. Process pending_discussion_fetches queue (bounded)
/// - Discussion sweep uses CTE to capture stale IDs, then cascading deletes
/// 4. Regenerate documents from dirty_sources queue
/// (unless --no-docs)
/// 5. Embed documents with changed content_hash
/// (unless --no-embed; skipped gracefully if Ollama unavailable)
/// 6. Release lock, record sync_run
///
/// NOTE: Rolling backfill window removed — the existing cursor + watermark
/// design handles old issues with resumed activity. GitLab updates
/// `updated_at` when new comments are added, so the cursor naturally
/// picks up old issues that receive new activity.
pub async fn run_sync(config: &Config, options: SyncOptions) -> Result<SyncResult> {
// Implementation wraps existing ingestion orchestrator
// and chains document generation + embedding pipelines
todo!()
}
/// Print human-readable sync output.
pub fn print_sync(result: &SyncResult, elapsed_secs: u64) {
println!("Sync complete:");
println!(" Issues updated: {:>6}", result.issues_updated);
println!(" MRs updated: {:>6}", result.mrs_updated);
println!(" Discussions fetched: {:>6}", result.discussions_fetched);
println!(" Documents regenerated: {:>6}", result.documents_regenerated);
println!(" Documents embedded: {:>6}", result.documents_embedded);
println!(" Elapsed: {}m {}s", elapsed_secs / 60, elapsed_secs % 60);
}
/// Print JSON output for robot mode.
pub fn print_sync_json(result: &SyncResult, elapsed_ms: u64) {
let output = serde_json::json!({
"ok": true,
"data": result,
"meta": {
"elapsed_ms": elapsed_ms
}
});
println!("{}", serde_json::to_string_pretty(&output).unwrap());
}
CLI integration:
/// Sync subcommand arguments.
#[derive(Args)]
pub struct SyncArgs {
/// Reset cursors, fetch everything
#[arg(long)]
full: bool,
/// Override stale lock
#[arg(long)]
force: bool,
/// Skip embedding step
#[arg(long)]
no_embed: bool,
/// Skip document regeneration
#[arg(long)]
no_docs: bool,
}
Acceptance Criteria:
- Orchestrates full sync pipeline
- Respects app lock
- `--full` resets cursors
- `--no-embed` skips embedding
- `--no-docs` skips document regeneration
- Progress reporting in human mode
- JSON summary in robot mode
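The flag interactions above can be sketched as a pure step planner. Illustrative only: the real `run_sync` wraps the ingestion orchestrator rather than building a step list, and `Step`/`plan` are hypothetical names:

```rust
/// Hypothetical step enum mirroring run_sync's documented pipeline order.
#[derive(Debug, PartialEq)]
enum Step { Ingest, Discussions, GenerateDocs, Embed }

/// Which steps run for a given flag combination (lock handling elided).
fn plan(no_docs: bool, no_embed: bool) -> Vec<Step> {
    let mut steps = vec![Step::Ingest, Step::Discussions];
    if !no_docs { steps.push(Step::GenerateDocs); }
    if !no_embed { steps.push(Step::Embed); }
    steps
}

fn main() {
    // --no-embed: docs are generated but embedding is skipped
    assert_eq!(plan(false, true), vec![Step::Ingest, Step::Discussions, Step::GenerateDocs]);
    // default: the full four-step pipeline
    assert_eq!(plan(false, false).len(), 4);
    // --no-docs --no-embed: ingestion and discussion queue only
    assert_eq!(plan(true, true), vec![Step::Ingest, Step::Discussions]);
    println!("plan ok");
}
```

Note that `--no-docs` does not imply `--no-embed` here: previously regenerated documents with changed hashes can still be embedded.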
Testing Strategy
Unit Tests
| Module | Test File | Coverage |
|---|---|---|
| Document extractor | `src/documents/extractor.rs` (mod tests) | Issue/MR/discussion extraction, consistent headers |
| Truncation | `src/documents/truncation.rs` (mod tests) | All edge cases |
| RRF ranking | `src/search/rrf.rs` (mod tests) | Score computation, merging |
| Content hash | `src/documents/extractor.rs` (mod tests) | Deterministic hashing |
| FTS query sanitization | `src/search/fts.rs` (mod tests) | `to_fts_query()` edge cases: `-`, `"`, `:`, `*`, `C++` |
| SourceType parsing | `src/documents/extractor.rs` (mod tests) | `parse()` accepts aliases: `mr`, `mrs`, `issue`, etc. |
| SearchFilters | `src/search/filters.rs` (mod tests) | `has_any_filter()`, `clamp_limit()` |
| Backoff logic | `src/core/backoff.rs` (mod tests) | Shared exponential backoff curve, cap, jitter |
| Hydration | `src/cli/commands/search.rs` (mod tests) | Single round-trip, label/path aggregation |
Integration Tests
| Feature | Test File | Coverage |
|---|---|---|
| FTS search | `tests/fts_search.rs` | Stemming, empty results |
| Embedding storage | `tests/embedding.rs` | sqlite-vec operations |
| Hybrid search | `tests/hybrid_search.rs` | Combined retrieval |
| Sync orchestration | `tests/sync.rs` | Full pipeline |
Golden Query Suite
File: tests/fixtures/golden_queries.json
[
{
"query": "authentication redesign",
"expected_urls": [".../-/issues/234", ".../-/merge_requests/847"],
"min_results": 1,
"max_rank": 10
}
]
Each query must have at least one expected URL in top 10 results.
Test fixture note: Golden query tests require a seeded SQLite database with known documents, FTS entries, and pre-computed embeddings. The test setup should insert deterministic fixture data (not call Ollama) so golden tests run in CI without external dependencies. Use fixed embedding vectors that produce known similarity distances.
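A sketch of how fixed vectors yield known distances, assuming cosine distance as the similarity metric (the actual metric used by the sqlite-vec queries may differ; `cosine_distance` is an illustrative helper, not project code):

```rust
/// Cosine distance: 1 - (a . b) / (|a| * |b|). Identical vectors score 0,
/// orthogonal vectors score 1 — fully deterministic, no model call needed.
fn cosine_distance(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    1.0 - dot / (na * nb)
}

fn main() {
    // Fixture vectors chosen so rank order is known in advance.
    let auth_doc = [1.0_f32, 0.0, 0.0];
    let auth_query = [1.0_f32, 0.0, 0.0];
    let unrelated_doc = [0.0_f32, 1.0, 0.0];
    assert!(cosine_distance(&auth_query, &auth_doc) < 1e-6); // exact match
    assert!((cosine_distance(&auth_query, &unrelated_doc) - 1.0).abs() < 1e-6); // orthogonal
    println!("fixture distances ok");
}
```

Seeding the test DB with a handful of such hand-picked vectors makes the golden ranking assertions stable across CI runs.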
CLI Smoke Tests
| Command | Expected | Pass Criteria |
|---|---|---|
| `lore generate-docs` | Progress, count | Completes, count > 0 |
| `lore generate-docs` (re-run) | 0 regenerated | Hash comparison works |
| `lore embed` | Progress, count | Completes, count matches docs |
| `lore embed` (re-run) | 0 embedded | Skips unchanged |
| `lore embed --retry-failed` | Processes failed | Only failed docs processed |
| `lore stats` | Coverage stats | Shows 100% after embed |
| `lore stats` | Queue depths | Shows dirty_sources and pending_discussion_fetches counts |
| `lore search "auth" --mode=lexical` | Results | Works without Ollama |
| `lore search "auth"` | Hybrid results | Vector + FTS combined |
| `lore search "auth"` (Ollama down) | FTS results + warning | Graceful degradation, warning in response |
| `lore search "auth" --explain` | Rank breakdown | Shows vector/FTS/RRF |
| `lore search "auth" --type=mr` | Filtered results | Only MRs |
| `lore search "auth" --type=mrs` | Filtered results | Alias works |
| `lore search "auth" --label=bug` | Filtered results | Only labeled docs |
| `lore search "-DWITH_SSL"` | Results | Leading dash doesn't cause FTS error |
| `lore search 'C++'` | Results | Special chars in query work |
| `lore search "auth" --updated-after 2024-01-01` | Filtered results | Only recently updated docs |
| `lore search "nonexistent123"` | No results | Graceful empty state |
| `lore search "auth" --mode=semantic` (no embeddings) | Actionable error | Tells user to run `lore embed` first |
| `lore sync` | Full pipeline | All steps complete |
| `lore sync --no-embed` | Skip embedding | Docs generated, not embedded |
| `lore generate-docs --full` | Progress, count | Keyset pagination completes without OFFSET degradation |
| `lore stats --check --repair` | Repair results | FTS rebuilt, orphans cleaned |
Data Integrity Checks
- `documents` count = issues + MRs + discussions
- `documents_fts` count = `documents` count
- `embedding_metadata` has at least one row per document (after full embed)
- All `embeddings.rowid / 1000` map to valid `documents.id` (SQLite integer division is floor-correct for integer operands)
- `embedding_metadata.document_hash` = `documents.content_hash` for all chunk_index=0 rows
- All `document_labels` reference valid documents
- All `document_paths` reference valid documents
- No orphaned embeddings (`embeddings.rowid / 1000` without matching `documents.id`)
- Discussion documents exclude system notes
- Discussion documents include parent title
- All `dirty_sources` entries reference existing source entities
- All `pending_discussion_fetches` entries reference existing projects
- `attempt_count` >= 0 for all queue entries (never negative)
- `last_attempt_at` is NULL when `attempt_count` = 0
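Several of these checks rely on the `rowid = document_id * 1000 + chunk_index` encoding for chunked embeddings. A sketch of the scheme with hypothetical helper names; the factor 1000 comes from the `rowid / 1000` checks above:

```rust
/// Hypothetical encoder: packs a (document_id, chunk_index) pair into a
/// single sqlite-vec rowid, assuming fewer than 1000 chunks per document.
fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
    debug_assert!((0..1000).contains(&chunk_index));
    document_id * 1000 + chunk_index
}

/// Hypothetical decoder: integer division recovers the document id,
/// the remainder recovers the chunk index (floor-correct for non-negatives).
fn decode_rowid(rowid: i64) -> (i64, i64) {
    (rowid / 1000, rowid % 1000)
}

fn main() {
    let r = encode_rowid(42, 7);
    assert_eq!(r, 42_007);
    assert_eq!(decode_rowid(r), (42, 7));
    // chunk_index 0 round-trips to the bare document id times 1000
    assert_eq!(decode_rowid(encode_rowid(42, 0)), (42, 0));
    println!("rowid scheme ok");
}
```

This is also why `--repair` orphan deletion can use range-based cleanup: all chunks of document N occupy the contiguous rowid range `[N*1000, N*1000 + 999]`.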
Success Criteria
Checkpoint 3 is complete when all three gates pass:
Prerequisite: GiError → LoreError rename completed across codebase (16 files).
Gate A: Lexical MVP (no sqlite-vec dependency)
- Lexical search works without Ollama
  - `lore search "query" --mode=lexical` returns relevant results
  - All filters functional (including `--updated-after` via `time::parse_since()`)
  - FTS5 syntax errors prevented by query sanitization
  - Special characters in queries work correctly (`-DWITH_SSL`, `C++`)
  - Search results hydrated in single DB round-trip (no N+1)
  - Project filter uses cascading resolution (exact → case-insensitive → suffix → error with suggestions)
- Document generation is correct
  - Full and incremental modes use the same regenerator codepath
  - `--full` uses set-based INSERT...SELECT with keyset pagination (no OFFSET degradation, no N INSERTs)
  - FTS triggers use COALESCE for NULL-safe operation
  - Extraction queries pull all relevant metadata (labels, DiffNote old+new paths, discussion URLs)
  - Discussion URLs reconstructed from parent `web_url` + `#note_{first_note.gitlab_id}`
  - Hard safety cap (2MB) prevents pathological content from bloating DB
  - Triple-hash fast path skips all writes when content, labels, and paths are unchanged
Gate B: Hybrid MVP
- Semantic search works with Ollama
  - Gate B introduces the sqlite-vec extension and migration 009
  - `lore embed` completes successfully
  - Chunked embeddings: long documents split at paragraph boundaries with overlap
  - `lore search "query"` returns semantically relevant results
  - `--explain` shows ranking breakdown
  - `--mode=semantic` with 0% embedding coverage returns `LoreError::EmbeddingsNotBuilt` (distinct from `OllamaUnavailable` — tells the user to run `lore embed` first)
- Hybrid search combines both
  - Documents appearing in both retrievers rank higher
  - Vector search over-fetches 3x and deduplicates by document_id (best chunk per doc)
  - Graceful degradation when Ollama unavailable (falls back to FTS)
  - Transient embed failures don't fail the entire search
  - Warning message included in response on degradation
  - Embedding pipeline uses keyset pagination for consistent paging
Gate C: Sync MVP
- Incremental sync is efficient
  - `lore sync` is the unified command (replaces `lore ingest` + `lore generate-docs` + `lore embed`)
  - Individual commands still exist for recovery/debugging
  - Re-embedding only happens for changed documents (content_hash comparison)
  - Progress visible during long syncs (via `indicatif`)
  - Queue backoff actually prevents hot-loop retries (both queues set `next_attempt_at`)
  - Shared backoff utility ensures consistent behavior across queues
  - Dirty source marking uses `mark_dirty_tx(&tx, ...)` inside ingestion transactions (enforced at API level)
  - Re-queuing resets backoff state (upsert, not INSERT OR IGNORE) so fresh updates aren't stuck behind stale backoff
  - Both queues (dirty_sources, pending_discussion_fetches) use ON CONFLICT DO UPDATE consistently
  - Queue draining is complete (bounded batch loop, not single-pass ceiling)
  - Discussion sweep identifies stale IDs atomically (CTE or Rust-side capture) before cascading deletes
- Data integrity maintained
  - All counts match between tables
  - No orphaned records (embedding orphan check uses `rowid / 1000` for the chunked scheme — SQLite integer division is floor-correct)
  - Hashes consistent (embedding_metadata.document_hash matches documents.content_hash)
  - `get_existing_hash()` properly distinguishes "not found" from DB errors
  - `--repair` uses FTS `rebuild` for correct-by-construction repair
  - `--repair` orphan deletion uses range-based cleanup for chunked embeddings
  - `--repair` stale detection uses `document_hash` (not `chunk_hash`) for document-level staleness
  - Per-item transactions prevent partial label/path writes on crash
  - Triple-hash fast path prevents WAL churn from unchanged documents
- Observability
  - `lore stats` shows queue depths and failed item counts
  - `lore stats` embedding coverage is document-level (not chunk-level) with total chunk count
  - Failed items visible for operator intervention
  - Deterministic ordering ensures consistent paging
- Tests pass
  - Unit tests for core algorithms (including FTS sanitization, shared backoff, hydration)
  - Integration tests for pipelines
  - Golden queries return expected results (seeded DB with fixed embedding vectors, no Ollama dependency)