use rusqlite::Connection; use std::collections::HashSet; use crate::core::error::{LoreError, Result}; use crate::core::timeline::{ EntityRef, ExpandedEntityRef, MatchedDiscussion, THREAD_MAX_NOTES, THREAD_NOTE_MAX_CHARS, ThreadNote, TimelineEvent, TimelineEventType, truncate_to_chars, }; /// Collect all events for seed and expanded entities, interleave chronologically. /// /// Steps 4-5 of the timeline pipeline: /// 1. For each entity, collect Created, StateChanged, Label, Milestone, Merged events /// 2. Collect discussion threads from matched discussions /// 3. Merge in evidence notes from the seed phase /// 4. Sort chronologically with stable tiebreak /// 5. Apply --since filter and --limit pub fn collect_events( conn: &Connection, seed_entities: &[EntityRef], expanded_entities: &[ExpandedEntityRef], evidence_notes: &[TimelineEvent], matched_discussions: &[MatchedDiscussion], since_ms: Option, limit: usize, ) -> Result<(Vec, usize)> { let mut all_events: Vec = Vec::new(); // Collect events for seed entities for entity in seed_entities { collect_entity_events(conn, entity, true, &mut all_events)?; } // Collect events for expanded entities for expanded in expanded_entities { collect_entity_events(conn, &expanded.entity_ref, false, &mut all_events)?; } // Collect discussion threads let entity_lookup = build_entity_lookup(seed_entities, expanded_entities); collect_discussion_threads(conn, matched_discussions, &entity_lookup, &mut all_events)?; // Add evidence notes from seed phase all_events.extend(evidence_notes.iter().cloned()); // Sort chronologically (uses Ord impl from timeline.rs) all_events.sort(); // Apply --since filter if let Some(since) = since_ms { all_events.retain(|e| e.timestamp >= since); } // Capture total before applying limit (for meta.total_events vs meta.showing) let total_before_limit = all_events.len(); // Apply limit all_events.truncate(limit); Ok((all_events, total_before_limit)) } /// Collect all events for a single entity. fn collect_entity_events( conn: &Connection, entity: &EntityRef, is_seed: bool, events: &mut Vec, ) -> Result<()> { collect_creation_event(conn, entity, is_seed, events)?; collect_state_events(conn, entity, is_seed, events)?; collect_label_events(conn, entity, is_seed, events)?; collect_milestone_events(conn, entity, is_seed, events)?; collect_merged_event(conn, entity, is_seed, events)?; Ok(()) } /// Collect the Created event from the entity's own table. fn collect_creation_event( conn: &Connection, entity: &EntityRef, is_seed: bool, events: &mut Vec, ) -> Result<()> { let table = match entity.entity_type.as_str() { "issue" => "issues", "merge_request" => "merge_requests", _ => return Ok(()), }; let sql = format!("SELECT created_at, author_username, title, web_url FROM {table} WHERE id = ?1"); let result = conn.query_row(&sql, rusqlite::params![entity.entity_id], |row| { Ok(( row.get::<_, Option>(0)?, row.get::<_, Option>(1)?, row.get::<_, Option>(2)?, row.get::<_, Option>(3)?, )) }); if let Ok((Some(created_at), author, title, url)) = result { let type_label = if entity.entity_type == "issue" { "Issue" } else { "MR" }; let title_str = title.as_deref().unwrap_or("(untitled)"); events.push(TimelineEvent { timestamp: created_at, entity_type: entity.entity_type.clone(), entity_id: entity.entity_id, entity_iid: entity.entity_iid, project_path: entity.project_path.clone(), event_type: TimelineEventType::Created, summary: format!("{type_label} #{} created: {title_str}", entity.entity_iid), actor: author, url, is_seed, }); } Ok(()) } /// Collect state change events. State='merged' produces Merged, not StateChanged. fn collect_state_events( conn: &Connection, entity: &EntityRef, is_seed: bool, events: &mut Vec, ) -> Result<()> { let (id_col, id_val) = entity_id_column(entity)?; let sql = format!( "SELECT state, actor_username, created_at FROM resource_state_events WHERE {id_col} = ?1 ORDER BY created_at ASC" ); let mut stmt = conn.prepare(&sql)?; let rows = stmt.query_map(rusqlite::params![id_val], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, Option>(1)?, row.get::<_, i64>(2)?, )) })?; for row_result in rows { let (state, actor, created_at) = row_result?; // state='merged' is handled by collect_merged_event — skip here if state == "merged" { continue; } let summary = format!("State changed to {state}"); events.push(TimelineEvent { timestamp: created_at, entity_type: entity.entity_type.clone(), entity_id: entity.entity_id, entity_iid: entity.entity_iid, project_path: entity.project_path.clone(), event_type: TimelineEventType::StateChanged { state }, summary, actor, url: None, is_seed, }); } Ok(()) } /// Collect label add/remove events. fn collect_label_events( conn: &Connection, entity: &EntityRef, is_seed: bool, events: &mut Vec, ) -> Result<()> { let (id_col, id_val) = entity_id_column(entity)?; let sql = format!( "SELECT action, label_name, actor_username, created_at FROM resource_label_events WHERE {id_col} = ?1 ORDER BY created_at ASC" ); let mut stmt = conn.prepare(&sql)?; let rows = stmt.query_map(rusqlite::params![id_val], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, Option>(1)?, row.get::<_, Option>(2)?, row.get::<_, i64>(3)?, )) })?; for row_result in rows { let (action, label_name, actor, created_at) = row_result?; let label = label_name.unwrap_or_else(|| "[deleted label]".to_owned()); let (event_type, summary) = match action.as_str() { "add" => { let summary = format!("Label added: {label}"); (TimelineEventType::LabelAdded { label }, summary) } "remove" => { let summary = format!("Label removed: {label}"); (TimelineEventType::LabelRemoved { label }, summary) } _ => continue, }; events.push(TimelineEvent { timestamp: created_at, entity_type: entity.entity_type.clone(), entity_id: entity.entity_id, entity_iid: entity.entity_iid, project_path: entity.project_path.clone(), event_type, summary, actor, url: None, is_seed, }); } Ok(()) } /// Collect milestone add/remove events. fn collect_milestone_events( conn: &Connection, entity: &EntityRef, is_seed: bool, events: &mut Vec, ) -> Result<()> { let (id_col, id_val) = entity_id_column(entity)?; let sql = format!( "SELECT action, milestone_title, actor_username, created_at FROM resource_milestone_events WHERE {id_col} = ?1 ORDER BY created_at ASC" ); let mut stmt = conn.prepare(&sql)?; let rows = stmt.query_map(rusqlite::params![id_val], |row| { Ok(( row.get::<_, String>(0)?, row.get::<_, Option>(1)?, row.get::<_, Option>(2)?, row.get::<_, i64>(3)?, )) })?; for row_result in rows { let (action, milestone_title, actor, created_at) = row_result?; let milestone = milestone_title.unwrap_or_else(|| "[deleted milestone]".to_owned()); let (event_type, summary) = match action.as_str() { "add" => { let summary = format!("Milestone set: {milestone}"); (TimelineEventType::MilestoneSet { milestone }, summary) } "remove" => { let summary = format!("Milestone removed: {milestone}"); (TimelineEventType::MilestoneRemoved { milestone }, summary) } _ => continue, }; events.push(TimelineEvent { timestamp: created_at, entity_type: entity.entity_type.clone(), entity_id: entity.entity_id, entity_iid: entity.entity_iid, project_path: entity.project_path.clone(), event_type, summary, actor, url: None, is_seed, }); } Ok(()) } /// Collect Merged event for MRs. Prefers merged_at from the MR table. /// Falls back to resource_state_events WHERE state='merged' if merged_at is NULL. fn collect_merged_event( conn: &Connection, entity: &EntityRef, is_seed: bool, events: &mut Vec, ) -> Result<()> { if entity.entity_type != "merge_request" { return Ok(()); } // Try merged_at from merge_requests table first let mr_result = conn.query_row( "SELECT merged_at, merge_user_username, web_url FROM merge_requests WHERE id = ?1", rusqlite::params![entity.entity_id], |row| { Ok(( row.get::<_, Option>(0)?, row.get::<_, Option>(1)?, row.get::<_, Option>(2)?, )) }, ); match mr_result { Ok((Some(merged_at), merge_user, url)) => { events.push(TimelineEvent { timestamp: merged_at, entity_type: entity.entity_type.clone(), entity_id: entity.entity_id, entity_iid: entity.entity_iid, project_path: entity.project_path.clone(), event_type: TimelineEventType::Merged, summary: format!("MR !{} merged", entity.entity_iid), actor: merge_user, url, is_seed, }); return Ok(()); } Ok((None, _, _)) => {} // merged_at is NULL, try fallback Err(rusqlite::Error::QueryReturnedNoRows) => {} // entity not found, try fallback Err(e) => return Err(e.into()), } // Fallback: check resource_state_events for state='merged' let fallback_result = conn.query_row( "SELECT actor_username, created_at FROM resource_state_events WHERE merge_request_id = ?1 AND state = 'merged' ORDER BY created_at DESC LIMIT 1", rusqlite::params![entity.entity_id], |row| Ok((row.get::<_, Option>(0)?, row.get::<_, i64>(1)?)), ); match fallback_result { Ok((actor, created_at)) => { events.push(TimelineEvent { timestamp: created_at, entity_type: entity.entity_type.clone(), entity_id: entity.entity_id, entity_iid: entity.entity_iid, project_path: entity.project_path.clone(), event_type: TimelineEventType::Merged, summary: format!("MR !{} merged", entity.entity_iid), actor, url: None, is_seed, }); } Err(rusqlite::Error::QueryReturnedNoRows) => {} // no merged state event, MR wasn't merged Err(e) => return Err(e.into()), } Ok(()) } /// Return the correct column name and value for querying resource event tables. fn entity_id_column(entity: &EntityRef) -> Result<(&'static str, i64)> { match entity.entity_type.as_str() { "issue" => Ok(("issue_id", entity.entity_id)), "merge_request" => Ok(("merge_request_id", entity.entity_id)), _ => Err(LoreError::Other(format!( "Unknown entity type for event collection: {}", entity.entity_type ))), } } /// Lookup key: (entity_type, entity_id) -> (iid, project_path) type EntityLookup = std::collections::HashMap<(String, i64), (i64, String)>; fn build_entity_lookup(seeds: &[EntityRef], expanded: &[ExpandedEntityRef]) -> EntityLookup { let mut lookup = EntityLookup::new(); for e in seeds { lookup.insert( (e.entity_type.clone(), e.entity_id), (e.entity_iid, e.project_path.clone()), ); } for exp in expanded { let e = &exp.entity_ref; lookup.insert( (e.entity_type.clone(), e.entity_id), (e.entity_iid, e.project_path.clone()), ); } lookup } /// Collect full discussion threads for matched discussions. fn collect_discussion_threads( conn: &Connection, matched_discussions: &[MatchedDiscussion], entity_lookup: &EntityLookup, events: &mut Vec, ) -> Result<()> { // Deduplicate by discussion_id let mut seen = HashSet::new(); let mut stmt = conn.prepare( "SELECT id, author_username, body, created_at FROM notes WHERE discussion_id = ?1 AND is_system = 0 ORDER BY created_at ASC", )?; for disc in matched_discussions { if !seen.insert(disc.discussion_id) { continue; } let (iid, project_path) = match entity_lookup.get(&(disc.entity_type.clone(), disc.entity_id)) { Some(val) => val.clone(), None => continue, // entity not in seed or expanded set }; let rows = stmt.query_map(rusqlite::params![disc.discussion_id], |row| { Ok(( row.get::<_, i64>(0)?, // id row.get::<_, Option>(1)?, // author_username row.get::<_, Option>(2)?, // body row.get::<_, i64>(3)?, // created_at )) })?; let mut notes = Vec::new(); for row_result in rows { let (note_id, author, body, created_at) = row_result?; let body = truncate_to_chars(body.as_deref().unwrap_or(""), THREAD_NOTE_MAX_CHARS); notes.push(ThreadNote { note_id, author, body, created_at, }); } // Skip empty threads (all notes were system notes) if notes.is_empty() { continue; } let first_created_at = notes[0].created_at; // Cap notes per thread let total_notes = notes.len(); if total_notes > THREAD_MAX_NOTES { notes.truncate(THREAD_MAX_NOTES); notes.push(ThreadNote { note_id: -1, author: None, body: format!("[{} more notes not shown]", total_notes - THREAD_MAX_NOTES), created_at: notes.last().map_or(first_created_at, |n| n.created_at), }); } let note_count = notes.len(); let actor = notes.first().and_then(|n| n.author.clone()); events.push(TimelineEvent { timestamp: first_created_at, entity_type: disc.entity_type.clone(), entity_id: disc.entity_id, entity_iid: iid, project_path, event_type: TimelineEventType::DiscussionThread { discussion_id: disc.discussion_id, notes, }, summary: format!("Discussion ({note_count} notes)"), actor, url: None, is_seed: true, }); } Ok(()) } #[cfg(test)] #[path = "timeline_collect_tests.rs"] mod tests;