import json
import os
from pathlib import Path

from amc_server.context import (
    CLAUDE_PROJECTS_DIR,
    CODEX_SESSIONS_DIR,
    _CONTEXT_CACHE_MAX,
    _codex_transcript_cache,
    _context_usage_cache,
)
from amc_server.logging_utils import LOGGER


class SessionParsingMixin:
    """Helpers for locating Claude/Codex session transcripts on disk and
    extracting context/token-usage snapshots from their JSONL tails."""

    def _get_claude_conversation_file(self, session_id, project_dir):
        """Resolve Claude conversation file path from session id + project dir.

        Claude stores conversations under CLAUDE_PROJECTS_DIR in a directory
        named after the project path with "/" replaced by "-" (and a leading
        "-" always present). Returns a Path, or None when project_dir is
        empty or the file does not exist.
        """
        if not project_dir:
            return None
        encoded_dir = project_dir.replace("/", "-")
        if not encoded_dir.startswith("-"):
            encoded_dir = "-" + encoded_dir
        conv_file = CLAUDE_PROJECTS_DIR / encoded_dir / f"{session_id}.jsonl"
        return conv_file if conv_file.exists() else None

    def _cache_codex_transcript(self, session_id, value):
        """Record a transcript lookup result (path string or None for a miss).

        Evicts the oldest ~20% of entries (simple FIFO) when the cache is
        full, mirroring the bound already applied to _context_usage_cache so
        this cache cannot grow without limit over a long-running process.
        """
        if len(_codex_transcript_cache) >= _CONTEXT_CACHE_MAX:
            for stale_key in list(_codex_transcript_cache)[: _CONTEXT_CACHE_MAX // 5]:
                _codex_transcript_cache.pop(stale_key, None)
        _codex_transcript_cache[session_id] = value

    def _find_codex_transcript_file(self, session_id):
        """Resolve Codex transcript path for a session id with lightweight caching.

        Both hits (path string) and misses (None) are cached. A cached path
        that no longer exists on disk is evicted and the sessions directory
        re-scanned, so a rotated or moved transcript can be rediscovered
        instead of being reported missing forever.
        """
        if not session_id:
            return None
        if session_id in _codex_transcript_cache:
            cached = _codex_transcript_cache.get(session_id)
            if cached:
                path = Path(cached)
                if path.exists():
                    return path
                # Stale entry: the file vanished since we cached it. Drop the
                # entry and fall through to a fresh scan rather than returning
                # None permanently for this session.
                _codex_transcript_cache.pop(session_id, None)
            else:
                # Cached negative result.
                return None
        if not CODEX_SESSIONS_DIR.exists():
            self._cache_codex_transcript(session_id, None)
            return None
        try:
            # rglob stops at the first match; transcripts embed the session id
            # in the filename.
            for jsonl_file in CODEX_SESSIONS_DIR.rglob(f"*{session_id}*.jsonl"):
                self._cache_codex_transcript(session_id, str(jsonl_file))
                return jsonl_file
        except OSError:
            # Best-effort: directory races/permission errors degrade to a miss.
            pass
        self._cache_codex_transcript(session_id, None)
        return None

    def _read_jsonl_tail_entries(self, file_path, max_lines=300, max_bytes=1024 * 1024):
        """Read only the tail of a JSONL file and return parsed entries.

        Reads at most max_bytes from the end of the file, then parses at most
        the last max_lines lines. Undecodable bytes are replaced and
        unparseable lines skipped. Returns [] for empty files or on I/O error.
        """
        entries = []
        try:
            with file_path.open("rb") as f:
                f.seek(0, os.SEEK_END)
                file_size = f.tell()
                if file_size <= 0:
                    return entries
                read_size = min(file_size, max_bytes)
                f.seek(file_size - read_size)
                chunk = f.read(read_size)
        except OSError:
            return entries
        lines = chunk.splitlines()
        if file_size > read_size and lines:
            # First line may be partial because we started in the middle.
            lines = lines[1:]
        for raw_line in lines[-max_lines:]:
            if not raw_line:
                continue
            try:
                entries.append(json.loads(raw_line.decode("utf-8", errors="replace")))
            except json.JSONDecodeError:
                continue
        return entries

    def _to_int(self, value):
        """Best-effort integer conversion; None when not convertible.

        bool is rejected explicitly (it subclasses int) so True/False in the
        JSON never masquerade as token counts.
        """
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, float):
            return int(value)
        if isinstance(value, str):
            try:
                return int(value)
            except ValueError:
                return None
        return None

    def _sum_optional_ints(self, values):
        """Sum available ints, return None when no values are present."""
        present = [v for v in values if isinstance(v, int)]
        if not present:
            return None
        return sum(present)

    def _as_dict(self, value):
        """Normalize potentially-null JSON objects into dicts."""
        return value if isinstance(value, dict) else {}

    def _parse_codex_context_usage_from_file(self, file_path):
        """Extract the latest Codex context usage snapshot from transcript tail.

        Scans tail entries newest-first for an event_msg/token_count payload
        and returns a usage dict, or None when no usable snapshot is found.
        """
        entries = self._read_jsonl_tail_entries(file_path, max_lines=600, max_bytes=1024 * 1024)
        for entry in reversed(entries):
            if not isinstance(entry, dict):
                continue
            if entry.get("type") != "event_msg":
                continue
            payload = self._as_dict(entry.get("payload"))
            if payload.get("type") != "token_count":
                continue
            info = self._as_dict(payload.get("info"))
            last_usage = self._as_dict(info.get("last_token_usage"))
            total_usage = self._as_dict(info.get("total_token_usage"))
            input_tokens = self._to_int(last_usage.get("input_tokens"))
            output_tokens = self._to_int(last_usage.get("output_tokens"))
            cached_input_tokens = self._to_int(last_usage.get("cached_input_tokens"))
            current_tokens = self._to_int(last_usage.get("total_tokens"))
            if current_tokens is None:
                # No explicit total: fall back to summing whichever parts exist.
                current_tokens = self._sum_optional_ints(
                    [input_tokens, output_tokens, cached_input_tokens]
                )
            usage = {
                "window_tokens": self._to_int(info.get("model_context_window")),
                "current_tokens": current_tokens,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cached_input_tokens": cached_input_tokens,
                "session_total_tokens": self._to_int(total_usage.get("total_tokens")),
                "updated_at": entry.get("timestamp", ""),
            }
            # A snapshot with neither a current nor a session total is useless;
            # keep scanning older entries.
            if usage["current_tokens"] is None and usage["session_total_tokens"] is None:
                continue
            return usage
        return None

    def _get_claude_context_window(self, model):
        """Return context window size for Claude models."""
        if not model:
            return 200_000  # Default for unknown Claude models
        # All current Claude 3.5/4 models have 200K context
        # Legacy claude-3-opus/sonnet/haiku also 200K, claude-2 was 100K
        if "claude-2" in model:
            return 100_000
        return 200_000

    def _parse_claude_context_usage_from_file(self, file_path):
        """Extract Claude usage with context window.

        Scans tail entries newest-first for an assistant message carrying a
        usage object; returns a usage dict (same shape as the Codex parser,
        plus "model") or None when none is found.
        """
        entries = self._read_jsonl_tail_entries(file_path, max_lines=400, max_bytes=1024 * 1024)
        for entry in reversed(entries):
            if not isinstance(entry, dict):
                continue
            if entry.get("type") != "assistant":
                continue
            message = self._as_dict(entry.get("message"))
            usage = self._as_dict(message.get("usage"))
            if not usage:
                continue
            input_tokens = self._to_int(usage.get("input_tokens"))
            output_tokens = self._to_int(usage.get("output_tokens"))
            cache_read_input_tokens = self._to_int(usage.get("cache_read_input_tokens"))
            cache_creation_input_tokens = self._to_int(usage.get("cache_creation_input_tokens"))
            cached_input_tokens = self._sum_optional_ints([
                cache_read_input_tokens,
                cache_creation_input_tokens,
            ])
            # Context occupancy = fresh input + output + both cache components.
            current_tokens = self._sum_optional_ints([
                input_tokens,
                output_tokens,
                cache_read_input_tokens,
                cache_creation_input_tokens,
            ])
            if current_tokens is None:
                continue
            model = message.get("model", "")
            return {
                "window_tokens": self._get_claude_context_window(model),
                "current_tokens": current_tokens,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cached_input_tokens": cached_input_tokens,
                "session_total_tokens": None,
                "updated_at": entry.get("timestamp", ""),
                "model": model,
            }
        return None

    def _get_cached_context_usage(self, file_path, parser):
        """Cache parsed usage by transcript mtime/size for poll efficiency.

        Re-parses only when the file's (st_mtime_ns, st_size) pair changes;
        parser failures are logged and cached as None so a broken transcript
        is not re-parsed on every poll. Returns the usage dict or None.
        """
        try:
            stat = file_path.stat()
        except OSError:
            return None
        key = str(file_path)
        cached = _context_usage_cache.get(key)
        if cached and cached.get("mtime_ns") == stat.st_mtime_ns and cached.get("size") == stat.st_size:
            return cached.get("usage")
        try:
            usage = parser(file_path)
        except Exception:
            # Boundary catch: a malformed transcript must not break polling.
            LOGGER.exception("Failed to parse context usage for %s", file_path)
            usage = None
        # Evict oldest entries if cache is full (simple FIFO)
        if len(_context_usage_cache) >= _CONTEXT_CACHE_MAX:
            keys_to_remove = list(_context_usage_cache.keys())[: _CONTEXT_CACHE_MAX // 5]
            for k in keys_to_remove:
                _context_usage_cache.pop(k, None)
        _context_usage_cache[key] = {
            "mtime_ns": stat.st_mtime_ns,
            "size": stat.st_size,
            "usage": usage,
        }
        return usage

    def _get_context_usage_for_session(self, session_data):
        """Attach context/token usage info for both Codex and Claude sessions.

        Falls back to session_data's existing "context_usage" value when the
        transcript cannot be located or parsed.
        """
        agent = session_data.get("agent")
        existing = session_data.get("context_usage")
        if agent == "codex":
            transcript_path = session_data.get("transcript_path", "")
            transcript_file = Path(transcript_path) if transcript_path else None
            if transcript_file and not transcript_file.exists():
                transcript_file = None
            if not transcript_file:
                transcript_file = self._find_codex_transcript_file(session_data.get("session_id", ""))
            if not transcript_file:
                return existing
            parsed = self._get_cached_context_usage(transcript_file, self._parse_codex_context_usage_from_file)
            return parsed or existing
        if agent == "claude":
            conv_file = self._get_claude_conversation_file(
                session_data.get("session_id", ""),
                session_data.get("project_dir", ""),
            )
            if not conv_file:
                return existing
            parsed = self._get_cached_context_usage(conv_file, self._parse_claude_context_usage_from_file)
            return parsed or existing
        return existing