Move inline #[cfg(test)] mod tests { ... } blocks from 22 source files
into dedicated _tests.rs companion files, wired via:
#[cfg(test)]
#[path = "module_tests.rs"]
mod tests;
This keeps implementation-focused source files leaner and more scannable
while preserving full access to private items through `use super::*;`.
Modules extracted:
core: db, note_parser, payloads, project, references, sync_run,
timeline_collect, timeline_expand, timeline_seed
cli: list (55 tests), who (75 tests)
documents: extractor (43 tests), regenerator
embedding: change_detector, chunking
gitlab: graphql (wiremock async tests), transformers/issue
ingestion: dirty_tracker, discussions, issues, mr_diffs
Also adds conflicts_with("explain_score") to the --detail flag in the
who command to prevent mutually exclusive flags from being combined.
All 629 unit tests pass. No behavior changes.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
274 lines
9.3 KiB
Rust
274 lines
9.3 KiB
Rust
use rusqlite::Connection;
|
|
use rusqlite::OptionalExtension;
|
|
use tracing::{debug, instrument, warn};
|
|
|
|
use crate::core::error::Result;
|
|
use crate::documents::{
|
|
DocumentData, ParentMetadataCache, SourceType, extract_discussion_document,
|
|
extract_issue_document, extract_mr_document, extract_note_document_cached,
|
|
};
|
|
use crate::ingestion::dirty_tracker::{clear_dirty, get_dirty_sources, record_dirty_error};
|
|
|
|
/// Aggregate counts produced by one `regenerate_dirty_documents` run.
#[derive(Debug, Default)]
pub struct RegenerateResult {
    // Sources whose stored document was written (upserted or deleted).
    pub regenerated: usize,
    // Sources whose content/labels/paths hashes all matched — no write.
    pub unchanged: usize,
    // Sources whose extraction or upsert failed; failure recorded via
    // `record_dirty_error` and the run continued.
    pub errored: usize,
}
|
|
|
|
/// Drains the `dirty_sources` queue: re-extracts and upserts the document
/// for each dirty source, looping until no dirty rows remain.
///
/// Per source: success bumps `regenerated`/`unchanged` and clears the dirty
/// flag; failure is logged, recorded via `record_dirty_error`, and counted
/// in `errored` without aborting the run. Database-level errors (`?`)
/// propagate immediately.
///
/// `progress_callback` receives `(processed, estimated_total)` after every
/// item; the estimate only grows, so progress never moves backwards.
#[instrument(
    skip(conn, progress_callback),
    fields(items_processed, items_skipped, errors)
)]
pub fn regenerate_dirty_documents(
    conn: &Connection,
    progress_callback: Option<&dyn Fn(usize, usize)>,
) -> Result<RegenerateResult> {
    let mut result = RegenerateResult::default();

    // Monotonic progress denominator (see doc comment).
    let mut estimated_total: usize = 0;
    // Parent-metadata cache shared across all note extractions in this run.
    let mut cache = ParentMetadataCache::new();

    loop {
        let dirty = get_dirty_sources(conn)?;
        if dirty.is_empty() {
            break;
        }
        // NOTE(review): termination assumes persistently-failing sources are
        // eventually excluded by `get_dirty_sources` after
        // `record_dirty_error` — confirm in ingestion::dirty_tracker,
        // otherwise this loop could re-process the same failures forever.

        // Current queue size; presumably `dirty` above may be a smaller
        // batch of it — TODO confirm batching in `get_dirty_sources`.
        let remaining: usize = conn
            .query_row("SELECT COUNT(*) FROM dirty_sources", [], |row| row.get(0))
            .unwrap_or(0_i64) as usize;
        let processed_so_far = result.regenerated + result.unchanged + result.errored;
        estimated_total = estimated_total.max(processed_so_far + remaining);

        for (source_type, source_id) in &dirty {
            match regenerate_one(conn, *source_type, *source_id, &mut cache) {
                Ok(changed) => {
                    if changed {
                        result.regenerated += 1;
                    } else {
                        result.unchanged += 1;
                    }
                    clear_dirty(conn, *source_type, *source_id)?;
                }
                Err(e) => {
                    warn!(
                        source_type = %source_type,
                        source_id,
                        error = %e,
                        "Failed to regenerate document"
                    );
                    // Record the failure against this source and keep going.
                    record_dirty_error(conn, *source_type, *source_id, &e.to_string())?;
                    result.errored += 1;
                }
            }

            // Per-item progress, not per-batch.
            let processed = result.regenerated + result.unchanged + result.errored;
            if let Some(cb) = progress_callback {
                cb(processed, estimated_total);
            }
        }
    }

    debug!(
        regenerated = result.regenerated,
        unchanged = result.unchanged,
        errored = result.errored,
        "Document regeneration complete"
    );

    tracing::Span::current().record("items_processed", result.regenerated);
    tracing::Span::current().record("items_skipped", result.unchanged);
    tracing::Span::current().record("errors", result.errored);

    Ok(result)
}
|
|
|
|
fn regenerate_one(
|
|
conn: &Connection,
|
|
source_type: SourceType,
|
|
source_id: i64,
|
|
cache: &mut ParentMetadataCache,
|
|
) -> Result<bool> {
|
|
let doc = match source_type {
|
|
SourceType::Issue => extract_issue_document(conn, source_id)?,
|
|
SourceType::MergeRequest => extract_mr_document(conn, source_id)?,
|
|
SourceType::Discussion => extract_discussion_document(conn, source_id)?,
|
|
SourceType::Note => extract_note_document_cached(conn, source_id, cache)?,
|
|
};
|
|
|
|
let Some(doc) = doc else {
|
|
delete_document(conn, source_type, source_id)?;
|
|
return Ok(true);
|
|
};
|
|
|
|
upsert_document(conn, &doc)
|
|
}
|
|
|
|
fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<bool> {
|
|
conn.execute_batch("SAVEPOINT upsert_doc")?;
|
|
match upsert_document_inner(conn, doc) {
|
|
Ok(changed) => {
|
|
conn.execute_batch("RELEASE upsert_doc")?;
|
|
Ok(changed)
|
|
}
|
|
Err(e) => {
|
|
let _ = conn.execute_batch("ROLLBACK TO upsert_doc; RELEASE upsert_doc");
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<bool> {
|
|
let existing: Option<(i64, String, String, String)> = conn
|
|
.query_row(
|
|
"SELECT id, content_hash, labels_hash, paths_hash FROM documents
|
|
WHERE source_type = ?1 AND source_id = ?2",
|
|
rusqlite::params![doc.source_type.as_str(), doc.source_id],
|
|
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
|
|
)
|
|
.optional()?;
|
|
|
|
// Fast path: if all three hashes match, nothing changed at all.
|
|
if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing
|
|
&& old_content_hash == &doc.content_hash
|
|
&& old_labels_hash == &doc.labels_hash
|
|
&& old_paths_hash == &doc.paths_hash
|
|
{
|
|
return Ok(false);
|
|
}
|
|
// Past this point at least one hash differs, so the document will be updated.
|
|
|
|
let labels_json = serde_json::to_string(&doc.labels).unwrap_or_else(|_| "[]".to_string());
|
|
|
|
conn.execute(
|
|
"INSERT INTO documents
|
|
(source_type, source_id, project_id, author_username, label_names,
|
|
labels_hash, paths_hash,
|
|
created_at, updated_at, url, title, content_text, content_hash,
|
|
is_truncated, truncated_reason)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
|
|
ON CONFLICT(source_type, source_id) DO UPDATE SET
|
|
project_id = excluded.project_id,
|
|
author_username = excluded.author_username,
|
|
label_names = excluded.label_names,
|
|
labels_hash = excluded.labels_hash,
|
|
paths_hash = excluded.paths_hash,
|
|
updated_at = excluded.updated_at,
|
|
url = excluded.url,
|
|
title = excluded.title,
|
|
content_text = excluded.content_text,
|
|
content_hash = excluded.content_hash,
|
|
is_truncated = excluded.is_truncated,
|
|
truncated_reason = excluded.truncated_reason",
|
|
rusqlite::params![
|
|
doc.source_type.as_str(),
|
|
doc.source_id,
|
|
doc.project_id,
|
|
doc.author_username,
|
|
labels_json,
|
|
doc.labels_hash,
|
|
doc.paths_hash,
|
|
doc.created_at,
|
|
doc.updated_at,
|
|
doc.url,
|
|
doc.title,
|
|
doc.content_text,
|
|
doc.content_hash,
|
|
doc.is_truncated as i32,
|
|
doc.truncated_reason,
|
|
],
|
|
)?;
|
|
|
|
let doc_id = match existing {
|
|
Some((id, _, _, _)) => id,
|
|
None => get_document_id(conn, doc.source_type, doc.source_id)?,
|
|
};
|
|
|
|
let labels_changed = match &existing {
|
|
Some((_, _, old_hash, _)) => old_hash != &doc.labels_hash,
|
|
None => true,
|
|
};
|
|
if labels_changed {
|
|
conn.execute(
|
|
"DELETE FROM document_labels WHERE document_id = ?1",
|
|
[doc_id],
|
|
)?;
|
|
if !doc.labels.is_empty() {
|
|
let placeholders: String = doc
|
|
.labels
|
|
.iter()
|
|
.enumerate()
|
|
.map(|(i, _)| format!("(?1, ?{})", i + 2))
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
let sql = format!(
|
|
"INSERT INTO document_labels (document_id, label_name) VALUES {}",
|
|
placeholders
|
|
);
|
|
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(doc_id)];
|
|
for label in &doc.labels {
|
|
params.push(Box::new(label.as_str()));
|
|
}
|
|
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
|
|
params.iter().map(|p| p.as_ref()).collect();
|
|
conn.execute(&sql, param_refs.as_slice())?;
|
|
}
|
|
}
|
|
|
|
let paths_changed = match &existing {
|
|
Some((_, _, _, old_hash)) => old_hash != &doc.paths_hash,
|
|
None => true,
|
|
};
|
|
if paths_changed {
|
|
conn.execute(
|
|
"DELETE FROM document_paths WHERE document_id = ?1",
|
|
[doc_id],
|
|
)?;
|
|
if !doc.paths.is_empty() {
|
|
let placeholders: String = doc
|
|
.paths
|
|
.iter()
|
|
.enumerate()
|
|
.map(|(i, _)| format!("(?1, ?{})", i + 2))
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
let sql = format!(
|
|
"INSERT INTO document_paths (document_id, path) VALUES {}",
|
|
placeholders
|
|
);
|
|
let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(doc_id)];
|
|
for path in &doc.paths {
|
|
params.push(Box::new(path.as_str()));
|
|
}
|
|
let param_refs: Vec<&dyn rusqlite::types::ToSql> =
|
|
params.iter().map(|p| p.as_ref()).collect();
|
|
conn.execute(&sql, param_refs.as_slice())?;
|
|
}
|
|
}
|
|
|
|
// We passed the triple-hash fast path, so at least one hash differs.
|
|
Ok(true)
|
|
}
|
|
|
|
fn delete_document(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> {
|
|
conn.execute(
|
|
"DELETE FROM documents WHERE source_type = ?1 AND source_id = ?2",
|
|
rusqlite::params![source_type.as_str(), source_id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn get_document_id(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<i64> {
|
|
let id: i64 = conn.query_row(
|
|
"SELECT id FROM documents WHERE source_type = ?1 AND source_id = ?2",
|
|
rusqlite::params![source_type.as_str(), source_id],
|
|
|row| row.get(0),
|
|
)?;
|
|
Ok(id)
|
|
}
|
|
|
|
// Unit tests live in the companion file `regenerator_tests.rs`; `#[path]`
// wires it in as the conventional `tests` module so the tests keep access
// to this file's private items via `use super::*;` while the
// implementation file stays lean.
#[cfg(test)]
#[path = "regenerator_tests.rs"]
mod tests;
|