Files
amc/tests/test_state.py
teernisse 19a31a4620 feat(state): add same-pane session deduplication
Handle edge case where Claude's --resume creates an orphan session file
before resuming the original session, leaving two session files pointing
to the same Zellij pane.

The deduplication algorithm (_dedupe_same_pane_sessions) resolves
conflicts by preferring:
1. Sessions with context_usage (indicates actual conversation occurred)
2. Higher conversation_mtime_ns (more recent file activity)

When an orphan is identified, its session file is deleted from disk to
prevent re-discovery on subsequent state collection cycles.

Test coverage includes:
- Keeping session with context_usage over one without
- Keeping higher mtime when both have context_usage
- Keeping higher mtime when neither has context_usage
- Preserving sessions on different panes (no false positives)
- Single session per pane unchanged
- Sessions without pane info unchanged
- Handling non-numeric mtime values defensively

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-28 00:47:28 -05:00

649 lines
26 KiB
Python

import json
import os
import subprocess
import tempfile
import time
import unittest
from pathlib import Path
from unittest.mock import patch

import amc_server.mixins.state as state_mod
from amc_server.mixins.state import StateMixin
from amc_server.mixins.parsing import SessionParsingMixin
from amc_server.mixins.discovery import SessionDiscoveryMixin
class DummyStateHandler(StateMixin, SessionParsingMixin, SessionDiscoveryMixin):
    """Bare handler that combines the state, parsing and discovery mixins for tests."""
class TestGetActiveZellijSessions(unittest.TestCase):
    """Tests for _get_active_zellij_sessions edge cases."""

    def setUp(self):
        # The cache is module-level state shared by the whole test run, so
        # snapshot it and restore it after each test; otherwise the fake
        # entries written here could leak into later tests.
        snapshot = dict(state_mod._zellij_cache)
        self.addCleanup(self._restore_zellij_cache, snapshot)
        # Every test starts without a cached result.
        state_mod._zellij_cache["sessions"] = None
        state_mod._zellij_cache["expires"] = 0

    @staticmethod
    def _restore_zellij_cache(snapshot):
        """Put the module-level cache back to *snapshot*, mutating it in place."""
        state_mod._zellij_cache.clear()
        state_mod._zellij_cache.update(snapshot)

    def test_parses_output_with_metadata(self):
        handler = DummyStateHandler()
        completed = subprocess.CompletedProcess(
            args=[],
            returncode=0,
            stdout="infra [created 1h ago]\nwork\n",
            stderr="",
        )
        with patch.object(state_mod, "ZELLIJ_BIN", "/opt/homebrew/bin/zellij"), \
                patch("amc_server.mixins.state.subprocess.run", return_value=completed) as run_mock:
            sessions = handler._get_active_zellij_sessions()
        # Trailing "[created ...]" metadata must be stripped from the names.
        self.assertEqual(sessions, {"infra", "work"})
        args = run_mock.call_args.args[0]
        self.assertEqual(args, ["/opt/homebrew/bin/zellij", "list-sessions", "--no-formatting"])

    def test_empty_output_returns_empty_set(self):
        handler = DummyStateHandler()
        completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
        with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
            sessions = handler._get_active_zellij_sessions()
        self.assertEqual(sessions, set())

    def test_whitespace_only_lines_ignored(self):
        handler = DummyStateHandler()
        completed = subprocess.CompletedProcess(
            args=[], returncode=0, stdout="session1\n \n\nsession2\n", stderr=""
        )
        with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
            sessions = handler._get_active_zellij_sessions()
        self.assertEqual(sessions, {"session1", "session2"})

    def test_nonzero_exit_returns_none(self):
        handler = DummyStateHandler()
        completed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="error")
        with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
            sessions = handler._get_active_zellij_sessions()
        self.assertIsNone(sessions)

    def test_timeout_returns_none(self):
        handler = DummyStateHandler()
        with patch("amc_server.mixins.state.subprocess.run",
                   side_effect=subprocess.TimeoutExpired("cmd", 2)):
            sessions = handler._get_active_zellij_sessions()
        self.assertIsNone(sessions)

    def test_file_not_found_returns_none(self):
        handler = DummyStateHandler()
        with patch("amc_server.mixins.state.subprocess.run", side_effect=FileNotFoundError()):
            sessions = handler._get_active_zellij_sessions()
        self.assertIsNone(sessions)

    def test_cache_used_when_fresh(self):
        handler = DummyStateHandler()
        # Pre-populate the cache with an expiry in the future.
        state_mod._zellij_cache["sessions"] = {"cached"}
        state_mod._zellij_cache["expires"] = time.time() + 100
        with patch("amc_server.mixins.state.subprocess.run") as mock_run:
            sessions = handler._get_active_zellij_sessions()
        mock_run.assert_not_called()
        self.assertEqual(sessions, {"cached"})
class TestIsSessionDead(unittest.TestCase):
    """Tests for _is_session_dead edge cases."""

    def setUp(self):
        self.handler = DummyStateHandler()

    def test_starting_session_not_dead(self):
        """A session still in the "starting" status is not reported dead."""
        starting = {"status": "starting", "agent": "claude", "zellij_session": "s"}
        verdict = self.handler._is_session_dead(starting, {"s"}, set())
        self.assertFalse(verdict)

    def test_claude_without_zellij_session_is_dead(self):
        """An active claude session with an empty zellij_session is dead."""
        detached = {"status": "active", "agent": "claude", "zellij_session": ""}
        verdict = self.handler._is_session_dead(detached, set(), set())
        self.assertTrue(verdict)

    def test_claude_with_missing_zellij_session_is_dead(self):
        """A claude session whose zellij session is not in the active set is dead."""
        session = {"status": "active", "agent": "claude", "zellij_session": "deleted"}
        live_zellij = {"other_session"}
        verdict = self.handler._is_session_dead(session, live_zellij, set())
        self.assertTrue(verdict)

    def test_claude_with_active_zellij_session_not_dead(self):
        """A claude session whose zellij session is in the active set is alive."""
        session = {"status": "active", "agent": "claude", "zellij_session": "existing"}
        live_zellij = {"existing", "other"}
        verdict = self.handler._is_session_dead(session, live_zellij, set())
        self.assertFalse(verdict)

    def test_claude_unknown_zellij_status_assumes_alive(self):
        """With no zellij information (None), a claude session is assumed alive."""
        # None means zellij could not be queried; treating that as "dead"
        # would produce false positives.
        session = {"status": "active", "agent": "claude", "zellij_session": "unknown"}
        verdict = self.handler._is_session_dead(session, None, set())
        self.assertFalse(verdict)

    def test_codex_without_transcript_path_is_dead(self):
        """A codex session with an empty transcript_path is dead."""
        session = {"status": "active", "agent": "codex", "transcript_path": ""}
        verdict = self.handler._is_session_dead(session, None, set())
        self.assertTrue(verdict)

    def test_codex_with_active_transcript_not_dead(self):
        """A codex session whose transcript is among the active files is alive."""
        transcript = "/path/to/file.jsonl"
        session = {"status": "active", "agent": "codex", "transcript_path": transcript}
        verdict = self.handler._is_session_dead(session, None, {transcript})
        self.assertFalse(verdict)

    def test_codex_without_active_transcript_checks_lsof(self):
        """When the transcript is not in the active set, _is_file_open decides."""
        session = {"status": "active", "agent": "codex", "transcript_path": "/path/to/file.jsonl"}

        # Simulate lsof finding the file open
        with patch.object(self.handler, "_is_file_open", return_value=True):
            verdict_open = self.handler._is_session_dead(session, None, set())
        self.assertFalse(verdict_open)

        # Simulate lsof not finding the file
        with patch.object(self.handler, "_is_file_open", return_value=False):
            verdict_closed = self.handler._is_session_dead(session, None, set())
        self.assertTrue(verdict_closed)

    def test_unknown_agent_assumes_alive(self):
        """A session with an unrecognised agent value is assumed alive."""
        session = {"status": "active", "agent": "unknown_agent"}
        verdict = self.handler._is_session_dead(session, None, set())
        self.assertFalse(verdict)
class TestIsFileOpen(unittest.TestCase):
    """Tests for _is_file_open edge cases."""

    def setUp(self):
        self.handler = DummyStateHandler()

    def _probe(self, **run_behaviour):
        """Call _is_file_open while subprocess.run is mocked with *run_behaviour*."""
        with patch("amc_server.mixins.state.subprocess.run", **run_behaviour):
            return self.handler._is_file_open("/some/file.jsonl")

    def test_lsof_finds_pid_returns_true(self):
        """Exit code 0 with a PID on stdout is reported as open."""
        pid_listed = subprocess.CompletedProcess(args=[], returncode=0, stdout="12345\n", stderr="")
        self.assertTrue(self._probe(return_value=pid_listed))

    def test_lsof_no_result_returns_false(self):
        """Exit code 1 with empty stdout is reported as not open."""
        nothing_listed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")
        self.assertFalse(self._probe(return_value=nothing_listed))

    def test_lsof_timeout_returns_false(self):
        """A TimeoutExpired from the subprocess call is reported as not open."""
        timed_out = subprocess.TimeoutExpired("cmd", 2)
        self.assertFalse(self._probe(side_effect=timed_out))

    def test_lsof_not_found_returns_false(self):
        """A FileNotFoundError from the subprocess call is reported as not open."""
        self.assertFalse(self._probe(side_effect=FileNotFoundError()))
class TestCleanupStale(unittest.TestCase):
    """Tests for _cleanup_stale edge cases."""

    def setUp(self):
        self.handler = DummyStateHandler()
        # One scratch tree per test; addCleanup removes it even if the test fails.
        scratch = tempfile.TemporaryDirectory()
        self.addCleanup(scratch.cleanup)
        self.events_dir = Path(scratch.name) / "events"
        self.events_dir.mkdir()
        self.sessions_dir = Path(scratch.name) / "sessions"
        self.sessions_dir.mkdir()

    @staticmethod
    def _backdate(path, hours):
        """Set the access and modification times of *path* to *hours* ago."""
        stamp = time.time() - (hours * 3600)
        os.utime(path, (stamp, stamp))

    def _run_cleanup_stale(self, sessions):
        """Call _cleanup_stale(sessions) with EVENTS_DIR and SESSIONS_DIR redirected.

        Both module-level directories are patched to the per-test scratch
        directories created in setUp.
        """
        with patch.object(state_mod, "EVENTS_DIR", self.events_dir), \
                patch.object(state_mod, "SESSIONS_DIR", self.sessions_dir):
            self.handler._cleanup_stale(sessions)

    def test_removes_orphan_event_logs_older_than_24h(self):
        # Create an orphan event log (no matching session)
        orphan_log = self.events_dir / "orphan.jsonl"
        orphan_log.write_text('{"event": "test"}\n')
        # Set mtime to 25 hours ago
        self._backdate(orphan_log, hours=25)

        self._run_cleanup_stale([])  # No active sessions

        self.assertFalse(orphan_log.exists())

    def test_keeps_orphan_event_logs_younger_than_24h(self):
        # Create a recent orphan event log (mtime left at "now")
        recent_log = self.events_dir / "recent.jsonl"
        recent_log.write_text('{"event": "test"}\n')

        self._run_cleanup_stale([])

        self.assertTrue(recent_log.exists())

    def test_keeps_event_logs_with_active_session(self):
        # Create an old event log that HAS an active session
        event_log = self.events_dir / "active-session.jsonl"
        event_log.write_text('{"event": "test"}\n')
        self._backdate(event_log, hours=25)

        self._run_cleanup_stale([{"session_id": "active-session"}])

        self.assertTrue(event_log.exists())

    def test_removes_stale_starting_sessions(self):
        # Create a stale "starting" session
        stale_session = self.sessions_dir / "stale.json"
        stale_session.write_text(json.dumps({"status": "starting"}))
        # Set mtime to 2 hours ago (> 1 hour threshold)
        self._backdate(stale_session, hours=2)

        self._run_cleanup_stale([])

        self.assertFalse(stale_session.exists())

    def test_keeps_stale_active_sessions(self):
        # Create an old "active" session (should NOT be deleted)
        active_session = self.sessions_dir / "active.json"
        active_session.write_text(json.dumps({"status": "active"}))
        self._backdate(active_session, hours=2)

        self._run_cleanup_stale([])

        self.assertTrue(active_session.exists())
class TestCollectSessions(unittest.TestCase):
    """Tests for _collect_sessions edge cases."""

    def setUp(self):
        self.handler = DummyStateHandler()

    @staticmethod
    def _make_dirs(root):
        """Create the "sessions" and "events" directories under *root* and return them."""
        sessions_dir = Path(root) / "sessions"
        sessions_dir.mkdir()
        events_dir = Path(root) / "events"
        events_dir.mkdir()
        return sessions_dir, events_dir

    def _collect(self, sessions_dir, events_dir, active_zellij=None):
        """Run _collect_sessions against the given directories with helpers stubbed.

        SESSIONS_DIR and EVENTS_DIR are patched on the state module, and the
        handler's discovery, zellij, transcript, context-usage and
        conversation-mtime helpers are replaced with mocks. *active_zellij* is
        what the mocked _get_active_zellij_sessions returns.
        """
        with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
                patch.object(state_mod, "EVENTS_DIR", events_dir), \
                patch.object(self.handler, "_discover_active_codex_sessions"), \
                patch.object(self.handler, "_get_active_zellij_sessions", return_value=active_zellij), \
                patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
                patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
                patch.object(self.handler, "_get_conversation_mtime", return_value=None):
            return self.handler._collect_sessions()

    def test_invalid_json_session_file_skipped(self):
        """A file that is not valid JSON is ignored; valid neighbours are returned."""
        with tempfile.TemporaryDirectory() as tmpdir:
            sessions_dir, events_dir = self._make_dirs(tmpdir)

            # Create an invalid JSON file
            (sessions_dir / "bad.json").write_text("not json")

            # Create a valid session
            good_payload = {
                "session_id": "good",
                "agent": "claude",
                "status": "active",
                "last_event_at": "2024-01-01T00:00:00Z",
            }
            (sessions_dir / "good.json").write_text(json.dumps(good_payload))

            collected = self._collect(sessions_dir, events_dir)

            # Should only get the good session
            self.assertEqual(len(collected), 1)
            self.assertEqual(collected[0]["session_id"], "good")

    def test_non_dict_session_file_skipped(self):
        """A session file holding a JSON array instead of an object is ignored."""
        with tempfile.TemporaryDirectory() as tmpdir:
            sessions_dir, events_dir = self._make_dirs(tmpdir)

            # Create a file with array instead of dict
            (sessions_dir / "array.json").write_text("[1, 2, 3]")

            collected = self._collect(sessions_dir, events_dir)

            self.assertEqual(len(collected), 0)

    def test_orphan_starting_session_deleted(self):
        """A "starting" session whose zellij session is gone is dropped and its file removed."""
        with tempfile.TemporaryDirectory() as tmpdir:
            sessions_dir, events_dir = self._make_dirs(tmpdir)

            # Create a starting session with a non-existent zellij session
            orphan_file = sessions_dir / "orphan.json"
            orphan_payload = {
                "session_id": "orphan",
                "status": "starting",
                "zellij_session": "deleted_session",
            }
            orphan_file.write_text(json.dumps(orphan_payload))

            # The orphan's zellij session is deliberately absent from this set.
            collected = self._collect(sessions_dir, events_dir, active_zellij={"other_session"})

            # Orphan should be deleted and not in results
            self.assertEqual(len(collected), 0)
            self.assertFalse(orphan_file.exists())

    def test_sessions_sorted_by_id(self):
        """Collected sessions come back ordered by session_id."""
        with tempfile.TemporaryDirectory() as tmpdir:
            sessions_dir, events_dir = self._make_dirs(tmpdir)

            # Written out of order on purpose.
            for sid in ("zebra", "alpha", "middle"):
                payload = {
                    "session_id": sid,
                    "status": "active",
                    "last_event_at": "2024-01-01T00:00:00Z",
                }
                (sessions_dir / f"{sid}.json").write_text(json.dumps(payload))

            collected = self._collect(sessions_dir, events_dir)

            self.assertEqual(
                [entry["session_id"] for entry in collected],
                ["alpha", "middle", "zebra"],
            )
class TestDedupeSamePaneSessions(unittest.TestCase):
    """Tests for _dedupe_same_pane_sessions (--resume orphan handling)."""

    def setUp(self):
        self.handler = DummyStateHandler()

    @staticmethod
    def _session(session_id, pane="42", **extra):
        """Build a session dict in zellij session "infra" on *pane*, plus *extra* keys."""
        return {
            "session_id": session_id,
            "zellij_session": "infra",
            "zellij_pane": pane,
            **extra,
        }

    def _dedupe(self, sessions, file_ids=()):
        """Run the dedupe against a temporary SESSIONS_DIR and return its result.

        A dummy "<id>.json" file containing "{}" is created for each id in
        *file_ids* before the call.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            sessions_dir = Path(tmpdir)
            for sid in file_ids:
                (sessions_dir / f"{sid}.json").write_text("{}")
            with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
                return self.handler._dedupe_same_pane_sessions(sessions)

    def test_keeps_session_with_context_usage(self):
        """Of two sessions on one pane, the one with context_usage survives."""
        sessions = [
            self._session("orphan", context_usage=None, conversation_mtime_ns=1000),
            # older but has context
            self._session("real", context_usage={"current_tokens": 5000}, conversation_mtime_ns=900),
        ]

        kept = self._dedupe(sessions, file_ids=["orphan", "real"])

        self.assertEqual(len(kept), 1)
        self.assertEqual(kept[0]["session_id"], "real")

    def test_keeps_higher_mtime_when_both_have_context(self):
        """If both sessions have context_usage, the higher mtime survives."""
        sessions = [
            self._session("older", context_usage={"current_tokens": 5000}, conversation_mtime_ns=1000),
            self._session("newer", context_usage={"current_tokens": 6000}, conversation_mtime_ns=2000),
        ]

        kept = self._dedupe(sessions, file_ids=["older", "newer"])

        self.assertEqual(len(kept), 1)
        self.assertEqual(kept[0]["session_id"], "newer")

    def test_no_dedup_when_different_panes(self):
        """Sessions on different panes are both kept."""
        sessions = [
            self._session("session1", pane="42"),
            self._session("session2", pane="43"),  # different pane
        ]

        kept = self._dedupe(sessions)

        self.assertEqual(len(kept), 2)

    def test_no_dedup_when_empty_pane_info(self):
        """Sessions with empty pane info are both kept."""
        sessions = [
            {"session_id": "session1", "zellij_session": "", "zellij_pane": ""},
            {"session_id": "session2", "zellij_session": "", "zellij_pane": ""},
        ]

        kept = self._dedupe(sessions)

        self.assertEqual(len(kept), 2)

    def test_deletes_orphan_session_file(self):
        """The losing session's file is removed from disk; the winner's stays."""
        sessions = [
            self._session("orphan", context_usage=None),
            self._session("real", context_usage={"current_tokens": 5000}),
        ]
        # Kept inline, not routed through _dedupe: the file checks must
        # run while the temporary directory still exists.
        with tempfile.TemporaryDirectory() as tmpdir:
            sessions_dir = Path(tmpdir)
            orphan_file = sessions_dir / "orphan.json"
            real_file = sessions_dir / "real.json"
            orphan_file.write_text("{}")
            real_file.write_text("{}")

            with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
                self.handler._dedupe_same_pane_sessions(sessions)

            self.assertFalse(orphan_file.exists())
            self.assertTrue(real_file.exists())

    def test_handles_session_without_id(self):
        """A session whose session_id is None is skipped without raising."""
        sessions = [
            self._session(None, context_usage=None),  # Missing ID
            self._session("real", context_usage={"current_tokens": 5000}),
        ]

        kept = self._dedupe(sessions, file_ids=["real"])

        # Should keep real, skip the None-id session (not filter it out)
        self.assertEqual(len(kept), 2)  # Both still in list
        self.assertIn("real", [entry.get("session_id") for entry in kept])

    def test_keeps_only_best_with_three_sessions(self):
        """With three sessions on one pane, only the best one is kept."""
        sessions = [
            self._session("worst", context_usage=None, conversation_mtime_ns=1000),
            self._session("middle", context_usage=None, conversation_mtime_ns=2000),
            # Oldest but has context
            self._session("best", context_usage={"current_tokens": 100}, conversation_mtime_ns=500),
        ]

        kept = self._dedupe(sessions, file_ids=["worst", "middle", "best"])

        self.assertEqual(len(kept), 1)
        self.assertEqual(kept[0]["session_id"], "best")

    def test_handles_non_numeric_mtime(self):
        """A non-numeric mtime does not raise; the session with a numeric mtime wins."""
        sessions = [
            self._session("bad_mtime", context_usage=None, conversation_mtime_ns="not a number"),  # Invalid
            self._session("good_mtime", context_usage=None, conversation_mtime_ns=1000),
        ]

        # Should not raise, should keep session with valid mtime
        kept = self._dedupe(sessions, file_ids=["bad_mtime", "good_mtime"])

        self.assertEqual(len(kept), 1)
        self.assertEqual(kept[0]["session_id"], "good_mtime")
if __name__ == "__main__":
    # Run every test case in this module when the file is executed as a script.
    unittest.main()