Files
amc/amc_server/mixins/state.py
teernisse 19a31a4620 feat(state): add same-pane session deduplication
Handle edge case where Claude's --resume creates an orphan session file
before resuming the original session, leaving two session files pointing
to the same Zellij pane.

The deduplication algorithm (_dedupe_same_pane_sessions) resolves
conflicts by preferring:
1. Sessions with context_usage (indicates actual conversation occurred)
2. Higher conversation_mtime_ns (more recent file activity)

When an orphan is identified, its session file is deleted from disk to
prevent re-discovery on subsequent state collection cycles.

Test coverage includes:
- Keeping session with context_usage over one without
- Keeping higher mtime when both have context_usage
- Keeping higher mtime when neither has context_usage
- Preserving sessions on different panes (no false positives)
- Single session per pane unchanged
- Sessions without pane info unchanged
- Handling non-numeric mtime values defensively

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-28 00:47:28 -05:00

440 lines
17 KiB
Python

import hashlib
import json
import subprocess
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from amc_server.config import (
EVENTS_DIR,
SESSIONS_DIR,
STALE_EVENT_AGE,
STALE_STARTING_AGE,
_state_lock,
)
from amc_server.zellij import ZELLIJ_BIN, _zellij_cache
from amc_server.logging_utils import LOGGER
class StateMixin:
def _serve_state(self):
    """Respond to `/api/state` with the current state snapshot as JSON."""
    self._send_json(200, self._build_state_payload())
def _serve_stream(self):
    """SSE stream of full state snapshots, emitted on change.

    Sends the SSE response headers, then polls the state payload once per
    second and writes a new ``state`` event whenever the payload's hash
    changes.  A comment-frame heartbeat is written every 15 seconds so
    idle connections stay open (presumably to keep proxies from timing
    out — NOTE(review): confirm).  Returns (closing the stream) when the
    client disconnects.
    """
    self.send_response(200)
    self.send_header("Content-Type", "text/event-stream")
    self.send_header("Cache-Control", "no-cache")
    self.send_header("Connection", "keep-alive")
    self.send_header("Access-Control-Allow-Origin", "*")
    self.end_headers()
    # Ask clients to reconnect quickly on transient errors.
    try:
        self.wfile.write(b"retry: 2000\n\n")
        self.wfile.flush()
    except (BrokenPipeError, ConnectionResetError, OSError):
        # Client already gone before the stream started.
        return
    last_hash = None
    event_id = 0
    last_heartbeat_at = time.time()
    heartbeat_interval = 15  # seconds between ": ping" comment frames
    poll_interval = 1  # seconds between state payload rebuilds
    try:
        while True:
            payload = self._build_state_payload()
            # Compact JSON + SHA-1 hash used purely for change detection:
            # only emit an event when the snapshot actually differs.
            payload_json = json.dumps(payload, separators=(",", ":"))
            payload_hash = hashlib.sha1(payload_json.encode("utf-8")).hexdigest()
            if payload_hash != last_hash:
                event_id += 1
                self._write_sse_event("state", payload_json, event_id)
                last_hash = payload_hash
            now = time.time()
            if now - last_heartbeat_at >= heartbeat_interval:
                # SSE comment frame; ignored by clients but keeps the
                # connection alive.
                self.wfile.write(b": ping\n\n")
                self.wfile.flush()
                last_heartbeat_at = now
            time.sleep(poll_interval)
    except (BrokenPipeError, ConnectionResetError, OSError):
        # Client disconnected.
        return
    except Exception:
        LOGGER.exception("Unhandled SSE stream error")
        return
def _write_sse_event(self, event_name, data, event_id):
"""Write one SSE event frame."""
# JSON payload is compact single-line; still split defensively for SSE format.
frame = [f"id: {event_id}", f"event: {event_name}"]
for line in str(data).splitlines():
frame.append(f"data: {line}")
frame.append("")
frame.append("")
self.wfile.write("\n".join(frame).encode("utf-8"))
self.wfile.flush()
def _build_state_payload(self):
    """Assemble the state snapshot shared by `/api/state` and the SSE stream."""
    return {
        "sessions": self._collect_sessions(),
        "server_time": datetime.now(timezone.utc).isoformat(),
    }
def _collect_sessions(self):
    """Collect and normalize all session records from disk.

    Reads every ``*.json`` file under SESSIONS_DIR, enriches each record
    with live data (context usage, conversation-file mtime, dead/alive
    flag), deletes obvious orphans, and returns a deterministically
    ordered list.  Runs entirely under ``_state_lock`` so concurrent
    handlers don't race on the session files.
    """
    with _state_lock:
        sessions = []
        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        # Discover active Codex sessions and create session files for them
        self._discover_active_codex_sessions()
        # Get active Zellij sessions for liveness check
        active_zellij_sessions = self._get_active_zellij_sessions()
        # Get set of transcript files with active processes (for dead detection)
        active_transcript_files = self._get_active_transcript_files()
        for f in SESSIONS_DIR.glob("*.json"):
            try:
                data = json.loads(f.read_text())
                if not isinstance(data, dict):
                    # Malformed content (e.g. a bare list/string): skip.
                    continue
                # Proactive liveness check: only auto-delete orphan "starting" sessions.
                # Other statuses can still be useful as historical/debug context.
                zellij_session = data.get("zellij_session", "")
                if zellij_session and active_zellij_sessions is not None:
                    if zellij_session not in active_zellij_sessions:
                        if data.get("status") == "starting":
                            # A missing Zellij session while "starting" indicates an orphan.
                            f.unlink(missing_ok=True)
                            continue
                context_usage = self._get_context_usage_for_session(data)
                if context_usage:
                    data["context_usage"] = context_usage
                # Capture turn token baseline on UserPromptSubmit (for per-turn token display)
                # Only write once when the turn starts and we have token data
                if (
                    data.get("last_event") == "UserPromptSubmit"
                    and "turn_start_tokens" not in data
                    and context_usage
                    and context_usage.get("current_tokens") is not None
                ):
                    data["turn_start_tokens"] = context_usage["current_tokens"]
                    # Persist to session file so it survives server restarts
                    try:
                        f.write_text(json.dumps(data, indent=2))
                    except OSError:
                        # Best-effort persistence; the in-memory value still applies.
                        pass
                # Track conversation file mtime for real-time update detection
                conv_mtime = self._get_conversation_mtime(data)
                if conv_mtime:
                    data["conversation_mtime_ns"] = conv_mtime
                # Determine if session is "dead" (no longer interactable)
                data["is_dead"] = self._is_session_dead(
                    data, active_zellij_sessions, active_transcript_files
                )
                sessions.append(data)
            except (json.JSONDecodeError, OSError):
                # Unreadable or corrupt session file; skip it this cycle.
                continue
            except Exception:
                LOGGER.exception("Failed processing session file %s", f)
                continue
        # Sort by session_id for stable, deterministic ordering (no visual jumping)
        sessions.sort(key=lambda s: s.get("session_id", ""))
        # Dedupe same-pane sessions (handles --resume creating orphan + real session)
        sessions = self._dedupe_same_pane_sessions(sessions)
        # Clean orphan event logs (sessions persist until manually dismissed or SessionEnd)
        self._cleanup_stale(sessions)
        return sessions
def _get_active_zellij_sessions(self):
    """Query Zellij for active sessions.

    Returns:
        set | None: Active session names, or None when the query fails —
        callers treat None as "unknown", and must not clean anything up.
    """
    now = time.time()
    # Serve from the shared cache while fresh (5s TTL) to avoid spawning
    # a zellij process on every state poll.
    if _zellij_cache["sessions"] is not None and now < _zellij_cache["expires"]:
        return _zellij_cache["sessions"]
    try:
        result = subprocess.run(
            [ZELLIJ_BIN, "list-sessions", "--no-formatting"],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0:
            # One session per line; the name is the first whitespace-delimited
            # token (format: "session_name [created ...]" or just the name).
            sessions = set()
            for line in result.stdout.strip().splitlines():
                # Fix: split each line once (was computed twice per line).
                parts = line.split()
                if parts:
                    sessions.add(parts[0])
            _zellij_cache["sessions"] = sessions
            _zellij_cache["expires"] = now + 5  # Cache for 5 seconds
            return sessions
    except Exception:
        # Fix: the old tuple (TimeoutExpired, FileNotFoundError, Exception)
        # was redundant — Exception already subsumes the others.  Kept
        # deliberately broad: this is a best-effort probe.
        pass
    return None  # Return None on error (don't clean up if we can't verify)
def _get_conversation_mtime(self, session_data):
"""Get the conversation file's mtime for real-time change detection."""
agent = session_data.get("agent")
if agent == "claude":
conv_file = self._get_claude_conversation_file(
session_data.get("session_id", ""),
session_data.get("project_dir", ""),
)
if conv_file:
try:
return conv_file.stat().st_mtime_ns
except OSError:
pass
elif agent == "codex":
transcript_path = session_data.get("transcript_path", "")
if transcript_path:
try:
return Path(transcript_path).stat().st_mtime_ns
except OSError:
pass
# Fallback to discovery
transcript_file = self._find_codex_transcript_file(session_data.get("session_id", ""))
if transcript_file:
try:
return transcript_file.stat().st_mtime_ns
except OSError:
pass
return None
def _get_active_transcript_files(self):
    """Get set of transcript files that have active processes.

    Uses one batched lsof call to cheaply detect whether ANY recent Codex
    transcript file is open; only then narrows with per-file checks.
    (`lsof -t` prints only PIDs, not which argument matched, so the
    per-file pass is unavoidable once the batch call reports activity.)

    Returns:
        set: Absolute paths of transcript files with active processes.
    """
    from amc_server.agents import CODEX_SESSIONS_DIR
    if not CODEX_SESSIONS_DIR.exists():
        return set()
    # Only consider files modified in the last hour, to bound the size of
    # the lsof argument list.
    transcript_files = []
    cutoff = time.time() - 3600
    for jsonl_file in CODEX_SESSIONS_DIR.rglob("*.jsonl"):
        try:
            if jsonl_file.stat().st_mtime > cutoff:
                transcript_files.append(str(jsonl_file))
        except OSError:
            continue
    if not transcript_files:
        return set()
    active_files = set()
    try:
        # Batched gate: exit code 0 with output means at least one file
        # in the list is open by some process.
        result = subprocess.run(
            ["lsof", "-t"] + transcript_files,
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0 and result.stdout.strip():
            # Fix: reuse the class's _is_file_open helper instead of
            # duplicating the per-file lsof invocation inline; it already
            # handles timeouts/missing-lsof by reporting "not open".
            for tf in transcript_files:
                if self._is_file_open(tf):
                    active_files.add(tf)
    except Exception:
        # Fix: the old tuple (TimeoutExpired, FileNotFoundError, Exception)
        # was redundant — Exception subsumes the others.  Best-effort:
        # report nothing active when lsof fails entirely.
        pass
    return active_files
def _is_session_dead(self, session_data, active_zellij_sessions, active_transcript_files):
"""Determine if a session is 'dead' (no longer interactable).
A dead session cannot receive input and won't produce more output.
These should be shown separately from active sessions in the UI.
Args:
session_data: The session dict
active_zellij_sessions: Set of active zellij session names (or None)
active_transcript_files: Set of transcript file paths with active processes
Returns:
bool: True if the session is dead
"""
agent = session_data.get("agent")
zellij_session = session_data.get("zellij_session", "")
status = session_data.get("status", "")
# Sessions that are still starting are not dead (yet)
if status == "starting":
return False
if agent == "codex":
# Codex session is dead if no process has the transcript file open
transcript_path = session_data.get("transcript_path", "")
if not transcript_path:
return True # No transcript path = malformed, treat as dead
# Check cached set first (covers recently-modified files)
if transcript_path in active_transcript_files:
return False # Process is running
# For older files not in cached set, do explicit lsof check
# This handles long-idle but still-running processes
if self._is_file_open(transcript_path):
return False # Process is running
# No process running - it's dead
return True
elif agent == "claude":
# Claude session is dead if:
# 1. No zellij session attached, OR
# 2. The zellij session no longer exists
if not zellij_session:
return True
if active_zellij_sessions is not None:
return zellij_session not in active_zellij_sessions
# If we couldn't query zellij, assume alive (don't false-positive)
return False
# Unknown agent type - assume alive
return False
def _is_file_open(self, file_path):
"""Check if any process has a file open using lsof."""
try:
result = subprocess.run(
["lsof", "-t", file_path],
capture_output=True,
text=True,
timeout=2,
)
return result.returncode == 0 and result.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError, Exception):
return False # Assume not open on error (conservative)
def _cleanup_stale(self, sessions):
    """Remove orphan event logs >24h and stale 'starting' sessions >1h."""
    live_ids = {s.get("session_id") for s in sessions if s.get("session_id")}
    now = time.time()
    # Event logs whose session no longer exists are deleted once old enough.
    EVENTS_DIR.mkdir(parents=True, exist_ok=True)
    for log_file in EVENTS_DIR.glob("*.jsonl"):
        if log_file.stem in live_ids:
            continue
        try:
            if now - log_file.stat().st_mtime > STALE_EVENT_AGE:
                log_file.unlink()
        except OSError:
            pass
    # "starting" sessions that never became active are dropped after a while.
    for session_file in SESSIONS_DIR.glob("*.json"):
        try:
            if now - session_file.stat().st_mtime > STALE_STARTING_AGE:
                record = json.loads(session_file.read_text())
                if record.get("status") == "starting":
                    session_file.unlink()
        except (json.JSONDecodeError, OSError):
            pass
def _dedupe_same_pane_sessions(self, sessions):
"""Remove orphan sessions when multiple sessions share the same Zellij pane.
This handles the --resume edge case where Claude creates a new session file
before resuming the old one, leaving an orphan with no context_usage.
When multiple sessions share (zellij_session, zellij_pane), keep the one with:
1. context_usage (has actual conversation data)
2. Higher conversation_mtime_ns (more recent activity)
"""
def session_score(s):
"""Score a session for dedup ranking: (has_context, mtime)."""
has_context = 1 if s.get("context_usage") else 0
mtime = s.get("conversation_mtime_ns") or 0
# Defensive: ensure mtime is numeric
if not isinstance(mtime, (int, float)):
mtime = 0
return (has_context, mtime)
# Group sessions by pane
pane_groups = defaultdict(list)
for s in sessions:
zs = s.get("zellij_session", "")
zp = s.get("zellij_pane", "")
if zs and zp:
pane_groups[(zs, zp)].append(s)
# Find orphans to remove
orphan_ids = set()
for group in pane_groups.values():
if len(group) <= 1:
continue
# Pick the best session: prefer context_usage, then highest mtime
group_sorted = sorted(group, key=session_score, reverse=True)
# Mark all but the best as orphans
for s in group_sorted[1:]:
session_id = s.get("session_id")
if not session_id:
continue # Skip sessions without valid IDs
orphan_ids.add(session_id)
# Also delete the orphan session file
try:
orphan_file = SESSIONS_DIR / f"{session_id}.json"
orphan_file.unlink(missing_ok=True)
except OSError:
pass
# Return filtered list
return [s for s in sessions if s.get("session_id") not in orphan_ids]