Closes bd-3ny. Added mousedown listener that dismisses the dropdown when clicking outside both the dropdown and textarea. Uses early return to avoid registering listeners when dropdown is already closed.
296 lines
11 KiB
Python
296 lines
11 KiB
Python
import json
import math
import os
import subprocess
import time

from amc_server.context import SESSIONS_DIR, ZELLIJ_BIN, ZELLIJ_PLUGIN, _DISMISSED_MAX, _dismissed_codex_ids
from amc_server.logging_utils import LOGGER
|
|
|
|
|
|
class SessionControlMixin:
    """Handler mixin for dismissing sessions and sending input to their panes.

    The methods use ``self.headers``, ``self.rfile``, ``self._send_json``,
    ``self._json_error``, ``self._get_active_zellij_sessions``,
    ``self._get_active_transcript_files`` and ``self._is_session_dead``;
    none of these are defined in this class, so the host class must
    provide them.
    """

    # Pause after selecting the "Other" option, before typing freeform text.
    _FREEFORM_MODE_SWITCH_DELAY_SEC = 0.30
    # Default pause between sending text and sending Enter; can be overridden
    # through the AMC_SUBMIT_ENTER_DELAY_MS environment variable.
    _DEFAULT_SUBMIT_ENTER_DELAY_SEC = 0.20
    # Upper bound applied to AMC_SUBMIT_ENTER_DELAY_MS.
    _MAX_SUBMIT_ENTER_DELAY_MS = 2000

    def _remember_dismissed_id(self, safe_id):
        """Record *safe_id* in the bounded registry of dismissed session IDs.

        The registry is a dict used as a FIFO: when it is full, the oldest
        entries (dicts keep insertion order in Python 3.7+) are evicted.
        """
        # Drop any existing entry first so a repeated dismiss refreshes its
        # position instead of evicting an unrelated ID to make room.
        _dismissed_codex_ids.pop(safe_id, None)
        # The emptiness check keeps next(iter(...)) from raising if the
        # configured limit is zero or negative.
        while _dismissed_codex_ids and len(_dismissed_codex_ids) >= _DISMISSED_MAX:
            del _dismissed_codex_ids[next(iter(_dismissed_codex_ids))]
        _dismissed_codex_ids[safe_id] = True

    def _dismiss_session(self, session_id):
        """Delete a session file (manual dismiss from dashboard).

        Replies 200 ``{"ok": True}`` when the file is gone (including when it
        never existed) and 500 when the file could not be removed.
        """
        # basename() strips any directory components from the caller-supplied ID.
        safe_id = os.path.basename(session_id)
        session_file = SESSIONS_DIR / f"{safe_id}.json"

        # Track dismissed Codex sessions to prevent re-discovery.
        self._remember_dismissed_id(safe_id)

        try:
            session_file.unlink(missing_ok=True)
        except OSError:
            # Report the failure as JSON instead of letting it escape the handler.
            self._json_error(500, "Failed to delete session")
            return
        self._send_json(200, {"ok": True})

    def _dismiss_dead_sessions(self):
        """Delete all dead session files (clear all from dashboard).

        Note: is_dead is computed dynamically, not stored on disk, so it is
        recomputed here using the same logic as _collect_sessions.

        Replies 200 with ``{"ok": True, "dismissed": <count>}``. Files that
        cannot be read, decoded, parsed or deleted are skipped.
        """
        # Get liveness data (same as _collect_sessions).
        active_zellij_sessions = self._get_active_zellij_sessions()
        active_transcript_files = self._get_active_transcript_files()

        dismissed_count = 0
        for session_path in SESSIONS_DIR.glob("*.json"):
            try:
                data = json.loads(session_path.read_text())
                if not isinstance(data, dict):
                    continue
                # Recompute is_dead (it's not persisted to disk).
                if not self._is_session_dead(
                    data, active_zellij_sessions, active_transcript_files
                ):
                    continue
                # Track dismissed Codex sessions.
                self._remember_dismissed_id(session_path.stem)
                session_path.unlink(missing_ok=True)
                dismissed_count += 1
            except (json.JSONDecodeError, UnicodeDecodeError, OSError):
                # UnicodeDecodeError: read_text() on a badly-encoded file must
                # not abort the whole sweep.
                continue
        self._send_json(200, {"ok": True, "dismissed": dismissed_count})

    def _respond_to_session(self, session_id):
        """Inject a response into the session's Zellij pane.

        The request body must be a JSON object with a non-blank string
        ``text``; ``freeform`` and ``optionCount`` are optional. The target
        pane is read from the ``zellij_session`` and ``zellij_pane`` keys of
        the session file for *session_id*.

        Replies 200 ``{"ok": True}`` on success, 400 for an invalid body or
        missing/invalid pane info, 404 when the session file does not exist,
        and 500 when the session file is unreadable or injection fails.
        """
        safe_id = os.path.basename(session_id)
        session_file = SESSIONS_DIR / f"{safe_id}.json"

        # Read request body.
        try:
            content_length = int(self.headers.get("Content-Length", 0))
            if content_length < 0:
                # read(-1) would block until the client closes the connection.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(content_length))
        except ValueError:
            # Covers json.JSONDecodeError, undecodable bytes and a
            # non-numeric or negative Content-Length.
            self._json_error(400, "Invalid JSON body")
            return

        if not isinstance(body, dict):
            self._json_error(400, "Invalid JSON body")
            return

        text = body.get("text", "")
        is_freeform = body.get("freeform", False)
        try:
            option_count = int(body.get("optionCount", 0))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: JSON "Infinity" parses to float("inf").
            option_count = 0

        if not isinstance(text, str) or not text.strip():
            self._json_error(400, "Missing or empty 'text' field")
            return

        # Load session.
        if not session_file.exists():
            self._json_error(404, "Session not found")
            return

        try:
            session = json.loads(session_file.read_text())
        except (json.JSONDecodeError, UnicodeDecodeError, OSError):
            self._json_error(500, "Failed to read session")
            return
        if not isinstance(session, dict):
            self._json_error(500, "Failed to read session")
            return

        zellij_session = session.get("zellij_session", "")
        zellij_pane = session.get("zellij_pane", "")

        if not zellij_session or not zellij_pane:
            self._json_error(400, "Session missing Zellij pane info - cannot send input without a pane target")
            return

        # Parse pane ID from "terminal_N" format.
        pane_id = self._parse_pane_id(zellij_pane)
        if pane_id is None:
            self._json_error(400, f"Invalid pane format: {zellij_pane}")
            return

        # For freeform responses, we need two-step injection:
        # 1. Send "Other" option number (optionCount + 1) WITHOUT Enter
        # 2. Wait for Claude Code to switch to text input mode
        # 3. Send the actual text WITH Enter
        if is_freeform and option_count > 0:
            other_num = str(option_count + 1)
            result = self._inject_to_pane(zellij_session, pane_id, other_num, send_enter=False)
            if not result["ok"]:
                self._send_json(500, {"ok": False, "error": f"Failed to activate freeform mode: {result['error']}"})
                return
            # Delay for Claude Code to switch to text input mode.
            time.sleep(self._FREEFORM_MODE_SWITCH_DELAY_SEC)

        # Inject the actual text first, then submit with delayed Enter.
        result = self._inject_text_then_enter(zellij_session, pane_id, text)

        if result["ok"]:
            self._send_json(200, {"ok": True})
        else:
            self._send_json(500, {"ok": False, "error": result["error"]})

    def _inject_text_then_enter(self, zellij_session, pane_id, text):
        """Send text and trigger Enter in two steps to avoid newline-only races.

        Returns the result dict of the first step that fails, otherwise the
        result of the Enter step.
        """
        result = self._inject_to_pane(zellij_session, pane_id, text, send_enter=False)
        if not result["ok"]:
            return result

        time.sleep(self._get_submit_enter_delay_sec())
        # Send Enter as its own action after the text has landed.
        return self._inject_to_pane(zellij_session, pane_id, "", send_enter=True)

    def _get_submit_enter_delay_sec(self):
        """Return the text-to-Enter delay in seconds.

        Reads AMC_SUBMIT_ENTER_DELAY_MS (milliseconds). Unset, blank,
        non-numeric and NaN values yield the class default; negative values
        yield 0.0; values above the maximum are clamped to it.
        """
        raw = os.environ.get("AMC_SUBMIT_ENTER_DELAY_MS", "").strip()
        if not raw:
            return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
        try:
            ms = float(raw)
        except ValueError:
            return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
        if math.isnan(ms):
            # NaN passes both range checks below and would make
            # time.sleep() raise, so treat it as an invalid setting.
            return self._DEFAULT_SUBMIT_ENTER_DELAY_SEC
        if ms < 0:
            return 0.0
        return min(ms, self._MAX_SUBMIT_ENTER_DELAY_MS) / 1000.0

    def _parse_pane_id(self, zellij_pane):
        """Extract numeric pane ID from various formats.

        Accepts anything ``int()`` can convert (e.g. ``"10"`` or ``10``) and
        the strings ``"terminal_N"`` / ``"plugin_N"``. Returns the integer
        ID, or None when the value is empty or not in a recognised format.
        """
        if not zellij_pane:
            return None

        # Try direct integer (e.g., "10").
        try:
            return int(zellij_pane)
        except (TypeError, ValueError, OverflowError):
            # TypeError/OverflowError: the value comes from a JSON file and may
            # be a list, dict or non-finite float rather than a string.
            pass

        if not isinstance(zellij_pane, str):
            return None

        # Try "terminal_N" format.
        # NOTE(review): "plugin_N" is accepted too, but _inject_to_pane always
        # labels the pane as terminal_N - confirm that is intended.
        parts = zellij_pane.split("_")
        if len(parts) == 2 and parts[0] in ("terminal", "plugin"):
            try:
                return int(parts[1])
            except ValueError:
                pass

        return None

    def _inject_to_pane(self, zellij_session, pane_id, text, send_enter=True):
        """Inject text into a pane using zellij actions.

        Uses the plugin when the plugin file exists. If that fails or the
        plugin is missing, falls back to focused-pane ``write-chars`` only
        when the unsafe fallback is explicitly enabled. Returns
        ``{"ok": True}`` or ``{"ok": False, "error": <message>}``.
        """
        env = os.environ.copy()
        env["ZELLIJ_SESSION_NAME"] = zellij_session
        # Best-effort: some zellij actions respect this pane env.
        env["ZELLIJ_PANE_ID"] = f"terminal_{pane_id}"

        # Pane-accurate routing requires the plugin.
        if ZELLIJ_PLUGIN.exists():
            result = self._try_plugin_inject(env, zellij_session, pane_id, text, send_enter)
            if result["ok"]:
                return result
            LOGGER.warning(
                "Plugin injection failed for session=%s pane=%s: %s",
                zellij_session,
                pane_id,
                result.get("error", "unknown error"),
            )
        else:
            LOGGER.warning("Zellij plugin missing at %s", ZELLIJ_PLUGIN)

        # `write-chars` targets whichever pane is focused, which is unsafe for AMC.
        if self._allow_unsafe_write_chars_fallback():
            LOGGER.warning("Using unsafe write-chars fallback (focused pane only)")
            return self._try_write_chars_inject(env, zellij_session, text, send_enter)

        return {
            "ok": False,
            "error": (
                "Pane-targeted injection requires zellij-send-keys plugin; "
                "set AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK=1 to force focused-pane fallback"
            ),
        }

    def _allow_unsafe_write_chars_fallback(self):
        """Return True when AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK is enabled.

        Accepted values, case-insensitive and ignoring surrounding
        whitespace: "1", "true", "yes", "on".
        """
        value = os.environ.get("AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK", "").strip().lower()
        return value in ("1", "true", "yes", "on")

    def _try_plugin_inject(self, env, zellij_session, pane_id, text, send_enter=True):
        """Try injecting via zellij-send-keys plugin (no focus change).

        Pipes a JSON payload with ``pane_id``, ``text`` and ``send_enter`` to
        the plugin. Returns ``{"ok": True}`` on exit code 0, otherwise
        ``{"ok": False, "error": <message>}``; never raises.
        """
        payload = json.dumps({
            "pane_id": pane_id,
            "text": text,
            "send_enter": bool(send_enter),
        })

        try:
            result = subprocess.run(
                [
                    ZELLIJ_BIN,
                    "--session",
                    zellij_session,
                    "action",
                    "pipe",
                    "--plugin",
                    f"file:{ZELLIJ_PLUGIN}",
                    "--name",
                    "send_keys",
                    "--floating-plugin",
                    "false",
                    "--",
                    payload,
                ],
                env=env,
                capture_output=True,
                text=True,
                timeout=3,
            )
        except subprocess.TimeoutExpired:
            return {"ok": False, "error": "plugin timed out"}
        except FileNotFoundError:
            # Same message as the write-chars fallback for a missing binary.
            return {"ok": False, "error": f"zellij not found (resolved binary: {ZELLIJ_BIN})"}
        except Exception as e:
            # Best-effort boundary: report any other failure to the caller.
            return {"ok": False, "error": str(e)}

        if result.returncode == 0:
            return {"ok": True}
        # Strip so whitespace-only stderr falls through to the generic message.
        return {"ok": False, "error": (result.stderr or "").strip() or "plugin failed"}

    def _try_write_chars_inject(self, env, zellij_session, text, send_enter=True):
        """Inject via write-chars (UNSAFE: writes to focused pane).

        Writes *text* (skipped when empty), then an Enter keypress when
        *send_enter* is true. Returns ``{"ok": True}`` when every step exits
        with code 0, otherwise ``{"ok": False, "error": <message>}``; never
        raises.
        """
        steps = []
        if text:
            # "--" stops text that starts with "-" from being parsed as a flag.
            steps.append((["write-chars", "--", text], "write-chars failed"))
        if send_enter:
            steps.append((["write", "13"], "write Enter failed"))  # 13 = Enter

        try:
            for action_args, failure_message in steps:
                result = subprocess.run(
                    [ZELLIJ_BIN, "--session", zellij_session, "action", *action_args],
                    env=env,
                    capture_output=True,
                    text=True,
                    timeout=2,
                )
                if result.returncode != 0:
                    return {"ok": False, "error": (result.stderr or "").strip() or failure_message}
            return {"ok": True}
        except subprocess.TimeoutExpired:
            return {"ok": False, "error": "write-chars timed out"}
        except FileNotFoundError:
            return {"ok": False, "error": f"zellij not found (resolved binary: {ZELLIJ_BIN})"}
        except Exception as e:
            # Best-effort boundary: report any other failure to the caller.
            return {"ok": False, "error": str(e)}
|