Add conversation_mtime_ns field to session state that tracks the actual
modification time of conversation files. This enables more responsive
dashboard updates by detecting changes that occur between hook events
(e.g., during streaming tool execution).
Changes:
- state.py: Add _get_conversation_mtime() to stat conversation files
and include mtime_ns in session payloads when available
- conversation.py: Add stable message IDs (claude-{session}-{n} format)
for React key stability and message deduplication
- control.py: Fix FIFO eviction for dismissed_codex_ids - set.pop()
removes arbitrary element, now uses dict with insertion-order iteration
- context.py: Update dismissed_codex_ids type from set to dict
The mtime approach complements existing last_event_at tracking:
- last_event_at: Changes on hook events (session boundaries)
- conversation_mtime_ns: Changes on every file write (real-time)
Dashboard can now detect mid-session conversation updates without
waiting for the next hook event.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
279 lines · 13 KiB · Python
import json
|
|
import os
|
|
|
|
from amc_server.context import EVENTS_DIR
|
|
|
|
|
|
class ConversationMixin:
    """HTTP handler mixin that serves hook events and conversation history."""

    def _serve_events(self, session_id):
        """Respond with the recorded hook events for *session_id* as JSON.

        Malformed JSONL lines are skipped; a missing or unreadable events
        file yields an empty ``events`` list rather than an error response.
        """
        # basename() strips any directory components the caller may have
        # injected, preventing path traversal outside EVENTS_DIR.
        safe_id = os.path.basename(session_id)
        event_file = EVENTS_DIR / f"{safe_id}.jsonl"

        events = []
        if event_file.exists():
            try:
                raw_lines = event_file.read_text().splitlines()
            except OSError:
                # Best-effort: treat an unreadable file as empty.
                raw_lines = []
            for raw in raw_lines:
                if not raw.strip():
                    continue
                try:
                    events.append(json.loads(raw))
                except json.JSONDecodeError:
                    continue

        self._send_json(200, {"session_id": safe_id, "events": events})
|
|
|
|
def _serve_conversation(self, session_id, project_dir, agent="claude"):
    """Serve conversation history from Claude Code or Codex JSONL file."""
    safe_id = os.path.basename(session_id)

    # Dispatch to the agent-specific parser; Claude is the default agent.
    if agent == "codex":
        parsed = self._parse_codex_conversation(safe_id)
    else:
        parsed = self._parse_claude_conversation(safe_id, project_dir)

    self._send_json(200, {"session_id": safe_id, "messages": parsed})
|
|
|
|
def _parse_claude_conversation(self, session_id, project_dir):
    """Parse Claude Code JSONL conversation format.

    Returns a list of message dicts with stable ids of the form
    ``claude-{session_id[:8]}-{n}`` (for React key stability and
    deduplication). Malformed lines/entries are skipped; a missing or
    unreadable file yields an empty list.
    """
    messages = []
    msg_id = 0  # monotonically increasing suffix for stable message ids

    conv_file = self._get_claude_conversation_file(session_id, project_dir)

    if conv_file and conv_file.exists():
        try:
            lines = conv_file.read_text().splitlines()
        except OSError:
            # Best-effort: treat an unreadable file as empty.
            lines = []
        for line in lines:
            if not line.strip():
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(entry, dict):
                continue
            msg_type = entry.get("type")

            if msg_type == "user":
                message = entry.get("message", {})
                # BUG FIX: guard non-dict "message" like the assistant
                # branch already does. Previously a string here raised
                # AttributeError on .get(), which escaped both except
                # clauses and aborted the entire parse.
                if not isinstance(message, dict):
                    continue
                content = message.get("content", "")
                # Only include actual human messages (strings), not tool
                # results (arrays).
                if content and isinstance(content, str):
                    messages.append({
                        "id": f"claude-{session_id[:8]}-{msg_id}",
                        "role": "user",
                        "content": content,
                        "timestamp": entry.get("timestamp", ""),
                    })
                    msg_id += 1

            elif msg_type == "assistant":
                # Assistant messages have structured content parts.
                message = entry.get("message", {})
                if not isinstance(message, dict):
                    continue
                raw_content = message.get("content", [])
                if not isinstance(raw_content, list):
                    continue
                text_parts = []
                tool_calls = []
                thinking_parts = []
                for part in raw_content:
                    if isinstance(part, dict):
                        ptype = part.get("type")
                        if ptype == "text":
                            text_parts.append(part.get("text", ""))
                        elif ptype == "tool_use":
                            tool_calls.append({
                                "name": part.get("name", "unknown"),
                                "input": part.get("input", {}),
                            })
                        elif ptype == "thinking":
                            thinking_parts.append(part.get("thinking", ""))
                    elif isinstance(part, str):
                        # Rare shorthand: bare string counts as text.
                        text_parts.append(part)
                # Emit only when the entry produced something visible.
                if text_parts or tool_calls or thinking_parts:
                    msg = {
                        "id": f"claude-{session_id[:8]}-{msg_id}",
                        "role": "assistant",
                        "content": "\n".join(text_parts) if text_parts else "",
                        "timestamp": entry.get("timestamp", ""),
                    }
                    if tool_calls:
                        msg["tool_calls"] = tool_calls
                    if thinking_parts:
                        msg["thinking"] = "\n\n".join(thinking_parts)
                    messages.append(msg)
                    msg_id += 1

    return messages
|
|
|
|
def _parse_codex_conversation(self, session_id):
    """Parse Codex JSONL conversation format.

    Codex uses separate response_items for different content types:
    - message: user/assistant text messages
    - function_call: tool invocations (name, arguments, call_id)
    - reasoning: thinking summaries (encrypted content, visible summary)

    Returns a list of message dicts with stable ids of the form
    ``codex-{session_id[:8]}-{n}``. Malformed lines/entries are skipped;
    a missing or unreadable file yields an empty list.
    """
    messages = []
    pending_tool_calls = []  # tool calls waiting for an assistant message
    msg_id = 0  # monotonically increasing suffix for stable message ids

    conv_file = self._find_codex_transcript_file(session_id)

    if conv_file and conv_file.exists():
        try:
            lines = conv_file.read_text().splitlines()
        except OSError:
            # Best-effort: treat an unreadable file as empty.
            lines = []
        # Injected context to drop from user messages (AGENTS.md,
        # environment, permissions). Hoisted out of the parse loop.
        skip_prefixes = (
            "<INSTRUCTIONS>",
            "<environment_context>",
            "<permissions instructions>",
            "# AGENTS.md instructions",
        )
        for line in lines:
            if not line.strip():
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(entry, dict):
                continue
            if entry.get("type") != "response_item":
                continue
            payload = entry.get("payload", {})
            if not isinstance(payload, dict):
                continue

            payload_type = payload.get("type")
            timestamp = entry.get("timestamp", "")

            # function_call: buffer until an assistant message arrives.
            if payload_type == "function_call":
                pending_tool_calls.append({
                    "name": payload.get("name", "unknown"),
                    "input": self._parse_codex_arguments(payload.get("arguments", "{}")),
                })
                continue

            # reasoning: visible summary becomes a thinking-only message.
            if payload_type == "reasoning":
                summary_parts = payload.get("summary", [])
                thinking_text = []
                if summary_parts:
                    thinking_text = [
                        part.get("text", "")
                        for part in summary_parts
                        if isinstance(part, dict) and part.get("type") == "summary_text"
                    ]
                if thinking_text:
                    # Flush buffered tool calls first to keep ordering.
                    msg_id = self._flush_codex_tool_calls(
                        messages, pending_tool_calls, session_id, msg_id, timestamp)
                    pending_tool_calls = []
                    messages.append({
                        "id": f"codex-{session_id[:8]}-{msg_id}",
                        "role": "assistant",
                        "content": "",
                        "thinking": "\n".join(thinking_text),
                        "timestamp": timestamp,
                    })
                    msg_id += 1
                continue

            # message: user/assistant text.
            if payload_type == "message":
                role = payload.get("role", "")
                content_parts = payload.get("content", [])
                if not isinstance(content_parts, list):
                    continue
                # Skip developer role (system context/permissions).
                if role == "developer":
                    continue

                text_parts = []
                for part in content_parts:
                    if not isinstance(part, dict):
                        continue
                    text = part.get("text", "")
                    # Drop empty text and injected-context blocks.
                    if not text or text.startswith(skip_prefixes):
                        continue
                    text_parts.append(text)

                if role == "user" and text_parts:
                    # Flush buffered tool calls before the user message.
                    msg_id = self._flush_codex_tool_calls(
                        messages, pending_tool_calls, session_id, msg_id, timestamp)
                    pending_tool_calls = []
                    messages.append({
                        "id": f"codex-{session_id[:8]}-{msg_id}",
                        "role": "user",
                        "content": "\n".join(text_parts),
                        "timestamp": timestamp,
                    })
                    msg_id += 1
                elif role == "assistant":
                    msg = {
                        "id": f"codex-{session_id[:8]}-{msg_id}",
                        "role": "assistant",
                        "content": "\n".join(text_parts) if text_parts else "",
                        "timestamp": timestamp,
                    }
                    # Attach buffered tool calls to this assistant message.
                    if pending_tool_calls:
                        msg["tool_calls"] = pending_tool_calls
                        pending_tool_calls = []
                    if text_parts or msg.get("tool_calls"):
                        messages.append(msg)
                        msg_id += 1

        # Flush tool calls left over at end of file (no known timestamp).
        msg_id = self._flush_codex_tool_calls(
            messages, pending_tool_calls, session_id, msg_id, "")

    return messages

def _flush_codex_tool_calls(self, messages, pending_tool_calls, session_id, msg_id, timestamp):
    """Append *pending_tool_calls* to *messages* as a synthetic assistant message.

    Used when a user message, reasoning block, or end of file arrives
    before an assistant text message the calls could attach to. Returns
    the next msg_id (unchanged when nothing was pending). The caller is
    responsible for rebinding its pending list afterwards.
    """
    if pending_tool_calls:
        messages.append({
            "id": f"codex-{session_id[:8]}-{msg_id}",
            "role": "assistant",
            "content": "",
            "tool_calls": pending_tool_calls,
            "timestamp": timestamp,
        })
        msg_id += 1
    return msg_id
|
|
|
|
def _parse_codex_arguments(self, arguments_str):
|
|
"""Parse Codex function_call arguments (JSON string or dict)."""
|
|
if isinstance(arguments_str, dict):
|
|
return arguments_str
|
|
if isinstance(arguments_str, str):
|
|
try:
|
|
return json.loads(arguments_str)
|
|
except json.JSONDecodeError:
|
|
return {"raw": arguments_str}
|
|
return {}
|