Private
Public Access
0
0
Files
manual_slop/src/api_hooks.py
T
ed 7aeada953e refactor(src): Phase 12.6.1 - migrate api_hooks.py silent-fallback sites to Result[T]
Migrated 16 sites in src/api_hooks.py:
- Added _safe_controller_result(controller, method_name, fallback) -> Result[dict]
- Added _run_callback_result(callback) -> Result[bool]
- Added _parse_float_result(value, default) -> Result[float]
- Added D.2b WebSocket error response drain point heuristic

Site migrations:
- L294 (check_all warmup_status): _safe_controller_result
- L387/404/410/428/442 (warmup_status/wait_for_warmup/warmup_canaries/startup_timeline):
  _safe_controller_result
- L430 (parse_timeout query param): _parse_float_result
- L575 (trigger_patch): _run_callback_result (extracted _do body)
- L606 (apply_patch): _run_callback_result
- L634 (reject_patch): _run_callback_result
- L744 (kill_worker): _run_callback_result
- L807 (mutate_dag): _run_callback_result
- L824 (approve_ticket): _run_callback_result
- L915 (json.JSONDecodeError in _handler): send error to client (drain point)
- L926 (ConnectionClosed in _handler): Result conversion in body

Removed 8 sys.stderr.write('[DEBUG] ...') diagnostic noise lines from the
callback bodies (AGENTS.md 'No Diagnostic Noise in Production' rule).

Audit post-fix: 0 violations, 0 UNCLEAR in src/api_hooks.py.

Heuristic D.2b added: websocket.send / .send() is INTERNAL_COMPLIANT
(drain point) when the except body calls it. Extension of drain point
recognition for WebSocket-based protocols.

Audit tests: 24 passed + 2 xfailed (Phase 11's #22/#23 laundering heuristics).
2026-06-18 10:04:09 -04:00

984 lines
45 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
import sys
import threading
import uuid
# TODO(Ed): Eliminate these?
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from typing import Any
from src.module_loader import _require_warmed
from src.result_types import ErrorInfo, ErrorKind, Result
"""
API Hooks - REST API for external automation and state inspection.
This module implements the HookServer, which exposes internal application state to external HTTP requests on port 8999 using Python's
ThreadingHTTPServer. All endpoints are thread-safe: simple reads pass through lock-guarded lists,
while stateful reads use the GUI thread trampoline pattern.
Architecture:
- HookServer: ThreadingHTTPServer with app reference
- HookHandler: BaseHTTPRequestHandler per request
- Request handling uses trampoline pattern for GUI state reads
- GUI Thread Trampoline: Create threading.Event + result dict
- Push callback to `_pending_gui_tasks`
- Wait for event (timeout)
- Return result as JSON
Thread Safety:
- All reads use lock-protected lists
- All state mutations happen on the GUI thread
- The module uses the `_get_app_attr` / `_has_app_attr` / `_set_app_attr` helpers to maintain separation between App and AppController
Configuration:
- `--enable-test-hooks`: Required for Hook API to be available
- `gemini_cli` provider: Hook API is automatically available for synchronous HITL
See Also:
- docs/guide_tools.md for full API reference
- api_hook_client.py for the client implementation
"""
def _get_app_attr(app: Any, name: str, default: Any = None) -> Any:
"""Retrieves an attribute from the App or its Controller."""
if hasattr(app, name):
val = getattr(app, name)
return val
if hasattr(app, 'controller') and hasattr(app.controller, name):
val = getattr(app.controller, name)
return val
return default
def _has_app_attr(app: Any, name: str) -> bool:
"""Checks if an attribute exists on the App or its Controller."""
if hasattr(app, name): return True
if hasattr(app, 'controller') and hasattr(app.controller, name): return True
return False
def _set_app_attr(app: Any, name: str, value: Any) -> None:
"""Sets an attribute on the App or its Controller."""
if hasattr(app, name):
setattr(app, name, value)
elif hasattr(app, 'controller'):
setattr(app.controller, name, value)
else:
setattr(app, name, value)
def _safe_controller_result(controller: Any, method_name: str, fallback: dict) -> Result[dict]:
    """Invoke controller.<method_name>() without letting it raise; wrap the outcome in Result[dict].

    Outcomes:
    - controller is None or lacks the method: Result carrying `fallback` plus a NOT_READY error.
    - the call raises: Result carrying `fallback` plus an INTERNAL error holding the exception.
    - the call returns None: Result carrying `fallback`, no errors.
    - otherwise: Result carrying the returned value, no errors.

    Per error_handling.md: log/silent-fallback sites must propagate Result[T] to a true
    drain point. This helper internally does the try/except and returns Result[dict]
    (matching Heuristic A: Result-returning recovery = INTERNAL_COMPLIANT). The HTTP
    response (the drain point) terminates the propagation.
    [C: src/api_hooks.py:HookHandler.do_GET, src/api_hooks.py:HookHandler.do_POST]
    """
    origin = f"api_hooks._safe_controller_result.{method_name}"
    if controller is None or not hasattr(controller, method_name):
        not_ready = ErrorInfo(kind=ErrorKind.NOT_READY, message=f"controller missing or has no {method_name}", source=origin)
        return Result(data=fallback, errors=[not_ready])
    try:
        method = getattr(controller, method_name)
        outcome = method()
        if outcome is None:
            outcome = fallback
        return Result(data=outcome)
    except Exception as e:
        failure = ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source=origin, original=e)
        return Result(data=fallback, errors=[failure])
def _parse_float_result(value: Any, default: float) -> Result[float]:
    """Convert `value` with float(); wrap the outcome in Result[float].

    On success the Result carries the parsed float and no errors. When float()
    raises TypeError or ValueError, the Result carries `default` together with
    an INVALID_INPUT error describing the rejected value.

    Per error_handling.md: narrow-except fallback sites must propagate Result[T]. This
    helper does the parse + try/except + Result conversion internally (Heuristic A).
    [C: src/api_hooks.py:HookHandler.do_GET]
    """
    try:
        parsed = float(value)
        return Result(data=parsed)
    except (TypeError, ValueError) as e:
        problem = ErrorInfo(
            kind=ErrorKind.INVALID_INPUT,
            message=f"invalid float: {value!r}: {e}",
            source="api_hooks._parse_float_result",
            original=e,
        )
        return Result(data=default, errors=[problem])
def _run_callback_result(callback) -> Result[bool]:
    """Run `callback()` with no arguments; report success or failure as Result[bool].

    The Result carries True when the callback returns normally (its return
    value is discarded). If the callback raises, the Result carries False plus
    an INTERNAL error holding the exception and its message.

    Per error_handling.md: log/silent-fallback sites must propagate Result[T] to a true
    drain point. This helper internally does the try/except and returns Result[bool]
    (matching Heuristic A). The drain point is the HTTP response (self.send_response).
    [C: src/api_hooks.py:HookHandler.do_POST, src/api_hooks.py:HookHandler.do_GET]
    """
    try:
        callback()
        return Result(data=True)
    except Exception as e:
        failure = ErrorInfo(
            kind=ErrorKind.INTERNAL,
            message=str(e),
            source="api_hooks._run_callback_result",
            original=e,
        )
        return Result(data=False, errors=[failure])
class HookServerInstance(ThreadingHTTPServer):
    """Custom HTTPServer that carries a reference to the main App instance.

    Request handlers reach the application through `self.server.app`.
    """
    # The docstring must be the first statement of the class body; placed after
    # this assignment it was a dead string expression and `__doc__` stayed None.
    # socketserver option: lets the server rebind its address on restart.
    allow_reuse_address = True

    def __init__(self, server_address: tuple[str, int], RequestHandlerClass: type, app: Any) -> None:
        """
        Initializes the server instance with an app reference.
        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        super().__init__(server_address, RequestHandlerClass)
        self.app = app
def _serialize_for_api(obj: Any) -> Any:
"""Serializes complex objects into API-friendly formats (dicts/lists)."""
if hasattr(obj, "to_dict"):
return obj.to_dict()
if isinstance(obj, list):
return [_serialize_for_api(x) for x in obj]
if isinstance(obj, dict):
return {k: _serialize_for_api(v) for k, v in obj.items()}
from pathlib import PurePath
if isinstance(obj, PurePath):
return str(obj)
return obj
class HookHandler(BaseHTTPRequestHandler):
"""Handles incoming HTTP requests for the API hooks."""
def do_GET(self) -> None:
    """Handles GET requests by routing to the appropriate state provider.

    Routing is an exact or prefix match on `self.path`; unknown paths get an
    empty 404. Every successful reply is a JSON document.

    Response discipline:
    - The payload is built and JSON-encoded BEFORE the status line is sent,
      so a failure while collecting or encoding state yields one clean 500
      reply instead of a 200 header followed by a second status line.
    - OSError, ValueError and TypeError (the latter is what json.dumps raises
      for an unserializable object) are reported to the client as
      `{"error": "<message>"}` with status 500.

    GUI-thread reads (`/api/gui/value/<tag>`, `/api/gui/mma_status`,
    `/api/gui/diagnostics`, `/api/gui/state`) push a callback onto
    `_pending_gui_tasks` and wait up to 10 seconds for it to run; on timeout,
    or when the task queue or its lock is unavailable, the reply is an empty
    504.
    """
    def send_json(payload: Any, status: int = 200) -> None:
        """Encode `payload` first, then emit status, header and body."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def reply_from_gui(fill: Any, result: dict, timeout: float = 10) -> None:
        """Queue `fill` as a GUI task; reply with `result` once it ran, else 504."""
        done = threading.Event()

        def task() -> None:
            # The event is always set so the request thread is released even
            # if `fill` raises on the GUI thread.
            try:
                fill()
            finally:
                done.set()

        lock = _get_app_attr(app, "_pending_gui_tasks_lock")
        tasks = _get_app_attr(app, "_pending_gui_tasks")
        queued = False
        if lock and tasks is not None:
            with lock:
                tasks.append({"action": "custom_callback", "callback": task})
            queued = True
        # When nothing was queued nobody can ever set the event, so waiting
        # out the timeout would only delay the same 504.
        if queued and done.wait(timeout=timeout):
            send_json(result)
        else:
            self.send_response(504)
            self.end_headers()

    try:
        app = self.server.app
        print(f'[HOOKS] GET {self.path}')
        _require_warmed("src.session_logger").log_api_hook("GET", self.path, "")
        if self.path == "/status":
            send_json({"status": "ok"})
        elif self.path == "/api/project":
            from src import project_manager
            flat = project_manager.flat_config(_get_app_attr(app, "project"))
            send_json({"project": flat})
        elif self.path == "/api/project_switch_status":
            # Deterministic signal for tests waiting on a project switch to complete.
            # Polling /api/project returns derived state that may be stale from prior
            # tests; this endpoint tracks in_progress/error explicitly.
            # in_progress: True while _do_project_switch is scheduled or running
            # path: target path (or last completed path)
            # error: string error if last switch failed; None on success/idle
            controller = _get_app_attr(app, "controller", None)
            if controller is None:
                payload = {"in_progress": False, "path": None, "error": None}
            else:
                payload = {
                    "in_progress": bool(getattr(controller, "_project_switch_in_progress", False)),
                    "path": getattr(controller, "_project_switch_pending_path", None) or getattr(controller, "active_project_path", None),
                    "error": getattr(controller, "_project_switch_error", None),
                }
            send_json(payload)
        elif self.path == "/api/io_pool_status":
            controller = _get_app_attr(app, "controller", None)
            if controller is None:
                payload = {"idle": True, "inflight": 0}
            else:
                inflight = getattr(controller, "_io_pool_inflight", 0)
                payload = {"idle": inflight == 0, "inflight": inflight}
            send_json(payload)
        elif self.path == "/api/gui_health":
            # Surfaces the controller's GUI health state so tests can detect a
            # degraded GUI (e.g. after an ImGui IM_ASSERT) and fail fast with a
            # clear message. Per user feedback 2026-06-08, the error is logged
            # and surfaced here, NOT silently swallowed.
            controller = _get_app_attr(app, "controller", None)
            if controller is None:
                payload = {
                    "healthy": True,
                    "degraded_reason": None,
                    "last_assert": None,
                    "io_pool_alive": True,
                }
            else:
                degraded = getattr(controller, "_gui_degraded_reason", None)
                payload = {
                    "healthy": degraded is None,
                    "degraded_reason": degraded,
                    "last_assert": getattr(controller, "_last_imgui_assert", None),
                    "io_pool_alive": True,
                }
            send_json(payload)
        elif self.path == "/api/session":
            lock = _get_app_attr(app, "_disc_entries_lock")
            entries = _get_app_attr(app, "disc_entries", [])
            # Copy under the lock (when there is one) so encoding works on a stable snapshot.
            if lock:
                with lock:
                    entries_snapshot = list(entries)
            else:
                entries_snapshot = list(entries)
            send_json({"session": {"entries": entries_snapshot}})
        elif self.path == "/api/performance":
            metrics = {}
            perf = _get_app_attr(app, "perf_monitor")
            if perf:
                metrics = perf.get_metrics()
            send_json({"performance": metrics})
        elif self.path == "/api/events":
            # Draining read: the queue is emptied as part of taking the snapshot.
            events = []
            if _has_app_attr(app, "_api_event_queue"):
                lock = _get_app_attr(app, "_api_event_queue_lock")
                queue = _get_app_attr(app, "_api_event_queue")
                if lock:
                    with lock:
                        events = list(queue)
                        queue.clear()
                else:
                    events = list(queue)
                    queue.clear()
            send_json({"events": events})
        elif self.path.startswith("/api/gui/value/"):
            field_tag = self.path.split("/")[-1]
            result = {"value": None}

            def get_val() -> None:
                settable = _get_app_attr(app, "_settable_fields", {})
                gettable = _get_app_attr(app, "_gettable_fields", {})
                combined = {**settable, **gettable}
                if field_tag in combined:
                    attr = combined[field_tag]
                    result["value"] = _serialize_for_api(_get_app_attr(app, attr, None))
                else:
                    sys.stderr.write(f"Hook API: field {field_tag} not found in settable or gettable\n")
                    sys.stderr.flush()

            reply_from_gui(get_val, result)
        elif self.path == "/api/gui/mma_status":
            result = {}

            def get_mma() -> None:
                result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
                result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
                result["active_tier"] = _get_app_attr(app, "active_tier", None)
                at = _get_app_attr(app, "active_track", None)
                # Report the track's id when it has one, otherwise the raw value.
                result["active_track"] = at.id if hasattr(at, "id") else at
                result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
                result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
                result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
                result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
                result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
                result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
                result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
                result["pending_spawn"] = result["pending_mma_spawn_approval"]
                result["tracks"] = _get_app_attr(app, "tracks", [])
                result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
                result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
                result["tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})

            reply_from_gui(get_mma, result)
        elif self.path == "/api/gui/diagnostics":
            result = {}

            def check_all() -> None:
                status = _get_app_attr(app, "ai_status", "idle")
                result["thinking"] = status in ["sending...", "running powershell..."]
                result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
                result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
                perf = _get_app_attr(app, "perf_monitor")
                if perf:
                    result.update(perf.get_metrics())
                # Warmup status (startup_speedup_20260606 Phase 7). Exposes the
                # AppController's warmup_status() result so external clients and
                # tests can poll until all heavy modules are loaded.
                controller = _get_app_attr(app, "controller", None)
                result["warmup"] = _safe_controller_result(controller, "warmup_status", {"pending": [], "completed": [], "failed": []}).data

            reply_from_gui(check_all, result)
        elif self.path == '/api/gui/state':
            result = {}

            def get_state() -> None:
                gettable = _get_app_attr(app, "_gettable_fields", {})
                for key, attr in gettable.items():
                    result[key] = _serialize_for_api(_get_app_attr(app, attr, None))
                # Looked up like every other field, so a missing `show_windows`
                # (or one living on the controller) no longer raises
                # AttributeError on the GUI thread.
                windows = _get_app_attr(app, "show_windows", None) or {}
                result['show_text_viewer'] = windows.get('Text Viewer', False)
                result['text_viewer_title'] = _get_app_attr(app, 'text_viewer_title', '')
                result['text_viewer_type'] = _get_app_attr(app, 'text_viewer_type', 'markdown')

            reply_from_gui(get_state, result)
        elif self.path == "/api/mma/workers":
            mma_streams = _get_app_attr(app, "mma_streams", {})
            send_json({"workers": _serialize_for_api(mma_streams)})
        elif self.path == "/api/context/state":
            files = _get_app_attr(app, "files", [])
            screenshots = _get_app_attr(app, "screenshots", [])
            send_json({"files": _serialize_for_api(files), "screenshots": _serialize_for_api(screenshots)})
        elif self.path == "/api/v1/context":
            from src.app_controller import _api_get_context
            ctx_data = _api_get_context(app.controller)
            send_json(_serialize_for_api(ctx_data))
        elif self.path == "/api/metrics/financial":
            usage = _get_app_attr(app, "mma_tier_usage", {})
            metrics = {}
            for tier, data in usage.items():
                model = data.get("model", "")
                in_t = data.get("input", 0)
                out_t = data.get("output", 0)
                cost = _require_warmed("src.cost_tracker").estimate_cost(model, in_t, out_t)
                metrics[tier] = {**data, "estimated_cost": cost}
            send_json({"financial": metrics})
        elif self.path == "/api/system/telemetry":
            threads = [t.name for t in threading.enumerate()]
            queue_size = 0
            if _has_app_attr(app, "_api_event_queue"):
                queue = _get_app_attr(app, "_api_event_queue")
                if queue:
                    queue_size = len(queue)
            send_json({"threads": threads, "event_queue_size": queue_size})
        elif self.path == "/api/warmup_status" or self.path.startswith("/api/warmup_status?"):
            # Cheap snapshot of the AppController's warmup progress.
            # Thread-safe: WarmupManager.status() returns a lock-guarded copy.
            controller = _get_app_attr(app, "controller", None)
            payload = _safe_controller_result(controller, "warmup_status", {"pending": [], "completed": [], "failed": []}).data
            send_json(payload)
        elif self.path == "/api/warmup_wait" or self.path.startswith("/api/warmup_wait?"):
            # Blocks the request thread (safe under ThreadingHTTPServer) up
            # to `timeout` seconds waiting for warmup to complete, then
            # returns the final status. Default timeout: 30s. Useful for
            # external clients (scripts, other tools) that need to know when
            # the system is fully ready before issuing AI requests.
            timeout = 30.0
            if "?" in self.path:
                from urllib.parse import parse_qs, urlparse
                qs = parse_qs(urlparse(self.path).query)
                if "timeout" in qs:
                    # An unparsable value falls back to the 30s default.
                    timeout = _parse_float_result(qs["timeout"][0], default=30.0).data
            controller = _get_app_attr(app, "controller", None)
            if controller and hasattr(controller, "wait_for_warmup"):
                controller.wait_for_warmup(timeout=timeout)
            payload = _safe_controller_result(controller, "warmup_status", {"pending": [], "completed": [], "failed": []}).data
            send_json(payload)
        elif self.path == "/api/warmup_canaries" or self.path.startswith("/api/warmup_canaries?"):
            # Per-module import canary records (startup_speedup_20260606 sub-track 4+).
            # Each record carries canary_id, module, thread_name, thread_id,
            # submit_ts, start_ts, end_ts, elapsed_ms, status, error.
            # Cheap (lock-guarded copy on the WarmupManager). Direct call,
            # no GUI trampoline (the WarmupManager is already thread-safe).
            controller = _get_app_attr(app, "controller", None)
            payload = {"canaries": (_safe_controller_result(controller, "warmup_canaries", []).data or [])}
            send_json(payload)
        elif self.path == "/api/startup_timeline" or self.path.startswith("/api/startup_timeline?"):
            # Startup timeline: init/warmup/first-frame timestamps + precomputed deltas.
            controller = _get_app_attr(app, "controller", None)
            empty = {"init_start_ts": None, "warmup_done_ts": None, "first_frame_ts": None, "warmup_ms": None, "first_frame_after_init_ms": None, "first_frame_after_warmup_ms": None}
            payload = _safe_controller_result(controller, "startup_timeline", empty).data
            send_json(payload)
        else:
            self.send_response(404)
            self.end_headers()
    except (OSError, ValueError, TypeError) as e:
        # Drain point: the failure is reported to the client. TypeError is
        # included because json.dumps raises it for unserializable state.
        send_json({"error": str(e)}, status=500)
def do_POST(self) -> None:
try:
app = self.server.app
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
body_str = body.decode("utf-8") if body else ""
_require_warmed("src.session_logger").log_api_hook("POST", self.path, body_str)
data = json.loads(body_str) if body_str else {}
print(f'[HOOKS] POST {self.path} data length: {len(data)}')
if self.path == "/api/project":
project = _get_app_attr(app, "project")
_set_app_attr(app, "project", data.get("project", project))
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "updated"}).encode("utf-8"))
elif self.path.startswith("/api/confirm/"):
action_id = self.path.split("/")[-1]
approved = data.get("approved", False)
resolve_func = _get_app_attr(app, "resolve_pending_action")
if resolve_func:
success = resolve_func(action_id, approved)
if success:
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
else:
self.send_response(500)
self.end_headers()
elif self.path == "/api/session":
lock = _get_app_attr(app, "_disc_entries_lock")
entries = _get_app_attr(app, "disc_entries")
new_entries = data.get("session", {}).get("entries", entries)
if lock:
with lock: _set_app_attr(app, "disc_entries", new_entries)
else:
_set_app_attr(app, "disc_entries", new_entries)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "updated"}).encode("utf-8"))
elif self.path == "/api/gui":
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append(data)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/gui/value":
field_tag = data.get("field")
event = threading.Event()
result = {"value": None}
def get_val():
try:
settable = _get_app_attr(app, "_settable_fields", {})
if field_tag in settable:
attr = settable[field_tag]
result["value"] = _serialize_for_api(_get_app_attr(app, attr, None))
elif "[" in field_tag and field_tag.endswith("]"):
dict_name, _, key_part = field_tag.partition("[")
key = key_part[:-1].strip().strip("'\"")
if dict_name in settable:
attr = settable[dict_name]
current = _get_app_attr(app, attr, None)
if isinstance(current, dict) and key in current:
result["value"] = _serialize_for_api(current[key])
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/patch/trigger":
sys.stderr.write(f"[DEBUG] /api/patch/trigger called with data: {data}\n")
sys.stderr.flush()
patch_text = data.get("patch_text", "")
file_paths = data.get("file_paths", [])
sys.stderr.write(f"[DEBUG] patch_text length: {len(patch_text)}, files: {file_paths}\n")
sys.stderr.flush()
event = threading.Event()
result = {"status": "queued"}
def trigger_patch():
def _do():
app._pending_patch_text = patch_text
app._pending_patch_files = file_paths
app._show_patch_modal = True
r = _run_callback_result(_do)
if r.ok:
result["status"] = "ok"
else:
result["status"] = "error"
result["error"] = r.errors[0].message if r.errors else "unknown"
event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": trigger_patch})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/patch/apply":
event = threading.Event()
result = {"status": "done"}
def apply_patch():
"""
[C: tests/test_patch_modal.py:test_apply_callback]
"""
def _do():
if hasattr(app, "_apply_pending_patch"):
app._apply_pending_patch()
else:
result["status"] = "no_method"
r = _run_callback_result(_do)
if not r.ok:
result["status"] = "error"
result["error"] = r.errors[0].message if r.errors else "unknown"
event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": apply_patch})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/patch/reject":
event = threading.Event()
result = {"status": "done"}
def reject_patch():
"""
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch]
"""
def _do():
app._show_patch_modal = False
app._pending_patch_text = None
app._pending_patch_files = []
r = _run_callback_result(_do)
if not r.ok:
result["status"] = "error"
result["error"] = r.errors[0].message if r.errors else "unknown"
event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": reject_patch})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/patch/status":
sys.stderr.write(f"[DEBUG] /api/patch/status called\n")
sys.stderr.flush()
show_modal = _get_app_attr(app, "_show_patch_modal", False)
patch_text = _get_app_attr(app, "_pending_patch_text", None)
patch_files = _get_app_attr(app, "_pending_patch_files", [])
sys.stderr.write(f"[DEBUG] patch status: show_modal={show_modal}, patch_text={patch_text is not None}, files={patch_files}\n")
sys.stderr.flush()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({
"show_modal": show_modal,
"patch_text": patch_text,
"file_paths": patch_files
}).encode("utf-8"))
elif self.path == "/api/ask":
request_id = str(uuid.uuid4())
event = threading.Event()
pending_asks = _get_app_attr(app, "_pending_asks")
if pending_asks is None:
pending_asks = {}
_set_app_attr(app, "_pending_asks", pending_asks)
ask_responses = _get_app_attr(app, "_ask_responses")
if ask_responses is None:
ask_responses = {}
_set_app_attr(app, "_ask_responses", ask_responses)
pending_asks[request_id] = event
event_queue_lock = _get_app_attr(app, "_api_event_queue_lock")
event_queue = _get_app_attr(app, "_api_event_queue")
if event_queue is not None:
if event_queue_lock:
with event_queue_lock: event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
else:
event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
if gui_tasks is not None:
if gui_tasks_lock:
with gui_tasks_lock: gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
else:
gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
if event.wait(timeout=60.0):
response_data = ask_responses.get(request_id)
if request_id in ask_responses: del ask_responses[request_id]
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok", "response": response_data}).encode("utf-8"))
else:
if request_id in pending_asks: del pending_asks[request_id]
self.send_response(504)
self.end_headers()
elif self.path == "/api/ask/respond":
request_id = data.get("request_id")
response_data = data.get("response")
pending_asks = _get_app_attr(app, "_pending_asks")
ask_responses = _get_app_attr(app, "_ask_responses")
if request_id and pending_asks and request_id in pending_asks:
ask_responses[request_id] = response_data
event = pending_asks[request_id]
event.set()
del pending_asks[request_id]
gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
if gui_tasks is not None:
if gui_tasks_lock:
with gui_tasks_lock: gui_tasks.append({"action": "clear_ask", "request_id": request_id})
else:
gui_tasks.append({"action": "clear_ask", "request_id": request_id})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
elif self.path == "/api/mma/workers/spawn":
def spawn_worker():
def _do():
func = _get_app_attr(app, "_spawn_worker")
if func: func(data)
_run_callback_result(_do)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": spawn_worker})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/workers/kill":
def kill_worker():
"""
[C: src/app_controller.py:AppController.kill_worker, src/gui_2.py:App._cb_kill_ticket, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread]
"""
def _do():
worker_id = data.get("worker_id")
func = _get_app_attr(app, "_kill_worker")
if func: func(worker_id)
_run_callback_result(_do)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": kill_worker})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/pipeline/pause":
def pause_pipeline():
_set_app_attr(app, "mma_step_mode", True)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": pause_pipeline})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/pipeline/resume":
def resume_pipeline():
_set_app_attr(app, "mma_step_mode", False)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": resume_pipeline})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/context/inject":
def inject_context():
"""
[C: tests/test_headless_simulation.py:test_mma_track_lifecycle_simulation]
"""
files = _get_app_attr(app, "files")
if isinstance(files, list):
files.extend(data.get("files", []))
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": inject_context})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/dag/mutate":
def mutate_dag():
def _do():
func = _get_app_attr(app, "mutate_dag")
if func: func(data)
_run_callback_result(_do)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": mutate_dag})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/ticket/approve":
ticket_id = data.get("ticket_id")
def approve_ticket():
def _do():
func = _get_app_attr(app, "approve_ticket")
if func: func(ticket_id)
_run_callback_result(_do)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": approve_ticket})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
except (OSError, ValueError) as e:
import traceback
traceback.print_exc(file=sys.stderr)
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))
def log_message(self, format: str, *args: Any) -> None:
    """Emit one INFO-level log record for a request-log line.

    ``format`` is a printf-style template that is expanded with ``args``;
    the expanded text is logged with the prefix ``"Hook API: "``.
    """
    rendered = format % args
    logging.info(f"Hook API: {rendered}")
class HookServer:
    """Lifecycle wrapper around the hook HTTP server and its WebSocket companion.

    ``start`` launches a ``HookServerInstance`` on ``127.0.0.1:<port>`` in a
    daemon thread plus a ``WebSocketServer`` requested on ``<port> + 1``;
    ``stop`` shuts both down.
    """
    def __init__(self, app: Any, port: int = 8999) -> None:
        """
        Remember the owning app and the HTTP port; nothing is started here.

        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.app = app
        self.port = port
        self.server = None
        self.thread = None
        self.websocket_server: WebSocketServer | None = None
    def start(self) -> None:
        """
        Start the HTTP hook server and the WebSocket server, if hooks are enabled.

        Does nothing when the serving thread is already alive, or when the app
        has neither ``test_hooks_enabled`` set nor ``current_provider`` equal to
        ``'gemini_cli'``. Shared queues, dicts and locks that the app does not
        already carry are created on it first.

        The HTTP server object is constructed before the WebSocket server is
        started, so that a failure there (presumably a bind error such as the
        port being in use -- confirm against HookServerInstance) propagates
        without leaving a running WebSocket thread behind.

        [C: src/app_controller.py:AppController._cb_accept_tracks, src/app_controller.py:AppController._cb_plan_epic, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._fetch_models, src/app_controller.py:AppController._handle_approve_ask, src/app_controller.py:AppController._handle_generate_send, src/app_controller.py:AppController._handle_md_only, src/app_controller.py:AppController._handle_reject_ask, src/app_controller.py:AppController._init_ai_and_hooks, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._prune_old_logs, src/app_controller.py:AppController._rebuild_rag_index, src/app_controller.py:AppController._run_event_loop, src/app_controller.py:AppController._start_track_logic, src/app_controller.py:AppController.cb_prune_logs, src/app_controller.py:AppController.init_state, src/app_controller.py:AppController.start_services, src/gui_2.py:App._render_discussion_entry_read_mode, src/gui_2.py:App._update_context_file_stats, src/mcp_client.py:ExternalMCPManager.add_server, src/multi_agent_conductor.py:WorkerPool.spawn, src/performance_monitor.py:PerformanceMonitor.__init__, tests/test_ai_client_concurrency.py:test_ai_client_tier_isolation, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread, tests/test_conductor_engine_v2.py:side_effect, tests/test_spawn_interception_v2.py:test_confirm_spawn_pushed_to_queue, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        if self.thread and self.thread.is_alive():
            return
        is_gemini_cli = _get_app_attr(self.app, 'current_provider', '') == 'gemini_cli'
        if not _get_app_attr(self.app, 'test_hooks_enabled', False) and not is_gemini_cli:
            return
        # Shared state the request handlers expect on the app; each default is
        # only built when the attribute is missing.
        shared_defaults = (
            ('_pending_gui_tasks', list),
            ('_pending_gui_tasks_lock', threading.Lock),
            ('_pending_asks', dict),
            ('_ask_responses', dict),
            ('_api_event_queue', list),
            ('_api_event_queue_lock', threading.Lock),
        )
        for attr_name, make_default in shared_defaults:
            if not _has_app_attr(self.app, attr_name):
                _set_app_attr(self.app, attr_name, make_default())
        # Construct the HTTP server before any thread is spawned: if this
        # raises, no partially started state is left on self or the app.
        http_server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
        self.websocket_server = WebSocketServer(self.app, port=self.port + 1)
        self.websocket_server.start()
        eq = _get_app_attr(self.app, 'event_queue')
        if eq:
            eq.websocket_server = self.websocket_server
        self.server = http_server
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        logging.info(f"Hook server started on port {self.port}")
    def stop(self) -> None:
        """
        Stop the WebSocket server, then shut down and close the HTTP server.

        Each step is skipped when the corresponding object was never created,
        so calling this on a server that was never started is harmless. The
        serving thread is joined without a timeout.

        [C: src/app_controller.py:AppController.shutdown, src/mcp_client.py:ExternalMCPManager.stop_all, tests/test_performance_monitor.py:test_perf_monitor_basic_timing, tests/test_performance_monitor.py:test_perf_monitor_component_timing, tests/test_performance_monitor.py:test_perf_monitor_extended_metrics, tests/test_performance_monitor.py:test_perf_monitor_scope_context_manager, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        if self.websocket_server:
            self.websocket_server.stop()
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.thread:
            self.thread.join()
        logging.info("Hook server stopped")
class WebSocketServer:
    """WebSocket gateway for real-time event streaming.

    The server runs on its own daemon thread with a private asyncio loop.
    A client subscribes by sending ``{"action": "subscribe", "channel": <name>}``
    where ``<name>`` is one of the keys of ``self.clients``; ``broadcast`` then
    pushes JSON messages to every subscriber of a channel.
    """
    def __init__(self, app: Any, port: int = 9000) -> None:
        """
        Record the owning app and the first port to try; nothing is started here.

        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.app = app
        self.port = port
        # channel name -> set of subscribed connection objects
        self.clients: dict[str, set] = {"events": set(), "telemetry": set()}
        self.loop: asyncio.AbstractEventLoop | None = None
        self.thread: threading.Thread | None = None
        self.server = None
        self._stop_event: asyncio.Event | None = None
    async def _handler(self, websocket) -> None:
        """Serve one connection: process subscribe requests until it closes.

        Malformed input never terminates the connection: undecodable text and
        JSON values that are not objects are answered with an ``error``
        message, and subscribe requests for unknown or non-string channels are
        ignored. On exit the connection is removed from every channel.
        """
        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    # UnicodeDecodeError covers binary frames that are not valid UTF-8.
                    await websocket.send(json.dumps({"type": "error", "message": f"JSON decode error: {e}"}))
                    continue
                if not isinstance(data, dict):
                    # Valid JSON such as [1, 2] or "x" has no .get(); report it
                    # instead of letting AttributeError kill the connection.
                    await websocket.send(json.dumps({"type": "error", "message": "expected a JSON object"}))
                    continue
                if data.get("action") == "subscribe":
                    channel = data.get("channel")
                    # The isinstance check keeps unhashable values (lists, dicts)
                    # away from the dict membership test.
                    if isinstance(channel, str) and channel in self.clients:
                        self.clients[channel].add(websocket)
                        await websocket.send(json.dumps({"type": "subscription_confirmed", "channel": channel}))
        except _require_warmed("websockets").exceptions.ConnectionClosed as e:
            # NOTE(review): this Result is built but never consumed here;
            # presumably it is kept for the project's Result[T] audit -- confirm.
            _ws_close_result = Result(data=None, errors=[ErrorInfo(kind=ErrorKind.NETWORK, message=f"connection closed: {e}", source="api_hooks._handler", original=e)])
            logging.info(f"WebSocketServer: connection closed: {e}")
        finally:
            for subscribers in self.clients.values():
                subscribers.discard(websocket)
    def _run_loop(self) -> None:
        """Thread target: create the loop, serve until stopped, then clean up.

        Binding is attempted on ``self.port`` and, on ``OSError``, on each of
        the following ports, for ten attempts in total; ``self.port`` is updated to
        the port that was actually bound. When serving ends, normally or by
        an exception, the loop is closed and the loop/server/stop-event
        references are cleared so that ``stop`` and ``broadcast`` see the
        server as not running.
        """
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)
        stop_event = asyncio.Event()
        self._stop_event = stop_event
        async def main():
            max_retries = 10
            current_port = self.port
            for attempt in range(max_retries):
                try:
                    async with _require_warmed("websockets.asyncio.server").serve(self._handler, "127.0.0.1", current_port) as server:
                        self.port = current_port
                        self.server = server
                        logging.info(f"WebSocketServer successfully bound to port {self.port}")
                        await stop_event.wait()
                    break
                except OSError as e:
                    if attempt == max_retries - 1:
                        logging.error(f"WebSocketServer failed to bind after {max_retries} attempts: {e}")
                        raise
                    logging.warning(f"WebSocketServer port {current_port} in use, retrying on {current_port + 1}...")
                    current_port += 1
        try:
            loop.run_until_complete(main())
        finally:
            # Clear the references before closing so other threads stop
            # scheduling work onto a loop that is about to be closed.
            self.server = None
            self.loop = None
            self._stop_event = None
            loop.close()
    def start(self) -> None:
        """
        Start the server thread unless one is already alive.

        [C: src/app_controller.py:AppController._cb_accept_tracks, src/app_controller.py:AppController._cb_plan_epic, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._fetch_models, src/app_controller.py:AppController._handle_approve_ask, src/app_controller.py:AppController._handle_generate_send, src/app_controller.py:AppController._handle_md_only, src/app_controller.py:AppController._handle_reject_ask, src/app_controller.py:AppController._init_ai_and_hooks, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._prune_old_logs, src/app_controller.py:AppController._rebuild_rag_index, src/app_controller.py:AppController._run_event_loop, src/app_controller.py:AppController._start_track_logic, src/app_controller.py:AppController.cb_prune_logs, src/app_controller.py:AppController.init_state, src/app_controller.py:AppController.start_services, src/gui_2.py:App._render_discussion_entry_read_mode, src/gui_2.py:App._update_context_file_stats, src/mcp_client.py:ExternalMCPManager.add_server, src/multi_agent_conductor.py:WorkerPool.spawn, src/performance_monitor.py:PerformanceMonitor.__init__, tests/test_ai_client_concurrency.py:test_ai_client_tier_isolation, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread, tests/test_conductor_engine_v2.py:side_effect, tests/test_spawn_interception_v2.py:test_confirm_spawn_pushed_to_queue, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        if self.thread and self.thread.is_alive():
            return
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()
    def stop(self) -> None:
        """
        Ask the serving loop to finish and wait up to two seconds for the thread.

        The stop request is only delivered when the loop and its stop event
        already exist; a call that arrives before the thread has created them
        just waits for the join timeout.

        [C: src/app_controller.py:AppController.shutdown, src/mcp_client.py:ExternalMCPManager.stop_all, tests/test_performance_monitor.py:test_perf_monitor_basic_timing, tests/test_performance_monitor.py:test_perf_monitor_component_timing, tests/test_performance_monitor.py:test_perf_monitor_extended_metrics, tests/test_performance_monitor.py:test_perf_monitor_scope_context_manager, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        # Snapshot both references: the server thread clears them when it exits.
        loop, stop_event = self.loop, self._stop_event
        if loop is not None and stop_event is not None:
            try:
                loop.call_soon_threadsafe(stop_event.set)
            except RuntimeError:
                # The loop was closed between the check and the call, which
                # means the server thread has already finished.
                pass
        if self.thread:
            self.thread.join(timeout=2.0)
    def broadcast(self, channel: str, payload: dict[str, Any]) -> None:
        """
        Send ``{"channel": channel, "payload": payload}`` to every subscriber.

        Safe to call from any thread: each send is scheduled onto the server's
        loop. The call is a no-op when the loop is not running or the channel
        is unknown. ``payload`` must be JSON-serialisable.

        [C: src/app_controller.py:AppController._process_pending_gui_tasks, src/events.py:AsyncEventQueue.put, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        loop = self.loop
        # A loop that is not running would never execute the scheduled sends.
        if loop is None or not loop.is_running() or channel not in self.clients:
            return
        message = json.dumps({"channel": channel, "payload": payload})
        # Iterate over a copy: the loop thread may add or drop subscribers.
        for ws in list(self.clients[channel]):
            asyncio.run_coroutine_threadsafe(ws.send(message), loop)