#!/usr/bin/env python3
"""AMC hook — writes Claude Code session state to disk.

Called by Claude Code hooks:
    SessionStart, UserPromptSubmit, Stop, SessionEnd
    PreToolUse(AskUserQuestion), PostToolUse(AskUserQuestion)

Reads hook JSON from stdin, writes session state + appends event log.
MUST be fail-open: never exit nonzero, never block, never crash Claude.
"""

import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path

DATA_DIR = Path.home() / ".local" / "share" / "amc"
SESSIONS_DIR = DATA_DIR / "sessions"
EVENTS_DIR = DATA_DIR / "events"

# Hook event name -> session status written to the session file.
STATUS_MAP = {
    "SessionStart": "starting",
    "UserPromptSubmit": "active",
    "Stop": "done",
}

MAX_PREVIEW_LEN = 200
MAX_QUESTION_LEN = 500


def _detect_prose_question(message):
    """Return the trailing question of *message*, or None if there is none.

    A message counts as a question when, ignoring trailing whitespace, it
    ends with "?". The returned text is the last blank-line-separated
    paragraph, limited to roughly MAX_QUESTION_LEN characters (keeping the
    tail, since the question mark is at the end).
    """
    if not message or not isinstance(message, str):
        return None

    text = message.rstrip()
    if not text.endswith("?"):
        return None

    # The text ends with "?", so its last paragraph always holds the question.
    last_para = text.split("\n\n")[-1].strip()

    if len(last_para) > MAX_QUESTION_LEN:
        last_para = last_para[-MAX_QUESTION_LEN:]
        # The cut usually lands mid-sentence; resume at the next sentence
        # boundary (including one sitting exactly at the cut point).
        first_period = last_para.find(". ")
        if first_period != -1:
            last_para = last_para[first_period + 2:]

    return last_para


def _extract_questions(hook):
    """Extract question entries from an AskUserQuestion ``tool_input``.

    ``tool_input`` may be a dict or a JSON-encoded string. Returns a list of
    ``{"question", "header", "options"}`` dicts, where each option carries
    ``label``, ``description`` and, when present, ``markdown``. Anything
    malformed (wrong container type, non-dict entries) is skipped so that a
    bad payload yields fewer questions instead of an exception.
    """
    tool_input = hook.get("tool_input", {})
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except (json.JSONDecodeError, TypeError):
            return []

    # Guard against non-dict tool_input (null, list, etc.)
    if not isinstance(tool_input, dict):
        return []

    questions = tool_input.get("questions")
    if not isinstance(questions, list):
        return []

    result = []
    for q in questions:
        if not isinstance(q, dict):
            continue

        options = q.get("options")
        if not isinstance(options, list):
            options = []

        entry = {
            "question": q.get("question", ""),
            "header": q.get("header", ""),
            "options": [],
        }
        for opt in options:
            if not isinstance(opt, dict):
                continue
            opt_entry = {
                "label": opt.get("label", ""),
                "description": opt.get("description", ""),
            }
            # Include markdown preview if present
            if opt.get("markdown"):
                opt_entry["markdown"] = opt.get("markdown")
            entry["options"].append(opt_entry)
        result.append(entry)
    return result


def _atomic_write(path, data):
    """Write *data* as JSON to *path* atomically (temp file + os.replace()).

    The temp file is created in the destination directory so the final
    rename stays on one filesystem. On any failure the temp file is removed
    and the exception is re-raised; *path* is left untouched.
    """
    # Serialize first so an unserializable payload never creates a temp file.
    payload = json.dumps(data, indent=2).encode()

    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    closed = False
    try:
        # os.write() may write fewer bytes than requested; loop until done.
        remaining = memoryview(payload)
        while remaining:
            written = os.write(fd, remaining)
            remaining = remaining[written:]
        os.close(fd)
        closed = True
        os.replace(tmp, path)
    except Exception:
        if not closed:
            try:
                os.close(fd)
            except OSError:
                pass
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def _read_session(session_file):
    """Read existing session state; return an empty dict if unavailable.

    Missing, unreadable, undecodable, or non-JSON files all yield ``{}``,
    as does valid JSON whose top level is not an object — callers index the
    result as a dict.
    """
    try:
        data = json.loads(session_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def main():
    """Process one hook invocation read from stdin.

    Updates (or deletes, on SessionEnd) the per-session state file under
    SESSIONS_DIR and appends a line to the session's event log. Every error
    is swallowed: this hook must never affect Claude itself.
    """
    try:
        raw = sys.stdin.read()
        if not raw.strip():
            return

        hook = json.loads(raw)
        if not isinstance(hook, dict):
            return

        event = hook.get("hook_event_name", "")
        session_id = hook.get("session_id", "")
        if not session_id or not event or not isinstance(session_id, str):
            return

        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)

        now = datetime.now(timezone.utc).isoformat()

        # Sanitize session_id to prevent path traversal
        session_id = os.path.basename(session_id)
        if not session_id:
            return

        session_file = SESSIONS_DIR / f"{session_id}.json"

        # SessionEnd: delete session file (session is gone)
        if event == "SessionEnd":
            _append_event(session_id, {"event": event, "at": now})
            try:
                session_file.unlink(missing_ok=True)
            except OSError:
                pass
            return

        # PreToolUse(AskUserQuestion): mark needs_attention + store questions
        if event == "PreToolUse":
            tool_name = hook.get("tool_name", "")
            if tool_name == "AskUserQuestion":
                existing = _read_session(session_file)
                if not existing:
                    return
                existing["status"] = "needs_attention"
                existing["last_event"] = f"PreToolUse({tool_name})"
                existing["last_event_at"] = now
                existing["pending_questions"] = _extract_questions(hook)
                # Track when turn paused for duration calculation
                existing["turn_paused_at"] = now
                _atomic_write(session_file, existing)
                _append_event(session_id, {
                    "event": f"PreToolUse({tool_name})",
                    "at": now,
                    "status": "needs_attention",
                })
            # NOTE(review): tool events for other tools are ignored here so
            # they never reach the state rebuild below — confirm this matches
            # the hook registration (only AskUserQuestion is expected).
            return

        # PostToolUse(AskUserQuestion): question answered, back to active
        if event == "PostToolUse":
            tool_name = hook.get("tool_name", "")
            if tool_name == "AskUserQuestion":
                existing = _read_session(session_file)
                if not existing:
                    return
                existing["status"] = "active"
                existing["last_event"] = f"PostToolUse({tool_name})"
                existing["last_event_at"] = now
                existing.pop("pending_questions", None)
                # Accumulate paused time for turn duration calculation
                paused_at = existing.pop("turn_paused_at", None)
                if paused_at:
                    try:
                        paused_start = datetime.fromisoformat(
                            paused_at.replace("Z", "+00:00")
                        )
                        paused_end = datetime.fromisoformat(
                            now.replace("Z", "+00:00")
                        )
                        paused_ms = int(
                            (paused_end - paused_start).total_seconds() * 1000
                        )
                        existing["turn_paused_ms"] = (
                            existing.get("turn_paused_ms", 0) + paused_ms
                        )
                    except (ValueError, TypeError, AttributeError):
                        # Corrupt timestamp/counter: drop this pause interval
                        # but still record the state transition.
                        pass
                _atomic_write(session_file, existing)
                _append_event(session_id, {
                    "event": f"PostToolUse({tool_name})",
                    "at": now,
                    "status": "active",
                })
            return

        # Guard: don't resurrect a session after SessionEnd deleted it.
        if event != "SessionStart" and not session_file.exists():
            return

        # Build session state for SessionStart, UserPromptSubmit, Stop
        project_dir = os.environ.get("CLAUDE_PROJECT_DIR", hook.get("cwd", ""))
        project = os.path.basename(project_dir) if project_dir else "unknown"

        existing = _read_session(session_file)

        # Get the full message for question detection
        full_message = hook.get("last_assistant_message", "") or ""
        if not isinstance(full_message, str):
            full_message = ""

        # Truncate message preview
        preview = full_message
        if len(preview) > MAX_PREVIEW_LEN:
            preview = preview[:MAX_PREVIEW_LEN] + "..."

        # Determine status - check for prose questions on Stop
        status = STATUS_MAP.get(event, existing.get("status", "unknown"))
        prose_question = None
        if event == "Stop":
            prose_question = _detect_prose_question(full_message)
            if prose_question:
                status = "needs_attention"

        state = {
            "session_id": session_id,
            "agent": "claude",
            "project": project,
            "project_dir": project_dir,
            "status": status,
            "started_at": existing.get("started_at", now),
            "last_event_at": now,
            "last_event": event,
            "last_message_preview": preview,
            "zellij_session": os.environ.get("ZELLIJ_SESSION_NAME", ""),
            "zellij_pane": os.environ.get("ZELLIJ_PANE_ID", ""),
        }

        # Include spawn_id if present in environment (for spawn correlation)
        spawn_id = os.environ.get("AMC_SPAWN_ID")
        if spawn_id:
            state["spawn_id"] = spawn_id

        # Turn timing: track working time from user prompt to completion
        if event == "UserPromptSubmit":
            # New turn starting - reset turn timing
            state["turn_started_at"] = now
            state["turn_paused_ms"] = 0
        else:
            # Preserve turn timing from existing state
            for key in ("turn_started_at", "turn_paused_ms", "turn_paused_at"):
                if key in existing:
                    state[key] = existing[key]

        # Store prose question if detected
        if prose_question:
            state["pending_questions"] = [{
                "question": prose_question,
                "header": "Question",
                "options": [],
            }]

        _atomic_write(session_file, state)

        event_name = event
        if event == "Stop" and prose_question:
            event_name = "Stop(question)"
        _append_event(session_id, {
            "event": event_name,
            "at": now,
            "status": status,
        })

    except Exception:
        # Fail open: never let a hook error affect Claude
        pass


def _append_event(session_id, event_data):
    """Append *event_data* as one JSON line to the session's event log.

    Best-effort: I/O errors are ignored so logging can never break the hook.
    """
    event_file = EVENTS_DIR / f"{session_id}.jsonl"
    try:
        with open(event_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(event_data) + "\n")
    except OSError:
        pass


if __name__ == "__main__":
    main()