# Edge Cases and Error Handling

Comprehensive guide to edge cases, malformed input handling, and error recovery in Claude JSONL processing.

## Parsing Edge Cases

### 1. Invalid JSON Lines

**Scenario:** Corrupted or truncated JSON line.

```python
import json

# BAD: Crashes on invalid JSON
for line in file:
    data = json.loads(line)  # Raises JSONDecodeError

# GOOD: Skip invalid lines
for line in file:
    if not line.strip():
        continue
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        continue  # Skip malformed line
```

### 2. Content Type Ambiguity

**Scenario:** User message content can be a string or an array.

```python
# BAD: Assumes string
user_text = message['content']

# GOOD: Check type
content = message.get('content')
if isinstance(content, str):
    user_text = content
elif isinstance(content, list):
    # This is tool results, not user input
    user_text = None
else:
    user_text = None  # Missing or unexpected type
```

### 3. Missing Optional Fields

**Scenario:** Fields may be absent in older versions.

```python
# BAD: Assumes field exists
tokens = message['usage']['cache_read_input_tokens']

# GOOD: Safe access ("or {}" also covers an explicit null usage value)
usage = message.get('usage') or {}
tokens = usage.get('cache_read_input_tokens', 0)
```

### 4. Partial File Reads

**Scenario:** Reading the last N bytes may cut the first line in half.

```python
# When seeking to end - N bytes, first line may be partial
def read_tail(file_path, max_bytes=1_000_000):
    # Binary mode: arbitrary byte offsets are only well-defined for binary
    # files, and the seek may land inside a multi-byte UTF-8 character.
    with open(file_path, 'rb') as f:
        f.seek(0, 2)  # End
        size = f.tell()
        if size > max_bytes:
            f.seek(size - max_bytes)
            f.readline()  # Discard partial first line
        else:
            f.seek(0)
        return [raw.decode('utf-8', errors='replace') for raw in f.readlines()]
```

### 5. Non-Dict JSON Values

**Scenario:** Line contains valid JSON but not an object.
```python
# File might contain: 123, "string", [1,2,3], null
# (runs inside the per-line loop shown under "Invalid JSON Lines")
data = json.loads(line)
if not isinstance(data, dict):
    continue  # Skip non-object JSON
```

## Type Coercion Edge Cases

### Integer Conversion

```python
import math

def safe_int(value):
    """Convert to int, rejecting booleans."""
    # Python: isinstance(True, int) == True, so check explicitly
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        # int() raises on NaN and infinity
        return int(value) if math.isfinite(value) else None
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            return None
    return None
```

### Token Summation

```python
def sum_tokens(*values):
    """Sum token counts, handling None/missing."""
    valid = [v for v in values if isinstance(v, (int, float)) and not isinstance(v, bool)]
    return sum(valid) if valid else None
```

## Session State Edge Cases

### 1. Orphan Sessions

**Scenario:** Multiple sessions claim the same Zellij pane (e.g., after `--resume`).

**Resolution:** Keep the session with:

1. Highest priority: Has `context_usage` (indicates real work)
2. Second priority: Latest `conversation_mtime_ns`

```python
def dedupe_sessions(sessions):
    def rank(s):
        # context_usage outranks recency; mtime only breaks ties
        return (bool(s.get('context_usage')), s.get('conversation_mtime_ns') or 0)

    by_pane = {}
    for s in sessions:
        key = (s['zellij_session'], s['zellij_pane'])
        existing = by_pane.get(key)
        if existing is None or rank(s) > rank(existing):
            by_pane[key] = s
    return list(by_pane.values())
```

### 2. Dead Session Detection

**Claude:** Check that the Zellij session exists.

```python
import re
import subprocess

ANSI_PATTERN = re.compile(r'\x1b\[[0-9;]*m')

def is_claude_dead(session):
    if session['status'] == 'starting':
        return False  # Benefit of doubt
    zellij = session.get('zellij_session')
    if not zellij:
        return True
    # Check if Zellij session exists
    result = subprocess.run(['zellij', 'list-sessions'], capture_output=True, text=True)
    # Match whole names, not substrings ("dev" must not match "dev-2").
    # Assumes the name is the first whitespace-separated field of each line.
    lines = ANSI_PATTERN.sub('', result.stdout).splitlines()
    names = {line.split()[0] for line in lines if line.strip()}
    return zellij not in names
```

**Codex:** Check whether any process has the transcript file open.

```python
import subprocess

def is_codex_dead(session):
    transcript = session.get('transcript_path')
    if not transcript:
        return True
    # Check if any process has file open (lsof exits non-zero when none does)
    result = subprocess.run(['lsof', transcript], capture_output=True)
    return result.returncode != 0
```

### 3. Stale Session Cleanup

```python
from datetime import timedelta

ORPHAN_AGE_HOURS = 24
STARTING_AGE_HOURS = 1

def should_cleanup(session, now):
    # now and session['started_at'] are expected to be datetime objects
    age = now - session['started_at']
    if session['status'] == 'starting' and age > timedelta(hours=STARTING_AGE_HOURS):
        return True  # Stuck in starting
    if session.get('is_dead') and age > timedelta(hours=ORPHAN_AGE_HOURS):
        return True  # Dead and old
    return False
```

## Tool Call Edge Cases

### 1. Missing Tool Results

**Scenario:** Session interrupted between `tool_use` and `tool_result`.

```python
def pair_tool_calls(messages):
    pending = {}  # tool_use_id -> tool_use
    for msg in messages:
        content = msg.get('message', {}).get('content', [])
        if not isinstance(content, list):
            continue  # String content carries no tool blocks
        for block in content:
            if not isinstance(block, dict):
                continue
            if msg.get('type') == 'assistant' and block.get('type') == 'tool_use':
                pending[block['id']] = dict(block)  # Copy so the input is not mutated
            elif msg.get('type') == 'user' and block.get('type') == 'tool_result':
                tool_id = block.get('tool_use_id')
                if tool_id in pending:
                    pending[tool_id]['result'] = block
    # Any pending without result = interrupted
    incomplete = [t for t in pending.values() if 'result' not in t]
    return pending, incomplete
```

### 2. Parallel Tool Call Ordering

**Scenario:** Multiple `tool_use` blocks in one message; results may arrive in a different order.
```python
# Match by ID, not by position
# (assistant_content and user_content are the content lists of each message)
tool_uses = [b for b in assistant_content if b.get('type') == 'tool_use']
tool_results = [b for b in user_content if b.get('type') == 'tool_result']

paired = {}
for result in tool_results:
    paired[result['tool_use_id']] = result

for use in tool_uses:
    result = paired.get(use['id'])  # result may be None if missing
```

### 3. Tool Error Results

```python
def is_tool_error(result_block):
    return bool(result_block.get('is_error', False))

def extract_error_message(result_block):
    content = result_block.get('content', '')
    if isinstance(content, list):
        # Content may also be a list of blocks; join the text ones
        content = ''.join(b.get('text', '') for b in content
                          if isinstance(b, dict) and b.get('type') == 'text')
    if isinstance(content, str) and content.startswith('Error:'):
        return content
    return None
```

## Codex-Specific Edge Cases

### 1. Content Injection Filtering

Codex may include system context in messages that should be filtered:

```python
SKIP_PREFIXES = [
    '',
    '',
    '',
    '# AGENTS.md instructions'
]

def should_skip_content(text):
    # Ignore empty prefixes: every string starts with '', so an empty
    # entry would otherwise cause all content to be skipped.
    return any(prefix and text.startswith(prefix) for prefix in SKIP_PREFIXES)
```

### 2. Developer Role Filtering

```python
def parse_codex_message(payload):
    role = payload.get('role')
    if role == 'developer':
        return None  # Skip system/developer messages
    return payload
```

### 3. Function Call Arguments Parsing

```python
import json

def parse_arguments(arguments):
    if isinstance(arguments, dict):
        return arguments
    if isinstance(arguments, str):
        try:
            parsed = json.loads(arguments)
        except json.JSONDecodeError:
            return {'raw': arguments}
        # Valid JSON that is not an object (e.g. "123") is kept as raw text
        return parsed if isinstance(parsed, dict) else {'raw': arguments}
    return {}
```

### 4. Tool Call Buffering

Codex tool calls need to be buffered until the next message:

```python
class CodexParser:
    def __init__(self):
        self.pending_tools = []

    def process_entry(self, entry):
        """Return None (nothing to emit), one message dict, or a list of messages."""
        # create_message() is assumed to be defined elsewhere on this class;
        # parse_arguments() is the module-level function from the previous section.
        payload = entry.get('payload', {})
        ptype = payload.get('type')

        if ptype == 'function_call':
            self.pending_tools.append({
                'name': payload['name'],
                'input': parse_arguments(payload.get('arguments'))
            })
            return None  # Don't emit yet

        elif ptype == 'message' and payload.get('role') == 'assistant':
            msg = self.create_message(payload)
            if self.pending_tools:
                msg['tool_calls'] = self.pending_tools
                self.pending_tools = []
            return msg

        elif ptype == 'message' and payload.get('role') == 'user':
            # Flush pending tools before user message
            msgs = []
            if self.pending_tools:
                msgs.append({'role': 'assistant', 'tool_calls': self.pending_tools})
                self.pending_tools = []
            msgs.append(self.create_message(payload))
            return msgs

        return None  # Other entry types emit nothing
```

## File System Edge Cases

### 1. Path Traversal Prevention

```python
import os

def validate_session_id(session_id):
    # Must be a non-empty name, not a directory reference
    if not session_id or session_id in ('.', '..'):
        raise ValueError("Invalid session ID")
    # Must be basename only
    if os.path.basename(session_id) != session_id:
        raise ValueError("Invalid session ID")
    # No separators, parent references, or NUL bytes
    if any(c in session_id for c in ['/', '\\', '..', '\x00']):
        raise ValueError("Invalid session ID")

def validate_project_path(project_path, base_dir):
    # realpath resolves symlinks, so symlink escapes are caught as well
    resolved = os.path.realpath(project_path)
    base = os.path.realpath(base_dir)
    if not resolved.startswith(base + os.sep):
        raise ValueError("Path traversal detected")
```

### 2. File Not Found

```python
def read_session_file(path):
    try:
        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            return f.read()
    except OSError:
        # Covers FileNotFoundError and PermissionError, both OSError subclasses
        return None
```

### 3. Empty Files

```python
import json

def parse_jsonl(path):
    records = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:  # An empty file yields no lines
            if not line.strip():
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # Skip malformed line
    return records
```

## Subprocess Edge Cases

### 1. Timeout Handling

```python
import subprocess

def run_with_timeout(cmd, timeout=5):
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            timeout=timeout,
            text=True
        )
        return result.stdout
    except subprocess.TimeoutExpired:
        return None
    except OSError:
        # Includes FileNotFoundError when the executable is missing
        return None
```

### 2. ANSI Code Stripping

```python
import re

# Matches SGR (color/style) sequences only, not cursor movement or other escapes
ANSI_PATTERN = re.compile(r'\x1b\[[0-9;]*m')

def strip_ansi(text):
    return ANSI_PATTERN.sub('', text)
```

## Cache Invalidation

### Mtime-Based Cache

```python
import os

class FileCache:
    def __init__(self, max_size=100):
        self.cache = {}
        self.max_size = max_size

    def get(self, path):
        entry = self.cache.get(path)
        if entry is None:
            return None
        try:
            stat = os.stat(path)
        except OSError:
            stat = None  # File deleted or unreadable
        # Invalidate if file changed
        if stat is None or stat.st_mtime_ns != entry['mtime_ns'] or stat.st_size != entry['size']:
            del self.cache[path]
            return None
        return entry['data']

    def set(self, path, data):
        stat = os.stat(path)
        # Evict oldest if full (dicts preserve insertion order);
        # overwriting an existing entry needs no eviction
        if path not in self.cache and len(self.cache) >= self.max_size:
            oldest = next(iter(self.cache))
            del self.cache[oldest]
        self.cache[path] = {
            'mtime_ns': stat.st_mtime_ns,
            'size': stat.st_size,
            'data': data
        }
```

## Testing Edge Cases Checklist

- [ ] Empty JSONL file
- [ ] Single-line JSONL file
- [ ] Truncated JSON line
- [ ] Non-object JSON values (numbers, strings, arrays)
- [ ] Missing required fields
- [ ] Unknown message types
- [ ] Content as string vs array
- [ ] Boolean vs integer confusion
- [ ] Unicode in content
- [ ] Very long lines (>64KB)
- [ ] Concurrent file modifications
- [ ] Missing tool results
- [ ] Multiple tool calls in single message
- [ ] Session without Zellij pane
- [ ] Codex developer messages
- [ ] Path traversal attempts
- [ ] Symlink escape attempts
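
Several of the checklist items can be exercised without fixture files. The sketch below is one possible shape for such a check, not an existing test suite: `iter_records` is a hypothetical helper that combines the blank-line, invalid-JSON, and non-object checks from the parsing section, and the sample lines are made up for illustration.

```python
import json

def iter_records(lines):
    """Yield only JSON objects, skipping blank, malformed, and non-object lines.

    Hypothetical helper for illustration; it mirrors the checks from the
    parsing section rather than any existing function.
    """
    for line in lines:
        if not line.strip():
            continue
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            continue  # Truncated or corrupted line
        if isinstance(data, dict):
            yield data

# Made-up sample input covering several checklist items at once
SAMPLE_LINES = [
    '',                                                 # Empty line
    '{"type": "user", "n": 1}',                         # Valid object
    '{"type": "assistant", "n":',                       # Truncated JSON line
    '123',                                              # Non-object JSON (number)
    '"string"',                                         # Non-object JSON (string)
    '[1, 2, 3]',                                        # Non-object JSON (array)
    'null',                                             # Non-object JSON (null)
    '{"type": "user", "text": "héllo"}',                # Unicode in content
    '{"type": "user", "pad": "' + 'x' * 70_000 + '"}',  # Very long line (>64KB)
]

records = list(iter_records(SAMPLE_LINES))
print(len(records))  # 3: only the three well-formed objects survive
```

Keeping the samples as in-memory strings keeps the check fast. Items that depend on the file system, such as concurrent modification or symlink escapes, would still need temporary files.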