Files
amc/amc_server/mixins/control.py
teernisse dcbaf12f07 feat(server): add conversation mtime tracking for real-time updates
Add conversation_mtime_ns field to session state that tracks the actual
modification time of conversation files. This enables more responsive
dashboard updates by detecting changes that occur between hook events
(e.g., during streaming tool execution).

Changes:
- state.py: Add _get_conversation_mtime() to stat conversation files
  and include mtime_ns in session payloads when available
- conversation.py: Add stable message IDs (claude-{session}-{n} format)
  for React key stability and message deduplication
- control.py: Fix FIFO eviction for dismissed_codex_ids - set.pop()
  removes arbitrary element, now uses dict with insertion-order iteration
- context.py: Update dismissed_codex_ids type from set to dict

The mtime approach complements existing last_event_at tracking:
- last_event_at: Changes on hook events (session boundaries)
- conversation_mtime_ns: Changes on every file write (real-time)

Dashboard can now detect mid-session conversation updates without
waiting for the next hook event.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-26 15:23:42 -05:00

263 lines
9.8 KiB
Python

import json
import os
import subprocess
import time
from amc_server.context import SESSIONS_DIR, ZELLIJ_BIN, ZELLIJ_PLUGIN, _DISMISSED_MAX, _dismissed_codex_ids
from amc_server.logging_utils import LOGGER
class SessionControlMixin:
_FREEFORM_MODE_SWITCH_DELAY_SEC = 0.30
_DEFAULT_SUBMIT_ENTER_DELAY_SEC = 0.20
def _dismiss_session(self, session_id):
"""Delete a session file (manual dismiss from dashboard)."""
safe_id = os.path.basename(session_id)
session_file = SESSIONS_DIR / f"{safe_id}.json"
# Track dismissed Codex sessions to prevent re-discovery
# Evict oldest entries via FIFO (dict maintains insertion order in Python 3.7+)
while len(_dismissed_codex_ids) >= _DISMISSED_MAX:
oldest_key = next(iter(_dismissed_codex_ids))
del _dismissed_codex_ids[oldest_key]
_dismissed_codex_ids[safe_id] = True
session_file.unlink(missing_ok=True)
self._send_json(200, {"ok": True})
def _respond_to_session(self, session_id):
"""Inject a response into the session's Zellij pane."""
safe_id = os.path.basename(session_id)
session_file = SESSIONS_DIR / f"{safe_id}.json"
# Read request body
try:
content_length = int(self.headers.get("Content-Length", 0))
body = json.loads(self.rfile.read(content_length))
if not isinstance(body, dict):
self._json_error(400, "Invalid JSON body")
return
text = body.get("text", "")
is_freeform = body.get("freeform", False)
try:
option_count = int(body.get("optionCount", 0))
except (TypeError, ValueError):
option_count = 0
except (json.JSONDecodeError, ValueError):
self._json_error(400, "Invalid JSON body")
return
if not isinstance(text, str):
self._json_error(400, "Missing or empty 'text' field")
return
if not text or not text.strip():
self._json_error(400, "Missing or empty 'text' field")
return
# Load session
if not session_file.exists():
self._json_error(404, "Session not found")
return
try:
session = json.loads(session_file.read_text())
if not isinstance(session, dict):
self._json_error(500, "Failed to read session")
return
except (json.JSONDecodeError, OSError):
self._json_error(500, "Failed to read session")
return
zellij_session = session.get("zellij_session", "")
zellij_pane = session.get("zellij_pane", "")
if not zellij_session or not zellij_pane:
self._json_error(400, "Session missing Zellij pane info - cannot send input without a pane target")
return
# Parse pane ID from "terminal_N" format
pane_id = self._parse_pane_id(zellij_pane)
if pane_id is None:
self._json_error(400, f"Invalid pane format: {zellij_pane}")
return
# For freeform responses, we need two-step injection:
# 1. Send "Other" option number (optionCount + 1) WITHOUT Enter
# 2. Wait for Claude Code to switch to text input mode
# 3. Send the actual text WITH Enter
if is_freeform and option_count > 0:
other_num = str(option_count + 1)
result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False)
if not result["ok"]:
self._send_json(500, {"ok": False, "error": f"Failed to activate freeform mode: {result['error']}"})
return
# Delay for Claude Code to switch to text input mode
time.sleep(self._FREEFORM_MODE_SWITCH_DELAY_SEC)
# Inject the actual text first, then submit with delayed Enter.
result = self._inject_text_then_enter(zellij_session, pane_id, text)
if result["ok"]:
self._send_json(200, {"ok": True})
else:
self._send_json(500, {"ok": False, "error": result["error"]})
def _inject_text_then_enter(self, zellij_session, pane_id, text):
"""Send text and trigger Enter in two steps to avoid newline-only races."""
result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=False)
if not result["ok"]:
return result
time.sleep(self._get_submit_enter_delay_sec())
# Send Enter as its own action after the text has landed.
return self._inject_to_pane(zellij_session, pane_id, "", send_enter=True)
def _get_submit_enter_delay_sec(self):
raw = os.environ.get("AMC_SUBMIT_ENTER_DELAY_MS", "").strip()
if not raw:
return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
try:
ms = float(raw)
if ms < 0:
return 0.0
if ms > 2000:
ms = 2000
return ms / 1000.0
except ValueError:
return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
def _parse_pane_id(self, zellij_pane):
"""Extract numeric pane ID from various formats."""
if not zellij_pane:
return None
# Try direct integer (e.g., "10")
try:
return int(zellij_pane)
except ValueError:
pass
# Try "terminal_N" format
parts = zellij_pane.split("_")
if len(parts) == 2 and parts[0] in ("terminal", "plugin"):
try:
return int(parts[1])
except ValueError:
pass
return None
def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True):
"""Inject text into a pane using zellij actions."""
env = os.environ.copy()
env["ZELLIJ_SESSION_NAME"] = zellij_session
# Best-effort: some zellij actions respect this pane env.
env["ZELLIJ_PANE_ID"] = f"terminal_{pane_id}"
# Pane-accurate routing requires the plugin.
if ZELLIJ_PLUGIN.exists():
result = self._try_plugin_inject(env, zellij_session, pane_id, text, send_enter)
if result["ok"]:
return result
LOGGER.warning(
"Plugin injection failed for session=%s pane=%s: %s",
zellij_session,
pane_id,
result.get("error", "unknown error"),
)
else:
LOGGER.warning("Zellij plugin missing at %s", ZELLIJ_PLUGIN)
# `write-chars` targets whichever pane is focused, which is unsafe for AMC.
if self._allow_unsafe_write_chars_fallback():
LOGGER.warning("Using unsafe write-chars fallback (focused pane only)")
return self._try_write_chars_inject(env, zellij_session, text, send_enter)
return {
"ok": False,
"error": (
"Pane-targeted injection requires zellij-send-keys plugin; "
"set AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK=1 to force focused-pane fallback"
),
}
def _allow_unsafe_write_chars_fallback(self):
value = os.environ.get("AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK", "").strip().lower()
return value in ("1", "true", "yes", "on")
def _try_plugin_inject(self, env, zellij_session, pane_id, text, send_enter=True):
"""Try injecting via zellij-send-keys plugin (no focus change)."""
payload = json.dumps({
"pane_id": pane_id,
"text": text,
"send_enter": bool(send_enter),
})
try:
result = subprocess.run(
[
ZELLIJ_BIN,
"--session",
zellij_session,
"action",
"pipe",
"--plugin",
f"file:{ZELLIJ_PLUGIN}",
"--name",
"send_keys",
"--floating-plugin",
"false",
"--",
payload,
],
env=env,
capture_output=True,
text=True,
timeout=3,
)
if result.returncode == 0:
return {"ok": True}
return {"ok": False, "error": result.stderr or "plugin failed"}
except subprocess.TimeoutExpired:
return {"ok": False, "error": "plugin timed out"}
except Exception as e:
return {"ok": False, "error": str(e)}
def _try_write_chars_inject(self, env, zellij_session, text, send_enter=True):
"""Inject via write-chars (UNSAFE: writes to focused pane)."""
try:
# Write the text
result = subprocess.run(
[ZELLIJ_BIN, "--session", zellij_session, "action", "write-chars", text],
env=env,
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
return {"ok": False, "error": result.stderr or "write-chars failed"}
# Send Enter if requested
if send_enter:
result = subprocess.run(
[ZELLIJ_BIN, "--session", zellij_session, "action", "write", "13"], # 13 = Enter
env=env,
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
return {"ok": False, "error": result.stderr or "write Enter failed"}
return {"ok": True}
except subprocess.TimeoutExpired:
return {"ok": False, "error": "write-chars timed out"}
except FileNotFoundError:
return {"ok": False, "error": f"zellij not found (resolved binary: {ZELLIJ_BIN})"}
except Exception as e:
return {"ok": False, "error": str(e)}