From e61afc9dc42cede53f8ddd3d4a2001709f65f5fb Mon Sep 17 00:00:00 2001 From: teernisse Date: Fri, 30 Jan 2026 23:03:14 -0500 Subject: [PATCH] Add server-side progress grouper and fix session cache race condition New progress-grouper service partitions ParsedMessage arrays into two outputs: a filtered messages list (orphaned progress stays inline) and a toolProgress map keyed by parentToolUseId. Only hook_progress events whose parentToolUseId matches an existing tool_call are extracted; all others remain in the main message stream. Each group is sorted by rawIndex for chronological display. Session route integration: - Pipe parseSession output through groupProgress before responding - Return toolProgress map alongside messages in session detail endpoint Cache improvements: - Deduplicate concurrent getCachedSessions() calls with a shared in-flight promise (cachePromise) to prevent thundering herd on multiple simultaneous requests - Track cache generation to avoid stale writes when a newer discovery supersedes an in-flight one - Clear cachePromise on refresh=1 to force a fresh discovery cycle Co-Authored-By: Claude Opus 4.5 --- src/server/routes/sessions.ts | 27 ++++++++++++++-- src/server/services/progress-grouper.ts | 42 +++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 src/server/services/progress-grouper.ts diff --git a/src/server/routes/sessions.ts b/src/server/routes/sessions.ts index 6485dfa..ca07e62 100644 --- a/src/server/routes/sessions.ts +++ b/src/server/routes/sessions.ts @@ -1,6 +1,7 @@ import { Router } from "express"; import { discoverSessions } from "../services/session-discovery.js"; import { parseSession } from "../services/session-parser.js"; +import { groupProgress } from "../services/progress-grouper.js"; import type { SessionEntry } from "../../shared/types.js"; export const sessionsRouter = Router(); @@ -8,13 +9,30 @@ export const sessionsRouter = Router(); // Simple cache to avoid re-discovering sessions on every detail request let cachedSessions: SessionEntry[] = []; let cacheTimestamp = 0; +let cachePromise: Promise | null = null; +let cacheGeneration = 0; const CACHE_TTL_MS = 30_000; async function getCachedSessions(): Promise { const now = Date.now(); if (now - cacheTimestamp > CACHE_TTL_MS) { - cachedSessions = await discoverSessions(); - cacheTimestamp = now; + // Deduplicate concurrent calls: reuse in-flight promise + if (!cachePromise) { + const gen = ++cacheGeneration; + cachePromise = discoverSessions().then((sessions) => { + // Only write cache if no newer generation has started + if (gen === cacheGeneration) { + cachedSessions = sessions; + cacheTimestamp = Date.now(); + } + cachePromise = null; + return sessions; + }).catch((err) => { + cachePromise = null; + throw err; + }); + } + return cachePromise; } return cachedSessions; } @@ -23,6 +41,7 @@ sessionsRouter.get("/", async (req, res) => { try { if (req.query.refresh === "1") { cacheTimestamp = 0; + cachePromise = null; // Discard any in-flight request so we force a fresh discovery } const sessions = await getCachedSessions(); res.json({ sessions }); @@ -40,11 +59,13 @@ sessionsRouter.get("/:id", async (req, res) => { res.status(404).json({ error: "Session not found" }); return; } - const messages = await parseSession(entry.path); + const allMessages = await parseSession(entry.path); + const { messages, toolProgress } = groupProgress(allMessages); res.json({ id: entry.id, project: entry.project, messages, + toolProgress, }); } catch (err) { console.error("Failed to load session:", err); diff --git a/src/server/services/progress-grouper.ts b/src/server/services/progress-grouper.ts new file mode 100644 index 0000000..3c9bb95 --- /dev/null +++ b/src/server/services/progress-grouper.ts @@ -0,0 +1,42 @@ +import type { ParsedMessage } from "../../shared/types.js"; + +export interface GroupedProgress { + messages: ParsedMessage[]; + toolProgress: Record; +} + +export function groupProgress(messages: ParsedMessage[]): GroupedProgress { + // Build set of all toolUseId values from tool_call messages + const toolUseIds = new Set(); + for (const msg of messages) { + if (msg.category === "tool_call" && msg.toolUseId) { + toolUseIds.add(msg.toolUseId); + } + } + + const filtered: ParsedMessage[] = []; + const toolProgress: Record = {}; + + for (const msg of messages) { + // Parented progress: hook_progress with a parentToolUseId matching a known tool_call + if ( + msg.category === "hook_progress" && + msg.parentToolUseId && + toolUseIds.has(msg.parentToolUseId) + ) { + if (!toolProgress[msg.parentToolUseId]) { + toolProgress[msg.parentToolUseId] = []; + } + toolProgress[msg.parentToolUseId].push(msg); + } else { + filtered.push(msg); + } + } + + // Sort each group by rawIndex + for (const key of Object.keys(toolProgress)) { + toolProgress[key].sort((a, b) => a.rawIndex - b.rawIndex); + } + + return { messages: filtered, toolProgress }; +}