"""State collection and SSE streaming endpoints (mixin for the HTTP handler)."""

import hashlib
import json
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path

from amc_server.context import (
    EVENTS_DIR,
    SESSIONS_DIR,
    STALE_EVENT_AGE,
    STALE_STARTING_AGE,
    ZELLIJ_BIN,
    _state_lock,
    _zellij_cache,
)
from amc_server.logging_utils import LOGGER


class StateMixin:
    """Mixin providing the `/api/state` JSON endpoint and its SSE stream.

    NOTE(review): assumes the host class supplies the HTTP-handler surface
    (`send_response`, `send_header`, `end_headers`, `wfile`, `_send_json`)
    plus session helpers referenced but not defined here
    (`_discover_active_codex_sessions`, `_get_context_usage_for_session`,
    `_get_claude_conversation_file`, `_find_codex_transcript_file`) —
    confirm against the concrete handler class.
    """

    def _serve_state(self):
        """Serve one JSON snapshot of the current state."""
        payload = self._build_state_payload()
        self._send_json(200, payload)

    def _serve_stream(self):
        """SSE stream of full state snapshots, emitted on change.

        Polls `_build_state_payload` once per second, hashes the compact JSON
        serialization, and emits a new `state` event only when the hash
        changes. A `: ping` comment frame is sent every 15 s so proxies and
        clients do not time the connection out. Returns (closing the stream)
        when the client disconnects.
        """
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        # Ask clients to reconnect quickly on transient errors.
        try:
            self.wfile.write(b"retry: 2000\n\n")
            self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            return
        last_hash = None
        event_id = 0
        last_heartbeat_at = time.time()
        heartbeat_interval = 15  # seconds between keep-alive pings
        poll_interval = 1  # seconds between state polls
        try:
            while True:
                payload = self._build_state_payload()
                payload_json = json.dumps(payload, separators=(",", ":"))
                # Hash the serialized payload so events fire only on real change.
                payload_hash = hashlib.sha1(payload_json.encode("utf-8")).hexdigest()
                if payload_hash != last_hash:
                    event_id += 1
                    self._write_sse_event("state", payload_json, event_id)
                    last_hash = payload_hash
                now = time.time()
                if now - last_heartbeat_at >= heartbeat_interval:
                    # SSE comment line: ignored by clients, keeps the pipe warm.
                    self.wfile.write(b": ping\n\n")
                    self.wfile.flush()
                    last_heartbeat_at = now
                time.sleep(poll_interval)
        except (BrokenPipeError, ConnectionResetError, OSError):
            # Client disconnected.
            return
        except Exception:
            LOGGER.exception("Unhandled SSE stream error")
            return

    def _write_sse_event(self, event_name, data, event_id):
        """Write one SSE event frame (`id:` / `event:` / `data:` lines).

        `data` is expected to be a single-line compact JSON string, but is
        still split defensively: multi-line payloads become multiple `data:`
        lines per the SSE framing rules.
        """
        frame = [f"id: {event_id}", f"event: {event_name}"]
        for line in str(data).splitlines():
            frame.append(f"data: {line}")
        # Two trailing empty strings -> "\n\n" frame terminator after join.
        frame.append("")
        frame.append("")
        self.wfile.write("\n".join(frame).encode("utf-8"))
        self.wfile.flush()

    def _build_state_payload(self):
        """Build `/api/state` payload data used by JSON and SSE endpoints."""
        sessions = self._collect_sessions()
        return {
            "sessions": sessions,
            "server_time": datetime.now(timezone.utc).isoformat(),
        }

    def _collect_sessions(self):
        """Collect and normalize all session records from disk.

        Reads every `*.json` file under SESSIONS_DIR, enriches each record
        with context usage and conversation mtime, deletes orphaned
        "starting" sessions whose Zellij session vanished, and returns the
        records sorted by `last_event_at` descending. Held under
        `_state_lock` for the full scan.
        """
        with _state_lock:
            sessions = []
            SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
            # Discover active Codex sessions and create session files for them
            self._discover_active_codex_sessions()
            # Get active Zellij sessions for liveness check
            active_zellij_sessions = self._get_active_zellij_sessions()
            for f in SESSIONS_DIR.glob("*.json"):
                try:
                    data = json.loads(f.read_text())
                    if not isinstance(data, dict):
                        continue
                    # Proactive liveness check: only auto-delete orphan
                    # "starting" sessions. Other statuses can still be useful
                    # as historical/debug context.
                    zellij_session = data.get("zellij_session", "")
                    if zellij_session and active_zellij_sessions is not None:
                        if zellij_session not in active_zellij_sessions:
                            if data.get("status") == "starting":
                                # A missing Zellij session while "starting"
                                # indicates an orphan.
                                f.unlink(missing_ok=True)
                                continue
                    context_usage = self._get_context_usage_for_session(data)
                    if context_usage:
                        data["context_usage"] = context_usage
                    # Track conversation file mtime for real-time update detection
                    conv_mtime = self._get_conversation_mtime(data)
                    if conv_mtime:
                        data["conversation_mtime_ns"] = conv_mtime
                    sessions.append(data)
                except (json.JSONDecodeError, OSError):
                    # Unreadable/garbled session file: skip it, best-effort.
                    continue
                except Exception:
                    LOGGER.exception("Failed processing session file %s", f)
                    continue
            # Sort by last_event_at descending
            sessions.sort(key=lambda s: s.get("last_event_at", ""), reverse=True)
            # Clean orphan event logs (sessions persist until manually
            # dismissed or SessionEnd)
            self._cleanup_stale(sessions)
            return sessions

    def _get_active_zellij_sessions(self):
        """Query Zellij for active sessions.

        Returns a set of session names, or None on error (callers must not
        clean anything up when liveness cannot be verified). Results are
        cached for 5 seconds to avoid hammering the zellij binary.
        """
        now = time.time()
        # Use cached value if fresh (cache for 5 seconds to avoid hammering zellij)
        if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]:
            return _zellij_cache["sessions"]
        try:
            result = subprocess.run(
                [ZELLIJ_BIN, "list-sessions", "--no-formatting"],
                capture_output=True,
                text=True,
                timeout=2,
            )
            if result.returncode == 0:
                # Parse session names (one per line, format:
                # "session_name [created ...]" or just "session_name")
                sessions = set()
                for line in result.stdout.strip().splitlines():
                    # BUG FIX: was splitting each line twice; split once.
                    parts = line.split()
                    if parts:
                        # Session name is the first word
                        sessions.add(parts[0])
                _zellij_cache["sessions"] = sessions
                _zellij_cache["expires"] = now + 5  # Cache for 5 seconds
                return sessions
        except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
            # Expected failure modes (zellij missing, slow, or unrunnable);
            # stay silent and fall through to the best-effort None return.
            pass
        except Exception:
            # BUG FIX: the original caught (TimeoutExpired, FileNotFoundError,
            # Exception), which made the specific types redundant and silently
            # hid programming errors. Keep the best-effort contract but log.
            LOGGER.exception("Unexpected error querying zellij sessions")
        return None  # Return None on error (don't clean up if we can't verify)

    def _get_conversation_mtime(self, session_data):
        """Get the conversation file's mtime (ns) for real-time change detection.

        Returns `st_mtime_ns` of the agent's conversation/transcript file, or
        None when the file cannot be located or stat'ed.
        """
        agent = session_data.get("agent")
        if agent == "claude":
            conv_file = self._get_claude_conversation_file(
                session_data.get("session_id", ""),
                session_data.get("project_dir", ""),
            )
            if conv_file:
                try:
                    return conv_file.stat().st_mtime_ns
                except OSError:
                    pass
        elif agent == "codex":
            transcript_path = session_data.get("transcript_path", "")
            if transcript_path:
                try:
                    return Path(transcript_path).stat().st_mtime_ns
                except OSError:
                    pass
            # Fallback to discovery
            transcript_file = self._find_codex_transcript_file(
                session_data.get("session_id", "")
            )
            if transcript_file:
                try:
                    return transcript_file.stat().st_mtime_ns
                except OSError:
                    pass
        return None

    def _cleanup_stale(self, sessions):
        """Remove orphan event logs >STALE_EVENT_AGE and stale 'starting'
        sessions >STALE_STARTING_AGE.

        `sessions` is the list of live session dicts; their `session_id`s are
        treated as active and their event logs are kept.
        """
        active_ids = {s.get("session_id") for s in sessions if s.get("session_id")}
        now = time.time()
        # Clean up orphan event logs
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)
        for f in EVENTS_DIR.glob("*.jsonl"):
            session_id = f.stem
            if session_id not in active_ids:
                try:
                    age = now - f.stat().st_mtime
                    if age > STALE_EVENT_AGE:
                        f.unlink()
                except OSError:
                    pass
        # Clean up orphan "starting" sessions (never became active)
        for f in SESSIONS_DIR.glob("*.json"):
            try:
                age = now - f.stat().st_mtime
                if age > STALE_STARTING_AGE:
                    data = json.loads(f.read_text())
                    if data.get("status") == "starting":
                        f.unlink()
            except (json.JSONDecodeError, OSError):
                pass