Split the 860+ line bin/amc-server into a modular Python package:
amc_server/
__init__.py - Package marker
context.py - Shared constants (DATA_DIR, PORT, CLAUDE_PROJECTS_DIR, etc.)
handler.py - AMCHandler class using mixin composition
logging_utils.py - Structured logging setup with signal handlers
server.py - Main entry point (ThreadingHTTPServer)
mixins/
__init__.py - Mixin package marker
control.py - Session control (dismiss, respond via Zellij)
conversation.py - Conversation history parsing (Claude JSONL format)
discovery.py - Session discovery (Codex pane inspection, Zellij cache)
http.py - HTTP response helpers (CORS, JSON, static files)
parsing.py - Session state parsing and aggregation
state.py - Session state endpoint logic
The monolithic bin/amc-server becomes a thin launcher that just imports
and calls main(). This separation enables:
- Easier testing of individual components
- Better IDE support (proper Python package structure)
- Cleaner separation of concerns (discovery vs parsing vs control)
- ThreadingHTTPServer instead of single-threaded (handles concurrent requests)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
282 lines
12 KiB
Python
282 lines
12 KiB
Python
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
from amc_server.context import (
|
|
CODEX_ACTIVE_WINDOW,
|
|
CODEX_SESSIONS_DIR,
|
|
SESSIONS_DIR,
|
|
_CODEX_CACHE_MAX,
|
|
_codex_pane_cache,
|
|
_codex_transcript_cache,
|
|
_dismissed_codex_ids,
|
|
)
|
|
from amc_server.logging_utils import LOGGER
|
|
|
|
|
|
class SessionDiscoveryMixin:
|
|
def _discover_active_codex_sessions(self):
|
|
"""Find active Codex sessions and create/update session files with Zellij pane info."""
|
|
if not CODEX_SESSIONS_DIR.exists():
|
|
return
|
|
|
|
# Get Zellij pane info for running codex processes
|
|
pid_info, cwd_map = self._get_codex_pane_info()
|
|
|
|
# Only look at sessions modified recently (active)
|
|
now = time.time()
|
|
cutoff = now - CODEX_ACTIVE_WINDOW
|
|
|
|
for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"):
|
|
try:
|
|
# Skip old files
|
|
mtime = jsonl_file.stat().st_mtime
|
|
if mtime < cutoff:
|
|
continue
|
|
|
|
# Extract session ID from filename
|
|
match = re.search(r"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})", jsonl_file.name)
|
|
if not match:
|
|
continue
|
|
|
|
session_id = match.group(1)
|
|
# Evict old entries if cache is full (simple FIFO)
|
|
if len(_codex_transcript_cache) >= _CODEX_CACHE_MAX:
|
|
keys_to_remove = list(_codex_transcript_cache.keys())[: _CODEX_CACHE_MAX // 5]
|
|
for k in keys_to_remove:
|
|
_codex_transcript_cache.pop(k, None)
|
|
_codex_transcript_cache[session_id] = str(jsonl_file)
|
|
|
|
# Skip sessions the user has dismissed
|
|
if session_id in _dismissed_codex_ids:
|
|
continue
|
|
|
|
session_file = SESSIONS_DIR / f"{session_id}.json"
|
|
|
|
# Parse first line to get session metadata
|
|
with jsonl_file.open() as f:
|
|
first_line = f.readline().strip()
|
|
if not first_line:
|
|
continue
|
|
|
|
meta = json.loads(first_line)
|
|
if not isinstance(meta, dict):
|
|
continue
|
|
if meta.get("type") != "session_meta":
|
|
continue
|
|
|
|
payload = self._as_dict(meta.get("payload"))
|
|
cwd = payload.get("cwd", "")
|
|
project = os.path.basename(cwd) if cwd else "Unknown"
|
|
|
|
# Match session to Zellij pane (UUID match via lsof, CWD fallback)
|
|
zellij_session, zellij_pane = self._match_codex_session_to_pane(
|
|
jsonl_file, cwd, pid_info, cwd_map
|
|
)
|
|
|
|
# Determine status based on file age
|
|
file_age_minutes = (now - mtime) / 60
|
|
if file_age_minutes < 2:
|
|
status = "active"
|
|
else:
|
|
status = "done"
|
|
|
|
# Read existing session to preserve some fields
|
|
existing = {}
|
|
if session_file.exists():
|
|
try:
|
|
loaded_existing = json.loads(session_file.read_text())
|
|
if isinstance(loaded_existing, dict):
|
|
existing = loaded_existing
|
|
# Don't downgrade active to done if file was just updated
|
|
if existing.get("status") == "active" and status == "done":
|
|
# Check if we should keep it active
|
|
if file_age_minutes < 5:
|
|
status = "active"
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
# Get last message preview from recent lines
|
|
last_message = ""
|
|
try:
|
|
tail_entries = self._read_jsonl_tail_entries(jsonl_file, max_lines=60, max_bytes=800 * 1024)
|
|
for entry in reversed(tail_entries):
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
if entry.get("type") == "response_item":
|
|
payload_item = self._as_dict(entry.get("payload"))
|
|
if payload_item.get("role") == "assistant":
|
|
content = payload_item.get("content", [])
|
|
if not isinstance(content, list):
|
|
continue
|
|
for part in content:
|
|
if isinstance(part, dict) and part.get("text"):
|
|
text = part["text"]
|
|
# Skip system content
|
|
if not text.startswith("<") and not text.startswith("#"):
|
|
last_message = text[:200]
|
|
break
|
|
if last_message:
|
|
break
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
context_usage = self._get_cached_context_usage(
|
|
jsonl_file, self._parse_codex_context_usage_from_file
|
|
)
|
|
|
|
session_ts = payload.get("timestamp", "")
|
|
last_event_at = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
|
|
|
|
session_data = {
|
|
"session_id": session_id,
|
|
"agent": "codex",
|
|
"project": project,
|
|
"project_dir": cwd,
|
|
"status": status,
|
|
"started_at": existing.get("started_at", session_ts),
|
|
"last_event_at": last_event_at,
|
|
"last_event": "CodexSession",
|
|
"last_message_preview": last_message,
|
|
"zellij_session": zellij_session or existing.get("zellij_session", ""),
|
|
"zellij_pane": zellij_pane or existing.get("zellij_pane", ""),
|
|
"transcript_path": str(jsonl_file),
|
|
}
|
|
if context_usage:
|
|
session_data["context_usage"] = context_usage
|
|
elif existing.get("context_usage"):
|
|
session_data["context_usage"] = existing.get("context_usage")
|
|
|
|
session_file.write_text(json.dumps(session_data, indent=2))
|
|
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
except Exception:
|
|
LOGGER.exception("Failed to discover Codex session from %s", jsonl_file)
|
|
continue
|
|
|
|
def _get_codex_pane_info(self):
|
|
"""Get Zellij pane info for running codex processes via process inspection.
|
|
|
|
Extracts ZELLIJ_PANE_ID from each codex process's inherited environment,
|
|
since zellij dump-layout doesn't provide pane IDs.
|
|
|
|
Results are cached for 5 seconds to avoid running pgrep/ps/lsof on
|
|
every dashboard poll.
|
|
|
|
Returns:
|
|
tuple: (pid_info, cwd_map)
|
|
pid_info: {pid_str: {"pane_id": str, "zellij_session": str}}
|
|
cwd_map: {cwd_path: {"session": str, "pane_id": str}}
|
|
"""
|
|
now = time.time()
|
|
if now < _codex_pane_cache["expires"]:
|
|
return _codex_pane_cache["pid_info"], _codex_pane_cache["cwd_map"]
|
|
|
|
pid_info = {}
|
|
cwd_map = {}
|
|
|
|
try:
|
|
# Step 1: Find codex process PIDs
|
|
result = subprocess.run(
|
|
["pgrep", "-x", "codex"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2,
|
|
)
|
|
pids = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()] if result.returncode == 0 else []
|
|
|
|
# Step 2: Extract ZELLIJ env vars from each process
|
|
for pid in pids:
|
|
try:
|
|
env_result = subprocess.run(
|
|
["ps", "eww", "-o", "args=", "-p", pid],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2,
|
|
)
|
|
if env_result.returncode != 0:
|
|
continue
|
|
|
|
env_str = env_result.stdout
|
|
pane_match = re.search(r"ZELLIJ_PANE_ID=(\d+)", env_str)
|
|
session_match = re.search(r"ZELLIJ_SESSION_NAME=(\S+)", env_str)
|
|
|
|
if pane_match and session_match:
|
|
pid_info[pid] = {
|
|
"pane_id": pane_match.group(1),
|
|
"zellij_session": session_match.group(1),
|
|
}
|
|
except (subprocess.TimeoutExpired, Exception):
|
|
continue
|
|
|
|
# Step 3: Get CWDs via single batched lsof call
|
|
if pid_info:
|
|
pid_list = ",".join(pid_info.keys())
|
|
try:
|
|
cwd_result = subprocess.run(
|
|
["lsof", "-a", "-p", pid_list, "-d", "cwd", "-Fn"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=3,
|
|
)
|
|
if cwd_result.returncode == 0:
|
|
current_pid = None
|
|
for line in cwd_result.stdout.splitlines():
|
|
if line.startswith("p"):
|
|
current_pid = line[1:]
|
|
elif line.startswith("n/") and current_pid and current_pid in pid_info:
|
|
cwd = line[1:]
|
|
info = pid_info[current_pid]
|
|
cwd_map[cwd] = {
|
|
"session": info["zellij_session"],
|
|
"pane_id": info["pane_id"],
|
|
}
|
|
except (subprocess.TimeoutExpired, Exception):
|
|
pass
|
|
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
|
|
pass
|
|
|
|
_codex_pane_cache["pid_info"] = pid_info
|
|
_codex_pane_cache["cwd_map"] = cwd_map
|
|
_codex_pane_cache["expires"] = now + 5 # Cache for 5 seconds
|
|
|
|
return pid_info, cwd_map
|
|
|
|
def _match_codex_session_to_pane(self, session_file, session_cwd, pid_info, cwd_map):
|
|
"""Match a Codex session file to a Zellij pane.
|
|
|
|
Tries session-file-to-PID matching first (via lsof), falls back to CWD.
|
|
|
|
Returns:
|
|
tuple: (zellij_session, pane_id) or ("", "")
|
|
"""
|
|
# Try precise match: which process has this session file open?
|
|
try:
|
|
result = subprocess.run(
|
|
["lsof", "-t", str(session_file)],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2,
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
for pid in result.stdout.strip().splitlines():
|
|
pid = pid.strip()
|
|
if pid in pid_info:
|
|
info = pid_info[pid]
|
|
return info["zellij_session"], info["pane_id"]
|
|
except (subprocess.TimeoutExpired, Exception):
|
|
pass
|
|
|
|
# Fall back to CWD match
|
|
normalized_cwd = os.path.normpath(session_cwd) if session_cwd else ""
|
|
for pane_cwd, info in cwd_map.items():
|
|
if os.path.normpath(pane_cwd) == normalized_cwd:
|
|
return info["session"], info["pane_id"]
|
|
|
|
return "", ""
|