Split the monolithic context.py (117 lines) into five purpose-specific modules following single-responsibility principle: - config.py: Server-level constants (DATA_DIR, SESSIONS_DIR, PORT, STALE_EVENT_AGE, _state_lock) - agents.py: Agent-specific paths and caches (CLAUDE_PROJECTS_DIR, CODEX_SESSIONS_DIR, discovery caches) - auth.py: Authentication token generation/validation for spawn endpoint - spawn_config.py: Spawn feature configuration (PENDING_SPAWNS_DIR, rate limiting, projects watcher thread) - zellij.py: Zellij binary resolution and session management constants This refactoring improves: - Code navigation: Find relevant constants by domain, not alphabetically - Testing: Each module can be tested in isolation - Import clarity: Mixins import only what they need - Future maintenance: Changes to one domain don't risk breaking others All mixins updated to import from new module locations. Tests updated to use new import paths. Includes PROPOSED_CODE_FILE_REORGANIZATION_PLAN.md documenting the rationale and mapping from old to new locations. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
636 lines · 24 KiB · Python
"""Tests for mixins/parsing.py edge cases.
|
|
|
|
Unit tests for parsing helper functions and conversation file resolution.
|
|
"""
|
|
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from amc_server.mixins.parsing import SessionParsingMixin
|
|
|
|
|
|
class DummyParsingHandler(SessionParsingMixin):
    """Minimal concrete handler used to exercise the parsing mixin in isolation."""
|
|
|
|
|
|
class TestToInt(unittest.TestCase):
    """Tests for _to_int edge cases.

    _to_int converts ints, floats, and numeric strings to int and returns
    None for anything it cannot (or deliberately will not) convert.
    """

    def setUp(self):
        self.handler = DummyParsingHandler()

    def test_none_returns_none(self):
        self.assertIsNone(self.handler._to_int(None))

    def test_bool_true_returns_none(self):
        # Booleans are technically ints in Python, but we don't want to convert them
        self.assertIsNone(self.handler._to_int(True))

    def test_bool_false_returns_none(self):
        self.assertIsNone(self.handler._to_int(False))

    def test_int_returns_int(self):
        self.assertEqual(self.handler._to_int(42), 42)

    def test_negative_int_returns_int(self):
        self.assertEqual(self.handler._to_int(-10), -10)

    def test_zero_returns_zero(self):
        self.assertEqual(self.handler._to_int(0), 0)

    def test_float_truncates_to_int(self):
        self.assertEqual(self.handler._to_int(3.7), 3)

    def test_negative_float_truncates(self):
        # int() truncates toward zero, so -2.9 -> -2 (not floor to -3).
        self.assertEqual(self.handler._to_int(-2.9), -2)

    def test_string_int_parses(self):
        self.assertEqual(self.handler._to_int("123"), 123)

    def test_string_negative_parses(self):
        self.assertEqual(self.handler._to_int("-456"), -456)

    def test_string_with_whitespace_parses(self):
        # Renamed from test_string_with_whitespace_fails: int() strips
        # surrounding whitespace, so this input parses successfully — the old
        # name claimed "fails" while the assertion expected success.
        self.assertEqual(self.handler._to_int(" 42 "), 42)

    def test_string_float_fails(self):
        # "3.14" can't be parsed by int()
        self.assertIsNone(self.handler._to_int("3.14"))

    def test_empty_string_returns_none(self):
        self.assertIsNone(self.handler._to_int(""))

    def test_non_numeric_string_returns_none(self):
        self.assertIsNone(self.handler._to_int("abc"))

    def test_list_returns_none(self):
        self.assertIsNone(self.handler._to_int([1, 2, 3]))

    def test_dict_returns_none(self):
        self.assertIsNone(self.handler._to_int({"value": 42}))
|
|
|
|
|
|
class TestSumOptionalInts(unittest.TestCase):
    """Tests for _sum_optional_ints edge cases."""

    def setUp(self):
        # Bind the method under test once; every case calls the same callable.
        self.sum_ints = DummyParsingHandler()._sum_optional_ints

    def test_empty_list_returns_none(self):
        self.assertIsNone(self.sum_ints([]))

    def test_all_none_returns_none(self):
        self.assertIsNone(self.sum_ints([None, None, None]))

    def test_single_int_returns_that_int(self):
        self.assertEqual(self.sum_ints([42]), 42)

    def test_mixed_none_and_int_sums_ints(self):
        self.assertEqual(self.sum_ints([None, 10, None, 20]), 30)

    def test_all_ints_sums_all(self):
        self.assertEqual(self.sum_ints([1, 2, 3, 4]), 10)

    def test_includes_zero(self):
        self.assertEqual(self.sum_ints([0, 5]), 5)

    def test_negative_ints(self):
        self.assertEqual(self.sum_ints([10, -3, 5]), 12)

    def test_floats_ignored(self):
        # Only integers are summed
        self.assertEqual(self.sum_ints([10, 3.14, 5]), 15)

    def test_strings_ignored(self):
        self.assertEqual(self.sum_ints(["10", 5]), 5)

    def test_only_non_ints_returns_none(self):
        self.assertIsNone(self.sum_ints(["10", 3.14, None]))
|
|
|
|
|
|
class TestAsDict(unittest.TestCase):
    """Tests for _as_dict edge cases."""

    def setUp(self):
        # Bind the method under test once; every case calls the same callable.
        self.as_dict = DummyParsingHandler()._as_dict

    def test_dict_returns_dict(self):
        self.assertEqual(self.as_dict({"key": "value"}), {"key": "value"})

    def test_empty_dict_returns_empty_dict(self):
        self.assertEqual(self.as_dict({}), {})

    def test_none_returns_empty_dict(self):
        self.assertEqual(self.as_dict(None), {})

    def test_list_returns_empty_dict(self):
        self.assertEqual(self.as_dict([1, 2, 3]), {})

    def test_string_returns_empty_dict(self):
        self.assertEqual(self.as_dict("not a dict"), {})

    def test_int_returns_empty_dict(self):
        self.assertEqual(self.as_dict(42), {})

    def test_bool_returns_empty_dict(self):
        self.assertEqual(self.as_dict(True), {})
|
|
|
|
|
|
class TestGetClaudeContextWindow(unittest.TestCase):
    """Tests for _get_claude_context_window edge cases."""

    def setUp(self):
        # Bind the method under test once; every case calls the same callable.
        self.window = DummyParsingHandler()._get_claude_context_window

    def test_none_model_returns_200k(self):
        self.assertEqual(self.window(None), 200_000)

    def test_empty_string_returns_200k(self):
        self.assertEqual(self.window(""), 200_000)

    def test_claude_2_returns_100k(self):
        self.assertEqual(self.window("claude-2"), 100_000)

    def test_claude_2_1_returns_100k(self):
        self.assertEqual(self.window("claude-2.1"), 100_000)

    def test_claude_3_returns_200k(self):
        self.assertEqual(self.window("claude-3-opus-20240229"), 200_000)

    def test_claude_35_returns_200k(self):
        self.assertEqual(self.window("claude-3-5-sonnet-20241022"), 200_000)

    def test_unknown_model_returns_200k(self):
        self.assertEqual(self.window("some-future-model"), 200_000)
|
|
|
|
|
|
class TestGetClaudeConversationFile(unittest.TestCase):
    """Tests for _get_claude_conversation_file edge cases."""

    def setUp(self):
        self.handler = DummyParsingHandler()

    def _projects_dir(self):
        """Create a throwaway directory to stand in for CLAUDE_PROJECTS_DIR."""
        td = tempfile.TemporaryDirectory()
        self.addCleanup(td.cleanup)
        return Path(td.name)

    def test_empty_project_dir_returns_none(self):
        self.assertIsNone(self.handler._get_claude_conversation_file("session123", ""))

    def test_none_project_dir_returns_none(self):
        self.assertIsNone(self.handler._get_claude_conversation_file("session123", None))

    def test_nonexistent_file_returns_none(self):
        root = self._projects_dir()
        with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", root):
            self.assertIsNone(
                self.handler._get_claude_conversation_file("session123", "/some/project")
            )

    def test_existing_file_returns_path(self):
        root = self._projects_dir()
        # project_dir "/foo/bar" becomes "-foo-bar"
        conv_file = root / "-foo-bar" / "session123.jsonl"
        conv_file.parent.mkdir()
        conv_file.write_text('{"type": "user"}\n')

        with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", root):
            self.assertEqual(
                self.handler._get_claude_conversation_file("session123", "/foo/bar"),
                conv_file,
            )

    def test_project_dir_without_leading_slash_gets_prefixed(self):
        root = self._projects_dir()
        # project_dir "foo/bar" also becomes "-foo-bar" (adds leading dash)
        conv_file = root / "-foo-bar" / "session123.jsonl"
        conv_file.parent.mkdir()
        conv_file.write_text('{"type": "user"}\n')

        with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", root):
            self.assertEqual(
                self.handler._get_claude_conversation_file("session123", "foo/bar"),
                conv_file,
            )
|
|
|
|
|
|
class TestFindCodexTranscriptFile(unittest.TestCase):
    """Tests for _find_codex_transcript_file edge cases."""

    def setUp(self):
        self.handler = DummyParsingHandler()
        # Clear the module-level cache before each test AND register cleanup
        # afterwards. The original per-test clear() calls at the end of each
        # method were skipped whenever an assertion failed first, leaking
        # cache entries into later tests.
        from amc_server.agents import _codex_transcript_cache
        self.cache = _codex_transcript_cache
        self.cache.clear()
        self.addCleanup(self.cache.clear)

    def test_empty_session_id_returns_none(self):
        self.assertIsNone(self.handler._find_codex_transcript_file(""))

    def test_none_session_id_returns_none(self):
        self.assertIsNone(self.handler._find_codex_transcript_file(None))

    def test_codex_sessions_dir_missing_returns_none(self):
        with patch("amc_server.mixins.parsing.CODEX_SESSIONS_DIR", Path("/nonexistent")):
            # Cache was cleared in setUp, so discovery is forced.
            self.assertIsNone(self.handler._find_codex_transcript_file("abc123"))

    def test_cache_hit_returns_cached_path(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            transcript_file = Path(tmpdir) / "abc123.jsonl"
            transcript_file.write_text('{"type": "session_meta"}\n')

            self.cache["abc123"] = str(transcript_file)

            result = self.handler._find_codex_transcript_file("abc123")
            self.assertEqual(result, transcript_file)

    def test_cache_hit_with_deleted_file_returns_none(self):
        # A cached path whose file no longer exists must not be returned.
        self.cache["deleted-session"] = "/nonexistent/file.jsonl"
        self.assertIsNone(self.handler._find_codex_transcript_file("deleted-session"))

    def test_cache_hit_with_none_returns_none(self):
        # A cached negative result (None) stays None.
        self.cache["cached-none"] = None
        self.assertIsNone(self.handler._find_codex_transcript_file("cached-none"))
|
|
|
|
|
|
class TestReadJsonlTailEntries(unittest.TestCase):
    """Tests for _read_jsonl_tail_entries edge cases."""

    def setUp(self):
        self.handler = DummyParsingHandler()

    def _make_jsonl(self, text):
        """Write *text* to a throwaway .jsonl file and return its Path.

        The file is removed automatically after the test via addCleanup.
        """
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            f.write(text)
            path = Path(f.name)
        self.addCleanup(path.unlink)
        return path

    def test_empty_file_returns_empty_list(self):
        path = self._make_jsonl("")
        self.assertEqual(self.handler._read_jsonl_tail_entries(path), [])

    def test_nonexistent_file_returns_empty_list(self):
        missing = Path("/nonexistent/file.jsonl")
        self.assertEqual(self.handler._read_jsonl_tail_entries(missing), [])

    def test_single_line_file(self):
        path = self._make_jsonl('{"key": "value"}\n')
        self.assertEqual(
            self.handler._read_jsonl_tail_entries(path), [{"key": "value"}]
        )

    def test_max_lines_limits_output(self):
        path = self._make_jsonl("".join(f'{{"n": {i}}}\n' for i in range(100)))
        result = self.handler._read_jsonl_tail_entries(path, max_lines=10)
        self.assertEqual(len(result), 10)
        # Should be the LAST 10 lines
        self.assertEqual(result[-1], {"n": 99})

    def test_max_bytes_truncates_from_start(self):
        path = self._make_jsonl("".join(f'{{"number": {i}}}\n' for i in range(100)))
        # Read only last 200 bytes
        result = self.handler._read_jsonl_tail_entries(path, max_bytes=200)
        # Should get some entries from the end
        self.assertGreater(len(result), 0)
        # All entries should be from near the end
        for entry in result:
            self.assertGreater(entry["number"], 80)

    def test_partial_first_line_skipped(self):
        # A long first line guarantees the byte window cuts it in half.
        long_line = '{"first": "line", "long_key": "' + "x" * 500 + '"}\n'
        path = self._make_jsonl(long_line + '{"second": "line"}\n')
        # Read only last 100 bytes (will cut first line)
        result = self.handler._read_jsonl_tail_entries(path, max_bytes=100)
        # First line should be skipped (partial JSON)
        self.assertEqual(result, [{"second": "line"}])

    def test_invalid_json_lines_skipped(self):
        path = self._make_jsonl(
            '{"valid": "json"}\n'
            'this is not json\n'
            '{"another": "valid"}\n'
        )
        result = self.handler._read_jsonl_tail_entries(path)
        self.assertEqual(result, [{"valid": "json"}, {"another": "valid"}])

    def test_empty_lines_skipped(self):
        path = self._make_jsonl('{"first": 1}\n\n{"second": 2}\n')
        self.assertEqual(len(self.handler._read_jsonl_tail_entries(path)), 2)
|
|
|
|
|
|
class TestParseClaudeContextUsageFromFile(unittest.TestCase):
    """Tests for _parse_claude_context_usage_from_file edge cases."""

    def setUp(self):
        self.handler = DummyParsingHandler()

    def _make_transcript(self, *entries):
        """Write each entry as one JSONL line and return the file's Path.

        Entries are dicts (serialized with json.dumps). The file is removed
        automatically after the test via addCleanup.
        """
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            for entry in entries:
                f.write(json.dumps(entry) + "\n")
            path = Path(f.name)
        self.addCleanup(path.unlink)
        return path

    def test_empty_file_returns_none(self):
        path = self._make_transcript()
        self.assertIsNone(self.handler._parse_claude_context_usage_from_file(path))

    def test_no_assistant_messages_returns_none(self):
        path = self._make_transcript({"type": "user", "message": {"content": "hello"}})
        self.assertIsNone(self.handler._parse_claude_context_usage_from_file(path))

    def test_assistant_without_usage_returns_none(self):
        path = self._make_transcript({"type": "assistant", "message": {"content": []}})
        self.assertIsNone(self.handler._parse_claude_context_usage_from_file(path))

    def test_extracts_usage_from_assistant_message(self):
        path = self._make_transcript({
            "type": "assistant",
            "timestamp": "2024-01-01T00:00:00Z",
            "message": {
                "model": "claude-3-5-sonnet-20241022",
                "usage": {
                    "input_tokens": 1000,
                    "output_tokens": 500,
                    "cache_read_input_tokens": 200,
                    "cache_creation_input_tokens": 100,
                },
            },
        })
        result = self.handler._parse_claude_context_usage_from_file(path)
        self.assertIsNotNone(result)
        self.assertEqual(result["input_tokens"], 1000)
        self.assertEqual(result["output_tokens"], 500)
        self.assertEqual(result["cached_input_tokens"], 300)  # 200 + 100
        self.assertEqual(result["current_tokens"], 1800)  # sum of all
        self.assertEqual(result["window_tokens"], 200_000)
        self.assertEqual(result["model"], "claude-3-5-sonnet-20241022")

    def test_uses_most_recent_assistant_message(self):
        path = self._make_transcript(
            {"type": "assistant",
             "message": {"usage": {"input_tokens": 100, "output_tokens": 50}}},
            {"type": "assistant",
             "message": {"usage": {"input_tokens": 999, "output_tokens": 888}}},
        )
        result = self.handler._parse_claude_context_usage_from_file(path)
        # Should use the last message
        self.assertEqual(result["input_tokens"], 999)
        self.assertEqual(result["output_tokens"], 888)

    def test_skips_assistant_with_no_current_tokens(self):
        path = self._make_transcript(
            {"type": "assistant",
             "message": {"usage": {"input_tokens": 100, "output_tokens": 50}}},
            # Last message has an empty usage dict, i.e. no usable tokens.
            {"type": "assistant", "message": {"usage": {}}},
        )
        result = self.handler._parse_claude_context_usage_from_file(path)
        # Should fall back to earlier message with valid tokens
        self.assertEqual(result["input_tokens"], 100)
|
|
|
|
|
|
class TestParseCodexContextUsageFromFile(unittest.TestCase):
    """Tests for _parse_codex_context_usage_from_file edge cases."""

    def setUp(self):
        self.handler = DummyParsingHandler()

    def _make_transcript(self, *entries):
        """Write each entry as one JSONL line and return the file's Path.

        Entries are dicts (serialized with json.dumps) or raw strings written
        verbatim. The file is removed automatically after the test.
        """
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            for entry in entries:
                text = entry if isinstance(entry, str) else json.dumps(entry)
                f.write(text + "\n")
            path = Path(f.name)
        self.addCleanup(path.unlink)
        return path

    def test_empty_file_returns_none(self):
        path = self._make_transcript()
        self.assertIsNone(self.handler._parse_codex_context_usage_from_file(path))

    def test_no_token_count_events_returns_none(self):
        path = self._make_transcript('{"type": "response_item"}')
        self.assertIsNone(self.handler._parse_codex_context_usage_from_file(path))

    def test_extracts_token_count_event(self):
        path = self._make_transcript({
            "type": "event_msg",
            "timestamp": "2024-01-01T00:00:00Z",
            "payload": {
                "type": "token_count",
                "info": {
                    "model_context_window": 128000,
                    "last_token_usage": {
                        "input_tokens": 5000,
                        "output_tokens": 2000,
                        "cached_input_tokens": 1000,
                        "total_tokens": 8000,
                    },
                    "total_token_usage": {
                        "total_tokens": 50000,
                    },
                },
            },
        })
        result = self.handler._parse_codex_context_usage_from_file(path)
        self.assertIsNotNone(result)
        self.assertEqual(result["window_tokens"], 128000)
        self.assertEqual(result["current_tokens"], 8000)
        self.assertEqual(result["input_tokens"], 5000)
        self.assertEqual(result["output_tokens"], 2000)
        self.assertEqual(result["session_total_tokens"], 50000)

    def test_calculates_current_tokens_when_total_missing(self):
        path = self._make_transcript({
            "type": "event_msg",
            "payload": {
                "type": "token_count",
                "info": {
                    # last_token_usage deliberately omits total_tokens
                    "last_token_usage": {
                        "input_tokens": 100,
                        "output_tokens": 50,
                    },
                },
            },
        })
        result = self.handler._parse_codex_context_usage_from_file(path)
        # Should sum available tokens
        self.assertEqual(result["current_tokens"], 150)
|
|
|
|
|
|
class TestGetCachedContextUsage(unittest.TestCase):
    """Tests for _get_cached_context_usage edge cases."""

    def setUp(self):
        self.handler = DummyParsingHandler()
        # Clear the shared module-level cache before each test AND after it.
        # The original only cleared before, so entries created here leaked
        # into any later test class touching the same cache.
        from amc_server.agents import _context_usage_cache
        _context_usage_cache.clear()
        self.addCleanup(_context_usage_cache.clear)

    def _make_file(self, text='{"data": "test"}\n'):
        """Write *text* to a throwaway .jsonl file; removed after the test."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            f.write(text)
            path = Path(f.name)
        self.addCleanup(path.unlink)
        return path

    def test_nonexistent_file_returns_none(self):
        def mock_parser(path):
            return {"tokens": 100}

        result = self.handler._get_cached_context_usage(
            Path("/nonexistent/file.jsonl"),
            mock_parser,
        )
        self.assertIsNone(result)

    def test_caches_result_by_mtime_and_size(self):
        call_count = [0]

        def counting_parser(path):
            call_count[0] += 1
            return {"tokens": 100}

        path = self._make_file()

        # First call - should invoke parser
        result1 = self.handler._get_cached_context_usage(path, counting_parser)
        self.assertEqual(call_count[0], 1)
        self.assertEqual(result1, {"tokens": 100})

        # Second call - should use cache
        result2 = self.handler._get_cached_context_usage(path, counting_parser)
        self.assertEqual(call_count[0], 1)  # No additional call
        self.assertEqual(result2, {"tokens": 100})

    def test_invalidates_cache_on_mtime_change(self):
        import time

        call_count = [0]

        def counting_parser(path):
            call_count[0] += 1
            return {"tokens": call_count[0] * 100}

        path = self._make_file()
        result1 = self.handler._get_cached_context_usage(path, counting_parser)
        self.assertEqual(result1, {"tokens": 100})

        # Modify file to change mtime (and size) so the cache key changes.
        time.sleep(0.01)
        path.write_text('{"data": "modified"}\n')

        result2 = self.handler._get_cached_context_usage(path, counting_parser)
        self.assertEqual(call_count[0], 2)  # Parser called again
        self.assertEqual(result2, {"tokens": 200})

    def test_parser_exception_returns_none(self):
        def failing_parser(path):
            raise ValueError("Parse error")

        path = self._make_file()
        self.assertIsNone(self.handler._get_cached_context_usage(path, failing_parser))
|
|
|
|
|
|
# Allow running this test module directly (e.g. `python test_parsing.py`).
if __name__ == "__main__":
    unittest.main()
|