Add routing for spawn-related endpoints to HttpMixin: - GET /api/projects -> _handle_projects - GET /api/health -> _handle_health - POST /api/spawn -> _handle_spawn - POST /api/projects/refresh -> _handle_projects_refresh Update CORS preflight (AC-39) to include GET in allowed methods and Authorization in allowed headers. Closes bd-2al
174 lines
7.3 KiB
Python
174 lines
7.3 KiB
Python
import json
|
|
import urllib.parse
|
|
|
|
import amc_server.context as ctx
|
|
from amc_server.context import DASHBOARD_DIR
|
|
from amc_server.logging_utils import LOGGER
|
|
|
|
|
|
class HttpMixin:
|
|
def _send_bytes_response(self, code, content, content_type="application/json", extra_headers=None):
|
|
"""Send a generic byte response; ignore expected disconnect errors."""
|
|
try:
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", content_type)
|
|
if extra_headers:
|
|
for key, value in extra_headers.items():
|
|
self.send_header(key, value)
|
|
self.send_header("Content-Length", str(len(content)))
|
|
self.end_headers()
|
|
self.wfile.write(content)
|
|
return True
|
|
except (BrokenPipeError, ConnectionResetError, OSError):
|
|
return False
|
|
|
|
def _send_json(self, code, payload):
|
|
"""Send JSON response with CORS header."""
|
|
content = json.dumps(payload).encode()
|
|
return self._send_bytes_response(
|
|
code,
|
|
content,
|
|
content_type="application/json",
|
|
extra_headers={"Access-Control-Allow-Origin": "*"},
|
|
)
|
|
|
|
def do_GET(self):
|
|
try:
|
|
if self.path == "/" or self.path == "/index.html":
|
|
self._serve_dashboard_file("index.html")
|
|
elif self.path.startswith("/") and not self.path.startswith("/api/"):
|
|
# Serve static files from dashboard directory
|
|
file_path = self.path.lstrip("/")
|
|
if file_path and ".." not in file_path:
|
|
self._serve_dashboard_file(file_path)
|
|
else:
|
|
self._json_error(404, "Not Found")
|
|
elif self.path == "/api/state":
|
|
self._serve_state()
|
|
elif self.path == "/api/stream":
|
|
self._serve_stream()
|
|
elif self.path.startswith("/api/events/"):
|
|
session_id = urllib.parse.unquote(self.path[len("/api/events/"):])
|
|
self._serve_events(session_id)
|
|
elif self.path.startswith("/api/conversation/"):
|
|
# Parse session_id and query params
|
|
path_part = self.path[len("/api/conversation/"):]
|
|
if "?" in path_part:
|
|
session_id, query = path_part.split("?", 1)
|
|
params = urllib.parse.parse_qs(query)
|
|
project_dir = params.get("project_dir", [""])[0]
|
|
agent = params.get("agent", ["claude"])[0]
|
|
else:
|
|
session_id = path_part
|
|
project_dir = ""
|
|
agent = "claude"
|
|
self._serve_conversation(urllib.parse.unquote(session_id), urllib.parse.unquote(project_dir), agent)
|
|
elif self.path == "/api/skills" or self.path.startswith("/api/skills?"):
|
|
# Parse agent from query params, default to claude
|
|
if "?" in self.path:
|
|
query = self.path.split("?", 1)[1]
|
|
params = urllib.parse.parse_qs(query)
|
|
agent = params.get("agent", ["claude"])[0]
|
|
else:
|
|
agent = "claude"
|
|
self._serve_skills(agent)
|
|
elif self.path == "/api/projects":
|
|
self._handle_projects()
|
|
elif self.path == "/api/health":
|
|
self._handle_health()
|
|
else:
|
|
self._json_error(404, "Not Found")
|
|
except Exception:
|
|
LOGGER.exception("Unhandled GET error for path=%s", self.path)
|
|
try:
|
|
self._json_error(500, "Internal Server Error")
|
|
except Exception:
|
|
pass
|
|
|
|
def do_POST(self):
|
|
try:
|
|
if self.path == "/api/dismiss-dead":
|
|
self._dismiss_dead_sessions()
|
|
elif self.path.startswith("/api/dismiss/"):
|
|
session_id = urllib.parse.unquote(self.path[len("/api/dismiss/"):])
|
|
self._dismiss_session(session_id)
|
|
elif self.path.startswith("/api/respond/"):
|
|
session_id = urllib.parse.unquote(self.path[len("/api/respond/"):])
|
|
self._respond_to_session(session_id)
|
|
elif self.path == "/api/spawn":
|
|
self._handle_spawn()
|
|
elif self.path == "/api/projects/refresh":
|
|
self._handle_projects_refresh()
|
|
else:
|
|
self._json_error(404, "Not Found")
|
|
except Exception:
|
|
LOGGER.exception("Unhandled POST error for path=%s", self.path)
|
|
try:
|
|
self._json_error(500, "Internal Server Error")
|
|
except Exception:
|
|
pass
|
|
|
|
def do_OPTIONS(self):
|
|
# CORS preflight for API endpoints (AC-39: wildcard CORS;
|
|
# localhost-only binding AC-24 is the real security boundary)
|
|
self.send_response(204)
|
|
self.send_header("Access-Control-Allow-Origin", "*")
|
|
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
|
|
self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
|
|
self.end_headers()
|
|
|
|
def _serve_dashboard_file(self, file_path):
|
|
"""Serve a static file from the dashboard directory."""
|
|
# Content type mapping
|
|
content_types = {
|
|
".html": "text/html; charset=utf-8",
|
|
".css": "text/css; charset=utf-8",
|
|
".js": "application/javascript; charset=utf-8",
|
|
".json": "application/json; charset=utf-8",
|
|
".svg": "image/svg+xml",
|
|
".png": "image/png",
|
|
".ico": "image/x-icon",
|
|
}
|
|
|
|
try:
|
|
full_path = DASHBOARD_DIR / file_path
|
|
# Security: ensure path doesn't escape dashboard directory
|
|
full_path = full_path.resolve()
|
|
resolved_dashboard = DASHBOARD_DIR.resolve()
|
|
try:
|
|
# Use relative_to for robust path containment check
|
|
# (avoids startswith prefix-match bugs like "/dashboard" vs "/dashboardEVIL")
|
|
full_path.relative_to(resolved_dashboard)
|
|
except ValueError:
|
|
self._json_error(403, "Forbidden")
|
|
return
|
|
|
|
content = full_path.read_bytes()
|
|
ext = full_path.suffix.lower()
|
|
content_type = content_types.get(ext, "application/octet-stream")
|
|
|
|
# Inject auth token into index.html for spawn endpoint security
|
|
if file_path == "index.html" and ctx._auth_token:
|
|
content = content.replace(
|
|
b"<!-- AMC_AUTH_TOKEN -->",
|
|
f'<script>window.AMC_AUTH_TOKEN = "{ctx._auth_token}";</script>'.encode(),
|
|
)
|
|
|
|
# No caching during development
|
|
self._send_bytes_response(
|
|
200,
|
|
content,
|
|
content_type=content_type,
|
|
extra_headers={"Cache-Control": "no-cache, no-store, must-revalidate"},
|
|
)
|
|
except FileNotFoundError:
|
|
self._json_error(404, f"File not found: {file_path}")
|
|
|
|
def _json_error(self, code, message):
|
|
"""Send a JSON error response."""
|
|
self._send_json(code, {"ok": False, "error": message})
|
|
|
|
def log_message(self, format, *args):
|
|
"""Suppress default request logging to keep output clean."""
|
|
pass
|