#!/usr/bin/env python3
"""AMC hook — writes Claude Code session state to disk.

Called by Claude Code hooks:
    SessionStart, UserPromptSubmit, Stop, SessionEnd,
    PreToolUse(AskUserQuestion), PostToolUse(AskUserQuestion)

Reads hook JSON from stdin, writes session state + appends event log.

MUST be fail-open: never exit nonzero, never block, never crash Claude.
"""

import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path

DATA_DIR = Path.home() / ".local" / "share" / "amc"
SESSIONS_DIR = DATA_DIR / "sessions"
EVENTS_DIR = DATA_DIR / "events"

# Session status recorded for each hook event that maps directly to one.
STATUS_MAP = {
    "SessionStart": "starting",
    "UserPromptSubmit": "active",
    "Stop": "done",
}

MAX_PREVIEW_LEN = 200
MAX_QUESTION_LEN = 500


def _detect_prose_question(message):
    """Return the trailing question of *message*, or None if there is none.

    A message counts as a question when, ignoring trailing whitespace, it
    ends with "?". The returned text is the last blank-line-separated
    paragraph, limited to roughly MAX_QUESTION_LEN characters.
    """
    if not message or not isinstance(message, str):
        return None

    text = message.rstrip()
    if not text.endswith("?"):
        return None

    # text ends with "?", so the final paragraph always holds the question.
    last_para = text.split("\n\n")[-1].strip()

    if len(last_para) > MAX_QUESTION_LEN:
        # Keep the tail (where the question is), then try to start at a
        # sentence boundary so the excerpt does not begin mid-sentence.
        last_para = last_para[-MAX_QUESTION_LEN:]
        first_period = last_para.find(". ")
        if first_period > 0:
            last_para = last_para[first_period + 2:]
    return last_para


def _extract_questions(hook):
    """Extract question entries from an AskUserQuestion ``tool_input``.

    ``tool_input`` may be a dict or a JSON-encoded string. Returns a list of
    ``{"question", "header", "options": [{"label", "description"}]}`` dicts.
    Malformed pieces (non-dict input, non-list containers, non-dict entries)
    are skipped rather than raising, so one bad entry cannot prevent the
    session state from being updated.
    """
    tool_input = hook.get("tool_input", {})
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except (json.JSONDecodeError, TypeError):
            return []

    # Guard against non-dict tool_input (null, list, etc.)
    if not isinstance(tool_input, dict):
        return []

    questions = tool_input.get("questions", [])
    if not isinstance(questions, list):
        return []

    result = []
    for q in questions:
        if not isinstance(q, dict):
            continue
        options = q.get("options", [])
        if not isinstance(options, list):
            options = []
        result.append({
            "question": q.get("question", ""),
            "header": q.get("header", ""),
            "options": [
                {
                    "label": opt.get("label", ""),
                    "description": opt.get("description", ""),
                }
                for opt in options
                if isinstance(opt, dict)
            ],
        })
    return result


def _atomic_write(path, data):
    """Write *data* as JSON to *path* atomically (temp file + os.replace()).

    The temp file is created next to *path* so the final rename stays on one
    filesystem. On any failure the temp file is removed and the exception
    is re-raised.
    """
    payload = json.dumps(data, indent=2).encode()
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        # A buffered file object retries short writes; a single os.write()
        # call is allowed to write only part of the payload.
        with os.fdopen(fd, "wb") as f:
            f.write(payload)
        os.replace(tmp, path)
    except BaseException:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def _read_session(session_file):
    """Return the stored session state dict, or ``{}`` if unavailable.

    A missing, unreadable, undecodable, or non-object file yields ``{}``
    so a corrupt state file can never wedge the hook.
    """
    try:
        data = json.loads(session_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def _append_event(session_id, event_data):
    """Append a single JSON line to the session's event log."""
    event_file = EVENTS_DIR / f"{session_id}.jsonl"
    try:
        with open(event_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(event_data) + "\n")
    except OSError:
        pass


def _record_question_event(event, hook, session_id, session_file, now):
    """Update an existing session for Pre/PostToolUse(AskUserQuestion).

    PreToolUse marks the session ``needs_attention`` and stores the pending
    questions; PostToolUse returns it to ``active`` and clears them. Does
    nothing when the session has no stored state.
    """
    existing = _read_session(session_file)
    if not existing:
        return

    asking = event == "PreToolUse"
    status = "needs_attention" if asking else "active"
    label = f"{event}(AskUserQuestion)"

    existing["status"] = status
    existing["last_event"] = label
    existing["last_event_at"] = now
    if asking:
        existing["pending_questions"] = _extract_questions(hook)
    else:
        existing.pop("pending_questions", None)

    _atomic_write(session_file, existing)
    _append_event(session_id, {"event": label, "at": now, "status": status})


def main():
    """Read one hook payload from stdin and persist the session state.

    Any error is swallowed on purpose: this hook must never affect Claude.
    """
    try:
        raw = sys.stdin.read()
        if not raw.strip():
            return

        hook = json.loads(raw)
        if not isinstance(hook, dict):
            return

        event = hook.get("hook_event_name", "")
        session_id = hook.get("session_id", "")
        if not session_id or not event:
            return
        if not isinstance(session_id, str) or not isinstance(event, str):
            return

        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)

        now = datetime.now(timezone.utc).isoformat()

        # Sanitize session_id to prevent path traversal
        session_id = os.path.basename(session_id)
        if not session_id:
            return
        session_file = SESSIONS_DIR / f"{session_id}.json"

        # SessionEnd: delete session file (session is gone)
        if event == "SessionEnd":
            _append_event(session_id, {"event": event, "at": now})
            try:
                session_file.unlink(missing_ok=True)
            except OSError:
                pass
            return

        # AskUserQuestion: PreToolUse flags the session as needing attention,
        # PostToolUse means the question was answered.
        if (
            event in ("PreToolUse", "PostToolUse")
            and hook.get("tool_name", "") == "AskUserQuestion"
        ):
            _record_question_event(event, hook, session_id, session_file, now)
            return

        # Guard: don't resurrect a session after SessionEnd deleted it.
        if event != "SessionStart" and not session_file.exists():
            return

        # Build session state for SessionStart, UserPromptSubmit, Stop
        project_dir = os.environ.get("CLAUDE_PROJECT_DIR", hook.get("cwd", ""))
        project = os.path.basename(project_dir) if project_dir else "unknown"

        existing = _read_session(session_file)

        # Full message is kept for question detection; only a preview is stored.
        full_message = hook.get("last_assistant_message", "") or ""
        if not isinstance(full_message, str):
            full_message = ""

        preview = full_message
        if len(preview) > MAX_PREVIEW_LEN:
            preview = preview[:MAX_PREVIEW_LEN] + "..."

        # Events without a direct mapping keep the previously stored status.
        status = STATUS_MAP.get(event, existing.get("status", "unknown"))
        prose_question = None
        if event == "Stop":
            # A Stop whose message ends in a question is waiting on the user.
            prose_question = _detect_prose_question(full_message)
            if prose_question:
                status = "needs_attention"

        state = {
            "session_id": session_id,
            "agent": "claude",
            "project": project,
            "project_dir": project_dir,
            "status": status,
            "started_at": existing.get("started_at", now),
            "last_event_at": now,
            "last_event": event,
            "last_message_preview": preview,
            "zellij_session": os.environ.get("ZELLIJ_SESSION_NAME", ""),
            "zellij_pane": os.environ.get("ZELLIJ_PANE_ID", ""),
        }

        # Store prose question if detected
        if prose_question:
            state["pending_questions"] = [{
                "question": prose_question,
                "header": "Question",
                "options": [],
            }]

        _atomic_write(session_file, state)

        event_name = "Stop(question)" if prose_question else event
        _append_event(session_id, {
            "event": event_name,
            "at": now,
            "status": status,
        })

    except Exception:
        # Fail open: never let a hook error affect Claude
        pass


if __name__ == "__main__":
    main()