Files
amc/bin/amc-hook
2026-02-25 09:21:59 -05:00

271 lines
8.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""AMC hook — writes Claude Code session state to disk.

Called by Claude Code hooks:
    SessionStart, UserPromptSubmit, Stop, SessionEnd
    PreToolUse(AskUserQuestion), PostToolUse(AskUserQuestion)

Reads hook JSON from stdin, writes session state, and appends to the
session's event log.

MUST be fail-open: never exit nonzero, never block, never crash Claude.
"""
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
# Root of AMC's on-disk state, under the current user's home directory.
DATA_DIR = Path.home().joinpath(".local", "share", "amc")
# Holds one "<session_id>.json" state file per session.
SESSIONS_DIR = DATA_DIR.joinpath("sessions")
# Holds one "<session_id>.jsonl" event log per session.
EVENTS_DIR = DATA_DIR.joinpath("events")

# Session status recorded for hook events that map directly onto one.
STATUS_MAP = dict(
    SessionStart="starting",
    UserPromptSubmit="active",
    Stop="done",
)

# Character cap applied to the stored assistant-message preview.
MAX_PREVIEW_LEN = 200
# Character cap applied to a question extracted from assistant prose.
MAX_QUESTION_LEN = 500
def _detect_prose_question(message):
"""Detect if message ends with a question. Returns question text or None."""
if not message:
return None
# Strip trailing whitespace and check for question mark
text = message.rstrip()
if not text.endswith("?"):
return None
# Extract the question - find the last paragraph or sentence with "?"
# Split by double newlines (paragraphs) first
paragraphs = text.split("\n\n")
last_para = paragraphs[-1].strip()
# If the last paragraph has a question mark, use it
if "?" in last_para:
# Truncate if too long
if len(last_para) > MAX_QUESTION_LEN:
last_para = last_para[-MAX_QUESTION_LEN:]
# Try to start at a sentence boundary
first_period = last_para.find(". ")
if first_period > 0:
last_para = last_para[first_period + 2:]
return last_para
return None
def _extract_questions(hook):
    """Extract question entries from an AskUserQuestion hook payload.

    Reads ``hook["tool_input"]``, which may be a dict or a JSON-encoded
    string, and normalizes its ``"questions"`` list.

    Args:
        hook: Parsed hook payload (a dict).

    Returns:
        A list of ``{"question", "header", "options"}`` dicts, where
        ``options`` is a list of ``{"label", "description"}`` dicts. Missing
        fields default to ``""``. Returns ``[]`` when ``tool_input`` is
        absent, unparseable, or not shaped as expected. Malformed question
        or option entries (anything that is not a dict) are skipped rather
        than raising, so one bad entry cannot prevent the caller from
        recording the session state.
    """
    tool_input = hook.get("tool_input", {})
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except (json.JSONDecodeError, TypeError):
            return []
    # Guard against non-dict tool_input (null, list, etc.)
    if not isinstance(tool_input, dict):
        return []

    questions = tool_input.get("questions", [])
    # "questions": null (or any non-list) means there is nothing to extract.
    if not isinstance(questions, list):
        return []

    result = []
    for q in questions:
        if not isinstance(q, dict):
            continue
        options = q.get("options", [])
        if not isinstance(options, list):
            options = []
        result.append({
            "question": q.get("question", ""),
            "header": q.get("header", ""),
            "options": [
                {
                    "label": opt.get("label", ""),
                    "description": opt.get("description", ""),
                }
                for opt in options
                if isinstance(opt, dict)
            ],
        })
    return result
def _atomic_write(path, data):
    """Write ``data`` as indented JSON to ``path`` atomically.

    The JSON is written to a temporary file in the same directory as
    ``path`` and then moved over ``path`` with os.replace(), so a reader
    sees either the previous file or the complete new one.

    Args:
        path: Destination ``pathlib.Path``.
        data: JSON-serializable object.

    Raises:
        TypeError / ValueError: ``data`` cannot be serialized; nothing is
            written.
        OSError: creating, writing, or renaming the temp file failed. The
            temp file is removed before the error propagates.
    """
    # Serialize first so bad data fails before any file is created.
    payload = json.dumps(data, indent=2).encode()
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        # A buffered file object writes the whole payload (a bare os.write
        # may write only part of it) and closes the descriptor exactly once.
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
        os.replace(tmp, path)
    except BaseException:
        # Do not leave stray *.tmp files behind, even on interruption.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
def _read_session(session_file):
    """Read a session's stored state.

    Args:
        session_file: ``pathlib.Path`` of the session's JSON state file.

    Returns:
        The stored state dict, or ``{}`` when the file is missing,
        unreadable, not valid UTF-8/JSON, or does not contain a JSON object.
        Callers use ``.get`` and item assignment on the result, so anything
        other than a dict is treated as "no state".
    """
    try:
        # Read directly instead of checking exists() first: the file can be
        # deleted between the check and the read.
        state = json.loads(session_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {}
    return state if isinstance(state, dict) else {}
def _record_question_event(hook, event, session_id, session_file, now):
    """Apply a Pre/PostToolUse(AskUserQuestion) hook to an existing session.

    PreToolUse marks the session "needs_attention" and stores the pending
    questions; PostToolUse marks it "active" and clears them. Events for any
    other tool, or for a session with no stored state, are ignored.
    """
    tool_name = hook.get("tool_name", "")
    if tool_name != "AskUserQuestion":
        return
    existing = _read_session(session_file)
    # A tool event never creates a session; it must already have state.
    if not existing or not isinstance(existing, dict):
        return

    label = f"{event}({tool_name})"
    asking = event == "PreToolUse"
    status = "needs_attention" if asking else "active"

    existing["status"] = status
    existing["last_event"] = label
    existing["last_event_at"] = now
    if asking:
        existing["pending_questions"] = _extract_questions(hook)
    else:
        existing.pop("pending_questions", None)

    _atomic_write(session_file, existing)
    _append_event(session_id, {"event": label, "at": now, "status": status})


def main():
    """Process one Claude Code hook payload read from stdin.

    Depending on ``hook_event_name``:
      * SessionEnd: log the event and delete the session's state file.
      * PreToolUse / PostToolUse: update the existing session when the tool
        is AskUserQuestion; otherwise do nothing.
      * SessionStart / UserPromptSubmit / Stop (and any other event for a
        session that already has a state file): rebuild and write the
        session state, then log the event. A Stop whose assistant message
        ends with a question is recorded as "needs_attention".

    Fail-open: empty or malformed input is ignored and every exception is
    swallowed, so this function never raises.
    """
    try:
        raw = sys.stdin.read()
        if not raw.strip():
            return
        hook = json.loads(raw)
        if not isinstance(hook, dict):
            return

        event = hook.get("hook_event_name", "")
        session_id = hook.get("session_id", "")
        if not event or not session_id or not isinstance(session_id, str):
            return
        # Sanitize session_id to prevent path traversal
        session_id = os.path.basename(session_id)
        if not session_id:
            return

        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)
        now = datetime.now(timezone.utc).isoformat()
        session_file = SESSIONS_DIR / f"{session_id}.json"

        # SessionEnd: delete session file (session is gone)
        if event == "SessionEnd":
            _append_event(session_id, {"event": event, "at": now})
            try:
                session_file.unlink(missing_ok=True)
            except OSError:
                pass
            return

        # Tool events only ever patch an existing session (AskUserQuestion);
        # they never fall through to the full state rebuild below.
        if event in ("PreToolUse", "PostToolUse"):
            _record_question_event(hook, event, session_id, session_file, now)
            return

        # Guard: don't resurrect a session after SessionEnd deleted it.
        if event != "SessionStart" and not session_file.exists():
            return

        # Build session state for SessionStart, UserPromptSubmit, Stop
        # An unset *or empty* env var falls back to the hook's cwd.
        project_dir = os.environ.get("CLAUDE_PROJECT_DIR") or hook.get("cwd") or ""
        project = "unknown"
        if project_dir and isinstance(project_dir, str):
            # normpath drops a trailing separator, which would otherwise
            # make basename() return "".
            project = os.path.basename(os.path.normpath(project_dir)) or "unknown"

        existing = _read_session(session_file)
        if not isinstance(existing, dict):
            existing = {}

        # Get the full message for question detection
        full_message = hook.get("last_assistant_message", "") or ""
        if not isinstance(full_message, str):
            full_message = ""

        # Truncate message preview
        preview = full_message
        if len(preview) > MAX_PREVIEW_LEN:
            preview = preview[:MAX_PREVIEW_LEN] + "..."

        # Determine status - check for prose questions on Stop.
        # Events without a mapped status keep the session's previous one.
        status = STATUS_MAP.get(event, existing.get("status", "unknown"))
        prose_question = None
        if event == "Stop":
            prose_question = _detect_prose_question(full_message)
            if prose_question:
                status = "needs_attention"

        state = {
            "session_id": session_id,
            "agent": "claude",
            "project": project,
            "project_dir": project_dir,
            "status": status,
            "started_at": existing.get("started_at", now),
            "last_event_at": now,
            "last_event": event,
            "last_message_preview": preview,
            "zellij_session": os.environ.get("ZELLIJ_SESSION_NAME", ""),
            "zellij_pane": os.environ.get("ZELLIJ_PANE_ID", ""),
        }

        # Store prose question if detected
        if prose_question:
            state["pending_questions"] = [{
                "question": prose_question,
                "header": "Question",
                "options": [],
            }]

        _atomic_write(session_file, state)

        event_name = event
        if event == "Stop" and prose_question:
            event_name = "Stop(question)"
        _append_event(session_id, {
            "event": event_name,
            "at": now,
            "status": status,
        })
    except Exception:
        # Fail open: never let a hook error affect Claude
        pass
def _append_event(session_id, event_data):
    """Append one event record to the session's JSON-lines log.

    The record is written as a single JSON document followed by a newline
    to ``EVENTS_DIR/<session_id>.jsonl``. Logging is best-effort: an OSError
    while opening or writing the log is ignored.
    """
    log_path = EVENTS_DIR / f"{session_id}.jsonl"
    try:
        with open(log_path, "a") as log:
            record = json.dumps(event_data)
            log.write(record + "\n")
    except OSError:
        return
# Run the hook only when executed as a script, not when imported.
if __name__ == "__main__":
    main()