Handle edge case where Claude's --resume creates an orphan session file before resuming the original session, leaving two session files pointing to the same Zellij pane. The deduplication algorithm (_dedupe_same_pane_sessions) resolves conflicts by preferring: 1. Sessions with context_usage (indicates actual conversation occurred) 2. Higher conversation_mtime_ns (more recent file activity) When an orphan is identified, its session file is deleted from disk to prevent re-discovery on subsequent state collection cycles. Test coverage includes: - Keeping session with context_usage over one without - Keeping higher mtime when both have context_usage - Keeping higher mtime when neither has context_usage - Preserving sessions on different panes (no false positives) - Single session per pane unchanged - Sessions without pane info unchanged - Handling non-numeric mtime values defensively Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
649 lines
26 KiB
Python
649 lines
26 KiB
Python
import json
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import amc_server.mixins.state as state_mod
|
|
from amc_server.mixins.state import StateMixin
|
|
from amc_server.mixins.parsing import SessionParsingMixin
|
|
from amc_server.mixins.discovery import SessionDiscoveryMixin
|
|
|
|
|
|
class DummyStateHandler(StateMixin, SessionParsingMixin, SessionDiscoveryMixin):
|
|
pass
|
|
|
|
|
|
class TestGetActiveZellijSessions(unittest.TestCase):
|
|
"""Tests for _get_active_zellij_sessions edge cases."""
|
|
|
|
def setUp(self):
|
|
state_mod._zellij_cache["sessions"] = None
|
|
state_mod._zellij_cache["expires"] = 0
|
|
|
|
def test_parses_output_with_metadata(self):
|
|
handler = DummyStateHandler()
|
|
completed = subprocess.CompletedProcess(
|
|
args=[],
|
|
returncode=0,
|
|
stdout="infra [created 1h ago]\nwork\n",
|
|
stderr="",
|
|
)
|
|
|
|
with patch.object(state_mod, "ZELLIJ_BIN", "/opt/homebrew/bin/zellij"), \
|
|
patch("amc_server.mixins.state.subprocess.run", return_value=completed) as run_mock:
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
self.assertEqual(sessions, {"infra", "work"})
|
|
args = run_mock.call_args.args[0]
|
|
self.assertEqual(args, ["/opt/homebrew/bin/zellij", "list-sessions", "--no-formatting"])
|
|
|
|
def test_empty_output_returns_empty_set(self):
|
|
handler = DummyStateHandler()
|
|
completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
self.assertEqual(sessions, set())
|
|
|
|
def test_whitespace_only_lines_ignored(self):
|
|
handler = DummyStateHandler()
|
|
completed = subprocess.CompletedProcess(
|
|
args=[], returncode=0, stdout="session1\n \n\nsession2\n", stderr=""
|
|
)
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
self.assertEqual(sessions, {"session1", "session2"})
|
|
|
|
def test_nonzero_exit_returns_none(self):
|
|
handler = DummyStateHandler()
|
|
completed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="error")
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
self.assertIsNone(sessions)
|
|
|
|
def test_timeout_returns_none(self):
|
|
handler = DummyStateHandler()
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run",
|
|
side_effect=subprocess.TimeoutExpired("cmd", 2)):
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
self.assertIsNone(sessions)
|
|
|
|
def test_file_not_found_returns_none(self):
|
|
handler = DummyStateHandler()
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run", side_effect=FileNotFoundError()):
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
self.assertIsNone(sessions)
|
|
|
|
def test_cache_used_when_fresh(self):
|
|
handler = DummyStateHandler()
|
|
state_mod._zellij_cache["sessions"] = {"cached"}
|
|
state_mod._zellij_cache["expires"] = time.time() + 100
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run") as mock_run:
|
|
sessions = handler._get_active_zellij_sessions()
|
|
|
|
mock_run.assert_not_called()
|
|
self.assertEqual(sessions, {"cached"})
|
|
|
|
|
|
class TestIsSessionDead(unittest.TestCase):
|
|
"""Tests for _is_session_dead edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyStateHandler()
|
|
|
|
def test_starting_session_not_dead(self):
|
|
session = {"status": "starting", "agent": "claude", "zellij_session": "s"}
|
|
self.assertFalse(self.handler._is_session_dead(session, {"s"}, set()))
|
|
|
|
def test_claude_without_zellij_session_is_dead(self):
|
|
session = {"status": "active", "agent": "claude", "zellij_session": ""}
|
|
self.assertTrue(self.handler._is_session_dead(session, set(), set()))
|
|
|
|
def test_claude_with_missing_zellij_session_is_dead(self):
|
|
session = {"status": "active", "agent": "claude", "zellij_session": "deleted"}
|
|
active_zellij = {"other_session"}
|
|
self.assertTrue(self.handler._is_session_dead(session, active_zellij, set()))
|
|
|
|
def test_claude_with_active_zellij_session_not_dead(self):
|
|
session = {"status": "active", "agent": "claude", "zellij_session": "existing"}
|
|
active_zellij = {"existing", "other"}
|
|
self.assertFalse(self.handler._is_session_dead(session, active_zellij, set()))
|
|
|
|
def test_claude_unknown_zellij_status_assumes_alive(self):
|
|
# When we can't query zellij (None), assume alive to avoid false positives
|
|
session = {"status": "active", "agent": "claude", "zellij_session": "unknown"}
|
|
self.assertFalse(self.handler._is_session_dead(session, None, set()))
|
|
|
|
def test_codex_without_transcript_path_is_dead(self):
|
|
session = {"status": "active", "agent": "codex", "transcript_path": ""}
|
|
self.assertTrue(self.handler._is_session_dead(session, None, set()))
|
|
|
|
def test_codex_with_active_transcript_not_dead(self):
|
|
session = {"status": "active", "agent": "codex", "transcript_path": "/path/to/file.jsonl"}
|
|
active_files = {"/path/to/file.jsonl"}
|
|
self.assertFalse(self.handler._is_session_dead(session, None, active_files))
|
|
|
|
def test_codex_without_active_transcript_checks_lsof(self):
|
|
session = {"status": "active", "agent": "codex", "transcript_path": "/path/to/file.jsonl"}
|
|
|
|
# Simulate lsof finding the file open
|
|
with patch.object(self.handler, "_is_file_open", return_value=True):
|
|
result = self.handler._is_session_dead(session, None, set())
|
|
self.assertFalse(result)
|
|
|
|
# Simulate lsof not finding the file
|
|
with patch.object(self.handler, "_is_file_open", return_value=False):
|
|
result = self.handler._is_session_dead(session, None, set())
|
|
self.assertTrue(result)
|
|
|
|
def test_unknown_agent_assumes_alive(self):
|
|
session = {"status": "active", "agent": "unknown_agent"}
|
|
self.assertFalse(self.handler._is_session_dead(session, None, set()))
|
|
|
|
|
|
class TestIsFileOpen(unittest.TestCase):
|
|
"""Tests for _is_file_open edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyStateHandler()
|
|
|
|
def test_lsof_finds_pid_returns_true(self):
|
|
completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="12345\n", stderr="")
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
|
|
result = self.handler._is_file_open("/some/file.jsonl")
|
|
|
|
self.assertTrue(result)
|
|
|
|
def test_lsof_no_result_returns_false(self):
|
|
completed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")
|
|
|
|
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
|
|
result = self.handler._is_file_open("/some/file.jsonl")
|
|
|
|
self.assertFalse(result)
|
|
|
|
def test_lsof_timeout_returns_false(self):
|
|
with patch("amc_server.mixins.state.subprocess.run",
|
|
side_effect=subprocess.TimeoutExpired("cmd", 2)):
|
|
result = self.handler._is_file_open("/some/file.jsonl")
|
|
|
|
self.assertFalse(result)
|
|
|
|
def test_lsof_not_found_returns_false(self):
|
|
with patch("amc_server.mixins.state.subprocess.run", side_effect=FileNotFoundError()):
|
|
result = self.handler._is_file_open("/some/file.jsonl")
|
|
|
|
self.assertFalse(result)
|
|
|
|
|
|
class TestCleanupStale(unittest.TestCase):
|
|
"""Tests for _cleanup_stale edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyStateHandler()
|
|
|
|
def test_removes_orphan_event_logs_older_than_24h(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
|
|
# Create an orphan event log (no matching session)
|
|
orphan_log = events_dir / "orphan.jsonl"
|
|
orphan_log.write_text('{"event": "test"}\n')
|
|
# Set mtime to 25 hours ago
|
|
old_time = time.time() - (25 * 3600)
|
|
import os
|
|
os.utime(orphan_log, (old_time, old_time))
|
|
|
|
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
self.handler._cleanup_stale([]) # No active sessions
|
|
|
|
self.assertFalse(orphan_log.exists())
|
|
|
|
def test_keeps_orphan_event_logs_younger_than_24h(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
|
|
# Create a recent orphan event log
|
|
recent_log = events_dir / "recent.jsonl"
|
|
recent_log.write_text('{"event": "test"}\n')
|
|
|
|
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
self.handler._cleanup_stale([])
|
|
|
|
self.assertTrue(recent_log.exists())
|
|
|
|
def test_keeps_event_logs_with_active_session(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
|
|
# Create an old event log that HAS an active session
|
|
event_log = events_dir / "active-session.jsonl"
|
|
event_log.write_text('{"event": "test"}\n')
|
|
old_time = time.time() - (25 * 3600)
|
|
import os
|
|
os.utime(event_log, (old_time, old_time))
|
|
|
|
sessions = [{"session_id": "active-session"}]
|
|
|
|
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
self.handler._cleanup_stale(sessions)
|
|
|
|
self.assertTrue(event_log.exists())
|
|
|
|
def test_removes_stale_starting_sessions(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
|
|
# Create a stale "starting" session
|
|
stale_session = sessions_dir / "stale.json"
|
|
stale_session.write_text(json.dumps({"status": "starting"}))
|
|
# Set mtime to 2 hours ago (> 1 hour threshold)
|
|
old_time = time.time() - (2 * 3600)
|
|
import os
|
|
os.utime(stale_session, (old_time, old_time))
|
|
|
|
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
self.handler._cleanup_stale([])
|
|
|
|
self.assertFalse(stale_session.exists())
|
|
|
|
def test_keeps_stale_active_sessions(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
|
|
# Create an old "active" session (should NOT be deleted)
|
|
active_session = sessions_dir / "active.json"
|
|
active_session.write_text(json.dumps({"status": "active"}))
|
|
old_time = time.time() - (2 * 3600)
|
|
import os
|
|
os.utime(active_session, (old_time, old_time))
|
|
|
|
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
self.handler._cleanup_stale([])
|
|
|
|
self.assertTrue(active_session.exists())
|
|
|
|
|
|
class TestCollectSessions(unittest.TestCase):
|
|
"""Tests for _collect_sessions edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyStateHandler()
|
|
|
|
def test_invalid_json_session_file_skipped(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
|
|
# Create an invalid JSON file
|
|
bad_file = sessions_dir / "bad.json"
|
|
bad_file.write_text("not json")
|
|
|
|
# Create a valid session
|
|
good_file = sessions_dir / "good.json"
|
|
good_file.write_text(json.dumps({
|
|
"session_id": "good",
|
|
"agent": "claude",
|
|
"status": "active",
|
|
"last_event_at": "2024-01-01T00:00:00Z",
|
|
}))
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
|
|
patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(self.handler, "_discover_active_codex_sessions"), \
|
|
patch.object(self.handler, "_get_active_zellij_sessions", return_value=None), \
|
|
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
|
|
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
|
|
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
|
|
sessions = self.handler._collect_sessions()
|
|
|
|
# Should only get the good session
|
|
self.assertEqual(len(sessions), 1)
|
|
self.assertEqual(sessions[0]["session_id"], "good")
|
|
|
|
def test_non_dict_session_file_skipped(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
|
|
# Create a file with array instead of dict
|
|
array_file = sessions_dir / "array.json"
|
|
array_file.write_text("[1, 2, 3]")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
|
|
patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(self.handler, "_discover_active_codex_sessions"), \
|
|
patch.object(self.handler, "_get_active_zellij_sessions", return_value=None), \
|
|
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
|
|
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
|
|
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
|
|
sessions = self.handler._collect_sessions()
|
|
|
|
self.assertEqual(len(sessions), 0)
|
|
|
|
def test_orphan_starting_session_deleted(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
|
|
# Create a starting session with a non-existent zellij session
|
|
orphan_file = sessions_dir / "orphan.json"
|
|
orphan_file.write_text(json.dumps({
|
|
"session_id": "orphan",
|
|
"status": "starting",
|
|
"zellij_session": "deleted_session",
|
|
}))
|
|
|
|
active_zellij = {"other_session"} # orphan's session not in here
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
|
|
patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(self.handler, "_discover_active_codex_sessions"), \
|
|
patch.object(self.handler, "_get_active_zellij_sessions", return_value=active_zellij), \
|
|
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
|
|
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
|
|
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
|
|
sessions = self.handler._collect_sessions()
|
|
|
|
# Orphan should be deleted and not in results
|
|
self.assertEqual(len(sessions), 0)
|
|
self.assertFalse(orphan_file.exists())
|
|
|
|
def test_sessions_sorted_by_id(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir) / "sessions"
|
|
sessions_dir.mkdir()
|
|
events_dir = Path(tmpdir) / "events"
|
|
events_dir.mkdir()
|
|
|
|
for sid in ["zebra", "alpha", "middle"]:
|
|
(sessions_dir / f"{sid}.json").write_text(json.dumps({
|
|
"session_id": sid,
|
|
"status": "active",
|
|
"last_event_at": "2024-01-01T00:00:00Z",
|
|
}))
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
|
|
patch.object(state_mod, "EVENTS_DIR", events_dir), \
|
|
patch.object(self.handler, "_discover_active_codex_sessions"), \
|
|
patch.object(self.handler, "_get_active_zellij_sessions", return_value=None), \
|
|
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
|
|
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
|
|
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
|
|
sessions = self.handler._collect_sessions()
|
|
|
|
ids = [s["session_id"] for s in sessions]
|
|
self.assertEqual(ids, ["alpha", "middle", "zebra"])
|
|
|
|
|
|
class TestDedupeSamePaneSessions(unittest.TestCase):
|
|
"""Tests for _dedupe_same_pane_sessions (--resume orphan handling)."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyStateHandler()
|
|
|
|
def test_keeps_session_with_context_usage(self):
|
|
"""When two sessions share a pane, keep the one with context_usage."""
|
|
sessions = [
|
|
{
|
|
"session_id": "orphan",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
"conversation_mtime_ns": 1000,
|
|
},
|
|
{
|
|
"session_id": "real",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": {"current_tokens": 5000},
|
|
"conversation_mtime_ns": 900, # older but has context
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir)
|
|
# Create dummy session files
|
|
for s in sessions:
|
|
(sessions_dir / f"{s['session_id']}.json").write_text("{}")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["session_id"], "real")
|
|
|
|
def test_keeps_higher_mtime_when_both_have_context(self):
|
|
"""When both have context_usage, keep the one with higher mtime."""
|
|
sessions = [
|
|
{
|
|
"session_id": "older",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": {"current_tokens": 5000},
|
|
"conversation_mtime_ns": 1000,
|
|
},
|
|
{
|
|
"session_id": "newer",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": {"current_tokens": 6000},
|
|
"conversation_mtime_ns": 2000,
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir)
|
|
for s in sessions:
|
|
(sessions_dir / f"{s['session_id']}.json").write_text("{}")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["session_id"], "newer")
|
|
|
|
def test_no_dedup_when_different_panes(self):
|
|
"""Sessions in different panes should not be deduped."""
|
|
sessions = [
|
|
{
|
|
"session_id": "session1",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
},
|
|
{
|
|
"session_id": "session2",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "43", # different pane
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
with patch.object(state_mod, "SESSIONS_DIR", Path(tmpdir)):
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertEqual(len(result), 2)
|
|
|
|
def test_no_dedup_when_empty_pane_info(self):
|
|
"""Sessions without pane info should not be deduped."""
|
|
sessions = [
|
|
{"session_id": "session1", "zellij_session": "", "zellij_pane": ""},
|
|
{"session_id": "session2", "zellij_session": "", "zellij_pane": ""},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
with patch.object(state_mod, "SESSIONS_DIR", Path(tmpdir)):
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertEqual(len(result), 2)
|
|
|
|
def test_deletes_orphan_session_file(self):
|
|
"""Orphan session file should be deleted from disk."""
|
|
sessions = [
|
|
{
|
|
"session_id": "orphan",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
},
|
|
{
|
|
"session_id": "real",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": {"current_tokens": 5000},
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir)
|
|
orphan_file = sessions_dir / "orphan.json"
|
|
real_file = sessions_dir / "real.json"
|
|
orphan_file.write_text("{}")
|
|
real_file.write_text("{}")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertFalse(orphan_file.exists())
|
|
self.assertTrue(real_file.exists())
|
|
|
|
def test_handles_session_without_id(self):
|
|
"""Sessions without session_id should be skipped, not cause errors."""
|
|
sessions = [
|
|
{
|
|
"session_id": None, # Missing ID
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
},
|
|
{
|
|
"session_id": "real",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": {"current_tokens": 5000},
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir)
|
|
(sessions_dir / "real.json").write_text("{}")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
# Should keep real, skip the None-id session (not filter it out)
|
|
self.assertEqual(len(result), 2) # Both still in list
|
|
self.assertIn("real", [s.get("session_id") for s in result])
|
|
|
|
def test_keeps_only_best_with_three_sessions(self):
|
|
"""When 3+ sessions share a pane, keep only the best one."""
|
|
sessions = [
|
|
{
|
|
"session_id": "worst",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
"conversation_mtime_ns": 1000,
|
|
},
|
|
{
|
|
"session_id": "middle",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
"conversation_mtime_ns": 2000,
|
|
},
|
|
{
|
|
"session_id": "best",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": {"current_tokens": 100},
|
|
"conversation_mtime_ns": 500, # Oldest but has context
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir)
|
|
for s in sessions:
|
|
(sessions_dir / f"{s['session_id']}.json").write_text("{}")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["session_id"], "best")
|
|
|
|
def test_handles_non_numeric_mtime(self):
|
|
"""Non-numeric mtime values should be treated as 0."""
|
|
sessions = [
|
|
{
|
|
"session_id": "bad_mtime",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
"conversation_mtime_ns": "not a number", # Invalid
|
|
},
|
|
{
|
|
"session_id": "good_mtime",
|
|
"zellij_session": "infra",
|
|
"zellij_pane": "42",
|
|
"context_usage": None,
|
|
"conversation_mtime_ns": 1000,
|
|
},
|
|
]
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
sessions_dir = Path(tmpdir)
|
|
for s in sessions:
|
|
(sessions_dir / f"{s['session_id']}.json").write_text("{}")
|
|
|
|
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
|
|
# Should not raise, should keep session with valid mtime
|
|
result = self.handler._dedupe_same_pane_sessions(sessions)
|
|
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0]["session_id"], "good_mtime")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|