Closes bd-3ny. Added mousedown listener that dismisses the dropdown when clicking outside both the dropdown and textarea. Uses early return to avoid registering listeners when dropdown is already closed.
636 lines
24 KiB
Python
636 lines
24 KiB
Python
"""Tests for mixins/parsing.py edge cases.
|
|
|
|
Unit tests for parsing helper functions and conversation file resolution.
|
|
"""
|
|
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from amc_server.mixins.parsing import SessionParsingMixin
|
|
|
|
|
|
class DummyParsingHandler(SessionParsingMixin):
|
|
"""Minimal handler for testing parsing mixin."""
|
|
pass
|
|
|
|
|
|
class TestToInt(unittest.TestCase):
|
|
"""Tests for _to_int edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_none_returns_none(self):
|
|
self.assertIsNone(self.handler._to_int(None))
|
|
|
|
def test_bool_true_returns_none(self):
|
|
# Booleans are technically ints in Python, but we don't want to convert them
|
|
self.assertIsNone(self.handler._to_int(True))
|
|
|
|
def test_bool_false_returns_none(self):
|
|
self.assertIsNone(self.handler._to_int(False))
|
|
|
|
def test_int_returns_int(self):
|
|
self.assertEqual(self.handler._to_int(42), 42)
|
|
|
|
def test_negative_int_returns_int(self):
|
|
self.assertEqual(self.handler._to_int(-10), -10)
|
|
|
|
def test_zero_returns_zero(self):
|
|
self.assertEqual(self.handler._to_int(0), 0)
|
|
|
|
def test_float_truncates_to_int(self):
|
|
self.assertEqual(self.handler._to_int(3.7), 3)
|
|
|
|
def test_negative_float_truncates(self):
|
|
self.assertEqual(self.handler._to_int(-2.9), -2)
|
|
|
|
def test_string_int_parses(self):
|
|
self.assertEqual(self.handler._to_int("123"), 123)
|
|
|
|
def test_string_negative_parses(self):
|
|
self.assertEqual(self.handler._to_int("-456"), -456)
|
|
|
|
def test_string_with_whitespace_fails(self):
|
|
# Python's int() handles whitespace, but let's verify
|
|
self.assertEqual(self.handler._to_int(" 42 "), 42)
|
|
|
|
def test_string_float_fails(self):
|
|
# "3.14" can't be parsed by int()
|
|
self.assertIsNone(self.handler._to_int("3.14"))
|
|
|
|
def test_empty_string_returns_none(self):
|
|
self.assertIsNone(self.handler._to_int(""))
|
|
|
|
def test_non_numeric_string_returns_none(self):
|
|
self.assertIsNone(self.handler._to_int("abc"))
|
|
|
|
def test_list_returns_none(self):
|
|
self.assertIsNone(self.handler._to_int([1, 2, 3]))
|
|
|
|
def test_dict_returns_none(self):
|
|
self.assertIsNone(self.handler._to_int({"value": 42}))
|
|
|
|
|
|
class TestSumOptionalInts(unittest.TestCase):
|
|
"""Tests for _sum_optional_ints edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_empty_list_returns_none(self):
|
|
self.assertIsNone(self.handler._sum_optional_ints([]))
|
|
|
|
def test_all_none_returns_none(self):
|
|
self.assertIsNone(self.handler._sum_optional_ints([None, None, None]))
|
|
|
|
def test_single_int_returns_that_int(self):
|
|
self.assertEqual(self.handler._sum_optional_ints([42]), 42)
|
|
|
|
def test_mixed_none_and_int_sums_ints(self):
|
|
self.assertEqual(self.handler._sum_optional_ints([None, 10, None, 20]), 30)
|
|
|
|
def test_all_ints_sums_all(self):
|
|
self.assertEqual(self.handler._sum_optional_ints([1, 2, 3, 4]), 10)
|
|
|
|
def test_includes_zero(self):
|
|
self.assertEqual(self.handler._sum_optional_ints([0, 5]), 5)
|
|
|
|
def test_negative_ints(self):
|
|
self.assertEqual(self.handler._sum_optional_ints([10, -3, 5]), 12)
|
|
|
|
def test_floats_ignored(self):
|
|
# Only integers are summed
|
|
self.assertEqual(self.handler._sum_optional_ints([10, 3.14, 5]), 15)
|
|
|
|
def test_strings_ignored(self):
|
|
self.assertEqual(self.handler._sum_optional_ints(["10", 5]), 5)
|
|
|
|
def test_only_non_ints_returns_none(self):
|
|
self.assertIsNone(self.handler._sum_optional_ints(["10", 3.14, None]))
|
|
|
|
|
|
class TestAsDict(unittest.TestCase):
|
|
"""Tests for _as_dict edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_dict_returns_dict(self):
|
|
self.assertEqual(self.handler._as_dict({"key": "value"}), {"key": "value"})
|
|
|
|
def test_empty_dict_returns_empty_dict(self):
|
|
self.assertEqual(self.handler._as_dict({}), {})
|
|
|
|
def test_none_returns_empty_dict(self):
|
|
self.assertEqual(self.handler._as_dict(None), {})
|
|
|
|
def test_list_returns_empty_dict(self):
|
|
self.assertEqual(self.handler._as_dict([1, 2, 3]), {})
|
|
|
|
def test_string_returns_empty_dict(self):
|
|
self.assertEqual(self.handler._as_dict("not a dict"), {})
|
|
|
|
def test_int_returns_empty_dict(self):
|
|
self.assertEqual(self.handler._as_dict(42), {})
|
|
|
|
def test_bool_returns_empty_dict(self):
|
|
self.assertEqual(self.handler._as_dict(True), {})
|
|
|
|
|
|
class TestGetClaudeContextWindow(unittest.TestCase):
|
|
"""Tests for _get_claude_context_window edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_none_model_returns_200k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window(None), 200_000)
|
|
|
|
def test_empty_string_returns_200k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window(""), 200_000)
|
|
|
|
def test_claude_2_returns_100k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window("claude-2"), 100_000)
|
|
|
|
def test_claude_2_1_returns_100k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window("claude-2.1"), 100_000)
|
|
|
|
def test_claude_3_returns_200k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window("claude-3-opus-20240229"), 200_000)
|
|
|
|
def test_claude_35_returns_200k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window("claude-3-5-sonnet-20241022"), 200_000)
|
|
|
|
def test_unknown_model_returns_200k(self):
|
|
self.assertEqual(self.handler._get_claude_context_window("some-future-model"), 200_000)
|
|
|
|
|
|
class TestGetClaudeConversationFile(unittest.TestCase):
|
|
"""Tests for _get_claude_conversation_file edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_empty_project_dir_returns_none(self):
|
|
self.assertIsNone(self.handler._get_claude_conversation_file("session123", ""))
|
|
|
|
def test_none_project_dir_returns_none(self):
|
|
self.assertIsNone(self.handler._get_claude_conversation_file("session123", None))
|
|
|
|
def test_nonexistent_file_returns_none(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
|
|
result = self.handler._get_claude_conversation_file("session123", "/some/project")
|
|
self.assertIsNone(result)
|
|
|
|
def test_existing_file_returns_path(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Create the expected file structure
|
|
# project_dir "/foo/bar" becomes "-foo-bar"
|
|
encoded_dir = Path(tmpdir) / "-foo-bar"
|
|
encoded_dir.mkdir()
|
|
conv_file = encoded_dir / "session123.jsonl"
|
|
conv_file.write_text('{"type": "user"}\n')
|
|
|
|
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
|
|
result = self.handler._get_claude_conversation_file("session123", "/foo/bar")
|
|
self.assertEqual(result, conv_file)
|
|
|
|
def test_project_dir_without_leading_slash_gets_prefixed(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# project_dir "foo/bar" becomes "-foo-bar" (adds leading dash)
|
|
encoded_dir = Path(tmpdir) / "-foo-bar"
|
|
encoded_dir.mkdir()
|
|
conv_file = encoded_dir / "session123.jsonl"
|
|
conv_file.write_text('{"type": "user"}\n')
|
|
|
|
with patch("amc_server.mixins.parsing.CLAUDE_PROJECTS_DIR", Path(tmpdir)):
|
|
result = self.handler._get_claude_conversation_file("session123", "foo/bar")
|
|
self.assertEqual(result, conv_file)
|
|
|
|
|
|
class TestFindCodexTranscriptFile(unittest.TestCase):
|
|
"""Tests for _find_codex_transcript_file edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_empty_session_id_returns_none(self):
|
|
self.assertIsNone(self.handler._find_codex_transcript_file(""))
|
|
|
|
def test_none_session_id_returns_none(self):
|
|
self.assertIsNone(self.handler._find_codex_transcript_file(None))
|
|
|
|
def test_codex_sessions_dir_missing_returns_none(self):
|
|
with patch("amc_server.mixins.parsing.CODEX_SESSIONS_DIR", Path("/nonexistent")):
|
|
# Clear cache to force discovery
|
|
from amc_server.context import _codex_transcript_cache
|
|
_codex_transcript_cache.clear()
|
|
result = self.handler._find_codex_transcript_file("abc123")
|
|
self.assertIsNone(result)
|
|
|
|
def test_cache_hit_returns_cached_path(self):
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
transcript_file = Path(tmpdir) / "abc123.jsonl"
|
|
transcript_file.write_text('{"type": "session_meta"}\n')
|
|
|
|
from amc_server.context import _codex_transcript_cache
|
|
_codex_transcript_cache["abc123"] = str(transcript_file)
|
|
|
|
result = self.handler._find_codex_transcript_file("abc123")
|
|
self.assertEqual(result, transcript_file)
|
|
|
|
# Clean up
|
|
_codex_transcript_cache.clear()
|
|
|
|
def test_cache_hit_with_deleted_file_returns_none(self):
|
|
from amc_server.context import _codex_transcript_cache
|
|
_codex_transcript_cache["deleted-session"] = "/nonexistent/file.jsonl"
|
|
|
|
result = self.handler._find_codex_transcript_file("deleted-session")
|
|
self.assertIsNone(result)
|
|
|
|
_codex_transcript_cache.clear()
|
|
|
|
def test_cache_hit_with_none_returns_none(self):
|
|
from amc_server.context import _codex_transcript_cache
|
|
_codex_transcript_cache["cached-none"] = None
|
|
|
|
result = self.handler._find_codex_transcript_file("cached-none")
|
|
self.assertIsNone(result)
|
|
|
|
_codex_transcript_cache.clear()
|
|
|
|
|
|
class TestReadJsonlTailEntries(unittest.TestCase):
|
|
"""Tests for _read_jsonl_tail_entries edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_empty_file_returns_empty_list(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._read_jsonl_tail_entries(path)
|
|
self.assertEqual(result, [])
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_nonexistent_file_returns_empty_list(self):
|
|
result = self.handler._read_jsonl_tail_entries(Path("/nonexistent/file.jsonl"))
|
|
self.assertEqual(result, [])
|
|
|
|
def test_single_line_file(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"key": "value"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._read_jsonl_tail_entries(path)
|
|
self.assertEqual(result, [{"key": "value"}])
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_max_lines_limits_output(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
for i in range(100):
|
|
f.write(f'{{"n": {i}}}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._read_jsonl_tail_entries(path, max_lines=10)
|
|
self.assertEqual(len(result), 10)
|
|
# Should be the LAST 10 lines
|
|
self.assertEqual(result[-1], {"n": 99})
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_max_bytes_truncates_from_start(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
# Write many lines
|
|
for i in range(100):
|
|
f.write(f'{{"number": {i}}}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
# Read only last 200 bytes
|
|
result = self.handler._read_jsonl_tail_entries(path, max_bytes=200)
|
|
# Should get some entries from the end
|
|
self.assertGreater(len(result), 0)
|
|
# All entries should be from near the end
|
|
for entry in result:
|
|
self.assertGreater(entry["number"], 80)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_partial_first_line_skipped(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
# Write enough to trigger partial read
|
|
f.write('{"first": "line", "long_key": "' + "x" * 500 + '"}\n')
|
|
f.write('{"second": "line"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
# Read only last 100 bytes (will cut first line)
|
|
result = self.handler._read_jsonl_tail_entries(path, max_bytes=100)
|
|
# First line should be skipped (partial JSON)
|
|
self.assertEqual(len(result), 1)
|
|
self.assertEqual(result[0], {"second": "line"})
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_invalid_json_lines_skipped(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"valid": "json"}\n')
|
|
f.write('this is not json\n')
|
|
f.write('{"another": "valid"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._read_jsonl_tail_entries(path)
|
|
self.assertEqual(len(result), 2)
|
|
self.assertEqual(result[0], {"valid": "json"})
|
|
self.assertEqual(result[1], {"another": "valid"})
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_empty_lines_skipped(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"first": 1}\n')
|
|
f.write('\n')
|
|
f.write('{"second": 2}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._read_jsonl_tail_entries(path)
|
|
self.assertEqual(len(result), 2)
|
|
finally:
|
|
path.unlink()
|
|
|
|
|
|
class TestParseClaudeContextUsageFromFile(unittest.TestCase):
|
|
"""Tests for _parse_claude_context_usage_from_file edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_empty_file_returns_none(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_claude_context_usage_from_file(path)
|
|
self.assertIsNone(result)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_no_assistant_messages_returns_none(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"type": "user", "message": {"content": "hello"}}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_claude_context_usage_from_file(path)
|
|
self.assertIsNone(result)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_assistant_without_usage_returns_none(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"type": "assistant", "message": {"content": []}}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_claude_context_usage_from_file(path)
|
|
self.assertIsNone(result)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_extracts_usage_from_assistant_message(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write(json.dumps({
|
|
"type": "assistant",
|
|
"timestamp": "2024-01-01T00:00:00Z",
|
|
"message": {
|
|
"model": "claude-3-5-sonnet-20241022",
|
|
"usage": {
|
|
"input_tokens": 1000,
|
|
"output_tokens": 500,
|
|
"cache_read_input_tokens": 200,
|
|
"cache_creation_input_tokens": 100,
|
|
}
|
|
}
|
|
}) + "\n")
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_claude_context_usage_from_file(path)
|
|
self.assertIsNotNone(result)
|
|
self.assertEqual(result["input_tokens"], 1000)
|
|
self.assertEqual(result["output_tokens"], 500)
|
|
self.assertEqual(result["cached_input_tokens"], 300) # 200 + 100
|
|
self.assertEqual(result["current_tokens"], 1800) # sum of all
|
|
self.assertEqual(result["window_tokens"], 200_000)
|
|
self.assertEqual(result["model"], "claude-3-5-sonnet-20241022")
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_uses_most_recent_assistant_message(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write(json.dumps({
|
|
"type": "assistant",
|
|
"message": {"usage": {"input_tokens": 100, "output_tokens": 50}}
|
|
}) + "\n")
|
|
f.write(json.dumps({
|
|
"type": "assistant",
|
|
"message": {"usage": {"input_tokens": 999, "output_tokens": 888}}
|
|
}) + "\n")
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_claude_context_usage_from_file(path)
|
|
# Should use the last message
|
|
self.assertEqual(result["input_tokens"], 999)
|
|
self.assertEqual(result["output_tokens"], 888)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_skips_assistant_with_no_current_tokens(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
# Last message has no usable tokens
|
|
f.write(json.dumps({
|
|
"type": "assistant",
|
|
"message": {"usage": {"input_tokens": 100, "output_tokens": 50}}
|
|
}) + "\n")
|
|
f.write(json.dumps({
|
|
"type": "assistant",
|
|
"message": {"usage": {}} # No tokens
|
|
}) + "\n")
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_claude_context_usage_from_file(path)
|
|
# Should fall back to earlier message with valid tokens
|
|
self.assertEqual(result["input_tokens"], 100)
|
|
finally:
|
|
path.unlink()
|
|
|
|
|
|
class TestParseCodexContextUsageFromFile(unittest.TestCase):
|
|
"""Tests for _parse_codex_context_usage_from_file edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
|
|
def test_empty_file_returns_none(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_codex_context_usage_from_file(path)
|
|
self.assertIsNone(result)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_no_token_count_events_returns_none(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"type": "response_item"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_codex_context_usage_from_file(path)
|
|
self.assertIsNone(result)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_extracts_token_count_event(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write(json.dumps({
|
|
"type": "event_msg",
|
|
"timestamp": "2024-01-01T00:00:00Z",
|
|
"payload": {
|
|
"type": "token_count",
|
|
"info": {
|
|
"model_context_window": 128000,
|
|
"last_token_usage": {
|
|
"input_tokens": 5000,
|
|
"output_tokens": 2000,
|
|
"cached_input_tokens": 1000,
|
|
"total_tokens": 8000,
|
|
},
|
|
"total_token_usage": {
|
|
"total_tokens": 50000,
|
|
}
|
|
}
|
|
}
|
|
}) + "\n")
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_codex_context_usage_from_file(path)
|
|
self.assertIsNotNone(result)
|
|
self.assertEqual(result["window_tokens"], 128000)
|
|
self.assertEqual(result["current_tokens"], 8000)
|
|
self.assertEqual(result["input_tokens"], 5000)
|
|
self.assertEqual(result["output_tokens"], 2000)
|
|
self.assertEqual(result["session_total_tokens"], 50000)
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_calculates_current_tokens_when_total_missing(self):
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write(json.dumps({
|
|
"type": "event_msg",
|
|
"payload": {
|
|
"type": "token_count",
|
|
"info": {
|
|
"last_token_usage": {
|
|
"input_tokens": 100,
|
|
"output_tokens": 50,
|
|
# no total_tokens
|
|
}
|
|
}
|
|
}
|
|
}) + "\n")
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._parse_codex_context_usage_from_file(path)
|
|
# Should sum available tokens
|
|
self.assertEqual(result["current_tokens"], 150)
|
|
finally:
|
|
path.unlink()
|
|
|
|
|
|
class TestGetCachedContextUsage(unittest.TestCase):
|
|
"""Tests for _get_cached_context_usage edge cases."""
|
|
|
|
def setUp(self):
|
|
self.handler = DummyParsingHandler()
|
|
# Clear cache before each test
|
|
from amc_server.context import _context_usage_cache
|
|
_context_usage_cache.clear()
|
|
|
|
def test_nonexistent_file_returns_none(self):
|
|
def mock_parser(path):
|
|
return {"tokens": 100}
|
|
|
|
result = self.handler._get_cached_context_usage(
|
|
Path("/nonexistent/file.jsonl"),
|
|
mock_parser
|
|
)
|
|
self.assertIsNone(result)
|
|
|
|
def test_caches_result_by_mtime_and_size(self):
|
|
call_count = [0]
|
|
def counting_parser(path):
|
|
call_count[0] += 1
|
|
return {"tokens": 100}
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"data": "test"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
# First call - should invoke parser
|
|
result1 = self.handler._get_cached_context_usage(path, counting_parser)
|
|
self.assertEqual(call_count[0], 1)
|
|
self.assertEqual(result1, {"tokens": 100})
|
|
|
|
# Second call - should use cache
|
|
result2 = self.handler._get_cached_context_usage(path, counting_parser)
|
|
self.assertEqual(call_count[0], 1) # No additional call
|
|
self.assertEqual(result2, {"tokens": 100})
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_invalidates_cache_on_mtime_change(self):
|
|
import time
|
|
|
|
call_count = [0]
|
|
def counting_parser(path):
|
|
call_count[0] += 1
|
|
return {"tokens": call_count[0] * 100}
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"data": "test"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result1 = self.handler._get_cached_context_usage(path, counting_parser)
|
|
self.assertEqual(result1, {"tokens": 100})
|
|
|
|
# Modify file to change mtime
|
|
time.sleep(0.01)
|
|
path.write_text('{"data": "modified"}\n')
|
|
|
|
result2 = self.handler._get_cached_context_usage(path, counting_parser)
|
|
self.assertEqual(call_count[0], 2) # Parser called again
|
|
self.assertEqual(result2, {"tokens": 200})
|
|
finally:
|
|
path.unlink()
|
|
|
|
def test_parser_exception_returns_none(self):
|
|
def failing_parser(path):
|
|
raise ValueError("Parse error")
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
|
f.write('{"data": "test"}\n')
|
|
path = Path(f.name)
|
|
try:
|
|
result = self.handler._get_cached_context_usage(path, failing_parser)
|
|
self.assertIsNone(result)
|
|
finally:
|
|
path.unlink()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|