import io import json import os import subprocess import tempfile import unittest from pathlib import Path from unittest.mock import MagicMock, patch import amc_server.mixins.control as control from amc_server.mixins.control import SessionControlMixin class DummyControlHandler(SessionControlMixin): def __init__(self, body=None): if body is None: body = {} raw = json.dumps(body).encode("utf-8") self.headers = {"Content-Length": str(len(raw))} self.rfile = io.BytesIO(raw) self.sent = [] self.errors = [] def _send_json(self, code, payload): self.sent.append((code, payload)) def _json_error(self, code, message): self.errors.append((code, message)) class SessionControlMixinTests(unittest.TestCase): def _write_session(self, sessions_dir: Path, session_id: str, zellij_session="infra", zellij_pane="21"): sessions_dir.mkdir(parents=True, exist_ok=True) session_file = sessions_dir / f"{session_id}.json" session_file.write_text( json.dumps({ "session_id": session_id, "zellij_session": zellij_session, "zellij_pane": zellij_pane, }) ) def test_inject_text_then_enter_is_two_step_with_delay(self): handler = DummyControlHandler() calls = [] def fake_inject(zellij_session, pane_id, text, send_enter=True): calls.append((zellij_session, pane_id, text, send_enter)) return {"ok": True} handler._inject_to_pane = fake_inject with patch("amc_server.mixins.control.time.sleep") as sleep_mock, patch.dict(os.environ, {}, clear=True): result = handler._inject_text_then_enter("infra", 24, "testing") self.assertEqual(result, {"ok": True}) self.assertEqual( calls, [ ("infra", 24, "testing", False), ("infra", 24, "", True), ], ) sleep_mock.assert_called_once_with(0.20) def test_inject_text_then_enter_delay_honors_environment_override(self): handler = DummyControlHandler() handler._inject_to_pane = MagicMock(return_value={"ok": True}) with patch("amc_server.mixins.control.time.sleep") as sleep_mock, patch.dict( os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "350"}, clear=True ): result = handler._inject_text_then_enter("infra", 9, "hello") self.assertEqual(result, {"ok": True}) sleep_mock.assert_called_once_with(0.35) def test_respond_to_session_freeform_selects_other_then_submits_text(self): session_id = "abc123" with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, session_id) handler = DummyControlHandler( { "text": "testing", "freeform": True, "optionCount": 3, } ) with patch.object(control, "SESSIONS_DIR", sessions_dir), patch( "amc_server.mixins.control.time.sleep" ) as sleep_mock: handler._inject_to_pane = MagicMock(return_value={"ok": True}) handler._inject_text_then_enter = MagicMock(return_value={"ok": True}) handler._respond_to_session(session_id) handler._inject_to_pane.assert_called_once_with("infra", 21, "4", send_enter=False) handler._inject_text_then_enter.assert_called_once_with("infra", 21, "testing") sleep_mock.assert_called_once_with(handler._FREEFORM_MODE_SWITCH_DELAY_SEC) self.assertEqual(handler.errors, []) self.assertEqual(handler.sent, [(200, {"ok": True})]) def test_respond_to_session_non_freeform_uses_text_then_enter_helper(self): session_id = "abc456" with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, session_id, zellij_pane="terminal_5") handler = DummyControlHandler({"text": "hello"}) with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._inject_to_pane = MagicMock(return_value={"ok": True}) handler._inject_text_then_enter = MagicMock(return_value={"ok": True}) handler._respond_to_session(session_id) handler._inject_to_pane.assert_not_called() handler._inject_text_then_enter.assert_called_once_with("infra", 5, "hello") self.assertEqual(handler.sent, [(200, {"ok": True})]) def test_respond_to_session_missing_session_returns_404(self): handler = DummyControlHandler({"text": "hello"}) sessions_dir = Path(self.id()) with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("does-not-exist") self.assertEqual(handler.sent, []) self.assertEqual(handler.errors, [(404, "Session not found")]) def test_try_plugin_inject_uses_explicit_session_and_payload(self): handler = DummyControlHandler() env = {} completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") with patch.object(control, "ZELLIJ_BIN", "/opt/homebrew/bin/zellij"), patch( "amc_server.mixins.control.subprocess.run", return_value=completed ) as run_mock: result = handler._try_plugin_inject(env, "infra", 24, "testing", send_enter=True) self.assertEqual(result, {"ok": True}) args = run_mock.call_args.args[0] self.assertEqual(args[0], "/opt/homebrew/bin/zellij") self.assertEqual(args[1:3], ["--session", "infra"]) payload = json.loads(args[-1]) self.assertEqual(payload["pane_id"], 24) self.assertEqual(payload["text"], "testing") self.assertIs(payload["send_enter"], True) def test_inject_to_pane_without_plugin_or_unsafe_fallback_returns_error(self): handler = DummyControlHandler() with patch.object(control, "ZELLIJ_PLUGIN", Path("/definitely/missing/plugin.wasm")), patch.dict( os.environ, {}, clear=True ): result = handler._inject_to_pane("infra", 24, "testing", send_enter=True) self.assertFalse(result["ok"]) self.assertIn("Pane-targeted injection requires zellij-send-keys plugin", result["error"]) def test_inject_to_pane_allows_unsafe_fallback_when_enabled(self): handler = DummyControlHandler() with patch.object(control, "ZELLIJ_PLUGIN", Path("/definitely/missing/plugin.wasm")), patch.dict( os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "1"}, clear=True ): handler._try_write_chars_inject = MagicMock(return_value={"ok": True}) result = handler._inject_to_pane("infra", 24, "testing", send_enter=True) self.assertEqual(result, {"ok": True}) handler._try_write_chars_inject.assert_called_once() class TestParsePaneId(unittest.TestCase): """Tests for _parse_pane_id edge cases.""" def setUp(self): self.handler = DummyControlHandler() def test_empty_string_returns_none(self): self.assertIsNone(self.handler._parse_pane_id("")) def test_none_returns_none(self): self.assertIsNone(self.handler._parse_pane_id(None)) def test_direct_int_string_parses(self): self.assertEqual(self.handler._parse_pane_id("42"), 42) def test_terminal_format_parses(self): self.assertEqual(self.handler._parse_pane_id("terminal_5"), 5) def test_plugin_format_parses(self): self.assertEqual(self.handler._parse_pane_id("plugin_3"), 3) def test_unknown_prefix_returns_none(self): self.assertIsNone(self.handler._parse_pane_id("pane_7")) def test_non_numeric_suffix_returns_none(self): self.assertIsNone(self.handler._parse_pane_id("terminal_abc")) def test_too_many_underscores_returns_none(self): self.assertIsNone(self.handler._parse_pane_id("terminal_5_extra")) def test_negative_int_parses(self): # Edge case: negative numbers self.assertEqual(self.handler._parse_pane_id("-1"), -1) class TestGetSubmitEnterDelaySec(unittest.TestCase): """Tests for _get_submit_enter_delay_sec edge cases.""" def setUp(self): self.handler = DummyControlHandler() def test_unset_env_returns_default(self): with patch.dict(os.environ, {}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 0.20) def test_empty_string_returns_default(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": ""}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 0.20) def test_whitespace_only_returns_default(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": " "}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 0.20) def test_negative_value_returns_zero(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "-100"}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 0.0) def test_value_over_2000_clamped(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "5000"}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 2.0) # 2000ms = 2.0s def test_valid_ms_converted_to_seconds(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "500"}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 0.5) def test_float_value_works(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "150.5"}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertAlmostEqual(result, 0.1505) def test_non_numeric_returns_default(self): with patch.dict(os.environ, {"AMC_SUBMIT_ENTER_DELAY_MS": "fast"}, clear=True): result = self.handler._get_submit_enter_delay_sec() self.assertEqual(result, 0.20) class TestAllowUnsafeWriteCharsFallback(unittest.TestCase): """Tests for _allow_unsafe_write_chars_fallback edge cases.""" def setUp(self): self.handler = DummyControlHandler() def test_unset_returns_false(self): with patch.dict(os.environ, {}, clear=True): self.assertFalse(self.handler._allow_unsafe_write_chars_fallback()) def test_empty_returns_false(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": ""}, clear=True): self.assertFalse(self.handler._allow_unsafe_write_chars_fallback()) def test_one_returns_true(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "1"}, clear=True): self.assertTrue(self.handler._allow_unsafe_write_chars_fallback()) def test_true_returns_true(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "true"}, clear=True): self.assertTrue(self.handler._allow_unsafe_write_chars_fallback()) def test_yes_returns_true(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "yes"}, clear=True): self.assertTrue(self.handler._allow_unsafe_write_chars_fallback()) def test_on_returns_true(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "on"}, clear=True): self.assertTrue(self.handler._allow_unsafe_write_chars_fallback()) def test_case_insensitive(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "TRUE"}, clear=True): self.assertTrue(self.handler._allow_unsafe_write_chars_fallback()) def test_random_string_returns_false(self): with patch.dict(os.environ, {"AMC_ALLOW_UNSAFE_WRITE_CHARS_FALLBACK": "maybe"}, clear=True): self.assertFalse(self.handler._allow_unsafe_write_chars_fallback()) class TestDismissSession(unittest.TestCase): """Tests for _dismiss_session edge cases.""" def test_deletes_existing_session_file(self): with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) sessions_dir.mkdir(exist_ok=True) session_file = sessions_dir / "abc123.json" session_file.write_text('{"session_id": "abc123"}') handler = DummyControlHandler() with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._dismiss_session("abc123") self.assertFalse(session_file.exists()) self.assertEqual(handler.sent, [(200, {"ok": True})]) def test_handles_missing_file_gracefully(self): with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) handler = DummyControlHandler() with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._dismiss_session("nonexistent") # Should still return success self.assertEqual(handler.sent, [(200, {"ok": True})]) def test_path_traversal_sanitized(self): with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) sessions_dir.mkdir(exist_ok=True) # Create a file that should NOT be deleted (unused - documents test intent) _secret_file = Path(tmpdir).parent / "secret.json" handler = DummyControlHandler() with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._dismiss_session("../secret") # Secret file should not have been targeted # (if it existed, it would still exist) def test_tracks_dismissed_codex_session(self): from amc_server.agents import _dismissed_codex_ids _dismissed_codex_ids.clear() with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) handler = DummyControlHandler() with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._dismiss_session("codex-session-123") self.assertIn("codex-session-123", _dismissed_codex_ids) _dismissed_codex_ids.clear() class TestTryWriteCharsInject(unittest.TestCase): """Tests for _try_write_chars_inject edge cases.""" def setUp(self): self.handler = DummyControlHandler() def test_successful_write_without_enter(self): completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \ patch("amc_server.mixins.control.subprocess.run", return_value=completed) as run_mock: result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False) self.assertEqual(result, {"ok": True}) # Should only be called once (no Enter) self.assertEqual(run_mock.call_count, 1) def test_successful_write_with_enter(self): completed = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="") with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \ patch("amc_server.mixins.control.subprocess.run", return_value=completed) as run_mock: result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=True) self.assertEqual(result, {"ok": True}) # Should be called twice (write-chars + write Enter) self.assertEqual(run_mock.call_count, 2) def test_write_chars_failure_returns_error(self): failed = subprocess.CompletedProcess(args=[], returncode=1, stdout="", stderr="write failed") with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \ patch("amc_server.mixins.control.subprocess.run", return_value=failed): result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False) self.assertFalse(result["ok"]) self.assertIn("write", result["error"].lower()) def test_timeout_returns_error(self): with patch.object(control, "ZELLIJ_BIN", "/usr/bin/zellij"), \ patch("amc_server.mixins.control.subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 2)): result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False) self.assertFalse(result["ok"]) self.assertIn("timed out", result["error"].lower()) def test_zellij_not_found_returns_error(self): with patch.object(control, "ZELLIJ_BIN", "/nonexistent/zellij"), \ patch("amc_server.mixins.control.subprocess.run", side_effect=FileNotFoundError("No such file")): result = self.handler._try_write_chars_inject({}, "infra", "hello", send_enter=False) self.assertFalse(result["ok"]) self.assertIn("not found", result["error"].lower()) class TestRespondToSessionEdgeCases(unittest.TestCase): """Additional edge case tests for _respond_to_session.""" def _write_session(self, sessions_dir, session_id, **kwargs): sessions_dir.mkdir(parents=True, exist_ok=True) session_file = sessions_dir / f"{session_id}.json" data = {"session_id": session_id, **kwargs} session_file.write_text(json.dumps(data)) def test_invalid_json_body_returns_400(self): handler = DummyControlHandler.__new__(DummyControlHandler) handler.headers = {"Content-Length": "10"} handler.rfile = io.BytesIO(b"not json!!") handler.sent = [] handler.errors = [] with tempfile.TemporaryDirectory() as tmpdir: with patch.object(control, "SESSIONS_DIR", Path(tmpdir)): handler._respond_to_session("test") self.assertEqual(handler.errors, [(400, "Invalid JSON body")]) def test_non_dict_body_returns_400(self): raw = b'"just a string"' handler = DummyControlHandler.__new__(DummyControlHandler) handler.headers = {"Content-Length": str(len(raw))} handler.rfile = io.BytesIO(raw) handler.sent = [] handler.errors = [] with tempfile.TemporaryDirectory() as tmpdir: with patch.object(control, "SESSIONS_DIR", Path(tmpdir)): handler._respond_to_session("test") self.assertEqual(handler.errors, [(400, "Invalid JSON body")]) def test_empty_text_returns_400(self): handler = DummyControlHandler({"text": ""}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="s", zellij_pane="1") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("test") self.assertEqual(handler.errors, [(400, "Missing or empty 'text' field")]) def test_whitespace_only_text_returns_400(self): handler = DummyControlHandler({"text": " \n\t "}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="s", zellij_pane="1") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("test") self.assertEqual(handler.errors, [(400, "Missing or empty 'text' field")]) def test_non_string_text_returns_400(self): handler = DummyControlHandler({"text": 123}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="s", zellij_pane="1") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("test") self.assertEqual(handler.errors, [(400, "Missing or empty 'text' field")]) def test_missing_zellij_session_returns_400(self): handler = DummyControlHandler({"text": "hello"}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="", zellij_pane="1") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("test") self.assertIn("missing Zellij pane info", handler.errors[0][1]) def test_missing_zellij_pane_returns_400(self): handler = DummyControlHandler({"text": "hello"}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="sess", zellij_pane="") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("test") self.assertIn("missing Zellij pane info", handler.errors[0][1]) def test_invalid_pane_format_returns_400(self): handler = DummyControlHandler({"text": "hello"}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="sess", zellij_pane="invalid_format_here") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._respond_to_session("test") self.assertIn("Invalid pane format", handler.errors[0][1]) def test_invalid_option_count_treated_as_zero(self): # optionCount that can't be parsed as int should default to 0 handler = DummyControlHandler({"text": "hello", "freeform": True, "optionCount": "not a number"}) with tempfile.TemporaryDirectory() as tmpdir: sessions_dir = Path(tmpdir) self._write_session(sessions_dir, "test", zellij_session="sess", zellij_pane="5") with patch.object(control, "SESSIONS_DIR", sessions_dir): handler._inject_text_then_enter = MagicMock(return_value={"ok": True}) handler._respond_to_session("test") # With optionCount=0, freeform mode shouldn't trigger the "other" selection # It should go straight to inject_text_then_enter handler._inject_text_then_enter.assert_called_once() if __name__ == "__main__": unittest.main()