Compare commits
6 Commits
39cb0cb087
...
a573d695d5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a573d695d5 | ||
|
|
a855759bf8 | ||
|
|
f3f3560e0d | ||
|
|
2bfa4f1f8c | ||
|
|
8cf14fb69b | ||
|
|
c2036c64e9 |
1
build.rs
1
build.rs
@@ -7,4 +7,5 @@ fn main() {
|
||||
.unwrap_or_default();
|
||||
println!("cargo:rustc-env=GIT_HASH={}", hash.trim());
|
||||
println!("cargo:rerun-if-changed=.git/HEAD");
|
||||
println!("cargo:rerun-if-changed=.git/refs/heads");
|
||||
}
|
||||
|
||||
@@ -2,16 +2,17 @@ use console::style;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::Config;
|
||||
use crate::core::db::create_connection;
|
||||
use crate::core::error::Result;
|
||||
use crate::core::db::{LATEST_SCHEMA_VERSION, create_connection, get_schema_version};
|
||||
use crate::core::error::{LoreError, Result};
|
||||
use crate::core::paths::get_db_path;
|
||||
use crate::core::shutdown::ShutdownSignal;
|
||||
use crate::embedding::ollama::{OllamaClient, OllamaConfig};
|
||||
use crate::embedding::pipeline::embed_documents;
|
||||
use crate::embedding::pipeline::{DEFAULT_EMBED_CONCURRENCY, embed_documents};
|
||||
|
||||
#[derive(Debug, Default, Serialize)]
|
||||
pub struct EmbedCommandResult {
|
||||
pub embedded: usize,
|
||||
pub docs_embedded: usize,
|
||||
pub chunks_embedded: usize,
|
||||
pub failed: usize,
|
||||
pub skipped: usize,
|
||||
}
|
||||
@@ -26,6 +27,18 @@ pub async fn run_embed(
|
||||
let db_path = get_db_path(config.storage.db_path.as_deref());
|
||||
let conn = create_connection(&db_path)?;
|
||||
|
||||
let schema_version = get_schema_version(&conn);
|
||||
if schema_version < LATEST_SCHEMA_VERSION {
|
||||
return Err(LoreError::MigrationFailed {
|
||||
version: schema_version,
|
||||
message: format!(
|
||||
"Database is at schema version {schema_version} but {LATEST_SCHEMA_VERSION} is required. \
|
||||
Run 'lore migrate' first."
|
||||
),
|
||||
source: None,
|
||||
});
|
||||
}
|
||||
|
||||
let ollama_config = OllamaConfig {
|
||||
base_url: config.embedding.base_url.clone(),
|
||||
model: config.embedding.model.clone(),
|
||||
@@ -43,18 +56,39 @@ pub async fn run_embed(
|
||||
COMMIT;",
|
||||
)?;
|
||||
} else if retry_failed {
|
||||
conn.execute(
|
||||
"UPDATE embedding_metadata SET last_error = NULL, attempt_count = 0
|
||||
WHERE last_error IS NOT NULL",
|
||||
[],
|
||||
// DELETE (not UPDATE) so the LEFT JOIN in find_pending_documents returns NULL,
|
||||
// making the doc appear pending again. UPDATE would leave a matching row that
|
||||
// still satisfies the config-param check, making the doc permanently invisible.
|
||||
conn.execute_batch(
|
||||
"BEGIN;
|
||||
DELETE FROM embeddings WHERE rowid / 1000 IN (
|
||||
SELECT DISTINCT document_id FROM embedding_metadata
|
||||
WHERE last_error IS NOT NULL
|
||||
);
|
||||
DELETE FROM embedding_metadata WHERE last_error IS NOT NULL;
|
||||
COMMIT;",
|
||||
)?;
|
||||
}
|
||||
|
||||
let model_name = &config.embedding.model;
|
||||
let result = embed_documents(&conn, &client, model_name, progress_callback, signal).await?;
|
||||
let concurrency = if config.embedding.concurrency > 0 {
|
||||
config.embedding.concurrency as usize
|
||||
} else {
|
||||
DEFAULT_EMBED_CONCURRENCY
|
||||
};
|
||||
let result = embed_documents(
|
||||
&conn,
|
||||
&client,
|
||||
model_name,
|
||||
concurrency,
|
||||
progress_callback,
|
||||
signal,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(EmbedCommandResult {
|
||||
embedded: result.embedded,
|
||||
docs_embedded: result.docs_embedded,
|
||||
chunks_embedded: result.chunks_embedded,
|
||||
failed: result.failed,
|
||||
skipped: result.skipped,
|
||||
})
|
||||
@@ -62,7 +96,10 @@ pub async fn run_embed(
|
||||
|
||||
pub fn print_embed(result: &EmbedCommandResult) {
|
||||
println!("{} Embedding complete", style("done").green().bold(),);
|
||||
println!(" Embedded: {}", result.embedded);
|
||||
println!(
|
||||
" Embedded: {} documents ({} chunks)",
|
||||
result.docs_embedded, result.chunks_embedded
|
||||
);
|
||||
if result.failed > 0 {
|
||||
println!(" Failed: {}", style(result.failed).red());
|
||||
}
|
||||
|
||||
@@ -241,7 +241,7 @@ pub async fn run_sync(
|
||||
});
|
||||
match run_embed(config, options.full, false, Some(embed_cb), signal).await {
|
||||
Ok(embed_result) => {
|
||||
result.documents_embedded = embed_result.embedded;
|
||||
result.documents_embedded = embed_result.docs_embedded;
|
||||
embed_bar.finish_and_clear();
|
||||
spinner.finish_and_clear();
|
||||
}
|
||||
|
||||
@@ -76,7 +76,9 @@ impl Cli {
|
||||
let args: Vec<String> = std::env::args().collect();
|
||||
args.iter()
|
||||
.any(|a| a == "--robot" || a == "-J" || a == "--json")
|
||||
|| std::env::var("LORE_ROBOT").is_ok()
|
||||
|| std::env::var("LORE_ROBOT")
|
||||
.ok()
|
||||
.is_some_and(|v| !v.is_empty() && v != "0" && v != "false")
|
||||
|| !std::io::stdout().is_terminal()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl ErrorCode {
|
||||
pub fn exit_code(&self) -> i32 {
|
||||
match self {
|
||||
Self::InternalError => 1,
|
||||
Self::ConfigNotFound => 2,
|
||||
Self::ConfigNotFound => 20,
|
||||
Self::ConfigInvalid => 3,
|
||||
Self::TokenNotSet => 4,
|
||||
Self::GitLabAuthFailed => 5,
|
||||
|
||||
@@ -19,11 +19,11 @@ impl ShutdownSignal {
|
||||
}
|
||||
|
||||
pub fn cancel(&self) {
|
||||
self.cancelled.store(true, Ordering::Relaxed);
|
||||
self.cancelled.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
self.cancelled.load(Ordering::Relaxed)
|
||||
self.cancelled.load(Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -95,38 +95,15 @@ fn regenerate_one(conn: &Connection, source_type: SourceType, source_id: i64) ->
|
||||
return Ok(true);
|
||||
};
|
||||
|
||||
let existing_hash = get_existing_hash(conn, source_type, source_id)?;
|
||||
let changed = existing_hash.as_ref() != Some(&doc.content_hash);
|
||||
|
||||
upsert_document(conn, &doc)?;
|
||||
|
||||
Ok(changed)
|
||||
upsert_document(conn, &doc)
|
||||
}
|
||||
|
||||
fn get_existing_hash(
|
||||
conn: &Connection,
|
||||
source_type: SourceType,
|
||||
source_id: i64,
|
||||
) -> Result<Option<String>> {
|
||||
let mut stmt = conn.prepare_cached(
|
||||
"SELECT content_hash FROM documents WHERE source_type = ?1 AND source_id = ?2",
|
||||
)?;
|
||||
|
||||
let hash: Option<String> = stmt
|
||||
.query_row(rusqlite::params![source_type.as_str(), source_id], |row| {
|
||||
row.get(0)
|
||||
})
|
||||
.optional()?;
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
||||
fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<bool> {
|
||||
conn.execute_batch("SAVEPOINT upsert_doc")?;
|
||||
match upsert_document_inner(conn, doc) {
|
||||
Ok(()) => {
|
||||
Ok(changed) => {
|
||||
conn.execute_batch("RELEASE upsert_doc")?;
|
||||
Ok(())
|
||||
Ok(changed)
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO upsert_doc; RELEASE upsert_doc");
|
||||
@@ -135,7 +112,7 @@ fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
||||
fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<bool> {
|
||||
let existing: Option<(i64, String, String, String)> = conn
|
||||
.query_row(
|
||||
"SELECT id, content_hash, labels_hash, paths_hash FROM documents
|
||||
@@ -145,12 +122,17 @@ fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
||||
)
|
||||
.optional()?;
|
||||
|
||||
let content_changed = match &existing {
|
||||
Some((_, old_content_hash, _, _)) => old_content_hash != &doc.content_hash,
|
||||
None => true,
|
||||
};
|
||||
|
||||
if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing
|
||||
&& old_content_hash == &doc.content_hash
|
||||
&& old_labels_hash == &doc.labels_hash
|
||||
&& old_paths_hash == &doc.paths_hash
|
||||
{
|
||||
return Ok(());
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let labels_json = serde_json::to_string(&doc.labels).unwrap_or_else(|_| "[]".to_string());
|
||||
@@ -260,7 +242,7 @@ fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(content_changed)
|
||||
}
|
||||
|
||||
fn delete_document(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> {
|
||||
|
||||
@@ -83,3 +83,148 @@ pub fn count_pending_documents(conn: &Connection, model_name: &str) -> Result<i6
|
||||
)?;
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::path::Path;
|
||||
|
||||
use super::*;
|
||||
use crate::core::db::{create_connection, run_migrations};
|
||||
use crate::embedding::pipeline::record_embedding_error;
|
||||
|
||||
const MODEL: &str = "nomic-embed-text";
|
||||
|
||||
fn setup_db() -> Connection {
|
||||
let conn = create_connection(Path::new(":memory:")).unwrap();
|
||||
run_migrations(&conn).unwrap();
|
||||
conn
|
||||
}
|
||||
|
||||
fn insert_test_project(conn: &Connection) -> i64 {
|
||||
conn.execute(
|
||||
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url)
|
||||
VALUES (1, 'group/test', 'https://gitlab.example.com/group/test')",
|
||||
[],
|
||||
)
|
||||
.unwrap();
|
||||
conn.last_insert_rowid()
|
||||
}
|
||||
|
||||
fn insert_test_document(conn: &Connection, project_id: i64, content: &str) -> i64 {
|
||||
conn.execute(
|
||||
"INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash)
|
||||
VALUES ('issue', 1, ?1, ?2, 'hash123')",
|
||||
rusqlite::params![project_id, content],
|
||||
)
|
||||
.unwrap();
|
||||
conn.last_insert_rowid()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retry_failed_delete_makes_doc_pending_again() {
|
||||
let conn = setup_db();
|
||||
let proj_id = insert_test_project(&conn);
|
||||
let doc_id = insert_test_document(&conn, proj_id, "some text content");
|
||||
|
||||
// Doc starts as pending
|
||||
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
|
||||
assert_eq!(pending.len(), 1, "Doc should be pending initially");
|
||||
|
||||
// Record an error — doc should no longer be pending
|
||||
record_embedding_error(
|
||||
&conn,
|
||||
doc_id,
|
||||
0,
|
||||
"hash123",
|
||||
"chunkhash",
|
||||
MODEL,
|
||||
"test error",
|
||||
)
|
||||
.unwrap();
|
||||
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
|
||||
assert!(
|
||||
pending.is_empty(),
|
||||
"Doc with error metadata should not be pending"
|
||||
);
|
||||
|
||||
// DELETE error rows (mimicking --retry-failed) — doc should become pending again
|
||||
conn.execute_batch(
|
||||
"DELETE FROM embeddings WHERE rowid / 1000 IN (
|
||||
SELECT DISTINCT document_id FROM embedding_metadata
|
||||
WHERE last_error IS NOT NULL
|
||||
);
|
||||
DELETE FROM embedding_metadata WHERE last_error IS NOT NULL;",
|
||||
)
|
||||
.unwrap();
|
||||
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
|
||||
assert_eq!(pending.len(), 1, "Doc should be pending again after DELETE");
|
||||
assert_eq!(pending[0].document_id, doc_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_doc_with_error_not_pending() {
|
||||
let conn = setup_db();
|
||||
let proj_id = insert_test_project(&conn);
|
||||
let doc_id = insert_test_document(&conn, proj_id, "");
|
||||
|
||||
// Empty doc starts as pending
|
||||
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
|
||||
assert_eq!(pending.len(), 1, "Empty doc should be pending initially");
|
||||
|
||||
// Record an error for the empty doc
|
||||
record_embedding_error(
|
||||
&conn,
|
||||
doc_id,
|
||||
0,
|
||||
"hash123",
|
||||
"empty",
|
||||
MODEL,
|
||||
"Document has empty content",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Should no longer be pending
|
||||
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
|
||||
assert!(
|
||||
pending.is_empty(),
|
||||
"Empty doc with error metadata should not be pending"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn old_update_approach_leaves_doc_invisible() {
|
||||
// This test demonstrates WHY we use DELETE instead of UPDATE.
|
||||
// UPDATE clears last_error but the row still matches config params,
|
||||
// so the doc stays "not pending" — permanently invisible.
|
||||
let conn = setup_db();
|
||||
let proj_id = insert_test_project(&conn);
|
||||
let doc_id = insert_test_document(&conn, proj_id, "some text content");
|
||||
|
||||
// Record an error
|
||||
record_embedding_error(
|
||||
&conn,
|
||||
doc_id,
|
||||
0,
|
||||
"hash123",
|
||||
"chunkhash",
|
||||
MODEL,
|
||||
"test error",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Old approach: UPDATE to clear error
|
||||
conn.execute(
|
||||
"UPDATE embedding_metadata SET last_error = NULL, attempt_count = 0
|
||||
WHERE last_error IS NOT NULL",
|
||||
[],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Doc is NOT pending — it's permanently invisible! This is the bug.
|
||||
let pending = find_pending_documents(&conn, 100, 0, MODEL).unwrap();
|
||||
assert!(
|
||||
pending.is_empty(),
|
||||
"UPDATE approach leaves doc invisible (this proves the bug)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,9 +41,19 @@ pub fn split_into_chunks(content: &str) -> Vec<(usize, String)> {
|
||||
split_at
|
||||
}
|
||||
.max(1);
|
||||
let old_start = start;
|
||||
start += advance;
|
||||
// Ensure start lands on a char boundary after overlap subtraction
|
||||
start = floor_char_boundary(content, start);
|
||||
// Guarantee forward progress: multi-byte chars can cause
|
||||
// floor_char_boundary to round back to old_start
|
||||
if start <= old_start {
|
||||
start = old_start
|
||||
+ content[old_start..]
|
||||
.chars()
|
||||
.next()
|
||||
.map_or(1, |c| c.len_utf8());
|
||||
}
|
||||
chunk_index += 1;
|
||||
}
|
||||
|
||||
@@ -219,4 +229,105 @@ mod tests {
|
||||
let chunks = split_into_chunks(&content);
|
||||
assert!(chunks.len() >= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_box_drawing_heavy_content() {
|
||||
// Simulates a document with many box-drawing characters (3-byte UTF-8)
|
||||
// like the ─ (U+2500) character found in markdown tables
|
||||
let mut content = String::new();
|
||||
// Normal text header
|
||||
content.push_str("# Title\n\nSome description text.\n\n");
|
||||
// Table header with box drawing
|
||||
content.push('┌');
|
||||
for _ in 0..200 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push('┬');
|
||||
for _ in 0..200 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push_str("┐\n"); // clippy: push_str is correct here (multi-char)
|
||||
// Table rows
|
||||
for row in 0..50 {
|
||||
content.push_str(&format!("│ row {:<194}│ data {:<193}│\n", row, row));
|
||||
content.push('├');
|
||||
for _ in 0..200 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push('┼');
|
||||
for _ in 0..200 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push_str("┤\n"); // push_str for multi-char
|
||||
}
|
||||
content.push('└');
|
||||
for _ in 0..200 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push('┴');
|
||||
for _ in 0..200 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push_str("┘\n"); // push_str for multi-char
|
||||
|
||||
eprintln!(
|
||||
"Content size: {} bytes, {} chars",
|
||||
content.len(),
|
||||
content.chars().count()
|
||||
);
|
||||
let start = std::time::Instant::now();
|
||||
let chunks = split_into_chunks(&content);
|
||||
let elapsed = start.elapsed();
|
||||
eprintln!(
|
||||
"Chunking took {:?}, produced {} chunks",
|
||||
elapsed,
|
||||
chunks.len()
|
||||
);
|
||||
|
||||
// Should complete in reasonable time
|
||||
assert!(
|
||||
elapsed.as_secs() < 5,
|
||||
"Chunking took too long: {:?}",
|
||||
elapsed
|
||||
);
|
||||
assert!(!chunks.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_real_doc_18526_pattern() {
|
||||
// Reproduce exact pattern: long lines of ─ (3 bytes each, no spaces)
|
||||
// followed by newlines, creating a pattern where chunk windows
|
||||
// land in spaceless regions
|
||||
let mut content = String::new();
|
||||
content.push_str("Header text with spaces\n\n");
|
||||
// Create a very long line of ─ chars (2000+ bytes, exceeding CHUNK_MAX_BYTES)
|
||||
for _ in 0..800 {
|
||||
content.push('─'); // 3 bytes each = 2400 bytes
|
||||
}
|
||||
content.push('\n');
|
||||
content.push_str("Some more text.\n\n");
|
||||
// Another long run
|
||||
for _ in 0..800 {
|
||||
content.push('─');
|
||||
}
|
||||
content.push('\n');
|
||||
content.push_str("End text.\n");
|
||||
|
||||
eprintln!("Content size: {} bytes", content.len());
|
||||
let start = std::time::Instant::now();
|
||||
let chunks = split_into_chunks(&content);
|
||||
let elapsed = start.elapsed();
|
||||
eprintln!(
|
||||
"Chunking took {:?}, produced {} chunks",
|
||||
elapsed,
|
||||
chunks.len()
|
||||
);
|
||||
|
||||
assert!(
|
||||
elapsed.as_secs() < 2,
|
||||
"Chunking took too long: {:?}",
|
||||
elapsed
|
||||
);
|
||||
assert!(!chunks.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use futures::future::join_all;
|
||||
use rusqlite::Connection;
|
||||
use sha2::{Digest, Sha256};
|
||||
use tracing::{info, instrument, warn};
|
||||
use tracing::{debug, info, instrument, warn};
|
||||
|
||||
use crate::core::error::Result;
|
||||
use crate::core::shutdown::ShutdownSignal;
|
||||
@@ -14,11 +14,12 @@ use crate::embedding::ollama::OllamaClient;
|
||||
|
||||
const BATCH_SIZE: usize = 32;
|
||||
const DB_PAGE_SIZE: usize = 500;
|
||||
const EMBED_CONCURRENCY: usize = 2;
|
||||
pub const DEFAULT_EMBED_CONCURRENCY: usize = 4;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EmbedResult {
|
||||
pub embedded: usize,
|
||||
pub chunks_embedded: usize,
|
||||
pub docs_embedded: usize,
|
||||
pub failed: usize,
|
||||
pub skipped: usize,
|
||||
}
|
||||
@@ -37,6 +38,7 @@ pub async fn embed_documents(
|
||||
conn: &Connection,
|
||||
client: &OllamaClient,
|
||||
model_name: &str,
|
||||
concurrency: usize,
|
||||
progress_callback: Option<Box<dyn Fn(usize, usize)>>,
|
||||
signal: &ShutdownSignal,
|
||||
) -> Result<EmbedResult> {
|
||||
@@ -57,16 +59,22 @@ pub async fn embed_documents(
|
||||
break;
|
||||
}
|
||||
|
||||
info!(last_id, "Querying pending documents...");
|
||||
let pending = find_pending_documents(conn, DB_PAGE_SIZE, last_id, model_name)?;
|
||||
if pending.is_empty() {
|
||||
break;
|
||||
}
|
||||
info!(
|
||||
count = pending.len(),
|
||||
"Found pending documents, starting page"
|
||||
);
|
||||
|
||||
conn.execute_batch("SAVEPOINT embed_page")?;
|
||||
let page_result = embed_page(
|
||||
conn,
|
||||
client,
|
||||
model_name,
|
||||
concurrency,
|
||||
&pending,
|
||||
&mut result,
|
||||
&mut last_id,
|
||||
@@ -77,8 +85,20 @@ pub async fn embed_documents(
|
||||
)
|
||||
.await;
|
||||
match page_result {
|
||||
Ok(()) if signal.is_cancelled() => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
|
||||
info!("Rolled back incomplete page to preserve data integrity");
|
||||
}
|
||||
Ok(()) => {
|
||||
conn.execute_batch("RELEASE embed_page")?;
|
||||
let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)");
|
||||
info!(
|
||||
chunks_embedded = result.chunks_embedded,
|
||||
failed = result.failed,
|
||||
skipped = result.skipped,
|
||||
total,
|
||||
"Page complete"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO embed_page; RELEASE embed_page");
|
||||
@@ -88,13 +108,13 @@ pub async fn embed_documents(
|
||||
}
|
||||
|
||||
info!(
|
||||
embedded = result.embedded,
|
||||
chunks_embedded = result.chunks_embedded,
|
||||
failed = result.failed,
|
||||
skipped = result.skipped,
|
||||
"Embedding pipeline complete"
|
||||
);
|
||||
|
||||
tracing::Span::current().record("items_processed", result.embedded);
|
||||
tracing::Span::current().record("items_processed", result.chunks_embedded);
|
||||
tracing::Span::current().record("items_skipped", result.skipped);
|
||||
tracing::Span::current().record("errors", result.failed);
|
||||
|
||||
@@ -106,6 +126,7 @@ async fn embed_page(
|
||||
conn: &Connection,
|
||||
client: &OllamaClient,
|
||||
model_name: &str,
|
||||
concurrency: usize,
|
||||
pending: &[crate::embedding::change_detector::PendingDocument],
|
||||
result: &mut EmbedResult,
|
||||
last_id: &mut i64,
|
||||
@@ -116,17 +137,50 @@ async fn embed_page(
|
||||
) -> Result<()> {
|
||||
let mut all_chunks: Vec<ChunkWork> = Vec::with_capacity(pending.len() * 3);
|
||||
let mut page_normal_docs: usize = 0;
|
||||
let mut chunks_needed: HashMap<i64, usize> = HashMap::with_capacity(pending.len());
|
||||
let mut chunks_stored: HashMap<i64, usize> = HashMap::with_capacity(pending.len());
|
||||
|
||||
debug!(count = pending.len(), "Starting chunking loop");
|
||||
for doc in pending {
|
||||
*last_id = doc.document_id;
|
||||
|
||||
if doc.content_text.is_empty() {
|
||||
record_embedding_error(
|
||||
conn,
|
||||
doc.document_id,
|
||||
0,
|
||||
&doc.content_hash,
|
||||
"empty",
|
||||
model_name,
|
||||
"Document has empty content",
|
||||
)?;
|
||||
result.skipped += 1;
|
||||
*processed += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if page_normal_docs.is_multiple_of(50) {
|
||||
debug!(
|
||||
doc_id = doc.document_id,
|
||||
doc_num = page_normal_docs,
|
||||
content_bytes = doc.content_text.len(),
|
||||
"Chunking document"
|
||||
);
|
||||
}
|
||||
if page_normal_docs.is_multiple_of(100) {
|
||||
info!(
|
||||
doc_id = doc.document_id,
|
||||
content_bytes = doc.content_text.len(),
|
||||
docs_so_far = page_normal_docs,
|
||||
"Chunking document"
|
||||
);
|
||||
}
|
||||
let chunks = split_into_chunks(&doc.content_text);
|
||||
debug!(
|
||||
doc_id = doc.document_id,
|
||||
chunk_count = chunks.len(),
|
||||
"Chunked"
|
||||
);
|
||||
let total_chunks = chunks.len();
|
||||
|
||||
if total_chunks as i64 > CHUNK_ROWID_MULTIPLIER {
|
||||
@@ -156,6 +210,8 @@ async fn embed_page(
|
||||
continue;
|
||||
}
|
||||
|
||||
chunks_needed.insert(doc.document_id, total_chunks);
|
||||
|
||||
for (chunk_index, text) in chunks {
|
||||
all_chunks.push(ChunkWork {
|
||||
doc_id: doc.document_id,
|
||||
@@ -170,12 +226,30 @@ async fn embed_page(
|
||||
page_normal_docs += 1;
|
||||
}
|
||||
|
||||
debug!(total_chunks = all_chunks.len(), "Chunking loop done");
|
||||
|
||||
let mut cleared_docs: HashSet<i64> = HashSet::with_capacity(pending.len());
|
||||
let mut embed_buf: Vec<u8> = Vec::with_capacity(EXPECTED_DIMS * 4);
|
||||
|
||||
// Split chunks into batches, then process batches in concurrent groups
|
||||
let batches: Vec<&[ChunkWork]> = all_chunks.chunks(BATCH_SIZE).collect();
|
||||
debug!(
|
||||
batches = batches.len(),
|
||||
concurrency, "Starting Ollama requests"
|
||||
);
|
||||
info!(
|
||||
chunks = all_chunks.len(),
|
||||
batches = batches.len(),
|
||||
docs = page_normal_docs,
|
||||
"Chunking complete, starting Ollama requests"
|
||||
);
|
||||
|
||||
for concurrent_group in batches.chunks(EMBED_CONCURRENCY) {
|
||||
info!(
|
||||
batches = batches.len(),
|
||||
concurrency, "About to start Ollama request loop"
|
||||
);
|
||||
for (group_idx, concurrent_group) in batches.chunks(concurrency).enumerate() {
|
||||
debug!(group_idx, "Starting concurrent group");
|
||||
if signal.is_cancelled() {
|
||||
info!("Shutdown requested during embedding, stopping mid-page");
|
||||
break;
|
||||
@@ -193,6 +267,11 @@ async fn embed_page(
|
||||
.map(|texts| client.embed_batch(texts))
|
||||
.collect();
|
||||
let api_results = join_all(futures).await;
|
||||
debug!(
|
||||
group_idx,
|
||||
results = api_results.len(),
|
||||
"Ollama group complete"
|
||||
);
|
||||
|
||||
// Phase 3: Serial DB writes for each batch result
|
||||
for (batch, api_result) in concurrent_group.iter().zip(api_results) {
|
||||
@@ -243,8 +322,35 @@ async fn embed_page(
|
||||
model_name,
|
||||
embedding,
|
||||
chunk.total_chunks,
|
||||
&mut embed_buf,
|
||||
)?;
|
||||
result.embedded += 1;
|
||||
result.chunks_embedded += 1;
|
||||
*chunks_stored.entry(chunk.doc_id).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
// Record errors for chunks that Ollama silently dropped
|
||||
if embeddings.len() < batch.len() {
|
||||
warn!(
|
||||
returned = embeddings.len(),
|
||||
expected = batch.len(),
|
||||
"Ollama returned fewer embeddings than inputs"
|
||||
);
|
||||
for chunk in &batch[embeddings.len()..] {
|
||||
record_embedding_error(
|
||||
conn,
|
||||
chunk.doc_id,
|
||||
chunk.chunk_index,
|
||||
&chunk.doc_hash,
|
||||
&chunk.chunk_hash,
|
||||
model_name,
|
||||
&format!(
|
||||
"Batch mismatch: got {} of {} embeddings",
|
||||
embeddings.len(),
|
||||
batch.len()
|
||||
),
|
||||
)?;
|
||||
result.failed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -281,15 +387,24 @@ async fn embed_page(
|
||||
model_name,
|
||||
&embeddings[0],
|
||||
chunk.total_chunks,
|
||||
&mut embed_buf,
|
||||
)?;
|
||||
result.embedded += 1;
|
||||
result.chunks_embedded += 1;
|
||||
*chunks_stored.entry(chunk.doc_id).or_insert(0) += 1;
|
||||
}
|
||||
_ => {
|
||||
Ok(embeddings) => {
|
||||
let got_dims = embeddings.first().map_or(0, std::vec::Vec::len);
|
||||
let reason = format!(
|
||||
"Retry failed: got {} embeddings, first has {} dims (expected {})",
|
||||
embeddings.len(),
|
||||
got_dims,
|
||||
EXPECTED_DIMS
|
||||
);
|
||||
warn!(
|
||||
doc_id = chunk.doc_id,
|
||||
chunk_index = chunk.chunk_index,
|
||||
chunk_bytes = chunk.text.len(),
|
||||
"Chunk too large for model context window"
|
||||
%reason,
|
||||
"Chunk retry returned unexpected result"
|
||||
);
|
||||
record_embedding_error(
|
||||
conn,
|
||||
@@ -298,7 +413,27 @@ async fn embed_page(
|
||||
&chunk.doc_hash,
|
||||
&chunk.chunk_hash,
|
||||
model_name,
|
||||
"Chunk exceeds model context window",
|
||||
&reason,
|
||||
)?;
|
||||
result.failed += 1;
|
||||
}
|
||||
Err(retry_err) => {
|
||||
let reason = format!("Retry failed: {}", retry_err);
|
||||
warn!(
|
||||
doc_id = chunk.doc_id,
|
||||
chunk_index = chunk.chunk_index,
|
||||
chunk_bytes = chunk.text.len(),
|
||||
error = %retry_err,
|
||||
"Chunk retry request failed"
|
||||
);
|
||||
record_embedding_error(
|
||||
conn,
|
||||
chunk.doc_id,
|
||||
chunk.chunk_index,
|
||||
&chunk.doc_hash,
|
||||
&chunk.chunk_hash,
|
||||
model_name,
|
||||
&reason,
|
||||
)?;
|
||||
result.failed += 1;
|
||||
}
|
||||
@@ -324,6 +459,13 @@ async fn embed_page(
|
||||
}
|
||||
}
|
||||
|
||||
// Count docs where all chunks were successfully stored
|
||||
for (doc_id, needed) in &chunks_needed {
|
||||
if chunks_stored.get(doc_id).copied().unwrap_or(0) == *needed {
|
||||
result.docs_embedded += 1;
|
||||
}
|
||||
}
|
||||
|
||||
*processed += page_normal_docs;
|
||||
if let Some(cb) = progress_callback {
|
||||
cb(*processed, total);
|
||||
@@ -333,17 +475,13 @@ async fn embed_page(
|
||||
}
|
||||
|
||||
fn clear_document_embeddings(conn: &Connection, document_id: i64) -> Result<()> {
|
||||
conn.execute(
|
||||
"DELETE FROM embedding_metadata WHERE document_id = ?1",
|
||||
[document_id],
|
||||
)?;
|
||||
conn.prepare_cached("DELETE FROM embedding_metadata WHERE document_id = ?1")?
|
||||
.execute([document_id])?;
|
||||
|
||||
let start_rowid = encode_rowid(document_id, 0);
|
||||
let end_rowid = encode_rowid(document_id + 1, 0);
|
||||
conn.execute(
|
||||
"DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2",
|
||||
rusqlite::params![start_rowid, end_rowid],
|
||||
)?;
|
||||
conn.prepare_cached("DELETE FROM embeddings WHERE rowid >= ?1 AND rowid < ?2")?
|
||||
.execute(rusqlite::params![start_rowid, end_rowid])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -358,18 +496,18 @@ fn store_embedding(
|
||||
model_name: &str,
|
||||
embedding: &[f32],
|
||||
total_chunks: usize,
|
||||
embed_buf: &mut Vec<u8>,
|
||||
) -> Result<()> {
|
||||
let rowid = encode_rowid(doc_id, chunk_index as i64);
|
||||
|
||||
let mut embedding_bytes = Vec::with_capacity(embedding.len() * 4);
|
||||
embed_buf.clear();
|
||||
embed_buf.reserve(embedding.len() * 4);
|
||||
for f in embedding {
|
||||
embedding_bytes.extend_from_slice(&f.to_le_bytes());
|
||||
embed_buf.extend_from_slice(&f.to_le_bytes());
|
||||
}
|
||||
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)",
|
||||
rusqlite::params![rowid, embedding_bytes],
|
||||
)?;
|
||||
conn.prepare_cached("INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)")?
|
||||
.execute(rusqlite::params![rowid, &embed_buf[..]])?;
|
||||
|
||||
let chunk_count: Option<i64> = if chunk_index == 0 {
|
||||
Some(total_chunks as i64)
|
||||
@@ -378,12 +516,13 @@ fn store_embedding(
|
||||
};
|
||||
|
||||
let now = chrono::Utc::now().timestamp_millis();
|
||||
conn.execute(
|
||||
conn.prepare_cached(
|
||||
"INSERT OR REPLACE INTO embedding_metadata
|
||||
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
|
||||
created_at, attempt_count, last_error, chunk_max_bytes, chunk_count)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, 1, NULL, ?8, ?9)",
|
||||
rusqlite::params![
|
||||
)?
|
||||
.execute(rusqlite::params![
|
||||
doc_id,
|
||||
chunk_index as i64,
|
||||
model_name,
|
||||
@@ -393,13 +532,12 @@ fn store_embedding(
|
||||
now,
|
||||
CHUNK_MAX_BYTES as i64,
|
||||
chunk_count
|
||||
],
|
||||
)?;
|
||||
])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn record_embedding_error(
|
||||
pub(crate) fn record_embedding_error(
|
||||
conn: &Connection,
|
||||
doc_id: i64,
|
||||
chunk_index: usize,
|
||||
@@ -409,7 +547,7 @@ fn record_embedding_error(
|
||||
error: &str,
|
||||
) -> Result<()> {
|
||||
let now = chrono::Utc::now().timestamp_millis();
|
||||
conn.execute(
|
||||
conn.prepare_cached(
|
||||
"INSERT INTO embedding_metadata
|
||||
(document_id, chunk_index, model, dims, document_hash, chunk_hash,
|
||||
created_at, attempt_count, last_error, last_attempt_at, chunk_max_bytes)
|
||||
@@ -419,7 +557,8 @@ fn record_embedding_error(
|
||||
last_error = ?8,
|
||||
last_attempt_at = ?7,
|
||||
chunk_max_bytes = ?9",
|
||||
rusqlite::params![
|
||||
)?
|
||||
.execute(rusqlite::params![
|
||||
doc_id,
|
||||
chunk_index as i64,
|
||||
model_name,
|
||||
@@ -429,8 +568,7 @@ fn record_embedding_error(
|
||||
now,
|
||||
error,
|
||||
CHUNK_MAX_BYTES as i64
|
||||
],
|
||||
)?;
|
||||
])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -131,13 +131,11 @@ pub async fn ingest_project_issues_with_progress(
|
||||
|
||||
let issues_needing_sync = issue_result.issues_needing_discussion_sync;
|
||||
|
||||
let total_issues: i64 = conn
|
||||
.query_row(
|
||||
let total_issues: i64 = conn.query_row(
|
||||
"SELECT COUNT(*) FROM issues WHERE project_id = ?",
|
||||
[project_id],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.unwrap_or(0);
|
||||
)?;
|
||||
let total_issues = total_issues as usize;
|
||||
result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
|
||||
|
||||
@@ -346,13 +344,11 @@ pub async fn ingest_project_merge_requests_with_progress(
|
||||
|
||||
let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
|
||||
|
||||
let total_mrs: i64 = conn
|
||||
.query_row(
|
||||
let total_mrs: i64 = conn.query_row(
|
||||
"SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
|
||||
[project_id],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.unwrap_or(0);
|
||||
)?;
|
||||
let total_mrs = total_mrs as usize;
|
||||
result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
|
||||
|
||||
@@ -573,7 +569,12 @@ fn enqueue_resource_events_for_entity_type(
|
||||
[project_id],
|
||||
)?;
|
||||
}
|
||||
_ => {}
|
||||
other => {
|
||||
warn!(
|
||||
entity_type = other,
|
||||
"Unknown entity_type in enqueue_resource_events, skipping stale job cleanup"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let entities: Vec<(i64, i64)> = match entity_type {
|
||||
@@ -900,7 +901,12 @@ fn update_resource_event_watermark_tx(
|
||||
[entity_local_id],
|
||||
)?;
|
||||
}
|
||||
_ => {}
|
||||
other => {
|
||||
warn!(
|
||||
entity_type = other,
|
||||
"Unknown entity_type in watermark update, skipping"
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1138,6 +1144,8 @@ fn store_closes_issues_refs(
|
||||
mr_local_id: i64,
|
||||
closes_issues: &[crate::gitlab::types::GitLabIssueRef],
|
||||
) -> Result<()> {
|
||||
conn.execute_batch("SAVEPOINT store_closes_refs")?;
|
||||
let inner = || -> Result<()> {
|
||||
for issue_ref in closes_issues {
|
||||
let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?;
|
||||
|
||||
@@ -1164,8 +1172,18 @@ fn store_closes_issues_refs(
|
||||
|
||||
insert_entity_reference(conn, &ref_)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
match inner() {
|
||||
Ok(()) => {
|
||||
conn.execute_batch("RELEASE store_closes_refs")?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
17
src/main.rs
17
src/main.rs
@@ -134,7 +134,9 @@ async fn main() {
|
||||
"never" => console::set_colors_enabled(false),
|
||||
"always" => console::set_colors_enabled(true),
|
||||
"auto" => {}
|
||||
_ => unreachable!(),
|
||||
other => {
|
||||
eprintln!("Warning: unknown color mode '{}', using auto", other);
|
||||
}
|
||||
}
|
||||
|
||||
let quiet = cli.quiet;
|
||||
@@ -664,7 +666,10 @@ async fn handle_ingest(
|
||||
let signal_for_handler = signal.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
|
||||
signal_for_handler.cancel();
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
std::process::exit(130);
|
||||
});
|
||||
|
||||
let ingest_result: std::result::Result<(), Box<dyn std::error::Error>> = async {
|
||||
@@ -1264,7 +1269,9 @@ fn handle_completions(shell: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
"zsh" => Shell::Zsh,
|
||||
"fish" => Shell::Fish,
|
||||
"powershell" => Shell::PowerShell,
|
||||
_ => unreachable!(),
|
||||
other => {
|
||||
return Err(format!("Unsupported shell: {other}").into());
|
||||
}
|
||||
};
|
||||
|
||||
let mut cmd = Cli::command();
|
||||
@@ -1522,7 +1529,10 @@ async fn handle_embed(
|
||||
let signal_for_handler = signal.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
|
||||
signal_for_handler.cancel();
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
std::process::exit(130);
|
||||
});
|
||||
|
||||
let result = run_embed(&config, full, retry_failed, None, &signal).await?;
|
||||
@@ -1573,7 +1583,10 @@ async fn handle_sync_cmd(
|
||||
let signal_for_handler = signal.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
|
||||
signal_for_handler.cancel();
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
std::process::exit(130);
|
||||
});
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
@@ -14,9 +14,38 @@ pub struct FtsResult {
|
||||
pub snippet: String,
|
||||
}
|
||||
|
||||
/// Validate a raw FTS5 query string before it is used verbatim.
///
/// Returns `Some(query)` (with surrounding whitespace trimmed) when the input passes
/// every check below, and `None` otherwise. This function never rewrites the query
/// itself; choosing a fallback for a rejected query is the caller's job.
///
/// Rejected inputs:
/// - empty or whitespace-only strings;
/// - strings containing an odd number of double quotes (unbalanced quotes are an
///   FTS5 syntax error);
/// - strings whose first token is a bare `*`, i.e. a wildcard with no prefix to match.
fn sanitize_raw_fts(raw: &str) -> Option<String> {
    let trimmed = raw.trim();

    // `split_whitespace` yields nothing for an empty / all-whitespace query, so `?`
    // covers the "empty query" rejection as well.
    let first_token = trimmed.split_whitespace().next()?;

    // Compare the whole first token instead of looking for "* " so that a tab or
    // newline after the wildcard is rejected just like a space.
    if first_token == "*" {
        return None;
    }

    // Double quotes must pair up, otherwise FTS5 reports a syntax error.
    let quote_count = trimmed.matches('"').count();
    if quote_count % 2 != 0 {
        return None;
    }

    Some(trimmed.to_string())
}
|
||||
|
||||
pub fn to_fts_query(raw: &str, mode: FtsQueryMode) -> String {
|
||||
match mode {
|
||||
FtsQueryMode::Raw => raw.to_string(),
|
||||
FtsQueryMode::Raw => {
|
||||
// Validate raw FTS5 input; fall back to Safe mode if invalid
|
||||
match sanitize_raw_fts(raw) {
|
||||
Some(sanitized) => sanitized,
|
||||
None => to_fts_query(raw, FtsQueryMode::Safe),
|
||||
}
|
||||
}
|
||||
FtsQueryMode::Safe => {
|
||||
let trimmed = raw.trim();
|
||||
if trimmed.is_empty() {
|
||||
@@ -202,4 +231,38 @@ mod tests {
|
||||
let result = get_result_snippet(Some(""), "full content text");
|
||||
assert_eq!(result, "full content text");
|
||||
}
|
||||
|
||||
#[test]
fn test_raw_mode_valid_fts5_passes_through() {
    // Well-formed raw FTS5 input (boolean operator, quoted phrase) must come back unchanged.
    for query in ["auth OR error", "\"exact phrase\""] {
        assert_eq!(to_fts_query(query, FtsQueryMode::Raw), query);
    }
}
|
||||
|
||||
#[test]
fn test_raw_mode_unbalanced_quotes_falls_back_to_safe() {
    // A single stray quote makes the raw query invalid, so the Safe-mode rendering is
    // expected: each token wrapped in quotes.
    let expected = "\"auth\" \"\"\"error\"";
    let actual = to_fts_query("auth \"error", FtsQueryMode::Raw);
    assert_eq!(actual, expected);
}
|
||||
|
||||
#[test]
fn test_raw_mode_leading_wildcard_falls_back_to_safe() {
    // (raw input, expected Safe-mode output) for queries that start with a bare `*`.
    let cases = [
        ("* OR auth", "\"*\" \"OR\" \"auth\""),
        ("*", "\"*\""),
    ];

    for (input, expected) in cases {
        assert_eq!(to_fts_query(input, FtsQueryMode::Raw), expected);
    }
}
|
||||
|
||||
#[test]
fn test_raw_mode_empty_falls_back_to_safe() {
    // Both an empty and a whitespace-only raw query must produce an empty FTS query.
    for blank in ["", " "] {
        assert_eq!(to_fts_query(blank, FtsQueryMode::Raw), "");
    }
}
|
||||
}
|
||||
|
||||
@@ -422,3 +422,161 @@ fn bench_prepare_vs_prepare_cached() {
|
||||
println!("Speedup: {:.2}x", speedup);
|
||||
println!();
|
||||
}
|
||||
|
||||
/// Benchmark: cost of the redundant content-hash lookup during document regeneration.
/// OLD: hash-only SELECT (get_existing_hash) + full-row SELECT (upsert_document_inner) = 2 queries per doc
/// NEW: full-row SELECT only, with change detection done on its result = 1 query per doc
#[test]
fn bench_redundant_hash_query_elimination() {
    let conn = setup_db();

    // Seed 100 issues, each with one matching document whose hash is "hash_<id>".
    for n in 1..=100 {
        conn.execute(
            "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at)
             VALUES (?1, ?2, 1, ?1, 'Test', 'opened', 1000, 2000, 3000)",
            rusqlite::params![n, n * 10],
        )
        .unwrap();

        let seeded_hash = format!("hash_{}", n);
        conn.execute(
            "INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash, labels_hash, paths_hash)
             VALUES ('issue', ?1, 1, 'content', ?2, 'lh', 'ph')",
            rusqlite::params![n, seeded_hash],
        )
        .unwrap();
    }

    let iterations = 10_000;
    let hash_sql = "SELECT content_hash FROM documents WHERE source_type = ?1 AND source_id = ?2";
    let full_sql = "SELECT id, content_hash, labels_hash, paths_hash FROM documents WHERE source_type = ?1 AND source_id = ?2";

    // OLD path: two lookups per document.
    let old_timer = Instant::now();
    for step in 0..iterations {
        // Cycle through the 100 seeded source ids.
        let source_id = (step % 100) + 1;

        // Lookup 1: hash only (what get_existing_hash did).
        let mut hash_stmt = conn.prepare_cached(hash_sql).unwrap();
        let hash_only: Option<String> = hash_stmt
            .query_row(rusqlite::params!["issue", source_id as i64], |row| {
                row.get(0)
            })
            .ok();

        // Lookup 2: full row (what upsert_document_inner does).
        let mut row_stmt = conn.prepare_cached(full_sql).unwrap();
        let full_row: Option<(i64, String, String, String)> = row_stmt
            .query_row(rusqlite::params!["issue", source_id as i64], |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
            })
            .ok();

        // Keep the optimizer from discarding the query results.
        std::hint::black_box((hash_only, full_row));
    }
    let old_elapsed = old_timer.elapsed();

    // NEW path: one lookup per document, change detection derived from the same row.
    let new_timer = Instant::now();
    for step in 0..iterations {
        let source_id = (step % 100) + 1;

        let mut row_stmt = conn.prepare_cached(full_sql).unwrap();
        let existing: Option<(i64, String, String, String)> = row_stmt
            .query_row(rusqlite::params!["issue", source_id as i64], |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
            })
            .ok();

        // A missing row counts as changed; otherwise compare against the expected hash.
        let changed = existing
            .as_ref()
            .map_or(true, |(_, old_hash, _, _)| {
                *old_hash != format!("hash_{}", source_id)
            });

        std::hint::black_box((existing, changed));
    }
    let new_elapsed = new_timer.elapsed();

    let speedup = old_elapsed.as_nanos() as f64 / new_elapsed.as_nanos() as f64;

    println!(
        "\n=== Redundant Hash Query Elimination ({} iterations) ===",
        iterations
    );
    println!("OLD (2 queries): {:?}", old_elapsed);
    println!("NEW (1 query): {:?}", new_elapsed);
    println!("Speedup: {:.2}x", speedup);
    println!();
}
|
||||
|
||||
// NOTE: SHA256 hex formatting (format!("{:x}") vs LUT) was benchmarked at 1.01x.
|
||||
// The SHA256 computation dominates; hex encoding is negligible. Optimization reverted.
|
||||
|
||||
// NOTE: compute_list_hash indirect index sort vs direct &str sort was benchmarked at 1.02x.
|
||||
// SHA256 dominates here too; the sort strategy is negligible. Optimization reverted.
|
||||
|
||||
/// Benchmark: f32-to-bytes conversion, fresh allocation per call vs one reusable buffer.
/// Each simulated embedding has 768 f32 values, i.e. 3072 bytes once converted.
#[test]
fn bench_embedding_bytes_alloc_vs_reuse() {
    /// Strategy under test (OLD): build a new byte vector for every embedding.
    fn to_bytes_alloc(embedding: &[f32]) -> Vec<u8> {
        let mut out = Vec::with_capacity(embedding.len() * 4);
        for value in embedding {
            out.extend_from_slice(&value.to_le_bytes());
        }
        out
    }

    /// Strategy under test (NEW): clear and refill a caller-owned buffer.
    fn to_bytes_reuse(embedding: &[f32], buf: &mut Vec<u8>) {
        buf.clear();
        buf.reserve(embedding.len() * 4);
        for value in embedding {
            buf.extend_from_slice(&value.to_le_bytes());
        }
    }

    // Simulate 768-dim embeddings (nomic-embed-text)
    let dims = 768;
    let iterations = 50_000;
    let embeddings: Vec<Vec<f32>> = (0..100)
        .map(|row| {
            (0..dims)
                .map(|col| (row * dims + col) as f32 * 0.001)
                .collect()
        })
        .collect();

    // Warm up both code paths once over every embedding before timing anything.
    let mut warmup_buf = Vec::with_capacity(dims * 4);
    for emb in &embeddings {
        let _ = to_bytes_alloc(emb);
        to_bytes_reuse(emb, &mut warmup_buf);
    }

    // Timed run, OLD: allocate per call.
    let alloc_timer = Instant::now();
    for step in 0..iterations {
        let emb = &embeddings[step % embeddings.len()];
        let bytes = to_bytes_alloc(emb);
        std::hint::black_box(&bytes);
    }
    let old_elapsed = alloc_timer.elapsed();

    // Timed run, NEW: reusable buffer (its creation is part of the measured region).
    let reuse_timer = Instant::now();
    let mut scratch = Vec::with_capacity(dims * 4);
    for step in 0..iterations {
        let emb = &embeddings[step % embeddings.len()];
        to_bytes_reuse(emb, &mut scratch);
        std::hint::black_box(&scratch);
    }
    let new_elapsed = reuse_timer.elapsed();

    let speedup = old_elapsed.as_nanos() as f64 / new_elapsed.as_nanos() as f64;

    println!(
        "\n=== Embedding Bytes Conversion Benchmark ({} iterations, {} dims) ===",
        iterations, dims
    );
    println!("Alloc per call: {:?}", old_elapsed);
    println!("Reusable buffer: {:?}", new_elapsed);
    println!("Speedup: {:.2}x", speedup);
    println!();

    // Correctness check: both strategies must serialize the same embedding identically.
    let test_emb: Vec<f32> = (0..dims).map(|i| i as f32 * 0.1).collect();
    let alloc_result = to_bytes_alloc(&test_emb);
    to_bytes_reuse(&test_emb, &mut scratch);
    assert_eq!(
        alloc_result, scratch,
        "Both approaches must produce identical bytes"
    );
}
|
||||
|
||||
Reference in New Issue
Block a user