fix(explain): address review findings — N+1 queries, duplicate decisions, silent errors

1. fetch_open_threads: replace N+1 loop (2 queries per thread) with a
   single query using correlated subqueries for note_count and started_by.
2. extract_key_decisions: track consumed notes so the same note is not
   matched to multiple events, preventing duplicate decision entries.
3. build_timeline_excerpt_from_pipeline: log tracing::warn on seed/collect
   failures instead of silently returning empty timeline.
This commit is contained in:
teernisse
2026-03-10 16:43:06 -04:00
parent 08bda08934
commit 06889ec85a
10 changed files with 92 additions and 248 deletions

View File

@@ -75,7 +75,8 @@ pub struct ActivitySummary {
#[derive(Debug, Serialize)]
pub struct OpenThread {
pub discussion_id: String,
pub started_by: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub started_by: Option<String>,
pub started_at: String,
pub note_count: usize,
pub last_note_at: String,
@@ -626,18 +627,22 @@ pub fn extract_key_decisions(
let notes = query_non_system_notes(conn, entity_type, entity_id, since)?;
let mut decisions = Vec::new();
let mut used_notes: Vec<bool> = vec![false; notes.len()];
for event in &events {
if decisions.len() >= max_decisions {
break;
}
// Find the FIRST non-system note by the SAME actor within 60 minutes AFTER the event
let matching_note = notes.iter().find(|n| {
n.author == event.actor
// Find the FIRST unconsumed non-system note by the SAME actor within 60 minutes
// AFTER the event. Each note is used at most once to avoid duplicate decisions.
let matching = notes.iter().enumerate().find(|(i, n)| {
!used_notes[*i]
&& n.author == event.actor
&& n.created_at >= event.created_at
&& n.created_at <= event.created_at + DECISION_WINDOW_MS
});
if let Some(note) = matching_note {
if let Some((idx, note)) = matching {
used_notes[idx] = true;
decisions.push(KeyDecision {
timestamp: ms_to_iso(event.created_at),
actor: event.actor.clone(),
@@ -728,8 +733,14 @@ fn fetch_open_threads(
) -> Result<Vec<OpenThread>> {
let id_col = id_column_for(entity_type);
// Single query with scalar subqueries — avoids N+1.
let sql = format!(
"SELECT d.id, d.gitlab_discussion_id, d.first_note_at, d.last_note_at \
"SELECT d.gitlab_discussion_id, d.first_note_at, d.last_note_at, \
(SELECT COUNT(*) FROM notes n2 \
WHERE n2.discussion_id = d.id AND n2.is_system = 0) AS note_count, \
(SELECT n3.author_username FROM notes n3 \
WHERE n3.discussion_id = d.id \
ORDER BY n3.created_at ASC LIMIT 1) AS started_by \
FROM discussions d \
WHERE d.{id_col} = ?1 \
AND d.resolvable = 1 \
@@ -738,42 +749,19 @@ fn fetch_open_threads(
);
let mut stmt = conn.prepare(&sql)?;
let rows: Vec<(i64, String, i64, i64)> = stmt
let threads = stmt
.query_map([entity_id], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
let count: i64 = row.get(3)?;
Ok(OpenThread {
discussion_id: row.get(0)?,
started_at: ms_to_iso(row.get::<_, i64>(1)?),
last_note_at: ms_to_iso(row.get::<_, i64>(2)?),
note_count: count as usize,
started_by: row.get(4)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut threads = Vec::with_capacity(rows.len());
for (local_id, gitlab_discussion_id, first_note_at, last_note_at) in rows {
let started_by: String = conn
.query_row(
"SELECT author_username FROM notes \
WHERE discussion_id = ?1 \
ORDER BY created_at ASC LIMIT 1",
[local_id],
|row| row.get(0),
)
.unwrap_or_else(|_| "unknown".to_owned());
let note_count_i64: i64 = conn.query_row(
"SELECT COUNT(*) FROM notes \
WHERE discussion_id = ?1 AND is_system = 0",
[local_id],
|row| row.get(0),
)?;
let note_count = note_count_i64 as usize;
threads.push(OpenThread {
discussion_id: gitlab_discussion_id,
started_by,
started_at: ms_to_iso(first_note_at),
note_count,
last_note_at: ms_to_iso(last_note_at),
});
}
Ok(threads)
}
@@ -908,7 +896,10 @@ fn build_timeline_excerpt_from_pipeline(
let seed_result = match seed_timeline_direct(conn, timeline_entity_type, params.iid, project_id)
{
Ok(result) => result,
Err(_) => return Some(vec![]),
Err(e) => {
tracing::warn!("explain: timeline seed failed: {e}");
return Some(vec![]);
}
};
let (mut events, _total) = match collect_events(
@@ -921,7 +912,10 @@ fn build_timeline_excerpt_from_pipeline(
MAX_TIMELINE_EVENTS,
) {
Ok(result) => result,
Err(_) => return Some(vec![]),
Err(e) => {
tracing::warn!("explain: timeline collect failed: {e}");
return Some(vec![]);
}
};
events.truncate(MAX_TIMELINE_EVENTS);
@@ -1135,7 +1129,10 @@ pub fn print_explain(result: &ExplainResult) {
for t in threads {
println!(
" {} by {} ({} notes, last: {})",
t.discussion_id, t.started_by, t.note_count, t.last_note_at
t.discussion_id,
t.started_by.as_deref().unwrap_or("unknown"),
t.note_count,
t.last_note_at
);
}
}
@@ -1809,7 +1806,7 @@ mod tests {
assert_eq!(threads.len(), 1, "Only unresolved thread should appear");
assert_eq!(threads[0].discussion_id, "disc-unresolved");
assert_eq!(threads[0].started_by, "alice");
assert_eq!(threads[0].started_by.as_deref(), Some("alice"));
assert_eq!(threads[0].note_count, 2);
}