diff --git a/amc_server/__init__.py b/amc_server/__init__.py new file mode 100644 index 0000000..f826171 --- /dev/null +++ b/amc_server/__init__.py @@ -0,0 +1,3 @@ +from amc_server.server import main + +__all__ = ["main"] diff --git a/amc_server/context.py b/amc_server/context.py new file mode 100644 index 0000000..78daed6 --- /dev/null +++ b/amc_server/context.py @@ -0,0 +1,47 @@ +from pathlib import Path +import threading + +# Claude Code conversation directory +CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects" + +# Codex conversation directory +CODEX_SESSIONS_DIR = Path.home() / ".codex" / "sessions" + +# Plugin path for zellij-send-keys +ZELLIJ_PLUGIN = Path.home() / ".config" / "zellij" / "plugins" / "zellij-send-keys.wasm" + +# Runtime data lives in XDG data dir +DATA_DIR = Path.home() / ".local" / "share" / "amc" +SESSIONS_DIR = DATA_DIR / "sessions" +EVENTS_DIR = DATA_DIR / "events" + +# Source files live in project directory (relative to this module) +PROJECT_DIR = Path(__file__).resolve().parent.parent +DASHBOARD_DIR = PROJECT_DIR / "dashboard" + +PORT = 7400 +STALE_EVENT_AGE = 86400 # 24 hours in seconds +STALE_STARTING_AGE = 3600 # 1 hour - sessions stuck in "starting" are orphans +CODEX_ACTIVE_WINDOW = 600 # 10 minutes - only discover recently-active Codex sessions + +# Cache for Zellij session list (avoid calling zellij on every request) +_zellij_cache = {"sessions": None, "expires": 0} + +# Cache for Codex pane info (avoid running pgrep/ps/lsof on every request) +_codex_pane_cache = {"pid_info": {}, "cwd_map": {}, "expires": 0} + +# Cache for parsed context usage by transcript file path + mtime/size +# Limited to prevent unbounded memory growth +_context_usage_cache = {} +_CONTEXT_CACHE_MAX = 100 + +# Cache mapping Codex session IDs to transcript paths (or None when missing) +_codex_transcript_cache = {} +_CODEX_CACHE_MAX = 200 + +# Codex sessions dismissed during this server lifetime (prevents re-discovery) +_dismissed_codex_ids = set() +_DISMISSED_MAX = 500 + +# Serialize state collection because it mutates session files/caches. +_state_lock = threading.Lock() diff --git a/amc_server/handler.py b/amc_server/handler.py new file mode 100644 index 0000000..76dccb5 --- /dev/null +++ b/amc_server/handler.py @@ -0,0 +1,27 @@ +from http.server import BaseHTTPRequestHandler + +from amc_server.mixins.conversation import ConversationMixin +from amc_server.mixins.control import SessionControlMixin +from amc_server.mixins.discovery import SessionDiscoveryMixin +from amc_server.mixins.http import HttpMixin +from amc_server.mixins.parsing import SessionParsingMixin +from amc_server.mixins.state import StateMixin + + +class AMCHandler( + HttpMixin, + StateMixin, + ConversationMixin, + SessionControlMixin, + SessionDiscoveryMixin, + SessionParsingMixin, + BaseHTTPRequestHandler, +): + """HTTP handler composed from focused mixins.""" + + def handle(self): + """Ignore expected disconnect noise from short-lived HTTP/SSE clients.""" + try: + super().handle() + except (ConnectionResetError, BrokenPipeError): + return diff --git a/amc_server/logging_utils.py b/amc_server/logging_utils.py new file mode 100644 index 0000000..a8499b8 --- /dev/null +++ b/amc_server/logging_utils.py @@ -0,0 +1,31 @@ +import logging +import signal +import sys +import threading + +LOGGER = logging.getLogger("amc.server") + + +def configure_logging(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + stream=sys.stdout, + force=True, + ) + + +def install_signal_handlers(server): + """Log termination signals and shut down cleanly.""" + + def _handle(signum, _frame): + LOGGER.warning("Received signal %s, shutting down AMC server", signum) + # shutdown() blocks until serve_forever loop wakes; call from a helper thread. + threading.Thread(target=server.shutdown, daemon=True).start() + + for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP): + try: + signal.signal(sig, _handle) + except (ValueError, OSError, RuntimeError): + # Some environments may restrict signal registration. + continue diff --git a/amc_server/mixins/__init__.py b/amc_server/mixins/__init__.py new file mode 100644 index 0000000..149a0b8 --- /dev/null +++ b/amc_server/mixins/__init__.py @@ -0,0 +1 @@ +# Mixin package for AMC server handler composition. diff --git a/amc_server/mixins/control.py b/amc_server/mixins/control.py new file mode 100644 index 0000000..aab9439 --- /dev/null +++ b/amc_server/mixins/control.py @@ -0,0 +1,232 @@ +import json +import os +import subprocess +import time + +from amc_server.context import SESSIONS_DIR, ZELLIJ_PLUGIN, _DISMISSED_MAX, _dismissed_codex_ids +from amc_server.logging_utils import LOGGER + + +class SessionControlMixin: + def _dismiss_session(self, session_id): + """Delete a session file (manual dismiss from dashboard).""" + safe_id = os.path.basename(session_id) + session_file = SESSIONS_DIR / f"{safe_id}.json" + # Track dismissed Codex sessions to prevent re-discovery + # Evict oldest entries if set is full (prevents unbounded growth) + while len(_dismissed_codex_ids) >= _DISMISSED_MAX: + _dismissed_codex_ids.pop() + _dismissed_codex_ids.add(safe_id) + session_file.unlink(missing_ok=True) + self._send_json(200, {"ok": True}) + + def _respond_to_session(self, session_id): + """Inject a response into the session's Zellij pane.""" + safe_id = os.path.basename(session_id) + session_file = SESSIONS_DIR / f"{safe_id}.json" + + # Read request body + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(content_length)) + if not isinstance(body, dict): + self._json_error(400, "Invalid JSON body") + return + text = body.get("text", "") + is_freeform = body.get("freeform", False) + try: + option_count = int(body.get("optionCount", 0)) + except (TypeError, ValueError): + option_count = 0 + except (json.JSONDecodeError, ValueError): + self._json_error(400, "Invalid JSON body") + return + + if not isinstance(text, str): + self._json_error(400, "Missing or empty 'text' field") + return + if not text or not text.strip(): + self._json_error(400, "Missing or empty 'text' field") + return + + # Load session + if not session_file.exists(): + self._json_error(404, "Session not found") + return + + try: + session = json.loads(session_file.read_text()) + if not isinstance(session, dict): + self._json_error(500, "Failed to read session") + return + except (json.JSONDecodeError, OSError): + self._json_error(500, "Failed to read session") + return + + zellij_session = session.get("zellij_session", "") + zellij_pane = session.get("zellij_pane", "") + + if not zellij_session or not zellij_pane: + self._json_error(400, "Session missing Zellij pane info - cannot send input without a pane target") + return + + # Parse pane ID from "terminal_N" format + pane_id = self._parse_pane_id(zellij_pane) + if pane_id is None: + self._json_error(400, f"Invalid pane format: {zellij_pane}") + return + + # For freeform responses, we need two-step injection: + # 1. Send "Other" option number (optionCount + 1) WITHOUT Enter + # 2. Wait for Claude Code to switch to text input mode + # 3. Send the actual text WITH Enter + if is_freeform and option_count > 0: + other_num = str(option_count + 1) + result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False) + if not result["ok"]: + self._send_json(500, {"ok": False, "error": f"Failed to activate freeform mode: {result['error']}"}) + return + # Delay for Claude Code to switch to text input mode + time.sleep(0.3) + + # Inject the actual text (with Enter) + result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=True) + + if result["ok"]: + self._send_json(200, {"ok": True}) + else: + self._send_json(500, {"ok": False, "error": result["error"]}) + + def _parse_pane_id(self, zellij_pane): + """Extract numeric pane ID from various formats.""" + if not zellij_pane: + return None + + # Try direct integer (e.g., "10") + try: + return int(zellij_pane) + except ValueError: + pass + + # Try "terminal_N" format + parts = zellij_pane.split("_") + if len(parts) == 2 and parts[0] in ("terminal", "plugin"): + try: + return int(parts[1]) + except ValueError: + pass + + return None + + def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True): + """Inject text into a pane using zellij actions.""" + env = os.environ.copy() + env["ZELLIJ_SESSION_NAME"] = zellij_session + # Best-effort: some zellij actions respect this pane env. + env["ZELLIJ_PANE_ID"] = f"terminal_{pane_id}" + + # Pane-accurate routing requires the plugin. + if ZELLIJ_PLUGIN.exists(): + result = self._try_plugin_inject(env, pane_id, text, send_enter) + if result["ok"]: + return result + LOGGER.warning( + "Plugin injection failed for session=%s pane=%s: %s", + zellij_session, + pane_id, + result.get("error", "unknown error"), + ) + else: + LOGGER.warning("Zellij plugin missing at %s", ZELLIJ_PLUGIN) + + # `write-chars` targets whichever pane is focused, which is unsafe for AMC. + if self._allow_unsafe_write_chars_fallback(): + LOGGER.warning("Using unsafe write-chars fallback (focused pane only)") + return self._try_write_chars_inject(env, text, send_enter) + + return { + "ok": False, + "error": ( + "Pane-targeted injection requires zellij-send-keys plugin; " + "set AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK=1 to force focused-pane fallback" + ), + } + + def _allow_unsafe_write_chars_fallback(self): + value = os.environ.get("AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK", "").strip().lower() + return value in ("1", "true", "yes", "on") + + def _try_plugin_inject(self, env, pane_id, text, send_enter=True): + """Try injecting via zellij-send-keys plugin (no focus change).""" + payload = json.dumps({ + "pane_id": pane_id, + "text": text, + "send_enter": bool(send_enter), + }) + + try: + result = subprocess.run( + [ + "zellij", + "action", + "pipe", + "--plugin", + f"file:{ZELLIJ_PLUGIN}", + "--name", + "send_keys", + "--floating-plugin", + "false", + "--", + payload, + ], + env=env, + capture_output=True, + text=True, + timeout=3, + ) + + if result.returncode == 0: + return {"ok": True} + return {"ok": False, "error": result.stderr or "plugin failed"} + + except subprocess.TimeoutExpired: + return {"ok": False, "error": "plugin timed out"} + except Exception as e: + return {"ok": False, "error": str(e)} + + def _try_write_chars_inject(self, env, text, send_enter=True): + """Inject via write-chars (UNSAFE: writes to focused pane).""" + try: + # Write the text + result = subprocess.run( + ["zellij", "action", "write-chars", text], + env=env, + capture_output=True, + text=True, + timeout=2, + ) + + if result.returncode != 0: + return {"ok": False, "error": result.stderr or "write-chars failed"} + + # Send Enter if requested + if send_enter: + result = subprocess.run( + ["zellij", "action", "write", "13"], # 13 = Enter + env=env, + capture_output=True, + text=True, + timeout=2, + ) + + if result.returncode != 0: + return {"ok": False, "error": result.stderr or "write Enter failed"} + + return {"ok": True} + + except subprocess.TimeoutExpired: + return {"ok": False, "error": "write-chars timed out"} + except FileNotFoundError: + return {"ok": False, "error": "zellij not found in PATH"} + except Exception as e: + return {"ok": False, "error": str(e)} diff --git a/amc_server/mixins/conversation.py b/amc_server/mixins/conversation.py new file mode 100644 index 0000000..ef0e512 --- /dev/null +++ b/amc_server/mixins/conversation.py @@ -0,0 +1,175 @@ +import json +import os + +from amc_server.context import EVENTS_DIR + + +class ConversationMixin: + def _serve_events(self, session_id): + # Sanitize session_id to prevent path traversal + safe_id = os.path.basename(session_id) + event_file = EVENTS_DIR / f"{safe_id}.jsonl" + + events = [] + if event_file.exists(): + try: + for line in event_file.read_text().splitlines(): + if line.strip(): + try: + events.append(json.loads(line)) + except json.JSONDecodeError: + continue + except OSError: + pass + + self._send_json(200, {"session_id": safe_id, "events": events}) + + def _serve_conversation(self, session_id, project_dir, agent="claude"): + """Serve conversation history from Claude Code or Codex JSONL file.""" + safe_id = os.path.basename(session_id) + messages = [] + + if agent == "codex": + messages = self._parse_codex_conversation(safe_id) + else: + messages = self._parse_claude_conversation(safe_id, project_dir) + + self._send_json(200, {"session_id": safe_id, "messages": messages}) + + def _parse_claude_conversation(self, session_id, project_dir): + """Parse Claude Code JSONL conversation format.""" + messages = [] + + conv_file = self._get_claude_conversation_file(session_id, project_dir) + + if conv_file and conv_file.exists(): + try: + for line in conv_file.read_text().splitlines(): + if not line.strip(): + continue + try: + entry = json.loads(line) + if not isinstance(entry, dict): + continue + msg_type = entry.get("type") + + if msg_type == "user": + content = entry.get("message", {}).get("content", "") + # Only include actual human messages (strings), not tool results (arrays) + if content and isinstance(content, str): + messages.append({ + "role": "user", + "content": content, + "timestamp": entry.get("timestamp", ""), + }) + + elif msg_type == "assistant": + # Assistant messages have structured content + message = entry.get("message", {}) + if not isinstance(message, dict): + continue + raw_content = message.get("content", []) + if not isinstance(raw_content, list): + continue + text_parts = [] + tool_calls = [] + thinking_parts = [] + for part in raw_content: + if isinstance(part, dict): + ptype = part.get("type") + if ptype == "text": + text_parts.append(part.get("text", "")) + elif ptype == "tool_use": + tool_calls.append({ + "name": part.get("name", "unknown"), + "input": part.get("input", {}), + }) + elif ptype == "thinking": + thinking_parts.append(part.get("thinking", "")) + elif isinstance(part, str): + text_parts.append(part) + if text_parts or tool_calls or thinking_parts: + msg = { + "role": "assistant", + "content": "\n".join(text_parts) if text_parts else "", + "timestamp": entry.get("timestamp", ""), + } + if tool_calls: + msg["tool_calls"] = tool_calls + if thinking_parts: + msg["thinking"] = "\n\n".join(thinking_parts) + messages.append(msg) + + except json.JSONDecodeError: + continue + except OSError: + pass + + return messages + + def _parse_codex_conversation(self, session_id): + """Parse Codex JSONL conversation format.""" + messages = [] + + conv_file = self._find_codex_transcript_file(session_id) + + if conv_file and conv_file.exists(): + try: + for line in conv_file.read_text().splitlines(): + if not line.strip(): + continue + try: + entry = json.loads(line) + if not isinstance(entry, dict): + continue + + # Codex format: type="response_item", payload.type="message" + if entry.get("type") != "response_item": + continue + + payload = entry.get("payload", {}) + if not isinstance(payload, dict): + continue + if payload.get("type") != "message": + continue + + role = payload.get("role", "") + content_parts = payload.get("content", []) + if not isinstance(content_parts, list): + continue + + # Skip developer role (system context/permissions) + if role == "developer": + continue + + # Extract text from content array + text_parts = [] + for part in content_parts: + if isinstance(part, dict): + # Codex uses "input_text" for user, "output_text" for assistant + text = part.get("text", "") + if text: + # Skip injected context (AGENTS.md, environment, permissions) + skip_prefixes = ( + "", + "", + "", + "# AGENTS.md instructions", + ) + if any(text.startswith(p) for p in skip_prefixes): + continue + text_parts.append(text) + + if text_parts and role in ("user", "assistant"): + messages.append({ + "role": role, + "content": "\n".join(text_parts), + "timestamp": entry.get("timestamp", ""), + }) + + except json.JSONDecodeError: + continue + except OSError: + pass + + return messages diff --git a/amc_server/mixins/discovery.py b/amc_server/mixins/discovery.py new file mode 100644 index 0000000..31ee280 --- /dev/null +++ b/amc_server/mixins/discovery.py @@ -0,0 +1,281 @@ +import json +import os +import re +import subprocess +import time +from datetime import datetime, timezone + +from amc_server.context import ( + CODEX_ACTIVE_WINDOW, + CODEX_SESSIONS_DIR, + SESSIONS_DIR, + _CODEX_CACHE_MAX, + _codex_pane_cache, + _codex_transcript_cache, + _dismissed_codex_ids, +) +from amc_server.logging_utils import LOGGER + + +class SessionDiscoveryMixin: + def _discover_active_codex_sessions(self): + """Find active Codex sessions and create/update session files with Zellij pane info.""" + if not CODEX_SESSIONS_DIR.exists(): + return + + # Get Zellij pane info for running codex processes + pid_info, cwd_map = self._get_codex_pane_info() + + # Only look at sessions modified recently (active) + now = time.time() + cutoff = now - CODEX_ACTIVE_WINDOW + + for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"): + try: + # Skip old files + mtime = jsonl_file.stat().st_mtime + if mtime < cutoff: + continue + + # Extract session ID from filename + match = re.search(r"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})", jsonl_file.name) + if not match: + continue + + session_id = match.group(1) + # Evict old entries if cache is full (simple FIFO) + if len(_codex_transcript_cache) >= _CODEX_CACHE_MAX: + keys_to_remove = list(_codex_transcript_cache.keys())[: _CODEX_CACHE_MAX // 5] + for k in keys_to_remove: + _codex_transcript_cache.pop(k, None) + _codex_transcript_cache[session_id] = str(jsonl_file) + + # Skip sessions the user has dismissed + if session_id in _dismissed_codex_ids: + continue + + session_file = SESSIONS_DIR / f"{session_id}.json" + + # Parse first line to get session metadata + with jsonl_file.open() as f: + first_line = f.readline().strip() + if not first_line: + continue + + meta = json.loads(first_line) + if not isinstance(meta, dict): + continue + if meta.get("type") != "session_meta": + continue + + payload = self._as_dict(meta.get("payload")) + cwd = payload.get("cwd", "") + project = os.path.basename(cwd) if cwd else "Unknown" + + # Match session to Zellij pane (UUID match via lsof, CWD fallback) + zellij_session, zellij_pane = self._match_codex_session_to_pane( + jsonl_file, cwd, pid_info, cwd_map + ) + + # Determine status based on file age + file_age_minutes = (now - mtime) / 60 + if file_age_minutes < 2: + status = "active" + else: + status = "done" + + # Read existing session to preserve some fields + existing = {} + if session_file.exists(): + try: + loaded_existing = json.loads(session_file.read_text()) + if isinstance(loaded_existing, dict): + existing = loaded_existing + # Don't downgrade active to done if file was just updated + if existing.get("status") == "active" and status == "done": + # Check if we should keep it active + if file_age_minutes < 5: + status = "active" + except (json.JSONDecodeError, OSError): + pass + + # Get last message preview from recent lines + last_message = "" + try: + tail_entries = self._read_jsonl_tail_entries(jsonl_file, max_lines=60, max_bytes=800 * 1024) + for entry in reversed(tail_entries): + if not isinstance(entry, dict): + continue + if entry.get("type") == "response_item": + payload_item = self._as_dict(entry.get("payload")) + if payload_item.get("role") == "assistant": + content = payload_item.get("content", []) + if not isinstance(content, list): + continue + for part in content: + if isinstance(part, dict) and part.get("text"): + text = part["text"] + # Skip system content + if not text.startswith("<") and not text.startswith("#"): + last_message = text[:200] + break + if last_message: + break + except (json.JSONDecodeError, OSError): + pass + + context_usage = self._get_cached_context_usage( + jsonl_file, self._parse_codex_context_usage_from_file + ) + + session_ts = payload.get("timestamp", "") + last_event_at = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat() + + session_data = { + "session_id": session_id, + "agent": "codex", + "project": project, + "project_dir": cwd, + "status": status, + "started_at": existing.get("started_at", session_ts), + "last_event_at": last_event_at, + "last_event": "CodexSession", + "last_message_preview": last_message, + "zellij_session": zellij_session or existing.get("zellij_session", ""), + "zellij_pane": zellij_pane or existing.get("zellij_pane", ""), + "transcript_path": str(jsonl_file), + } + if context_usage: + session_data["context_usage"] = context_usage + elif existing.get("context_usage"): + session_data["context_usage"] = existing.get("context_usage") + + session_file.write_text(json.dumps(session_data, indent=2)) + + except (OSError, json.JSONDecodeError): + continue + except Exception: + LOGGER.exception("Failed to discover Codex session from %s", jsonl_file) + continue + + def _get_codex_pane_info(self): + """Get Zellij pane info for running codex processes via process inspection. + + Extracts ZELLIJ_PANE_ID from each codex process's inherited environment, + since zellij dump-layout doesn't provide pane IDs. + + Results are cached for 5 seconds to avoid running pgrep/ps/lsof on + every dashboard poll. + + Returns: + tuple: (pid_info, cwd_map) + pid_info: {pid_str: {"pane_id": str, "zellij_session": str}} + cwd_map: {cwd_path: {"session": str, "pane_id": str}} + """ + now = time.time() + if now < _codex_pane_cache["expires"]: + return _codex_pane_cache["pid_info"], _codex_pane_cache["cwd_map"] + + pid_info = {} + cwd_map = {} + + try: + # Step 1: Find codex process PIDs + result = subprocess.run( + ["pgrep", "-x", "codex"], + capture_output=True, + text=True, + timeout=2, + ) + pids = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()] if result.returncode == 0 else [] + + # Step 2: Extract ZELLIJ env vars from each process + for pid in pids: + try: + env_result = subprocess.run( + ["ps", "eww", "-o", "args=", "-p", pid], + capture_output=True, + text=True, + timeout=2, + ) + if env_result.returncode != 0: + continue + + env_str = env_result.stdout + pane_match = re.search(r"ZELLIJ_PANE_ID=(\d+)", env_str) + session_match = re.search(r"ZELLIJ_SESSION_NAME=(\S+)", env_str) + + if pane_match and session_match: + pid_info[pid] = { + "pane_id": pane_match.group(1), + "zellij_session": session_match.group(1), + } + except (subprocess.TimeoutExpired, Exception): + continue + + # Step 3: Get CWDs via single batched lsof call + if pid_info: + pid_list = ",".join(pid_info.keys()) + try: + cwd_result = subprocess.run( + ["lsof", "-a", "-p", pid_list, "-d", "cwd", "-Fn"], + capture_output=True, + text=True, + timeout=3, + ) + if cwd_result.returncode == 0: + current_pid = None + for line in cwd_result.stdout.splitlines(): + if line.startswith("p"): + current_pid = line[1:] + elif line.startswith("n/") and current_pid and current_pid in pid_info: + cwd = line[1:] + info = pid_info[current_pid] + cwd_map[cwd] = { + "session": info["zellij_session"], + "pane_id": info["pane_id"], + } + except (subprocess.TimeoutExpired, Exception): + pass + + except (subprocess.TimeoutExpired, FileNotFoundError, Exception): + pass + + _codex_pane_cache["pid_info"] = pid_info + _codex_pane_cache["cwd_map"] = cwd_map + _codex_pane_cache["expires"] = now + 5 # Cache for 5 seconds + + return pid_info, cwd_map + + def _match_codex_session_to_pane(self, session_file, session_cwd, pid_info, cwd_map): + """Match a Codex session file to a Zellij pane. + + Tries session-file-to-PID matching first (via lsof), falls back to CWD. + + Returns: + tuple: (zellij_session, pane_id) or ("", "") + """ + # Try precise match: which process has this session file open? + try: + result = subprocess.run( + ["lsof", "-t", str(session_file)], + capture_output=True, + text=True, + timeout=2, + ) + if result.returncode == 0 and result.stdout.strip(): + for pid in result.stdout.strip().splitlines(): + pid = pid.strip() + if pid in pid_info: + info = pid_info[pid] + return info["zellij_session"], info["pane_id"] + except (subprocess.TimeoutExpired, Exception): + pass + + # Fall back to CWD match + normalized_cwd = os.path.normpath(session_cwd) if session_cwd else "" + for pane_cwd, info in cwd_map.items(): + if os.path.normpath(pane_cwd) == normalized_cwd: + return info["session"], info["pane_id"] + + return "", "" diff --git a/amc_server/mixins/http.py b/amc_server/mixins/http.py new file mode 100644 index 0000000..e184809 --- /dev/null +++ b/amc_server/mixins/http.py @@ -0,0 +1,140 @@ +import json +import urllib.parse + +from amc_server.context import DASHBOARD_DIR +from amc_server.logging_utils import LOGGER + + +class HttpMixin: + def _send_bytes_response(self, code, content, content_type="application/json", extra_headers=None): + """Send a generic byte response; ignore expected disconnect errors.""" + try: + self.send_response(code) + self.send_header("Content-Type", content_type) + if extra_headers: + for key, value in extra_headers.items(): + self.send_header(key, value) + self.send_header("Content-Length", str(len(content))) + self.end_headers() + self.wfile.write(content) + return True + except (BrokenPipeError, ConnectionResetError, OSError): + return False + + def _send_json(self, code, payload): + """Send JSON response with CORS header.""" + content = json.dumps(payload).encode() + return self._send_bytes_response( + code, + content, + content_type="application/json", + extra_headers={"Access-Control-Allow-Origin": "*"}, + ) + + def do_GET(self): + try: + if self.path == "/" or self.path == "/index.html": + self._serve_dashboard_file("index.html") + elif self.path.startswith("/") and not self.path.startswith("/api/"): + # Serve static files from dashboard directory + file_path = self.path.lstrip("/") + if file_path and ".." not in file_path: + self._serve_dashboard_file(file_path) + else: + self._json_error(404, "Not Found") + elif self.path == "/api/state": + self._serve_state() + elif self.path == "/api/stream": + self._serve_stream() + elif self.path.startswith("/api/events/"): + session_id = urllib.parse.unquote(self.path[len("/api/events/"):]) + self._serve_events(session_id) + elif self.path.startswith("/api/conversation/"): + # Parse session_id and query params + path_part = self.path[len("/api/conversation/"):] + if "?" in path_part: + session_id, query = path_part.split("?", 1) + params = urllib.parse.parse_qs(query) + project_dir = params.get("project_dir", [""])[0] + agent = params.get("agent", ["claude"])[0] + else: + session_id = path_part + project_dir = "" + agent = "claude" + self._serve_conversation(urllib.parse.unquote(session_id), urllib.parse.unquote(project_dir), agent) + else: + self._json_error(404, "Not Found") + except Exception: + LOGGER.exception("Unhandled GET error for path=%s", self.path) + try: + self._json_error(500, "Internal Server Error") + except Exception: + pass + + def do_POST(self): + try: + if self.path.startswith("/api/dismiss/"): + session_id = urllib.parse.unquote(self.path[len("/api/dismiss/"):]) + self._dismiss_session(session_id) + elif self.path.startswith("/api/respond/"): + session_id = urllib.parse.unquote(self.path[len("/api/respond/"):]) + self._respond_to_session(session_id) + else: + self._json_error(404, "Not Found") + except Exception: + LOGGER.exception("Unhandled POST error for path=%s", self.path) + try: + self._json_error(500, "Internal Server Error") + except Exception: + pass + + def do_OPTIONS(self): + # CORS preflight for respond endpoint + self.send_response(204) + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type") + self.end_headers() + + def _serve_dashboard_file(self, file_path): + """Serve a static file from the dashboard directory.""" + # Content type mapping + content_types = { + ".html": "text/html; charset=utf-8", + ".css": "text/css; charset=utf-8", + ".js": "application/javascript; charset=utf-8", + ".json": "application/json; charset=utf-8", + ".svg": "image/svg+xml", + ".png": "image/png", + ".ico": "image/x-icon", + } + + try: + full_path = DASHBOARD_DIR / file_path + # Security: ensure path doesn't escape dashboard directory + full_path = full_path.resolve() + if not str(full_path).startswith(str(DASHBOARD_DIR.resolve())): + self._json_error(403, "Forbidden") + return + + content = full_path.read_bytes() + ext = full_path.suffix.lower() + content_type = content_types.get(ext, "application/octet-stream") + + # No caching during development + self._send_bytes_response( + 200, + content, + content_type=content_type, + extra_headers={"Cache-Control": "no-cache, no-store, must-revalidate"}, + ) + except FileNotFoundError: + self._json_error(404, f"File not found: {file_path}") + + def _json_error(self, code, message): + """Send a JSON error response.""" + self._send_json(code, {"ok": False, "error": message}) + + def log_message(self, format, *args): + """Suppress default request logging to keep output clean.""" + pass diff --git a/amc_server/mixins/parsing.py b/amc_server/mixins/parsing.py new file mode 100644 index 0000000..fda7245 --- /dev/null +++ b/amc_server/mixins/parsing.py @@ -0,0 +1,268 @@ +import json +import os +from pathlib import Path + +from amc_server.context import ( + CLAUDE_PROJECTS_DIR, + CODEX_SESSIONS_DIR, + _CONTEXT_CACHE_MAX, + _codex_transcript_cache, + _context_usage_cache, +) +from amc_server.logging_utils import LOGGER + + +class SessionParsingMixin: + def _get_claude_conversation_file(self, session_id, project_dir): + """Resolve Claude conversation file path from session id + project dir.""" + if not project_dir: + return None + + encoded_dir = project_dir.replace("/", "-") + if not encoded_dir.startswith("-"): + encoded_dir = "-" + encoded_dir + + conv_file = CLAUDE_PROJECTS_DIR / encoded_dir / f"{session_id}.jsonl" + return conv_file if conv_file.exists() else None + + def _find_codex_transcript_file(self, session_id): + """Resolve Codex transcript path for a session id with lightweight caching.""" + if not session_id: + return None + + if session_id in _codex_transcript_cache: + cached = _codex_transcript_cache.get(session_id) + if cached: + path = Path(cached) + if path.exists(): + return path + return None + + if not CODEX_SESSIONS_DIR.exists(): + _codex_transcript_cache[session_id] = None + return None + + try: + for jsonl_file in CODEX_SESSIONS_DIR.rglob(f"*{session_id}*.jsonl"): + _codex_transcript_cache[session_id] = str(jsonl_file) + return jsonl_file + except OSError: + pass + + _codex_transcript_cache[session_id] = None + return None + + def _read_jsonl_tail_entries(self, file_path, max_lines=300, max_bytes=1024 * 1024): + """Read only the tail of a JSONL file and return parsed entries.""" + entries = [] + + try: + with file_path.open("rb") as f: + f.seek(0, os.SEEK_END) + file_size = f.tell() + if file_size <= 0: + return entries + + read_size = min(file_size, max_bytes) + f.seek(file_size - read_size) + chunk = f.read(read_size) + except OSError: + return entries + + lines = chunk.splitlines() + if file_size > read_size and lines: + # First line may be partial because we started in the middle. + lines = lines[1:] + + for raw_line in lines[-max_lines:]: + if not raw_line: + continue + try: + entries.append(json.loads(raw_line.decode("utf-8", errors="replace"))) + except json.JSONDecodeError: + continue + + return entries + + def _to_int(self, value): + """Best-effort integer conversion.""" + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return None + return None + + def _sum_optional_ints(self, values): + """Sum available ints, return None when no values are present.""" + present = [v for v in values if isinstance(v, int)] + if not present: + return None + return sum(present) + + def _as_dict(self, value): + """Normalize potentially-null JSON objects into dicts.""" + return value if isinstance(value, dict) else {} + + def _parse_codex_context_usage_from_file(self, file_path): + """Extract the latest Codex context usage snapshot from transcript tail.""" + entries = self._read_jsonl_tail_entries(file_path, max_lines=600, max_bytes=1024 * 1024) + + for entry in reversed(entries): + if not isinstance(entry, dict): + continue + if entry.get("type") != "event_msg": + continue + payload = self._as_dict(entry.get("payload")) + if payload.get("type") != "token_count": + continue + + info = self._as_dict(payload.get("info")) + last_usage = self._as_dict(info.get("last_token_usage")) + total_usage = self._as_dict(info.get("total_token_usage")) + + input_tokens = self._to_int(last_usage.get("input_tokens")) + output_tokens = self._to_int(last_usage.get("output_tokens")) + cached_input_tokens = self._to_int(last_usage.get("cached_input_tokens")) + current_tokens = self._to_int(last_usage.get("total_tokens")) + if current_tokens is None: + current_tokens = self._sum_optional_ints([input_tokens, output_tokens, cached_input_tokens]) + + usage = { + "window_tokens": self._to_int(info.get("model_context_window")), + "current_tokens": current_tokens, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cached_input_tokens": cached_input_tokens, + "session_total_tokens": self._to_int(total_usage.get("total_tokens")), + "updated_at": entry.get("timestamp", ""), + } + + if usage["current_tokens"] is None and usage["session_total_tokens"] is None: + continue + return usage + + return None + + def _get_claude_context_window(self, model): + """Return context window size for Claude models.""" + if not model: + return 200_000 # Default for unknown Claude models + # All current Claude 3.5/4 models have 200K context + # Legacy claude-3-opus/sonnet/haiku also 200K, claude-2 was 100K + if "claude-2" in model: + return 100_000 + return 200_000 + + def _parse_claude_context_usage_from_file(self, file_path): + """Extract Claude usage with context window.""" + entries = self._read_jsonl_tail_entries(file_path, max_lines=400, max_bytes=1024 * 1024) + + for entry in reversed(entries): + if not isinstance(entry, dict): + continue + if entry.get("type") != "assistant": + continue + + message = self._as_dict(entry.get("message")) + usage = self._as_dict(message.get("usage")) + if not usage: + continue + + input_tokens = self._to_int(usage.get("input_tokens")) + output_tokens = self._to_int(usage.get("output_tokens")) + cache_read_input_tokens = self._to_int(usage.get("cache_read_input_tokens")) + cache_creation_input_tokens = self._to_int(usage.get("cache_creation_input_tokens")) + cached_input_tokens = self._sum_optional_ints([ + cache_read_input_tokens, + cache_creation_input_tokens, + ]) + current_tokens = self._sum_optional_ints([ + input_tokens, + output_tokens, + cache_read_input_tokens, + cache_creation_input_tokens, + ]) + + if current_tokens is None: + continue + + model = message.get("model", "") + return { + "window_tokens": self._get_claude_context_window(model), + "current_tokens": current_tokens, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "cached_input_tokens": cached_input_tokens, + "session_total_tokens": None, + "updated_at": entry.get("timestamp", ""), + "model": model, + } + + return None + + def _get_cached_context_usage(self, file_path, parser): + """Cache parsed usage by transcript mtime/size for poll efficiency.""" + try: + stat = file_path.stat() + except OSError: + return None + + key = str(file_path) + cached = _context_usage_cache.get(key) + if cached and cached.get("mtime_ns") == stat.st_mtime_ns and cached.get("size") == stat.st_size: + return cached.get("usage") + + try: + usage = parser(file_path) + except Exception: + LOGGER.exception("Failed to parse context usage for %s", file_path) + usage = None + + # Evict oldest entries if cache is full (simple FIFO) + if len(_context_usage_cache) >= _CONTEXT_CACHE_MAX: + keys_to_remove = list(_context_usage_cache.keys())[: _CONTEXT_CACHE_MAX // 5] + for k in keys_to_remove: + _context_usage_cache.pop(k, None) + + _context_usage_cache[key] = { + "mtime_ns": stat.st_mtime_ns, + "size": stat.st_size, + "usage": usage, + } + return usage + + def _get_context_usage_for_session(self, session_data): + """Attach context/token usage info for both Codex and Claude sessions.""" + agent = session_data.get("agent") + existing = session_data.get("context_usage") + + if agent == "codex": + transcript_path = session_data.get("transcript_path", "") + transcript_file = Path(transcript_path) if transcript_path else None + if transcript_file and not transcript_file.exists(): + transcript_file = None + if not transcript_file: + transcript_file = self._find_codex_transcript_file(session_data.get("session_id", "")) + if not transcript_file: + return existing + parsed = self._get_cached_context_usage(transcript_file, self._parse_codex_context_usage_from_file) + return parsed or existing + + if agent == "claude": + conv_file = self._get_claude_conversation_file( + session_data.get("session_id", ""), + session_data.get("project_dir", ""), + ) + if not conv_file: + return existing + parsed = self._get_cached_context_usage(conv_file, self._parse_claude_context_usage_from_file) + return parsed or existing + + return existing diff --git a/amc_server/mixins/state.py b/amc_server/mixins/state.py new file mode 100644 index 0000000..54af6e4 --- /dev/null +++ b/amc_server/mixins/state.py @@ -0,0 +1,194 @@ +import hashlib +import json +import subprocess +import time +from datetime import datetime, timezone + +from amc_server.context import ( + EVENTS_DIR, + SESSIONS_DIR, + STALE_EVENT_AGE, + STALE_STARTING_AGE, + _state_lock, + _zellij_cache, +) +from amc_server.logging_utils import LOGGER + + +class StateMixin: + def _serve_state(self): + payload = self._build_state_payload() + self._send_json(200, payload) + + def _serve_stream(self): + """SSE stream of full state snapshots, emitted on change.""" + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.send_header("Connection", "keep-alive") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + # Ask clients to reconnect quickly on transient errors. + try: + self.wfile.write(b"retry: 2000\n\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + return + + last_hash = None + event_id = 0 + last_heartbeat_at = time.time() + heartbeat_interval = 15 + poll_interval = 1 + + try: + while True: + payload = self._build_state_payload() + payload_json = json.dumps(payload, separators=(",", ":")) + payload_hash = hashlib.sha1(payload_json.encode("utf-8")).hexdigest() + + if payload_hash != last_hash: + event_id += 1 + self._write_sse_event("state", payload_json, event_id) + last_hash = payload_hash + + now = time.time() + if now - last_heartbeat_at >= heartbeat_interval: + self.wfile.write(b": ping\n\n") + self.wfile.flush() + last_heartbeat_at = now + + time.sleep(poll_interval) + except (BrokenPipeError, ConnectionResetError, OSError): + # Client disconnected. + return + except Exception: + LOGGER.exception("Unhandled SSE stream error") + return + + def _write_sse_event(self, event_name, data, event_id): + """Write one SSE event frame.""" + # JSON payload is compact single-line; still split defensively for SSE format. + frame = [f"id: {event_id}", f"event: {event_name}"] + for line in str(data).splitlines(): + frame.append(f"data: {line}") + frame.append("") + frame.append("") + self.wfile.write("\n".join(frame).encode("utf-8")) + self.wfile.flush() + + def _build_state_payload(self): + """Build `/api/state` payload data used by JSON and SSE endpoints.""" + sessions = self._collect_sessions() + return { + "sessions": sessions, + "server_time": datetime.now(timezone.utc).isoformat(), + } + + def _collect_sessions(self): + """Collect and normalize all session records from disk.""" + with _state_lock: + sessions = [] + SESSIONS_DIR.mkdir(parents=True, exist_ok=True) + + # Discover active Codex sessions and create session files for them + self._discover_active_codex_sessions() + + # Get active Zellij sessions for liveness check + active_zellij_sessions = self._get_active_zellij_sessions() + + for f in SESSIONS_DIR.glob("*.json"): + try: + data = json.loads(f.read_text()) + if not isinstance(data, dict): + continue + + # Proactive liveness check: only auto-delete orphan "starting" sessions. + # Other statuses can still be useful as historical/debug context. + zellij_session = data.get("zellij_session", "") + if zellij_session and active_zellij_sessions is not None: + if zellij_session not in active_zellij_sessions: + if data.get("status") == "starting": + # A missing Zellij session while "starting" indicates an orphan. + f.unlink(missing_ok=True) + continue + + context_usage = self._get_context_usage_for_session(data) + if context_usage: + data["context_usage"] = context_usage + + sessions.append(data) + except (json.JSONDecodeError, OSError): + continue + except Exception: + LOGGER.exception("Failed processing session file %s", f) + continue + + # Sort by last_event_at descending + sessions.sort(key=lambda s: s.get("last_event_at", ""), reverse=True) + + # Clean orphan event logs (sessions persist until manually dismissed or SessionEnd) + self._cleanup_stale(sessions) + + return sessions + + def _get_active_zellij_sessions(self): + """Query Zellij for active sessions. Returns set of session names, or None on error.""" + now = time.time() + + # Use cached value if fresh (cache for 5 seconds to avoid hammering zellij) + if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]: + return _zellij_cache["sessions"] + + try: + result = subprocess.run( + ["zellij", "list-sessions", "--no-formatting"], + capture_output=True, + text=True, + timeout=2, + ) + if result.returncode == 0: + # Parse session names (one per line, format: "session_name [created ...]" or just "session_name") + sessions = set() + for line in result.stdout.strip().splitlines(): + if line: + # Session name is the first word + session_name = line.split()[0] if line.split() else "" + if session_name: + sessions.add(session_name) + _zellij_cache["sessions"] = sessions + _zellij_cache["expires"] = now + 5 # Cache for 5 seconds + return sessions + except (subprocess.TimeoutExpired, FileNotFoundError, Exception): + pass + + return None # Return None on error (don't clean up if we can't verify) + + def _cleanup_stale(self, sessions): + """Remove orphan event logs >24h and stale 'starting' sessions >1h.""" + active_ids = {s.get("session_id") for s in sessions if s.get("session_id")} + now = time.time() + + # Clean up orphan event logs + EVENTS_DIR.mkdir(parents=True, exist_ok=True) + for f in EVENTS_DIR.glob("*.jsonl"): + session_id = f.stem + if session_id not in active_ids: + try: + age = now - f.stat().st_mtime + if age > STALE_EVENT_AGE: + f.unlink() + except OSError: + pass + + # Clean up orphan "starting" sessions (never became active) + for f in SESSIONS_DIR.glob("*.json"): + try: + age = now - f.stat().st_mtime + if age > STALE_STARTING_AGE: + data = json.loads(f.read_text()) + if data.get("status") == "starting": + f.unlink() + except (json.JSONDecodeError, OSError): + pass diff --git a/amc_server/server.py b/amc_server/server.py new file mode 100644 index 0000000..2b8efb7 --- /dev/null +++ b/amc_server/server.py @@ -0,0 +1,31 @@ +import os +from http.server import ThreadingHTTPServer + +from amc_server.context import DATA_DIR, PORT +from amc_server.handler import AMCHandler +from amc_server.logging_utils import LOGGER, configure_logging, install_signal_handlers + + +def main(): + configure_logging() + DATA_DIR.mkdir(parents=True, exist_ok=True) + LOGGER.info("Starting AMC server") + server = ThreadingHTTPServer(("127.0.0.1", PORT), AMCHandler) + install_signal_handlers(server) + LOGGER.info("AMC server listening on http://127.0.0.1:%s", PORT) + + # Write PID file + pid_file = DATA_DIR / "server.pid" + pid_file.write_text(str(os.getpid())) + + try: + server.serve_forever() + except KeyboardInterrupt: + LOGGER.info("AMC server interrupted; shutting down") + except Exception: + LOGGER.exception("AMC server crashed") + raise + finally: + pid_file.unlink(missing_ok=True) + server.server_close() + LOGGER.info("AMC server stopped") diff --git a/bin/amc-server b/bin/amc-server index 6ec51f0..7034290 100755 --- a/bin/amc-server +++ b/bin/amc-server @@ -1,863 +1,16 @@ #!/usr/bin/env python3 -"""AMC server — serves the dashboard and session state API. +"""AMC server launcher.""" -Endpoints: - GET / → dashboard.html - GET /api/state → aggregated session state JSON - GET /api/events/ID → event timeline for one session - GET /api/conversation/ID → conversation history for a session - POST /api/dismiss/ID → dismiss (delete) a completed session - POST /api/respond/ID → inject response into session's Zellij pane -""" - -import json -import os -import re -import subprocess -import time -import urllib.parse -from datetime import datetime, timezone -from http.server import HTTPServer, BaseHTTPRequestHandler +import sys from pathlib import Path -# Claude Code conversation directory -CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects" -# Codex conversation directory -CODEX_SESSIONS_DIR = Path.home() / ".codex" / "sessions" +SCRIPT_DIR = Path(__file__).resolve().parent +PROJECT_DIR = SCRIPT_DIR.parent +if str(PROJECT_DIR) not in sys.path: + sys.path.insert(0, str(PROJECT_DIR)) -# Plugin path for zellij-send-keys -ZELLIJ_PLUGIN = Path.home() / ".config" / "zellij" / "plugins" / "zellij-send-keys.wasm" - -# Runtime data lives in XDG data dir -DATA_DIR = Path.home() / ".local" / "share" / "amc" -SESSIONS_DIR = DATA_DIR / "sessions" -EVENTS_DIR = DATA_DIR / "events" - -# Source files live in project directory (relative to this script) -PROJECT_DIR = Path(__file__).resolve().parent.parent -DASHBOARD_FILE = PROJECT_DIR / "dashboard.html" - -PORT = 7400 -STALE_EVENT_AGE = 86400 # 24 hours in seconds -STALE_STARTING_AGE = 3600 # 1 hour - sessions stuck in "starting" are orphans - -# Cache for Zellij session list (avoid calling zellij on every request) -_zellij_cache = {"sessions": None, "expires": 0} - -# Cache for Codex pane info (avoid running pgrep/ps/lsof on every request) -_codex_pane_cache = {"pid_info": {}, "cwd_map": {}, "expires": 0} - -# Codex sessions dismissed during this server lifetime (prevents re-discovery) -_dismissed_codex_ids = set() - - -class AMCHandler(BaseHTTPRequestHandler): - def do_GET(self): - if self.path == "/" or self.path == "/index.html": - self._serve_preact_dashboard() - elif self.path == "/old" or self.path == "/dashboard.html": - self._serve_dashboard() - elif self.path == "/api/state": - self._serve_state() - elif self.path.startswith("/api/events/"): - session_id = urllib.parse.unquote(self.path[len("/api/events/"):]) - self._serve_events(session_id) - elif self.path.startswith("/api/conversation/"): - # Parse session_id and query params - path_part = self.path[len("/api/conversation/"):] - if "?" in path_part: - session_id, query = path_part.split("?", 1) - params = urllib.parse.parse_qs(query) - project_dir = params.get("project_dir", [""])[0] - agent = params.get("agent", ["claude"])[0] - else: - session_id = path_part - project_dir = "" - agent = "claude" - self._serve_conversation(urllib.parse.unquote(session_id), urllib.parse.unquote(project_dir), agent) - else: - self._json_error(404, "Not Found") - - def do_POST(self): - if self.path.startswith("/api/dismiss/"): - session_id = urllib.parse.unquote(self.path[len("/api/dismiss/"):]) - self._dismiss_session(session_id) - elif self.path.startswith("/api/respond/"): - session_id = urllib.parse.unquote(self.path[len("/api/respond/"):]) - self._respond_to_session(session_id) - else: - self._json_error(404, "Not Found") - - def do_OPTIONS(self): - # CORS preflight for respond endpoint - self.send_response(204) - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS") - self.send_header("Access-Control-Allow-Headers", "Content-Type") - self.end_headers() - - def _serve_dashboard(self): - try: - content = DASHBOARD_FILE.read_bytes() - self.send_response(200) - self.send_header("Content-Type", "text/html; charset=utf-8") - self.send_header("Content-Length", str(len(content))) - self.end_headers() - self.wfile.write(content) - except FileNotFoundError: - self.send_error(500, "dashboard.html not found") - - def _serve_preact_dashboard(self): - try: - preact_file = PROJECT_DIR / "dashboard-preact.html" - content = preact_file.read_bytes() - self.send_response(200) - self.send_header("Content-Type", "text/html; charset=utf-8") - self.send_header("Content-Length", str(len(content))) - self.end_headers() - self.wfile.write(content) - except FileNotFoundError: - self.send_error(500, "dashboard-preact.html not found") - - def _serve_state(self): - sessions = [] - SESSIONS_DIR.mkdir(parents=True, exist_ok=True) - - # Discover active Codex sessions and create session files for them - self._discover_active_codex_sessions() - - # Get active Zellij sessions for liveness check - active_zellij_sessions = self._get_active_zellij_sessions() - - for f in SESSIONS_DIR.glob("*.json"): - try: - data = json.loads(f.read_text()) - - # Proactive liveness check: only auto-delete orphan "starting" sessions. - # Other statuses can still be useful as historical/debug context. - zellij_session = data.get("zellij_session", "") - if zellij_session and active_zellij_sessions is not None: - if zellij_session not in active_zellij_sessions: - if data.get("status") == "starting": - # A missing Zellij session while "starting" indicates an orphan. - f.unlink(missing_ok=True) - continue - - sessions.append(data) - except (json.JSONDecodeError, OSError): - continue - - # Sort by last_event_at descending - sessions.sort(key=lambda s: s.get("last_event_at", ""), reverse=True) - - # Clean orphan event logs (sessions persist until manually dismissed or SessionEnd) - self._cleanup_stale(sessions) - - response = json.dumps({ - "sessions": sessions, - "server_time": datetime.now(timezone.utc).isoformat(), - }).encode() - - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - def _serve_events(self, session_id): - # Sanitize session_id to prevent path traversal - safe_id = os.path.basename(session_id) - event_file = EVENTS_DIR / f"{safe_id}.jsonl" - - events = [] - if event_file.exists(): - try: - for line in event_file.read_text().splitlines(): - if line.strip(): - try: - events.append(json.loads(line)) - except json.JSONDecodeError: - continue - except OSError: - pass - - response = json.dumps({"session_id": safe_id, "events": events}).encode() - - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - def _serve_conversation(self, session_id, project_dir, agent="claude"): - """Serve conversation history from Claude Code or Codex JSONL file.""" - safe_id = os.path.basename(session_id) - messages = [] - - if agent == "codex": - messages = self._parse_codex_conversation(safe_id) - else: - messages = self._parse_claude_conversation(safe_id, project_dir) - - response = json.dumps({"session_id": safe_id, "messages": messages}).encode() - - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - def _parse_claude_conversation(self, session_id, project_dir): - """Parse Claude Code JSONL conversation format.""" - messages = [] - - # Convert project_dir to Claude's encoded format - # /Users/foo/projects/bar -> -Users-foo-projects-bar - if project_dir: - encoded_dir = project_dir.replace("/", "-") - if not encoded_dir.startswith("-"): - encoded_dir = "-" + encoded_dir - else: - encoded_dir = "" - - # Find the conversation file - conv_file = None - if encoded_dir: - conv_file = CLAUDE_PROJECTS_DIR / encoded_dir / f"{session_id}.jsonl" - - if conv_file and conv_file.exists(): - try: - for line in conv_file.read_text().splitlines(): - if not line.strip(): - continue - try: - entry = json.loads(line) - msg_type = entry.get("type") - - if msg_type == "user": - content = entry.get("message", {}).get("content", "") - # Only include actual human messages (strings), not tool results (arrays) - if content and isinstance(content, str): - messages.append({ - "role": "user", - "content": content, - "timestamp": entry.get("timestamp", "") - }) - - elif msg_type == "assistant": - # Assistant messages have structured content - raw_content = entry.get("message", {}).get("content", []) - text_parts = [] - for part in raw_content: - if isinstance(part, dict): - if part.get("type") == "text": - text_parts.append(part.get("text", "")) - elif isinstance(part, str): - text_parts.append(part) - if text_parts: - messages.append({ - "role": "assistant", - "content": "\n".join(text_parts), - "timestamp": entry.get("timestamp", "") - }) - - except json.JSONDecodeError: - continue - except OSError: - pass - - return messages - - def _parse_codex_conversation(self, session_id): - """Parse Codex JSONL conversation format.""" - messages = [] - - # Find the Codex session file by searching for files containing the session ID - # Codex files are named: rollout-YYYY-MM-DDTHH-MM-SS-SESSION_ID.jsonl - conv_file = None - if CODEX_SESSIONS_DIR.exists(): - for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"): - if session_id in jsonl_file.name: - conv_file = jsonl_file - break - - if conv_file and conv_file.exists(): - try: - for line in conv_file.read_text().splitlines(): - if not line.strip(): - continue - try: - entry = json.loads(line) - - # Codex format: type="response_item", payload.type="message" - if entry.get("type") != "response_item": - continue - - payload = entry.get("payload", {}) - if payload.get("type") != "message": - continue - - role = payload.get("role", "") - content_parts = payload.get("content", []) - - # Skip developer role (system context/permissions) - if role == "developer": - continue - - # Extract text from content array - text_parts = [] - for part in content_parts: - if isinstance(part, dict): - # Codex uses "input_text" for user, "output_text" for assistant - text = part.get("text", "") - if text: - # Skip injected context (AGENTS.md, environment, permissions) - skip_prefixes = ( - "", - "", - "", - "# AGENTS.md instructions", - ) - if any(text.startswith(p) for p in skip_prefixes): - continue - text_parts.append(text) - - if text_parts and role in ("user", "assistant"): - messages.append({ - "role": role, - "content": "\n".join(text_parts), - "timestamp": entry.get("timestamp", "") - }) - - except json.JSONDecodeError: - continue - except OSError: - pass - - return messages - - def _dismiss_session(self, session_id): - """Delete a session file (manual dismiss from dashboard).""" - safe_id = os.path.basename(session_id) - session_file = SESSIONS_DIR / f"{safe_id}.json" - # Track dismissed Codex sessions to prevent re-discovery - _dismissed_codex_ids.add(safe_id) - session_file.unlink(missing_ok=True) - - response = json.dumps({"ok": True}).encode() - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - def _respond_to_session(self, session_id): - """Inject a response into the session's Zellij pane.""" - safe_id = os.path.basename(session_id) - session_file = SESSIONS_DIR / f"{safe_id}.json" - - # Read request body - try: - content_length = int(self.headers.get("Content-Length", 0)) - body = json.loads(self.rfile.read(content_length)) - text = body.get("text", "") - is_freeform = body.get("freeform", False) - option_count = body.get("optionCount", 0) - except (json.JSONDecodeError, ValueError): - self._json_error(400, "Invalid JSON body") - return - - if not text or not text.strip(): - self._json_error(400, "Missing or empty 'text' field") - return - - # Load session - if not session_file.exists(): - self._json_error(404, "Session not found") - return - - try: - session = json.loads(session_file.read_text()) - except (json.JSONDecodeError, OSError): - self._json_error(500, "Failed to read session") - return - - zellij_session = session.get("zellij_session", "") - zellij_pane = session.get("zellij_pane", "") - - if not zellij_session or not zellij_pane: - self._json_error(400, "Session missing Zellij pane info - cannot send input without a pane target") - return - - # Parse pane ID from "terminal_N" format - pane_id = self._parse_pane_id(zellij_pane) - if pane_id is None: - self._json_error(400, f"Invalid pane format: {zellij_pane}") - return - - # For freeform responses, we need two-step injection: - # 1. Send "Other" option number (optionCount + 1) WITHOUT Enter - # 2. Wait for Claude Code to switch to text input mode - # 3. Send the actual text WITH Enter - if is_freeform and option_count > 0: - other_num = str(option_count + 1) - result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False) - if not result["ok"]: - response = json.dumps({"ok": False, "error": result["error"]}).encode() - self.send_response(500) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - return - # Delay for Claude Code to switch to text input mode - time.sleep(0.3) - - # Inject the actual text (with Enter) - result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=True) - - if result["ok"]: - response = json.dumps({"ok": True}).encode() - self.send_response(200) - else: - response = json.dumps({"ok": False, "error": result["error"]}).encode() - self.send_response(500) - - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - def _parse_pane_id(self, zellij_pane): - """Extract numeric pane ID from various formats.""" - if not zellij_pane: - return None - - # Try direct integer (e.g., "10") - try: - return int(zellij_pane) - except ValueError: - pass - - # Try "terminal_N" format - parts = zellij_pane.split("_") - if len(parts) == 2 and parts[0] in ("terminal", "plugin"): - try: - return int(parts[1]) - except ValueError: - pass - - return None - - def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True): - """Inject text into a pane using zellij actions.""" - env = os.environ.copy() - env["ZELLIJ_SESSION_NAME"] = zellij_session - - # Try plugin first (no focus change), fall back to write-chars (changes focus) - if ZELLIJ_PLUGIN.exists(): - result = self._try_plugin_inject(env, pane_id, text, send_enter) - if result["ok"]: - return result - # Plugin failed, fall back to write-chars - - return self._try_write_chars_inject(env, text, send_enter) - - def _try_plugin_inject(self, env, pane_id, text, send_enter=True): - """Try injecting via zellij-send-keys plugin (no focus change).""" - payload = json.dumps({ - "pane_id": pane_id, - "text": text, - "send_enter": send_enter, - }) - - try: - result = subprocess.run( - [ - "zellij", "action", "pipe", - "--plugin", f"file:{ZELLIJ_PLUGIN}", - "--name", "send_keys", - "--floating-plugin", "false", - "--", payload, - ], - env=env, - capture_output=True, - text=True, - timeout=3, - ) - - if result.returncode == 0: - return {"ok": True} - return {"ok": False, "error": result.stderr or "plugin failed"} - - except subprocess.TimeoutExpired: - return {"ok": False, "error": "plugin timed out"} - except Exception as e: - return {"ok": False, "error": str(e)} - - def _try_write_chars_inject(self, env, text, send_enter=True): - """Inject via write-chars (writes to focused pane, simpler but changes focus).""" - try: - # Write the text - result = subprocess.run( - ["zellij", "action", "write-chars", text], - env=env, - capture_output=True, - text=True, - timeout=2, - ) - - if result.returncode != 0: - return {"ok": False, "error": result.stderr or "write-chars failed"} - - # Send Enter if requested - if send_enter: - result = subprocess.run( - ["zellij", "action", "write", "13"], # 13 = Enter - env=env, - capture_output=True, - text=True, - timeout=2, - ) - - if result.returncode != 0: - return {"ok": False, "error": result.stderr or "write Enter failed"} - - return {"ok": True} - - except subprocess.TimeoutExpired: - return {"ok": False, "error": "write-chars timed out"} - except FileNotFoundError: - return {"ok": False, "error": "zellij not found in PATH"} - except Exception as e: - return {"ok": False, "error": str(e)} - - def _discover_active_codex_sessions(self): - """Find active Codex sessions and create/update session files with Zellij pane info.""" - if not CODEX_SESSIONS_DIR.exists(): - return - - # Get Zellij pane info for running codex processes - pid_info, cwd_map = self._get_codex_pane_info() - - # Only look at sessions modified in the last 10 minutes (active) - now = time.time() - cutoff = now - 600 # 10 minutes - - for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"): - try: - # Skip old files - mtime = jsonl_file.stat().st_mtime - if mtime < cutoff: - continue - - # Extract session ID from filename - match = re.search(r'([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})', jsonl_file.name) - if not match: - continue - - session_id = match.group(1) - - # Skip sessions the user has dismissed - if session_id in _dismissed_codex_ids: - continue - - session_file = SESSIONS_DIR / f"{session_id}.json" - - # Parse first line to get session metadata - with jsonl_file.open() as f: - first_line = f.readline().strip() - if not first_line: - continue - - meta = json.loads(first_line) - if meta.get("type") != "session_meta": - continue - - payload = meta.get("payload", {}) - cwd = payload.get("cwd", "") - project = os.path.basename(cwd) if cwd else "Unknown" - - # Match session to Zellij pane (UUID match via lsof, CWD fallback) - zellij_session, zellij_pane = self._match_codex_session_to_pane( - jsonl_file, cwd, pid_info, cwd_map - ) - - # Determine status based on file age - file_age_minutes = (now - mtime) / 60 - if file_age_minutes < 2: - status = "active" - else: - status = "done" - - # Read existing session to preserve some fields - existing = {} - if session_file.exists(): - try: - existing = json.loads(session_file.read_text()) - # Don't downgrade active to done if file was just updated - if existing.get("status") == "active" and status == "done": - # Check if we should keep it active - if file_age_minutes < 5: - status = "active" - except (json.JSONDecodeError, OSError): - pass - - # Get last message preview from recent lines - last_message = "" - try: - lines = jsonl_file.read_text().splitlines()[-30:] - for line in reversed(lines): - entry = json.loads(line) - if entry.get("type") == "response_item": - payload_item = entry.get("payload", {}) - if payload_item.get("role") == "assistant": - content = payload_item.get("content", []) - for part in content: - if isinstance(part, dict) and part.get("text"): - text = part["text"] - # Skip system content - if not text.startswith("<") and not text.startswith("#"): - last_message = text[:200] - break - if last_message: - break - except (json.JSONDecodeError, OSError): - pass - - session_ts = payload.get("timestamp", "") - last_event_at = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat() - - session_data = { - "session_id": session_id, - "agent": "codex", - "project": project, - "project_dir": cwd, - "status": status, - "started_at": existing.get("started_at", session_ts), - "last_event_at": last_event_at, - "last_event": "CodexSession", - "last_message_preview": last_message, - "zellij_session": zellij_session or existing.get("zellij_session", ""), - "zellij_pane": zellij_pane or existing.get("zellij_pane", ""), - } - - session_file.write_text(json.dumps(session_data, indent=2)) - - except (OSError, json.JSONDecodeError): - continue - - def _get_codex_pane_info(self): - """Get Zellij pane info for running codex processes via process inspection. - - Extracts ZELLIJ_PANE_ID from each codex process's inherited environment, - since zellij dump-layout doesn't provide pane IDs. - - Results are cached for 5 seconds to avoid running pgrep/ps/lsof on - every dashboard poll. - - Returns: - tuple: (pid_info, cwd_map) - pid_info: {pid_str: {"pane_id": str, "zellij_session": str}} - cwd_map: {cwd_path: {"session": str, "pane_id": str}} - """ - now = time.time() - if now < _codex_pane_cache["expires"]: - return _codex_pane_cache["pid_info"], _codex_pane_cache["cwd_map"] - - pid_info = {} - cwd_map = {} - - try: - # Step 1: Find codex process PIDs - result = subprocess.run( - ["pgrep", "-x", "codex"], - capture_output=True, text=True, timeout=2, - ) - pids = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()] if result.returncode == 0 else [] - - # Step 2: Extract ZELLIJ env vars from each process - for pid in pids: - try: - env_result = subprocess.run( - ["ps", "eww", "-o", "args=", "-p", pid], - capture_output=True, text=True, timeout=2, - ) - if env_result.returncode != 0: - continue - - env_str = env_result.stdout - pane_match = re.search(r'ZELLIJ_PANE_ID=(\d+)', env_str) - session_match = re.search(r'ZELLIJ_SESSION_NAME=(\S+)', env_str) - - if pane_match and session_match: - pid_info[pid] = { - "pane_id": pane_match.group(1), - "zellij_session": session_match.group(1), - } - except (subprocess.TimeoutExpired, Exception): - continue - - # Step 3: Get CWDs via single batched lsof call - if pid_info: - pid_list = ",".join(pid_info.keys()) - try: - cwd_result = subprocess.run( - ["lsof", "-a", "-p", pid_list, "-d", "cwd", "-Fn"], - capture_output=True, text=True, timeout=3, - ) - if cwd_result.returncode == 0: - current_pid = None - for line in cwd_result.stdout.splitlines(): - if line.startswith("p"): - current_pid = line[1:] - elif line.startswith("n/") and current_pid and current_pid in pid_info: - cwd = line[1:] - info = pid_info[current_pid] - cwd_map[cwd] = { - "session": info["zellij_session"], - "pane_id": info["pane_id"], - } - except (subprocess.TimeoutExpired, Exception): - pass - - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - pass - - _codex_pane_cache["pid_info"] = pid_info - _codex_pane_cache["cwd_map"] = cwd_map - _codex_pane_cache["expires"] = now + 5 # Cache for 5 seconds - - return pid_info, cwd_map - - def _match_codex_session_to_pane(self, session_file, session_cwd, pid_info, cwd_map): - """Match a Codex session file to a Zellij pane. - - Tries session-file-to-PID matching first (via lsof), falls back to CWD. - - Returns: - tuple: (zellij_session, pane_id) or ("", "") - """ - # Try precise match: which process has this session file open? - try: - result = subprocess.run( - ["lsof", "-t", str(session_file)], - capture_output=True, text=True, timeout=2, - ) - if result.returncode == 0 and result.stdout.strip(): - for pid in result.stdout.strip().splitlines(): - pid = pid.strip() - if pid in pid_info: - info = pid_info[pid] - return info["zellij_session"], info["pane_id"] - except (subprocess.TimeoutExpired, Exception): - pass - - # Fall back to CWD match - normalized_cwd = os.path.normpath(session_cwd) if session_cwd else "" - for pane_cwd, info in cwd_map.items(): - if os.path.normpath(pane_cwd) == normalized_cwd: - return info["session"], info["pane_id"] - - return "", "" - - def _json_error(self, code, message): - """Send a JSON error response.""" - response = json.dumps({"ok": False, "error": message}).encode() - self.send_response(code) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Content-Length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - def _get_active_zellij_sessions(self): - """Query Zellij for active sessions. Returns set of session names, or None on error.""" - now = time.time() - - # Use cached value if fresh (cache for 5 seconds to avoid hammering zellij) - if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]: - return _zellij_cache["sessions"] - - try: - result = subprocess.run( - ["zellij", "list-sessions", "--no-formatting"], - capture_output=True, - text=True, - timeout=2, - ) - if result.returncode == 0: - # Parse session names (one per line, format: "session_name [created ...]" or just "session_name") - sessions = set() - for line in result.stdout.strip().splitlines(): - if line: - # Session name is the first word - session_name = line.split()[0] if line.split() else "" - if session_name: - sessions.add(session_name) - _zellij_cache["sessions"] = sessions - _zellij_cache["expires"] = now + 5 # Cache for 5 seconds - return sessions - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - pass - - return None # Return None on error (don't clean up if we can't verify) - - def _cleanup_stale(self, sessions): - """Remove orphan event logs >24h and stale 'starting' sessions >1h.""" - active_ids = {s.get("session_id") for s in sessions if s.get("session_id")} - now = time.time() - - # Clean up orphan event logs - EVENTS_DIR.mkdir(parents=True, exist_ok=True) - for f in EVENTS_DIR.glob("*.jsonl"): - session_id = f.stem - if session_id not in active_ids: - try: - age = now - f.stat().st_mtime - if age > STALE_EVENT_AGE: - f.unlink() - except OSError: - pass - - # Clean up orphan "starting" sessions (never became active) - for f in SESSIONS_DIR.glob("*.json"): - try: - age = now - f.stat().st_mtime - if age > STALE_STARTING_AGE: - data = json.loads(f.read_text()) - if data.get("status") == "starting": - f.unlink() - except (json.JSONDecodeError, OSError): - pass - - def log_message(self, format, *args): - """Suppress default request logging to keep output clean.""" - pass - - -def main(): - server = HTTPServer(("127.0.0.1", PORT), AMCHandler) - print(f"AMC server listening on http://127.0.0.1:{PORT}") - - # Write PID file - pid_file = DATA_DIR / "server.pid" - pid_file.write_text(str(os.getpid())) - - try: - server.serve_forever() - except KeyboardInterrupt: - pass - finally: - pid_file.unlink(missing_ok=True) - server.server_close() +from amc_server.server import main # noqa: E402 if __name__ == "__main__":