Read AMC_SPAWN_ID env var and include spawn_id in session JSON when present. This enables deterministic spawn correlation: the server generates a UUID, passes it via env to the spawned agent, and then polls for a session file containing that specific spawn_id. Closes: bd-1zy
306 lines
10 KiB
Python
Executable File
306 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""AMC hook — writes Claude Code session state to disk.
|
|
|
|
Called by Claude Code hooks:
|
|
SessionStart, UserPromptSubmit, Stop, SessionEnd
|
|
PreToolUse(AskUserQuestion), PostToolUse(AskUserQuestion)
|
|
|
|
Reads hook JSON from stdin, writes session state + appends event log.
|
|
|
|
MUST be fail-open: never exit nonzero, never block, never crash Claude.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Root directory for everything this hook persists, under the user's home.
DATA_DIR = Path.home().joinpath(".local", "share", "amc")
# Holds one JSON state file per session (<session_id>.json).
SESSIONS_DIR = DATA_DIR.joinpath("sessions")
# Holds one JSON-lines event log per session (<session_id>.jsonl).
EVENTS_DIR = DATA_DIR.joinpath("events")

# Session status recorded for the hook events that map directly to one.
STATUS_MAP = dict(
    SessionStart="starting",
    UserPromptSubmit="active",
    Stop="done",
)

# Character cap applied to the stored preview of the last assistant message.
MAX_PREVIEW_LEN = 200
# Character cap applied to a question detected in the assistant's prose.
MAX_QUESTION_LEN = 500
|
|
|
|
|
|
def _detect_prose_question(message):
    """Return the question a message ends with, or None if it has none.

    A message counts as a question when, ignoring trailing whitespace, its
    last character is "?". The returned text is the final paragraph
    (paragraphs are separated by blank lines). If that paragraph is longer
    than MAX_QUESTION_LEN characters, only its tail is kept and, where
    possible, the tail is advanced to start just after the first ". " so it
    begins on a sentence boundary.

    Args:
        message: The assistant message taken from the hook payload. Anything
            that is not a non-empty string yields None.

    Returns:
        The question text, or None when the message does not end with "?".
    """
    # The payload is external JSON, so the value may be null, a list, a
    # number, etc. Only a non-empty string can contain a question.
    if not isinstance(message, str) or not message:
        return None

    text = message.rstrip()
    if not text.endswith("?"):
        return None

    # text ends with "?", so its last paragraph always contains the question;
    # no further "is there a question mark" check is needed.
    question = text.split("\n\n")[-1].strip()

    if len(question) > MAX_QUESTION_LEN:
        # Keep the tail: the question itself is at the end of the paragraph.
        question = question[-MAX_QUESTION_LEN:]
        # The cut probably landed mid-sentence; skip ahead to the next one.
        boundary = question.find(". ")
        if boundary > 0:
            question = question[boundary + 2:]

    return question
|
|
|
|
|
|
def _extract_questions(hook):
    """Extract the questions of an AskUserQuestion call from a hook payload.

    Reads ``hook["tool_input"]``, which may be a dict or a JSON-encoded
    string, and normalizes its ``questions`` list.

    Args:
        hook: The decoded hook payload (a dict).

    Returns:
        A list with one dict per question, each holding ``question``,
        ``header`` and ``options``. Every option has ``label`` and
        ``description`` and, when the source option has a truthy
        ``markdown`` value, ``markdown``. Missing text fields default to "".
        Malformed input never raises: an unusable ``tool_input`` or
        ``questions`` value yields ``[]``, and individual questions or
        options that are not dicts are skipped.
    """
    tool_input = hook.get("tool_input", {})
    if isinstance(tool_input, str):
        try:
            tool_input = json.loads(tool_input)
        except (json.JSONDecodeError, TypeError):
            return []

    # Guard against non-dict tool_input (null, list, etc.)
    if not isinstance(tool_input, dict):
        return []

    questions = tool_input.get("questions")
    if not isinstance(questions, list):
        return []

    result = []
    for q in questions:
        if not isinstance(q, dict):
            continue

        # An explicit null (or any non-list) means "no options".
        options = q.get("options")
        if not isinstance(options, list):
            options = []

        entry = {
            "question": q.get("question", ""),
            "header": q.get("header", ""),
            "options": [],
        }
        for opt in options:
            if not isinstance(opt, dict):
                continue
            opt_entry = {
                "label": opt.get("label", ""),
                "description": opt.get("description", ""),
            }
            # Include markdown preview if present
            markdown = opt.get("markdown")
            if markdown:
                opt_entry["markdown"] = markdown
            entry["options"].append(opt_entry)
        result.append(entry)
    return result
|
|
|
|
|
|
def _atomic_write(path, data):
    """Write ``data`` to ``path`` as indented JSON, atomically.

    The JSON is written to a temporary file in the same directory as
    ``path`` and then moved over ``path`` with os.replace(), so a reader
    sees either the previous file or the complete new one.

    Args:
        path: Destination file as a pathlib.Path; its parent directory must
            already exist.
        data: A JSON-serializable object.

    Raises:
        TypeError, ValueError: If ``data`` cannot be serialized to JSON.
        OSError: If the temporary file cannot be created, written, or moved
            into place. The temporary file is removed before re-raising.
    """
    # Serialize before touching the filesystem so unserializable data never
    # creates a temporary file in the first place.
    payload = json.dumps(data, indent=2).encode()

    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        # A bare os.write() may write only part of the buffer; the buffered
        # file object keeps writing until everything is out, and the with
        # block closes the descriptor on both success and failure.
        with os.fdopen(fd, "wb") as tmp_file:
            tmp_file.write(payload)
        os.replace(tmp, path)
    except Exception:
        # Best-effort cleanup; the original error is the one worth reporting.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def _read_session(session_file):
    """Load the stored session state, or return an empty dict.

    Args:
        session_file: pathlib.Path of the session's JSON state file.

    Returns:
        The decoded JSON object as a dict. An empty dict is returned when
        the file is missing or unreadable, does not contain valid JSON, or
        contains JSON that is not an object.
    """
    try:
        state = json.loads(session_file.read_text())
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (undecodable bytes).
        return {}

    # Callers index and .get() on the result, so a list, string or number
    # left behind by a corrupted file is as unusable as no file at all.
    return state if isinstance(state, dict) else {}
|
|
|
|
|
|
def main():
    """Process one hook invocation: read the payload from stdin, update disk state.

    The payload is JSON with at least ``hook_event_name`` and ``session_id``;
    when stdin is empty or either field is missing the call is a no-op.
    Handling depends on the event:

    * ``SessionEnd`` appends an event-log entry and deletes the session file.
    * ``PreToolUse`` / ``PostToolUse`` with ``tool_name == "AskUserQuestion"``
      update the existing session file in place (set the status, store or
      clear the pending questions, track paused time). They do nothing when
      no session state can be read. Other tools are ignored.
    * Every other event rebuilds the session state from scratch and writes
      it, except that only ``SessionStart`` may create a session file that
      does not exist yet.

    Environment variables read: ``CLAUDE_PROJECT_DIR``,
    ``ZELLIJ_SESSION_NAME``, ``ZELLIJ_PANE_ID`` and ``AMC_SPAWN_ID``.

    Every exception is swallowed so that a failure here can never affect
    the process that invoked the hook.
    """
    try:
        raw = sys.stdin.read()
        if not raw.strip():
            return

        hook = json.loads(raw)
        event = hook.get("hook_event_name", "")
        session_id = hook.get("session_id", "")
        if not session_id or not event:
            return

        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        EVENTS_DIR.mkdir(parents=True, exist_ok=True)

        # Single UTC timestamp shared by every record written for this event.
        now = datetime.now(timezone.utc).isoformat()

        # Sanitize session_id to prevent path traversal
        session_id = os.path.basename(session_id)
        if not session_id:
            return

        session_file = SESSIONS_DIR / f"{session_id}.json"

        # SessionEnd: delete session file (session is gone)
        if event == "SessionEnd":
            _append_event(session_id, {"event": event, "at": now})
            try:
                session_file.unlink(missing_ok=True)
            except OSError:
                pass
            return

        # PreToolUse(AskUserQuestion): mark needs_attention + store questions
        if event == "PreToolUse":
            tool_name = hook.get("tool_name", "")
            if tool_name == "AskUserQuestion":
                existing = _read_session(session_file)
                # No readable state: nothing to update, and none is created here.
                if not existing:
                    return
                existing["status"] = "needs_attention"
                existing["last_event"] = f"PreToolUse({tool_name})"
                existing["last_event_at"] = now
                existing["pending_questions"] = _extract_questions(hook)
                # Track when turn paused for duration calculation
                existing["turn_paused_at"] = now
                _atomic_write(session_file, existing)
                _append_event(session_id, {
                    "event": f"PreToolUse({tool_name})",
                    "at": now,
                    "status": "needs_attention",
                })
            return

        # PostToolUse(AskUserQuestion): question answered, back to active
        if event == "PostToolUse":
            tool_name = hook.get("tool_name", "")
            if tool_name == "AskUserQuestion":
                existing = _read_session(session_file)
                # No readable state: nothing to update, and none is created here.
                if not existing:
                    return
                existing["status"] = "active"
                existing["last_event"] = f"PostToolUse({tool_name})"
                existing["last_event_at"] = now
                existing.pop("pending_questions", None)
                # Accumulate paused time for turn duration calculation
                paused_at = existing.pop("turn_paused_at", None)
                if paused_at:
                    try:
                        # A trailing "Z" is rewritten as "+00:00" before parsing.
                        paused_start = datetime.fromisoformat(paused_at.replace("Z", "+00:00"))
                        paused_end = datetime.fromisoformat(now.replace("Z", "+00:00"))
                        paused_ms = int((paused_end - paused_start).total_seconds() * 1000)
                        existing["turn_paused_ms"] = existing.get("turn_paused_ms", 0) + paused_ms
                    except (ValueError, TypeError):
                        # Unparseable timestamp: skip the accumulation, keep the update.
                        pass
                _atomic_write(session_file, existing)
                _append_event(session_id, {
                    "event": f"PostToolUse({tool_name})",
                    "at": now,
                    "status": "active",
                })
            return

        # Guard: don't resurrect a session after SessionEnd deleted it.
        if event != "SessionStart" and not session_file.exists():
            return

        # Build session state for SessionStart, UserPromptSubmit, Stop
        # The environment variable wins over the payload's "cwd".
        project_dir = os.environ.get("CLAUDE_PROJECT_DIR", hook.get("cwd", ""))
        project = os.path.basename(project_dir) if project_dir else "unknown"

        existing = _read_session(session_file)

        # Get the full message for question detection
        full_message = hook.get("last_assistant_message", "") or ""

        # Truncate message preview
        preview = full_message
        if len(preview) > MAX_PREVIEW_LEN:
            preview = preview[:MAX_PREVIEW_LEN] + "..."

        # Determine status - check for prose questions on Stop
        # Events missing from STATUS_MAP keep the stored status ("unknown" if none).
        status = STATUS_MAP.get(event, existing.get("status", "unknown"))
        prose_question = None
        if event == "Stop":
            prose_question = _detect_prose_question(full_message)
            if prose_question:
                status = "needs_attention"

        # The state is a fresh dict: only the keys copied explicitly below
        # survive from the previous state (pending_questions, for one, does not).
        state = {
            "session_id": session_id,
            "agent": "claude",
            "project": project,
            "project_dir": project_dir,
            "status": status,
            "started_at": existing.get("started_at", now),
            "last_event_at": now,
            "last_event": event,
            "last_message_preview": preview,
            "zellij_session": os.environ.get("ZELLIJ_SESSION_NAME", ""),
            "zellij_pane": os.environ.get("ZELLIJ_PANE_ID", ""),
        }

        # Include spawn_id if present in environment (for spawn correlation)
        spawn_id = os.environ.get("AMC_SPAWN_ID")
        if spawn_id:
            state["spawn_id"] = spawn_id

        # Turn timing: track working time from user prompt to completion
        if event == "UserPromptSubmit":
            # New turn starting - reset turn timing
            state["turn_started_at"] = now
            state["turn_paused_ms"] = 0
        else:
            # Preserve turn timing from existing state
            if "turn_started_at" in existing:
                state["turn_started_at"] = existing["turn_started_at"]
            if "turn_paused_ms" in existing:
                state["turn_paused_ms"] = existing["turn_paused_ms"]
            if "turn_paused_at" in existing:
                state["turn_paused_at"] = existing["turn_paused_at"]

        # Store prose question if detected
        # Same entry shape as _extract_questions produces, with no options.
        if prose_question:
            state["pending_questions"] = [{
                "question": prose_question,
                "header": "Question",
                "options": [],
            }]

        _atomic_write(session_file, state)
        # A Stop that ends in a question is logged under a distinct event name.
        event_name = event
        if event == "Stop" and prose_question:
            event_name = "Stop(question)"
        _append_event(session_id, {
            "event": event_name,
            "at": now,
            "status": status,
        })

    except Exception:
        # Fail open: never let a hook error affect Claude
        pass
|
|
|
|
|
|
def _append_event(session_id, event_data):
    """Record ``event_data`` as one JSON line in the session's event log.

    The log is ``EVENTS_DIR/<session_id>.jsonl``, opened in append mode.
    Logging is best-effort: an OSError while opening or writing the file is
    swallowed.
    """
    log_path = EVENTS_DIR.joinpath(f"{session_id}.jsonl")
    try:
        with log_path.open("a") as log:
            log.write(f"{json.dumps(event_data)}\n")
    except OSError:
        pass
|
|
|
|
|
|
# Run the hook only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|