Create authoritative documentation suite for Claude Code JSONL session log processing, synthesized from codebase analysis, official Anthropic documentation, and community tooling research. Documentation structure (docs/claude-jsonl-reference/): 01-format-specification.md (214 lines): - Complete message envelope structure with all fields - Content block types (text, thinking, tool_use, tool_result) - Usage object for token reporting - Model identifiers and version history - Conversation DAG structure via parentUuid 02-message-types.md (346 lines): - Every message type with concrete JSON examples - User messages (string content vs array for tool results) - Assistant messages with all content block variants - Progress events (hooks, bash, MCP) - System, summary, and file-history-snapshot types - Codex format differences (response_item, function_call) 03-tool-lifecycle.md (341 lines): - Complete tool invocation to result flow - Hook input/output formats (PreToolUse, PostToolUse) - Parallel tool call handling - Tool-to-result pairing algorithm - Missing result edge cases - Codex tool format differences 04-subagent-teams.md (363 lines): - Task tool invocation and input fields - Subagent transcript locations and format - Team coordination (TeamCreate, SendMessage) - Hook events (SubagentStart, SubagentStop) - AMC spawn tracking with pending spawn registry - Worktree isolation for subagents 05-edge-cases.md (475 lines): - Parsing edge cases (invalid JSON, type ambiguity) - Type coercion gotchas (bool vs int in Python) - Session state edge cases (orphans, dead detection) - Tool call edge cases (missing results, parallel ordering) - Codex-specific quirks (content injection, buffering) - File system safety (path traversal, permissions) - Cache invalidation strategies 06-quick-reference.md (238 lines): - File locations cheat sheet - jq recipes for common queries - Python parsing snippets - Common gotchas table - Useful constants - Debugging commands Also adds CLAUDE.md at project root linking to documentation and providing project overview for agents working on AMC. Sources include Claude Code hooks.md, headless.md, Anthropic Messages API reference, and community tools (claude-code-log, claude-JSONL-browser). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
476 lines
12 KiB
Markdown
476 lines
12 KiB
Markdown
# Edge Cases and Error Handling
|
|
|
|
Comprehensive guide to edge cases, malformed input handling, and error recovery in Claude JSONL processing.
|
|
|
|
## Parsing Edge Cases
|
|
|
|
### 1. Invalid JSON Lines
|
|
|
|
**Scenario:** Corrupted or truncated JSON line.
|
|
|
|
```python
|
|
# BAD: Crashes on invalid JSON
|
|
for line in file:
|
|
data = json.loads(line) # Raises JSONDecodeError
|
|
|
|
# GOOD: Skip invalid lines
|
|
for line in file:
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
data = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue # Skip malformed line
|
|
```
|
|
|
|
### 2. Content Type Ambiguity
|
|
|
|
**Scenario:** User message content can be string OR array.
|
|
|
|
```python
|
|
# BAD: Assumes string
|
|
user_text = message['content']
|
|
|
|
# GOOD: Check type
|
|
content = message['content']
|
|
if isinstance(content, str):
|
|
user_text = content
|
|
elif isinstance(content, list):
|
|
# This is tool results, not user input
|
|
user_text = None
|
|
```
|
|
|
|
### 3. Missing Optional Fields
|
|
|
|
**Scenario:** Fields may be absent in older versions.
|
|
|
|
```python
|
|
# BAD: Assumes field exists
|
|
tokens = message['usage']['cache_read_input_tokens']
|
|
|
|
# GOOD: Safe access
|
|
usage = message.get('usage', {})
|
|
tokens = usage.get('cache_read_input_tokens', 0)
|
|
```
|
|
|
|
### 4. Partial File Reads
|
|
|
|
**Scenario:** Reading last N bytes may cut first line.
|
|
|
|
```python
|
|
# When seeking to end - N bytes, first line may be partial
|
|
def read_tail(file_path, max_bytes=1_000_000):
|
|
with open(file_path, 'r') as f:
|
|
f.seek(0, 2) # End
|
|
size = f.tell()
|
|
|
|
if size > max_bytes:
|
|
f.seek(size - max_bytes)
|
|
f.readline() # Discard partial first line
|
|
else:
|
|
f.seek(0)
|
|
|
|
return f.readlines()
|
|
```
|
|
|
|
### 5. Non-Dict JSON Values
|
|
|
|
**Scenario:** Line contains valid JSON but not an object.
|
|
|
|
```python
|
|
# File might contain: 123, "string", [1,2,3], null
|
|
data = json.loads(line)
|
|
if not isinstance(data, dict):
|
|
continue # Skip non-object JSON
|
|
```
|
|
|
|
## Type Coercion Edge Cases
|
|
|
|
### Integer Conversion
|
|
|
|
```python
|
|
def safe_int(value):
|
|
"""Convert to int, rejecting booleans."""
|
|
# Python: isinstance(True, int) == True, so check explicitly
|
|
if isinstance(value, bool):
|
|
return None
|
|
if isinstance(value, int):
|
|
return value
|
|
if isinstance(value, float):
|
|
return int(value)
|
|
if isinstance(value, str):
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
```
|
|
|
|
### Token Summation
|
|
|
|
```python
|
|
def sum_tokens(*values):
|
|
"""Sum token counts, handling None/missing."""
|
|
valid = [v for v in values if isinstance(v, (int, float)) and not isinstance(v, bool)]
|
|
return sum(valid) if valid else None
|
|
```
|
|
|
|
## Session State Edge Cases
|
|
|
|
### 1. Orphan Sessions
|
|
|
|
**Scenario:** Multiple sessions claim same Zellij pane (e.g., after --resume).
|
|
|
|
**Resolution:** Keep session with:
|
|
1. Highest priority: Has `context_usage` (indicates real work)
|
|
2. Second priority: Latest `conversation_mtime_ns`
|
|
|
|
```python
|
|
def dedupe_sessions(sessions):
|
|
by_pane = {}
|
|
for s in sessions:
|
|
key = (s['zellij_session'], s['zellij_pane'])
|
|
if key not in by_pane:
|
|
by_pane[key] = s
|
|
else:
|
|
existing = by_pane[key]
|
|
# Prefer session with context_usage
|
|
if s.get('context_usage') and not existing.get('context_usage'):
|
|
by_pane[key] = s
|
|
elif s.get('conversation_mtime_ns', 0) > existing.get('conversation_mtime_ns', 0):
|
|
by_pane[key] = s
|
|
return list(by_pane.values())
|
|
```
|
|
|
|
### 2. Dead Session Detection
|
|
|
|
**Claude:** Check Zellij session exists
|
|
```python
|
|
def is_claude_dead(session):
|
|
if session['status'] == 'starting':
|
|
return False # Benefit of doubt
|
|
|
|
zellij = session.get('zellij_session')
|
|
if not zellij:
|
|
return True
|
|
|
|
# Check if Zellij session exists
|
|
result = subprocess.run(['zellij', 'list-sessions'], capture_output=True)
|
|
return zellij not in result.stdout.decode()
|
|
```
|
|
|
|
**Codex:** Check if process has file open
|
|
```python
|
|
def is_codex_dead(session):
|
|
transcript = session.get('transcript_path')
|
|
if not transcript:
|
|
return True
|
|
|
|
# Check if any process has file open
|
|
result = subprocess.run(['lsof', transcript], capture_output=True)
|
|
return result.returncode != 0
|
|
```
|
|
|
|
### 3. Stale Session Cleanup
|
|
|
|
```python
|
|
ORPHAN_AGE_HOURS = 24
|
|
STARTING_AGE_HOURS = 1
|
|
|
|
def should_cleanup(session, now):
|
|
age = now - session['started_at']
|
|
|
|
if session['status'] == 'starting' and age > timedelta(hours=STARTING_AGE_HOURS):
|
|
return True # Stuck in starting
|
|
|
|
if session.get('is_dead') and age > timedelta(hours=ORPHAN_AGE_HOURS):
|
|
return True # Dead and old
|
|
|
|
return False
|
|
```
|
|
|
|
## Tool Call Edge Cases
|
|
|
|
### 1. Missing Tool Results
|
|
|
|
**Scenario:** Session interrupted between tool_use and tool_result.
|
|
|
|
```python
|
|
def pair_tool_calls(messages):
|
|
pending = {} # tool_use_id -> tool_use
|
|
|
|
for msg in messages:
|
|
if msg['type'] == 'assistant':
|
|
for block in msg['message'].get('content', []):
|
|
if block.get('type') == 'tool_use':
|
|
pending[block['id']] = block
|
|
|
|
elif msg['type'] == 'user':
|
|
content = msg['message'].get('content', [])
|
|
if isinstance(content, list):
|
|
for block in content:
|
|
if block.get('type') == 'tool_result':
|
|
tool_id = block.get('tool_use_id')
|
|
if tool_id in pending:
|
|
pending[tool_id]['result'] = block
|
|
|
|
# Any pending without result = interrupted
|
|
incomplete = [t for t in pending.values() if 'result' not in t]
|
|
return pending, incomplete
|
|
```
|
|
|
|
### 2. Parallel Tool Call Ordering
|
|
|
|
**Scenario:** Multiple tool_use in one message, results may come in different order.
|
|
|
|
```python
|
|
# Match by ID, not by position
|
|
tool_uses = [b for b in assistant_content if b['type'] == 'tool_use']
|
|
tool_results = [b for b in user_content if b['type'] == 'tool_result']
|
|
|
|
paired = {}
|
|
for result in tool_results:
|
|
paired[result['tool_use_id']] = result
|
|
|
|
for use in tool_uses:
|
|
result = paired.get(use['id'])
|
|
# result may be None if missing
|
|
```
|
|
|
|
### 3. Tool Error Results
|
|
|
|
```python
|
|
def is_tool_error(result_block):
|
|
return result_block.get('is_error', False)
|
|
|
|
def extract_error_message(result_block):
|
|
content = result_block.get('content', '')
|
|
if content.startswith('Error:'):
|
|
return content
|
|
return None
|
|
```
|
|
|
|
## Codex-Specific Edge Cases
|
|
|
|
### 1. Content Injection Filtering
|
|
|
|
Codex may include system context in messages that should be filtered:
|
|
|
|
```python
|
|
SKIP_PREFIXES = [
|
|
'<INSTRUCTIONS>',
|
|
'<environment_context>',
|
|
'<permissions instructions>',
|
|
'# AGENTS.md instructions'
|
|
]
|
|
|
|
def should_skip_content(text):
|
|
return any(text.startswith(prefix) for prefix in SKIP_PREFIXES)
|
|
```
|
|
|
|
### 2. Developer Role Filtering
|
|
|
|
```python
|
|
def parse_codex_message(payload):
|
|
role = payload.get('role')
|
|
if role == 'developer':
|
|
return None # Skip system/developer messages
|
|
return payload
|
|
```
|
|
|
|
### 3. Function Call Arguments Parsing
|
|
|
|
```python
|
|
def parse_arguments(arguments):
|
|
if isinstance(arguments, dict):
|
|
return arguments
|
|
if isinstance(arguments, str):
|
|
try:
|
|
return json.loads(arguments)
|
|
except json.JSONDecodeError:
|
|
return {'raw': arguments}
|
|
return {}
|
|
```
|
|
|
|
### 4. Tool Call Buffering
|
|
|
|
Codex tool calls need buffering until next message:
|
|
|
|
```python
|
|
class CodexParser:
|
|
def __init__(self):
|
|
self.pending_tools = []
|
|
|
|
def process_entry(self, entry):
|
|
payload = entry.get('payload', {})
|
|
ptype = payload.get('type')
|
|
|
|
if ptype == 'function_call':
|
|
self.pending_tools.append({
|
|
'name': payload['name'],
|
|
'input': self.parse_arguments(payload['arguments'])
|
|
})
|
|
return None # Don't emit yet
|
|
|
|
elif ptype == 'message' and payload.get('role') == 'assistant':
|
|
msg = self.create_message(payload)
|
|
if self.pending_tools:
|
|
msg['tool_calls'] = self.pending_tools
|
|
self.pending_tools = []
|
|
return msg
|
|
|
|
elif ptype == 'message' and payload.get('role') == 'user':
|
|
# Flush pending tools before user message
|
|
msgs = []
|
|
if self.pending_tools:
|
|
msgs.append({'role': 'assistant', 'tool_calls': self.pending_tools})
|
|
self.pending_tools = []
|
|
msgs.append(self.create_message(payload))
|
|
return msgs
|
|
```
|
|
|
|
## File System Edge Cases
|
|
|
|
### 1. Path Traversal Prevention
|
|
|
|
```python
|
|
import os
|
|
|
|
def validate_session_id(session_id):
|
|
# Must be basename only
|
|
if os.path.basename(session_id) != session_id:
|
|
raise ValueError("Invalid session ID")
|
|
|
|
# No special characters
|
|
if any(c in session_id for c in ['/', '\\', '..', '\x00']):
|
|
raise ValueError("Invalid session ID")
|
|
|
|
def validate_project_path(project_path, base_dir):
|
|
resolved = os.path.realpath(project_path)
|
|
base = os.path.realpath(base_dir)
|
|
|
|
if not resolved.startswith(base + os.sep):
|
|
raise ValueError("Path traversal detected")
|
|
```
|
|
|
|
### 2. File Not Found
|
|
|
|
```python
|
|
def read_session_file(path):
|
|
try:
|
|
with open(path, 'r') as f:
|
|
return f.read()
|
|
except FileNotFoundError:
|
|
return None
|
|
except PermissionError:
|
|
return None
|
|
except OSError:
|
|
return None
|
|
```
|
|
|
|
### 3. Empty Files
|
|
|
|
```python
|
|
def parse_jsonl(path):
|
|
with open(path, 'r') as f:
|
|
content = f.read()
|
|
|
|
if not content.strip():
|
|
return [] # Empty file
|
|
|
|
return [json.loads(line) for line in content.strip().split('\n') if line.strip()]
|
|
```
|
|
|
|
## Subprocess Edge Cases
|
|
|
|
### 1. Timeout Handling
|
|
|
|
```python
|
|
import subprocess
|
|
|
|
def run_with_timeout(cmd, timeout=5):
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
timeout=timeout,
|
|
text=True
|
|
)
|
|
return result.stdout
|
|
except subprocess.TimeoutExpired:
|
|
return None
|
|
except FileNotFoundError:
|
|
return None
|
|
except OSError:
|
|
return None
|
|
```
|
|
|
|
### 2. ANSI Code Stripping
|
|
|
|
```python
|
|
import re
|
|
|
|
ANSI_PATTERN = re.compile(r'\x1b\[[0-9;]*m')
|
|
|
|
def strip_ansi(text):
|
|
return ANSI_PATTERN.sub('', text)
|
|
```
|
|
|
|
## Cache Invalidation
|
|
|
|
### Mtime-Based Cache
|
|
|
|
```python
|
|
class FileCache:
|
|
def __init__(self, max_size=100):
|
|
self.cache = {}
|
|
self.max_size = max_size
|
|
|
|
def get(self, path):
|
|
if path not in self.cache:
|
|
return None
|
|
|
|
entry = self.cache[path]
|
|
stat = os.stat(path)
|
|
|
|
# Invalidate if file changed
|
|
if stat.st_mtime_ns != entry['mtime_ns'] or stat.st_size != entry['size']:
|
|
del self.cache[path]
|
|
return None
|
|
|
|
return entry['data']
|
|
|
|
def set(self, path, data):
|
|
# Evict oldest if full
|
|
if len(self.cache) >= self.max_size:
|
|
oldest = next(iter(self.cache))
|
|
del self.cache[oldest]
|
|
|
|
stat = os.stat(path)
|
|
self.cache[path] = {
|
|
'mtime_ns': stat.st_mtime_ns,
|
|
'size': stat.st_size,
|
|
'data': data
|
|
}
|
|
```
|
|
|
|
## Testing Edge Cases Checklist
|
|
|
|
- [ ] Empty JSONL file
|
|
- [ ] Single-line JSONL file
|
|
- [ ] Truncated JSON line
|
|
- [ ] Non-object JSON values (numbers, strings, arrays)
|
|
- [ ] Missing required fields
|
|
- [ ] Unknown message types
|
|
- [ ] Content as string vs array
|
|
- [ ] Boolean vs integer confusion
|
|
- [ ] Unicode in content
|
|
- [ ] Very long lines (>64KB)
|
|
- [ ] Concurrent file modifications
|
|
- [ ] Missing tool results
|
|
- [ ] Multiple tool calls in single message
|
|
- [ ] Session without Zellij pane
|
|
- [ ] Codex developer messages
|
|
- [ ] Path traversal attempts
|
|
- [ ] Symlink escape attempts
|