feat(dashboard): add click-outside dismissal for autocomplete dropdown

Closes bd-3ny. Added mousedown listener that dismisses the dropdown when
clicking outside both the dropdown and textarea. Uses early return to avoid
registering listeners when dropdown is already closed.
This commit is contained in:
teernisse
2026-02-26 16:52:36 -05:00
parent ba16daac2a
commit db3d2a2e31
35 changed files with 5560 additions and 104 deletions

View File

@@ -172,5 +172,358 @@ class SessionControlMixinTests(unittest.TestCase):
handler._try_write_chars_inject.assert_called_once()
class TestParsePaneId(unittest.TestCase):
"""Tests for _parse_pane_id edge cases."""
def setUp(self):
self.handler = DummyControlHandler()
def test_empty_string_returns_none(self):
self.assertIsNone(self.handler._parse_pane_id(""))
def test_none_returns_none(self):
self.assertIsNone(self.handler._parse_pane_id(None))
def test_direct_int_string_parses(self):
self.assertEqual(self.handler._parse_pane_id("42"), 42)
def test_terminal_format_parses(self):
self.assertEqual(self.handler._parse_pane_id("terminal_5"), 5)
def test_plugin_format_parses(self):
self.assertEqual(self.handler._parse_pane_id("plugin_3"), 3)
def test_unknown_prefix_returns_none(self):
self.assertIsNone(self.handler._parse_pane_id("pane_7"))
def test_non_numeric_suffix_returns_none(self):
self.assertIsNone(self.handler._parse_pane_id("terminal_abc"))
def test_too_many_underscores_returns_none(self):
self.assertIsNone(self.handler._parse_pane_id("terminal_5_extra"))
def test_negative_int_parses(self):
# Edge case: negative numbers
self.assertEqual(self.handler._parse_pane_id("-1"), -1)
class TestGetSubmitEnterDelaySec(unittest.TestCase):
"""Tests for _get_submit_enter_delay_sec edge cases."""
def setUp(self):
self.handler = DummyControlHandler()
def test_unset_env_returns_default(self):
with patch.dict(os.environ, {}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 0.20)
def test_empty_string_returns_default(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": ""}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 0.20)
def test_whitespace_only_returns_default(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": " "}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 0.20)
def test_negative_value_returns_zero(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "-100"}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 0.0)
def test_value_over_2000_clamped(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "5000"}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 2.0) # 2000ms = 2.0s
def test_valid_ms_converted_to_seconds(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "500"}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 0.5)
def test_float_value_works(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "150.5"}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertAlmostEqual(result, 0.1505)
def test_non_numeric_returns_default(self):
with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "fast"}, clear=True):
result = self.handler._get_submit_enter_delay_sec()
self.assertEqual(result, 0.20)
class TestAllowUnsafeWriteCharsFallback(unittest.TestCase):
"""Tests for _allow_unsafe_write_chars_fallback edge cases."""
def setUp(self):
self.handler = DummyControlHandler()
def test_unset_returns_false(self):
with patch.dict(os.environ, {}, clear=True):
self.assertFalse(self.handler._allow_unsafe_write_chars_fallback())
def test_empty_returns_false(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": ""}, clear=True):
self.assertFalse(self.handler._allow_unsafe_write_chars_fallback())
def test_one_returns_true(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "1"}, clear=True):
self.assertTrue(self.handler._allow_unsafe_write_chars_fallback())
def test_true_returns_true(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "true"}, clear=True):
self.assertTrue(self.handler._allow_unsafe_write_chars_fallback())
def test_yes_returns_true(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "yes"}, clear=True):
self.assertTrue(self.handler._allow_unsafe_write_chars_fallback())
def test_on_returns_true(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "on"}, clear=True):
self.assertTrue(self.handler._allow_unsafe_write_chars_fallback())
def test_case_insensitive(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "TRUE"}, clear=True):
self.assertTrue(self.handler._allow_unsafe_write_chars_fallback())
def test_random_string_returns_false(self):
with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "maybe"}, clear=True):
self.assertFalse(self.handler._allow_unsafe_write_chars_fallback())
class TestDismissSession(unittest.TestCase):
"""Tests for _dismiss_session edge cases."""
def test_deletes_existing_session_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
sessions_dir.mkdir(exist_ok=True)
session_file = sessions_dir / "abc123.json"
session_file.write_text('{"session_id": "abc123"}')
handler = DummyControlHandler()
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._dismiss_session("abc123")
self.assertFalse(session_file.exists())
self.assertEqual(handler.sent, [(200, {"ok": True})])
def test_handles_missing_file_gracefully(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
handler = DummyControlHandler()
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._dismiss_session("nonexistent")
# Should still return success
self.assertEqual(handler.sent, [(200, {"ok": True})])
def test_path_traversal_sanitized(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
sessions_dir.mkdir(exist_ok=True)
# Create a file that should NOT be deleted
secret_file = Path(tmpdir).parent / "secret.json"
handler = DummyControlHandler()
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._dismiss_session("../secret")
# Secret file should not have been targeted
# (if it existed, it would still exist)
def test_tracks_dismissed_codex_session(self):
from amc_server.context import _dismissed_codex_ids
_dismissed_codex_ids.clear()
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
handler = DummyControlHandler()
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._dismiss_session("codex-session-123")
self.assertIn("codex-session-123", _dismissed_codex_ids)
_dismissed_codex_ids.clear()
class TestTryWriteCharsInject(unittest.TestCase):
"""Tests for _try_write_chars_inject edge cases."""
def setUp(self):
self.handler = DummyControlHandler()
def test_successful_write_without_enter(self):
completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \
patch("amc_server.mixins.control.subprocess.run", return_value=completed) as run_mock:
result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False)
self.assertEqual(result, {"ok": True})
# Should only be called once (no Enter)
self.assertEqual(run_mock.call_count, 1)
def test_successful_write_with_enter(self):
completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \
patch("amc_server.mixins.control.subprocess.run", return_value=completed) as run_mock:
result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=True)
self.assertEqual(result, {"ok": True})
# Should be called twice (write-chars + write Enter)
self.assertEqual(run_mock.call_count, 2)
def test_write_chars_failure_returns_error(self):
failed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="write failed")
with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \
patch("amc_server.mixins.control.subprocess.run", return_value=failed):
result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False)
self.assertFalse(result["ok"])
self.assertIn("write", result["error"].lower())
def test_timeout_returns_error(self):
with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \
patch("amc_server.mixins.control.subprocess.run",
side_effect=subprocess.TimeoutExpired("cmd", 2)):
result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False)
self.assertFalse(result["ok"])
self.assertIn("timed out", result["error"].lower())
def test_zellij_not_found_returns_error(self):
with patch.object(control, "ZELLIJ_BIN", "/nonexistent/zellij"), \
patch("amc_server.mixins.control.subprocess.run",
side_effect=FileNotFoundError("No such file")):
result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False)
self.assertFalse(result["ok"])
self.assertIn("not found", result["error"].lower())
class TestRespondToSessionEdgeCases(unittest.TestCase):
"""Additional edge case tests for _respond_to_session."""
def _write_session(self, sessions_dir, session_id, **kwargs):
sessions_dir.mkdir(parents=True, exist_ok=True)
session_file = sessions_dir / f"{session_id}.json"
data = {"session_id": session_id, **kwargs}
session_file.write_text(json.dumps(data))
def test_invalid_json_body_returns_400(self):
handler = DummyControlHandler.__new__(DummyControlHandler)
handler.headers = {"Content-Length": "10"}
handler.rfile = io.BytesIO(b"not json!!")
handler.sent = []
handler.errors = []
with tempfile.TemporaryDirectory() as tmpdir:
with patch.object(control, "SESSIONS_DIR", Path(tmpdir)):
handler._respond_to_session("test")
self.assertEqual(handler.errors, [(400, "Invalid JSON body")])
def test_non_dict_body_returns_400(self):
raw = b'"just a string"'
handler = DummyControlHandler.__new__(DummyControlHandler)
handler.headers = {"Content-Length": str(len(raw))}
handler.rfile = io.BytesIO(raw)
handler.sent = []
handler.errors = []
with tempfile.TemporaryDirectory() as tmpdir:
with patch.object(control, "SESSIONS_DIR", Path(tmpdir)):
handler._respond_to_session("test")
self.assertEqual(handler.errors, [(400, "Invalid JSON body")])
def test_empty_text_returns_400(self):
handler = DummyControlHandler({"text": ""})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="s", zellij_pane="1")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._respond_to_session("test")
self.assertEqual(handler.errors, [(400, "Missing or empty 'text' field")])
def test_whitespace_only_text_returns_400(self):
handler = DummyControlHandler({"text": " \n\t "})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="s", zellij_pane="1")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._respond_to_session("test")
self.assertEqual(handler.errors, [(400, "Missing or empty 'text' field")])
def test_non_string_text_returns_400(self):
handler = DummyControlHandler({"text": 123})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="s", zellij_pane="1")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._respond_to_session("test")
self.assertEqual(handler.errors, [(400, "Missing or empty 'text' field")])
def test_missing_zellij_session_returns_400(self):
handler = DummyControlHandler({"text": "hello"})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="", zellij_pane="1")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._respond_to_session("test")
self.assertIn("missing Zellij pane info", handler.errors[0][1])
def test_missing_zellij_pane_returns_400(self):
handler = DummyControlHandler({"text": "hello"})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="sess", zellij_pane="")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._respond_to_session("test")
self.assertIn("missing Zellij pane info", handler.errors[0][1])
def test_invalid_pane_format_returns_400(self):
handler = DummyControlHandler({"text": "hello"})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="sess", zellij_pane="invalid_format_here")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._respond_to_session("test")
self.assertIn("Invalid pane format", handler.errors[0][1])
def test_invalid_option_count_treated_as_zero(self):
# optionCount that can't be parsed as int should default to 0
handler = DummyControlHandler({"text": "hello", "freeform": True, "optionCount": "not a number"})
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir)
self._write_session(sessions_dir, "test", zellij_session="sess", zellij_pane="5")
with patch.object(control, "SESSIONS_DIR", sessions_dir):
handler._inject_text_then_enter = MagicMock(return_value={"ok": True})
handler._respond_to_session("test")
# With optionCount=0, freeform mode shouldn't trigger the "other" selection
# It should go straight to inject_text_then_enter
handler._inject_text_then_enter.assert_called_once()
if __name__ == "__main__":
unittest.main()

482
tests/test_conversation.py Normal file
View File

@@ -0,0 +1,482 @@
"""Tests for mixins/conversation.py edge cases.
Unit tests for conversation parsing from Claude Code and Codex JSONL files.
"""
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
import io
from amc_server.mixins.conversation import ConversationMixin
from amc_server.mixins.parsing import SessionParsingMixin
class DummyConversationHandler(ConversationMixin, SessionParsingMixin):
"""Minimal handler for testing conversation mixin."""
def __init__(self):
self.sent_responses = []
def _send_json(self, code, payload):
self.sent_responses.append((code, payload))
class TestParseCodexArguments(unittest.TestCase):
"""Tests for _parse_codex_arguments edge cases."""
def setUp(self):
self.handler = DummyConversationHandler()
def test_dict_input_returned_as_is(self):
result = self.handler._parse_codex_arguments({"key": "value"})
self.assertEqual(result, {"key": "value"})
def test_empty_dict_returned_as_is(self):
result = self.handler._parse_codex_arguments({})
self.assertEqual(result, {})
def test_json_string_parsed(self):
result = self.handler._parse_codex_arguments('{"key": "value"}')
self.assertEqual(result, {"key": "value"})
def test_invalid_json_string_returns_raw(self):
result = self.handler._parse_codex_arguments("not valid json")
self.assertEqual(result, {"raw": "not valid json"})
def test_empty_string_returns_raw(self):
result = self.handler._parse_codex_arguments("")
self.assertEqual(result, {"raw": ""})
def test_none_returns_empty_dict(self):
result = self.handler._parse_codex_arguments(None)
self.assertEqual(result, {})
def test_int_returns_empty_dict(self):
result = self.handler._parse_codex_arguments(42)
self.assertEqual(result, {})
def test_list_returns_empty_dict(self):
result = self.handler._parse_codex_arguments([1, 2, 3])
self.assertEqual(result, {})
class TestServeEvents(unittest.TestCase):
"""Tests for _serve_events edge cases."""
def setUp(self):
self.handler = DummyConversationHandler()
def test_path_traversal_sanitized(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir)
# Create a file that path traversal might try to access
secret_file = Path(tmpdir).parent / "secret.jsonl"
with patch("amc_server.mixins.conversation.EVENTS_DIR", events_dir):
# Try path traversal
self.handler._serve_events("../secret")
# Should have served response with sanitized id
self.assertEqual(len(self.handler.sent_responses), 1)
code, payload = self.handler.sent_responses[0]
self.assertEqual(code, 200)
self.assertEqual(payload["session_id"], "secret")
self.assertEqual(payload["events"], [])
def test_nonexistent_file_returns_empty_events(self):
with tempfile.TemporaryDirectory() as tmpdir:
with patch("amc_server.mixins.conversation.EVENTS_DIR", Path(tmpdir)):
self.handler._serve_events("nonexistent")
code, payload = self.handler.sent_responses[0]
self.assertEqual(payload["events"], [])
def test_empty_file_returns_empty_events(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir)
event_file = events_dir / "session123.jsonl"
event_file.write_text("")
with patch("amc_server.mixins.conversation.EVENTS_DIR", events_dir):
self.handler._serve_events("session123")
code, payload = self.handler.sent_responses[0]
self.assertEqual(payload["events"], [])
def test_invalid_json_lines_skipped(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir)
event_file = events_dir / "session123.jsonl"
event_file.write_text('{"valid": "event"}\nnot json\n{"another": "event"}\n')
with patch("amc_server.mixins.conversation.EVENTS_DIR", events_dir):
self.handler._serve_events("session123")
code, payload = self.handler.sent_responses[0]
self.assertEqual(len(payload["events"]), 2)
self.assertEqual(payload["events"][0], {"valid": "event"})
self.assertEqual(payload["events"][1], {"another": "event"})
class TestParseClaudeConversation(unittest.TestCase):
"""Tests for _parse_claude_conversation edge cases."""
def setUp(self):
self.handler = DummyConversationHandler()
def test_user_message_with_array_content_skipped(self):
# Array content is tool results, not human messages
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "user",
"message": {"content": [{"type": "tool_result"}]}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(messages, [])
finally:
path.unlink()
def test_user_message_with_string_content_included(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "user",
"timestamp": "2024-01-01T00:00:00Z",
"message": {"content": "Hello, Claude!"}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["role"], "user")
self.assertEqual(messages[0]["content"], "Hello, Claude!")
finally:
path.unlink()
def test_assistant_message_with_text_parts(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"timestamp": "2024-01-01T00:00:00Z",
"message": {
"content": [
{"type": "text", "text": "Part 1"},
{"type": "text", "text": "Part 2"},
]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["content"], "Part 1\nPart 2")
finally:
path.unlink()
def test_assistant_message_with_tool_use(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"message": {
"content": [
{"type": "tool_use", "name": "Read", "input": {"file_path": "/test"}},
]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["tool_calls"][0]["name"], "Read")
self.assertEqual(messages[0]["tool_calls"][0]["input"]["file_path"], "/test")
finally:
path.unlink()
def test_assistant_message_with_thinking(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"message": {
"content": [
{"type": "thinking", "thinking": "Let me consider..."},
{"type": "text", "text": "Here's my answer"},
]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["thinking"], "Let me consider...")
self.assertEqual(messages[0]["content"], "Here's my answer")
finally:
path.unlink()
def test_assistant_message_content_as_string_parts(self):
# Some entries might have string content parts instead of dicts
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"message": {
"content": ["plain string", {"type": "text", "text": "structured"}]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(messages[0]["content"], "plain string\nstructured")
finally:
path.unlink()
def test_missing_conversation_file_returns_empty(self):
with patch.object(self.handler, "_get_claude_conversation_file", return_value=None):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(messages, [])
def test_non_dict_entry_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('"just a string"\n')
f.write('123\n')
f.write('{"type": "user", "message": {"content": "valid"}}\n')
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(len(messages), 1)
finally:
path.unlink()
def test_non_list_content_in_assistant_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"message": {"content": "not a list"}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_get_claude_conversation_file", return_value=path):
messages = self.handler._parse_claude_conversation("session123", "/project")
self.assertEqual(messages, [])
finally:
path.unlink()
class TestParseCodexConversation(unittest.TestCase):
"""Tests for _parse_codex_conversation edge cases."""
def setUp(self):
self.handler = DummyConversationHandler()
def test_developer_role_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "response_item",
"payload": {
"type": "message",
"role": "developer",
"content": [{"text": "System instructions"}]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
self.assertEqual(messages, [])
finally:
path.unlink()
def test_injected_context_skipped(self):
skip_prefixes = [
"<INSTRUCTIONS>",
"<environment_context>",
"<permissions instructions>",
"# AGENTS.md instructions",
]
for prefix in skip_prefixes:
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "response_item",
"payload": {
"type": "message",
"role": "user",
"content": [{"text": f"{prefix} more content here"}]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
self.assertEqual(messages, [], f"Should skip content starting with {prefix}")
finally:
path.unlink()
def test_function_call_accumulated_to_next_assistant(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Tool call
f.write(json.dumps({
"type": "response_item",
"payload": {
"type": "function_call",
"name": "shell",
"arguments": '{"command": "ls"}'
}
}) + "\n")
# Assistant message
f.write(json.dumps({
"type": "response_item",
"payload": {
"type": "message",
"role": "assistant",
"content": [{"text": "Here are the files"}]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["tool_calls"][0]["name"], "shell")
self.assertEqual(messages[0]["content"], "Here are the files")
finally:
path.unlink()
def test_function_calls_flushed_before_user_message(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Tool call
f.write(json.dumps({
"type": "response_item",
"payload": {"type": "function_call", "name": "tool1", "arguments": "{}"}
}) + "\n")
# User message (tool calls should be flushed first)
f.write(json.dumps({
"type": "response_item",
"payload": {
"type": "message",
"role": "user",
"content": [{"text": "User response"}]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
# First message should be assistant with tool_calls (flushed)
# Second should be user
self.assertEqual(len(messages), 2)
self.assertEqual(messages[0]["role"], "assistant")
self.assertEqual(messages[0]["tool_calls"][0]["name"], "tool1")
self.assertEqual(messages[1]["role"], "user")
finally:
path.unlink()
def test_reasoning_creates_thinking_message(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "response_item",
"payload": {
"type": "reasoning",
"summary": [
{"type": "summary_text", "text": "Let me think..."},
{"type": "summary_text", "text": "I'll try this approach."},
]
}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["thinking"], "Let me think...\nI'll try this approach.")
finally:
path.unlink()
def test_pending_tool_calls_flushed_at_end(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Tool call with no following message
f.write(json.dumps({
"type": "response_item",
"payload": {"type": "function_call", "name": "final_tool", "arguments": "{}"}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
# Should flush pending tool calls at end
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0]["tool_calls"][0]["name"], "final_tool")
finally:
path.unlink()
def test_non_response_item_types_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "session_meta"}\n')
f.write('{"type": "event_msg"}\n')
f.write(json.dumps({
"type": "response_item",
"payload": {"type": "message", "role": "user", "content": [{"text": "Hello"}]}
}) + "\n")
path = Path(f.name)
try:
with patch.object(self.handler, "_find_codex_transcript_file", return_value=path):
messages = self.handler._parse_codex_conversation("session123")
self.assertEqual(len(messages), 1)
finally:
path.unlink()
def test_missing_transcript_file_returns_empty(self):
with patch.object(self.handler, "_find_codex_transcript_file", return_value=None):
messages = self.handler._parse_codex_conversation("session123")
self.assertEqual(messages, [])
class TestServeConversation(unittest.TestCase):
"""Tests for _serve_conversation routing."""
def setUp(self):
self.handler = DummyConversationHandler()
def test_routes_to_codex_parser(self):
with patch.object(self.handler, "_parse_codex_conversation", return_value=[]) as mock:
self.handler._serve_conversation("session123", "/project", agent="codex")
mock.assert_called_once_with("session123")
def test_routes_to_claude_parser_by_default(self):
with patch.object(self.handler, "_parse_claude_conversation", return_value=[]) as mock:
self.handler._serve_conversation("session123", "/project")
mock.assert_called_once_with("session123", "/project")
def test_sanitizes_session_id(self):
with patch.object(self.handler, "_parse_claude_conversation", return_value=[]):
self.handler._serve_conversation("../../../etc/passwd", "/project")
code, payload = self.handler.sent_responses[0]
self.assertEqual(payload["session_id"], "passwd")
if __name__ == "__main__":
unittest.main()

361
tests/test_discovery.py Normal file
View File

@@ -0,0 +1,361 @@
"""Tests for mixins/discovery.py edge cases.
Unit tests for Codex session discovery and pane matching.
"""
import json
import os
import subprocess
import tempfile
import time
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
from amc_server.mixins.discovery import SessionDiscoveryMixin
from amc_server.mixins.parsing import SessionParsingMixin
class DummyDiscoveryHandler(SessionDiscoveryMixin, SessionParsingMixin):
"""Minimal handler for testing discovery mixin."""
pass
class TestGetCodexPaneInfo(unittest.TestCase):
"""Tests for _get_codex_pane_info edge cases."""
def setUp(self):
self.handler = DummyDiscoveryHandler()
# Clear cache before each test
from amc_server.context import _codex_pane_cache
_codex_pane_cache["expires"] = 0
_codex_pane_cache["pid_info"] = {}
_codex_pane_cache["cwd_map"] = {}
def test_pgrep_failure_returns_empty(self):
failed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")
with patch("amc_server.mixins.discovery.subprocess.run", return_value=failed):
pid_info, cwd_map = self.handler._get_codex_pane_info()
self.assertEqual(pid_info, {})
self.assertEqual(cwd_map, {})
def test_no_codex_processes_returns_empty(self):
no_results = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
with patch("amc_server.mixins.discovery.subprocess.run", return_value=no_results):
pid_info, cwd_map = self.handler._get_codex_pane_info()
self.assertEqual(pid_info, {})
self.assertEqual(cwd_map, {})
def test_extracts_zellij_env_vars(self):
pgrep_result = subprocess.CompletedProcess(args=[], returncode=0, stdout="12345\n", stderr="")
ps_result = subprocess.CompletedProcess(
args=[], returncode=0,
stdout="codex ZELLIJ_PANE_ID=7 ZELLIJ_SESSION_NAME=myproject",
stderr=""
)
lsof_result = subprocess.CompletedProcess(
args=[], returncode=0,
stdout="p12345\nn/Users/test/project",
stderr=""
)
def mock_run(args, **kwargs):
if args[0] == "pgrep":
return pgrep_result
elif args[0] == "ps":
return ps_result
elif args[0] == "lsof":
return lsof_result
return subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")
with patch("amc_server.mixins.discovery.subprocess.run", side_effect=mock_run):
pid_info, cwd_map = self.handler._get_codex_pane_info()
self.assertIn("12345", pid_info)
self.assertEqual(pid_info["12345"]["pane_id"], "7")
self.assertEqual(pid_info["12345"]["zellij_session"], "myproject")
def test_cache_used_when_fresh(self):
from amc_server.context import _codex_pane_cache
_codex_pane_cache["pid_info"] = {"cached": {"pane_id": "1", "zellij_session": "s"}}
_codex_pane_cache["cwd_map"] = {"/cached/path": {"session": "s", "pane_id": "1"}}
_codex_pane_cache["expires"] = time.time() + 100
# Should not call subprocess
with patch("amc_server.mixins.discovery.subprocess.run") as mock_run:
pid_info, cwd_map = self.handler._get_codex_pane_info()
mock_run.assert_not_called()
self.assertEqual(pid_info, {"cached": {"pane_id": "1", "zellij_session": "s"}})
def test_timeout_handled_gracefully(self):
with patch("amc_server.mixins.discovery.subprocess.run",
side_effect=subprocess.TimeoutExpired("cmd", 2)):
pid_info, cwd_map = self.handler._get_codex_pane_info()
self.assertEqual(pid_info, {})
self.assertEqual(cwd_map, {})
class TestMatchCodexSessionToPane(unittest.TestCase):
"""Tests for _match_codex_session_to_pane edge cases."""
def setUp(self):
self.handler = DummyDiscoveryHandler()
def test_lsof_match_found(self):
"""When lsof finds a PID with the session file open, use that match."""
pid_info = {
"12345": {"pane_id": "7", "zellij_session": "project"},
}
cwd_map = {}
lsof_result = subprocess.CompletedProcess(
args=[], returncode=0, stdout="12345\n", stderr=""
)
with patch("amc_server.mixins.discovery.subprocess.run", return_value=lsof_result):
session, pane = self.handler._match_codex_session_to_pane(
Path("/some/session.jsonl"), "/project", pid_info, cwd_map
)
self.assertEqual(session, "project")
self.assertEqual(pane, "7")
def test_cwd_fallback_when_lsof_fails(self):
"""When lsof doesn't find a match, fall back to CWD matching."""
pid_info = {}
cwd_map = {
"/home/user/project": {"session": "myproject", "pane_id": "3"},
}
lsof_result = subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=""
)
with patch("amc_server.mixins.discovery.subprocess.run", return_value=lsof_result):
session, pane = self.handler._match_codex_session_to_pane(
Path("/some/session.jsonl"), "/home/user/project", pid_info, cwd_map
)
self.assertEqual(session, "myproject")
self.assertEqual(pane, "3")
def test_no_match_returns_empty_strings(self):
pid_info = {}
cwd_map = {}
lsof_result = subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=""
)
with patch("amc_server.mixins.discovery.subprocess.run", return_value=lsof_result):
session, pane = self.handler._match_codex_session_to_pane(
Path("/some/session.jsonl"), "/unmatched/path", pid_info, cwd_map
)
self.assertEqual(session, "")
self.assertEqual(pane, "")
def test_cwd_normalized_for_matching(self):
"""CWD paths should be normalized for comparison."""
pid_info = {}
cwd_map = {
"/home/user/project": {"session": "proj", "pane_id": "1"},
}
lsof_result = subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=""
)
with patch("amc_server.mixins.discovery.subprocess.run", return_value=lsof_result):
# Session CWD has trailing slash and extra dots
session, pane = self.handler._match_codex_session_to_pane(
Path("/some/session.jsonl"), "/home/user/./project/", pid_info, cwd_map
)
self.assertEqual(session, "proj")
def test_empty_session_cwd_no_match(self):
pid_info = {}
cwd_map = {"/some/path": {"session": "s", "pane_id": "1"}}
lsof_result = subprocess.CompletedProcess(
args=[], returncode=1, stdout="", stderr=""
)
with patch("amc_server.mixins.discovery.subprocess.run", return_value=lsof_result):
session, pane = self.handler._match_codex_session_to_pane(
Path("/some/session.jsonl"), "", pid_info, cwd_map
)
self.assertEqual(session, "")
self.assertEqual(pane, "")
class TestDiscoverActiveCodexSessions(unittest.TestCase):
"""Tests for _discover_active_codex_sessions edge cases."""
def setUp(self):
self.handler = DummyDiscoveryHandler()
# Clear caches
from amc_server.context import _codex_transcript_cache, _dismissed_codex_ids
_codex_transcript_cache.clear()
_dismissed_codex_ids.clear()
def test_skips_when_codex_sessions_dir_missing(self):
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", Path("/nonexistent")):
# Should not raise
self.handler._discover_active_codex_sessions()
def test_skips_old_files(self):
"""Files older than CODEX_ACTIVE_WINDOW should be skipped."""
with tempfile.TemporaryDirectory() as tmpdir:
codex_dir = Path(tmpdir)
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create an old transcript file
old_file = codex_dir / "old-12345678-1234-1234-1234-123456789abc.jsonl"
old_file.write_text('{"type": "session_meta", "payload": {"cwd": "/test"}}\n')
# Set mtime to 2 hours ago
old_time = time.time() - 7200
os.utime(old_file, (old_time, old_time))
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
self.handler._discover_active_codex_sessions()
# Should not have created a session file
self.assertEqual(list(sessions_dir.glob("*.json")), [])
def test_skips_dismissed_sessions(self):
"""Sessions in _dismissed_codex_ids should be skipped."""
from amc_server.context import _dismissed_codex_ids
with tempfile.TemporaryDirectory() as tmpdir:
codex_dir = Path(tmpdir)
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create a recent transcript file
session_id = "12345678-1234-1234-1234-123456789abc"
transcript = codex_dir / f"session-{session_id}.jsonl"
transcript.write_text('{"type": "session_meta", "payload": {"cwd": "/test"}}\n')
# Mark as dismissed
_dismissed_codex_ids[session_id] = True
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
self.handler._discover_active_codex_sessions()
# Should not have created a session file
self.assertEqual(list(sessions_dir.glob("*.json")), [])
def test_skips_non_uuid_filenames(self):
"""Files without a UUID in the name should be skipped."""
with tempfile.TemporaryDirectory() as tmpdir:
codex_dir = Path(tmpdir)
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create a file without a UUID
no_uuid = codex_dir / "random-name.jsonl"
no_uuid.write_text('{"type": "session_meta", "payload": {"cwd": "/test"}}\n')
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
self.handler._discover_active_codex_sessions()
self.assertEqual(list(sessions_dir.glob("*.json")), [])
def test_skips_non_session_meta_first_line(self):
"""Files without session_meta as first line should be skipped."""
with tempfile.TemporaryDirectory() as tmpdir:
codex_dir = Path(tmpdir)
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
session_id = "12345678-1234-1234-1234-123456789abc"
transcript = codex_dir / f"session-{session_id}.jsonl"
# First line is not session_meta
transcript.write_text('{"type": "response_item", "payload": {}}\n')
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
self.handler._discover_active_codex_sessions()
self.assertEqual(list(sessions_dir.glob("*.json")), [])
def test_creates_session_file_for_valid_transcript(self):
"""Valid recent transcripts should create session files."""
with tempfile.TemporaryDirectory() as tmpdir:
codex_dir = Path(tmpdir)
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
session_id = "12345678-1234-1234-1234-123456789abc"
transcript = codex_dir / f"session-{session_id}.jsonl"
transcript.write_text(json.dumps({
"type": "session_meta",
"payload": {"cwd": "/test/project", "timestamp": "2024-01-01T00:00:00Z"}
}) + "\n")
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
self.handler._match_codex_session_to_pane = MagicMock(return_value=("proj", "5"))
self.handler._get_cached_context_usage = MagicMock(return_value=None)
self.handler._discover_active_codex_sessions()
session_file = sessions_dir / f"{session_id}.json"
self.assertTrue(session_file.exists())
data = json.loads(session_file.read_text())
self.assertEqual(data["session_id"], session_id)
self.assertEqual(data["agent"], "codex")
self.assertEqual(data["project"], "project")
self.assertEqual(data["zellij_session"], "proj")
self.assertEqual(data["zellij_pane"], "5")
def test_determines_status_by_file_age(self):
"""Recent files should be 'active', older ones 'done'."""
with tempfile.TemporaryDirectory() as tmpdir:
codex_dir = Path(tmpdir)
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
session_id = "12345678-1234-1234-1234-123456789abc"
transcript = codex_dir / f"session-{session_id}.jsonl"
transcript.write_text(json.dumps({
"type": "session_meta",
"payload": {"cwd": "/test"}
}) + "\n")
# Set mtime to 3 minutes ago (> 2 min threshold)
old_time = time.time() - 180
os.utime(transcript, (old_time, old_time))
with patch("amc_server.mixins.discovery.CODEX_SESSIONS_DIR", codex_dir), \
patch("amc_server.mixins.discovery.SESSIONS_DIR", sessions_dir):
self.handler._get_codex_pane_info = MagicMock(return_value=({}, {}))
self.handler._match_codex_session_to_pane = MagicMock(return_value=("", ""))
self.handler._get_cached_context_usage = MagicMock(return_value=None)
self.handler._discover_active_codex_sessions()
session_file = sessions_dir / f"{session_id}.json"
data = json.loads(session_file.read_text())
self.assertEqual(data["status"], "done")
if __name__ == "__main__":
unittest.main()

475
tests/test_hook.py Normal file
View File

@@ -0,0 +1,475 @@
"""Tests for bin/amc-hook functions.
These are unit tests for the pure functions in the hook script.
Edge cases are prioritized over happy paths.
"""
import json
import os
import sys
import tempfile
import types
import unittest
from pathlib import Path
from unittest.mock import patch
# Import the hook module (no .py extension, so use compile+exec pattern)
hook_path = Path(__file__).parent.parent / "bin" / "amc-hook"
amc_hook = types.ModuleType("amc_hook")
amc_hook.__file__ = str(hook_path)
# Load module code - this is safe, we're loading our own source file
code = compile(hook_path.read_text(), hook_path, "exec")
exec(code, amc_hook.__dict__) # noqa: S102 - loading local module
class TestDetectProseQuestion(unittest.TestCase):
"""Tests for _detect_prose_question edge cases."""
def test_none_input_returns_none(self):
self.assertIsNone(amc_hook._detect_prose_question(None))
def test_empty_string_returns_none(self):
self.assertIsNone(amc_hook._detect_prose_question(""))
def test_whitespace_only_returns_none(self):
self.assertIsNone(amc_hook._detect_prose_question(" \n\t "))
def test_no_question_mark_returns_none(self):
self.assertIsNone(amc_hook._detect_prose_question("This is a statement."))
def test_question_mark_in_middle_not_at_end_returns_none(self):
# Question mark exists but message doesn't END with one
self.assertIsNone(amc_hook._detect_prose_question("What? I said hello."))
def test_trailing_whitespace_after_question_still_detects(self):
result = amc_hook._detect_prose_question("Is this a question? \n\t")
self.assertEqual(result, "Is this a question?")
def test_question_in_last_paragraph_only(self):
msg = "First paragraph here.\n\nSecond paragraph is the question?"
result = amc_hook._detect_prose_question(msg)
self.assertEqual(result, "Second paragraph is the question?")
def test_multiple_paragraphs_question_not_in_last_returns_none(self):
# Question in first paragraph, statement in last
msg = "Is this a question?\n\nNo, this is the last paragraph."
self.assertIsNone(amc_hook._detect_prose_question(msg))
def test_truncates_long_question_to_max_length(self):
long_question = "x" * 600 + "?"
result = amc_hook._detect_prose_question(long_question)
self.assertLessEqual(len(result), amc_hook.MAX_QUESTION_LEN + 1) # +1 for ?
def test_long_question_tries_sentence_boundary(self):
# Create a message longer than MAX_QUESTION_LEN (500) with a sentence boundary
# The truncation takes the LAST MAX_QUESTION_LEN chars, then finds FIRST ". " within that
prefix = "a" * 500 + ". Sentence start. "
suffix = "Is this the question?"
msg = prefix + suffix
self.assertGreater(len(msg), amc_hook.MAX_QUESTION_LEN)
result = amc_hook._detect_prose_question(msg)
# Code finds FIRST ". " in truncated portion, so starts at "Sentence start"
self.assertTrue(
result.startswith("Sentence start"),
f"Expected to start with 'Sentence start', got: {result[:50]}"
)
def test_long_question_no_sentence_boundary_truncates_from_end(self):
# No period in the long text
long_msg = "a" * 600 + "?"
result = amc_hook._detect_prose_question(long_msg)
self.assertTrue(result.endswith("?"))
self.assertLessEqual(len(result), amc_hook.MAX_QUESTION_LEN + 1)
def test_single_character_question(self):
result = amc_hook._detect_prose_question("?")
self.assertEqual(result, "?")
def test_newlines_within_last_paragraph_preserved(self):
msg = "Intro.\n\nLine one\nLine two?"
result = amc_hook._detect_prose_question(msg)
self.assertIn("\n", result)
class TestExtractQuestions(unittest.TestCase):
"""Tests for _extract_questions edge cases."""
def test_empty_hook_returns_empty_list(self):
self.assertEqual(amc_hook._extract_questions({}), [])
def test_missing_tool_input_returns_empty_list(self):
self.assertEqual(amc_hook._extract_questions({"other": "data"}), [])
def test_tool_input_is_none_returns_empty_list(self):
self.assertEqual(amc_hook._extract_questions({"tool_input": None}), [])
def test_tool_input_is_list_returns_empty_list(self):
# tool_input should be dict, not list
self.assertEqual(amc_hook._extract_questions({"tool_input": []}), [])
def test_tool_input_is_string_json_parsed(self):
tool_input = json.dumps({"questions": [{"question": "Test?", "options": []}]})
result = amc_hook._extract_questions({"tool_input": tool_input})
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["question"], "Test?")
def test_tool_input_invalid_json_string_returns_empty(self):
result = amc_hook._extract_questions({"tool_input": "not valid json"})
self.assertEqual(result, [])
def test_questions_key_is_none_returns_empty(self):
result = amc_hook._extract_questions({"tool_input": {"questions": None}})
self.assertEqual(result, [])
def test_questions_key_missing_returns_empty(self):
result = amc_hook._extract_questions({"tool_input": {"other": "data"}})
self.assertEqual(result, [])
def test_option_without_markdown_excluded_from_output(self):
hook = {
"tool_input": {
"questions": [{
"question": "Pick one",
"options": [{"label": "A", "description": "Desc A"}],
}]
}
}
result = amc_hook._extract_questions(hook)
self.assertNotIn("markdown", result[0]["options"][0])
def test_option_with_markdown_included(self):
hook = {
"tool_input": {
"questions": [{
"question": "Pick one",
"options": [{"label": "A", "description": "Desc", "markdown": "```code```"}],
}]
}
}
result = amc_hook._extract_questions(hook)
self.assertEqual(result[0]["options"][0]["markdown"], "```code```")
def test_missing_question_fields_default_to_empty(self):
hook = {"tool_input": {"questions": [{}]}}
result = amc_hook._extract_questions(hook)
self.assertEqual(result[0]["question"], "")
self.assertEqual(result[0]["header"], "")
self.assertEqual(result[0]["options"], [])
def test_option_missing_fields_default_to_empty(self):
hook = {"tool_input": {"questions": [{"options": [{}]}]}}
result = amc_hook._extract_questions(hook)
self.assertEqual(result[0]["options"][0]["label"], "")
self.assertEqual(result[0]["options"][0]["description"], "")
class TestAtomicWrite(unittest.TestCase):
"""Tests for _atomic_write edge cases."""
def test_writes_to_nonexistent_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "new_file.json"
amc_hook._atomic_write(path, {"key": "value"})
self.assertTrue(path.exists())
self.assertEqual(json.loads(path.read_text()), {"key": "value"})
def test_overwrites_existing_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "existing.json"
path.write_text('{"old": "data"}')
amc_hook._atomic_write(path, {"new": "data"})
self.assertEqual(json.loads(path.read_text()), {"new": "data"})
def test_cleans_up_temp_file_on_replace_failure(self):
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "subdir" / "file.json"
# Parent doesn't exist, so mkstemp will fail
with self.assertRaises(FileNotFoundError):
amc_hook._atomic_write(path, {"data": "test"})
def test_no_partial_writes_on_failure(self):
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "file.json"
path.write_text('{"original": "data"}')
# Mock os.replace to fail after the temp file is written
original_replace = os.replace
def failing_replace(src, dst):
raise PermissionError("Simulated failure")
with patch("os.replace", side_effect=failing_replace):
with self.assertRaises(PermissionError):
amc_hook._atomic_write(path, {"new": "data"})
# Original file should be unchanged
self.assertEqual(json.loads(path.read_text()), {"original": "data"})
class TestReadSession(unittest.TestCase):
"""Tests for _read_session edge cases."""
def test_nonexistent_file_returns_empty_dict(self):
result = amc_hook._read_session(Path("/nonexistent/path/file.json"))
self.assertEqual(result, {})
def test_empty_file_returns_empty_dict(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
f.write("")
path = Path(f.name)
try:
result = amc_hook._read_session(path)
self.assertEqual(result, {})
finally:
path.unlink()
def test_invalid_json_returns_empty_dict(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
f.write("not valid json {{{")
path = Path(f.name)
try:
result = amc_hook._read_session(path)
self.assertEqual(result, {})
finally:
path.unlink()
def test_valid_json_returned(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump({"session_id": "abc"}, f)
path = Path(f.name)
try:
result = amc_hook._read_session(path)
self.assertEqual(result, {"session_id": "abc"})
finally:
path.unlink()
class TestAppendEvent(unittest.TestCase):
"""Tests for _append_event edge cases."""
def test_creates_file_if_missing(self):
with tempfile.TemporaryDirectory() as tmpdir:
with patch.object(amc_hook, "EVENTS_DIR", Path(tmpdir)):
amc_hook._append_event("session123", {"event": "test"})
event_file = Path(tmpdir) / "session123.jsonl"
self.assertTrue(event_file.exists())
def test_appends_to_existing_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
event_file = Path(tmpdir) / "session123.jsonl"
event_file.write_text('{"event": "first"}\n')
with patch.object(amc_hook, "EVENTS_DIR", Path(tmpdir)):
amc_hook._append_event("session123", {"event": "second"})
lines = event_file.read_text().strip().split("\n")
self.assertEqual(len(lines), 2)
self.assertEqual(json.loads(lines[1])["event"], "second")
def test_oserror_silently_ignored(self):
with patch.object(amc_hook, "EVENTS_DIR", Path("/nonexistent/path")):
# Should not raise
amc_hook._append_event("session123", {"event": "test"})
class TestMainHookPathTraversal(unittest.TestCase):
"""Tests for path traversal protection in main()."""
def test_session_id_with_path_traversal_sanitized(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
events_dir = Path(tmpdir) / "events"
sessions_dir.mkdir()
events_dir.mkdir()
# Create a legitimate session file to test that traversal doesn't reach it
legit_file = Path(tmpdir) / "secret.json"
legit_file.write_text('{"secret": "data"}')
hook_input = json.dumps({
"hook_event_name": "SessionStart",
"session_id": "../secret",
"cwd": "/test/project",
})
with patch.object(amc_hook, "SESSIONS_DIR", sessions_dir), \
patch.object(amc_hook, "EVENTS_DIR", events_dir), \
patch("sys.stdin.read", return_value=hook_input):
amc_hook.main()
# The sanitized session ID should be "secret" (basename of "../secret")
# and should NOT have modified the legit_file in parent dir
self.assertEqual(json.loads(legit_file.read_text()), {"secret": "data"})
class TestMainHookEmptyInput(unittest.TestCase):
"""Tests for main() with various empty/invalid inputs."""
def test_empty_stdin_returns_silently(self):
with patch("sys.stdin.read", return_value=""):
# Should not raise
amc_hook.main()
def test_whitespace_only_stdin_returns_silently(self):
with patch("sys.stdin.read", return_value=" \n\t "):
amc_hook.main()
def test_invalid_json_stdin_fails_silently(self):
with patch("sys.stdin.read", return_value="not json"):
amc_hook.main()
def test_missing_session_id_returns_silently(self):
with patch("sys.stdin.read", return_value='{"hook_event_name": "SessionStart"}'):
amc_hook.main()
def test_missing_event_name_returns_silently(self):
with patch("sys.stdin.read", return_value='{"session_id": "abc123"}'):
amc_hook.main()
def test_empty_session_id_after_sanitization_returns_silently(self):
# Edge case: session_id that becomes empty after basename()
with patch("sys.stdin.read", return_value='{"hook_event_name": "SessionStart", "session_id": "/"}'):
amc_hook.main()
class TestMainSessionEndDeletesFile(unittest.TestCase):
"""Tests for SessionEnd hook behavior."""
def test_session_end_deletes_existing_session_file(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
events_dir = Path(tmpdir) / "events"
sessions_dir.mkdir()
events_dir.mkdir()
session_file = sessions_dir / "abc123.json"
session_file.write_text('{"session_id": "abc123"}')
hook_input = json.dumps({
"hook_event_name": "SessionEnd",
"session_id": "abc123",
})
with patch.object(amc_hook, "SESSIONS_DIR", sessions_dir), \
patch.object(amc_hook, "EVENTS_DIR", events_dir), \
patch("sys.stdin.read", return_value=hook_input):
amc_hook.main()
self.assertFalse(session_file.exists())
def test_session_end_missing_file_no_error(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
events_dir = Path(tmpdir) / "events"
sessions_dir.mkdir()
events_dir.mkdir()
hook_input = json.dumps({
"hook_event_name": "SessionEnd",
"session_id": "nonexistent",
})
with patch.object(amc_hook, "SESSIONS_DIR", sessions_dir), \
patch.object(amc_hook, "EVENTS_DIR", events_dir), \
patch("sys.stdin.read", return_value=hook_input):
# Should not raise
amc_hook.main()
class TestMainPreToolUseWithoutExistingSession(unittest.TestCase):
"""Edge case: PreToolUse arrives but session file doesn't exist."""
def test_pre_tool_use_no_existing_session_returns_silently(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
events_dir = Path(tmpdir) / "events"
sessions_dir.mkdir()
events_dir.mkdir()
hook_input = json.dumps({
"hook_event_name": "PreToolUse",
"tool_name": "AskUserQuestion",
"session_id": "nonexistent",
"tool_input": {"questions": []},
})
with patch.object(amc_hook, "SESSIONS_DIR", sessions_dir), \
patch.object(amc_hook, "EVENTS_DIR", events_dir), \
patch("sys.stdin.read", return_value=hook_input):
amc_hook.main()
# No session file should be created
self.assertFalse((sessions_dir / "nonexistent.json").exists())
class TestMainStopWithProseQuestion(unittest.TestCase):
"""Tests for Stop hook detecting prose questions."""
def test_stop_with_prose_question_sets_needs_attention(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
events_dir = Path(tmpdir) / "events"
sessions_dir.mkdir()
events_dir.mkdir()
# Create existing session
session_file = sessions_dir / "abc123.json"
session_file.write_text(json.dumps({
"session_id": "abc123",
"status": "active",
}))
hook_input = json.dumps({
"hook_event_name": "Stop",
"session_id": "abc123",
"last_assistant_message": "What do you think about this approach?",
"cwd": "/test/project",
})
with patch.object(amc_hook, "SESSIONS_DIR", sessions_dir), \
patch.object(amc_hook, "EVENTS_DIR", events_dir), \
patch("sys.stdin.read", return_value=hook_input):
amc_hook.main()
data = json.loads(session_file.read_text())
self.assertEqual(data["status"], "needs_attention")
self.assertEqual(len(data["pending_questions"]), 1)
self.assertIn("approach?", data["pending_questions"][0]["question"])
class TestMainTurnTimingAccumulation(unittest.TestCase):
"""Tests for turn timing accumulation across pause/resume cycles."""
def test_post_tool_use_accumulates_paused_time(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
events_dir = Path(tmpdir) / "events"
sessions_dir.mkdir()
events_dir.mkdir()
# Create session with existing paused state
session_file = sessions_dir / "abc123.json"
session_file.write_text(json.dumps({
"session_id": "abc123",
"status": "needs_attention",
"turn_paused_at": "2024-01-01T00:00:00+00:00",
"turn_paused_ms": 5000, # Already had 5 seconds paused
}))
hook_input = json.dumps({
"hook_event_name": "PostToolUse",
"tool_name": "AskUserQuestion",
"session_id": "abc123",
})
with patch.object(amc_hook, "SESSIONS_DIR", sessions_dir), \
patch.object(amc_hook, "EVENTS_DIR", events_dir), \
patch("sys.stdin.read", return_value=hook_input):
amc_hook.main()
data = json.loads(session_file.read_text())
# Should have accumulated more paused time
self.assertGreater(data["turn_paused_ms"], 5000)
# turn_paused_at should be removed after resuming
self.assertNotIn("turn_paused_at", data)
if __name__ == "__main__":
unittest.main()

335
tests/test_http.py Normal file
View File

@@ -0,0 +1,335 @@
"""Tests for mixins/http.py edge cases.
Unit tests for HTTP routing and response handling.
"""
import io
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
from amc_server.mixins.http import HttpMixin
class DummyHttpHandler(HttpMixin):
"""Minimal handler for testing HTTP mixin."""
def __init__(self):
self.response_code = None
self.headers_sent = {}
self.body_sent = b""
self.path = "/"
self.wfile = io.BytesIO()
def send_response(self, code):
self.response_code = code
def send_header(self, key, value):
self.headers_sent[key] = value
def end_headers(self):
pass
class TestSendBytesResponse(unittest.TestCase):
"""Tests for _send_bytes_response edge cases."""
def test_sends_correct_headers(self):
handler = DummyHttpHandler()
handler._send_bytes_response(200, b"test", content_type="text/plain")
self.assertEqual(handler.response_code, 200)
self.assertEqual(handler.headers_sent["Content-Type"], "text/plain")
self.assertEqual(handler.headers_sent["Content-Length"], "4")
def test_includes_extra_headers(self):
handler = DummyHttpHandler()
handler._send_bytes_response(
200, b"test",
extra_headers={"X-Custom": "value", "Cache-Control": "no-cache"}
)
self.assertEqual(handler.headers_sent["X-Custom"], "value")
self.assertEqual(handler.headers_sent["Cache-Control"], "no-cache")
def test_broken_pipe_returns_false(self):
handler = DummyHttpHandler()
handler.wfile.write = MagicMock(side_effect=BrokenPipeError())
result = handler._send_bytes_response(200, b"test")
self.assertFalse(result)
def test_connection_reset_returns_false(self):
handler = DummyHttpHandler()
handler.wfile.write = MagicMock(side_effect=ConnectionResetError())
result = handler._send_bytes_response(200, b"test")
self.assertFalse(result)
def test_os_error_returns_false(self):
handler = DummyHttpHandler()
handler.wfile.write = MagicMock(side_effect=OSError("write error"))
result = handler._send_bytes_response(200, b"test")
self.assertFalse(result)
class TestSendJson(unittest.TestCase):
"""Tests for _send_json edge cases."""
def test_includes_cors_header(self):
handler = DummyHttpHandler()
handler._send_json(200, {"key": "value"})
self.assertEqual(handler.headers_sent["Access-Control-Allow-Origin"], "*")
def test_sets_json_content_type(self):
handler = DummyHttpHandler()
handler._send_json(200, {"key": "value"})
self.assertEqual(handler.headers_sent["Content-Type"], "application/json")
def test_encodes_payload_as_json(self):
handler = DummyHttpHandler()
handler._send_json(200, {"key": "value"})
written = handler.wfile.getvalue()
self.assertEqual(json.loads(written), {"key": "value"})
class TestServeDashboardFile(unittest.TestCase):
"""Tests for _serve_dashboard_file edge cases."""
def test_nonexistent_file_returns_404(self):
handler = DummyHttpHandler()
handler.errors = []
def capture_error(code, message):
handler.errors.append((code, message))
handler._json_error = capture_error
with tempfile.TemporaryDirectory() as tmpdir:
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("nonexistent.html")
self.assertEqual(len(handler.errors), 1)
self.assertEqual(handler.errors[0][0], 404)
def test_path_traversal_blocked(self):
handler = DummyHttpHandler()
handler.errors = []
def capture_error(code, message):
handler.errors.append((code, message))
handler._json_error = capture_error
with tempfile.TemporaryDirectory() as tmpdir:
# Create a file outside the dashboard dir that shouldn't be accessible
secret = Path(tmpdir).parent / "secret.txt"
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("../secret.txt")
self.assertEqual(len(handler.errors), 1)
self.assertEqual(handler.errors[0][0], 403)
def test_correct_content_type_for_html(self):
handler = DummyHttpHandler()
with tempfile.TemporaryDirectory() as tmpdir:
html_file = Path(tmpdir) / "test.html"
html_file.write_text("<html></html>")
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("test.html")
self.assertEqual(handler.headers_sent["Content-Type"], "text/html; charset=utf-8")
def test_correct_content_type_for_css(self):
handler = DummyHttpHandler()
with tempfile.TemporaryDirectory() as tmpdir:
css_file = Path(tmpdir) / "styles.css"
css_file.write_text("body {}")
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("styles.css")
self.assertEqual(handler.headers_sent["Content-Type"], "text/css; charset=utf-8")
def test_correct_content_type_for_js(self):
handler = DummyHttpHandler()
with tempfile.TemporaryDirectory() as tmpdir:
js_file = Path(tmpdir) / "app.js"
js_file.write_text("console.log('hello')")
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("app.js")
self.assertEqual(handler.headers_sent["Content-Type"], "application/javascript; charset=utf-8")
def test_unknown_extension_gets_octet_stream(self):
handler = DummyHttpHandler()
with tempfile.TemporaryDirectory() as tmpdir:
unknown_file = Path(tmpdir) / "data.xyz"
unknown_file.write_bytes(b"\x00\x01\x02")
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("data.xyz")
self.assertEqual(handler.headers_sent["Content-Type"], "application/octet-stream")
def test_no_cache_headers_set(self):
handler = DummyHttpHandler()
with tempfile.TemporaryDirectory() as tmpdir:
html_file = Path(tmpdir) / "test.html"
html_file.write_text("<html></html>")
with patch("amc_server.mixins.http.DASHBOARD_DIR", Path(tmpdir)):
handler._serve_dashboard_file("test.html")
self.assertIn("no-cache", handler.headers_sent.get("Cache-Control", ""))
class TestDoGet(unittest.TestCase):
"""Tests for do_GET routing edge cases."""
def _make_handler(self, path):
handler = DummyHttpHandler()
handler.path = path
handler._serve_dashboard_file = MagicMock()
handler._serve_state = MagicMock()
handler._serve_stream = MagicMock()
handler._serve_events = MagicMock()
handler._serve_conversation = MagicMock()
handler._json_error = MagicMock()
return handler
def test_root_serves_index(self):
handler = self._make_handler("/")
handler.do_GET()
handler._serve_dashboard_file.assert_called_with("index.html")
def test_index_html_serves_index(self):
handler = self._make_handler("/index.html")
handler.do_GET()
handler._serve_dashboard_file.assert_called_with("index.html")
def test_static_file_served(self):
handler = self._make_handler("/components/App.js")
handler.do_GET()
handler._serve_dashboard_file.assert_called_with("components/App.js")
def test_path_traversal_in_static_blocked(self):
handler = self._make_handler("/../../etc/passwd")
handler.do_GET()
handler._json_error.assert_called_with(404, "Not Found")
def test_api_state_routed(self):
handler = self._make_handler("/api/state")
handler.do_GET()
handler._serve_state.assert_called_once()
def test_api_stream_routed(self):
handler = self._make_handler("/api/stream")
handler.do_GET()
handler._serve_stream.assert_called_once()
def test_api_events_routed_with_id(self):
handler = self._make_handler("/api/events/session-123")
handler.do_GET()
handler._serve_events.assert_called_with("session-123")
def test_api_events_url_decoded(self):
handler = self._make_handler("/api/events/session%20with%20spaces")
handler.do_GET()
handler._serve_events.assert_called_with("session with spaces")
def test_api_conversation_with_query_params(self):
handler = self._make_handler("/api/conversation/sess123?project_dir=/test&agent=codex")
handler.do_GET()
handler._serve_conversation.assert_called_with("sess123", "/test", "codex")
def test_api_conversation_defaults_to_claude(self):
handler = self._make_handler("/api/conversation/sess123")
handler.do_GET()
handler._serve_conversation.assert_called_with("sess123", "", "claude")
def test_unknown_api_path_returns_404(self):
handler = self._make_handler("/api/unknown")
handler.do_GET()
handler._json_error.assert_called_with(404, "Not Found")
class TestDoPost(unittest.TestCase):
"""Tests for do_POST routing edge cases."""
def _make_handler(self, path):
handler = DummyHttpHandler()
handler.path = path
handler._dismiss_dead_sessions = MagicMock()
handler._dismiss_session = MagicMock()
handler._respond_to_session = MagicMock()
handler._json_error = MagicMock()
return handler
def test_dismiss_dead_routed(self):
handler = self._make_handler("/api/dismiss-dead")
handler.do_POST()
handler._dismiss_dead_sessions.assert_called_once()
def test_dismiss_session_routed(self):
handler = self._make_handler("/api/dismiss/session-abc")
handler.do_POST()
handler._dismiss_session.assert_called_with("session-abc")
def test_dismiss_url_decoded(self):
handler = self._make_handler("/api/dismiss/session%2Fwith%2Fslash")
handler.do_POST()
handler._dismiss_session.assert_called_with("session/with/slash")
def test_respond_routed(self):
handler = self._make_handler("/api/respond/session-xyz")
handler.do_POST()
handler._respond_to_session.assert_called_with("session-xyz")
def test_unknown_post_path_returns_404(self):
handler = self._make_handler("/api/unknown")
handler.do_POST()
handler._json_error.assert_called_with(404, "Not Found")
class TestDoOptions(unittest.TestCase):
"""Tests for do_OPTIONS CORS preflight."""
def test_returns_204_with_cors_headers(self):
handler = DummyHttpHandler()
handler.do_OPTIONS()
self.assertEqual(handler.response_code, 204)
self.assertEqual(handler.headers_sent["Access-Control-Allow-Origin"], "*")
self.assertIn("POST", handler.headers_sent["Access-Control-Allow-Methods"])
self.assertIn("Content-Type", handler.headers_sent["Access-Control-Allow-Headers"])
class TestJsonError(unittest.TestCase):
"""Tests for _json_error helper."""
def test_sends_json_with_error(self):
handler = DummyHttpHandler()
handler._json_error(404, "Not Found")
written = handler.wfile.getvalue()
payload = json.loads(written)
self.assertEqual(payload, {"ok": False, "error": "Not Found"})
if __name__ == "__main__":
unittest.main()

635
tests/test_parsing.py Normal file
View File

@@ -0,0 +1,635 @@
"""Tests for mixins/parsing.py edge cases.
Unit tests for parsing helper functions and conversation file resolution.
"""
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
from amc_server.mixins.parsing import SessionParsingMixin
class DummyParsingHandler(SessionParsingMixin):
"""Minimal handler for testing parsing mixin."""
pass
class TestToInt(unittest.TestCase):
"""Tests for _to_int edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_none_returns_none(self):
self.assertIsNone(self.handler._to_int(None))
def test_bool_true_returns_none(self):
# Booleans are technically ints in Python, but we don't want to convert them
self.assertIsNone(self.handler._to_int(True))
def test_bool_false_returns_none(self):
self.assertIsNone(self.handler._to_int(False))
def test_int_returns_int(self):
self.assertEqual(self.handler._to_int(42), 42)
def test_negative_int_returns_int(self):
self.assertEqual(self.handler._to_int(-10), -10)
def test_zero_returns_zero(self):
self.assertEqual(self.handler._to_int(0), 0)
def test_float_truncates_to_int(self):
self.assertEqual(self.handler._to_int(3.7), 3)
def test_negative_float_truncates(self):
self.assertEqual(self.handler._to_int(-2.9), -2)
def test_string_int_parses(self):
self.assertEqual(self.handler._to_int("123"), 123)
def test_string_negative_parses(self):
self.assertEqual(self.handler._to_int("-456"), -456)
def test_string_with_whitespace_fails(self):
# Python's int() handles whitespace, but let's verify
self.assertEqual(self.handler._to_int(" 42 "), 42)
def test_string_float_fails(self):
# "3.14" can't be parsed by int()
self.assertIsNone(self.handler._to_int("3.14"))
def test_empty_string_returns_none(self):
self.assertIsNone(self.handler._to_int(""))
def test_non_numeric_string_returns_none(self):
self.assertIsNone(self.handler._to_int("abc"))
def test_list_returns_none(self):
self.assertIsNone(self.handler._to_int([1, 2, 3]))
def test_dict_returns_none(self):
self.assertIsNone(self.handler._to_int({"value": 42}))
class TestSumOptionalInts(unittest.TestCase):
"""Tests for _sum_optional_ints edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_list_returns_none(self):
self.assertIsNone(self.handler._sum_optional_ints([]))
def test_all_none_returns_none(self):
self.assertIsNone(self.handler._sum_optional_ints([None, None, None]))
def test_single_int_returns_that_int(self):
self.assertEqual(self.handler._sum_optional_ints([42]), 42)
def test_mixed_none_and_int_sums_ints(self):
self.assertEqual(self.handler._sum_optional_ints([None, 10, None, 20]), 30)
def test_all_ints_sums_all(self):
self.assertEqual(self.handler._sum_optional_ints([1, 2, 3, 4]), 10)
def test_includes_zero(self):
self.assertEqual(self.handler._sum_optional_ints([0, 5]), 5)
def test_negative_ints(self):
self.assertEqual(self.handler._sum_optional_ints([10, -3, 5]), 12)
def test_floats_ignored(self):
# Only integers are summed
self.assertEqual(self.handler._sum_optional_ints([10, 3.14, 5]), 15)
def test_strings_ignored(self):
self.assertEqual(self.handler._sum_optional_ints(["10", 5]), 5)
def test_only_non_ints_returns_none(self):
self.assertIsNone(self.handler._sum_optional_ints(["10", 3.14, None]))
class TestAsDict(unittest.TestCase):
"""Tests for _as_dict edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_dict_returns_dict(self):
self.assertEqual(self.handler._as_dict({"key": "value"}), {"key": "value"})
def test_empty_dict_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict({}), {})
def test_none_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict(None), {})
def test_list_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict([1, 2, 3]), {})
def test_string_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict("not a dict"), {})
def test_int_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict(42), {})
def test_bool_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict(True), {})
class TestGetClaudeContextWindow(unittest.TestCase):
"""Tests for _get_claude_context_window edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_none_model_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window(None), 200_000)
def test_empty_string_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window(""), 200_000)
def test_claude_2_returns_100k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-2"), 100_000)
def test_claude_2_1_returns_100k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-2.1"), 100_000)
def test_claude_3_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-3-opus-20240229"), 200_000)
def test_claude_35_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-3-5-sonnet-20241022"), 200_000)
def test_unknown_model_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window("some-future-model"), 200_000)
class TestGetClaudeConversationFile(unittest.TestCase):
"""Tests for _get_claude_conversation_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_project_dir_returns_none(self):
self.assertIsNone(self.handler._get_claude_conversation_file("session123", ""))
def test_none_project_dir_returns_none(self):
self.assertIsNone(self.handler._get_claude_conversation_file("session123", None))
def test_nonexistent_file_returns_none(self):
with tempfile.TemporaryDirectory() as tmpdir:
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
result = self.handler._get_claude_conversation_file("session123", "/some/project")
self.assertIsNone(result)
def test_existing_file_returns_path(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Create the expected file structure
# project_dir "/foo/bar" becomes "-foo-bar"
encoded_dir = Path(tmpdir) / "-foo-bar"
encoded_dir.mkdir()
conv_file = encoded_dir / "session123.jsonl"
conv_file.write_text('{"type": "user"}\n')
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
result = self.handler._get_claude_conversation_file("session123", "/foo/bar")
self.assertEqual(result, conv_file)
def test_project_dir_without_leading_slash_gets_prefixed(self):
with tempfile.TemporaryDirectory() as tmpdir:
# project_dir "foo/bar" becomes "-foo-bar" (adds leading dash)
encoded_dir = Path(tmpdir) / "-foo-bar"
encoded_dir.mkdir()
conv_file = encoded_dir / "session123.jsonl"
conv_file.write_text('{"type": "user"}\n')
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
result = self.handler._get_claude_conversation_file("session123", "foo/bar")
self.assertEqual(result, conv_file)
class TestFindCodexTranscriptFile(unittest.TestCase):
"""Tests for _find_codex_transcript_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_session_id_returns_none(self):
self.assertIsNone(self.handler._find_codex_transcript_file(""))
def test_none_session_id_returns_none(self):
self.assertIsNone(self.handler._find_codex_transcript_file(None))
def test_codex_sessions_dir_missing_returns_none(self):
with patch("amc_server.mixins.parsing.CODEX_SESSIONS_DIR", Path("/nonexistent")):
# Clear cache to force discovery
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache.clear()
result = self.handler._find_codex_transcript_file("abc123")
self.assertIsNone(result)
def test_cache_hit_returns_cached_path(self):
with tempfile.TemporaryDirectory() as tmpdir:
transcript_file = Path(tmpdir) / "abc123.jsonl"
transcript_file.write_text('{"type": "session_meta"}\n')
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache["abc123"] = str(transcript_file)
result = self.handler._find_codex_transcript_file("abc123")
self.assertEqual(result, transcript_file)
# Clean up
_codex_transcript_cache.clear()
def test_cache_hit_with_deleted_file_returns_none(self):
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache["deleted-session"] = "/nonexistent/file.jsonl"
result = self.handler._find_codex_transcript_file("deleted-session")
self.assertIsNone(result)
_codex_transcript_cache.clear()
def test_cache_hit_with_none_returns_none(self):
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache["cached-none"] = None
result = self.handler._find_codex_transcript_file("cached-none")
self.assertIsNone(result)
_codex_transcript_cache.clear()
class TestReadJsonlTailEntries(unittest.TestCase):
"""Tests for _read_jsonl_tail_entries edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_file_returns_empty_list(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(result, [])
finally:
path.unlink()
def test_nonexistent_file_returns_empty_list(self):
result = self.handler._read_jsonl_tail_entries(Path("/nonexistent/file.jsonl"))
self.assertEqual(result, [])
def test_single_line_file(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"key": "value"}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(result, [{"key": "value"}])
finally:
path.unlink()
def test_max_lines_limits_output(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
for i in range(100):
f.write(f'{{"n": {i}}}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path, max_lines=10)
self.assertEqual(len(result), 10)
# Should be the LAST 10 lines
self.assertEqual(result[-1], {"n": 99})
finally:
path.unlink()
def test_max_bytes_truncates_from_start(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Write many lines
for i in range(100):
f.write(f'{{"number": {i}}}\n')
path = Path(f.name)
try:
# Read only last 200 bytes
result = self.handler._read_jsonl_tail_entries(path, max_bytes=200)
# Should get some entries from the end
self.assertGreater(len(result), 0)
# All entries should be from near the end
for entry in result:
self.assertGreater(entry["number"], 80)
finally:
path.unlink()
def test_partial_first_line_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Write enough to trigger partial read
f.write('{"first": "line", "long_key": "' + "x" * 500 + '"}\n')
f.write('{"second": "line"}\n')
path = Path(f.name)
try:
# Read only last 100 bytes (will cut first line)
result = self.handler._read_jsonl_tail_entries(path, max_bytes=100)
# First line should be skipped (partial JSON)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], {"second": "line"})
finally:
path.unlink()
def test_invalid_json_lines_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"valid": "json"}\n')
f.write('this is not json\n')
f.write('{"another": "valid"}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(len(result), 2)
self.assertEqual(result[0], {"valid": "json"})
self.assertEqual(result[1], {"another": "valid"})
finally:
path.unlink()
def test_empty_lines_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"first": 1}\n')
f.write('\n')
f.write('{"second": 2}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(len(result), 2)
finally:
path.unlink()
class TestParseClaudeContextUsageFromFile(unittest.TestCase):
"""Tests for _parse_claude_context_usage_from_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_file_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_no_assistant_messages_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "user", "message": {"content": "hello"}}\n')
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_assistant_without_usage_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "assistant", "message": {"content": []}}\n')
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_extracts_usage_from_assistant_message(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"timestamp": "2024-01-01T00:00:00Z",
"message": {
"model": "claude-3-5-sonnet-20241022",
"usage": {
"input_tokens": 1000,
"output_tokens": 500,
"cache_read_input_tokens": 200,
"cache_creation_input_tokens": 100,
}
}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNotNone(result)
self.assertEqual(result["input_tokens"], 1000)
self.assertEqual(result["output_tokens"], 500)
self.assertEqual(result["cached_input_tokens"], 300) # 200 + 100
self.assertEqual(result["current_tokens"], 1800) # sum of all
self.assertEqual(result["window_tokens"], 200_000)
self.assertEqual(result["model"], "claude-3-5-sonnet-20241022")
finally:
path.unlink()
def test_uses_most_recent_assistant_message(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {"input_tokens": 100, "output_tokens": 50}}
}) + "\n")
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {"input_tokens": 999, "output_tokens": 888}}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
# Should use the last message
self.assertEqual(result["input_tokens"], 999)
self.assertEqual(result["output_tokens"], 888)
finally:
path.unlink()
def test_skips_assistant_with_no_current_tokens(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Last message has no usable tokens
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {"input_tokens": 100, "output_tokens": 50}}
}) + "\n")
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {}} # No tokens
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
# Should fall back to earlier message with valid tokens
self.assertEqual(result["input_tokens"], 100)
finally:
path.unlink()
class TestParseCodexContextUsageFromFile(unittest.TestCase):
"""Tests for _parse_codex_context_usage_from_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_file_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_no_token_count_events_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "response_item"}\n')
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_extracts_token_count_event(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "event_msg",
"timestamp": "2024-01-01T00:00:00Z",
"payload": {
"type": "token_count",
"info": {
"model_context_window": 128000,
"last_token_usage": {
"input_tokens": 5000,
"output_tokens": 2000,
"cached_input_tokens": 1000,
"total_tokens": 8000,
},
"total_token_usage": {
"total_tokens": 50000,
}
}
}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
self.assertIsNotNone(result)
self.assertEqual(result["window_tokens"], 128000)
self.assertEqual(result["current_tokens"], 8000)
self.assertEqual(result["input_tokens"], 5000)
self.assertEqual(result["output_tokens"], 2000)
self.assertEqual(result["session_total_tokens"], 50000)
finally:
path.unlink()
def test_calculates_current_tokens_when_total_missing(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "event_msg",
"payload": {
"type": "token_count",
"info": {
"last_token_usage": {
"input_tokens": 100,
"output_tokens": 50,
# no total_tokens
}
}
}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
# Should sum available tokens
self.assertEqual(result["current_tokens"], 150)
finally:
path.unlink()
class TestGetCachedContextUsage(unittest.TestCase):
"""Tests for _get_cached_context_usage edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
# Clear cache before each test
from amc_server.context import _context_usage_cache
_context_usage_cache.clear()
def test_nonexistent_file_returns_none(self):
def mock_parser(path):
return {"tokens": 100}
result = self.handler._get_cached_context_usage(
Path("/nonexistent/file.jsonl"),
mock_parser
)
self.assertIsNone(result)
def test_caches_result_by_mtime_and_size(self):
call_count = [0]
def counting_parser(path):
call_count[0] += 1
return {"tokens": 100}
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"data": "test"}\n')
path = Path(f.name)
try:
# First call - should invoke parser
result1 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(call_count[0], 1)
self.assertEqual(result1, {"tokens": 100})
# Second call - should use cache
result2 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(call_count[0], 1) # No additional call
self.assertEqual(result2, {"tokens": 100})
finally:
path.unlink()
def test_invalidates_cache_on_mtime_change(self):
import time
call_count = [0]
def counting_parser(path):
call_count[0] += 1
return {"tokens": call_count[0] * 100}
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"data": "test"}\n')
path = Path(f.name)
try:
result1 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(result1, {"tokens": 100})
# Modify file to change mtime
time.sleep(0.01)
path.write_text('{"data": "modified"}\n')
result2 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(call_count[0], 2) # Parser called again
self.assertEqual(result2, {"tokens": 200})
finally:
path.unlink()
def test_parser_exception_returns_none(self):
def failing_parser(path):
raise ValueError("Parse error")
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"data": "test"}\n')
path = Path(f.name)
try:
result = self.handler._get_cached_context_usage(path, failing_parser)
self.assertIsNone(result)
finally:
path.unlink()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,21 +1,30 @@
import json
import subprocess
import tempfile
import time
import unittest
from unittest.mock import patch
from pathlib import Path
from unittest.mock import patch, MagicMock
import amc_server.mixins.state as state_mod
from amc_server.mixins.state import StateMixin
from amc_server.mixins.parsing import SessionParsingMixin
from amc_server.mixins.discovery import SessionDiscoveryMixin
class DummyStateHandler(StateMixin):
class DummyStateHandler(StateMixin, SessionParsingMixin, SessionDiscoveryMixin):
pass
class StateMixinTests(unittest.TestCase):
def test_get_active_zellij_sessions_uses_resolved_binary_and_parses_output(self):
handler = DummyStateHandler()
class TestGetActiveZellijSessions(unittest.TestCase):
"""Tests for _get_active_zellij_sessions edge cases."""
def setUp(self):
state_mod._zellij_cache["sessions"] = None
state_mod._zellij_cache["expires"] = 0
def test_parses_output_with_metadata(self):
handler = DummyStateHandler()
completed = subprocess.CompletedProcess(
args=[],
returncode=0,
@@ -23,15 +32,389 @@ class StateMixinTests(unittest.TestCase):
stderr="",
)
with patch.object(state_mod, "ZELLIJ_BIN", "/opt/homebrew/bin/zellij"), patch(
"amc_server.mixins.state.subprocess.run", return_value=completed
) as run_mock:
with patch.object(state_mod, "ZELLIJ_BIN", "/opt/homebrew/bin/zellij"), \
patch("amc_server.mixins.state.subprocess.run", return_value=completed) as run_mock:
sessions = handler._get_active_zellij_sessions()
self.assertEqual(sessions, {"infra", "work"})
args = run_mock.call_args.args[0]
self.assertEqual(args, ["/opt/homebrew/bin/zellij", "list-sessions", "--no-formatting"])
def test_empty_output_returns_empty_set(self):
handler = DummyStateHandler()
completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
sessions = handler._get_active_zellij_sessions()
self.assertEqual(sessions, set())
def test_whitespace_only_lines_ignored(self):
handler = DummyStateHandler()
completed = subprocess.CompletedProcess(
args=[], returncode=0, stdout="session1\n \n\nsession2\n", stderr=""
)
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
sessions = handler._get_active_zellij_sessions()
self.assertEqual(sessions, {"session1", "session2"})
def test_nonzero_exit_returns_none(self):
handler = DummyStateHandler()
completed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="error")
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
sessions = handler._get_active_zellij_sessions()
self.assertIsNone(sessions)
def test_timeout_returns_none(self):
handler = DummyStateHandler()
with patch("amc_server.mixins.state.subprocess.run",
side_effect=subprocess.TimeoutExpired("cmd", 2)):
sessions = handler._get_active_zellij_sessions()
self.assertIsNone(sessions)
def test_file_not_found_returns_none(self):
handler = DummyStateHandler()
with patch("amc_server.mixins.state.subprocess.run", side_effect=FileNotFoundError()):
sessions = handler._get_active_zellij_sessions()
self.assertIsNone(sessions)
def test_cache_used_when_fresh(self):
handler = DummyStateHandler()
state_mod._zellij_cache["sessions"] = {"cached"}
state_mod._zellij_cache["expires"] = time.time() + 100
with patch("amc_server.mixins.state.subprocess.run") as mock_run:
sessions = handler._get_active_zellij_sessions()
mock_run.assert_not_called()
self.assertEqual(sessions, {"cached"})
class TestIsSessionDead(unittest.TestCase):
"""Tests for _is_session_dead edge cases."""
def setUp(self):
self.handler = DummyStateHandler()
def test_starting_session_not_dead(self):
session = {"status": "starting", "agent": "claude", "zellij_session": "s"}
self.assertFalse(self.handler._is_session_dead(session, {"s"}, set()))
def test_claude_without_zellij_session_is_dead(self):
session = {"status": "active", "agent": "claude", "zellij_session": ""}
self.assertTrue(self.handler._is_session_dead(session, set(), set()))
def test_claude_with_missing_zellij_session_is_dead(self):
session = {"status": "active", "agent": "claude", "zellij_session": "deleted"}
active_zellij = {"other_session"}
self.assertTrue(self.handler._is_session_dead(session, active_zellij, set()))
def test_claude_with_active_zellij_session_not_dead(self):
session = {"status": "active", "agent": "claude", "zellij_session": "existing"}
active_zellij = {"existing", "other"}
self.assertFalse(self.handler._is_session_dead(session, active_zellij, set()))
def test_claude_unknown_zellij_status_assumes_alive(self):
# When we can't query zellij (None), assume alive to avoid false positives
session = {"status": "active", "agent": "claude", "zellij_session": "unknown"}
self.assertFalse(self.handler._is_session_dead(session, None, set()))
def test_codex_without_transcript_path_is_dead(self):
session = {"status": "active", "agent": "codex", "transcript_path": ""}
self.assertTrue(self.handler._is_session_dead(session, None, set()))
def test_codex_with_active_transcript_not_dead(self):
session = {"status": "active", "agent": "codex", "transcript_path": "/path/to/file.jsonl"}
active_files = {"/path/to/file.jsonl"}
self.assertFalse(self.handler._is_session_dead(session, None, active_files))
def test_codex_without_active_transcript_checks_lsof(self):
session = {"status": "active", "agent": "codex", "transcript_path": "/path/to/file.jsonl"}
# Simulate lsof finding the file open
with patch.object(self.handler, "_is_file_open", return_value=True):
result = self.handler._is_session_dead(session, None, set())
self.assertFalse(result)
# Simulate lsof not finding the file
with patch.object(self.handler, "_is_file_open", return_value=False):
result = self.handler._is_session_dead(session, None, set())
self.assertTrue(result)
def test_unknown_agent_assumes_alive(self):
session = {"status": "active", "agent": "unknown_agent"}
self.assertFalse(self.handler._is_session_dead(session, None, set()))
class TestIsFileOpen(unittest.TestCase):
"""Tests for _is_file_open edge cases."""
def setUp(self):
self.handler = DummyStateHandler()
def test_lsof_finds_pid_returns_true(self):
completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="12345\n", stderr="")
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
result = self.handler._is_file_open("/some/file.jsonl")
self.assertTrue(result)
def test_lsof_no_result_returns_false(self):
completed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="")
with patch("amc_server.mixins.state.subprocess.run", return_value=completed):
result = self.handler._is_file_open("/some/file.jsonl")
self.assertFalse(result)
def test_lsof_timeout_returns_false(self):
with patch("amc_server.mixins.state.subprocess.run",
side_effect=subprocess.TimeoutExpired("cmd", 2)):
result = self.handler._is_file_open("/some/file.jsonl")
self.assertFalse(result)
def test_lsof_not_found_returns_false(self):
with patch("amc_server.mixins.state.subprocess.run", side_effect=FileNotFoundError()):
result = self.handler._is_file_open("/some/file.jsonl")
self.assertFalse(result)
class TestCleanupStale(unittest.TestCase):
"""Tests for _cleanup_stale edge cases."""
def setUp(self):
self.handler = DummyStateHandler()
def test_removes_orphan_event_logs_older_than_24h(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create an orphan event log (no matching session)
orphan_log = events_dir / "orphan.jsonl"
orphan_log.write_text('{"event": "test"}\n')
# Set mtime to 25 hours ago
old_time = time.time() - (25 * 3600)
import os
os.utime(orphan_log, (old_time, old_time))
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
self.handler._cleanup_stale([]) # No active sessions
self.assertFalse(orphan_log.exists())
def test_keeps_orphan_event_logs_younger_than_24h(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create a recent orphan event log
recent_log = events_dir / "recent.jsonl"
recent_log.write_text('{"event": "test"}\n')
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
self.handler._cleanup_stale([])
self.assertTrue(recent_log.exists())
def test_keeps_event_logs_with_active_session(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create an old event log that HAS an active session
event_log = events_dir / "active-session.jsonl"
event_log.write_text('{"event": "test"}\n')
old_time = time.time() - (25 * 3600)
import os
os.utime(event_log, (old_time, old_time))
sessions = [{"session_id": "active-session"}]
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
self.handler._cleanup_stale(sessions)
self.assertTrue(event_log.exists())
def test_removes_stale_starting_sessions(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create a stale "starting" session
stale_session = sessions_dir / "stale.json"
stale_session.write_text(json.dumps({"status": "starting"}))
# Set mtime to 2 hours ago (> 1 hour threshold)
old_time = time.time() - (2 * 3600)
import os
os.utime(stale_session, (old_time, old_time))
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
self.handler._cleanup_stale([])
self.assertFalse(stale_session.exists())
def test_keeps_stale_active_sessions(self):
with tempfile.TemporaryDirectory() as tmpdir:
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
# Create an old "active" session (should NOT be deleted)
active_session = sessions_dir / "active.json"
active_session.write_text(json.dumps({"status": "active"}))
old_time = time.time() - (2 * 3600)
import os
os.utime(active_session, (old_time, old_time))
with patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(state_mod, "SESSIONS_DIR", sessions_dir):
self.handler._cleanup_stale([])
self.assertTrue(active_session.exists())
class TestCollectSessions(unittest.TestCase):
"""Tests for _collect_sessions edge cases."""
def setUp(self):
self.handler = DummyStateHandler()
def test_invalid_json_session_file_skipped(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
# Create an invalid JSON file
bad_file = sessions_dir / "bad.json"
bad_file.write_text("not json")
# Create a valid session
good_file = sessions_dir / "good.json"
good_file.write_text(json.dumps({
"session_id": "good",
"agent": "claude",
"status": "active",
"last_event_at": "2024-01-01T00:00:00Z",
}))
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(self.handler, "_discover_active_codex_sessions"), \
patch.object(self.handler, "_get_active_zellij_sessions", return_value=None), \
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
sessions = self.handler._collect_sessions()
# Should only get the good session
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0]["session_id"], "good")
def test_non_dict_session_file_skipped(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
# Create a file with array instead of dict
array_file = sessions_dir / "array.json"
array_file.write_text("[1, 2, 3]")
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(self.handler, "_discover_active_codex_sessions"), \
patch.object(self.handler, "_get_active_zellij_sessions", return_value=None), \
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
sessions = self.handler._collect_sessions()
self.assertEqual(len(sessions), 0)
def test_orphan_starting_session_deleted(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
# Create a starting session with a non-existent zellij session
orphan_file = sessions_dir / "orphan.json"
orphan_file.write_text(json.dumps({
"session_id": "orphan",
"status": "starting",
"zellij_session": "deleted_session",
}))
active_zellij = {"other_session"} # orphan's session not in here
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(self.handler, "_discover_active_codex_sessions"), \
patch.object(self.handler, "_get_active_zellij_sessions", return_value=active_zellij), \
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
sessions = self.handler._collect_sessions()
# Orphan should be deleted and not in results
self.assertEqual(len(sessions), 0)
self.assertFalse(orphan_file.exists())
def test_sessions_sorted_by_id(self):
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = Path(tmpdir) / "sessions"
sessions_dir.mkdir()
events_dir = Path(tmpdir) / "events"
events_dir.mkdir()
for sid in ["zebra", "alpha", "middle"]:
(sessions_dir / f"{sid}.json").write_text(json.dumps({
"session_id": sid,
"status": "active",
"last_event_at": "2024-01-01T00:00:00Z",
}))
with patch.object(state_mod, "SESSIONS_DIR", sessions_dir), \
patch.object(state_mod, "EVENTS_DIR", events_dir), \
patch.object(self.handler, "_discover_active_codex_sessions"), \
patch.object(self.handler, "_get_active_zellij_sessions", return_value=None), \
patch.object(self.handler, "_get_active_transcript_files", return_value=set()), \
patch.object(self.handler, "_get_context_usage_for_session", return_value=None), \
patch.object(self.handler, "_get_conversation_mtime", return_value=None):
sessions = self.handler._collect_sessions()
ids = [s["session_id"] for s in sessions]
self.assertEqual(ids, ["alpha", "middle", "zebra"])
if __name__ == "__main__":
unittest.main()