Split the monolithic context.py (117 lines) into five purpose-specific modules, following the single-responsibility principle:

- config.py: Server-level constants (DATA_DIR, SESSIONS_DIR, PORT, STALE_EVENT_AGE, _state_lock)
- agents.py: Agent-specific paths and caches (CLAUDE_PROJECTS_DIR, CODEX_SESSIONS_DIR, discovery caches)
- auth.py: Authentication token generation/validation for the spawn endpoint
- spawn_config.py: Spawn feature configuration (PENDING_SPAWNS_DIR, rate limiting, projects watcher thread)
- zellij.py: Zellij binary resolution and session management constants

This refactoring improves:

- Code navigation: Find relevant constants by domain, not alphabetically
- Testing: Each module can be tested in isolation
- Import clarity: Mixins import only what they need
- Future maintenance: Changes to one domain don't risk breaking others

All mixins are updated to import from the new module locations, and tests are updated to use the new import paths. Includes PROPOSED_CODE_FILE_REORGANIZATION_PLAN.md, which documents the rationale and the mapping from old to new locations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
362 lines
15 KiB
Python
362 lines
15 KiB
Python
"""Tests for mixins/discovery.py edge cases.
|
|
|
|
Unit tests for Codex session discovery and pane matching.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from amc_server.mixins.discovery import SessionDiscoveryMixin
|
|
from amc_server.mixins.parsing import SessionParsingMixin
|
|
|
|
|
|
class DummyDiscoveryHandler(SessionDiscoveryMixin, SessionParsingMixin):
    """Bare handler that combines the discovery and parsing mixins for tests.

    It adds no behaviour of its own; the test cases instantiate it so the
    mixin methods can be called without a real request handler.
    """
|
|
|
|
|
|
class TestGetCodexPaneInfo(unittest.TestCase):
    """Tests for _get_codex_pane_info edge cases.

    Every test replaces ``subprocess.run`` (as seen from the discovery
    module) so no real ``pgrep``/``ps``/``lsof`` process is started.
    """

    @staticmethod
    def _reset_pane_cache():
        """Put the shared Codex pane cache back into an empty, expired state."""
        from amc_server.agents import _codex_pane_cache

        _codex_pane_cache["expires"] = 0
        _codex_pane_cache["pid_info"] = {}
        _codex_pane_cache["cwd_map"] = {}

    def setUp(self):
        """Create a fresh handler and isolate the module-level pane cache."""
        self.handler = DummyDiscoveryHandler()
        # Clear cache before each test ...
        self._reset_pane_cache()
        # ... and after it, so a test that seeds the cache (for example
        # test_cache_used_when_fresh) cannot leak state into other test
        # classes or modules that share the same process.
        self.addCleanup(self._reset_pane_cache)

    def test_pgrep_failure_returns_empty(self):
        """A non-zero exit status from the process lookup yields empty results."""
        failed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")

        with patch("amc_server.mixins.discovery.subprocess.run", return_value=failed):
            pid_info, cwd_map = self.handler._get_codex_pane_info()

        self.assertEqual(pid_info, {})
        self.assertEqual(cwd_map, {})

    def test_no_codex_processes_returns_empty(self):
        """A successful lookup with no output yields empty results."""
        no_results = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")

        with patch("amc_server.mixins.discovery.subprocess.run", return_value=no_results):
            pid_info, cwd_map = self.handler._get_codex_pane_info()

        self.assertEqual(pid_info, {})
        self.assertEqual(cwd_map, {})

    def test_extracts_zellij_env_vars(self):
        """ZELLIJ_PANE_ID / ZELLIJ_SESSION_NAME in the ps output are recorded per PID."""
        # Canned replies keyed by the executable name (first argv element).
        replies = {
            "pgrep": subprocess.CompletedProcess(
                args=[], returncode=0, stdout="12345\n", stderr=""
            ),
            "ps": subprocess.CompletedProcess(
                args=[], returncode=0,
                stdout="codex ZELLIJ_PANE_ID=7 ZELLIJ_SESSION_NAME=myproject",
                stderr="",
            ),
            "lsof": subprocess.CompletedProcess(
                args=[], returncode=0,
                stdout="p12345\nn/Users/test/project",
                stderr="",
            ),
        }
        # Any other command is reported as a failure.
        unknown = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")

        def mock_run(args, **kwargs):
            return replies.get(args[0], unknown)

        with patch("amc_server.mixins.discovery.subprocess.run", side_effect=mock_run):
            pid_info, _cwd_map = self.handler._get_codex_pane_info()

        self.assertIn("12345", pid_info)
        self.assertEqual(pid_info["12345"]["pane_id"], "7")
        self.assertEqual(pid_info["12345"]["zellij_session"], "myproject")

    def test_cache_used_when_fresh(self):
        """An unexpired cache is returned without running any subprocess."""
        from amc_server.agents import _codex_pane_cache

        cached_pids = {"cached": {"pane_id": "1", "zellij_session": "s"}}
        cached_cwds = {"/cached/path": {"session": "s", "pane_id": "1"}}
        _codex_pane_cache["pid_info"] = cached_pids
        _codex_pane_cache["cwd_map"] = cached_cwds
        _codex_pane_cache["expires"] = time.time() + 100

        # Should not call subprocess
        with patch("amc_server.mixins.discovery.subprocess.run") as mock_run:
            pid_info, cwd_map = self.handler._get_codex_pane_info()

        mock_run.assert_not_called()
        self.assertEqual(pid_info, {"cached": {"pane_id": "1", "zellij_session": "s"}})
        # The cached CWD map is expected to come back alongside the PID info.
        self.assertEqual(cwd_map, {"/cached/path": {"session": "s", "pane_id": "1"}})

    def test_timeout_handled_gracefully(self):
        """A subprocess timeout is swallowed and reported as empty results."""
        timeout = subprocess.TimeoutExpired("cmd", 2)

        with patch("amc_server.mixins.discovery.subprocess.run", side_effect=timeout):
            pid_info, cwd_map = self.handler._get_codex_pane_info()

        self.assertEqual(pid_info, {})
        self.assertEqual(cwd_map, {})
|
|
|
|
|
|
class TestMatchCodexSessionToPane(unittest.TestCase):
    """Tests for _match_codex_session_to_pane edge cases.

    ``subprocess.run`` is patched in every test, so the lsof lookup is
    simulated rather than executed.
    """

    def setUp(self):
        """Create a fresh handler for each test."""
        self.handler = DummyDiscoveryHandler()

    def _match(self, session_cwd, pid_info, cwd_map, lsof_returncode=1, lsof_stdout=""):
        """Run the matcher against a canned lsof result.

        Args:
            session_cwd: Working directory recorded for the Codex session.
            pid_info: PID -> pane info mapping handed to the matcher.
            cwd_map: CWD -> pane info mapping handed to the matcher.
            lsof_returncode: Exit status of the simulated lsof call; the
                default of 1 simulates "no process has the file open".
            lsof_stdout: Standard output of the simulated lsof call.

        Returns:
            The ``(session, pane)`` pair produced by the matcher.
        """
        lsof_result = subprocess.CompletedProcess(
            args=[], returncode=lsof_returncode, stdout=lsof_stdout, stderr=""
        )
        with patch("amc_server.mixins.discovery.subprocess.run", return_value=lsof_result):
            return self.handler._match_codex_session_to_pane(
                Path("/some/session.jsonl"), session_cwd, pid_info, cwd_map
            )

    def test_lsof_match_found(self):
        """When lsof finds a PID with the session file open, use that match."""
        pid_info = {
            "12345": {"pane_id": "7", "zellij_session": "project"},
        }

        session, pane = self._match(
            "/project", pid_info, {}, lsof_returncode=0, lsof_stdout="12345\n"
        )

        self.assertEqual(session, "project")
        self.assertEqual(pane, "7")

    def test_cwd_fallback_when_lsof_fails(self):
        """When lsof doesn't find a match, fall back to CWD matching."""
        cwd_map = {
            "/home/user/project": {"session": "myproject", "pane_id": "3"},
        }

        session, pane = self._match("/home/user/project", {}, cwd_map)

        self.assertEqual(session, "myproject")
        self.assertEqual(pane, "3")

    def test_no_match_returns_empty_strings(self):
        """With neither an lsof hit nor a CWD entry, both results are empty strings."""
        session, pane = self._match("/unmatched/path", {}, {})

        self.assertEqual(session, "")
        self.assertEqual(pane, "")

    def test_cwd_normalized_for_matching(self):
        """CWD paths should be normalized for comparison."""
        cwd_map = {
            "/home/user/project": {"session": "proj", "pane_id": "1"},
        }

        # Session CWD has trailing slash and extra dots
        session, pane = self._match("/home/user/./project/", {}, cwd_map)

        self.assertEqual(session, "proj")
        # The pane must come from the same matched entry as the session,
        # mirroring the checks in test_cwd_fallback_when_lsof_fails.
        self.assertEqual(pane, "1")

    def test_empty_session_cwd_no_match(self):
        """An empty session CWD never matches an entry in the CWD map."""
        cwd_map = {"/some/path": {"session": "s", "pane_id": "1"}}

        session, pane = self._match("", {}, cwd_map)

        self.assertEqual(session, "")
        self.assertEqual(pane, "")
|
|
|
|
|
|
class TestDiscoverActiveCodexSessions(unittest.TestCase):
    """Tests for _discover_active_codex_sessions edge cases.

    Each test builds a throw-away directory holding Codex transcripts plus a
    ``sessions`` sub-directory, points the discovery module at both, and then
    inspects which session files were written.
    """

    def setUp(self):
        """Create a fresh handler and isolate the module-level Codex caches."""
        self.handler = DummyDiscoveryHandler()
        # Clear caches
        from amc_server.agents import _codex_transcript_cache, _dismissed_codex_ids

        _codex_transcript_cache.clear()
        _dismissed_codex_ids.clear()
        # Also clear them afterwards so entries added by a test (for example a
        # dismissed session id) do not leak into other test classes or modules.
        self.addCleanup(_codex_transcript_cache.clear)
        self.addCleanup(_dismissed_codex_ids.clear)

    def _make_dirs(self):
        """Create a temporary Codex directory containing an empty ``sessions`` dir.

        The temporary directory is removed automatically when the test ends.

        Returns:
            A ``(codex_dir, sessions_dir)`` pair of ``Path`` objects.
        """
        tmp = tempfile.TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        codex_dir = Path(tmp.name)
        sessions_dir = codex_dir / "sessions"
        sessions_dir.mkdir()
        return codex_dir, sessions_dir

    def _run_discovery(self, codex_dir, sessions_dir, pane_match=None):
        """Run discovery with the directory constants patched.

        Args:
            codex_dir: Directory substituted for ``CODEX_SESSIONS_DIR``.
            sessions_dir: Directory substituted for ``SESSIONS_DIR``.
            pane_match: Optional ``(session, pane)`` pair. When given, pane
                matching and context-usage lookup are stubbed as well, which
                the tests that expect a session file to be written rely on.
        """
        with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
                patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
            self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
            if pane_match is not None:
                self.handler._match_codex_session_to_pane = MagicMock(return_value=pane_match)
                self.handler._get_cached_context_usage = MagicMock(return_value=None)
            self.handler._discover_active_codex_sessions()

    def test_skips_when_codex_sessions_dir_missing(self):
        """A missing Codex sessions directory is tolerated without raising."""
        with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", Path("/nonexistent")):
            # Should not raise
            self.handler._discover_active_codex_sessions()

    def test_skips_old_files(self):
        """Files older than CODEX_ACTIVE_WINDOW should be skipped."""
        codex_dir, sessions_dir = self._make_dirs()

        # Create an old transcript file
        old_file = codex_dir / "old-12345678-1234-1234-1234-123456789abc.jsonl"
        old_file.write_text('{"type": "session_meta", "payload": {"cwd": "/test"}}\n')
        # Set mtime to 2 hours ago
        old_time = time.time() - 7200
        os.utime(old_file, (old_time, old_time))

        self._run_discovery(codex_dir, sessions_dir)

        # Should not have created a session file
        self.assertEqual(list(sessions_dir.glob("*.json")), [])

    def test_skips_dismissed_sessions(self):
        """Sessions in _dismissed_codex_ids should be skipped."""
        from amc_server.agents import _dismissed_codex_ids

        codex_dir, sessions_dir = self._make_dirs()

        # Create a recent transcript file
        session_id = "12345678-1234-1234-1234-123456789abc"
        transcript = codex_dir / f"session-{session_id}.jsonl"
        transcript.write_text('{"type": "session_meta", "payload": {"cwd": "/test"}}\n')

        # Mark as dismissed
        _dismissed_codex_ids[session_id] = True

        self._run_discovery(codex_dir, sessions_dir)

        # Should not have created a session file
        self.assertEqual(list(sessions_dir.glob("*.json")), [])

    def test_skips_non_uuid_filenames(self):
        """Files without a UUID in the name should be skipped."""
        codex_dir, sessions_dir = self._make_dirs()

        # Create a file without a UUID
        no_uuid = codex_dir / "random-name.jsonl"
        no_uuid.write_text('{"type": "session_meta", "payload": {"cwd": "/test"}}\n')

        self._run_discovery(codex_dir, sessions_dir)

        self.assertEqual(list(sessions_dir.glob("*.json")), [])

    def test_skips_non_session_meta_first_line(self):
        """Files without session_meta as first line should be skipped."""
        codex_dir, sessions_dir = self._make_dirs()

        session_id = "12345678-1234-1234-1234-123456789abc"
        transcript = codex_dir / f"session-{session_id}.jsonl"
        # First line is not session_meta
        transcript.write_text('{"type": "response_item", "payload": {}}\n')

        self._run_discovery(codex_dir, sessions_dir)

        self.assertEqual(list(sessions_dir.glob("*.json")), [])

    def test_creates_session_file_for_valid_transcript(self):
        """Valid recent transcripts should create session files."""
        codex_dir, sessions_dir = self._make_dirs()

        session_id = "12345678-1234-1234-1234-123456789abc"
        transcript = codex_dir / f"session-{session_id}.jsonl"
        transcript.write_text(json.dumps({
            "type": "session_meta",
            "payload": {"cwd": "/test/project", "timestamp": "2024-01-01T00:00:00Z"}
        }) + "\n")

        self._run_discovery(codex_dir, sessions_dir, pane_match=("proj", "5"))

        session_file = sessions_dir / f"{session_id}.json"
        self.assertTrue(session_file.exists())

        data = json.loads(session_file.read_text())
        self.assertEqual(data["session_id"], session_id)
        self.assertEqual(data["agent"], "codex")
        self.assertEqual(data["project"], "project")
        self.assertEqual(data["zellij_session"], "proj")
        self.assertEqual(data["zellij_pane"], "5")

    def test_determines_status_by_file_age(self):
        """A transcript last modified three minutes ago is reported as 'done'."""
        codex_dir, sessions_dir = self._make_dirs()

        session_id = "12345678-1234-1234-1234-123456789abc"
        transcript = codex_dir / f"session-{session_id}.jsonl"
        transcript.write_text(json.dumps({
            "type": "session_meta",
            "payload": {"cwd": "/test"}
        }) + "\n")

        # Set mtime to 3 minutes ago (> 2 min threshold)
        old_time = time.time() - 180
        os.utime(transcript, (old_time, old_time))

        self._run_discovery(codex_dir, sessions_dir, pane_match=("", ""))

        session_file = sessions_dir / f"{session_id}.json"
        # Fail with a clear assertion instead of FileNotFoundError when
        # discovery wrote nothing.
        self.assertTrue(session_file.exists())

        data = json.loads(session_file.read_text())
        self.assertEqual(data["status"], "done")
|
|
|
|
|
|
# Allow the test module to be executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|