Add conversation_mtime_ns field to session state that tracks the actual
modification time of conversation files. This enables more responsive
dashboard updates by detecting changes that occur between hook events
(e.g., during streaming tool execution).
Changes:
- state.py: Add _get_conversation_mtime() to stat conversation files
and include mtime_ns in session payloads when available
- conversation.py: Add stable message IDs (claude-{session}-{n} format)
for React key stability and message deduplication
- control.py: Fix FIFO eviction for dismissed_codex_ids. set.pop() removes an
  arbitrary element, so the IDs are now stored in a dict and the oldest entry
  is evicted first (dicts iterate in insertion order)
- context.py: Update dismissed_codex_ids type from set to dict
The mtime approach complements existing last_event_at tracking:
- last_event_at: Changes on hook events (session boundaries)
- conversation_mtime_ns: Changes on every file write (real-time)
Dashboard can now detect mid-session conversation updates without
waiting for the next hook event.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
263 lines
9.8 KiB
Python
263 lines
9.8 KiB
Python
import json
import math
import os
import subprocess
import time

from amc_server.context import SESSIONS_DIR, ZELLIJ_BIN, ZELLIJ_PLUGIN, _DISMISSED_MAX, _dismissed_codex_ids
from amc_server.logging_utils import LOGGER


class SessionControlMixin:
    """Request-handler mixin for dismissing sessions and sending input to them.

    The host class must provide ``headers``, ``rfile``,
    ``_send_json(status, payload)`` and ``_json_error(status, message)``;
    this mixin only calls them.
    """

    # Pause after selecting the "Other" option so the target UI can switch to
    # text-input mode before the freeform text is typed.
    _FREEFORM_MODE_SWITCH_DELAY_SEC = 0.30
    # Default pause between typing the text and pressing Enter; overridable
    # through the AMC_SUBMIT_ENTER_DELAY_MS environment variable.
    _DEFAULT_SUBMIT_ENTER_DELAY_SEC = 0.20
    # Upper bound applied to AMC_SUBMIT_ENTER_DELAY_MS.
    _MAX_SUBMIT_ENTER_DELAY_MS = 2000.0

    def _dismiss_session(self, session_id):
        """Delete a session file (manual dismiss from dashboard).

        The sanitized ID is also recorded in ``_dismissed_codex_ids`` so the
        session is not re-discovered. That mapping is bounded by
        ``_DISMISSED_MAX`` and evicted oldest-first.

        Responds 200 ``{"ok": True}`` on success (including when the file was
        already gone) and 500 when the file cannot be removed.
        """
        # basename() strips any directory components from the client-supplied ID.
        safe_id = os.path.basename(session_id)
        session_file = SESSIONS_DIR / f"{safe_id}.json"

        # Re-dismissing a known ID must refresh its position rather than push
        # out an unrelated entry, so remove it before checking capacity.
        _dismissed_codex_ids.pop(safe_id, None)
        # FIFO eviction: dicts iterate in insertion order, so the first key is
        # the oldest. The emptiness guard keeps next() from raising
        # StopIteration if _DISMISSED_MAX is zero or negative.
        while _dismissed_codex_ids and len(_dismissed_codex_ids) >= _DISMISSED_MAX:
            del _dismissed_codex_ids[next(iter(_dismissed_codex_ids))]
        _dismissed_codex_ids[safe_id] = True

        try:
            session_file.unlink(missing_ok=True)
        except OSError as exc:
            LOGGER.warning("Failed to delete session file %s: %s", session_file, exc)
            self._json_error(500, "Failed to delete session")
            return
        self._send_json(200, {"ok": True})

    def _respond_to_session(self, session_id):
        """Inject a response into the session's Zellij pane.

        Reads a JSON object from the request body with the keys:

        * ``text`` (required, non-blank string): the text to type.
        * ``freeform`` (optional, truthy): type ``text`` as a freeform answer.
        * ``optionCount`` (optional, int): number of listed options; with
          ``freeform`` set, option ``optionCount + 1`` is selected first.

        Responds 200 ``{"ok": True}`` on success, 400 for a bad request or a
        session without usable pane info, 404 when the session file does not
        exist, and 500 when the session cannot be read or injection fails.
        """
        safe_id = os.path.basename(session_id)
        session_file = SESSIONS_DIR / f"{safe_id}.json"

        # --- Read and validate the request body ---------------------------
        try:
            content_length = int(self.headers.get("Content-Length", 0))
            if content_length < 0:
                # read() with a negative size reads until EOF, which would
                # block for as long as the client keeps the connection open.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(content_length))
        except ValueError:
            # Covers a non-numeric header, json.JSONDecodeError and
            # UnicodeDecodeError (both are ValueError subclasses).
            self._json_error(400, "Invalid JSON body")
            return

        if not isinstance(body, dict):
            self._json_error(400, "Invalid JSON body")
            return

        text = body.get("text", "")
        is_freeform = body.get("freeform", False)
        try:
            option_count = int(body.get("optionCount", 0))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: json.loads accepts "Infinity", and int(inf) raises.
            option_count = 0

        if not isinstance(text, str) or not text.strip():
            self._json_error(400, "Missing or empty 'text' field")
            return

        # --- Load the session ---------------------------------------------
        # Read directly instead of exists()-then-read so a file removed in
        # between still yields 404 rather than 500.
        try:
            session = json.loads(session_file.read_text())
        except FileNotFoundError:
            self._json_error(404, "Session not found")
            return
        except (OSError, ValueError):
            # ValueError covers malformed JSON and undecodable bytes.
            self._json_error(500, "Failed to read session")
            return

        if not isinstance(session, dict):
            self._json_error(500, "Failed to read session")
            return

        zellij_session = session.get("zellij_session", "")
        zellij_pane = session.get("zellij_pane", "")
        if not zellij_session or not zellij_pane:
            self._json_error(400, "Session missing Zellij pane info - cannot send input without a pane target")
            return

        # Parse pane ID from "terminal_N" format
        pane_id = self._parse_pane_id(zellij_pane)
        if pane_id is None:
            self._json_error(400, f"Invalid pane format: {zellij_pane}")
            return

        # For freeform responses, we need two-step injection:
        # 1. Send "Other" option number (optionCount + 1) WITHOUT Enter
        # 2. Wait for Claude Code to switch to text input mode
        # 3. Send the actual text WITH Enter
        if is_freeform and option_count > 0:
            other_num = str(option_count + 1)
            result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False)
            if not result["ok"]:
                self._send_json(500, {"ok": False, "error": f"Failed to activate freeform mode: {result['error']}"})
                return
            # Delay for Claude Code to switch to text input mode
            time.sleep(self._FREEFORM_MODE_SWITCH_DELAY_SEC)

        # Inject the actual text first, then submit with delayed Enter.
        result = self._inject_text_then_enter(zellij_session, pane_id, text)
        if result["ok"]:
            self._send_json(200, {"ok": True})
        else:
            self._send_json(500, {"ok": False, "error": result["error"]})

    def _inject_text_then_enter(self, zellij_session, pane_id, text):
        """Send text and trigger Enter in two steps to avoid newline-only races.

        Returns the result dict of the first step if it fails, otherwise the
        result dict of the Enter step (``{"ok": True}`` or
        ``{"ok": False, "error": ...}``).
        """
        result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=False)
        if not result["ok"]:
            return result

        time.sleep(self._get_submit_enter_delay_sec())
        # Send Enter as its own action after the text has landed.
        return self._inject_to_pane(zellij_session, pane_id, "", send_enter=True)

    def _get_submit_enter_delay_sec(self):
        """Return the text-to-Enter delay in seconds.

        Reads ``AMC_SUBMIT_ENTER_DELAY_MS`` (milliseconds). Unset, blank,
        non-numeric or NaN values fall back to
        ``_DEFAULT_SUBMIT_ENTER_DELAY_SEC``; negative values become 0 and
        values above ``_MAX_SUBMIT_ENTER_DELAY_MS`` are capped.
        """
        raw = os.environ.get("AMC_SUBMIT_ENTER_DELAY_MS", "").strip()
        if not raw:
            return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
        try:
            ms = float(raw)
        except ValueError:
            return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
        if math.isnan(ms):
            # float("nan") parses fine but slips through every range
            # comparison, and time.sleep(nan) raises ValueError.
            return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
        return min(max(ms, 0.0), self._MAX_SUBMIT_ENTER_DELAY_MS) / 1000.0

    def _parse_pane_id(self, zellij_pane):
        """Extract numeric pane ID from various formats.

        Accepts anything ``int()`` accepts (e.g. ``"10"`` or ``10``) and the
        strings ``"terminal_N"`` / ``"plugin_N"``. Returns the integer ID, or
        ``None`` for empty, unrecognized or wrongly typed values.
        """
        if not zellij_pane:
            return None

        # Try direct integer (e.g., "10")
        try:
            return int(zellij_pane)
        except (TypeError, ValueError, OverflowError):
            # TypeError: a list/dict from a corrupt session file.
            # OverflowError: a float infinity.
            pass

        if not isinstance(zellij_pane, str):
            return None

        # Try "terminal_N" format
        parts = zellij_pane.split("_")
        if len(parts) == 2 and parts[0] in ("terminal", "plugin"):
            try:
                return int(parts[1])
            except ValueError:
                pass

        return None

    def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True):
        """Inject text into a pane using zellij actions.

        Tries the pane-targeted plugin first. If that is missing or fails, the
        focused-pane ``write-chars`` fallback is used only when explicitly
        enabled through ``AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK``.

        Returns ``{"ok": True}`` or ``{"ok": False, "error": <message>}``.
        """
        env = os.environ.copy()
        env["ZELLIJ_SESSION_NAME"] = zellij_session
        # Best-effort: some zellij actions respect this pane env.
        env["ZELLIJ_PANE_ID"] = f"terminal_{pane_id}"

        # Pane-accurate routing requires the plugin.
        if ZELLIJ_PLUGIN.exists():
            result = self._try_plugin_inject(env, zellij_session, pane_id, text, send_enter)
            if result["ok"]:
                return result
            LOGGER.warning(
                "Plugin injection failed for session=%s pane=%s: %s",
                zellij_session,
                pane_id,
                result.get("error", "unknown error"),
            )
        else:
            LOGGER.warning("Zellij plugin missing at %s", ZELLIJ_PLUGIN)

        # `write-chars` targets whichever pane is focused, which is unsafe for AMC.
        if self._allow_unsafe_write_chars_fallback():
            LOGGER.warning("Using unsafe write-chars fallback (focused pane only)")
            return self._try_write_chars_inject(env, zellij_session, text, send_enter)

        return {
            "ok": False,
            "error": (
                "Pane-targeted injection requires zellij-send-keys plugin; "
                "set AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK=1 to force focused-pane fallback"
            ),
        }

    def _allow_unsafe_write_chars_fallback(self):
        """Return True when AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK is 1/true/yes/on.

        The comparison ignores case and surrounding whitespace.
        """
        value = os.environ.get("AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK", "").strip().lower()
        return value in ("1", "true", "yes", "on")

    def _try_plugin_inject(self, env, zellij_session, pane_id, text, send_enter=True):
        """Try injecting via zellij-send-keys plugin (no focus change).

        Pipes a JSON payload (``pane_id``, ``text``, ``send_enter``) to the
        plugin with ``zellij action pipe``. Never raises; returns
        ``{"ok": True}`` or ``{"ok": False, "error": <message>}``.
        """
        payload = json.dumps({
            "pane_id": pane_id,
            "text": text,
            "send_enter": bool(send_enter),
        })

        try:
            result = subprocess.run(
                [
                    ZELLIJ_BIN,
                    "--session",
                    zellij_session,
                    "action",
                    "pipe",
                    "--plugin",
                    f"file:{ZELLIJ_PLUGIN}",
                    "--name",
                    "send_keys",
                    "--floating-plugin",
                    "false",
                    "--",
                    payload,
                ],
                env=env,
                capture_output=True,
                text=True,
                timeout=3,
            )
        except subprocess.TimeoutExpired:
            return {"ok": False, "error": "plugin timed out"}
        except FileNotFoundError:
            # Same message as the write-chars path so a missing binary is
            # reported identically whichever route was attempted.
            return {"ok": False, "error": f"zellij not found (resolved binary: {ZELLIJ_BIN})"}
        except Exception as e:
            # Boundary handler: injection failures are reported, not raised.
            return {"ok": False, "error": str(e)}

        if result.returncode == 0:
            return {"ok": True}
        return {"ok": False, "error": result.stderr or "plugin failed"}

    def _try_write_chars_inject(self, env, zellij_session, text, send_enter=True):
        """Inject via write-chars (UNSAFE: writes to focused pane).

        Writes ``text`` (skipped when empty) and then, if ``send_enter`` is
        true, byte 13 (Enter). Never raises; returns ``{"ok": True}`` or
        ``{"ok": False, "error": <message>}``.
        """
        try:
            # Write the text. Nothing to type for the Enter-only step, so do
            # not spawn a process for an empty string.
            if text:
                result = subprocess.run(
                    # "--" ends option parsing so text beginning with "-" is
                    # typed literally instead of being read as a CLI flag.
                    [ZELLIJ_BIN, "--session", zellij_session, "action", "write-chars", "--", text],
                    env=env,
                    capture_output=True,
                    text=True,
                    timeout=2,
                )
                if result.returncode != 0:
                    return {"ok": False, "error": result.stderr or "write-chars failed"}

            # Send Enter if requested
            if send_enter:
                result = subprocess.run(
                    [ZELLIJ_BIN, "--session", zellij_session, "action", "write", "13"],  # 13 = Enter
                    env=env,
                    capture_output=True,
                    text=True,
                    timeout=2,
                )
                if result.returncode != 0:
                    return {"ok": False, "error": result.stderr or "write Enter failed"}

            return {"ok": True}

        except subprocess.TimeoutExpired:
            return {"ok": False, "error": "write-chars timed out"}
        except FileNotFoundError:
            return {"ok": False, "error": f"zellij not found (resolved binary: {ZELLIJ_BIN})"}
        except Exception as e:
            # Boundary handler: injection failures are reported, not raised.
            return {"ok": False, "error": str(e)}