Closes bd-3ny. Added mousedown listener that dismisses the dropdown when clicking outside both the dropdown and textarea. Uses early return to avoid registering listeners when dropdown is already closed.
384 lines
15 KiB
Python
384 lines
15 KiB
Python
import hashlib
|
|
import json
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from amc_server.context import (
|
|
EVENTS_DIR,
|
|
SESSIONS_DIR,
|
|
STALE_EVENT_AGE,
|
|
STALE_STARTING_AGE,
|
|
ZELLIJ_BIN,
|
|
_state_lock,
|
|
_zellij_cache,
|
|
)
|
|
from amc_server.logging_utils import LOGGER
|
|
|
|
|
|
class StateMixin:
    """Request-handler mixin that serves session state as JSON and SSE.

    Expects the host handler class to provide the usual
    ``http.server.BaseHTTPRequestHandler`` surface (``wfile``,
    ``send_response``, ``send_header``, ``end_headers``) plus project
    helpers: ``_send_json``, ``_discover_active_codex_sessions``,
    ``_get_context_usage_for_session``, ``_get_claude_conversation_file``,
    and ``_find_codex_transcript_file``.
    """

    def _serve_state(self):
        """Serve a single `/api/state` JSON snapshot."""
        payload = self._build_state_payload()
        self._send_json(200, payload)

    def _serve_stream(self):
        """SSE stream of full state snapshots, emitted on change."""
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()

        # Ask clients to reconnect quickly on transient errors.
        try:
            self.wfile.write(b"retry: 2000\n\n")
            self.wfile.flush()
        except OSError:
            # BrokenPipeError/ConnectionResetError are OSError subclasses;
            # one catch covers all disconnect flavors.
            return

        last_hash = None
        event_id = 0
        last_heartbeat_at = time.time()
        heartbeat_interval = 15  # seconds of idle before a ": ping" comment
        poll_interval = 1  # seconds between state polls

        try:
            while True:
                payload = self._build_state_payload()
                payload_json = json.dumps(payload, separators=(",", ":"))
                # Hash the serialized payload so we only emit on real change.
                payload_hash = hashlib.sha1(payload_json.encode("utf-8")).hexdigest()

                if payload_hash != last_hash:
                    event_id += 1
                    self._write_sse_event("state", payload_json, event_id)
                    last_hash = payload_hash
                    # A data event is activity in itself; defer the next
                    # heartbeat so we don't ping right after an event.
                    last_heartbeat_at = time.time()

                now = time.time()
                if now - last_heartbeat_at >= heartbeat_interval:
                    # SSE comment line keeps proxies/clients from timing out.
                    self.wfile.write(b": ping\n\n")
                    self.wfile.flush()
                    last_heartbeat_at = now

                time.sleep(poll_interval)
        except OSError:
            # Client disconnected (broken pipe / connection reset).
            return
        except Exception:
            LOGGER.exception("Unhandled SSE stream error")
            return

    def _write_sse_event(self, event_name, data, event_id):
        """Write one SSE event frame.

        Args:
            event_name: SSE ``event:`` field value.
            data: Payload text; each line becomes a ``data:`` line.
            event_id: Monotonic ``id:`` field value.
        """
        # JSON payload is compact single-line; still split defensively for SSE format.
        frame = [f"id: {event_id}", f"event: {event_name}"]
        for line in str(data).splitlines():
            frame.append(f"data: {line}")
        # Trailing blank line terminates the frame ("\n\n" after join).
        frame.append("")
        frame.append("")
        self.wfile.write("\n".join(frame).encode("utf-8"))
        self.wfile.flush()

    def _build_state_payload(self):
        """Build `/api/state` payload data used by JSON and SSE endpoints."""
        sessions = self._collect_sessions()
        return {
            "sessions": sessions,
            "server_time": datetime.now(timezone.utc).isoformat(),
        }

    def _collect_sessions(self):
        """Collect and normalize all session records from disk.

        Returns:
            list[dict]: Session dicts sorted by ``session_id``, annotated with
            ``context_usage``, ``conversation_mtime_ns`` and ``is_dead`` where
            applicable. Orphan "starting" sessions are deleted as a side effect.
        """
        with _state_lock:
            sessions = []
            SESSIONS_DIR.mkdir(parents=True, exist_ok=True)

            # Discover active Codex sessions and create session files for them.
            self._discover_active_codex_sessions()

            # Active Zellij sessions for liveness check (None => query failed).
            active_zellij_sessions = self._get_active_zellij_sessions()

            # Transcript files with active processes (for dead detection).
            active_transcript_files = self._get_active_transcript_files()

            for f in SESSIONS_DIR.glob("*.json"):
                try:
                    data = json.loads(f.read_text())
                    if not isinstance(data, dict):
                        continue

                    # Proactive liveness check: only auto-delete orphan
                    # "starting" sessions. Other statuses can still be useful
                    # as historical/debug context. A missing Zellij session
                    # while "starting" indicates an orphan.
                    zellij_session = data.get("zellij_session", "")
                    if (
                        zellij_session
                        and active_zellij_sessions is not None
                        and zellij_session not in active_zellij_sessions
                        and data.get("status") == "starting"
                    ):
                        f.unlink(missing_ok=True)
                        continue

                    context_usage = self._get_context_usage_for_session(data)
                    if context_usage:
                        data["context_usage"] = context_usage

                    # Capture turn token baseline on UserPromptSubmit (for
                    # per-turn token display). Only write once when the turn
                    # starts and we have token data.
                    if (
                        data.get("last_event") == "UserPromptSubmit"
                        and "turn_start_tokens" not in data
                        and context_usage
                        and context_usage.get("current_tokens") is not None
                    ):
                        data["turn_start_tokens"] = context_usage["current_tokens"]
                        # Persist to session file so it survives server restarts.
                        try:
                            f.write_text(json.dumps(data, indent=2))
                        except OSError:
                            pass

                    # Track conversation file mtime for real-time update detection.
                    conv_mtime = self._get_conversation_mtime(data)
                    if conv_mtime:
                        data["conversation_mtime_ns"] = conv_mtime

                    # Determine if session is "dead" (no longer interactable).
                    data["is_dead"] = self._is_session_dead(
                        data, active_zellij_sessions, active_transcript_files
                    )

                    sessions.append(data)
                except (json.JSONDecodeError, OSError):
                    # Unreadable/malformed session file: skip silently.
                    continue
                except Exception:
                    LOGGER.exception("Failed processing session file %s", f)
                    continue

            # Sort by session_id for stable, deterministic ordering (no visual jumping).
            sessions.sort(key=lambda s: s.get("session_id", ""))

            # Clean orphan event logs (sessions persist until manually
            # dismissed or SessionEnd).
            self._cleanup_stale(sessions)

            return sessions

    def _get_active_zellij_sessions(self):
        """Query Zellij for active sessions. Returns set of session names, or None on error."""
        now = time.time()

        # Use cached value if fresh (cache for 5 seconds to avoid hammering zellij).
        if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]:
            return _zellij_cache["sessions"]

        try:
            result = subprocess.run(
                [ZELLIJ_BIN, "list-sessions", "--no-formatting"],
                capture_output=True,
                text=True,
                timeout=2,
            )
            if result.returncode == 0:
                # Parse session names (one per line, format:
                # "session_name [created ...]" or just "session_name").
                sessions = set()
                for line in result.stdout.strip().splitlines():
                    # Session name is the first word; split once per line.
                    parts = line.split()
                    if parts:
                        sessions.add(parts[0])
                _zellij_cache["sessions"] = sessions
                _zellij_cache["expires"] = now + 5  # Cache for 5 seconds
                return sessions
        except Exception:
            # TimeoutExpired, FileNotFoundError, and anything else all land
            # here; the old (..., Exception) tuple was redundant.
            pass

        return None  # Return None on error (don't clean up if we can't verify)

    def _get_conversation_mtime(self, session_data):
        """Get the conversation file's mtime (ns) for real-time change detection.

        Returns:
            int | None: ``st_mtime_ns`` of the conversation/transcript file,
            or None if the agent is unknown or the file cannot be stat'ed.
        """
        agent = session_data.get("agent")

        if agent == "claude":
            conv_file = self._get_claude_conversation_file(
                session_data.get("session_id", ""),
                session_data.get("project_dir", ""),
            )
            if conv_file:
                try:
                    return conv_file.stat().st_mtime_ns
                except OSError:
                    pass

        elif agent == "codex":
            # Prefer the recorded transcript path...
            transcript_path = session_data.get("transcript_path", "")
            if transcript_path:
                try:
                    return Path(transcript_path).stat().st_mtime_ns
                except OSError:
                    pass
            # ...falling back to discovery by session id.
            transcript_file = self._find_codex_transcript_file(session_data.get("session_id", ""))
            if transcript_file:
                try:
                    return transcript_file.stat().st_mtime_ns
                except OSError:
                    pass

        return None

    def _get_active_transcript_files(self):
        """Get set of transcript files that have active processes.

        Uses a batched lsof call to efficiently check which Codex transcript
        files are currently open by a process.

        Returns:
            set: Absolute paths of transcript files with active processes.
        """
        # Local import mirrors the original; presumably avoids a module-level
        # import cycle — confirm before hoisting to the top of the file.
        from amc_server.context import CODEX_SESSIONS_DIR

        if not CODEX_SESSIONS_DIR.exists():
            return set()

        # Find all recent transcript files.
        transcript_files = []
        now = time.time()
        cutoff = now - 3600  # Only check files modified in the last hour

        for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"):
            try:
                if jsonl_file.stat().st_mtime > cutoff:
                    transcript_files.append(str(jsonl_file))
            except OSError:
                continue

        if not transcript_files:
            return set()

        # Batch lsof check for all transcript files.
        active_files = set()
        try:
            # lsof with multiple files: exit 0 if ANY file is open. The -t
            # output (bare PIDs) can't be mapped back to files, so a per-file
            # pass below determines which specific files are open.
            result = subprocess.run(
                ["lsof", "-t"] + transcript_files,
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0 and result.stdout.strip():
                # At least one file is open - check each one.
                for tf in transcript_files:
                    try:
                        check = subprocess.run(
                            ["lsof", "-t", tf],
                            capture_output=True,
                            text=True,
                            timeout=2,
                        )
                        if check.returncode == 0 and check.stdout.strip():
                            active_files.add(tf)
                    except Exception:
                        # TimeoutExpired or anything else: skip this file.
                        continue
        except Exception:
            # lsof missing or batched call failed: report nothing active.
            pass

        return active_files

    def _is_session_dead(self, session_data, active_zellij_sessions, active_transcript_files):
        """Determine if a session is 'dead' (no longer interactable).

        A dead session cannot receive input and won't produce more output.
        These should be shown separately from active sessions in the UI.

        Args:
            session_data: The session dict
            active_zellij_sessions: Set of active zellij session names (or None)
            active_transcript_files: Set of transcript file paths with active processes

        Returns:
            bool: True if the session is dead
        """
        agent = session_data.get("agent")
        zellij_session = session_data.get("zellij_session", "")
        status = session_data.get("status", "")

        # Sessions that are still starting are not dead (yet).
        if status == "starting":
            return False

        if agent == "codex":
            # Codex session is dead if no process has the transcript file open.
            transcript_path = session_data.get("transcript_path", "")
            if not transcript_path:
                return True  # No transcript path = malformed, treat as dead

            # Check cached set first (covers recently-modified files).
            if transcript_path in active_transcript_files:
                return False  # Process is running

            # For older files not in cached set, do explicit lsof check.
            # This handles long-idle but still-running processes.
            if self._is_file_open(transcript_path):
                return False  # Process is running

            # No process running - it's dead.
            return True

        elif agent == "claude":
            # Claude session is dead if:
            # 1. No zellij session attached, OR
            # 2. The zellij session no longer exists
            if not zellij_session:
                return True
            if active_zellij_sessions is not None:
                return zellij_session not in active_zellij_sessions
            # If we couldn't query zellij, assume alive (don't false-positive).
            return False

        # Unknown agent type - assume alive.
        return False

    def _is_file_open(self, file_path):
        """Check if any process has a file open using lsof.

        Returns:
            bool: True if at least one process holds *file_path* open.
        """
        try:
            result = subprocess.run(
                ["lsof", "-t", file_path],
                capture_output=True,
                text=True,
                timeout=2,
            )
            # bool() so callers get a real boolean, not '' / a PID string.
            return bool(result.returncode == 0 and result.stdout.strip())
        except Exception:
            return False  # Assume not open on error (conservative)

    def _cleanup_stale(self, sessions):
        """Remove orphan event logs >24h and stale 'starting' sessions >1h."""
        active_ids = {s.get("session_id") for s in sessions if s.get("session_id")}
        now = time.time()

        # Clean up orphan event logs (no matching live session, too old).
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)
        for f in EVENTS_DIR.glob("*.jsonl"):
            session_id = f.stem
            if session_id not in active_ids:
                try:
                    age = now - f.stat().st_mtime
                    if age > STALE_EVENT_AGE:
                        f.unlink()
                except OSError:
                    pass

        # Clean up orphan "starting" sessions (never became active).
        for f in SESSIONS_DIR.glob("*.json"):
            try:
                age = now - f.stat().st_mtime
                if age > STALE_STARTING_AGE:
                    data = json.loads(f.read_text())
                    if data.get("status") == "starting":
                        f.unlink()
            except (json.JSONDecodeError, OSError):
                pass