feat: Implement Gate 3 timeline pipeline and Gate 4 migration scaffolding

Complete 5 beads for the Phase B temporal intelligence feature:

- bd-1oo: Register migration 015 (commit SHAs, closes watermark) and
  create migration 016 (mr_file_changes table with 4 indexes for
  Gate 4 file-history)

- bd-20e: Define TimelineEvent model with 9 event type variants,
  EntityRef, ExpandedEntityRef, UnresolvedRef, and TimelineResult
  types. Ord impl for chronological sorting with stable tiebreak.

- bd-32q: Implement timeline seed phase - FTS5 keyword search to
  entity IDs with discussion-to-parent resolution, entity dedup,
  and evidence note extraction with snippet truncation.

- bd-ypa: Implement timeline expand phase - BFS cross-reference
  expansion over entity_references with bidirectional traversal,
  depth limiting, mention filtering, provenance tracking, and
  unresolved reference collection.

- bd-3as: Implement timeline event collection - gathers Created,
  StateChanged, LabelAdded/Removed, MilestoneSet/Removed, Merged,
  and NoteEvidence events. Merged dedup (state=merged -> Merged
  variant only). NULL label/milestone fallbacks. Chronological
  interleaving with since filter and limit.

38 new tests, all 445 tests pass. All quality gates clean.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-05 16:54:28 -05:00
parent d1b2b5fa7d
commit 3767c33c28
9 changed files with 2143 additions and 6 deletions

573
src/core/timeline_seed.rs Normal file
View File

@@ -0,0 +1,573 @@
use std::collections::HashSet;
use rusqlite::Connection;
use crate::core::error::Result;
use crate::core::timeline::{EntityRef, TimelineEvent, TimelineEventType};
use crate::search::{FtsQueryMode, to_fts_query};
/// Result of the seed + hydrate phases.
pub struct SeedResult {
pub seed_entities: Vec<EntityRef>,
pub evidence_notes: Vec<TimelineEvent>,
}
/// Run the SEED + HYDRATE phases of the timeline pipeline.
///
/// 1. SEED: FTS5 keyword search over documents -> matched document IDs
/// 2. HYDRATE: Map document IDs -> source entities + top matched notes as evidence
///
/// Discussion documents are resolved to their parent entity (issue or MR).
/// Entities are deduplicated. Evidence notes are capped at `max_evidence`.
pub fn seed_timeline(
conn: &Connection,
query: &str,
project_id: Option<i64>,
since_ms: Option<i64>,
max_seeds: usize,
max_evidence: usize,
) -> Result<SeedResult> {
let fts_query = to_fts_query(query, FtsQueryMode::Safe);
if fts_query.is_empty() {
return Ok(SeedResult {
seed_entities: Vec::new(),
evidence_notes: Vec::new(),
});
}
let seed_entities = find_seed_entities(conn, &fts_query, project_id, since_ms, max_seeds)?;
let evidence_notes = find_evidence_notes(conn, &fts_query, project_id, since_ms, max_evidence)?;
Ok(SeedResult {
seed_entities,
evidence_notes,
})
}
/// Find seed entities via FTS5 search, resolving discussions to their parent entity.
fn find_seed_entities(
conn: &Connection,
fts_query: &str,
project_id: Option<i64>,
since_ms: Option<i64>,
max_seeds: usize,
) -> Result<Vec<EntityRef>> {
let sql = r"
SELECT d.source_type, d.source_id, d.project_id,
disc.issue_id, disc.merge_request_id
FROM documents_fts
JOIN documents d ON d.id = documents_fts.rowid
LEFT JOIN discussions disc ON disc.id = d.source_id AND d.source_type = 'discussion'
WHERE documents_fts MATCH ?1
AND (?2 IS NULL OR d.project_id = ?2)
AND (?3 IS NULL OR d.updated_at >= ?3)
ORDER BY rank
LIMIT ?4
";
let mut stmt = conn.prepare(sql)?;
let rows = stmt.query_map(
rusqlite::params![fts_query, project_id, since_ms, (max_seeds * 3) as i64],
|row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, Option<i64>>(3)?,
row.get::<_, Option<i64>>(4)?,
))
},
)?;
let mut seen = HashSet::new();
let mut entities = Vec::new();
for row_result in rows {
let (source_type, source_id, proj_id, disc_issue_id, disc_mr_id) = row_result?;
let (entity_type, entity_id) = match source_type.as_str() {
"issue" => ("issue".to_owned(), source_id),
"merge_request" => ("merge_request".to_owned(), source_id),
"discussion" => {
if let Some(issue_id) = disc_issue_id {
("issue".to_owned(), issue_id)
} else if let Some(mr_id) = disc_mr_id {
("merge_request".to_owned(), mr_id)
} else {
continue; // orphaned discussion
}
}
_ => continue,
};
let key = (entity_type.clone(), entity_id);
if !seen.insert(key) {
continue;
}
if let Some(entity_ref) = resolve_entity(conn, &entity_type, entity_id, proj_id)? {
entities.push(entity_ref);
}
if entities.len() >= max_seeds {
break;
}
}
Ok(entities)
}
/// Resolve an entity ID to a full EntityRef with iid and project_path.
fn resolve_entity(
conn: &Connection,
entity_type: &str,
entity_id: i64,
project_id: i64,
) -> Result<Option<EntityRef>> {
let (table, id_col) = match entity_type {
"issue" => ("issues", "id"),
"merge_request" => ("merge_requests", "id"),
_ => return Ok(None),
};
let sql = format!(
"SELECT e.iid, p.path_with_namespace
FROM {table} e
JOIN projects p ON p.id = e.project_id
WHERE e.{id_col} = ?1 AND e.project_id = ?2"
);
let result = conn.query_row(&sql, rusqlite::params![entity_id, project_id], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
});
match result {
Ok((iid, project_path)) => Ok(Some(EntityRef {
entity_type: entity_type.to_owned(),
entity_id,
entity_iid: iid,
project_path,
})),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Find evidence notes: FTS5-matched discussion notes that provide context.
fn find_evidence_notes(
conn: &Connection,
fts_query: &str,
project_id: Option<i64>,
since_ms: Option<i64>,
max_evidence: usize,
) -> Result<Vec<TimelineEvent>> {
let sql = r"
SELECT n.id AS note_id, n.body, n.created_at, n.author_username,
disc.id AS discussion_id,
CASE WHEN disc.issue_id IS NOT NULL THEN 'issue' ELSE 'merge_request' END AS parent_type,
COALESCE(disc.issue_id, disc.merge_request_id) AS parent_entity_id,
d.project_id
FROM documents_fts
JOIN documents d ON d.id = documents_fts.rowid
JOIN discussions disc ON disc.id = d.source_id AND d.source_type = 'discussion'
JOIN notes n ON n.discussion_id = disc.id AND n.is_system = 0
WHERE documents_fts MATCH ?1
AND (?2 IS NULL OR d.project_id = ?2)
AND (?3 IS NULL OR d.updated_at >= ?3)
ORDER BY rank
LIMIT ?4
";
let mut stmt = conn.prepare(sql)?;
let rows = stmt.query_map(
rusqlite::params![fts_query, project_id, since_ms, max_evidence as i64],
|row| {
Ok((
row.get::<_, i64>(0)?, // note_id
row.get::<_, Option<String>>(1)?, // body
row.get::<_, i64>(2)?, // created_at
row.get::<_, Option<String>>(3)?, // author
row.get::<_, i64>(4)?, // discussion_id
row.get::<_, String>(5)?, // parent_type
row.get::<_, i64>(6)?, // parent_entity_id
row.get::<_, i64>(7)?, // project_id
))
},
)?;
let mut events = Vec::new();
for row_result in rows {
let (
note_id,
body,
created_at,
author,
discussion_id,
parent_type,
parent_entity_id,
proj_id,
) = row_result?;
let snippet = truncate_to_chars(body.as_deref().unwrap_or(""), 200);
let entity_ref = resolve_entity(conn, &parent_type, parent_entity_id, proj_id)?;
let (iid, project_path) = match entity_ref {
Some(ref e) => (e.entity_iid, e.project_path.clone()),
None => continue,
};
events.push(TimelineEvent {
timestamp: created_at,
entity_type: parent_type,
entity_id: parent_entity_id,
entity_iid: iid,
project_path,
event_type: TimelineEventType::NoteEvidence {
note_id,
snippet,
discussion_id: Some(discussion_id),
},
summary: format!("Note by {}", author.as_deref().unwrap_or("unknown")),
actor: author,
url: None,
is_seed: true,
});
}
Ok(events)
}
/// Truncate a string to at most `max_chars` characters on a safe UTF-8 boundary.
fn truncate_to_chars(s: &str, max_chars: usize) -> String {
let char_count = s.chars().count();
if char_count <= max_chars {
return s.to_owned();
}
let byte_end = s
.char_indices()
.nth(max_chars)
.map(|(i, _)| i)
.unwrap_or(s.len());
s[..byte_end].to_owned()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::db::{create_connection, run_migrations};
use std::path::Path;
fn setup_test_db() -> Connection {
let conn = create_connection(Path::new(":memory:")).unwrap();
run_migrations(&conn).unwrap();
conn
}
fn insert_test_project(conn: &Connection) -> i64 {
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) VALUES (1, 'group/project', 'https://gitlab.com/group/project')",
[],
)
.unwrap();
conn.last_insert_rowid()
}
fn insert_test_issue(conn: &Connection, project_id: i64, iid: i64) -> i64 {
conn.execute(
"INSERT INTO issues (gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at) VALUES (?1, ?2, ?3, 'Test issue', 'opened', 'alice', 1000, 2000, 3000)",
rusqlite::params![iid * 100, project_id, iid],
)
.unwrap();
conn.last_insert_rowid()
}
fn insert_test_mr(conn: &Connection, project_id: i64, iid: i64) -> i64 {
conn.execute(
"INSERT INTO merge_requests (gitlab_id, project_id, iid, title, state, author_username, created_at, updated_at, last_seen_at) VALUES (?1, ?2, ?3, 'Test MR', 'opened', 'bob', 1000, 2000, 3000)",
rusqlite::params![iid * 100, project_id, iid],
)
.unwrap();
conn.last_insert_rowid()
}
fn insert_document(
conn: &Connection,
source_type: &str,
source_id: i64,
project_id: i64,
content: &str,
) -> i64 {
conn.execute(
"INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![source_type, source_id, project_id, content, format!("hash_{source_id}")],
)
.unwrap();
conn.last_insert_rowid()
}
fn insert_discussion(
conn: &Connection,
project_id: i64,
issue_id: Option<i64>,
mr_id: Option<i64>,
) -> i64 {
let noteable_type = if issue_id.is_some() {
"Issue"
} else {
"MergeRequest"
};
conn.execute(
"INSERT INTO discussions (gitlab_discussion_id, project_id, issue_id, merge_request_id, noteable_type, last_seen_at) VALUES (?1, ?2, ?3, ?4, ?5, 0)",
rusqlite::params![format!("disc_{}", rand::random::<u32>()), project_id, issue_id, mr_id, noteable_type],
)
.unwrap();
conn.last_insert_rowid()
}
fn insert_note(
conn: &Connection,
discussion_id: i64,
project_id: i64,
body: &str,
is_system: bool,
) -> i64 {
let gitlab_id: i64 = rand::random::<u32>().into();
conn.execute(
"INSERT INTO notes (gitlab_id, discussion_id, project_id, is_system, author_username, body, created_at, updated_at, last_seen_at) VALUES (?1, ?2, ?3, ?4, 'alice', ?5, 5000, 5000, 5000)",
rusqlite::params![gitlab_id, discussion_id, project_id, is_system as i32, body],
)
.unwrap();
conn.last_insert_rowid()
}
#[test]
fn test_seed_empty_query_returns_empty() {
let conn = setup_test_db();
let result = seed_timeline(&conn, "", None, None, 50, 10).unwrap();
assert!(result.seed_entities.is_empty());
assert!(result.evidence_notes.is_empty());
}
#[test]
fn test_seed_no_matches_returns_empty() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
insert_document(
&conn,
"issue",
issue_id,
project_id,
"unrelated content here",
);
let result = seed_timeline(&conn, "nonexistent_xyzzy_query", None, None, 50, 10).unwrap();
assert!(result.seed_entities.is_empty());
}
#[test]
fn test_seed_finds_issue() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 42);
insert_document(
&conn,
"issue",
issue_id,
project_id,
"authentication error in login flow",
);
let result = seed_timeline(&conn, "authentication", None, None, 50, 10).unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "issue");
assert_eq!(result.seed_entities[0].entity_iid, 42);
assert_eq!(result.seed_entities[0].project_path, "group/project");
}
#[test]
fn test_seed_finds_mr() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let mr_id = insert_test_mr(&conn, project_id, 99);
insert_document(
&conn,
"merge_request",
mr_id,
project_id,
"fix authentication bug",
);
let result = seed_timeline(&conn, "authentication", None, None, 50, 10).unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "merge_request");
assert_eq!(result.seed_entities[0].entity_iid, 99);
}
#[test]
fn test_seed_deduplicates_entities() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 10);
// Two documents referencing the same issue
insert_document(
&conn,
"issue",
issue_id,
project_id,
"authentication error first doc",
);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"authentication error second doc",
);
let result = seed_timeline(&conn, "authentication", None, None, 50, 10).unwrap();
// Should deduplicate: both map to the same issue
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_iid, 10);
}
#[test]
fn test_seed_resolves_discussion_to_parent() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 7);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment pipeline failed",
);
let result = seed_timeline(&conn, "deployment", None, None, 50, 10).unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].entity_type, "issue");
assert_eq!(result.seed_entities[0].entity_iid, 7);
}
#[test]
fn test_seed_evidence_capped() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
// Create 15 discussion documents with notes about "deployment"
for i in 0..15 {
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
&format!("deployment issue number {i}"),
);
insert_note(
&conn,
disc_id,
project_id,
&format!("deployment note {i}"),
false,
);
}
let result = seed_timeline(&conn, "deployment", None, None, 50, 5).unwrap();
assert!(result.evidence_notes.len() <= 5);
}
#[test]
fn test_seed_evidence_snippet_truncated() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
let issue_id = insert_test_issue(&conn, project_id, 1);
let disc_id = insert_discussion(&conn, project_id, Some(issue_id), None);
insert_document(
&conn,
"discussion",
disc_id,
project_id,
"deployment configuration",
);
let long_body = "x".repeat(500);
insert_note(&conn, disc_id, project_id, &long_body, false);
let result = seed_timeline(&conn, "deployment", None, None, 50, 10).unwrap();
assert!(!result.evidence_notes.is_empty());
if let TimelineEventType::NoteEvidence { snippet, .. } =
&result.evidence_notes[0].event_type
{
assert!(snippet.chars().count() <= 200);
} else {
panic!("Expected NoteEvidence");
}
}
#[test]
fn test_seed_respects_project_filter() {
let conn = setup_test_db();
let project_id = insert_test_project(&conn);
// Insert a second project
conn.execute(
"INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) VALUES (2, 'other/repo', 'https://gitlab.com/other/repo')",
[],
)
.unwrap();
let project2_id = conn.last_insert_rowid();
let issue1_id = insert_test_issue(&conn, project_id, 1);
insert_document(
&conn,
"issue",
issue1_id,
project_id,
"authentication error",
);
let issue2_id = insert_test_issue(&conn, project2_id, 2);
insert_document(
&conn,
"issue",
issue2_id,
project2_id,
"authentication error",
);
// Filter to project 1 only
let result =
seed_timeline(&conn, "authentication", Some(project_id), None, 50, 10).unwrap();
assert_eq!(result.seed_entities.len(), 1);
assert_eq!(result.seed_entities[0].project_path, "group/project");
}
#[test]
fn test_truncate_to_chars_short() {
assert_eq!(truncate_to_chars("hello", 200), "hello");
}
#[test]
fn test_truncate_to_chars_long() {
let long = "a".repeat(300);
let result = truncate_to_chars(&long, 200);
assert_eq!(result.chars().count(), 200);
}
#[test]
fn test_truncate_to_chars_multibyte() {
let s = "\u{1F600}".repeat(300); // emoji
let result = truncate_to_chars(&s, 200);
assert_eq!(result.chars().count(), 200);
// Verify valid UTF-8
assert!(std::str::from_utf8(result.as_bytes()).is_ok());
}
}