Split the 860+ line bin/amc-server into a modular Python package:
amc_server/
__init__.py - Package marker
context.py - Shared constants (DATA_DIR, PORT, CLAUDE_PROJECTS_DIR, etc.)
handler.py - AMCHandler class using mixin composition
logging_utils.py - Structured logging setup with signal handlers
server.py - Main entry point (ThreadingHTTPServer)
mixins/
__init__.py - Mixin package marker
control.py - Session control (dismiss, respond via Zellij)
conversation.py - Conversation history parsing (Claude JSONL format)
discovery.py - Session discovery (Codex pane inspection, Zellij cache)
http.py - HTTP response helpers (CORS, JSON, static files)
parsing.py - Session state parsing and aggregation
state.py - Session state endpoint logic
The monolithic bin/amc-server becomes a thin launcher that just imports
and calls main(). This separation enables:
- Easier testing of individual components
- Better IDE support (proper Python package structure)
- Cleaner separation of concerns (discovery vs parsing vs control)
- ThreadingHTTPServer instead of single-threaded (handles concurrent requests)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
195 lines
7.4 KiB
Python
195 lines
7.4 KiB
Python
import hashlib
|
|
import json
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
from amc_server.context import (
|
|
EVENTS_DIR,
|
|
SESSIONS_DIR,
|
|
STALE_EVENT_AGE,
|
|
STALE_STARTING_AGE,
|
|
_state_lock,
|
|
_zellij_cache,
|
|
)
|
|
from amc_server.logging_utils import LOGGER
|
|
|
|
|
|
class StateMixin:
|
|
def _serve_state(self):
|
|
payload = self._build_state_payload()
|
|
self._send_json(200, payload)
|
|
|
|
def _serve_stream(self):
|
|
"""SSE stream of full state snapshots, emitted on change."""
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "text/event-stream")
|
|
self.send_header("Cache-Control", "no-cache")
|
|
self.send_header("Connection", "keep-alive")
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.end_headers()
|
|
|
|
# Ask clients to reconnect quickly on transient errors.
|
|
try:
|
|
self.wfile.write(b"retry: 2000\n\n")
|
|
self.wfile.flush()
|
|
except (BrokenPipeError, ConnectionResetError, OSError):
|
|
return
|
|
|
|
last_hash = None
|
|
event_id = 0
|
|
last_heartbeat_at = time.time()
|
|
heartbeat_interval = 15
|
|
poll_interval = 1
|
|
|
|
try:
|
|
while True:
|
|
payload = self._build_state_payload()
|
|
payload_json = json.dumps(payload, separators=(",", ":"))
|
|
payload_hash = hashlib.sha1(payload_json.encode("utf-8")).hexdigest()
|
|
|
|
if payload_hash != last_hash:
|
|
event_id += 1
|
|
self._write_sse_event("state", payload_json, event_id)
|
|
last_hash = payload_hash
|
|
|
|
now = time.time()
|
|
if now - last_heartbeat_at >= heartbeat_interval:
|
|
self.wfile.write(b": ping\n\n")
|
|
self.wfile.flush()
|
|
last_heartbeat_at = now
|
|
|
|
time.sleep(poll_interval)
|
|
except (BrokenPipeError, ConnectionResetError, OSError):
|
|
# Client disconnected.
|
|
return
|
|
except Exception:
|
|
LOGGER.exception("Unhandled SSE stream error")
|
|
return
|
|
|
|
def _write_sse_event(self, event_name, data, event_id):
|
|
"""Write one SSE event frame."""
|
|
# JSON payload is compact single-line; still split defensively for SSE format.
|
|
frame = [f"id: {event_id}", f"event: {event_name}"]
|
|
for line in str(data).splitlines():
|
|
frame.append(f"data: {line}")
|
|
frame.append("")
|
|
frame.append("")
|
|
self.wfile.write("\n".join(frame).encode("utf-8"))
|
|
self.wfile.flush()
|
|
|
|
def _build_state_payload(self):
|
|
"""Build `/api/state` payload data used by JSON and SSE endpoints."""
|
|
sessions = self._collect_sessions()
|
|
return {
|
|
"sessions": sessions,
|
|
"server_time": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
|
|
def _collect_sessions(self):
|
|
"""Collect and normalize all session records from disk."""
|
|
with _state_lock:
|
|
sessions = []
|
|
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Discover active Codex sessions and create session files for them
|
|
self._discover_active_codex_sessions()
|
|
|
|
# Get active Zellij sessions for liveness check
|
|
active_zellij_sessions = self._get_active_zellij_sessions()
|
|
|
|
for f in SESSIONS_DIR.glob("*.json"):
|
|
try:
|
|
data = json.loads(f.read_text())
|
|
if not isinstance(data, dict):
|
|
continue
|
|
|
|
# Proactive liveness check: only auto-delete orphan "starting" sessions.
|
|
# Other statuses can still be useful as historical/debug context.
|
|
zellij_session = data.get("zellij_session", "")
|
|
if zellij_session and active_zellij_sessions is not None:
|
|
if zellij_session not in active_zellij_sessions:
|
|
if data.get("status") == "starting":
|
|
# A missing Zellij session while "starting" indicates an orphan.
|
|
f.unlink(missing_ok=True)
|
|
continue
|
|
|
|
context_usage = self._get_context_usage_for_session(data)
|
|
if context_usage:
|
|
data["context_usage"] = context_usage
|
|
|
|
sessions.append(data)
|
|
except (json.JSONDecodeError, OSError):
|
|
continue
|
|
except Exception:
|
|
LOGGER.exception("Failed processing session file %s", f)
|
|
continue
|
|
|
|
# Sort by last_event_at descending
|
|
sessions.sort(key=lambda s: s.get("last_event_at", ""), reverse=True)
|
|
|
|
# Clean orphan event logs (sessions persist until manually dismissed or SessionEnd)
|
|
self._cleanup_stale(sessions)
|
|
|
|
return sessions
|
|
|
|
def _get_active_zellij_sessions(self):
|
|
"""Query Zellij for active sessions. Returns set of session names, or None on error."""
|
|
now = time.time()
|
|
|
|
# Use cached value if fresh (cache for 5 seconds to avoid hammering zellij)
|
|
if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]:
|
|
return _zellij_cache["sessions"]
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["zellij", "list-sessions", "--no-formatting"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2,
|
|
)
|
|
if result.returncode == 0:
|
|
# Parse session names (one per line, format: "session_name [created ...]" or just "session_name")
|
|
sessions = set()
|
|
for line in result.stdout.strip().splitlines():
|
|
if line:
|
|
# Session name is the first word
|
|
session_name = line.split()[0] if line.split() else ""
|
|
if session_name:
|
|
sessions.add(session_name)
|
|
_zellij_cache["sessions"] = sessions
|
|
_zellij_cache["expires"] = now + 5 # Cache for 5 seconds
|
|
return sessions
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
|
|
pass
|
|
|
|
return None # Return None on error (don't clean up if we can't verify)
|
|
|
|
def _cleanup_stale(self, sessions):
|
|
"""Remove orphan event logs >24h and stale 'starting' sessions >1h."""
|
|
active_ids = {s.get("session_id") for s in sessions if s.get("session_id")}
|
|
now = time.time()
|
|
|
|
# Clean up orphan event logs
|
|
EVENTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
for f in EVENTS_DIR.glob("*.jsonl"):
|
|
session_id = f.stem
|
|
if session_id not in active_ids:
|
|
try:
|
|
age = now - f.stat().st_mtime
|
|
if age > STALE_EVENT_AGE:
|
|
f.unlink()
|
|
except OSError:
|
|
pass
|
|
|
|
# Clean up orphan "starting" sessions (never became active)
|
|
for f in SESSIONS_DIR.glob("*.json"):
|
|
try:
|
|
age = now - f.stat().st_mtime
|
|
if age > STALE_STARTING_AGE:
|
|
data = json.loads(f.read_text())
|
|
if data.get("status") == "starting":
|
|
f.unlink()
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|