feat: implement per-note search and document pipeline

- Add SourceType::Note with extract_note_document() and ParentMetadataCache
- Migration 022: composite indexes for notes queries + author_id column
- Migration 024: table rebuild adding 'note' to CHECK constraints, defense triggers
- Migration 025: backfill existing non-system notes into dirty queue
- Add lore notes CLI command with 17 filter options (author, path, resolution, etc.)
- Support table/json/jsonl/csv output formats with field selection
- Wire note dirty tracking through discussion and MR discussion ingestion
- Fix test_migration_024_preserves_existing_data off-by-one (tested wrong migration)
- Fix upsert_document_inner returning false for label/path-only changes
This commit is contained in:
teernisse
2026-02-12 12:37:11 -05:00
parent fda9cd8835
commit 83cd16c918
21 changed files with 5345 additions and 126 deletions

View File

@@ -14,6 +14,7 @@ use crate::gitlab::transformers::{
};
use crate::gitlab::types::GitLabDiscussion;
use crate::ingestion::dirty_tracker;
use crate::ingestion::discussions::NoteUpsertOutcome;
use super::merge_requests::MrForDiscussionSync;
@@ -161,6 +162,16 @@ pub fn write_prefetched_mr_discussions(
dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;
// Mark child note documents dirty (they inherit parent metadata)
tx.execute(
"INSERT INTO dirty_sources (source_type, source_id, queued_at)
SELECT 'note', n.id, ?1
FROM notes n
WHERE n.discussion_id = ?2 AND n.is_system = 0
ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0",
params![now_ms(), local_discussion_id],
)?;
for note in &disc.notes {
let should_store_payload = !note.is_system
|| note.position_new_path.is_some()
@@ -187,7 +198,11 @@ pub fn write_prefetched_mr_discussions(
None
};
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
let outcome =
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
if !note.is_system && outcome.changed_semantics {
dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?;
}
}
tx.commit()?;
@@ -361,6 +376,16 @@ async fn ingest_discussions_for_mr(
dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;
// Mark child note documents dirty (they inherit parent metadata)
tx.execute(
"INSERT INTO dirty_sources (source_type, source_id, queued_at)
SELECT 'note', n.id, ?1
FROM notes n
WHERE n.discussion_id = ?2 AND n.is_system = 0
ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0",
params![now_ms(), local_discussion_id],
)?;
for note in &notes {
let should_store_payload = !note.is_system
|| note.position_new_path.is_some()
@@ -390,7 +415,11 @@ async fn ingest_discussions_for_mr(
None
};
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
let outcome =
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
if !note.is_system && outcome.changed_semantics {
dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?;
}
}
tx.commit()?;
@@ -473,19 +502,87 @@ fn upsert_note(
note: &NormalizedNote,
last_seen_at: i64,
payload_id: Option<i64>,
) -> Result<()> {
) -> Result<NoteUpsertOutcome> {
// Pre-read for semantic change detection
let existing = conn
.query_row(
"SELECT id, body, note_type, resolved, resolved_by,
position_old_path, position_new_path, position_old_line, position_new_line,
position_type, position_line_range_start, position_line_range_end,
position_base_sha, position_start_sha, position_head_sha
FROM notes WHERE gitlab_id = ?",
params![note.gitlab_id],
|row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, bool>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<String>>(5)?,
row.get::<_, Option<String>>(6)?,
row.get::<_, Option<i32>>(7)?,
row.get::<_, Option<i32>>(8)?,
row.get::<_, Option<String>>(9)?,
row.get::<_, Option<i32>>(10)?,
row.get::<_, Option<i32>>(11)?,
row.get::<_, Option<String>>(12)?,
row.get::<_, Option<String>>(13)?,
row.get::<_, Option<String>>(14)?,
))
},
)
.ok();
let changed_semantics = match &existing {
Some((
_id,
body,
note_type,
resolved,
resolved_by,
pos_old_path,
pos_new_path,
pos_old_line,
pos_new_line,
pos_type,
pos_range_start,
pos_range_end,
pos_base_sha,
pos_start_sha,
pos_head_sha,
)) => {
*body != note.body
|| *note_type != note.note_type
|| *resolved != note.resolved
|| *resolved_by != note.resolved_by
|| *pos_old_path != note.position_old_path
|| *pos_new_path != note.position_new_path
|| *pos_old_line != note.position_old_line
|| *pos_new_line != note.position_new_line
|| *pos_type != note.position_type
|| *pos_range_start != note.position_line_range_start
|| *pos_range_end != note.position_line_range_end
|| *pos_base_sha != note.position_base_sha
|| *pos_start_sha != note.position_start_sha
|| *pos_head_sha != note.position_head_sha
}
None => true,
};
conn.execute(
"INSERT INTO notes (
gitlab_id, discussion_id, project_id, note_type, is_system,
author_username, body, created_at, updated_at, last_seen_at,
author_id, author_username, body, created_at, updated_at, last_seen_at,
position, resolvable, resolved, resolved_by, resolved_at,
position_old_path, position_new_path, position_old_line, position_new_line,
position_type, position_line_range_start, position_line_range_end,
position_base_sha, position_start_sha, position_head_sha,
raw_payload_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26)
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27)
ON CONFLICT(gitlab_id) DO UPDATE SET
note_type = excluded.note_type,
author_id = excluded.author_id,
body = excluded.body,
updated_at = excluded.updated_at,
last_seen_at = excluded.last_seen_at,
@@ -510,6 +607,7 @@ fn upsert_note(
note.project_id,
&note.note_type,
note.is_system,
note.author_id,
&note.author_username,
&note.body,
note.created_at,
@@ -533,7 +631,17 @@ fn upsert_note(
payload_id,
],
)?;
Ok(())
let local_note_id: i64 = conn.query_row(
"SELECT id FROM notes WHERE gitlab_id = ?",
params![note.gitlab_id],
|row| row.get(0),
)?;
Ok(NoteUpsertOutcome {
local_note_id,
changed_semantics,
})
}
fn sweep_stale_discussions(conn: &Connection, local_mr_id: i64, run_seen_at: i64) -> Result<usize> {
@@ -554,13 +662,36 @@ fn sweep_stale_notes(
local_mr_id: i64,
run_seen_at: i64,
) -> Result<usize> {
// Step 1: Delete note documents for stale notes
conn.execute(
"DELETE FROM documents WHERE source_type = 'note' AND source_id IN
(SELECT id FROM notes
WHERE project_id = ?1
AND discussion_id IN (SELECT id FROM discussions WHERE merge_request_id = ?2)
AND last_seen_at < ?3
AND is_system = 0)",
params![local_project_id, local_mr_id, run_seen_at],
)?;
// Step 2: Delete dirty_sources entries for stale notes
conn.execute(
"DELETE FROM dirty_sources WHERE source_type = 'note' AND source_id IN
(SELECT id FROM notes
WHERE project_id = ?1
AND discussion_id IN (SELECT id FROM discussions WHERE merge_request_id = ?2)
AND last_seen_at < ?3
AND is_system = 0)",
params![local_project_id, local_mr_id, run_seen_at],
)?;
// Step 3: Delete the stale notes themselves
let deleted = conn.execute(
"DELETE FROM notes
WHERE project_id = ?
WHERE project_id = ?1
AND discussion_id IN (
SELECT id FROM discussions WHERE merge_request_id = ?
SELECT id FROM discussions WHERE merge_request_id = ?2
)
AND last_seen_at < ?",
AND last_seen_at < ?3",
params![local_project_id, local_mr_id, run_seen_at],
)?;
if deleted > 0 {
@@ -604,6 +735,8 @@ fn clear_sync_health_error(conn: &Connection, local_mr_id: i64) -> Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::{create_connection, run_migrations};
use std::path::Path;
#[test]
fn result_default_has_zero_counts() {
@@ -621,4 +754,153 @@ mod tests {
let result = IngestMrDiscussionsResult::default();
assert!(!result.pagination_succeeded);
}
fn setup_mr() -> Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) \
VALUES (1, 'group/repo', 'https://gitlab.com/group/repo')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO merge_requests (gitlab_id, iid, project_id, title, state, \
author_username, source_branch, target_branch, created_at, updated_at, last_seen_at) \
VALUES (200, 1, 1, 'Test MR', 'opened', 'testuser', 'feat', 'main', 1000, 2000, 3000)",
[],
)
.unwrap();
conn.execute(
"INSERT INTO discussions (gitlab_discussion_id, project_id, merge_request_id, noteable_type, \
individual_note, last_seen_at, resolvable, resolved) \
VALUES ('mr-disc-1', 1, 1, 'MergeRequest', 0, 3000, 0, 0)",
[],
)
.unwrap();
conn
}
fn get_mr_discussion_id(conn: &Connection) -> i64 {
conn.query_row("SELECT id FROM discussions LIMIT 1", [], |row| row.get(0))
.unwrap()
}
#[allow(clippy::too_many_arguments)]
fn make_mr_note(
gitlab_id: i64,
project_id: i64,
body: &str,
note_type: Option<&str>,
created_at: i64,
updated_at: i64,
resolved: bool,
resolved_by: Option<&str>,
) -> NormalizedNote {
NormalizedNote {
gitlab_id,
project_id,
note_type: note_type.map(String::from),
is_system: false,
author_id: None,
author_username: "testuser".to_string(),
body: body.to_string(),
created_at,
updated_at,
last_seen_at: updated_at,
position: 0,
resolvable: false,
resolved,
resolved_by: resolved_by.map(String::from),
resolved_at: None,
position_old_path: None,
position_new_path: None,
position_old_line: None,
position_new_line: None,
position_type: None,
position_line_range_start: None,
position_line_range_end: None,
position_base_sha: None,
position_start_sha: None,
position_head_sha: None,
}
}
#[test]
fn test_mr_note_upsert_captures_author_id() {
let conn = setup_mr();
let disc_id = get_mr_discussion_id(&conn);
let mut note = make_mr_note(8001, 1, "MR note", None, 1000, 2000, false, None);
note.author_id = Some(12345);
upsert_note(&conn, disc_id, &note, 5000, None).unwrap();
let stored: Option<i64> = conn
.query_row(
"SELECT author_id FROM notes WHERE gitlab_id = ?",
[8001_i64],
|row| row.get(0),
)
.unwrap();
assert_eq!(stored, Some(12345));
}
fn insert_note_document(conn: &Connection, note_local_id: i64) {
conn.execute(
"INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash) \
VALUES ('note', ?1, 1, 'note content', 'hash123')",
[note_local_id],
)
.unwrap();
}
fn count_note_documents(conn: &Connection, note_local_id: i64) -> i64 {
conn.query_row(
"SELECT COUNT(*) FROM documents WHERE source_type = 'note' AND source_id = ?",
[note_local_id],
|row| row.get(0),
)
.unwrap()
}
#[test]
fn test_mr_note_sweep_deletes_note_documents_immediately() {
let conn = setup_mr();
let disc_id = get_mr_discussion_id(&conn);
let local_project_id = 1;
let local_mr_id = 1;
// Insert 3 notes
let note1 = make_mr_note(8101, 1, "Keep", None, 1000, 2000, false, None);
let note2 = make_mr_note(8102, 1, "Keep too", None, 1000, 2000, false, None);
let note3 = make_mr_note(8103, 1, "Stale", None, 1000, 2000, false, None);
let out1 = upsert_note(&conn, disc_id, &note1, 5000, None).unwrap();
let out2 = upsert_note(&conn, disc_id, &note2, 5000, None).unwrap();
let out3 = upsert_note(&conn, disc_id, &note3, 5000, None).unwrap();
// Add documents for all 3
insert_note_document(&conn, out1.local_note_id);
insert_note_document(&conn, out2.local_note_id);
insert_note_document(&conn, out3.local_note_id);
// Re-sync only notes 1 and 2
upsert_note(&conn, disc_id, &note1, 6000, None).unwrap();
upsert_note(&conn, disc_id, &note2, 6000, None).unwrap();
// Sweep stale notes
sweep_stale_notes(&conn, local_project_id, local_mr_id, 6000).unwrap();
// Stale note's document should be gone
assert_eq!(count_note_documents(&conn, out3.local_note_id), 0);
// Kept notes' documents should survive
assert_eq!(count_note_documents(&conn, out1.local_note_id), 1);
assert_eq!(count_note_documents(&conn, out2.local_note_id), 1);
}
}