Closes bd-3ny. Added mousedown listener that dismisses the dropdown when clicking outside both the dropdown and textarea. Uses early return to avoid registering listeners when dropdown is already closed.
301 lines
10 KiB
Python
Executable File
301 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""AMC hook — writes Claude Code session state to disk.

Called by Claude Code hooks:
    SessionStart, UserPromptSubmit, Stop, SessionEnd
    PreToolUse(AskUserQuestion), PostToolUse(AskUserQuestion)

Reads hook JSON from stdin, writes session state + appends event log.

MUST be fail-open: never exit nonzero, never block, never crash Claude.
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Root of all AMC data, under the current user's home directory.
DATA_DIR = Path.home().joinpath(".local", "share", "amc")
# Holds one "<session_id>.json" state file per session.
SESSIONS_DIR = DATA_DIR.joinpath("sessions")
# Holds one "<session_id>.jsonl" event log per session.
EVENTS_DIR = DATA_DIR.joinpath("events")

# Session status recorded for the hook events that map directly to one.
STATUS_MAP = {
    "SessionStart": "starting",
    "UserPromptSubmit": "active",
    "Stop": "done",
}

# Longest message preview stored before it is cut and "..." is appended.
MAX_PREVIEW_LEN = 200
# Longest prose question stored; longer ones keep only their tail.
MAX_QUESTION_LEN = 500
|
|
|
|
|
|
def _detect_prose_question(message):
|
|
"""Detect if message ends with a question. Returns question text or None."""
|
|
if not message:
|
|
return None
|
|
|
|
# Strip trailing whitespace and check for question mark
|
|
text = message.rstrip()
|
|
if not text.endswith("?"):
|
|
return None
|
|
|
|
# Extract the question - find the last paragraph or sentence with "?"
|
|
# Split by double newlines (paragraphs) first
|
|
paragraphs = text.split("\n\n")
|
|
last_para = paragraphs[-1].strip()
|
|
|
|
# If the last paragraph has a question mark, use it
|
|
if "?" in last_para:
|
|
# Truncate if too long
|
|
if len(last_para) > MAX_QUESTION_LEN:
|
|
last_para = last_para[-MAX_QUESTION_LEN:]
|
|
# Try to start at a sentence boundary
|
|
first_period = last_para.find(". ")
|
|
if first_period > 0:
|
|
last_para = last_para[first_period + 2:]
|
|
return last_para
|
|
|
|
return None
|
|
|
|
|
|
def _extract_questions(hook):
|
|
"""Extract question text from AskUserQuestion tool_input."""
|
|
tool_input = hook.get("tool_input", {})
|
|
if isinstance(tool_input, str):
|
|
try:
|
|
tool_input = json.loads(tool_input)
|
|
except (json.JSONDecodeError, TypeError):
|
|
return []
|
|
|
|
# Guard against non-dict tool_input (null, list, etc.)
|
|
if not isinstance(tool_input, dict):
|
|
return []
|
|
|
|
questions = tool_input.get("questions") or []
|
|
result = []
|
|
for q in questions:
|
|
entry = {
|
|
"question": q.get("question", ""),
|
|
"header": q.get("header", ""),
|
|
"options": [],
|
|
}
|
|
for opt in q.get("options", []):
|
|
opt_entry = {
|
|
"label": opt.get("label", ""),
|
|
"description": opt.get("description", ""),
|
|
}
|
|
# Include markdown preview if present
|
|
if opt.get("markdown"):
|
|
opt_entry["markdown"] = opt.get("markdown")
|
|
entry["options"].append(opt_entry)
|
|
result.append(entry)
|
|
return result
|
|
|
|
|
|
def _atomic_write(path, data):
|
|
"""Write JSON data atomically via temp file + os.replace()."""
|
|
fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
|
|
closed = False
|
|
try:
|
|
os.write(fd, json.dumps(data, indent=2).encode())
|
|
os.close(fd)
|
|
closed = True
|
|
os.replace(tmp, path)
|
|
except Exception:
|
|
if not closed:
|
|
try:
|
|
os.close(fd)
|
|
except OSError:
|
|
pass
|
|
try:
|
|
os.unlink(tmp)
|
|
except OSError:
|
|
pass
|
|
raise
|
|
|
|
|
|
def _read_session(session_file):
|
|
"""Read existing session state, or return empty dict."""
|
|
if session_file.exists():
|
|
try:
|
|
return json.loads(session_file.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
return {}
|
|
|
|
|
|
def main():
    """Handle one hook invocation: read its JSON from stdin, update state.

    The payload must be a JSON object with "hook_event_name" and
    "session_id"; anything else is ignored. Depending on the event:

    * SessionEnd appends an event and deletes the session file.
    * PreToolUse / PostToolUse for the AskUserQuestion tool switch an
      existing session to "needs_attention" / back to "active", store or
      clear the pending questions, and accumulate how long the turn was
      paused. Tool events for any other tool are ignored.
    * Every other event rewrites the session file; a Stop whose last
      assistant message ends with a question is stored as
      "needs_attention" with that question pending. Only SessionStart may
      create a session file that does not exist yet.

    Fail-open: every exception is swallowed so a hook error never affects
    Claude.
    """
    try:
        raw = sys.stdin.read()
        if not raw.strip():
            return

        hook = json.loads(raw)
        # Valid JSON that is not an object carries no hook fields.
        if not isinstance(hook, dict):
            return
        event = hook.get("hook_event_name", "")
        session_id = hook.get("session_id", "")
        if not session_id or not event:
            return

        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)

        now_dt = datetime.now(timezone.utc)
        now = now_dt.isoformat()

        # Sanitize session_id to prevent path traversal
        session_id = os.path.basename(session_id)
        if not session_id:
            return

        session_file = SESSIONS_DIR / f"{session_id}.json"

        # SessionEnd: delete session file (session is gone)
        if event == "SessionEnd":
            _append_event(session_id, {"event": event, "at": now})
            try:
                session_file.unlink(missing_ok=True)
            except OSError:
                pass
            return

        # PreToolUse(AskUserQuestion): mark needs_attention + store questions
        if event == "PreToolUse":
            tool_name = hook.get("tool_name", "")
            if tool_name == "AskUserQuestion":
                existing = _read_session(session_file)
                if not existing:
                    return
                existing["status"] = "needs_attention"
                existing["last_event"] = f"PreToolUse({tool_name})"
                existing["last_event_at"] = now
                existing["pending_questions"] = _extract_questions(hook)
                # Track when turn paused for duration calculation
                existing["turn_paused_at"] = now
                _atomic_write(session_file, existing)
                _append_event(session_id, {
                    "event": f"PreToolUse({tool_name})",
                    "at": now,
                    "status": "needs_attention",
                })
            # Events for other tools must not reach the state rebuild below.
            return

        # PostToolUse(AskUserQuestion): question answered, back to active
        if event == "PostToolUse":
            tool_name = hook.get("tool_name", "")
            if tool_name == "AskUserQuestion":
                existing = _read_session(session_file)
                if not existing:
                    return
                existing["status"] = "active"
                existing["last_event"] = f"PostToolUse({tool_name})"
                existing["last_event_at"] = now
                existing.pop("pending_questions", None)
                # Accumulate paused time for turn duration calculation
                paused_at = existing.pop("turn_paused_at", None)
                if paused_at:
                    try:
                        paused_start = datetime.fromisoformat(
                            paused_at.replace("Z", "+00:00")
                        )
                        paused_ms = int(
                            (now_dt - paused_start).total_seconds() * 1000
                        )
                        existing["turn_paused_ms"] = (
                            existing.get("turn_paused_ms", 0) + paused_ms
                        )
                    except (AttributeError, ValueError, TypeError):
                        # A malformed stored timestamp or counter only
                        # costs the pause accounting, not the whole update.
                        pass
                _atomic_write(session_file, existing)
                _append_event(session_id, {
                    "event": f"PostToolUse({tool_name})",
                    "at": now,
                    "status": "active",
                })
            # Events for other tools must not reach the state rebuild below.
            return

        # Guard: don't resurrect a session after SessionEnd deleted it.
        if event != "SessionStart" and not session_file.exists():
            return

        # Build session state for SessionStart, UserPromptSubmit, Stop
        project_dir = os.environ.get("CLAUDE_PROJECT_DIR", hook.get("cwd", ""))
        project = os.path.basename(project_dir) if project_dir else "unknown"

        existing = _read_session(session_file)

        # Get the full message for question detection
        full_message = hook.get("last_assistant_message", "") or ""
        if not isinstance(full_message, str):
            # Only text can be previewed or scanned for a question.
            full_message = ""

        # Truncate message preview
        preview = full_message
        if len(preview) > MAX_PREVIEW_LEN:
            preview = preview[:MAX_PREVIEW_LEN] + "..."

        # Determine status - check for prose questions on Stop
        status = STATUS_MAP.get(event, existing.get("status", "unknown"))
        prose_question = None
        if event == "Stop":
            prose_question = _detect_prose_question(full_message)
            if prose_question:
                status = "needs_attention"

        state = {
            "session_id": session_id,
            "agent": "claude",
            "project": project,
            "project_dir": project_dir,
            "status": status,
            "started_at": existing.get("started_at", now),
            "last_event_at": now,
            "last_event": event,
            "last_message_preview": preview,
            "zellij_session": os.environ.get("ZELLIJ_SESSION_NAME", ""),
            "zellij_pane": os.environ.get("ZELLIJ_PANE_ID", ""),
        }

        # Turn timing: track working time from user prompt to completion
        if event == "UserPromptSubmit":
            # New turn starting - reset turn timing
            state["turn_started_at"] = now
            state["turn_paused_ms"] = 0
        else:
            # Preserve turn timing from existing state
            for key in ("turn_started_at", "turn_paused_ms", "turn_paused_at"):
                if key in existing:
                    state[key] = existing[key]

        # Store prose question if detected
        if prose_question:
            state["pending_questions"] = [{
                "question": prose_question,
                "header": "Question",
                "options": [],
            }]

        _atomic_write(session_file, state)

        event_name = event
        if event == "Stop" and prose_question:
            event_name = "Stop(question)"
        _append_event(session_id, {
            "event": event_name,
            "at": now,
            "status": status,
        })

    except Exception:
        # Fail open: never let a hook error affect Claude
        pass
|
|
|
|
|
|
def _append_event(session_id, event_data):
    """Record one event as a JSON line in the session's event log.

    The log is EVENTS_DIR/<session_id>.jsonl, opened in append mode.
    An OSError while opening or writing is ignored, so logging stays
    best-effort.
    """
    log_path = EVENTS_DIR / f"{session_id}.jsonl"
    try:
        with log_path.open("a") as log:
            log.write(f"{json.dumps(event_data)}\n")
    except OSError:
        pass
|
|
|
|
|
|
# Run the hook only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|