import json
import os
import re
import subprocess
import time
from datetime import datetime, timezone

from amc_server.agents import (
    CODEX_ACTIVE_WINDOW,
    CODEX_SESSIONS_DIR,
    _CODEX_CACHE_MAX,
    _codex_pane_cache,
    _codex_transcript_cache,
    _dismissed_codex_ids,
)
from amc_server.config import SESSIONS_DIR
from amc_server.spawn_config import PENDING_SPAWNS_DIR
from amc_server.logging_utils import LOGGER

# UUID embedded in a Codex transcript filename; used as the session ID.
_SESSION_ID_RE = re.compile(
    r"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
)


def _parse_session_timestamp(session_ts):
    """Parse a Codex session timestamp (ISO string) to Unix time.

    Returns:
        The timestamp as a float, or None if ``session_ts`` is empty,
        not a string, or not parseable by ``datetime.fromisoformat``.
    """
    if not session_ts:
        return None
    try:
        # Codex uses ISO format, possibly with Z suffix or +00:00
        ts_str = session_ts.replace('Z', '+00:00')
        dt = datetime.fromisoformat(ts_str)
        return dt.timestamp()
    except (ValueError, TypeError, AttributeError):
        return None


def _match_pending_spawn(session_cwd, session_start_ts):
    """Match a Codex session to a pending spawn by CWD and timestamp.

    Args:
        session_cwd: The CWD of the Codex session
        session_start_ts: The session's START timestamp (ISO string from Codex
            metadata). IMPORTANT: Must be session start time, not file mtime,
            to avoid false matches with pre-existing sessions that were
            recently active.

    Returns:
        spawn_id if matched (and deletes the pending file), None otherwise.
        Pending files that are unreadable, are not JSON objects, or carry a
        non-string ``project_path`` / non-numeric ``timestamp`` are skipped.
    """
    if not PENDING_SPAWNS_DIR.exists():
        return None

    normalized_cwd = os.path.normpath(session_cwd) if session_cwd else ""
    if not normalized_cwd:
        return None

    # Parse session start time - if we can't parse it, we can't safely match
    session_start_unix = _parse_session_timestamp(session_start_ts)
    if session_start_unix is None:
        return None

    try:
        for pending_file in PENDING_SPAWNS_DIR.glob('*.json'):
            try:
                # ValueError covers json.JSONDecodeError and decode errors.
                data = json.loads(pending_file.read_text())
            except (ValueError, OSError):
                continue
            if not isinstance(data, dict):
                continue

            # Check agent type (only match codex to codex)
            if data.get('agent_type') != 'codex':
                continue

            # Check CWD match. A missing/empty/non-string path can never be a
            # real match (and normpath('') would silently become '.').
            project_path = data.get('project_path')
            if not isinstance(project_path, str) or not project_path:
                continue
            if normalized_cwd != os.path.normpath(project_path):
                continue

            # Check timing: session must have STARTED after spawn was initiated.
            # Using session start time (not mtime) prevents false matches with
            # pre-existing sessions that happen to be recently active.
            try:
                spawn_ts = float(data.get('timestamp', 0))
            except (TypeError, ValueError):
                # Malformed timestamp: skip this file instead of letting a
                # TypeError abort matching for the whole session.
                continue
            if session_start_unix < spawn_ts:
                continue

            # Match found - claim the spawn_id and delete the pending file
            spawn_id = data.get('spawn_id')
            try:
                pending_file.unlink()
            except OSError:
                pass
            LOGGER.info(
                'Matched Codex session (cwd=%s) to pending spawn_id=%s',
                session_cwd,
                spawn_id,
            )
            return spawn_id
    except OSError:
        # Directory listing failed mid-iteration; treat as "no match".
        pass

    return None


class SessionDiscoveryMixin:
    """Discovers active Codex sessions and maps them to Zellij panes.

    Relies on helpers provided elsewhere on the composed class:
    ``_as_dict``, ``_read_jsonl_tail_entries``, ``_get_cached_context_usage``
    and ``_parse_codex_context_usage_from_file``.
    """

    def _discover_active_codex_sessions(self):
        """Find active Codex sessions and create/update session files with Zellij pane info.

        Scans ``CODEX_SESSIONS_DIR`` for ``*.jsonl`` transcripts modified within
        ``CODEX_ACTIVE_WINDOW`` seconds and writes one ``<session_id>.json`` per
        transcript into ``SESSIONS_DIR``. Failures on a single transcript never
        abort the scan: I/O and JSON errors are skipped silently, anything else
        is logged with a traceback.
        """
        if not CODEX_SESSIONS_DIR.exists():
            return

        # Get Zellij pane info for running codex processes
        pid_info, cwd_map = self._get_codex_pane_info()

        # Only look at sessions modified recently (active)
        now = time.time()
        cutoff = now - CODEX_ACTIVE_WINDOW

        for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"):
            try:
                # Skip old files
                mtime = jsonl_file.stat().st_mtime
                if mtime < cutoff:
                    continue

                # Extract session ID from filename
                match = _SESSION_ID_RE.search(jsonl_file.name)
                if not match:
                    continue
                session_id = match.group(1)

                # Evict old entries if cache is full (simple FIFO). Only evict
                # when this insert would actually grow the cache, and always
                # evict at least one entry so a small max still bounds it.
                if (
                    session_id not in _codex_transcript_cache
                    and len(_codex_transcript_cache) >= _CODEX_CACHE_MAX
                ):
                    evict_count = max(1, _CODEX_CACHE_MAX // 5)
                    stale_keys = list(_codex_transcript_cache.keys())[:evict_count]
                    for stale_key in stale_keys:
                        _codex_transcript_cache.pop(stale_key, None)
                _codex_transcript_cache[session_id] = str(jsonl_file)

                # Skip sessions the user has dismissed
                if session_id in _dismissed_codex_ids:
                    continue

                session_file = SESSIONS_DIR / f"{session_id}.json"

                # Parse first line to get session metadata. JSON transcripts
                # are read as UTF-8 rather than the locale default.
                with jsonl_file.open(encoding="utf-8") as f:
                    first_line = f.readline().strip()
                if not first_line:
                    continue
                meta = json.loads(first_line)
                if not isinstance(meta, dict):
                    continue
                if meta.get("type") != "session_meta":
                    continue

                payload = self._as_dict(meta.get("payload"))
                cwd = payload.get("cwd", "")
                project = os.path.basename(cwd) if cwd else "Unknown"

                # Match session to Zellij pane (UUID match via lsof, CWD fallback)
                zellij_session, zellij_pane = self._match_codex_session_to_pane(
                    jsonl_file, cwd, pid_info, cwd_map
                )

                # Determine status based on file age
                file_age_minutes = (now - mtime) / 60
                status = "active" if file_age_minutes < 2 else "done"

                # Read existing session to preserve some fields
                existing = {}
                if session_file.exists():
                    try:
                        loaded_existing = json.loads(session_file.read_text())
                        if isinstance(loaded_existing, dict):
                            existing = loaded_existing
                            # Don't downgrade active to done if the file was
                            # updated within the last 5 minutes.
                            if (
                                existing.get("status") == "active"
                                and status == "done"
                                and file_age_minutes < 5
                            ):
                                status = "active"
                    except (json.JSONDecodeError, OSError):
                        pass

                # Get last message preview from recent lines
                last_message = ""
                try:
                    tail_entries = self._read_jsonl_tail_entries(
                        jsonl_file, max_lines=60, max_bytes=800 * 1024
                    )
                    for entry in reversed(tail_entries):
                        if not isinstance(entry, dict):
                            continue
                        if entry.get("type") != "response_item":
                            continue
                        payload_item = self._as_dict(entry.get("payload"))
                        if payload_item.get("role") != "assistant":
                            continue
                        content = payload_item.get("content", [])
                        if not isinstance(content, list):
                            continue
                        for part in content:
                            text = part.get("text") if isinstance(part, dict) else None
                            # Ignore empty or non-string text parts instead of
                            # failing on .startswith().
                            if not isinstance(text, str) or not text:
                                continue
                            # Skip system content
                            if text.startswith(("<", "#")):
                                continue
                            last_message = text[:200]
                            break
                        if last_message:
                            break
                except (json.JSONDecodeError, OSError):
                    pass

                context_usage = self._get_cached_context_usage(
                    jsonl_file, self._parse_codex_context_usage_from_file
                )

                session_ts = payload.get("timestamp", "")
                last_event_at = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()

                # Check for spawn_id: preserve existing, or match to pending spawn.
                # Use session_ts (start time) not mtime to avoid false matches
                # with pre-existing sessions that were recently active.
                spawn_id = existing.get("spawn_id")
                if not spawn_id:
                    spawn_id = _match_pending_spawn(cwd, session_ts)

                session_data = {
                    "session_id": session_id,
                    "agent": "codex",
                    "project": project,
                    "project_dir": cwd,
                    "status": status,
                    "started_at": existing.get("started_at", session_ts),
                    "last_event_at": last_event_at,
                    "last_event": "CodexSession",
                    "last_message_preview": last_message,
                    "zellij_session": zellij_session or existing.get("zellij_session", ""),
                    "zellij_pane": zellij_pane or existing.get("zellij_pane", ""),
                    "transcript_path": str(jsonl_file),
                }
                if spawn_id:
                    session_data["spawn_id"] = spawn_id
                if context_usage:
                    session_data["context_usage"] = context_usage
                elif existing.get("context_usage"):
                    session_data["context_usage"] = existing.get("context_usage")

                session_file.write_text(json.dumps(session_data, indent=2))

            except (OSError, json.JSONDecodeError):
                continue
            except Exception:
                LOGGER.exception("Failed to discover Codex session from %s", jsonl_file)
                continue

    def _get_codex_pane_info(self):
        """Get Zellij pane info for running codex processes via process inspection.

        Extracts ZELLIJ_PANE_ID from each codex process's inherited environment,
        since zellij dump-layout doesn't provide pane IDs. Results are cached for
        5 seconds to avoid running pgrep/ps/lsof on every dashboard poll.

        The whole lookup is best-effort: any failure (missing tool, timeout,
        undecodable output) is logged at debug level and yields whatever was
        collected so far.

        Returns:
            tuple: (pid_info, cwd_map)
                pid_info: {pid_str: {"pane_id": str, "zellij_session": str}}
                cwd_map: {cwd_path: {"session": str, "pane_id": str}}
        """
        now = time.time()
        if now < _codex_pane_cache["expires"]:
            return _codex_pane_cache["pid_info"], _codex_pane_cache["cwd_map"]

        pid_info = {}
        cwd_map = {}

        try:
            # Step 1: Find codex process PIDs
            result = subprocess.run(
                ["pgrep", "-x", "codex"],
                capture_output=True,
                text=True,
                timeout=2,
            )
            pids = result.stdout.split() if result.returncode == 0 else []

            # Step 2: Extract ZELLIJ env vars from each process
            for pid in pids:
                try:
                    env_result = subprocess.run(
                        ["ps", "eww", "-o", "args=", "-p", pid],
                        capture_output=True,
                        text=True,
                        timeout=2,
                    )
                    if env_result.returncode != 0:
                        continue

                    env_str = env_result.stdout
                    pane_match = re.search(r"ZELLIJ_PANE_ID=(\d+)", env_str)
                    session_match = re.search(r"ZELLIJ_SESSION_NAME=(\S+)", env_str)
                    if pane_match and session_match:
                        pid_info[pid] = {
                            "pane_id": pane_match.group(1),
                            "zellij_session": session_match.group(1),
                        }
                except Exception:
                    LOGGER.debug(
                        "Could not read environment of codex pid %s", pid, exc_info=True
                    )
                    continue

            # Step 3: Get CWDs via single batched lsof call
            if pid_info:
                pid_list = ",".join(pid_info.keys())
                try:
                    cwd_result = subprocess.run(
                        ["lsof", "-a", "-p", pid_list, "-d", "cwd", "-Fn"],
                        capture_output=True,
                        text=True,
                        timeout=3,
                    )
                    if cwd_result.returncode == 0:
                        # -F output: a "p<pid>" line followed by "n<path>" lines.
                        current_pid = None
                        for line in cwd_result.stdout.splitlines():
                            if line.startswith("p"):
                                current_pid = line[1:]
                            elif line.startswith("n/") and current_pid and current_pid in pid_info:
                                cwd = line[1:]
                                info = pid_info[current_pid]
                                cwd_map[cwd] = {
                                    "session": info["zellij_session"],
                                    "pane_id": info["pane_id"],
                                }
                except Exception:
                    LOGGER.debug("Could not resolve codex process CWDs", exc_info=True)

        except Exception:
            LOGGER.debug("Could not enumerate codex processes", exc_info=True)

        _codex_pane_cache["pid_info"] = pid_info
        _codex_pane_cache["cwd_map"] = cwd_map
        _codex_pane_cache["expires"] = now + 5  # Cache for 5 seconds

        return pid_info, cwd_map

    def _match_codex_session_to_pane(self, session_file, session_cwd, pid_info, cwd_map):
        """Match a Codex session file to a Zellij pane.

        Tries session-file-to-PID matching first (via lsof), falls back to CWD.

        Args:
            session_file: Path of the session's transcript file.
            session_cwd: CWD recorded for the session (may be empty).
            pid_info: {pid_str: {"pane_id": str, "zellij_session": str}}
            cwd_map: {cwd_path: {"session": str, "pane_id": str}}

        Returns:
            tuple: (zellij_session, pane_id) or ("", "")
        """
        # Try precise match: which process has this session file open?
        try:
            result = subprocess.run(
                ["lsof", "-t", str(session_file)],
                capture_output=True,
                text=True,
                timeout=2,
            )
            if result.returncode == 0:
                for pid in result.stdout.split():
                    if pid in pid_info:
                        info = pid_info[pid]
                        return info["zellij_session"], info["pane_id"]
        except Exception:
            # Best-effort: fall through to the CWD match below.
            LOGGER.debug("lsof lookup failed for %s", session_file, exc_info=True)

        # Fall back to CWD match. normpath() never returns "", so an empty
        # session CWD cannot match any pane.
        normalized_cwd = os.path.normpath(session_cwd) if session_cwd else ""
        if not normalized_cwd:
            return "", ""
        for pane_cwd, info in cwd_map.items():
            if os.path.normpath(pane_cwd) == normalized_cwd:
                return info["session"], info["pane_id"]

        return "", ""