Files
amc/tests/test_parsing.py
teernisse baa712ba15 refactor(dashboard): change SpawnModal from overlay modal to dropdown
Position the spawn modal directly under the 'New Agent' button without a
blur overlay. Uses click-outside dismissal and absolute positioning.
Reduces visual disruption for quick agent spawning.
2026-02-26 17:15:22 -05:00

636 lines
24 KiB
Python

"""Tests for mixins/parsing.py edge cases.
Unit tests for parsing helper functions and conversation file resolution.
"""
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from amc_server.mixins.parsing import SessionParsingMixin
class DummyParsingHandler(SessionParsingMixin):
"""Minimal handler for testing parsing mixin."""
pass
class TestToInt(unittest.TestCase):
"""Tests for _to_int edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_none_returns_none(self):
self.assertIsNone(self.handler._to_int(None))
def test_bool_true_returns_none(self):
# Booleans are technically ints in Python, but we don't want to convert them
self.assertIsNone(self.handler._to_int(True))
def test_bool_false_returns_none(self):
self.assertIsNone(self.handler._to_int(False))
def test_int_returns_int(self):
self.assertEqual(self.handler._to_int(42), 42)
def test_negative_int_returns_int(self):
self.assertEqual(self.handler._to_int(-10), -10)
def test_zero_returns_zero(self):
self.assertEqual(self.handler._to_int(0), 0)
def test_float_truncates_to_int(self):
self.assertEqual(self.handler._to_int(3.7), 3)
def test_negative_float_truncates(self):
self.assertEqual(self.handler._to_int(-2.9), -2)
def test_string_int_parses(self):
self.assertEqual(self.handler._to_int("123"), 123)
def test_string_negative_parses(self):
self.assertEqual(self.handler._to_int("-456"), -456)
def test_string_with_whitespace_fails(self):
# Python's int() handles whitespace, but let's verify
self.assertEqual(self.handler._to_int(" 42 "), 42)
def test_string_float_fails(self):
# "3.14" can't be parsed by int()
self.assertIsNone(self.handler._to_int("3.14"))
def test_empty_string_returns_none(self):
self.assertIsNone(self.handler._to_int(""))
def test_non_numeric_string_returns_none(self):
self.assertIsNone(self.handler._to_int("abc"))
def test_list_returns_none(self):
self.assertIsNone(self.handler._to_int([1, 2, 3]))
def test_dict_returns_none(self):
self.assertIsNone(self.handler._to_int({"value": 42}))
class TestSumOptionalInts(unittest.TestCase):
"""Tests for _sum_optional_ints edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_list_returns_none(self):
self.assertIsNone(self.handler._sum_optional_ints([]))
def test_all_none_returns_none(self):
self.assertIsNone(self.handler._sum_optional_ints([None, None, None]))
def test_single_int_returns_that_int(self):
self.assertEqual(self.handler._sum_optional_ints([42]), 42)
def test_mixed_none_and_int_sums_ints(self):
self.assertEqual(self.handler._sum_optional_ints([None, 10, None, 20]), 30)
def test_all_ints_sums_all(self):
self.assertEqual(self.handler._sum_optional_ints([1, 2, 3, 4]), 10)
def test_includes_zero(self):
self.assertEqual(self.handler._sum_optional_ints([0, 5]), 5)
def test_negative_ints(self):
self.assertEqual(self.handler._sum_optional_ints([10, -3, 5]), 12)
def test_floats_ignored(self):
# Only integers are summed
self.assertEqual(self.handler._sum_optional_ints([10, 3.14, 5]), 15)
def test_strings_ignored(self):
self.assertEqual(self.handler._sum_optional_ints(["10", 5]), 5)
def test_only_non_ints_returns_none(self):
self.assertIsNone(self.handler._sum_optional_ints(["10", 3.14, None]))
class TestAsDict(unittest.TestCase):
"""Tests for _as_dict edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_dict_returns_dict(self):
self.assertEqual(self.handler._as_dict({"key": "value"}), {"key": "value"})
def test_empty_dict_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict({}), {})
def test_none_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict(None), {})
def test_list_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict([1, 2, 3]), {})
def test_string_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict("not a dict"), {})
def test_int_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict(42), {})
def test_bool_returns_empty_dict(self):
self.assertEqual(self.handler._as_dict(True), {})
class TestGetClaudeContextWindow(unittest.TestCase):
"""Tests for _get_claude_context_window edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_none_model_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window(None), 200_000)
def test_empty_string_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window(""), 200_000)
def test_claude_2_returns_100k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-2"), 100_000)
def test_claude_2_1_returns_100k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-2.1"), 100_000)
def test_claude_3_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-3-opus-20240229"), 200_000)
def test_claude_35_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window("claude-3-5-sonnet-20241022"), 200_000)
def test_unknown_model_returns_200k(self):
self.assertEqual(self.handler._get_claude_context_window("some-future-model"), 200_000)
class TestGetClaudeConversationFile(unittest.TestCase):
"""Tests for _get_claude_conversation_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_project_dir_returns_none(self):
self.assertIsNone(self.handler._get_claude_conversation_file("session123", ""))
def test_none_project_dir_returns_none(self):
self.assertIsNone(self.handler._get_claude_conversation_file("session123", None))
def test_nonexistent_file_returns_none(self):
with tempfile.TemporaryDirectory() as tmpdir:
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
result = self.handler._get_claude_conversation_file("session123", "/some/project")
self.assertIsNone(result)
def test_existing_file_returns_path(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Create the expected file structure
# project_dir "/foo/bar" becomes "-foo-bar"
encoded_dir = Path(tmpdir) / "-foo-bar"
encoded_dir.mkdir()
conv_file = encoded_dir / "session123.jsonl"
conv_file.write_text('{"type": "user"}\n')
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
result = self.handler._get_claude_conversation_file("session123", "/foo/bar")
self.assertEqual(result, conv_file)
def test_project_dir_without_leading_slash_gets_prefixed(self):
with tempfile.TemporaryDirectory() as tmpdir:
# project_dir "foo/bar" becomes "-foo-bar" (adds leading dash)
encoded_dir = Path(tmpdir) / "-foo-bar"
encoded_dir.mkdir()
conv_file = encoded_dir / "session123.jsonl"
conv_file.write_text('{"type": "user"}\n')
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
result = self.handler._get_claude_conversation_file("session123", "foo/bar")
self.assertEqual(result, conv_file)
class TestFindCodexTranscriptFile(unittest.TestCase):
"""Tests for _find_codex_transcript_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_session_id_returns_none(self):
self.assertIsNone(self.handler._find_codex_transcript_file(""))
def test_none_session_id_returns_none(self):
self.assertIsNone(self.handler._find_codex_transcript_file(None))
def test_codex_sessions_dir_missing_returns_none(self):
with patch("amc_server.mixins.parsing.CODEX_SESSIONS_DIR", Path("/nonexistent")):
# Clear cache to force discovery
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache.clear()
result = self.handler._find_codex_transcript_file("abc123")
self.assertIsNone(result)
def test_cache_hit_returns_cached_path(self):
with tempfile.TemporaryDirectory() as tmpdir:
transcript_file = Path(tmpdir) / "abc123.jsonl"
transcript_file.write_text('{"type": "session_meta"}\n')
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache["abc123"] = str(transcript_file)
result = self.handler._find_codex_transcript_file("abc123")
self.assertEqual(result, transcript_file)
# Clean up
_codex_transcript_cache.clear()
def test_cache_hit_with_deleted_file_returns_none(self):
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache["deleted-session"] = "/nonexistent/file.jsonl"
result = self.handler._find_codex_transcript_file("deleted-session")
self.assertIsNone(result)
_codex_transcript_cache.clear()
def test_cache_hit_with_none_returns_none(self):
from amc_server.context import _codex_transcript_cache
_codex_transcript_cache["cached-none"] = None
result = self.handler._find_codex_transcript_file("cached-none")
self.assertIsNone(result)
_codex_transcript_cache.clear()
class TestReadJsonlTailEntries(unittest.TestCase):
"""Tests for _read_jsonl_tail_entries edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_file_returns_empty_list(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(result, [])
finally:
path.unlink()
def test_nonexistent_file_returns_empty_list(self):
result = self.handler._read_jsonl_tail_entries(Path("/nonexistent/file.jsonl"))
self.assertEqual(result, [])
def test_single_line_file(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"key": "value"}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(result, [{"key": "value"}])
finally:
path.unlink()
def test_max_lines_limits_output(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
for i in range(100):
f.write(f'{{"n": {i}}}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path, max_lines=10)
self.assertEqual(len(result), 10)
# Should be the LAST 10 lines
self.assertEqual(result[-1], {"n": 99})
finally:
path.unlink()
def test_max_bytes_truncates_from_start(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Write many lines
for i in range(100):
f.write(f'{{"number": {i}}}\n')
path = Path(f.name)
try:
# Read only last 200 bytes
result = self.handler._read_jsonl_tail_entries(path, max_bytes=200)
# Should get some entries from the end
self.assertGreater(len(result), 0)
# All entries should be from near the end
for entry in result:
self.assertGreater(entry["number"], 80)
finally:
path.unlink()
def test_partial_first_line_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Write enough to trigger partial read
f.write('{"first": "line", "long_key": "' + "x" * 500 + '"}\n')
f.write('{"second": "line"}\n')
path = Path(f.name)
try:
# Read only last 100 bytes (will cut first line)
result = self.handler._read_jsonl_tail_entries(path, max_bytes=100)
# First line should be skipped (partial JSON)
self.assertEqual(len(result), 1)
self.assertEqual(result[0], {"second": "line"})
finally:
path.unlink()
def test_invalid_json_lines_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"valid": "json"}\n')
f.write('this is not json\n')
f.write('{"another": "valid"}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(len(result), 2)
self.assertEqual(result[0], {"valid": "json"})
self.assertEqual(result[1], {"another": "valid"})
finally:
path.unlink()
def test_empty_lines_skipped(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"first": 1}\n')
f.write('\n')
f.write('{"second": 2}\n')
path = Path(f.name)
try:
result = self.handler._read_jsonl_tail_entries(path)
self.assertEqual(len(result), 2)
finally:
path.unlink()
class TestParseClaudeContextUsageFromFile(unittest.TestCase):
"""Tests for _parse_claude_context_usage_from_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_file_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_no_assistant_messages_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "user", "message": {"content": "hello"}}\n')
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_assistant_without_usage_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "assistant", "message": {"content": []}}\n')
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_extracts_usage_from_assistant_message(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"timestamp": "2024-01-01T00:00:00Z",
"message": {
"model": "claude-3-5-sonnet-20241022",
"usage": {
"input_tokens": 1000,
"output_tokens": 500,
"cache_read_input_tokens": 200,
"cache_creation_input_tokens": 100,
}
}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
self.assertIsNotNone(result)
self.assertEqual(result["input_tokens"], 1000)
self.assertEqual(result["output_tokens"], 500)
self.assertEqual(result["cached_input_tokens"], 300) # 200 + 100
self.assertEqual(result["current_tokens"], 1800) # sum of all
self.assertEqual(result["window_tokens"], 200_000)
self.assertEqual(result["model"], "claude-3-5-sonnet-20241022")
finally:
path.unlink()
def test_uses_most_recent_assistant_message(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {"input_tokens": 100, "output_tokens": 50}}
}) + "\n")
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {"input_tokens": 999, "output_tokens": 888}}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
# Should use the last message
self.assertEqual(result["input_tokens"], 999)
self.assertEqual(result["output_tokens"], 888)
finally:
path.unlink()
def test_skips_assistant_with_no_current_tokens(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
# Last message has no usable tokens
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {"input_tokens": 100, "output_tokens": 50}}
}) + "\n")
f.write(json.dumps({
"type": "assistant",
"message": {"usage": {}} # No tokens
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_claude_context_usage_from_file(path)
# Should fall back to earlier message with valid tokens
self.assertEqual(result["input_tokens"], 100)
finally:
path.unlink()
class TestParseCodexContextUsageFromFile(unittest.TestCase):
"""Tests for _parse_codex_context_usage_from_file edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
def test_empty_file_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_no_token_count_events_returns_none(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"type": "response_item"}\n')
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
self.assertIsNone(result)
finally:
path.unlink()
def test_extracts_token_count_event(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "event_msg",
"timestamp": "2024-01-01T00:00:00Z",
"payload": {
"type": "token_count",
"info": {
"model_context_window": 128000,
"last_token_usage": {
"input_tokens": 5000,
"output_tokens": 2000,
"cached_input_tokens": 1000,
"total_tokens": 8000,
},
"total_token_usage": {
"total_tokens": 50000,
}
}
}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
self.assertIsNotNone(result)
self.assertEqual(result["window_tokens"], 128000)
self.assertEqual(result["current_tokens"], 8000)
self.assertEqual(result["input_tokens"], 5000)
self.assertEqual(result["output_tokens"], 2000)
self.assertEqual(result["session_total_tokens"], 50000)
finally:
path.unlink()
def test_calculates_current_tokens_when_total_missing(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({
"type": "event_msg",
"payload": {
"type": "token_count",
"info": {
"last_token_usage": {
"input_tokens": 100,
"output_tokens": 50,
# no total_tokens
}
}
}
}) + "\n")
path = Path(f.name)
try:
result = self.handler._parse_codex_context_usage_from_file(path)
# Should sum available tokens
self.assertEqual(result["current_tokens"], 150)
finally:
path.unlink()
class TestGetCachedContextUsage(unittest.TestCase):
"""Tests for _get_cached_context_usage edge cases."""
def setUp(self):
self.handler = DummyParsingHandler()
# Clear cache before each test
from amc_server.context import _context_usage_cache
_context_usage_cache.clear()
def test_nonexistent_file_returns_none(self):
def mock_parser(path):
return {"tokens": 100}
result = self.handler._get_cached_context_usage(
Path("/nonexistent/file.jsonl"),
mock_parser
)
self.assertIsNone(result)
def test_caches_result_by_mtime_and_size(self):
call_count = [0]
def counting_parser(path):
call_count[0] += 1
return {"tokens": 100}
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"data": "test"}\n')
path = Path(f.name)
try:
# First call - should invoke parser
result1 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(call_count[0], 1)
self.assertEqual(result1, {"tokens": 100})
# Second call - should use cache
result2 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(call_count[0], 1) # No additional call
self.assertEqual(result2, {"tokens": 100})
finally:
path.unlink()
def test_invalidates_cache_on_mtime_change(self):
import time
call_count = [0]
def counting_parser(path):
call_count[0] += 1
return {"tokens": call_count[0] * 100}
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"data": "test"}\n')
path = Path(f.name)
try:
result1 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(result1, {"tokens": 100})
# Modify file to change mtime
time.sleep(0.01)
path.write_text('{"data": "modified"}\n')
result2 = self.handler._get_cached_context_usage(path, counting_parser)
self.assertEqual(call_count[0], 2) # Parser called again
self.assertEqual(result2, {"tokens": 200})
finally:
path.unlink()
def test_parser_exception_returns_none(self):
def failing_parser(path):
raise ValueError("Parse error")
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write('{"data": "test"}\n')
path = Path(f.name)
try:
result = self.handler._get_cached_context_usage(path, failing_parser)
self.assertIsNone(result)
finally:
path.unlink()
if __name__ == "__main__":
unittest.main()