Add server-side progress grouper and fix session cache race condition
New progress-grouper service partitions ParsedMessage arrays into two outputs: a filtered messages list (orphaned progress stays inline) and a toolProgress map keyed by parentToolUseId. Only hook_progress events whose parentToolUseId matches an existing tool_call are extracted; all others remain in the main message stream. Each group is sorted by rawIndex for chronological display. Session route integration: - Pipe parseSession output through groupProgress before responding - Return toolProgress map alongside messages in session detail endpoint Cache improvements: - Deduplicate concurrent getCachedSessions() calls with a shared in-flight promise (cachePromise) to prevent thundering herd on multiple simultaneous requests - Track cache generation to avoid stale writes when a newer discovery supersedes an in-flight one - Clear cachePromise on refresh=1 to force a fresh discovery cycle Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import { Router } from "express";
|
import { Router } from "express";
|
||||||
import { discoverSessions } from "../services/session-discovery.js";
|
import { discoverSessions } from "../services/session-discovery.js";
|
||||||
import { parseSession } from "../services/session-parser.js";
|
import { parseSession } from "../services/session-parser.js";
|
||||||
|
import { groupProgress } from "../services/progress-grouper.js";
|
||||||
import type { SessionEntry } from "../../shared/types.js";
|
import type { SessionEntry } from "../../shared/types.js";
|
||||||
|
|
||||||
export const sessionsRouter = Router();
|
export const sessionsRouter = Router();
|
||||||
@@ -8,13 +9,30 @@ export const sessionsRouter = Router();
|
|||||||
// Simple cache to avoid re-discovering sessions on every detail request
|
// Simple cache to avoid re-discovering sessions on every detail request
|
||||||
let cachedSessions: SessionEntry[] = [];
|
let cachedSessions: SessionEntry[] = [];
|
||||||
let cacheTimestamp = 0;
|
let cacheTimestamp = 0;
|
||||||
|
let cachePromise: Promise<SessionEntry[]> | null = null;
|
||||||
|
let cacheGeneration = 0;
|
||||||
const CACHE_TTL_MS = 30_000;
|
const CACHE_TTL_MS = 30_000;
|
||||||
|
|
||||||
async function getCachedSessions(): Promise<SessionEntry[]> {
|
async function getCachedSessions(): Promise<SessionEntry[]> {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
if (now - cacheTimestamp > CACHE_TTL_MS) {
|
if (now - cacheTimestamp > CACHE_TTL_MS) {
|
||||||
cachedSessions = await discoverSessions();
|
// Deduplicate concurrent calls: reuse in-flight promise
|
||||||
cacheTimestamp = now;
|
if (!cachePromise) {
|
||||||
|
const gen = ++cacheGeneration;
|
||||||
|
cachePromise = discoverSessions().then((sessions) => {
|
||||||
|
// Only write cache if no newer generation has started
|
||||||
|
if (gen === cacheGeneration) {
|
||||||
|
cachedSessions = sessions;
|
||||||
|
cacheTimestamp = Date.now();
|
||||||
|
}
|
||||||
|
cachePromise = null;
|
||||||
|
return sessions;
|
||||||
|
}).catch((err) => {
|
||||||
|
cachePromise = null;
|
||||||
|
throw err;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return cachePromise;
|
||||||
}
|
}
|
||||||
return cachedSessions;
|
return cachedSessions;
|
||||||
}
|
}
|
||||||
@@ -23,6 +41,7 @@ sessionsRouter.get("/", async (req, res) => {
|
|||||||
try {
|
try {
|
||||||
if (req.query.refresh === "1") {
|
if (req.query.refresh === "1") {
|
||||||
cacheTimestamp = 0;
|
cacheTimestamp = 0;
|
||||||
|
cachePromise = null; // Discard any in-flight request so we force a fresh discovery
|
||||||
}
|
}
|
||||||
const sessions = await getCachedSessions();
|
const sessions = await getCachedSessions();
|
||||||
res.json({ sessions });
|
res.json({ sessions });
|
||||||
@@ -40,11 +59,13 @@ sessionsRouter.get("/:id", async (req, res) => {
|
|||||||
res.status(404).json({ error: "Session not found" });
|
res.status(404).json({ error: "Session not found" });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const messages = await parseSession(entry.path);
|
const allMessages = await parseSession(entry.path);
|
||||||
|
const { messages, toolProgress } = groupProgress(allMessages);
|
||||||
res.json({
|
res.json({
|
||||||
id: entry.id,
|
id: entry.id,
|
||||||
project: entry.project,
|
project: entry.project,
|
||||||
messages,
|
messages,
|
||||||
|
toolProgress,
|
||||||
});
|
});
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("Failed to load session:", err);
|
console.error("Failed to load session:", err);
|
||||||
|
|||||||
42
src/server/services/progress-grouper.ts
Normal file
42
src/server/services/progress-grouper.ts
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import type { ParsedMessage } from "../../shared/types.js";
|
||||||
|
|
||||||
|
export interface GroupedProgress {
|
||||||
|
messages: ParsedMessage[];
|
||||||
|
toolProgress: Record<string, ParsedMessage[]>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function groupProgress(messages: ParsedMessage[]): GroupedProgress {
|
||||||
|
// Build set of all toolUseId values from tool_call messages
|
||||||
|
const toolUseIds = new Set<string>();
|
||||||
|
for (const msg of messages) {
|
||||||
|
if (msg.category === "tool_call" && msg.toolUseId) {
|
||||||
|
toolUseIds.add(msg.toolUseId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const filtered: ParsedMessage[] = [];
|
||||||
|
const toolProgress: Record<string, ParsedMessage[]> = {};
|
||||||
|
|
||||||
|
for (const msg of messages) {
|
||||||
|
// Parented progress: hook_progress with a parentToolUseId matching a known tool_call
|
||||||
|
if (
|
||||||
|
msg.category === "hook_progress" &&
|
||||||
|
msg.parentToolUseId &&
|
||||||
|
toolUseIds.has(msg.parentToolUseId)
|
||||||
|
) {
|
||||||
|
if (!toolProgress[msg.parentToolUseId]) {
|
||||||
|
toolProgress[msg.parentToolUseId] = [];
|
||||||
|
}
|
||||||
|
toolProgress[msg.parentToolUseId].push(msg);
|
||||||
|
} else {
|
||||||
|
filtered.push(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort each group by rawIndex
|
||||||
|
for (const key of Object.keys(toolProgress)) {
|
||||||
|
toolProgress[key].sort((a, b) => a.rawIndex - b.rawIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
return { messages: filtered, toolProgress };
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user