Files
amc/bin/amc-server
teernisse e994c7a0e8 feat(server): rewrite Codex pane discovery using process inspection
Replace the zellij dump-layout approach with direct process inspection
for matching Codex sessions to Zellij panes. The old method couldn't
extract pane IDs from layout output, leaving them empty. The new
approach uses a three-step pipeline:

1. pgrep -x codex to find running Codex PIDs
2. ps eww to extract ZELLIJ_PANE_ID and ZELLIJ_SESSION_NAME from each
   process's inherited environment variables
3. lsof -a -p <pids> -d cwd to batch-resolve working directories

Session-to-pane matching then uses a two-tier strategy:
- Primary: lsof -t <session_file> to find which PID has the JSONL open
- Fallback: normalized CWD comparison

Also adds:
- _codex_pane_cache with 5-second TTL to avoid running pgrep/ps/lsof
  on every dashboard poll cycle
- _dismissed_codex_ids set to track Codex sessions the user has
  dismissed, preventing re-discovery on subsequent polls
- Clearer error message when session lacks pane info for input routing
2026-02-25 11:51:28 -05:00

865 lines
34 KiB
Python
Executable File

#!/usr/bin/env python3
"""AMC server — serves the dashboard and session state API.

Endpoints:
    GET  /                     → dashboard-preact.html (also /index.html)
    GET  /old                  → dashboard.html, the legacy dashboard (also /dashboard.html)
    GET  /api/state            → aggregated session state JSON
    GET  /api/events/ID        → event timeline for one session
    GET  /api/conversation/ID  → conversation history for a session
                                 (optional query parameters: project_dir, agent)
    POST /api/dismiss/ID       → dismiss (delete) a session
    POST /api/respond/ID       → inject response into session's Zellij pane
    OPTIONS (any path)         → CORS preflight
"""
import json
import os
import re
import subprocess
import time
import urllib.parse
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
# Where Claude Code keeps its per-project conversation logs
CLAUDE_PROJECTS_DIR = Path.home().joinpath(".claude", "projects")
# Where Codex keeps its session logs
CODEX_SESSIONS_DIR = Path.home().joinpath(".codex", "sessions")
# zellij-send-keys plugin binary
ZELLIJ_PLUGIN = Path.home().joinpath(".config", "zellij", "plugins", "zellij-send-keys.wasm")
# Runtime data (session files, event logs) lives in the XDG data dir
DATA_DIR = Path.home().joinpath(".local", "share", "amc")
SESSIONS_DIR = DATA_DIR.joinpath("sessions")
EVENTS_DIR = DATA_DIR.joinpath("events")
# Static source files live one level above the directory containing this script
PROJECT_DIR = Path(__file__).resolve().parent.parent
DASHBOARD_FILE = PROJECT_DIR.joinpath("dashboard.html")
PORT = 7400
STALE_EVENT_AGE = 24 * 60 * 60  # seconds (24 hours)
STALE_STARTING_AGE = 60 * 60  # seconds (1 hour); sessions stuck in "starting" are orphans
# Cached Zellij session list, so zellij is not invoked on every request
_zellij_cache = dict(sessions=None, expires=0)
# Cached Codex pane info, so pgrep/ps/lsof are not run on every request
_codex_pane_cache = dict(pid_info={}, cwd_map={}, expires=0)
# Codex session IDs dismissed during this server lifetime (prevents re-discovery)
_dismissed_codex_ids = set()
class AMCHandler(BaseHTTPRequestHandler):
def do_GET(self):
    """Route a GET request to the matching handler.

    Routes:
        /, /index.html          -> Preact dashboard
        /old, /dashboard.html   -> legacy dashboard
        /api/state              -> aggregated session state
        /api/events/<id>        -> event timeline for one session
        /api/conversation/<id>  -> conversation history; optional query
                                   parameters ``project_dir`` (default "")
                                   and ``agent`` (default "claude")

    Any other path gets a JSON 404 response.
    """
    path = self.path
    if path in ("/", "/index.html"):
        self._serve_preact_dashboard()
    elif path in ("/old", "/dashboard.html"):
        self._serve_dashboard()
    elif path == "/api/state":
        self._serve_state()
    elif path.startswith("/api/events/"):
        session_id = urllib.parse.unquote(path[len("/api/events/"):])
        self._serve_events(session_id)
    elif path.startswith("/api/conversation/"):
        # Split "<id>?<query>"; with no "?" the query is empty and the
        # defaults below apply.
        raw_id, _, query = path[len("/api/conversation/"):].partition("?")
        params = urllib.parse.parse_qs(query)
        # parse_qs already percent-decodes values. Decoding them a second
        # time would corrupt any directory name containing a literal "%".
        project_dir = params.get("project_dir", [""])[0]
        agent = params.get("agent", ["claude"])[0]
        self._serve_conversation(urllib.parse.unquote(raw_id), project_dir, agent)
    else:
        self._json_error(404, "Not Found")
def do_POST(self):
    """Route a POST request: dismiss a session or send input to one.

    The session ID is the percent-decoded remainder of the path after the
    route prefix. Unknown paths get a JSON 404 response.
    """
    routes = (
        ("/api/dismiss/", self._dismiss_session),
        ("/api/respond/", self._respond_to_session),
    )
    for prefix, handler in routes:
        if self.path.startswith(prefix):
            handler(urllib.parse.unquote(self.path[len(prefix):]))
            return
    self._json_error(404, "Not Found")
def do_OPTIONS(self):
    """Answer a CORS preflight request with 204 and the permitted methods/headers."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "POST, OPTIONS"),
        ("Access-Control-Allow-Headers", "Content-Type"),
    )
    self.send_response(204)
    for name, value in cors_headers:
        self.send_header(name, value)
    self.end_headers()
def _serve_dashboard(self):
    """Send the legacy dashboard.html page, or a 500 error if the file is missing."""
    try:
        html = DASHBOARD_FILE.read_bytes()
        self.send_response(200)
        for name, value in (
            ("Content-Type", "text/html; charset=utf-8"),
            ("Content-Length", str(len(html))),
        ):
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(html)
    except FileNotFoundError:
        self.send_error(500, "dashboard.html not found")
def _serve_preact_dashboard(self):
    """Send dashboard-preact.html from the project directory, or a 500 error if it is missing."""
    try:
        html = (PROJECT_DIR / "dashboard-preact.html").read_bytes()
        self.send_response(200)
        for name, value in (
            ("Content-Type", "text/html; charset=utf-8"),
            ("Content-Length", str(len(html))),
        ):
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(html)
    except FileNotFoundError:
        self.send_error(500, "dashboard-preact.html not found")
def _serve_state(self):
    """Respond with all known sessions and the server's current UTC time as JSON.

    Before responding this refreshes Codex session files, deletes
    "starting" sessions whose Zellij session is no longer listed, sorts the
    remaining sessions newest-first by ``last_event_at`` and runs the
    stale-file cleanup.
    """
    SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
    # Create/update session files for Codex sessions that are currently active
    self._discover_active_codex_sessions()
    # None means the Zellij query failed, in which case nothing is pruned
    live_zellij = self._get_active_zellij_sessions()

    collected = []
    for session_path in SESSIONS_DIR.glob("*.json"):
        try:
            record = json.loads(session_path.read_text())
            zellij_name = record.get("zellij_session", "")
            # Only "starting" sessions are auto-deleted; other statuses stay
            # visible as historical/debug context.
            is_orphan = (
                zellij_name
                and live_zellij is not None
                and zellij_name not in live_zellij
                and record.get("status") == "starting"
            )
            if is_orphan:
                session_path.unlink(missing_ok=True)
                continue
            collected.append(record)
        except (json.JSONDecodeError, OSError):
            continue

    # Newest activity first
    collected.sort(key=lambda rec: rec.get("last_event_at", ""), reverse=True)
    # Remove orphan event logs and long-stuck "starting" sessions
    self._cleanup_stale(collected)

    body = json.dumps({
        "sessions": collected,
        "server_time": datetime.now(timezone.utc).isoformat(),
    }).encode()
    self.send_response(200)
    for name, value in (
        ("Content-Type", "application/json"),
        ("Access-Control-Allow-Origin", "*"),
        ("Content-Length", str(len(body))),
    ):
        self.send_header(name, value)
    self.end_headers()
    self.wfile.write(body)
def _serve_events(self, session_id):
    """Respond with the parsed event log of one session as JSON.

    Only the basename of ``session_id`` is used, which prevents path
    traversal. A missing or unreadable log yields an empty event list, and
    blank or malformed lines are skipped.
    """
    safe_id = os.path.basename(session_id)
    log_path = EVENTS_DIR / f"{safe_id}.jsonl"

    raw_lines = []
    if log_path.exists():
        try:
            raw_lines = log_path.read_text().splitlines()
        except OSError:
            raw_lines = []

    events = []
    for raw in raw_lines:
        if not raw.strip():
            continue
        try:
            events.append(json.loads(raw))
        except json.JSONDecodeError:
            continue

    body = json.dumps({"session_id": safe_id, "events": events}).encode()
    self.send_response(200)
    for name, value in (
        ("Content-Type", "application/json"),
        ("Access-Control-Allow-Origin", "*"),
        ("Content-Length", str(len(body))),
    ):
        self.send_header(name, value)
    self.end_headers()
    self.wfile.write(body)
def _serve_conversation(self, session_id, project_dir, agent="claude"):
"""Serve conversation history from Claude Code or Codex JSONL file."""
safe_id = os.path.basename(session_id)
messages = []
if agent == "codex":
messages = self._parse_codex_conversation(safe_id)
else:
messages = self._parse_claude_conversation(safe_id, project_dir)
response = json.dumps({"session_id": safe_id, "messages": messages}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _parse_claude_conversation(self, session_id, project_dir):
"""Parse Claude Code JSONL conversation format."""
messages = []
# Convert project_dir to Claude's encoded format
# /Users/foo/projects/bar -> -Users-foo-projects-bar
if project_dir:
encoded_dir = project_dir.replace("/", "-")
if not encoded_dir.startswith("-"):
encoded_dir = "-" + encoded_dir
else:
encoded_dir = ""
# Find the conversation file
conv_file = None
if encoded_dir:
conv_file = CLAUDE_PROJECTS_DIR / encoded_dir / f"{session_id}.jsonl"
if conv_file and conv_file.exists():
try:
for line in conv_file.read_text().splitlines():
if not line.strip():
continue
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
content = entry.get("message", {}).get("content", "")
# Only include actual human messages (strings), not tool results (arrays)
if content and isinstance(content, str):
messages.append({
"role": "user",
"content": content,
"timestamp": entry.get("timestamp", "")
})
elif msg_type == "assistant":
# Assistant messages have structured content
raw_content = entry.get("message", {}).get("content", [])
text_parts = []
for part in raw_content:
if isinstance(part, dict):
if part.get("type") == "text":
text_parts.append(part.get("text", ""))
elif isinstance(part, str):
text_parts.append(part)
if text_parts:
messages.append({
"role": "assistant",
"content": "\n".join(text_parts),
"timestamp": entry.get("timestamp", "")
})
except json.JSONDecodeError:
continue
except OSError:
pass
return messages
def _parse_codex_conversation(self, session_id):
"""Parse Codex JSONL conversation format."""
messages = []
# Find the Codex session file by searching for files containing the session ID
# Codex files are named: rollout-YYYY-MM-DDTHH-MM-SS-SESSION_ID.jsonl
conv_file = None
if CODEX_SESSIONS_DIR.exists():
for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"):
if session_id in jsonl_file.name:
conv_file = jsonl_file
break
if conv_file and conv_file.exists():
try:
for line in conv_file.read_text().splitlines():
if not line.strip():
continue
try:
entry = json.loads(line)
# Codex format: type="response_item", payload.type="message"
if entry.get("type") != "response_item":
continue
payload = entry.get("payload", {})
if payload.get("type") != "message":
continue
role = payload.get("role", "")
content_parts = payload.get("content", [])
# Skip developer role (system context/permissions)
if role == "developer":
continue
# Extract text from content array
text_parts = []
for part in content_parts:
if isinstance(part, dict):
# Codex uses "input_text" for user, "output_text" for assistant
text = part.get("text", "")
if text:
# Skip injected context (AGENTS.md, environment, permissions)
skip_prefixes = (
"<INSTRUCTIONS>",
"<environment_context>",
"<permissions instructions>",
"# AGENTS.md instructions",
)
if any(text.startswith(p) for p in skip_prefixes):
continue
text_parts.append(text)
if text_parts and role in ("user", "assistant"):
messages.append({
"role": role,
"content": "\n".join(text_parts),
"timestamp": entry.get("timestamp", "")
})
except json.JSONDecodeError:
continue
except OSError:
pass
return messages
def _dismiss_session(self, session_id):
    """Delete a session's file (manual dismiss from the dashboard) and reply ``{"ok": true}``.

    The ID is also recorded in ``_dismissed_codex_ids`` so that Codex
    discovery does not recreate the session. A missing file is not an error.
    """
    safe_id = os.path.basename(session_id)
    # Remember the dismissal before removing the file
    _dismissed_codex_ids.add(safe_id)
    (SESSIONS_DIR / f"{safe_id}.json").unlink(missing_ok=True)

    body = json.dumps({"ok": True}).encode()
    self.send_response(200)
    for name, value in (
        ("Content-Type", "application/json"),
        ("Access-Control-Allow-Origin", "*"),
        ("Content-Length", str(len(body))),
    ):
        self.send_header(name, value)
    self.end_headers()
    self.wfile.write(body)
def _respond_to_session(self, session_id):
"""Inject a response into the session's Zellij pane."""
safe_id = os.path.basename(session_id)
session_file = SESSIONS_DIR / f"{safe_id}.json"
# Read request body
try:
content_length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(content_length))
text = body.get("text", "")
is_freeform = body.get("freeform", False)
option_count = body.get("optionCount", 0)
except (json.JSONDecodeError, ValueError):
self._json_error(400, "Invalid JSON body")
return
if not text or not text.strip():
self._json_error(400, "Missing or empty 'text' field")
return
# Load session
if not session_file.exists():
self._json_error(404, "Session not found")
return
try:
session = json.loads(session_file.read_text())
except (json.JSONDecodeError, OSError):
self._json_error(500, "Failed to read session")
return
zellij_session = session.get("zellij_session", "")
zellij_pane = session.get("zellij_pane", "")
if not zellij_session or not zellij_pane:
self._json_error(400, "Session missing Zellij pane info - cannot send input without a pane target")
return
# Parse pane ID from "terminal_N" format
pane_id = self._parse_pane_id(zellij_pane)
if pane_id is None:
self._json_error(400, f"Invalid pane format: {zellij_pane}")
return
# For freeform responses, we need two-step injection:
# 1. Send "Other" option number (optionCount + 1) WITHOUT Enter
# 2. Wait for Claude Code to switch to text input mode
# 3. Send the actual text WITH Enter
if is_freeform and option_count > 0:
other_num = str(option_count + 1)
result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False)
if not result["ok"]:
response = json.dumps({"ok": False, "error": result["error"]}).encode()
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
return
# Delay for Claude Code to switch to text input mode
time.sleep(0.3)
# Inject the actual text (with Enter)
result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=True)
if result["ok"]:
response = json.dumps({"ok": True}).encode()
self.send_response(200)
else:
response = json.dumps({"ok": False, "error": result["error"]}).encode()
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _parse_pane_id(self, zellij_pane):
"""Extract numeric pane ID from various formats."""
if not zellij_pane:
return None
# Try direct integer (e.g., "10")
try:
return int(zellij_pane)
except ValueError:
pass
# Try "terminal_N" format
parts = zellij_pane.split("_")
if len(parts) == 2 and parts[0] in ("terminal", "plugin"):
try:
return int(parts[1])
except ValueError:
pass
return None
def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True):
    """Send ``text`` to a pane of ``zellij_session`` and return ``{"ok": ...}``.

    The plugin route is tried first when the plugin file exists (no focus
    change). If it is unavailable or fails, falls back to write-chars,
    which is called without ``pane_id``.
    """
    # Child processes inherit the environment plus the target session name
    env = dict(os.environ, ZELLIJ_SESSION_NAME=zellij_session)

    if ZELLIJ_PLUGIN.exists():
        outcome = self._try_plugin_inject(env, pane_id, text, send_enter)
        if outcome["ok"]:
            return outcome
    # Plugin missing or failed
    return self._try_write_chars_inject(env, text, send_enter)
def _try_plugin_inject(self, env, pane_id, text, send_enter=True):
    """Inject text through the zellij-send-keys plugin (no focus change).

    Pipes a JSON message with ``pane_id``, ``text`` and ``send_enter`` to
    the plugin via ``zellij action pipe``.

    Returns:
        ``{"ok": True}`` when the command exits with status 0, otherwise
        ``{"ok": False, "error": <message>}``.
    """
    payload = json.dumps({
        "pane_id": pane_id,
        "text": text,
        "send_enter": send_enter,
    })
    try:
        command = [
            "zellij", "action", "pipe",
            "--plugin", f"file:{ZELLIJ_PLUGIN}",
            "--name", "send_keys",
            "--floating-plugin", "false",
            "--", payload,
        ]
        proc = subprocess.run(
            command,
            env=env,
            capture_output=True,
            text=True,
            timeout=3,
        )
    except subprocess.TimeoutExpired:
        return {"ok": False, "error": "plugin timed out"}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}

    if proc.returncode != 0:
        return {"ok": False, "error": proc.stderr or "plugin failed"}
    return {"ok": True}
def _try_write_chars_inject(self, env, text, send_enter=True):
"""Inject via write-chars (writes to focused pane, simpler but changes focus)."""
try:
# Write the text
result = subprocess.run(
["zellij", "action", "write-chars", text],
env=env,
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
return {"ok": False, "error": result.stderr or "write-chars failed"}
# Send Enter if requested
if send_enter:
result = subprocess.run(
["zellij", "action", "write", "13"], # 13 = Enter
env=env,
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
return {"ok": False, "error": result.stderr or "write Enter failed"}
return {"ok": True}
except subprocess.TimeoutExpired:
return {"ok": False, "error": "write-chars timed out"}
except FileNotFoundError:
return {"ok": False, "error": "zellij not found in PATH"}
except Exception as e:
return {"ok": False, "error": str(e)}
def _discover_active_codex_sessions(self):
"""Find active Codex sessions and create/update session files with Zellij pane info."""
if not CODEX_SESSIONS_DIR.exists():
return
# Get Zellij pane info for running codex processes
pid_info, cwd_map = self._get_codex_pane_info()
# Only look at sessions modified in the last 10 minutes (active)
now = time.time()
cutoff = now - 600 # 10 minutes
for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"):
try:
# Skip old files
mtime = jsonl_file.stat().st_mtime
if mtime < cutoff:
continue
# Extract session ID from filename
match = re.search(r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})', jsonl_file.name)
if not match:
continue
session_id = match.group(1)
# Skip sessions the user has dismissed
if session_id in _dismissed_codex_ids:
continue
session_file = SESSIONS_DIR / f"{session_id}.json"
# Parse first line to get session metadata
with jsonl_file.open() as f:
first_line = f.readline().strip()
if not first_line:
continue
meta = json.loads(first_line)
if meta.get("type") != "session_meta":
continue
payload = meta.get("payload", {})
cwd = payload.get("cwd", "")
project = os.path.basename(cwd) if cwd else "Unknown"
# Match session to Zellij pane (UUID match via lsof, CWD fallback)
zellij_session, zellij_pane = self._match_codex_session_to_pane(
jsonl_file, cwd, pid_info, cwd_map
)
# Determine status based on file age
file_age_minutes = (now - mtime) / 60
if file_age_minutes < 2:
status = "active"
else:
status = "done"
# Read existing session to preserve some fields
existing = {}
if session_file.exists():
try:
existing = json.loads(session_file.read_text())
# Don't downgrade active to done if file was just updated
if existing.get("status") == "active" and status == "done":
# Check if we should keep it active
if file_age_minutes < 5:
status = "active"
except (json.JSONDecodeError, OSError):
pass
# Get last message preview from recent lines
last_message = ""
try:
lines = jsonl_file.read_text().splitlines()[-30:]
for line in reversed(lines):
entry = json.loads(line)
if entry.get("type") == "response_item":
payload_item = entry.get("payload", {})
if payload_item.get("role") == "assistant":
content = payload_item.get("content", [])
for part in content:
if isinstance(part, dict) and part.get("text"):
text = part["text"]
# Skip system content
if not text.startswith("<") and not text.startswith("#"):
last_message = text[:200]
break
if last_message:
break
except (json.JSONDecodeError, OSError):
pass
session_ts = payload.get("timestamp", "")
last_event_at = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
session_data = {
"session_id": session_id,
"agent": "codex",
"project": project,
"project_dir": cwd,
"status": status,
"started_at": existing.get("started_at", session_ts),
"last_event_at": last_event_at,
"last_event": "CodexSession",
"last_message_preview": last_message,
"zellij_session": zellij_session or existing.get("zellij_session", ""),
"zellij_pane": zellij_pane or existing.get("zellij_pane", ""),
}
session_file.write_text(json.dumps(session_data, indent=2))
except (OSError, json.JSONDecodeError):
continue
def _get_codex_pane_info(self):
"""Get Zellij pane info for running codex processes via process inspection.
Extracts ZELLIJ_PANE_ID from each codex process's inherited environment,
since zellij dump-layout doesn't provide pane IDs.
Results are cached for 5 seconds to avoid running pgrep/ps/lsof on
every dashboard poll.
Returns:
tuple: (pid_info, cwd_map)
pid_info: {pid_str: {"pane_id": str, "zellij_session": str}}
cwd_map: {cwd_path: {"session": str, "pane_id": str}}
"""
now = time.time()
if now < _codex_pane_cache["expires"]:
return _codex_pane_cache["pid_info"], _codex_pane_cache["cwd_map"]
pid_info = {}
cwd_map = {}
try:
# Step 1: Find codex process PIDs
result = subprocess.run(
["pgrep", "-x", "codex"],
capture_output=True, text=True, timeout=2,
)
pids = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()] if result.returncode == 0 else []
# Step 2: Extract ZELLIJ env vars from each process
for pid in pids:
try:
env_result = subprocess.run(
["ps", "eww", "-o", "args=", "-p", pid],
capture_output=True, text=True, timeout=2,
)
if env_result.returncode != 0:
continue
env_str = env_result.stdout
pane_match = re.search(r'ZELLIJ_PANE_ID=(\d+)', env_str)
session_match = re.search(r'ZELLIJ_SESSION_NAME=(\S+)', env_str)
if pane_match and session_match:
pid_info[pid] = {
"pane_id": pane_match.group(1),
"zellij_session": session_match.group(1),
}
except (subprocess.TimeoutExpired, Exception):
continue
# Step 3: Get CWDs via single batched lsof call
if pid_info:
pid_list = ",".join(pid_info.keys())
try:
cwd_result = subprocess.run(
["lsof", "-a", "-p", pid_list, "-d", "cwd", "-Fn"],
capture_output=True, text=True, timeout=3,
)
if cwd_result.returncode == 0:
current_pid = None
for line in cwd_result.stdout.splitlines():
if line.startswith("p"):
current_pid = line[1:]
elif line.startswith("n/") and current_pid and current_pid in pid_info:
cwd = line[1:]
info = pid_info[current_pid]
cwd_map[cwd] = {
"session": info["zellij_session"],
"pane_id": info["pane_id"],
}
except (subprocess.TimeoutExpired, Exception):
pass
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
pass
_codex_pane_cache["pid_info"] = pid_info
_codex_pane_cache["cwd_map"] = cwd_map
_codex_pane_cache["expires"] = now + 5 # Cache for 5 seconds
return pid_info, cwd_map
def _match_codex_session_to_pane(self, session_file, session_cwd, pid_info, cwd_map):
"""Match a Codex session file to a Zellij pane.
Tries session-file-to-PID matching first (via lsof), falls back to CWD.
Returns:
tuple: (zellij_session, pane_id) or ("", "")
"""
# Try precise match: which process has this session file open?
try:
result = subprocess.run(
["lsof", "-t", str(session_file)],
capture_output=True, text=True, timeout=2,
)
if result.returncode == 0 and result.stdout.strip():
for pid in result.stdout.strip().splitlines():
pid = pid.strip()
if pid in pid_info:
info = pid_info[pid]
return info["zellij_session"], info["pane_id"]
except (subprocess.TimeoutExpired, Exception):
pass
# Fall back to CWD match
normalized_cwd = os.path.normpath(session_cwd) if session_cwd else ""
for pane_cwd, info in cwd_map.items():
if os.path.normpath(pane_cwd) == normalized_cwd:
return info["session"], info["pane_id"]
return "", ""
def _json_error(self, code, message):
"""Send a JSON error response."""
response = json.dumps({"ok": False, "error": message}).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _get_active_zellij_sessions(self):
"""Query Zellij for active sessions. Returns set of session names, or None on error."""
now = time.time()
# Use cached value if fresh (cache for 5 seconds to avoid hammering zellij)
if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]:
return _zellij_cache["sessions"]
try:
result = subprocess.run(
["zellij", "list-sessions", "--no-formatting"],
capture_output=True,
text=True,
timeout=2,
)
if result.returncode == 0:
# Parse session names (one per line, format: "session_name [created ...]" or just "session_name")
sessions = set()
for line in result.stdout.strip().splitlines():
if line:
# Session name is the first word
session_name = line.split()[0] if line.split() else ""
if session_name:
sessions.add(session_name)
_zellij_cache["sessions"] = sessions
_zellij_cache["expires"] = now + 5 # Cache for 5 seconds
return sessions
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
pass
return None # Return None on error (don't clean up if we can't verify)
def _cleanup_stale(self, sessions):
    """Delete orphan event logs older than 24h and "starting" sessions older than 1h.

    Args:
        sessions: Session dicts currently known; event logs whose stem
            matches one of their ``session_id`` values are kept.
    """
    known_ids = {s.get("session_id") for s in sessions if s.get("session_id")}
    now = time.time()

    # Event logs whose session is no longer known
    EVENTS_DIR.mkdir(parents=True, exist_ok=True)
    for event_log in EVENTS_DIR.glob("*.jsonl"):
        if event_log.stem in known_ids:
            continue
        try:
            if now - event_log.stat().st_mtime > STALE_EVENT_AGE:
                event_log.unlink()
        except OSError:
            pass

    # Sessions that never left "starting"
    for session_path in SESSIONS_DIR.glob("*.json"):
        try:
            if now - session_path.stat().st_mtime <= STALE_STARTING_AGE:
                continue
            record = json.loads(session_path.read_text())
            if record.get("status") == "starting":
                session_path.unlink()
        except (json.JSONDecodeError, OSError):
            pass
def log_message(self, format, *args):
    """Discard per-request log lines so the server output stays clean."""
    return None
def main():
    """Run the AMC HTTP server on 127.0.0.1:PORT until interrupted.

    The process ID is written to ``DATA_DIR/server.pid`` while the server
    runs. On shutdown (including Ctrl-C) the PID file is removed and the
    listening socket is closed.
    """
    server = HTTPServer(("127.0.0.1", PORT), AMCHandler)
    print(f"AMC server listening on http://127.0.0.1:{PORT}")

    # DATA_DIR is otherwise only created by request handlers (via
    # SESSIONS_DIR/EVENTS_DIR), so it may not exist yet on a first start.
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    pid_file = DATA_DIR / "server.pid"
    try:
        # Inside the try block so the socket is still closed if this fails
        pid_file.write_text(str(os.getpid()))
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        pid_file.unlink(missing_ok=True)
        server.server_close()


if __name__ == "__main__":
    main()