Initial work, pre-preact refactor

This commit is contained in:
teernisse
2026-02-25 09:21:56 -05:00
commit b2a5712202
4 changed files with 2266 additions and 0 deletions

464
bin/amc-server Executable file
View File

@@ -0,0 +1,464 @@
#!/usr/bin/env python3
"""AMC server — serves the dashboard and session state API.
Endpoints:
GET / → dashboard.html
GET /api/state → aggregated session state JSON
GET /api/events/ID → event timeline for one session
GET /api/conversation/ID → conversation history for a session
POST /api/dismiss/ID → dismiss (delete) a completed session
POST /api/respond/ID → inject response into session's Zellij pane
"""
import json
import os
import subprocess
import time
import urllib.parse
from datetime import datetime, timezone
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
# Claude Code conversation directory
CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects"
# Plugin path for zellij-send-keys
ZELLIJ_PLUGIN = Path.home() / ".config" / "zellij" / "plugins" / "zellij-send-keys.wasm"
# Runtime data lives in XDG data dir
DATA_DIR = Path.home() / ".local" / "share" / "amc"
SESSIONS_DIR = DATA_DIR / "sessions"
EVENTS_DIR = DATA_DIR / "events"
# Source files live in project directory (relative to this script)
PROJECT_DIR = Path(__file__).resolve().parent.parent
DASHBOARD_FILE = PROJECT_DIR / "dashboard.html"
PORT = 7400
STALE_EVENT_AGE = 86400 # 24 hours in seconds
class AMCHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/" or self.path == "/index.html":
self._serve_dashboard()
elif self.path == "/api/state":
self._serve_state()
elif self.path.startswith("/api/events/"):
session_id = urllib.parse.unquote(self.path[len("/api/events/"):])
self._serve_events(session_id)
elif self.path.startswith("/api/conversation/"):
# Parse session_id and query params
path_part = self.path[len("/api/conversation/"):]
if "?" in path_part:
session_id, query = path_part.split("?", 1)
params = urllib.parse.parse_qs(query)
project_dir = params.get("project_dir", [""])[0]
else:
session_id = path_part
project_dir = ""
self._serve_conversation(urllib.parse.unquote(session_id), urllib.parse.unquote(project_dir))
else:
self.send_error(404)
def do_POST(self):
if self.path.startswith("/api/dismiss/"):
session_id = urllib.parse.unquote(self.path[len("/api/dismiss/"):])
self._dismiss_session(session_id)
elif self.path.startswith("/api/respond/"):
session_id = urllib.parse.unquote(self.path[len("/api/respond/"):])
self._respond_to_session(session_id)
else:
self.send_error(404)
def do_OPTIONS(self):
# CORS preflight for respond endpoint
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def _serve_dashboard(self):
try:
content = DASHBOARD_FILE.read_bytes()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
except FileNotFoundError:
self.send_error(500, "dashboard.html not found")
def _serve_state(self):
sessions = []
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
for f in SESSIONS_DIR.glob("*.json"):
try:
data = json.loads(f.read_text())
sessions.append(data)
except (json.JSONDecodeError, OSError):
continue
# Sort by last_event_at descending
sessions.sort(key=lambda s: s.get("last_event_at", ""), reverse=True)
# Clean orphan event logs (sessions persist until manually dismissed or SessionEnd)
self._cleanup_stale(sessions)
response = json.dumps({
"sessions": sessions,
"server_time": datetime.now(timezone.utc).isoformat(),
}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _serve_events(self, session_id):
# Sanitize session_id to prevent path traversal
safe_id = os.path.basename(session_id)
event_file = EVENTS_DIR / f"{safe_id}.jsonl"
events = []
if event_file.exists():
try:
for line in event_file.read_text().splitlines():
if line.strip():
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
except OSError:
pass
response = json.dumps({"session_id": safe_id, "events": events}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _serve_conversation(self, session_id, project_dir):
"""Serve conversation history from Claude Code JSONL file."""
safe_id = os.path.basename(session_id)
# Convert project_dir to Claude's encoded format
# /Users/foo/projects/bar -> -Users-foo-projects-bar
if project_dir:
encoded_dir = project_dir.replace("/", "-")
if encoded_dir.startswith("-"):
encoded_dir = encoded_dir # Already starts with -
else:
encoded_dir = "-" + encoded_dir
else:
encoded_dir = ""
# Find the conversation file
conv_file = None
if encoded_dir:
conv_file = CLAUDE_PROJECTS_DIR / encoded_dir / f"{safe_id}.jsonl"
messages = []
if conv_file and conv_file.exists():
try:
for line in conv_file.read_text().splitlines():
if not line.strip():
continue
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
content = entry.get("message", {}).get("content", "")
# Only include actual human messages (strings), not tool results (arrays)
if content and isinstance(content, str):
messages.append({
"role": "user",
"content": content,
"timestamp": entry.get("timestamp", "")
})
elif msg_type == "assistant":
# Assistant messages have structured content
raw_content = entry.get("message", {}).get("content", [])
text_parts = []
for part in raw_content:
if isinstance(part, dict):
if part.get("type") == "text":
text_parts.append(part.get("text", ""))
elif isinstance(part, str):
text_parts.append(part)
if text_parts:
messages.append({
"role": "assistant",
"content": "\n".join(text_parts),
"timestamp": entry.get("timestamp", "")
})
except json.JSONDecodeError:
continue
except OSError:
pass
response = json.dumps({"session_id": safe_id, "messages": messages}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _dismiss_session(self, session_id):
"""Delete a session file (manual dismiss from dashboard)."""
safe_id = os.path.basename(session_id)
session_file = SESSIONS_DIR / f"{safe_id}.json"
session_file.unlink(missing_ok=True)
response = json.dumps({"ok": True}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _respond_to_session(self, session_id):
"""Inject a response into the session's Zellij pane."""
safe_id = os.path.basename(session_id)
session_file = SESSIONS_DIR / f"{safe_id}.json"
# Read request body
try:
content_length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(content_length))
text = body.get("text", "")
is_freeform = body.get("freeform", False)
option_count = body.get("optionCount", 0)
except (json.JSONDecodeError, ValueError):
self._json_error(400, "Invalid JSON body")
return
if not text:
self._json_error(400, "Missing 'text' field")
return
# Load session
if not session_file.exists():
self._json_error(404, "Session not found")
return
try:
session = json.loads(session_file.read_text())
except (json.JSONDecodeError, OSError):
self._json_error(500, "Failed to read session")
return
zellij_session = session.get("zellij_session", "")
zellij_pane = session.get("zellij_pane", "")
if not zellij_session or not zellij_pane:
self._json_error(400, "Session missing Zellij info")
return
# Parse pane ID from "terminal_N" format
pane_id = self._parse_pane_id(zellij_pane)
if pane_id is None:
self._json_error(400, f"Invalid pane format: {zellij_pane}")
return
# For freeform responses, we need two-step injection:
# 1. Send "Other" option number (optionCount + 1) WITHOUT Enter
# 2. Wait for Claude Code to switch to text input mode
# 3. Send the actual text WITH Enter
if is_freeform and option_count > 0:
other_num = str(option_count + 1)
result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False)
if not result["ok"]:
response = json.dumps({"ok": False, "error": result["error"]}).encode()
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
return
# Delay for Claude Code to switch to text input mode
time.sleep(0.3)
# Inject the actual text (with Enter)
result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=True)
if result["ok"]:
response = json.dumps({"ok": True}).encode()
self.send_response(200)
else:
response = json.dumps({"ok": False, "error": result["error"]}).encode()
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _parse_pane_id(self, zellij_pane):
"""Extract numeric pane ID from various formats."""
if not zellij_pane:
return None
# Try direct integer (e.g., "10")
try:
return int(zellij_pane)
except ValueError:
pass
# Try "terminal_N" format
parts = zellij_pane.split("_")
if len(parts) == 2 and parts[0] in ("terminal", "plugin"):
try:
return int(parts[1])
except ValueError:
pass
return None
def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True):
"""Inject text into a pane using zellij actions."""
env = os.environ.copy()
env["ZELLIJ_SESSION_NAME"] = zellij_session
# Try plugin first (no focus change), fall back to write-chars (changes focus)
if ZELLIJ_PLUGIN.exists():
result = self._try_plugin_inject(env, pane_id, text, send_enter)
if result["ok"]:
return result
# Plugin failed, fall back to write-chars
return self._try_write_chars_inject(env, text, send_enter)
def _try_plugin_inject(self, env, pane_id, text, send_enter=True):
"""Try injecting via zellij-send-keys plugin (no focus change)."""
payload = json.dumps({
"pane_id": pane_id,
"text": text,
"send_enter": send_enter,
})
try:
result = subprocess.run(
[
"zellij", "action", "pipe",
"--plugin", f"file:{ZELLIJ_PLUGIN}",
"--name", "send_keys",
"--floating-plugin", "false",
"--", payload,
],
env=env,
capture_output=True,
text=True,
timeout=3,
)
if result.returncode == 0:
return {"ok": True}
return {"ok": False, "error": result.stderr or "plugin failed"}
except subprocess.TimeoutExpired:
return {"ok": False, "error": "plugin timed out"}
except Exception as e:
return {"ok": False, "error": str(e)}
def _try_write_chars_inject(self, env, text, send_enter=True):
"""Inject via write-chars (writes to focused pane, simpler but changes focus)."""
try:
# Write the text
result = subprocess.run(
["zellij", "action", "write-chars", text],
env=env,
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
return {"ok": False, "error": result.stderr or "write-chars failed"}
# Send Enter if requested
if send_enter:
result = subprocess.run(
["zellij", "action", "write", "13"], # 13 = Enter
env=env,
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
return {"ok": False, "error": result.stderr or "write Enter failed"}
return {"ok": True}
except subprocess.TimeoutExpired:
return {"ok": False, "error": "write-chars timed out"}
except FileNotFoundError:
return {"ok": False, "error": "zellij not found in PATH"}
except Exception as e:
return {"ok": False, "error": str(e)}
def _json_error(self, code, message):
"""Send a JSON error response."""
response = json.dumps({"ok": False, "error": message}).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
def _cleanup_stale(self, sessions):
"""Remove orphan event logs >24h (no matching session file)."""
active_ids = {s.get("session_id") for s in sessions if s.get("session_id")}
now = time.time()
EVENTS_DIR.mkdir(parents=True, exist_ok=True)
for f in EVENTS_DIR.glob("*.jsonl"):
session_id = f.stem
if session_id not in active_ids:
try:
age = now - f.stat().st_mtime
if age > STALE_EVENT_AGE:
f.unlink()
except OSError:
pass
def log_message(self, format, *args):
"""Suppress default request logging to keep output clean."""
pass
def main():
server = HTTPServer(("127.0.0.1", PORT), AMCHandler)
print(f"AMC server listening on http://127.0.0.1:{PORT}")
# Write PID file
pid_file = DATA_DIR / "server.pid"
pid_file.write_text(str(os.getpid()))
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
pid_file.unlink(missing_ok=True)
server.server_close()
if __name__ == "__main__":
main()