"""Local HTTP hook server exposing application state and GUI actions as JSON.

The server binds to 127.0.0.1 and is served from a daemon thread. Request
handlers run on server threads, so anything that must be read on the
application's task-consuming thread is wrapped in a ``custom_callback`` task
and appended to the app's ``_pending_gui_tasks`` queue.
"""
from __future__ import annotations

import contextlib
import json
import logging
import threading
import uuid
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, Callable

from src import session_logger

# Seconds a request thread waits for a queued ``custom_callback`` to finish.
_GUI_CALLBACK_TIMEOUT = 10
# Seconds ``POST /api/ask`` blocks waiting for ``POST /api/ask/respond``.
_ASK_TIMEOUT = 60.0


def _get_app_attr(app: Any, name: str, default: Any = None) -> Any:
    """Return ``app.<name>``, else ``app.controller.<name>``, else *default*."""
    if hasattr(app, name):
        return getattr(app, name)
    if hasattr(app, 'controller') and hasattr(app.controller, name):
        return getattr(app.controller, name)
    return default


def _has_app_attr(app: Any, name: str) -> bool:
    """Return True when *name* exists on *app* or on ``app.controller``."""
    if hasattr(app, name):
        return True
    if hasattr(app, 'controller') and hasattr(app.controller, name):
        return True
    return False


def _set_app_attr(app: Any, name: str, value: Any) -> None:
    """Assign *value* to *name* on the object that should own it.

    An attribute already present on *app* is overwritten there; otherwise the
    value is stored on ``app.controller`` when a controller exists, and on
    *app* itself as the last resort.
    """
    if hasattr(app, name):
        setattr(app, name, value)
    elif hasattr(app, 'controller'):
        setattr(app.controller, name, value)
    else:
        setattr(app, name, value)


def _lock_or_noop(lock: Any) -> Any:
    """Return *lock* when one is configured, else a no-op context manager."""
    return lock if lock else contextlib.nullcontext()


def _ensure_app_dict(app: Any, name: str) -> dict:
    """Return the dict stored under *name*, creating it when missing or None."""
    existing = _get_app_attr(app, name)
    if existing is None:
        existing = {}
        _set_app_attr(app, name, existing)
    return existing


def _collect_field_value(app: Any, field_tag: Any, result: dict) -> None:
    """Store the attribute mapped to *field_tag* in ``result["value"]``."""
    settable = _get_app_attr(app, "_settable_fields", {})
    if field_tag in settable:
        result["value"] = _get_app_attr(app, settable[field_tag], None)


def _collect_mma_status(app: Any, result: dict) -> None:
    """Fill *result* with the MMA / approval state read from *app*."""
    result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
    result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
    result["active_tier"] = _get_app_attr(app, "active_tier", None)
    track = _get_app_attr(app, "active_track", None)
    # Objects exposing ``id`` are reported by that id; anything else as-is.
    result["active_track"] = track.id if hasattr(track, "id") else track
    result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
    result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
    result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
    result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
    result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
    result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
    result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
    result["pending_spawn"] = result["pending_mma_spawn_approval"]
    result["tracks"] = _get_app_attr(app, "tracks", [])
    result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
    result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
    result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})


def _collect_diagnostics(app: Any, result: dict) -> None:
    """Fill *result* with flags derived from the app's ``ai_status`` string."""
    status = _get_app_attr(app, "ai_status", "idle")
    result["thinking"] = status in ["sending...", "running powershell..."]
    result["live"] = status in [
        "running powershell...",
        "fetching url...",
        "searching web...",
        "powershell done, awaiting AI...",
    ]
    result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)


def _collect_gui_state(app: Any, result: dict) -> None:
    """Copy every attribute listed in ``_gettable_fields`` into *result*."""
    gettable = _get_app_attr(app, "_gettable_fields", {})
    for key, attr in gettable.items():
        result[key] = _get_app_attr(app, attr, None)


class HookServerInstance(ThreadingHTTPServer):
    """Custom HTTPServer that carries a reference to the main App instance."""

    def __init__(self, server_address: tuple[str, int], RequestHandlerClass: type, app: Any) -> None:
        super().__init__(server_address, RequestHandlerClass)
        self.app = app


class HookHandler(BaseHTTPRequestHandler):
    """Handles incoming HTTP requests for the API hooks."""

    def _send_json(self, status: int, payload: Any) -> None:
        """Send *payload* as a JSON response with the given *status*.

        The payload is serialized before any header is written, so a
        serialization failure can still be reported as a clean error response
        instead of corrupting one that has already started.
        """
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def _send_empty(self, status: int) -> None:
        """Send a body-less response carrying only *status*."""
        self.send_response(status)
        self.end_headers()

    def _respond_via_gui_thread(self, collect: Callable[[Any, dict], None], result: dict) -> None:
        """Queue *collect* as a ``custom_callback`` task and reply with *result*.

        ``collect(app, result)`` runs on whichever thread drains
        ``_pending_gui_tasks`` (presumably the GUI thread - confirm against the
        consumer). Replies 200 with *result* once the callback has finished,
        or 504 when it does not finish within ``_GUI_CALLBACK_TIMEOUT``.
        """
        app = self.server.app
        done = threading.Event()

        def run_collect() -> None:
            # Always release the waiting request thread, even if collect raises.
            try:
                collect(app, result)
            finally:
                done.set()

        lock = _get_app_attr(app, "_pending_gui_tasks_lock")
        tasks = _get_app_attr(app, "_pending_gui_tasks")
        if not lock or tasks is None:
            # Nothing can ever run the callback, so waiting out the timeout
            # would only stall the client; answer with the same 504 right away.
            self._send_empty(504)
            return
        with lock:
            tasks.append({"action": "custom_callback", "callback": run_collect})
        if done.wait(timeout=_GUI_CALLBACK_TIMEOUT):
            self._send_json(200, result)
        else:
            self._send_empty(504)

    def _queue_gui_task(self, task: dict) -> None:
        """Append *task* to the GUI task queue if the queue exists."""
        app = self.server.app
        tasks = _get_app_attr(app, "_pending_gui_tasks")
        if tasks is None:
            return
        with _lock_or_noop(_get_app_attr(app, "_pending_gui_tasks_lock")):
            tasks.append(task)

    def _drain_events(self) -> list:
        """Return and clear the queued API events (empty list if no queue)."""
        app = self.server.app
        if not _has_app_attr(app, "_api_event_queue"):
            return []
        queue = _get_app_attr(app, "_api_event_queue")
        with _lock_or_noop(_get_app_attr(app, "_api_event_queue_lock")):
            events = list(queue)
            queue.clear()
        return events

    def do_GET(self) -> None:
        """Serve the read-only endpoints.

        Unknown paths get 404. Any exception raised while building a response
        is reported as 500 with ``{"error": <message>}``, matching ``do_POST``.
        """
        app = self.server.app
        try:
            session_logger.log_api_hook("GET", self.path, "")
            path = self.path
            if path == "/status":
                self._send_json(200, {"status": "ok"})
            elif path == "/api/project":
                from src import project_manager
                flat = project_manager.flat_config(_get_app_attr(app, "project"))
                self._send_json(200, {"project": flat})
            elif path == "/api/session":
                entries = _get_app_attr(app, "disc_entries", [])
                with _lock_or_noop(_get_app_attr(app, "_disc_entries_lock")):
                    snapshot = list(entries)
                self._send_json(200, {"session": {"entries": snapshot}})
            elif path == "/api/performance":
                perf = _get_app_attr(app, "perf_monitor")
                metrics = perf.get_metrics() if perf else {}
                self._send_json(200, {"performance": metrics})
            elif path == "/api/events":
                self._send_json(200, {"events": self._drain_events()})
            elif path.startswith("/api/gui/value/"):
                field_tag = path.split("/")[-1]
                self._respond_via_gui_thread(
                    lambda target, out: _collect_field_value(target, field_tag, out),
                    {"value": None},
                )
            elif path == "/api/gui/mma_status":
                self._respond_via_gui_thread(_collect_mma_status, {})
            elif path == "/api/gui/diagnostics":
                self._respond_via_gui_thread(_collect_diagnostics, {})
            elif path == '/api/gui/state':
                self._respond_via_gui_thread(_collect_gui_state, {})
            else:
                self._send_empty(404)
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def _handle_ask(self, data: Any) -> None:
        """Publish an ask request and block until it is answered or times out.

        Replies 200 with the stored response once ``/api/ask/respond`` sets
        the event, or 504 after ``_ASK_TIMEOUT`` seconds.
        """
        app = self.server.app
        request_id = str(uuid.uuid4())
        answered = threading.Event()
        pending_asks = _ensure_app_dict(app, "_pending_asks")
        ask_responses = _ensure_app_dict(app, "_ask_responses")
        pending_asks[request_id] = answered

        event_queue = _get_app_attr(app, "_api_event_queue")
        if event_queue is not None:
            with _lock_or_noop(_get_app_attr(app, "_api_event_queue_lock")):
                event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
        self._queue_gui_task({"type": "ask", "request_id": request_id, "data": data})

        if answered.wait(timeout=_ASK_TIMEOUT):
            response_data = ask_responses.pop(request_id, None)
            self._send_json(200, {"status": "ok", "response": response_data})
        else:
            # pop() instead of check-then-del: the responder thread may remove
            # the entry concurrently.
            pending_asks.pop(request_id, None)
            ask_responses.pop(request_id, None)
            self._send_empty(504)

    def _handle_ask_respond(self, data: Any) -> None:
        """Deliver a response to a waiting ``/api/ask`` request (404 if none)."""
        app = self.server.app
        request_id = data.get("request_id")
        response_data = data.get("response")
        pending_asks = _get_app_attr(app, "_pending_asks")
        ask_responses = _get_app_attr(app, "_ask_responses")
        waiter = None
        if request_id and pending_asks:
            # Atomic removal: the asker may time out and drop the entry at the
            # same moment, which must yield 404 rather than a KeyError.
            waiter = pending_asks.pop(request_id, None)
        if waiter is None:
            self._send_empty(404)
            return
        # Store the response before waking the asker so it is there to read.
        ask_responses[request_id] = response_data
        waiter.set()
        self._queue_gui_task({"action": "clear_ask", "request_id": request_id})
        self._send_json(200, {"status": "ok"})

    def do_POST(self) -> None:
        """Serve the mutating endpoints.

        The request body, when present, is parsed as UTF-8 JSON. Unknown paths
        get 404. Any exception, including a malformed ``Content-Length`` or
        body, is reported as 500 with ``{"error": <message>}``.
        """
        app = self.server.app
        try:
            # A negative length would make read() block until EOF; treat as 0.
            content_length = max(0, int(self.headers.get("Content-Length", 0)))
            body = self.rfile.read(content_length)
            body_str = body.decode("utf-8") if body else ""
            session_logger.log_api_hook("POST", self.path, body_str)
            data = json.loads(body_str) if body_str else {}
            path = self.path
            if path == "/api/project":
                current = _get_app_attr(app, "project")
                _set_app_attr(app, "project", data.get("project", current))
                self._send_json(200, {"status": "updated"})
            elif path.startswith("/api/confirm/"):
                action_id = path.split("/")[-1]
                approved = data.get("approved", False)
                resolve_func = _get_app_attr(app, "resolve_pending_action")
                if not resolve_func:
                    self._send_empty(500)
                elif resolve_func(action_id, approved):
                    self._send_json(200, {"status": "ok"})
                else:
                    self._send_empty(404)
            elif path == "/api/session":
                current = _get_app_attr(app, "disc_entries")
                new_entries = data.get("session", {}).get("entries", current)
                with _lock_or_noop(_get_app_attr(app, "_disc_entries_lock")):
                    _set_app_attr(app, "disc_entries", new_entries)
                self._send_json(200, {"status": "updated"})
            elif path == "/api/gui":
                lock = _get_app_attr(app, "_pending_gui_tasks_lock")
                tasks = _get_app_attr(app, "_pending_gui_tasks")
                if lock and tasks is not None:
                    with lock:
                        tasks.append(data)
                self._send_json(200, {"status": "queued"})
            elif path == "/api/gui/value":
                field_tag = data.get("field")
                self._respond_via_gui_thread(
                    lambda target, out: _collect_field_value(target, field_tag, out),
                    {"value": None},
                )
            elif path == "/api/ask":
                self._handle_ask(data)
            elif path == "/api/ask/respond":
                self._handle_ask_respond(data)
            else:
                self._send_empty(404)
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def log_message(self, format: str, *args: Any) -> None:
        """Route http.server's access log lines to ``logging`` at INFO level."""
        logging.info("Hook API: " + format, *args)


class HookServer:
    """Owns the hook HTTP server and the daemon thread that serves it."""

    def __init__(self, app: Any, port: int = 8999) -> None:
        self.app = app
        self.port = port
        self.server = None
        self.thread = None

    def start(self) -> None:
        """Start serving on 127.0.0.1:<port> unless already running.

        Does nothing when the app neither has ``test_hooks_enabled`` set nor
        uses the ``gemini_cli`` provider. Shared queues, dicts and locks the
        handlers rely on are created on the app when missing.
        """
        if self.thread and self.thread.is_alive():
            return
        is_gemini_cli = _get_app_attr(self.app, 'current_provider', '') == 'gemini_cli'
        if not _get_app_attr(self.app, 'test_hooks_enabled', False) and not is_gemini_cli:
            return
        shared_defaults = (
            ('_pending_gui_tasks', list),
            ('_pending_gui_tasks_lock', threading.Lock),
            ('_pending_asks', dict),
            ('_ask_responses', dict),
            ('_api_event_queue', list),
            ('_api_event_queue_lock', threading.Lock),
        )
        for attr_name, factory in shared_defaults:
            if not _has_app_attr(self.app, attr_name):
                _set_app_attr(self.app, attr_name, factory())
        self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        logging.info(f"Hook server started on port {self.port}")

    def stop(self) -> None:
        """Shut the server down, close its socket and join the serving thread."""
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.thread:
            self.thread.join()
        logging.info("Hook server stopped")