use std::collections::HashSet;

use rusqlite::Connection;
use tracing::debug;

use super::types::{
    EntityRef, MatchedDiscussion, TimelineEvent, TimelineEventType, resolve_entity_by_iid,
    resolve_entity_ref, truncate_to_chars,
};
use crate::core::error::Result;
use crate::embedding::ollama::OllamaClient;
use crate::search::{FtsQueryMode, SearchFilters, SearchMode, search_hybrid, to_fts_query};

/// Result of the seed + hydrate phases.
pub struct SeedResult {
    pub seed_entities: Vec<EntityRef>,
    pub evidence_notes: Vec<TimelineEvent>,
    /// Discussions matched during seeding, to be collected as full threads.
    pub matched_discussions: Vec<MatchedDiscussion>,
    /// The search mode actually used (hybrid with fallback info).
    pub search_mode: String,
}

/// Run the SEED + HYDRATE phases of the timeline pipeline.
///
/// 1. SEED: Hybrid search (FTS + vector via RRF) over documents -> matched document IDs
/// 2. HYDRATE: Map document IDs -> source entities + top matched notes as evidence
///
/// When `client` is `None` or Ollama is unavailable, falls back to FTS-only search.
/// Discussion documents are resolved to their parent entity (issue or MR).
/// Entities are deduplicated. Evidence notes are capped at `max_evidence`.
pub async fn seed_timeline(
    conn: &Connection,
    client: Option<&OllamaClient>,
    query: &str,
    project_id: Option<i64>,
    since_ms: Option<i64>,
    max_seeds: usize,
    max_evidence: usize,
) -> Result<SeedResult> {
    let fts_query = to_fts_query(query, FtsQueryMode::Safe);
    if fts_query.is_empty() {
        // Nothing searchable in the query: return an empty (but well-formed) result.
        return Ok(SeedResult {
            seed_entities: Vec::new(),
            evidence_notes: Vec::new(),
            matched_discussions: Vec::new(),
            search_mode: "lexical".to_owned(),
        });
    }

    // Use hybrid search for seed entity discovery (better recall than FTS alone).
    // search_hybrid gracefully falls back to FTS-only when Ollama is unavailable.
    let filters = SearchFilters {
        project_id,
        updated_since: since_ms,
        // Over-fetch: several matched documents may collapse onto one entity.
        limit: max_seeds.saturating_mul(3),
        ..SearchFilters::default()
    };
    let (hybrid_results, warnings) = search_hybrid(
        conn,
        client,
        query,
        SearchMode::Hybrid,
        &filters,
        FtsQueryMode::Safe,
    )
    .await?;

    // Classify the mode actually used from the warnings emitted by search_hybrid.
    let fell_back = warnings
        .iter()
        .any(|w| w.contains("falling back") || w.contains("FTS only"));
    let search_mode = if fell_back {
        "lexical (hybrid fallback)".to_owned()
    } else if client.is_some() && !hybrid_results.is_empty() {
        "hybrid".to_owned()
    } else {
        "lexical".to_owned()
    };
    for w in &warnings {
        debug!(warning = %w, "hybrid search warning during timeline seeding");
    }

    let matched_document_ids: Vec<i64> = hybrid_results.iter().map(|r| r.document_id).collect();
    let (seed_entities, matched_discussions) =
        resolve_documents_to_entities(conn, &matched_document_ids, max_seeds)?;

    // Evidence notes stay FTS-only (supplementary context, not worth a second embedding call)
    let evidence_notes = find_evidence_notes(conn, &fts_query, project_id, since_ms, max_evidence)?;

    Ok(SeedResult {
        seed_entities,
        evidence_notes,
        matched_discussions,
        search_mode,
    })
}

/// Seed the timeline directly from an entity IID, bypassing search entirely.
///
/// Used for `issue:42` / `mr:99` syntax. Resolves the entity, gathers ALL its
/// discussions, and returns a `SeedResult` compatible with the rest of the pipeline.
pub fn seed_timeline_direct( conn: &Connection, entity_type: &str, iid: i64, project_id: Option, ) -> Result { let entity_ref = resolve_entity_by_iid(conn, entity_type, iid, project_id)?; // Gather all discussions for this entity (not search-matched, ALL of them) let entity_id_col = match entity_type { "issue" => "issue_id", "merge_request" => "merge_request_id", _ => { return Ok(SeedResult { seed_entities: vec![entity_ref], evidence_notes: Vec::new(), matched_discussions: Vec::new(), search_mode: "direct".to_owned(), }); } }; let sql = format!("SELECT id, project_id FROM discussions WHERE {entity_id_col} = ?1"); let mut stmt = conn.prepare(&sql)?; let matched_discussions: Vec = stmt .query_map(rusqlite::params![entity_ref.entity_id], |row| { Ok(MatchedDiscussion { discussion_id: row.get(0)?, entity_type: entity_type.to_owned(), entity_id: entity_ref.entity_id, project_id: row.get(1)?, }) })? .collect::, _>>()?; Ok(SeedResult { seed_entities: vec![entity_ref], evidence_notes: Vec::new(), matched_discussions, search_mode: "direct".to_owned(), }) } /// Resolve a list of document IDs to deduplicated entity refs and matched discussions. /// Discussion and note documents are resolved to their parent entity (issue or MR). /// Returns (entities, matched_discussions). 
fn resolve_documents_to_entities( conn: &Connection, document_ids: &[i64], max_entities: usize, ) -> Result<(Vec, Vec)> { if document_ids.is_empty() { return Ok((Vec::new(), Vec::new())); } let placeholders: String = document_ids .iter() .map(|_| "?") .collect::>() .join(","); let sql = format!( r" SELECT d.source_type, d.source_id, d.project_id, COALESCE(disc.issue_id, note_disc.issue_id) AS issue_id, COALESCE(disc.merge_request_id, note_disc.merge_request_id) AS mr_id, COALESCE(disc.id, note_disc.id) AS discussion_id FROM documents d LEFT JOIN discussions disc ON disc.id = d.source_id AND d.source_type = 'discussion' LEFT JOIN notes n ON n.id = d.source_id AND d.source_type = 'note' LEFT JOIN discussions note_disc ON note_disc.id = n.discussion_id AND d.source_type = 'note' WHERE d.id IN ({placeholders}) ORDER BY CASE d.id {order_clause} END ", order_clause = document_ids .iter() .enumerate() .map(|(i, id)| format!("WHEN {id} THEN {i}")) .collect::>() .join(" "), ); let mut stmt = conn.prepare(&sql)?; let params: Vec<&dyn rusqlite::types::ToSql> = document_ids .iter() .map(|id| id as &dyn rusqlite::types::ToSql) .collect(); let rows = stmt.query_map(params.as_slice(), |row| { Ok(( row.get::<_, String>(0)?, // source_type row.get::<_, i64>(1)?, // source_id row.get::<_, i64>(2)?, // project_id row.get::<_, Option>(3)?, // issue_id (coalesced) row.get::<_, Option>(4)?, // mr_id (coalesced) row.get::<_, Option>(5)?, // discussion_id (coalesced) )) })?; let mut seen_entities = HashSet::new(); let mut seen_discussions = HashSet::new(); let mut entities = Vec::new(); let mut matched_discussions = Vec::new(); for row_result in rows { let (source_type, source_id, proj_id, disc_issue_id, disc_mr_id, discussion_id) = row_result?; let (entity_type, entity_id) = match source_type.as_str() { "issue" => ("issue".to_owned(), source_id), "merge_request" => ("merge_request".to_owned(), source_id), "discussion" | "note" => { if let Some(issue_id) = disc_issue_id { 
("issue".to_owned(), issue_id) } else if let Some(mr_id) = disc_mr_id { ("merge_request".to_owned(), mr_id) } else { continue; // orphaned discussion/note } } _ => continue, }; // Capture matched discussion (deduplicated) if let Some(disc_id) = discussion_id && (source_type == "discussion" || source_type == "note") && seen_discussions.insert(disc_id) { matched_discussions.push(MatchedDiscussion { discussion_id: disc_id, entity_type: entity_type.clone(), entity_id, project_id: proj_id, }); } // Entity dedup let key = (entity_type.clone(), entity_id); if !seen_entities.insert(key) { continue; } if let Some(entity_ref) = resolve_entity_ref(conn, &entity_type, entity_id, Some(proj_id))? { entities.push(entity_ref); } if entities.len() >= max_entities { break; } } Ok((entities, matched_discussions)) } /// Find evidence notes: FTS5-matched discussion notes that provide context. /// /// Uses round-robin selection across discussions to ensure diverse evidence /// rather than all notes coming from a single high-traffic discussion. fn find_evidence_notes( conn: &Connection, fts_query: &str, project_id: Option, since_ms: Option, max_evidence: usize, ) -> Result> { // Fetch extra rows to enable round-robin across discussions. // We'll select from multiple discussions in rotation. 
let fetch_limit = (max_evidence * 5).max(50); let sql = r" SELECT n.id AS note_id, n.body, n.created_at, n.author_username, disc.id AS discussion_id, CASE WHEN disc.issue_id IS NOT NULL THEN 'issue' ELSE 'merge_request' END AS parent_type, COALESCE(disc.issue_id, disc.merge_request_id) AS parent_entity_id, d.project_id FROM documents_fts JOIN documents d ON d.id = documents_fts.rowid JOIN discussions disc ON disc.id = d.source_id AND d.source_type = 'discussion' JOIN notes n ON n.discussion_id = disc.id AND n.is_system = 0 WHERE documents_fts MATCH ?1 AND (?2 IS NULL OR d.project_id = ?2) AND (?3 IS NULL OR d.updated_at >= ?3) ORDER BY rank LIMIT ?4 "; let mut stmt = conn.prepare(sql)?; let rows = stmt.query_map( rusqlite::params![fts_query, project_id, since_ms, fetch_limit as i64], |row| { Ok(( row.get::<_, i64>(0)?, // note_id row.get::<_, Option>(1)?, // body row.get::<_, i64>(2)?, // created_at row.get::<_, Option>(3)?, // author row.get::<_, i64>(4)?, // discussion_id row.get::<_, String>(5)?, // parent_type row.get::<_, i64>(6)?, // parent_entity_id row.get::<_, i64>(7)?, // project_id )) }, )?; let mut events = Vec::new(); for row_result in rows { let ( note_id, body, created_at, author, discussion_id, parent_type, parent_entity_id, proj_id, ) = row_result?; let snippet = truncate_to_chars(body.as_deref().unwrap_or(""), 200); let entity_ref = resolve_entity_ref(conn, &parent_type, parent_entity_id, Some(proj_id))?; let (iid, project_path) = match entity_ref { Some(ref e) => (e.entity_iid, e.project_path.clone()), None => { debug!( parent_type, parent_entity_id, proj_id, "Skipping evidence note: parent entity not found (orphaned discussion)" ); continue; } }; events.push(( discussion_id, TimelineEvent { timestamp: created_at, entity_type: parent_type, entity_id: parent_entity_id, entity_iid: iid, project_path, event_type: TimelineEventType::NoteEvidence { note_id, snippet, discussion_id: Some(discussion_id), }, summary: format!("Note by {}", 
author.as_deref().unwrap_or("unknown")), actor: author, url: None, is_seed: true, }, )); } // Round-robin selection across discussions for diverse evidence Ok(round_robin_select_by_discussion(events, max_evidence)) } /// Round-robin select events across discussions to ensure diverse evidence. /// /// Groups events by discussion_id, then iterates through discussions in order, /// taking one event from each until the limit is reached. fn round_robin_select_by_discussion( events: Vec<(i64, TimelineEvent)>, max_evidence: usize, ) -> Vec { use std::collections::HashMap; if events.is_empty() || max_evidence == 0 { return Vec::new(); } // Group events by discussion_id, preserving order within each group let mut by_discussion: HashMap> = HashMap::new(); let mut discussion_order: Vec = Vec::new(); for (discussion_id, event) in events { if !by_discussion.contains_key(&discussion_id) { discussion_order.push(discussion_id); } by_discussion.entry(discussion_id).or_default().push(event); } // Round-robin selection let mut result = Vec::with_capacity(max_evidence); let mut indices: Vec = vec![0; discussion_order.len()]; 'outer: loop { let mut made_progress = false; for (disc_idx, &discussion_id) in discussion_order.iter().enumerate() { let notes = by_discussion.get(&discussion_id).unwrap(); let note_idx = indices[disc_idx]; if note_idx < notes.len() { result.push(notes[note_idx].clone()); indices[disc_idx] += 1; made_progress = true; if result.len() >= max_evidence { break 'outer; } } } if !made_progress { break; } } result } #[cfg(test)] #[path = "timeline_seed_tests.rs"] mod tests;