feat(documents): Add document generation pipeline with dirty tracking

Implements the documents module that transforms raw ingested entities
(issues, MRs, discussions) into searchable document blobs stored in
the documents table. This is the foundation for both FTS5 lexical
search and vector embedding.

Key components:

- documents::extractor: Renders entities into structured text documents.
  Issues include title, description, labels, milestone, assignees, and
  threaded discussion summaries. MRs additionally include source/target
  branches, reviewers, and approval status. Discussions are rendered
  with full note threading.

- documents::regenerator: Drains the dirty_queue table to regenerate
  only documents whose source entities changed since last sync. Supports
  full rebuild mode (seeds all entities into dirty queue first) and
  project-scoped regeneration.

- documents::truncation: Safety cap at 2MB per document to prevent
  pathological outliers from degrading FTS or embedding performance.

- ingestion::dirty_tracker: Marks entities as dirty inside the
  ingestion transaction so document regeneration stays consistent
  with data changes. Uses INSERT OR IGNORE to deduplicate.

- ingestion::discussion_queue: Queue-based discussion fetching that
  isolates individual discussion failures from the broader ingestion
  pipeline, preventing a single corrupt discussion from blocking
  an entire project sync.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-01-30 15:46:18 -05:00
parent d31d5292f2
commit 20edff4ab1
7 changed files with 2431 additions and 0 deletions

1085
src/documents/extractor.rs Normal file

File diff suppressed because it is too large Load Diff

17
src/documents/mod.rs Normal file
View File

@@ -0,0 +1,17 @@
//! Document generation and management.
//!
//! Extracts searchable documents from issues, MRs, and discussions.
mod extractor;
mod regenerator;
mod truncation;
pub use extractor::{
compute_content_hash, compute_list_hash, extract_discussion_document,
extract_issue_document, extract_mr_document, DocumentData, SourceType,
};
pub use regenerator::{regenerate_dirty_documents, RegenerateResult};
pub use truncation::{
truncate_discussion, truncate_hard_cap, truncate_utf8, NoteContent, TruncationReason,
TruncationResult, MAX_DISCUSSION_BYTES, MAX_DOCUMENT_BYTES_HARD,
};

View File

@@ -0,0 +1,475 @@
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use tracing::{debug, warn};
use crate::core::error::Result;
use crate::documents::{
extract_discussion_document, extract_issue_document, extract_mr_document, DocumentData,
SourceType,
};
use crate::ingestion::dirty_tracker::{clear_dirty, get_dirty_sources, record_dirty_error};
/// Result of a document regeneration run.
#[derive(Debug, Default)]
pub struct RegenerateResult {
pub regenerated: usize,
pub unchanged: usize,
pub errored: usize,
}
/// Drain the dirty_sources queue, regenerating documents for each entry.
///
/// Uses per-item error handling (fail-soft) and drains the queue completely
/// via a bounded batch loop. Each dirty item is processed independently.
pub fn regenerate_dirty_documents(conn: &Connection) -> Result<RegenerateResult> {
let mut result = RegenerateResult::default();
loop {
let dirty = get_dirty_sources(conn)?;
if dirty.is_empty() {
break;
}
for (source_type, source_id) in &dirty {
match regenerate_one(conn, *source_type, *source_id) {
Ok(changed) => {
if changed {
result.regenerated += 1;
} else {
result.unchanged += 1;
}
clear_dirty(conn, *source_type, *source_id)?;
}
Err(e) => {
warn!(
source_type = %source_type,
source_id,
error = %e,
"Failed to regenerate document"
);
record_dirty_error(conn, *source_type, *source_id, &e.to_string())?;
result.errored += 1;
}
}
}
}
debug!(
regenerated = result.regenerated,
unchanged = result.unchanged,
errored = result.errored,
"Document regeneration complete"
);
Ok(result)
}
/// Regenerate a single document. Returns true if content_hash changed.
fn regenerate_one(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<bool> {
let doc = match source_type {
SourceType::Issue => extract_issue_document(conn, source_id)?,
SourceType::MergeRequest => extract_mr_document(conn, source_id)?,
SourceType::Discussion => extract_discussion_document(conn, source_id)?,
};
let Some(doc) = doc else {
// Source was deleted — remove the document (cascade handles FTS/embeddings)
delete_document(conn, source_type, source_id)?;
return Ok(true);
};
let existing_hash = get_existing_hash(conn, source_type, source_id)?;
let changed = existing_hash.as_ref() != Some(&doc.content_hash);
// Always upsert: labels/paths can change independently of content_hash
upsert_document(conn, &doc)?;
Ok(changed)
}
/// Get existing content hash for a document, if it exists.
fn get_existing_hash(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<Option<String>> {
let mut stmt =
conn.prepare("SELECT content_hash FROM documents WHERE source_type = ?1 AND source_id = ?2")?;
let hash: Option<String> = stmt
.query_row(rusqlite::params![source_type.as_str(), source_id], |row| {
row.get(0)
})
.optional()?;
Ok(hash)
}
/// Upsert a document with triple-hash write optimization.
///
/// Wrapped in a SAVEPOINT to ensure atomicity of the multi-statement write
/// (document row + labels + paths). Without this, a crash between statements
/// could leave the document with a stale labels_hash but missing label rows.
fn upsert_document(conn: &Connection, doc: &DocumentData) -> Result<()> {
conn.execute_batch("SAVEPOINT upsert_doc")?;
match upsert_document_inner(conn, doc) {
Ok(()) => {
conn.execute_batch("RELEASE upsert_doc")?;
Ok(())
}
Err(e) => {
let _ = conn.execute_batch("ROLLBACK TO upsert_doc");
Err(e)
}
}
}
fn upsert_document_inner(conn: &Connection, doc: &DocumentData) -> Result<()> {
// Check existing hashes before writing
let existing: Option<(i64, String, String, String)> = conn
.query_row(
"SELECT id, content_hash, labels_hash, paths_hash FROM documents
WHERE source_type = ?1 AND source_id = ?2",
rusqlite::params![doc.source_type.as_str(), doc.source_id],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.optional()?;
// Fast path: skip ALL writes when nothing changed (prevents WAL churn)
if let Some((_, ref old_content_hash, ref old_labels_hash, ref old_paths_hash)) = existing {
if old_content_hash == &doc.content_hash
&& old_labels_hash == &doc.labels_hash
&& old_paths_hash == &doc.paths_hash
{
return Ok(());
}
}
let labels_json =
serde_json::to_string(&doc.labels).unwrap_or_else(|_| "[]".to_string());
// Upsert document row
conn.execute(
"INSERT INTO documents
(source_type, source_id, project_id, author_username, label_names,
labels_hash, paths_hash,
created_at, updated_at, url, title, content_text, content_hash,
is_truncated, truncated_reason)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
ON CONFLICT(source_type, source_id) DO UPDATE SET
author_username = excluded.author_username,
label_names = excluded.label_names,
labels_hash = excluded.labels_hash,
paths_hash = excluded.paths_hash,
updated_at = excluded.updated_at,
url = excluded.url,
title = excluded.title,
content_text = excluded.content_text,
content_hash = excluded.content_hash,
is_truncated = excluded.is_truncated,
truncated_reason = excluded.truncated_reason",
rusqlite::params![
doc.source_type.as_str(),
doc.source_id,
doc.project_id,
doc.author_username,
labels_json,
doc.labels_hash,
doc.paths_hash,
doc.created_at,
doc.updated_at,
doc.url,
doc.title,
doc.content_text,
doc.content_hash,
doc.is_truncated as i32,
doc.truncated_reason,
],
)?;
// Get document ID
let doc_id = match existing {
Some((id, _, _, _)) => id,
None => get_document_id(conn, doc.source_type, doc.source_id)?,
};
// Only update labels if hash changed
let labels_changed = match &existing {
Some((_, _, old_hash, _)) => old_hash != &doc.labels_hash,
None => true,
};
if labels_changed {
conn.execute(
"DELETE FROM document_labels WHERE document_id = ?1",
[doc_id],
)?;
for label in &doc.labels {
conn.execute(
"INSERT INTO document_labels (document_id, label_name) VALUES (?1, ?2)",
rusqlite::params![doc_id, label],
)?;
}
}
// Only update paths if hash changed
let paths_changed = match &existing {
Some((_, _, _, old_hash)) => old_hash != &doc.paths_hash,
None => true,
};
if paths_changed {
conn.execute(
"DELETE FROM document_paths WHERE document_id = ?1",
[doc_id],
)?;
for path in &doc.paths {
conn.execute(
"INSERT INTO document_paths (document_id, path) VALUES (?1, ?2)",
rusqlite::params![doc_id, path],
)?;
}
}
Ok(())
}
/// Delete a document by source identity.
fn delete_document(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<()> {
conn.execute(
"DELETE FROM documents WHERE source_type = ?1 AND source_id = ?2",
rusqlite::params![source_type.as_str(), source_id],
)?;
Ok(())
}
/// Get document ID by source type and source ID.
fn get_document_id(
conn: &Connection,
source_type: SourceType,
source_id: i64,
) -> Result<i64> {
let id: i64 = conn.query_row(
"SELECT id FROM documents WHERE source_type = ?1 AND source_id = ?2",
rusqlite::params![source_type.as_str(), source_id],
|row| row.get(0),
)?;
Ok(id)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ingestion::dirty_tracker::mark_dirty;
fn setup_db() -> Connection {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch("
CREATE TABLE projects (
id INTEGER PRIMARY KEY,
gitlab_project_id INTEGER UNIQUE NOT NULL,
path_with_namespace TEXT NOT NULL,
default_branch TEXT,
web_url TEXT,
created_at INTEGER,
updated_at INTEGER,
raw_payload_id INTEGER
);
INSERT INTO projects (id, gitlab_project_id, path_with_namespace) VALUES (1, 100, 'group/project');
CREATE TABLE issues (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER UNIQUE NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id),
iid INTEGER NOT NULL,
title TEXT,
description TEXT,
state TEXT NOT NULL,
author_username TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
discussions_synced_for_updated_at INTEGER,
web_url TEXT,
raw_payload_id INTEGER
);
CREATE TABLE labels (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER,
project_id INTEGER NOT NULL REFERENCES projects(id),
name TEXT NOT NULL,
color TEXT,
description TEXT
);
CREATE TABLE issue_labels (
issue_id INTEGER NOT NULL REFERENCES issues(id),
label_id INTEGER NOT NULL REFERENCES labels(id),
PRIMARY KEY(issue_id, label_id)
);
CREATE TABLE documents (
id INTEGER PRIMARY KEY,
source_type TEXT NOT NULL,
source_id INTEGER NOT NULL,
project_id INTEGER NOT NULL,
author_username TEXT,
label_names TEXT,
created_at INTEGER,
updated_at INTEGER,
url TEXT,
title TEXT,
content_text TEXT NOT NULL,
content_hash TEXT NOT NULL,
labels_hash TEXT NOT NULL DEFAULT '',
paths_hash TEXT NOT NULL DEFAULT '',
is_truncated INTEGER NOT NULL DEFAULT 0,
truncated_reason TEXT,
UNIQUE(source_type, source_id)
);
CREATE TABLE document_labels (
document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
label_name TEXT NOT NULL,
PRIMARY KEY(document_id, label_name)
);
CREATE TABLE document_paths (
document_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
path TEXT NOT NULL,
PRIMARY KEY(document_id, path)
);
CREATE TABLE dirty_sources (
source_type TEXT NOT NULL,
source_id INTEGER NOT NULL,
queued_at INTEGER NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
last_error TEXT,
next_attempt_at INTEGER,
PRIMARY KEY(source_type, source_id)
);
CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at);
").unwrap();
conn
}
#[test]
fn test_regenerate_creates_document() {
let conn = setup_db();
conn.execute(
"INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test Issue', 'Description here', 'opened', 'alice', 1000, 2000, 3000)",
[],
).unwrap();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let result = regenerate_dirty_documents(&conn).unwrap();
assert_eq!(result.regenerated, 1);
assert_eq!(result.unchanged, 0);
assert_eq!(result.errored, 0);
// Verify document was created
let count: i64 = conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0)).unwrap();
assert_eq!(count, 1);
let content: String = conn.query_row("SELECT content_text FROM documents", [], |r| r.get(0)).unwrap();
assert!(content.contains("[[Issue]] #42: Test Issue"));
}
#[test]
fn test_regenerate_unchanged() {
let conn = setup_db();
conn.execute(
"INSERT INTO issues (id, gitlab_id, project_id, iid, title, description, state, author_username, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'Desc', 'opened', 'alice', 1000, 2000, 3000)",
[],
).unwrap();
// First regeneration creates the document
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let r1 = regenerate_dirty_documents(&conn).unwrap();
assert_eq!(r1.regenerated, 1);
// Second regeneration — same data, should be unchanged
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let r2 = regenerate_dirty_documents(&conn).unwrap();
assert_eq!(r2.unchanged, 1);
assert_eq!(r2.regenerated, 0);
}
#[test]
fn test_regenerate_deleted_source() {
let conn = setup_db();
conn.execute(
"INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)",
[],
).unwrap();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
regenerate_dirty_documents(&conn).unwrap();
// Delete the issue and re-mark dirty
conn.execute("PRAGMA foreign_keys = OFF", []).unwrap();
conn.execute("DELETE FROM issues WHERE id = 1", []).unwrap();
conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let result = regenerate_dirty_documents(&conn).unwrap();
assert_eq!(result.regenerated, 1); // Deletion counts as "changed"
let count: i64 = conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0)).unwrap();
assert_eq!(count, 0);
}
#[test]
fn test_regenerate_drains_queue() {
let conn = setup_db();
for i in 1..=10 {
conn.execute(
"INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (?1, ?2, 1, ?1, 'Test', 'opened', 1000, 2000, 3000)",
rusqlite::params![i, i * 10],
).unwrap();
mark_dirty(&conn, SourceType::Issue, i).unwrap();
}
let result = regenerate_dirty_documents(&conn).unwrap();
assert_eq!(result.regenerated, 10);
// Queue should be empty
let dirty = get_dirty_sources(&conn).unwrap();
assert!(dirty.is_empty());
}
#[test]
fn test_triple_hash_fast_path() {
let conn = setup_db();
conn.execute(
"INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) VALUES (1, 10, 1, 42, 'Test', 'opened', 1000, 2000, 3000)",
[],
).unwrap();
conn.execute(
"INSERT INTO labels (id, project_id, name) VALUES (1, 1, 'bug')",
[],
).unwrap();
conn.execute(
"INSERT INTO issue_labels (issue_id, label_id) VALUES (1, 1)",
[],
).unwrap();
// First run creates document
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
regenerate_dirty_documents(&conn).unwrap();
// Second run — triple hash match, should skip ALL writes
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let result = regenerate_dirty_documents(&conn).unwrap();
assert_eq!(result.unchanged, 1);
// Labels should still be present (not deleted and re-inserted)
let label_count: i64 = conn.query_row(
"SELECT COUNT(*) FROM document_labels", [], |r| r.get(0),
).unwrap();
assert_eq!(label_count, 1);
}
}

329
src/documents/truncation.rs Normal file
View File

@@ -0,0 +1,329 @@
/// Maximum byte limit for discussion documents (suitable for embedding chunking).
/// Note: uses `.len()` (byte count), not char count — consistent with `CHUNK_MAX_BYTES`.
pub const MAX_DISCUSSION_BYTES: usize = 32_000;
/// Hard safety cap (bytes) for any document type (pathological content: pasted logs, base64).
pub const MAX_DOCUMENT_BYTES_HARD: usize = 2_000_000;
/// A single note's content for truncation processing.
pub struct NoteContent {
pub author: String,
pub date: String,
pub body: String,
}
/// Result of truncation processing.
pub struct TruncationResult {
pub content: String,
pub is_truncated: bool,
pub reason: Option<TruncationReason>,
}
/// Why a document was truncated (matches DB CHECK constraint values).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncationReason {
TokenLimitMiddleDrop,
SingleNoteOversized,
FirstLastOversized,
HardCapOversized,
}
impl TruncationReason {
/// Returns the DB-compatible string matching the CHECK constraint.
pub fn as_str(&self) -> &'static str {
match self {
Self::TokenLimitMiddleDrop => "token_limit_middle_drop",
Self::SingleNoteOversized => "single_note_oversized",
Self::FirstLastOversized => "first_last_oversized",
Self::HardCapOversized => "hard_cap_oversized",
}
}
}
/// Format a single note as `@author (date):\nbody\n\n`.
fn format_note(note: &NoteContent) -> String {
format!("@{} ({}):\n{}\n\n", note.author, note.date, note.body)
}
/// Truncate a string at a UTF-8-safe byte boundary.
/// Returns a slice no longer than `max_bytes` bytes, walking backward
/// to find the nearest char boundary if needed.
pub fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
if s.len() <= max_bytes {
return s;
}
// Walk backward from max_bytes to find a char boundary
let mut end = max_bytes;
while end > 0 && !s.is_char_boundary(end) {
end -= 1;
}
&s[..end]
}
/// Truncate discussion notes to fit within `max_bytes`.
///
/// Algorithm:
/// 1. Format all notes
/// 2. If total fits, return as-is
/// 3. Single note: truncate at UTF-8 boundary, append [truncated]
/// 4. Try to keep first N notes + last note + marker within limit
/// 5. If first + last > limit: keep only first (truncated)
pub fn truncate_discussion(notes: &[NoteContent], max_bytes: usize) -> TruncationResult {
if notes.is_empty() {
return TruncationResult {
content: String::new(),
is_truncated: false,
reason: None,
};
}
let formatted: Vec<String> = notes.iter().map(format_note).collect();
let total: String = formatted.concat();
// Case 1: fits within limit
if total.len() <= max_bytes {
return TruncationResult {
content: total,
is_truncated: false,
reason: None,
};
}
// Case 2: single note — truncate it
if notes.len() == 1 {
let truncated = truncate_utf8(&total, max_bytes.saturating_sub(11)); // room for [truncated]
let content = format!("{}[truncated]", truncated);
return TruncationResult {
content,
is_truncated: true,
reason: Some(TruncationReason::SingleNoteOversized),
};
}
// Case 3: multiple notes — try first N + marker + last
let last_note = &formatted[formatted.len() - 1];
// Binary search for max N where first N notes + marker + last note fit
let mut best_n = 0;
for n in 1..formatted.len() - 1 {
let first_n: usize = formatted[..n].iter().map(|s| s.len()).sum();
let omitted = formatted.len() - n - 1;
let marker = format!("\n\n[... {} notes omitted for length ...]\n\n", omitted);
let candidate_len = first_n + marker.len() + last_note.len();
if candidate_len <= max_bytes {
best_n = n;
} else {
break;
}
}
if best_n > 0 {
// We can keep first best_n notes + marker + last note
let first_part: String = formatted[..best_n].concat();
let omitted = formatted.len() - best_n - 1;
let marker = format!("\n\n[... {} notes omitted for length ...]\n\n", omitted);
let content = format!("{}{}{}", first_part, marker, last_note);
return TruncationResult {
content,
is_truncated: true,
reason: Some(TruncationReason::TokenLimitMiddleDrop),
};
}
// Case 4: even first + last don't fit — keep only first (truncated)
let first_note = &formatted[0];
if first_note.len() + last_note.len() > max_bytes {
let truncated = truncate_utf8(first_note, max_bytes.saturating_sub(11));
let content = format!("{}[truncated]", truncated);
return TruncationResult {
content,
is_truncated: true,
reason: Some(TruncationReason::FirstLastOversized),
};
}
// Fallback: first + marker + last (0 middle notes kept)
let omitted = formatted.len() - 2;
let marker = format!("\n\n[... {} notes omitted for length ...]\n\n", omitted);
let content = format!("{}{}{}", formatted[0], marker, last_note);
TruncationResult {
content,
is_truncated: true,
reason: Some(TruncationReason::TokenLimitMiddleDrop),
}
}
/// Apply hard cap truncation to any document type.
/// Truncates at UTF-8-safe boundary if content exceeds 2MB.
pub fn truncate_hard_cap(content: &str) -> TruncationResult {
if content.len() <= MAX_DOCUMENT_BYTES_HARD {
return TruncationResult {
content: content.to_string(),
is_truncated: false,
reason: None,
};
}
let truncated = truncate_utf8(content, MAX_DOCUMENT_BYTES_HARD.saturating_sub(11));
TruncationResult {
content: format!("{}[truncated]", truncated),
is_truncated: true,
reason: Some(TruncationReason::HardCapOversized),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_note(author: &str, body: &str) -> NoteContent {
NoteContent {
author: author.to_string(),
date: "2024-01-01".to_string(),
body: body.to_string(),
}
}
#[test]
fn test_no_truncation_under_limit() {
let notes = vec![
make_note("alice", "Short note 1"),
make_note("bob", "Short note 2"),
make_note("carol", "Short note 3"),
];
let result = truncate_discussion(&notes, MAX_DISCUSSION_BYTES);
assert!(!result.is_truncated);
assert!(result.reason.is_none());
assert!(result.content.contains("@alice"));
assert!(result.content.contains("@bob"));
assert!(result.content.contains("@carol"));
}
#[test]
fn test_middle_notes_dropped() {
// Create 10 notes where total exceeds limit
let big_body = "x".repeat(4000);
let notes: Vec<NoteContent> = (0..10)
.map(|i| make_note(&format!("user{}", i), &big_body))
.collect();
let result = truncate_discussion(&notes, 10_000);
assert!(result.is_truncated);
assert_eq!(result.reason, Some(TruncationReason::TokenLimitMiddleDrop));
// First note preserved
assert!(result.content.contains("@user0"));
// Last note preserved
assert!(result.content.contains("@user9"));
// Marker present
assert!(result.content.contains("notes omitted for length"));
}
#[test]
fn test_single_note_oversized() {
let big_body = "x".repeat(50_000);
let notes = vec![make_note("alice", &big_body)];
let result = truncate_discussion(&notes, MAX_DISCUSSION_BYTES);
assert!(result.is_truncated);
assert_eq!(result.reason, Some(TruncationReason::SingleNoteOversized));
assert!(result.content.ends_with("[truncated]"));
assert!(result.content.len() <= MAX_DISCUSSION_BYTES + 20);
}
#[test]
fn test_first_last_oversized() {
let big_body = "x".repeat(20_000);
let notes = vec![
make_note("alice", &big_body),
make_note("bob", &big_body),
];
let result = truncate_discussion(&notes, 10_000);
assert!(result.is_truncated);
assert_eq!(result.reason, Some(TruncationReason::FirstLastOversized));
assert!(result.content.contains("@alice"));
assert!(result.content.ends_with("[truncated]"));
}
#[test]
fn test_one_note_under_limit() {
let notes = vec![make_note("alice", "Short note")];
let result = truncate_discussion(&notes, MAX_DISCUSSION_BYTES);
assert!(!result.is_truncated);
assert!(result.content.contains("@alice"));
}
#[test]
fn test_empty_notes() {
let result = truncate_discussion(&[], MAX_DISCUSSION_BYTES);
assert!(!result.is_truncated);
assert!(result.content.is_empty());
}
#[test]
fn test_utf8_boundary_safety() {
// Emoji are 4 bytes each
let emoji_content = "🎉".repeat(10);
let truncated = truncate_utf8(&emoji_content, 10);
// 10 bytes should hold 2 emoji (8 bytes) with 2 bytes left over (not enough for another)
assert_eq!(truncated.len(), 8);
assert_eq!(truncated, "🎉🎉");
}
#[test]
fn test_utf8_boundary_cjk() {
// CJK characters are 3 bytes each
let cjk = "中文字符测试";
let truncated = truncate_utf8(cjk, 7);
// 7 bytes: 2 full chars (6 bytes), 1 byte left (not enough for another)
assert_eq!(truncated, "中文");
assert_eq!(truncated.len(), 6);
}
#[test]
fn test_hard_cap() {
let big_content = "x".repeat(3_000_000);
let result = truncate_hard_cap(&big_content);
assert!(result.is_truncated);
assert_eq!(result.reason, Some(TruncationReason::HardCapOversized));
assert!(result.content.len() <= MAX_DOCUMENT_BYTES_HARD + 20);
assert!(result.content.ends_with("[truncated]"));
}
#[test]
fn test_hard_cap_under_limit() {
let content = "Short content";
let result = truncate_hard_cap(content);
assert!(!result.is_truncated);
assert_eq!(result.content, content);
}
#[test]
fn test_marker_count_correct() {
// 7 notes, keep first 1 + last 1, drop middle 5
let big_body = "x".repeat(5000);
let notes: Vec<NoteContent> = (0..7)
.map(|i| make_note(&format!("user{}", i), &big_body))
.collect();
let result = truncate_discussion(&notes, 12_000);
assert!(result.is_truncated);
assert!(result.content.contains("[... 5 notes omitted for length ...]"));
}
#[test]
fn test_truncation_reason_as_str() {
assert_eq!(
TruncationReason::TokenLimitMiddleDrop.as_str(),
"token_limit_middle_drop"
);
assert_eq!(
TruncationReason::SingleNoteOversized.as_str(),
"single_note_oversized"
);
assert_eq!(
TruncationReason::FirstLastOversized.as_str(),
"first_last_oversized"
);
assert_eq!(
TruncationReason::HardCapOversized.as_str(),
"hard_cap_oversized"
);
}
}

View File

@@ -0,0 +1,258 @@
use rusqlite::Connection;
use crate::core::backoff::compute_next_attempt_at;
use crate::core::error::Result;
use crate::core::time::now_ms;
use crate::documents::SourceType;
const DIRTY_SOURCES_BATCH_SIZE: usize = 500;
/// Mark a source entity as dirty INSIDE an existing transaction.
/// ON CONFLICT resets ALL backoff/error state so fresh updates are immediately eligible.
pub fn mark_dirty_tx(
tx: &rusqlite::Transaction<'_>,
source_type: SourceType,
source_id: i64,
) -> Result<()> {
tx.execute(
"INSERT INTO dirty_sources (source_type, source_id, queued_at)
VALUES (?1, ?2, ?3)
ON CONFLICT(source_type, source_id) DO UPDATE SET
queued_at = excluded.queued_at,
attempt_count = 0,
last_attempt_at = NULL,
last_error = NULL,
next_attempt_at = NULL",
rusqlite::params![source_type.as_str(), source_id, now_ms()],
)?;
Ok(())
}
/// Convenience wrapper for non-transactional contexts.
pub fn mark_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> {
conn.execute(
"INSERT INTO dirty_sources (source_type, source_id, queued_at)
VALUES (?1, ?2, ?3)
ON CONFLICT(source_type, source_id) DO UPDATE SET
queued_at = excluded.queued_at,
attempt_count = 0,
last_attempt_at = NULL,
last_error = NULL,
next_attempt_at = NULL",
rusqlite::params![source_type.as_str(), source_id, now_ms()],
)?;
Ok(())
}
/// Get dirty sources ready for processing.
/// Returns entries where next_attempt_at is NULL or <= now.
/// Orders by attempt_count ASC (fresh before failed), then queued_at ASC.
pub fn get_dirty_sources(conn: &Connection) -> Result<Vec<(SourceType, i64)>> {
let now = now_ms();
let mut stmt = conn.prepare(
"SELECT source_type, source_id FROM dirty_sources
WHERE next_attempt_at IS NULL OR next_attempt_at <= ?1
ORDER BY attempt_count ASC, queued_at ASC
LIMIT ?2"
)?;
let rows = stmt
.query_map(rusqlite::params![now, DIRTY_SOURCES_BATCH_SIZE as i64], |row| {
let st_str: String = row.get(0)?;
let source_id: i64 = row.get(1)?;
Ok((st_str, source_id))
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut results = Vec::with_capacity(rows.len());
for (st_str, source_id) in rows {
let source_type = SourceType::parse(&st_str).ok_or_else(|| {
crate::core::error::LoreError::Other(format!(
"Invalid source_type in dirty_sources: {}",
st_str
))
})?;
results.push((source_type, source_id));
}
Ok(results)
}
/// Clear dirty entry after successful processing.
pub fn clear_dirty(conn: &Connection, source_type: SourceType, source_id: i64) -> Result<()> {
conn.execute(
"DELETE FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2",
rusqlite::params![source_type.as_str(), source_id],
)?;
Ok(())
}
/// Record an error for a dirty source, incrementing attempt_count and setting backoff.
pub fn record_dirty_error(
conn: &Connection,
source_type: SourceType,
source_id: i64,
error: &str,
) -> Result<()> {
let now = now_ms();
// Get current attempt_count first
let attempt_count: i64 = conn.query_row(
"SELECT attempt_count FROM dirty_sources WHERE source_type = ?1 AND source_id = ?2",
rusqlite::params![source_type.as_str(), source_id],
|row| row.get(0),
)?;
let new_attempt = attempt_count + 1;
let next_at = compute_next_attempt_at(now, new_attempt);
conn.execute(
"UPDATE dirty_sources SET
attempt_count = ?1,
last_attempt_at = ?2,
last_error = ?3,
next_attempt_at = ?4
WHERE source_type = ?5 AND source_id = ?6",
rusqlite::params![new_attempt, now, error, next_at, source_type.as_str(), source_id],
)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn setup_db() -> Connection {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch("
CREATE TABLE dirty_sources (
source_type TEXT NOT NULL CHECK (source_type IN ('issue','merge_request','discussion')),
source_id INTEGER NOT NULL,
queued_at INTEGER NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
last_error TEXT,
next_attempt_at INTEGER,
PRIMARY KEY(source_type, source_id)
);
CREATE INDEX idx_dirty_sources_next_attempt ON dirty_sources(next_attempt_at);
").unwrap();
conn
}
#[test]
fn test_mark_dirty_inserts() {
let conn = setup_db();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let count: i64 = conn.query_row("SELECT COUNT(*) FROM dirty_sources", [], |r| r.get(0)).unwrap();
assert_eq!(count, 1);
}
#[test]
fn test_mark_dirty_tx_inserts() {
let mut conn = setup_db();
{
let tx = conn.transaction().unwrap();
mark_dirty_tx(&tx, SourceType::Issue, 1).unwrap();
tx.commit().unwrap();
}
let count: i64 = conn.query_row("SELECT COUNT(*) FROM dirty_sources", [], |r| r.get(0)).unwrap();
assert_eq!(count, 1);
}
#[test]
fn test_requeue_resets_backoff() {
let conn = setup_db();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
// Simulate error state
record_dirty_error(&conn, SourceType::Issue, 1, "test error").unwrap();
let attempt: i64 = conn.query_row(
"SELECT attempt_count FROM dirty_sources WHERE source_id = 1", [], |r| r.get(0)
).unwrap();
assert_eq!(attempt, 1);
// Re-mark should reset
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
let attempt: i64 = conn.query_row(
"SELECT attempt_count FROM dirty_sources WHERE source_id = 1", [], |r| r.get(0)
).unwrap();
assert_eq!(attempt, 0);
let next_at: Option<i64> = conn.query_row(
"SELECT next_attempt_at FROM dirty_sources WHERE source_id = 1", [], |r| r.get(0)
).unwrap();
assert!(next_at.is_none());
}
#[test]
fn test_get_respects_backoff() {
let conn = setup_db();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
// Set next_attempt_at far in the future
conn.execute(
"UPDATE dirty_sources SET next_attempt_at = 9999999999999 WHERE source_id = 1",
[],
).unwrap();
let results = get_dirty_sources(&conn).unwrap();
assert!(results.is_empty());
}
#[test]
fn test_get_orders_by_attempt_count() {
let conn = setup_db();
// Insert issue 1 (failed, attempt_count=2)
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
conn.execute(
"UPDATE dirty_sources SET attempt_count = 2 WHERE source_id = 1",
[],
).unwrap();
// Insert issue 2 (fresh, attempt_count=0)
mark_dirty(&conn, SourceType::Issue, 2).unwrap();
let results = get_dirty_sources(&conn).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].1, 2); // Fresh first
assert_eq!(results[1].1, 1); // Failed second
}
#[test]
fn test_batch_size_500() {
let conn = setup_db();
for i in 0..600 {
mark_dirty(&conn, SourceType::Issue, i).unwrap();
}
let results = get_dirty_sources(&conn).unwrap();
assert_eq!(results.len(), 500);
}
#[test]
fn test_clear_removes() {
let conn = setup_db();
mark_dirty(&conn, SourceType::Issue, 1).unwrap();
clear_dirty(&conn, SourceType::Issue, 1).unwrap();
let count: i64 = conn.query_row("SELECT COUNT(*) FROM dirty_sources", [], |r| r.get(0)).unwrap();
assert_eq!(count, 0);
}
#[test]
fn test_drain_loop() {
let conn = setup_db();
for i in 0..1200 {
mark_dirty(&conn, SourceType::Issue, i).unwrap();
}
let mut total = 0;
loop {
let batch = get_dirty_sources(&conn).unwrap();
if batch.is_empty() {
break;
}
for (st, id) in &batch {
clear_dirty(&conn, *st, *id).unwrap();
}
total += batch.len();
}
assert_eq!(total, 1200);
}
}

View File

@@ -0,0 +1,265 @@
use rusqlite::Connection;
use crate::core::backoff::compute_next_attempt_at;
use crate::core::error::Result;
use crate::core::time::now_ms;
/// Noteable type for discussion queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NoteableType {
Issue,
MergeRequest,
}
impl NoteableType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Issue => "Issue",
Self::MergeRequest => "MergeRequest",
}
}
pub fn parse(s: &str) -> Option<Self> {
match s {
"Issue" => Some(Self::Issue),
"MergeRequest" => Some(Self::MergeRequest),
_ => None,
}
}
}
/// A pending discussion fetch entry.
pub struct PendingFetch {
pub project_id: i64,
pub noteable_type: NoteableType,
pub noteable_iid: i64,
pub attempt_count: i32,
}
/// Queue a discussion fetch. ON CONFLICT resets backoff (consistent with dirty_sources).
pub fn queue_discussion_fetch(
conn: &Connection,
project_id: i64,
noteable_type: NoteableType,
noteable_iid: i64,
) -> Result<()> {
conn.execute(
"INSERT INTO pending_discussion_fetches (project_id, noteable_type, noteable_iid, queued_at)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(project_id, noteable_type, noteable_iid) DO UPDATE SET
queued_at = excluded.queued_at,
attempt_count = 0,
last_attempt_at = NULL,
last_error = NULL,
next_attempt_at = NULL",
rusqlite::params![project_id, noteable_type.as_str(), noteable_iid, now_ms()],
)?;
Ok(())
}
/// Get next batch of pending fetches (WHERE next_attempt_at IS NULL OR <= now).
pub fn get_pending_fetches(conn: &Connection, limit: usize) -> Result<Vec<PendingFetch>> {
let now = now_ms();
let mut stmt = conn.prepare(
"SELECT project_id, noteable_type, noteable_iid, attempt_count
FROM pending_discussion_fetches
WHERE next_attempt_at IS NULL OR next_attempt_at <= ?1
ORDER BY queued_at ASC
LIMIT ?2"
)?;
let rows = stmt
.query_map(rusqlite::params![now, limit as i64], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i32>(3)?,
))
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut results = Vec::with_capacity(rows.len());
for (project_id, nt_str, noteable_iid, attempt_count) in rows {
let noteable_type = NoteableType::parse(&nt_str).ok_or_else(|| {
crate::core::error::LoreError::Other(format!(
"Invalid noteable_type in pending_discussion_fetches: {}",
nt_str
))
})?;
results.push(PendingFetch {
project_id,
noteable_type,
noteable_iid,
attempt_count,
});
}
Ok(results)
}
/// Mark fetch complete (remove from queue).
pub fn complete_fetch(
conn: &Connection,
project_id: i64,
noteable_type: NoteableType,
noteable_iid: i64,
) -> Result<()> {
conn.execute(
"DELETE FROM pending_discussion_fetches
WHERE project_id = ?1 AND noteable_type = ?2 AND noteable_iid = ?3",
rusqlite::params![project_id, noteable_type.as_str(), noteable_iid],
)?;
Ok(())
}
/// Record fetch error with backoff.
pub fn record_fetch_error(
conn: &Connection,
project_id: i64,
noteable_type: NoteableType,
noteable_iid: i64,
error: &str,
) -> Result<()> {
let now = now_ms();
let attempt_count: i64 = conn.query_row(
"SELECT attempt_count FROM pending_discussion_fetches
WHERE project_id = ?1 AND noteable_type = ?2 AND noteable_iid = ?3",
rusqlite::params![project_id, noteable_type.as_str(), noteable_iid],
|row| row.get(0),
)?;
let new_attempt = attempt_count + 1;
let next_at = compute_next_attempt_at(now, new_attempt);
conn.execute(
"UPDATE pending_discussion_fetches SET
attempt_count = ?1,
last_attempt_at = ?2,
last_error = ?3,
next_attempt_at = ?4
WHERE project_id = ?5 AND noteable_type = ?6 AND noteable_iid = ?7",
rusqlite::params![new_attempt, now, error, next_at, project_id, noteable_type.as_str(), noteable_iid],
)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn setup_db() -> Connection {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch("
CREATE TABLE projects (
id INTEGER PRIMARY KEY,
gitlab_project_id INTEGER UNIQUE NOT NULL,
path_with_namespace TEXT NOT NULL,
default_branch TEXT,
web_url TEXT,
created_at INTEGER,
updated_at INTEGER,
raw_payload_id INTEGER
);
INSERT INTO projects (id, gitlab_project_id, path_with_namespace) VALUES (1, 100, 'group/project');
CREATE TABLE pending_discussion_fetches (
project_id INTEGER NOT NULL REFERENCES projects(id),
noteable_type TEXT NOT NULL,
noteable_iid INTEGER NOT NULL,
queued_at INTEGER NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 0,
last_attempt_at INTEGER,
last_error TEXT,
next_attempt_at INTEGER,
PRIMARY KEY(project_id, noteable_type, noteable_iid)
);
CREATE INDEX idx_pending_discussions_next_attempt ON pending_discussion_fetches(next_attempt_at);
").unwrap();
conn
}
#[test]
fn test_queue_and_get() {
let conn = setup_db();
queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap();
let fetches = get_pending_fetches(&conn, 100).unwrap();
assert_eq!(fetches.len(), 1);
assert_eq!(fetches[0].project_id, 1);
assert_eq!(fetches[0].noteable_type, NoteableType::Issue);
assert_eq!(fetches[0].noteable_iid, 42);
assert_eq!(fetches[0].attempt_count, 0);
}
#[test]
fn test_requeue_resets_backoff() {
let conn = setup_db();
queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap();
record_fetch_error(&conn, 1, NoteableType::Issue, 42, "network error").unwrap();
let attempt: i32 = conn.query_row(
"SELECT attempt_count FROM pending_discussion_fetches WHERE noteable_iid = 42",
[], |r| r.get(0),
).unwrap();
assert_eq!(attempt, 1);
// Re-queue should reset
queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap();
let attempt: i32 = conn.query_row(
"SELECT attempt_count FROM pending_discussion_fetches WHERE noteable_iid = 42",
[], |r| r.get(0),
).unwrap();
assert_eq!(attempt, 0);
}
#[test]
fn test_backoff_respected() {
let conn = setup_db();
queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap();
conn.execute(
"UPDATE pending_discussion_fetches SET next_attempt_at = 9999999999999 WHERE noteable_iid = 42",
[],
).unwrap();
let fetches = get_pending_fetches(&conn, 100).unwrap();
assert!(fetches.is_empty());
}
#[test]
fn test_complete_removes() {
let conn = setup_db();
queue_discussion_fetch(&conn, 1, NoteableType::Issue, 42).unwrap();
complete_fetch(&conn, 1, NoteableType::Issue, 42).unwrap();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM pending_discussion_fetches", [], |r| r.get(0),
).unwrap();
assert_eq!(count, 0);
}
#[test]
fn test_error_increments_attempts() {
let conn = setup_db();
queue_discussion_fetch(&conn, 1, NoteableType::MergeRequest, 10).unwrap();
record_fetch_error(&conn, 1, NoteableType::MergeRequest, 10, "timeout").unwrap();
let (attempt, error): (i32, Option<String>) = conn.query_row(
"SELECT attempt_count, last_error FROM pending_discussion_fetches WHERE noteable_iid = 10",
[], |r| Ok((r.get(0)?, r.get(1)?)),
).unwrap();
assert_eq!(attempt, 1);
assert_eq!(error, Some("timeout".to_string()));
let next_at: Option<i64> = conn.query_row(
"SELECT next_attempt_at FROM pending_discussion_fetches WHERE noteable_iid = 10",
[], |r| r.get(0),
).unwrap();
assert!(next_at.is_some());
}
#[test]
fn test_noteable_type_parse() {
assert_eq!(NoteableType::parse("Issue"), Some(NoteableType::Issue));
assert_eq!(NoteableType::parse("MergeRequest"), Some(NoteableType::MergeRequest));
assert_eq!(NoteableType::parse("invalid"), None);
}
}

View File

@@ -3,6 +3,8 @@
//! This module handles fetching and storing issues, discussions, and notes
//! from GitLab with cursor-based incremental sync.
pub mod dirty_tracker;
pub mod discussion_queue;
pub mod discussions;
pub mod issues;
pub mod merge_requests;