Files
gitlore/src/timeline/collect.rs

497 lines
16 KiB
Rust

use rusqlite::Connection;
use std::collections::HashSet;
use super::types::{
EntityRef, ExpandedEntityRef, MatchedDiscussion, THREAD_MAX_NOTES, THREAD_NOTE_MAX_CHARS,
ThreadNote, TimelineEvent, TimelineEventType, truncate_to_chars,
};
use crate::core::error::{LoreError, Result};
/// Collect all events for seed and expanded entities, interleave chronologically.
///
/// Steps 4-5 of the timeline pipeline:
/// 1. For each entity, collect Created, StateChanged, Label, Milestone, Merged events
/// 2. Collect discussion threads from matched discussions
/// 3. Merge in evidence notes from the seed phase
/// 4. Sort chronologically with stable tiebreak
/// 5. Apply --since filter and --limit
pub fn collect_events(
conn: &Connection,
seed_entities: &[EntityRef],
expanded_entities: &[ExpandedEntityRef],
evidence_notes: &[TimelineEvent],
matched_discussions: &[MatchedDiscussion],
since_ms: Option<i64>,
limit: usize,
) -> Result<(Vec<TimelineEvent>, usize)> {
let mut all_events: Vec<TimelineEvent> = Vec::new();
// Collect events for seed entities
for entity in seed_entities {
collect_entity_events(conn, entity, true, &mut all_events)?;
}
// Collect events for expanded entities
for expanded in expanded_entities {
collect_entity_events(conn, &expanded.entity_ref, false, &mut all_events)?;
}
// Collect discussion threads
let entity_lookup = build_entity_lookup(seed_entities, expanded_entities);
collect_discussion_threads(conn, matched_discussions, &entity_lookup, &mut all_events)?;
// Add evidence notes from seed phase
all_events.extend(evidence_notes.iter().cloned());
// Sort chronologically (uses Ord impl from timeline.rs)
all_events.sort();
// Apply --since filter
if let Some(since) = since_ms {
all_events.retain(|e| e.timestamp >= since);
}
// Capture total before applying limit (for meta.total_events vs meta.showing)
let total_before_limit = all_events.len();
// Apply limit
all_events.truncate(limit);
Ok((all_events, total_before_limit))
}
/// Collect all events for a single entity.
fn collect_entity_events(
conn: &Connection,
entity: &EntityRef,
is_seed: bool,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
collect_creation_event(conn, entity, is_seed, events)?;
collect_state_events(conn, entity, is_seed, events)?;
collect_label_events(conn, entity, is_seed, events)?;
collect_milestone_events(conn, entity, is_seed, events)?;
collect_merged_event(conn, entity, is_seed, events)?;
Ok(())
}
/// Collect the Created event from the entity's own table.
fn collect_creation_event(
conn: &Connection,
entity: &EntityRef,
is_seed: bool,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
let table = match entity.entity_type.as_str() {
"issue" => "issues",
"merge_request" => "merge_requests",
_ => return Ok(()),
};
let sql =
format!("SELECT created_at, author_username, title, web_url FROM {table} WHERE id = ?1");
let result = conn.query_row(&sql, rusqlite::params![entity.entity_id], |row| {
Ok((
row.get::<_, Option<i64>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
))
});
if let Ok((Some(created_at), author, title, url)) = result {
let type_label = if entity.entity_type == "issue" {
"Issue"
} else {
"MR"
};
let title_str = title.as_deref().unwrap_or("(untitled)");
events.push(TimelineEvent {
timestamp: created_at,
entity_type: entity.entity_type.clone(),
entity_id: entity.entity_id,
entity_iid: entity.entity_iid,
project_path: entity.project_path.clone(),
event_type: TimelineEventType::Created,
summary: format!("{type_label} #{} created: {title_str}", entity.entity_iid),
actor: author,
url,
is_seed,
});
}
Ok(())
}
/// Collect state change events. State='merged' produces Merged, not StateChanged.
fn collect_state_events(
conn: &Connection,
entity: &EntityRef,
is_seed: bool,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
let (id_col, id_val) = entity_id_column(entity)?;
let sql = format!(
"SELECT state, actor_username, created_at FROM resource_state_events
WHERE {id_col} = ?1
ORDER BY created_at ASC"
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(rusqlite::params![id_val], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, i64>(2)?,
))
})?;
for row_result in rows {
let (state, actor, created_at) = row_result?;
// state='merged' is handled by collect_merged_event — skip here
if state == "merged" {
continue;
}
let summary = format!("State changed to {state}");
events.push(TimelineEvent {
timestamp: created_at,
entity_type: entity.entity_type.clone(),
entity_id: entity.entity_id,
entity_iid: entity.entity_iid,
project_path: entity.project_path.clone(),
event_type: TimelineEventType::StateChanged { state },
summary,
actor,
url: None,
is_seed,
});
}
Ok(())
}
/// Collect label add/remove events.
fn collect_label_events(
conn: &Connection,
entity: &EntityRef,
is_seed: bool,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
let (id_col, id_val) = entity_id_column(entity)?;
let sql = format!(
"SELECT action, label_name, actor_username, created_at FROM resource_label_events
WHERE {id_col} = ?1
ORDER BY created_at ASC"
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(rusqlite::params![id_val], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, i64>(3)?,
))
})?;
for row_result in rows {
let (action, label_name, actor, created_at) = row_result?;
let label = label_name.unwrap_or_else(|| "[deleted label]".to_owned());
let (event_type, summary) = match action.as_str() {
"add" => {
let summary = format!("Label added: {label}");
(TimelineEventType::LabelAdded { label }, summary)
}
"remove" => {
let summary = format!("Label removed: {label}");
(TimelineEventType::LabelRemoved { label }, summary)
}
_ => continue,
};
events.push(TimelineEvent {
timestamp: created_at,
entity_type: entity.entity_type.clone(),
entity_id: entity.entity_id,
entity_iid: entity.entity_iid,
project_path: entity.project_path.clone(),
event_type,
summary,
actor,
url: None,
is_seed,
});
}
Ok(())
}
/// Collect milestone add/remove events.
fn collect_milestone_events(
conn: &Connection,
entity: &EntityRef,
is_seed: bool,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
let (id_col, id_val) = entity_id_column(entity)?;
let sql = format!(
"SELECT action, milestone_title, actor_username, created_at FROM resource_milestone_events
WHERE {id_col} = ?1
ORDER BY created_at ASC"
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(rusqlite::params![id_val], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, i64>(3)?,
))
})?;
for row_result in rows {
let (action, milestone_title, actor, created_at) = row_result?;
let milestone = milestone_title.unwrap_or_else(|| "[deleted milestone]".to_owned());
let (event_type, summary) = match action.as_str() {
"add" => {
let summary = format!("Milestone set: {milestone}");
(TimelineEventType::MilestoneSet { milestone }, summary)
}
"remove" => {
let summary = format!("Milestone removed: {milestone}");
(TimelineEventType::MilestoneRemoved { milestone }, summary)
}
_ => continue,
};
events.push(TimelineEvent {
timestamp: created_at,
entity_type: entity.entity_type.clone(),
entity_id: entity.entity_id,
entity_iid: entity.entity_iid,
project_path: entity.project_path.clone(),
event_type,
summary,
actor,
url: None,
is_seed,
});
}
Ok(())
}
/// Collect Merged event for MRs. Prefers merged_at from the MR table.
/// Falls back to resource_state_events WHERE state='merged' if merged_at is NULL.
fn collect_merged_event(
conn: &Connection,
entity: &EntityRef,
is_seed: bool,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
if entity.entity_type != "merge_request" {
return Ok(());
}
// Try merged_at from merge_requests table first
let mr_result = conn.query_row(
"SELECT merged_at, merge_user_username, web_url FROM merge_requests WHERE id = ?1",
rusqlite::params![entity.entity_id],
|row| {
Ok((
row.get::<_, Option<i64>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
))
},
);
match mr_result {
Ok((Some(merged_at), merge_user, url)) => {
events.push(TimelineEvent {
timestamp: merged_at,
entity_type: entity.entity_type.clone(),
entity_id: entity.entity_id,
entity_iid: entity.entity_iid,
project_path: entity.project_path.clone(),
event_type: TimelineEventType::Merged,
summary: format!("MR !{} merged", entity.entity_iid),
actor: merge_user,
url,
is_seed,
});
return Ok(());
}
Ok((None, _, _)) => {} // merged_at is NULL, try fallback
Err(rusqlite::Error::QueryReturnedNoRows) => {} // entity not found, try fallback
Err(e) => return Err(e.into()),
}
// Fallback: check resource_state_events for state='merged'
let fallback_result = conn.query_row(
"SELECT actor_username, created_at FROM resource_state_events
WHERE merge_request_id = ?1 AND state = 'merged'
ORDER BY created_at DESC LIMIT 1",
rusqlite::params![entity.entity_id],
|row| Ok((row.get::<_, Option<String>>(0)?, row.get::<_, i64>(1)?)),
);
match fallback_result {
Ok((actor, created_at)) => {
events.push(TimelineEvent {
timestamp: created_at,
entity_type: entity.entity_type.clone(),
entity_id: entity.entity_id,
entity_iid: entity.entity_iid,
project_path: entity.project_path.clone(),
event_type: TimelineEventType::Merged,
summary: format!("MR !{} merged", entity.entity_iid),
actor,
url: None,
is_seed,
});
}
Err(rusqlite::Error::QueryReturnedNoRows) => {} // no merged state event, MR wasn't merged
Err(e) => return Err(e.into()),
}
Ok(())
}
/// Return the correct column name and value for querying resource event tables.
fn entity_id_column(entity: &EntityRef) -> Result<(&'static str, i64)> {
match entity.entity_type.as_str() {
"issue" => Ok(("issue_id", entity.entity_id)),
"merge_request" => Ok(("merge_request_id", entity.entity_id)),
_ => Err(LoreError::Other(format!(
"Unknown entity type for event collection: {}",
entity.entity_type
))),
}
}
/// Lookup key: (entity_type, entity_id) -> (iid, project_path)
type EntityLookup = std::collections::HashMap<(String, i64), (i64, String)>;
fn build_entity_lookup(seeds: &[EntityRef], expanded: &[ExpandedEntityRef]) -> EntityLookup {
let mut lookup = EntityLookup::new();
for e in seeds {
lookup.insert(
(e.entity_type.clone(), e.entity_id),
(e.entity_iid, e.project_path.clone()),
);
}
for exp in expanded {
let e = &exp.entity_ref;
lookup.insert(
(e.entity_type.clone(), e.entity_id),
(e.entity_iid, e.project_path.clone()),
);
}
lookup
}
/// Collect full discussion threads for matched discussions.
fn collect_discussion_threads(
conn: &Connection,
matched_discussions: &[MatchedDiscussion],
entity_lookup: &EntityLookup,
events: &mut Vec<TimelineEvent>,
) -> Result<()> {
// Deduplicate by discussion_id
let mut seen = HashSet::new();
let mut stmt = conn.prepare(
"SELECT id, author_username, body, created_at FROM notes
WHERE discussion_id = ?1 AND is_system = 0
ORDER BY created_at ASC",
)?;
for disc in matched_discussions {
if !seen.insert(disc.discussion_id) {
continue;
}
let (iid, project_path) =
match entity_lookup.get(&(disc.entity_type.clone(), disc.entity_id)) {
Some(val) => val.clone(),
None => continue, // entity not in seed or expanded set
};
let rows = stmt.query_map(rusqlite::params![disc.discussion_id], |row| {
Ok((
row.get::<_, i64>(0)?, // id
row.get::<_, Option<String>>(1)?, // author_username
row.get::<_, Option<String>>(2)?, // body
row.get::<_, i64>(3)?, // created_at
))
})?;
let mut notes = Vec::new();
for row_result in rows {
let (note_id, author, body, created_at) = row_result?;
let body = truncate_to_chars(body.as_deref().unwrap_or(""), THREAD_NOTE_MAX_CHARS);
notes.push(ThreadNote {
note_id,
author,
body,
created_at,
});
}
// Skip empty threads (all notes were system notes)
if notes.is_empty() {
continue;
}
let first_created_at = notes[0].created_at;
// Cap notes per thread
let total_notes = notes.len();
if total_notes > THREAD_MAX_NOTES {
notes.truncate(THREAD_MAX_NOTES);
notes.push(ThreadNote {
note_id: -1,
author: None,
body: format!("[{} more notes not shown]", total_notes - THREAD_MAX_NOTES),
created_at: notes.last().map_or(first_created_at, |n| n.created_at),
});
}
let note_count = notes.len();
let actor = notes.first().and_then(|n| n.author.clone());
events.push(TimelineEvent {
timestamp: first_created_at,
entity_type: disc.entity_type.clone(),
entity_id: disc.entity_id,
entity_iid: iid,
project_path,
event_type: TimelineEventType::DiscussionThread {
discussion_id: disc.discussion_id,
notes,
},
summary: format!("Discussion ({note_count} notes)"),
actor,
url: None,
is_seed: true,
});
}
Ok(())
}
#[cfg(test)]
#[path = "timeline_collect_tests.rs"]
mod tests;