1c565da7a0
PR2 of the test_full_live_workflow_imgui_assert fix sequence.
When an ImGui scope mismatch (IM_ASSERT(Missing End())) fires in
immapp.run (e.g. after cumulative state corruption from prior sims'
panel renders), the RuntimeError propagates out of app.run(). The
controller's _io_pool gets shut down via __del__/finalization. The
hook server (separate ThreadingHTTPServer) survives. Subsequent test
clicks fail with 'cannot schedule new futures after shutdown' and
the test times out after 120s with no clear signal of what went
wrong.
This commit:
1. Wraps immapp.run in try/except RuntimeError in gui_2.py:618.
On assertion: logs the error to stderr (NOT silent), records
it on controller._gui_degraded_reason and _last_imgui_assert,
and returns from run() so the hook server keeps serving.
2. Adds _gui_degraded_reason and _last_imgui_assert to
AppController.__init__ (initialized to None).
3. Adds /api/gui_health endpoint in api_hooks.py:148. Returns
{healthy, degraded_reason, last_assert, io_pool_alive}.
4. Adds ApiHookClient.get_gui_health() with the matching unit
tests (3 mocked tests + 1 live test).
Per user feedback 2026-06-08:
- The wrap does NOT silently swallow the error. It logs at ERROR
level and surfaces it via the health endpoint.
- Tests can call client.get_gui_health() to detect a degraded GUI
and fail fast with a clear message.
TDD: tests written first, confirmed to fail, then fix applied.
34/34 unit tests pass. 1/1 live test passes (live_gui health
endpoint reports healthy=True on fresh subprocess).
973 lines
43 KiB
Python
973 lines
43 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
import threading
|
|
import uuid
|
|
|
|
# TODO(Ed): Eliminate these?
|
|
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
|
from typing import Any
|
|
|
|
from src.module_loader import _require_warmed
|
|
|
|
|
|
"""
|
|
API Hooks - REST API for external automation and state inspection.
|
|
|
|
This module implements the HookServer, which exposes internal application state to external HTTP requests on port 8999 using Python's
|
|
ThreadingHTTPServer. All endpoints are designed to be thread-safe: simple reads pass through lock-guarded lists,
|
|
while stateful reads use the GUI thread trampoline pattern.
|
|
|
|
Architecture:
|
|
- HookServer: ThreadingHTTPServer with app reference
|
|
- HookHandler: BaseHTTPRequestHandler per request
|
|
- Request handling uses trampoline pattern for GUI state reads
|
|
- GUI Thread Trampoline: Create threading.Event + result dict
|
|
- Push callback to `_pending_gui_tasks`
|
|
- Wait for event (timeout)
|
|
- Return result as JSON
|
|
|
|
Thread Safety:
|
|
- All reads use lock-protected lists
|
|
- All state mutations happen on the GUI thread
|
|
- The `_get_app_attr` / `_has_app_attr` / `_set_app_attr` helpers look on the App first and then on its controller, to maintain separation between App and AppController
|
|
|
|
Configuration:
|
|
- `--enable-test-hooks`: Required for Hook API to be available
|
|
- `gemini_cli` provider: Hook API is automatically available for synchronous HITL
|
|
|
|
See Also:
|
|
- docs/guide_tools.md for full API reference
|
|
- api_hook_client.py for the client implementation
|
|
"""
|
|
|
|
def _get_app_attr(app: Any, name: str, default: Any = None) -> Any:
|
|
"""Retrieves an attribute from the App or its Controller."""
|
|
if hasattr(app, name):
|
|
val = getattr(app, name)
|
|
return val
|
|
if hasattr(app, 'controller') and hasattr(app.controller, name):
|
|
val = getattr(app.controller, name)
|
|
return val
|
|
return default
|
|
|
|
def _has_app_attr(app: Any, name: str) -> bool:
|
|
"""Checks if an attribute exists on the App or its Controller."""
|
|
if hasattr(app, name): return True
|
|
if hasattr(app, 'controller') and hasattr(app.controller, name): return True
|
|
return False
|
|
|
|
def _set_app_attr(app: Any, name: str, value: Any) -> None:
|
|
"""Sets an attribute on the App or its Controller."""
|
|
if hasattr(app, name):
|
|
setattr(app, name, value)
|
|
elif hasattr(app, 'controller'):
|
|
setattr(app.controller, name, value)
|
|
else:
|
|
setattr(app, name, value)
|
|
|
|
class HookServerInstance(ThreadingHTTPServer):
    """Custom HTTPServer that carries a reference to the main App instance."""

    # Set before the socket is bound so the listening address can be reused.
    allow_reuse_address = True

    def __init__(self, server_address: tuple[str, int], RequestHandlerClass: type, app: Any) -> None:
        """
        Initializes the server instance with an app reference.

        Args:
            server_address: ``(host, port)`` pair handed to ThreadingHTTPServer.
            RequestHandlerClass: handler class instantiated for each request.
            app: object stored as ``self.app``; request handlers reach it
                through ``self.server.app``.

        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        super().__init__(server_address, RequestHandlerClass)
        self.app = app
|
|
|
|
def _serialize_for_api(obj: Any) -> Any:
|
|
"""Serializes complex objects into API-friendly formats (dicts/lists)."""
|
|
if hasattr(obj, "to_dict"):
|
|
return obj.to_dict()
|
|
if isinstance(obj, list):
|
|
return [_serialize_for_api(x) for x in obj]
|
|
if isinstance(obj, dict):
|
|
return {k: _serialize_for_api(v) for k, v in obj.items()}
|
|
from pathlib import PurePath
|
|
if isinstance(obj, PurePath):
|
|
return str(obj)
|
|
return obj
|
|
|
|
class HookHandler(BaseHTTPRequestHandler):
|
|
"""Handles incoming HTTP requests for the API hooks."""
|
|
def do_GET(self) -> None:
    """Handles GET requests by routing to the appropriate state provider.

    Every successful reply is a JSON document. Endpoints fall in two groups:

    - Direct reads: the payload is built on the request thread (taking the
      relevant lock where the app provides one) and sent immediately.
    - GUI-thread reads (``/api/gui/...``): a callback is appended to the
      app's ``_pending_gui_tasks`` list and the request thread waits up to
      10 seconds for it to run; a 504 with no body is returned otherwise.

    Unknown paths get a 404 with no body. Any exception raised while
    handling the request is reported as ``{"error": "<message>"}`` with
    status 500. Payloads are built and JSON-encoded *before* the status
    line is written, so a failure while computing a payload cannot leave a
    half-written 200 response followed by a second status line.
    """
    try:
        app = self.server.app
        path = self.path
        # The warmup/startup endpoints accept an optional query string and
        # are therefore matched on the path with the query removed.
        bare_path = path.split("?", 1)[0]
        print(f'[HOOKS] GET {self.path}')
        _require_warmed("src.session_logger").log_api_hook("GET", self.path, "")

        if path == "/status":
            self._get_send_json({"status": "ok"})

        elif path == "/api/project":
            from src import project_manager
            flat = project_manager.flat_config(_get_app_attr(app, "project"))
            self._get_send_json({"project": flat})

        elif path == "/api/project_switch_status":
            # Deterministic signal for tests waiting on a project switch to complete.
            # Polling /api/project returns derived state that may be stale from prior
            # tests; this endpoint tracks in_progress/error explicitly.
            #   in_progress: True while _do_project_switch is scheduled or running
            #   path: target path (or last completed path)
            #   error: string error if last switch failed; None on success/idle
            controller = _get_app_attr(app, "controller", None)
            if controller is None:
                payload = {"in_progress": False, "path": None, "error": None}
            else:
                payload = {
                    "in_progress": bool(getattr(controller, "_project_switch_in_progress", False)),
                    "path": getattr(controller, "_project_switch_pending_path", None) or getattr(controller, "active_project_path", None),
                    "error": getattr(controller, "_project_switch_error", None),
                }
            self._get_send_json(payload)

        elif path == "/api/io_pool_status":
            controller = _get_app_attr(app, "controller", None)
            if controller is None:
                payload = {"idle": True, "inflight": 0}
            else:
                inflight = getattr(controller, "_io_pool_inflight", 0)
                payload = {"idle": inflight == 0, "inflight": inflight}
            self._get_send_json(payload)

        elif path == "/api/gui_health":
            # Surfaces the controller's GUI health state so tests can detect a
            # degraded GUI (e.g. after an ImGui IM_ASSERT) and fail fast with a
            # clear message. Per user feedback 2026-06-08, the error is logged
            # and surfaced here, NOT silently swallowed.
            controller = _get_app_attr(app, "controller", None)
            if controller is None:
                payload = {
                    "healthy": True,
                    "degraded_reason": None,
                    "last_assert": None,
                    "io_pool_alive": True,
                }
            else:
                degraded = getattr(controller, "_gui_degraded_reason", None)
                # NOTE(review): assumes controller._io_pool is a
                # concurrent.futures executor whose private `_shutdown` flag
                # becomes True once shutdown() has run - confirm against
                # AppController. The strict `is True` test keeps the pool
                # reported as alive when the attribute is absent or is a
                # mock object.
                io_pool = getattr(controller, "_io_pool", None)
                pool_shut_down = getattr(io_pool, "_shutdown", False) is True
                payload = {
                    "healthy": degraded is None,
                    "degraded_reason": degraded,
                    "last_assert": getattr(controller, "_last_imgui_assert", None),
                    "io_pool_alive": not pool_shut_down,
                }
            self._get_send_json(payload)

        elif path == "/api/session":
            lock = _get_app_attr(app, "_disc_entries_lock")
            entries = _get_app_attr(app, "disc_entries", [])
            # Copy under the lock (when there is one) so encoding happens on
            # a stable snapshot.
            if lock:
                with lock:
                    entries_snapshot = list(entries)
            else:
                entries_snapshot = list(entries)
            self._get_send_json({"session": {"entries": entries_snapshot}})

        elif path == "/api/performance":
            metrics = {}
            perf = _get_app_attr(app, "perf_monitor")
            if perf:
                metrics = perf.get_metrics()
            self._get_send_json({"performance": metrics})

        elif path == "/api/events":
            # Draining read: the queue is emptied once its contents are copied.
            events = []
            if _has_app_attr(app, "_api_event_queue"):
                lock = _get_app_attr(app, "_api_event_queue_lock")
                queue = _get_app_attr(app, "_api_event_queue")
                if lock:
                    with lock:
                        events = list(queue)
                        queue.clear()
                else:
                    events = list(queue)
                    queue.clear()
            self._get_send_json({"events": events})

        elif path.startswith("/api/gui/value/"):
            field_tag = path.split("/")[-1]
            result = {"value": None}

            def get_val() -> None:
                settable = _get_app_attr(app, "_settable_fields", {})
                gettable = _get_app_attr(app, "_gettable_fields", {})
                # gettable entries win when a tag appears in both maps.
                combined = {**settable, **gettable}
                if field_tag in combined:
                    attr = combined[field_tag]
                    val = _get_app_attr(app, attr, None)
                    res_val = _serialize_for_api(val)
                    sys.stderr.write(f"[DEBUG] get_val: attr={attr}, val_type={type(val).__name__}, res_val={res_val}\n")
                    sys.stderr.flush()
                    result["value"] = res_val
                else:
                    sys.stderr.write(f"Hook API: field {field_tag} not found in settable or gettable\n")
                    sys.stderr.flush()

            self._get_via_gui_thread(app, get_val, result)

        elif path == "/api/gui/mma_status":
            result = {}

            def get_mma() -> None:
                result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
                result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
                result["active_tier"] = _get_app_attr(app, "active_tier", None)
                at = _get_app_attr(app, "active_track", None)
                result["active_track"] = at.id if hasattr(at, "id") else at
                result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
                result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
                result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
                result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
                result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
                result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
                result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
                result["pending_spawn"] = result["pending_mma_spawn_approval"]
                result["tracks"] = _get_app_attr(app, "tracks", [])
                result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
                result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
                result["tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})

            self._get_via_gui_thread(app, get_mma, result)

        elif path == "/api/gui/diagnostics":
            result = {}

            def check_all() -> None:
                status = _get_app_attr(app, "ai_status", "idle")
                result["thinking"] = status in ["sending...", "running powershell..."]
                result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
                result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
                perf = _get_app_attr(app, "perf_monitor")
                if perf:
                    result.update(perf.get_metrics())
                # Warmup status (startup_speedup_20260606 Phase 7). Exposes the
                # AppController's warmup_status() result so external clients and
                # tests can poll until all heavy modules are loaded.
                controller = _get_app_attr(app, "controller", None)
                if controller and hasattr(controller, "warmup_status"):
                    try:
                        result["warmup"] = controller.warmup_status()
                    except Exception:
                        # Best effort: report an empty status rather than
                        # failing the whole diagnostics read.
                        result["warmup"] = {"pending": [], "completed": [], "failed": []}

            self._get_via_gui_thread(app, check_all, result)

        elif path == '/api/gui/state':
            result = {}

            def get_state() -> None:
                gettable = _get_app_attr(app, "_gettable_fields", {})
                for key, attr in gettable.items():
                    val = _get_app_attr(app, attr, None)
                    result[key] = _serialize_for_api(val)
                result['show_text_viewer'] = app.show_windows.get('Text Viewer', False)
                result['text_viewer_title'] = _get_app_attr(app, 'text_viewer_title', '')
                result['text_viewer_type'] = _get_app_attr(app, 'text_viewer_type', 'markdown')

            self._get_via_gui_thread(app, get_state, result)

        elif path == "/api/mma/workers":
            mma_streams = _get_app_attr(app, "mma_streams", {})
            self._get_send_json({"workers": _serialize_for_api(mma_streams)})

        elif path == "/api/context/state":
            files = _get_app_attr(app, "files", [])
            screenshots = _get_app_attr(app, "screenshots", [])
            self._get_send_json({"files": _serialize_for_api(files), "screenshots": _serialize_for_api(screenshots)})

        elif path == "/api/v1/context":
            from src.app_controller import _api_get_context
            ctx_data = _api_get_context(app.controller)
            self._get_send_json(_serialize_for_api(ctx_data))

        elif path == "/api/metrics/financial":
            usage = _get_app_attr(app, "mma_tier_usage", {})
            metrics = {}
            for tier, data in usage.items():
                model = data.get("model", "")
                in_t = data.get("input", 0)
                out_t = data.get("output", 0)
                cost = _require_warmed("src.cost_tracker").estimate_cost(model, in_t, out_t)
                metrics[tier] = {**data, "estimated_cost": cost}
            self._get_send_json({"financial": metrics})

        elif path == "/api/system/telemetry":
            threads = [t.name for t in threading.enumerate()]
            queue_size = 0
            if _has_app_attr(app, "_api_event_queue"):
                queue = _get_app_attr(app, "_api_event_queue")
                if queue:
                    queue_size = len(queue)
            self._get_send_json({"threads": threads, "event_queue_size": queue_size})

        elif bare_path == "/api/warmup_status":
            # Cheap snapshot of the AppController's warmup progress, read
            # directly on the request thread via controller.warmup_status().
            empty_status = {"pending": [], "completed": [], "failed": []}
            controller = _get_app_attr(app, "controller", None)
            if controller and hasattr(controller, "warmup_status"):
                try:
                    payload = controller.warmup_status()
                except Exception:
                    payload = empty_status
            else:
                payload = empty_status
            self._get_send_json(payload)

        elif bare_path == "/api/warmup_wait":
            # Blocks the request thread (safe under ThreadingHTTPServer) up
            # to `timeout` seconds waiting for warmup to complete, then
            # returns the final status. Default timeout: 30s. Useful for
            # external clients (scripts, other tools) that need to know when
            # the system is fully ready before issuing AI requests.
            from urllib.parse import parse_qs, urlparse
            timeout = 30.0
            qs = parse_qs(urlparse(path).query)
            if "timeout" in qs:
                try:
                    timeout = float(qs["timeout"][0])
                except (TypeError, ValueError):
                    timeout = 30.0
            empty_status = {"pending": [], "completed": [], "failed": []}
            controller = _get_app_attr(app, "controller", None)
            if controller and hasattr(controller, "wait_for_warmup"):
                try:
                    controller.wait_for_warmup(timeout=timeout)
                except Exception:
                    # Deliberate best effort: whatever happened while
                    # waiting, the status below is still reported.
                    pass
                try:
                    payload = controller.warmup_status()
                except Exception:
                    payload = empty_status
            else:
                payload = empty_status
            self._get_send_json(payload)

        elif bare_path == "/api/warmup_canaries":
            # Per-module import canary records (startup_speedup_20260606 sub-track 4+).
            # Each record carries canary_id, module, thread_name, thread_id,
            # submit_ts, start_ts, end_ts, elapsed_ms, status, error.
            # Direct call on the request thread, no GUI trampoline.
            controller = _get_app_attr(app, "controller", None)
            if controller and hasattr(controller, "warmup_canaries"):
                try:
                    payload = {"canaries": controller.warmup_canaries()}
                except Exception:
                    payload = {"canaries": []}
            else:
                payload = {"canaries": []}
            self._get_send_json(payload)

        elif bare_path == "/api/startup_timeline":
            # Startup timeline: init/warmup/first-frame timestamps + precomputed deltas.
            controller = _get_app_attr(app, "controller", None)
            empty = {"init_start_ts": None, "warmup_done_ts": None, "first_frame_ts": None, "warmup_ms": None, "first_frame_after_init_ms": None, "first_frame_after_warmup_ms": None}
            if controller and hasattr(controller, "startup_timeline"):
                try:
                    payload = controller.startup_timeline()
                except Exception:
                    payload = empty
            else:
                payload = empty
            self._get_send_json(payload)

        else:
            self.send_response(404)
            self.end_headers()

    except Exception as e:
        self._get_send_json({"error": str(e)}, status=500)


def _get_send_json(self, payload: Any, status: int = 200) -> None:
    """Encode ``payload`` as JSON and send it as a complete response.

    The body is encoded first so that an unserializable payload raises
    before any part of the response has been written.
    """
    body = json.dumps(payload).encode("utf-8")
    self.send_response(status)
    self.send_header("Content-Type", "application/json")
    self.end_headers()
    self.wfile.write(body)


def _get_via_gui_thread(self, app: Any, callback: Any, result: dict, timeout: float = 10.0) -> None:
    """Run ``callback`` through the app's pending-GUI-task list and reply.

    ``callback`` is expected to fill ``result`` in place. It is queued as a
    ``custom_callback`` task; once it has run, ``result`` is sent as JSON
    with status 200. A 504 with no body is sent when the callback did not
    run within ``timeout`` seconds, or immediately when the app exposes no
    task list/lock (the callback could never run, so waiting is pointless).
    """
    done = threading.Event()

    def run() -> None:
        # Always release the waiting request thread, even if the callback
        # raises; the exception still propagates to whoever runs the task.
        try:
            callback()
        finally:
            done.set()

    lock = _get_app_attr(app, "_pending_gui_tasks_lock")
    tasks = _get_app_attr(app, "_pending_gui_tasks")
    queued = False
    if lock and tasks is not None:
        with lock:
            tasks.append({"action": "custom_callback", "callback": run})
        queued = True
    if queued and done.wait(timeout=timeout):
        self._get_send_json(result)
    else:
        self.send_response(504)
        self.end_headers()
|
|
|
|
def do_POST(self) -> None:
|
|
try:
|
|
app = self.server.app
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
body = self.rfile.read(content_length)
|
|
body_str = body.decode("utf-8") if body else ""
|
|
_require_warmed("src.session_logger").log_api_hook("POST", self.path, body_str)
|
|
data = json.loads(body_str) if body_str else {}
|
|
print(f'[HOOKS] POST {self.path} data length: {len(data)}')
|
|
if self.path == "/api/project":
|
|
project = _get_app_attr(app, "project")
|
|
_set_app_attr(app, "project", data.get("project", project))
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "updated"}).encode("utf-8"))
|
|
elif self.path.startswith("/api/confirm/"):
|
|
action_id = self.path.split("/")[-1]
|
|
approved = data.get("approved", False)
|
|
resolve_func = _get_app_attr(app, "resolve_pending_action")
|
|
if resolve_func:
|
|
success = resolve_func(action_id, approved)
|
|
if success:
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
|
|
else:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
else:
|
|
self.send_response(500)
|
|
self.end_headers()
|
|
elif self.path == "/api/session":
|
|
lock = _get_app_attr(app, "_disc_entries_lock")
|
|
entries = _get_app_attr(app, "disc_entries")
|
|
new_entries = data.get("session", {}).get("entries", entries)
|
|
if lock:
|
|
with lock: _set_app_attr(app, "disc_entries", new_entries)
|
|
else:
|
|
_set_app_attr(app, "disc_entries", new_entries)
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "updated"}).encode("utf-8"))
|
|
elif self.path == "/api/gui":
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append(data)
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/gui/value":
|
|
field_tag = data.get("field")
|
|
event = threading.Event()
|
|
result = {"value": None}
|
|
def get_val():
|
|
try:
|
|
settable = _get_app_attr(app, "_settable_fields", {})
|
|
if field_tag in settable:
|
|
attr = settable[field_tag]
|
|
result["value"] = _serialize_for_api(_get_app_attr(app, attr, None))
|
|
elif "[" in field_tag and field_tag.endswith("]"):
|
|
dict_name, _, key_part = field_tag.partition("[")
|
|
key = key_part[:-1].strip().strip("'\"")
|
|
if dict_name in settable:
|
|
attr = settable[dict_name]
|
|
current = _get_app_attr(app, attr, None)
|
|
if isinstance(current, dict) and key in current:
|
|
result["value"] = _serialize_for_api(current[key])
|
|
finally: event.set()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
|
|
if event.wait(timeout=10):
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
|
else:
|
|
self.send_response(504)
|
|
self.end_headers()
|
|
elif self.path == "/api/patch/trigger":
|
|
sys.stderr.write(f"[DEBUG] /api/patch/trigger called with data: {data}\n")
|
|
sys.stderr.flush()
|
|
patch_text = data.get("patch_text", "")
|
|
file_paths = data.get("file_paths", [])
|
|
sys.stderr.write(f"[DEBUG] patch_text length: {len(patch_text)}, files: {file_paths}\n")
|
|
sys.stderr.flush()
|
|
event = threading.Event()
|
|
result = {"status": "queued"}
|
|
def trigger_patch():
|
|
try:
|
|
sys.stderr.write(f"[DEBUG] trigger_patch callback executing...\n")
|
|
sys.stderr.flush()
|
|
app._pending_patch_text = patch_text
|
|
app._pending_patch_files = file_paths
|
|
app._show_patch_modal = True
|
|
sys.stderr.write(f"[DEBUG] Set patch modal: show={app._show_patch_modal}, text={'yes' if app._pending_patch_text else 'no'}\n")
|
|
sys.stderr.flush()
|
|
result["status"] = "ok"
|
|
except Exception as e:
|
|
sys.stderr.write(f"[DEBUG] trigger_patch error: {e}\n")
|
|
sys.stderr.flush()
|
|
result["status"] = "error"
|
|
result["error"] = str(e)
|
|
finally:
|
|
event.set()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": trigger_patch})
|
|
if event.wait(timeout=10):
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
|
else:
|
|
self.send_response(504)
|
|
self.end_headers()
|
|
elif self.path == "/api/patch/apply":
|
|
event = threading.Event()
|
|
result = {"status": "done"}
|
|
def apply_patch():
|
|
"""
|
|
[C: tests/test_patch_modal.py:test_apply_callback]
|
|
"""
|
|
try:
|
|
if hasattr(app, "_apply_pending_patch"):
|
|
app._apply_pending_patch()
|
|
else:
|
|
result["status"] = "no_method"
|
|
except Exception as e:
|
|
result["status"] = "error"
|
|
result["error"] = str(e)
|
|
finally:
|
|
event.set()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": apply_patch})
|
|
if event.wait(timeout=10):
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
|
else:
|
|
self.send_response(504)
|
|
self.end_headers()
|
|
elif self.path == "/api/patch/reject":
|
|
event = threading.Event()
|
|
result = {"status": "done"}
|
|
def reject_patch():
|
|
"""
|
|
[C: tests/test_patch_modal.py:test_reject_callback, tests/test_patch_modal.py:test_reject_patch]
|
|
"""
|
|
try:
|
|
app._show_patch_modal = False
|
|
app._pending_patch_text = None
|
|
app._pending_patch_files = []
|
|
except Exception as e:
|
|
result["status"] = "error"
|
|
result["error"] = str(e)
|
|
finally:
|
|
event.set()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": reject_patch})
|
|
if event.wait(timeout=10):
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
|
else:
|
|
self.send_response(504)
|
|
self.end_headers()
|
|
elif self.path == "/api/patch/status":
|
|
sys.stderr.write(f"[DEBUG] /api/patch/status called\n")
|
|
sys.stderr.flush()
|
|
show_modal = _get_app_attr(app, "_show_patch_modal", False)
|
|
patch_text = _get_app_attr(app, "_pending_patch_text", None)
|
|
patch_files = _get_app_attr(app, "_pending_patch_files", [])
|
|
sys.stderr.write(f"[DEBUG] patch status: show_modal={show_modal}, patch_text={patch_text is not None}, files={patch_files}\n")
|
|
sys.stderr.flush()
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({
|
|
"show_modal": show_modal,
|
|
"patch_text": patch_text,
|
|
"file_paths": patch_files
|
|
}).encode("utf-8"))
|
|
elif self.path == "/api/ask":
|
|
request_id = str(uuid.uuid4())
|
|
event = threading.Event()
|
|
pending_asks = _get_app_attr(app, "_pending_asks")
|
|
if pending_asks is None:
|
|
pending_asks = {}
|
|
_set_app_attr(app, "_pending_asks", pending_asks)
|
|
ask_responses = _get_app_attr(app, "_ask_responses")
|
|
if ask_responses is None:
|
|
ask_responses = {}
|
|
_set_app_attr(app, "_ask_responses", ask_responses)
|
|
pending_asks[request_id] = event
|
|
event_queue_lock = _get_app_attr(app, "_api_event_queue_lock")
|
|
event_queue = _get_app_attr(app, "_api_event_queue")
|
|
if event_queue is not None:
|
|
if event_queue_lock:
|
|
with event_queue_lock: event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
|
|
else:
|
|
event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
|
|
gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if gui_tasks is not None:
|
|
if gui_tasks_lock:
|
|
with gui_tasks_lock: gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
|
|
else:
|
|
gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
|
|
if event.wait(timeout=60.0):
|
|
response_data = ask_responses.get(request_id)
|
|
if request_id in ask_responses: del ask_responses[request_id]
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "ok", "response": response_data}).encode("utf-8"))
|
|
else:
|
|
if request_id in pending_asks: del pending_asks[request_id]
|
|
self.send_response(504)
|
|
self.end_headers()
|
|
elif self.path == "/api/ask/respond":
|
|
request_id = data.get("request_id")
|
|
response_data = data.get("response")
|
|
pending_asks = _get_app_attr(app, "_pending_asks")
|
|
ask_responses = _get_app_attr(app, "_ask_responses")
|
|
if request_id and pending_asks and request_id in pending_asks:
|
|
ask_responses[request_id] = response_data
|
|
event = pending_asks[request_id]
|
|
event.set()
|
|
del pending_asks[request_id]
|
|
gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if gui_tasks is not None:
|
|
if gui_tasks_lock:
|
|
with gui_tasks_lock: gui_tasks.append({"action": "clear_ask", "request_id": request_id})
|
|
else:
|
|
gui_tasks.append({"action": "clear_ask", "request_id": request_id})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
|
|
else:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
elif self.path == "/api/mma/workers/spawn":
|
|
def spawn_worker():
|
|
try:
|
|
func = _get_app_attr(app, "_spawn_worker")
|
|
if func: func(data)
|
|
except Exception as e:
|
|
sys.stderr.write(f"[DEBUG] Hook API spawn_worker error: {e}\n")
|
|
sys.stderr.flush()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": spawn_worker})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/mma/workers/kill":
|
|
def kill_worker():
|
|
"""
|
|
[C: src/app_controller.py:AppController.kill_worker, src/gui_2.py:App._cb_kill_ticket, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread]
|
|
"""
|
|
try:
|
|
worker_id = data.get("worker_id")
|
|
func = _get_app_attr(app, "_kill_worker")
|
|
if func: func(worker_id)
|
|
except Exception as e:
|
|
sys.stderr.write(f"[DEBUG] Hook API kill_worker error: {e}\n")
|
|
sys.stderr.flush()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": kill_worker})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/mma/pipeline/pause":
|
|
def pause_pipeline():
|
|
_set_app_attr(app, "mma_step_mode", True)
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": pause_pipeline})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/mma/pipeline/resume":
|
|
def resume_pipeline():
|
|
_set_app_attr(app, "mma_step_mode", False)
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": resume_pipeline})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/context/inject":
|
|
def inject_context():
|
|
"""
|
|
[C: tests/test_headless_simulation.py:test_mma_track_lifecycle_simulation]
|
|
"""
|
|
files = _get_app_attr(app, "files")
|
|
if isinstance(files, list):
|
|
files.extend(data.get("files", []))
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": inject_context})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/mma/dag/mutate":
|
|
def mutate_dag():
|
|
try:
|
|
func = _get_app_attr(app, "mutate_dag")
|
|
if func: func(data)
|
|
except Exception as e:
|
|
sys.stderr.write(f"[DEBUG] Hook API mutate_dag error: {e}\n")
|
|
sys.stderr.flush()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": mutate_dag})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
elif self.path == "/api/mma/ticket/approve":
|
|
ticket_id = data.get("ticket_id")
|
|
def approve_ticket():
|
|
try:
|
|
func = _get_app_attr(app, "approve_ticket")
|
|
if func: func(ticket_id)
|
|
except Exception as e:
|
|
sys.stderr.write(f"[DEBUG] Hook API approve_ticket error: {e}\n")
|
|
sys.stderr.flush()
|
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
if lock and tasks is not None:
|
|
with lock: tasks.append({"action": "custom_callback", "callback": approve_ticket})
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
|
else:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc(file=sys.stderr)
|
|
self.send_response(500)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))
|
|
|
|
|
|
def log_message(self, format: str, *args: Any) -> None:
    """
    Forward a printf-style request log line to the ``logging`` module.

    The message is rendered eagerly (``format % args``), prefixed with
    ``"Hook API: "`` and emitted at INFO level on the root logger.
    NOTE(review): the signature matches http.server's
    ``BaseHTTPRequestHandler.log_message`` hook, so this presumably replaces
    the default stderr output - confirm against the enclosing class.
    """
    request_line = format % args
    logging.info("Hook API: " + request_line)
|
|
|
|
class HookServer:
    """
    Lifecycle wrapper around the HTTP hook endpoint and its WebSocket gateway.

    ``start()`` launches both on daemon threads; ``stop()`` tears them down.
    """

    def __init__(self, app: Any, port: int = 8999) -> None:
        """
        Remember the application object and the HTTP port; nothing is started here.

        The WebSocket gateway created by ``start()`` is asked for ``port + 1``.

        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.app = app
        self.port = port
        self.server = None
        self.thread = None
        self.websocket_server: WebSocketServer | None = None

    def start(self) -> None:
        """
        Bring up the WebSocket gateway and the HTTP hook server.

        Does nothing when the HTTP thread is already alive, or when the app
        neither has ``test_hooks_enabled`` set nor uses the ``gemini_cli``
        provider. Shared queues, dicts and locks the request handler relies on
        are created on the app if they are missing.

        [C: src/app_controller.py:AppController._cb_accept_tracks, src/app_controller.py:AppController._cb_plan_epic, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._fetch_models, src/app_controller.py:AppController._handle_approve_ask, src/app_controller.py:AppController._handle_generate_send, src/app_controller.py:AppController._handle_md_only, src/app_controller.py:AppController._handle_reject_ask, src/app_controller.py:AppController._init_ai_and_hooks, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._prune_old_logs, src/app_controller.py:AppController._rebuild_rag_index, src/app_controller.py:AppController._run_event_loop, src/app_controller.py:AppController._start_track_logic, src/app_controller.py:AppController.cb_prune_logs, src/app_controller.py:AppController.init_state, src/app_controller.py:AppController.start_services, src/gui_2.py:App._render_discussion_entry_read_mode, src/gui_2.py:App._update_context_file_stats, src/mcp_client.py:ExternalMCPManager.add_server, src/multi_agent_conductor.py:WorkerPool.spawn, src/performance_monitor.py:PerformanceMonitor.__init__, tests/test_ai_client_concurrency.py:test_ai_client_tier_isolation, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread, tests/test_conductor_engine_v2.py:side_effect, tests/test_spawn_interception_v2.py:test_confirm_spawn_pushed_to_queue, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        running = self.thread
        if running and running.is_alive():
            return

        app = self.app
        uses_gemini_cli = _get_app_attr(app, 'current_provider', '') == 'gemini_cli'
        hooks_enabled = _get_app_attr(app, 'test_hooks_enabled', False)
        if not (hooks_enabled or uses_gemini_cli):
            return

        # Shared state the handler expects on the app, paired with the factory
        # used to create it when the app has not defined it yet.
        shared_state = (
            ('_pending_gui_tasks', list),
            ('_pending_gui_tasks_lock', threading.Lock),
            ('_pending_asks', dict),
            ('_ask_responses', dict),
            ('_api_event_queue', list),
            ('_api_event_queue_lock', threading.Lock),
        )
        for attr_name, make_default in shared_state:
            if not _has_app_attr(app, attr_name):
                _set_app_attr(app, attr_name, make_default())

        # The gateway is started before the HTTP server and asked for the next port up.
        gateway = WebSocketServer(app, port=self.port + 1)
        self.websocket_server = gateway
        gateway.start()

        # Hand the gateway to the app's event queue when the app exposes one.
        event_queue = _get_app_attr(app, 'event_queue')
        if event_queue:
            event_queue.websocket_server = gateway

        self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, app)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        logging.info(f"Hook server started on port {self.port}")

    def stop(self) -> None:
        """
        Stop the gateway, shut down and close the HTTP server, then join its thread.

        Each step is skipped when the corresponding object was never created,
        so calling this on an instance that was never started only logs.

        [C: src/app_controller.py:AppController.shutdown, src/mcp_client.py:ExternalMCPManager.stop_all, tests/test_performance_monitor.py:test_perf_monitor_basic_timing, tests/test_performance_monitor.py:test_perf_monitor_component_timing, tests/test_performance_monitor.py:test_perf_monitor_extended_metrics, tests/test_performance_monitor.py:test_perf_monitor_scope_context_manager, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        gateway = self.websocket_server
        if gateway:
            gateway.stop()

        http_server = self.server
        if http_server:
            http_server.shutdown()
            http_server.server_close()

        serving_thread = self.thread
        if serving_thread:
            serving_thread.join()

        logging.info("Hook server stopped")
|
|
|
|
class WebSocketServer:
    """
    WebSocket gateway for real-time event streaming.

    Clients subscribe by sending ``{"action": "subscribe", "channel": <name>}``
    where ``<name>`` is one of the keys of ``self.clients`` ("events" or
    "telemetry"). ``broadcast()`` pushes ``{"channel": ..., "payload": ...}``
    to every subscriber of a channel. The server runs on its own asyncio event
    loop inside a daemon thread.
    """

    def __init__(self, app: Any, port: int = 9000) -> None:
        """
        Store configuration and create the (empty) per-channel subscriber sets.

        Nothing is bound or started until ``start()`` is called.

        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.app = app
        self.port = port
        self.clients: dict[str, set] = {"events": set(), "telemetry": set()}
        self.loop: asyncio.AbstractEventLoop | None = None
        self.thread: threading.Thread | None = None
        self.server = None
        self._stop_event: asyncio.Event | None = None
        # Thread-safe record of a stop request. It lets stop() take effect even
        # when it runs before the worker thread has created its loop and event.
        self._stop_requested = threading.Event()

    async def _handler(self, websocket) -> None:
        """
        Serve one client connection: process subscribe requests until it closes.

        Frames that are not valid JSON, are not JSON objects, or name an unknown
        or non-string channel are ignored instead of tearing the connection
        down. The socket is removed from every channel when the connection ends.
        """
        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Malformed text or undecodable binary frame: skip it.
                    continue
                # Valid JSON that is not an object (list, string, number) has no .get().
                if not isinstance(data, dict) or data.get("action") != "subscribe":
                    continue
                channel = data.get("channel")
                # The isinstance check keeps unhashable values (list/dict) away
                # from the dict membership test below.
                if isinstance(channel, str) and channel in self.clients:
                    self.clients[channel].add(websocket)
                    await websocket.send(json.dumps({"type": "subscription_confirmed", "channel": channel}))
        except _require_warmed("websockets").exceptions.ConnectionClosed:
            pass
        finally:
            for subscribers in self.clients.values():
                subscribers.discard(websocket)

    def _run_loop(self) -> None:
        """
        Thread target: create an event loop, serve until stopped, then clean up.

        Binding starts at ``self.port`` and moves to the next port on
        ``OSError``, for at most 10 attempts; ``self.port`` is updated to the
        port actually bound. The loop is closed and the published references
        are cleared on exit, including when binding ultimately fails.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        stop_event = asyncio.Event()
        # Publish the event before the loop so stop() never sees one without the other.
        self._stop_event = stop_event
        self.loop = loop
        # stop() may have been called before the two attributes above existed;
        # honour that request now instead of serving forever.
        if self._stop_requested.is_set():
            stop_event.set()

        async def main() -> None:
            max_retries = 10
            current_port = self.port
            for attempt in range(max_retries):
                if stop_event.is_set():
                    # Stop already requested: do not bind (or re-bind) a port.
                    return
                try:
                    async with _require_warmed("websockets.asyncio.server").serve(self._handler, "127.0.0.1", current_port) as server:
                        self.port = current_port
                        self.server = server
                        logging.info(f"WebSocketServer successfully bound to port {self.port}")
                        await stop_event.wait()
                        break
                except OSError as e:
                    if attempt == max_retries - 1:
                        logging.error(f"WebSocketServer failed to bind after {max_retries} attempts: {e}")
                        raise
                    logging.warning(f"WebSocketServer port {current_port} in use, retrying on {current_port + 1}...")
                    current_port += 1

        try:
            loop.run_until_complete(main())
        finally:
            # Unpublish first so other threads stop targeting this loop, then
            # release its resources.
            self.server = None
            self.loop = None
            self._stop_event = None
            loop.close()

    def start(self) -> None:
        """
        Start the server thread unless one is already alive.

        [C: src/app_controller.py:AppController._cb_accept_tracks, src/app_controller.py:AppController._cb_plan_epic, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._fetch_models, src/app_controller.py:AppController._handle_approve_ask, src/app_controller.py:AppController._handle_generate_send, src/app_controller.py:AppController._handle_md_only, src/app_controller.py:AppController._handle_reject_ask, src/app_controller.py:AppController._init_ai_and_hooks, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._prune_old_logs, src/app_controller.py:AppController._rebuild_rag_index, src/app_controller.py:AppController._run_event_loop, src/app_controller.py:AppController._start_track_logic, src/app_controller.py:AppController.cb_prune_logs, src/app_controller.py:AppController.init_state, src/app_controller.py:AppController.start_services, src/gui_2.py:App._render_discussion_entry_read_mode, src/gui_2.py:App._update_context_file_stats, src/mcp_client.py:ExternalMCPManager.add_server, src/multi_agent_conductor.py:WorkerPool.spawn, src/performance_monitor.py:PerformanceMonitor.__init__, tests/test_ai_client_concurrency.py:test_ai_client_tier_isolation, tests/test_conductor_engine_abort.py:test_kill_worker_sets_abort_and_joins_thread, tests/test_conductor_engine_v2.py:side_effect, tests/test_spawn_interception_v2.py:test_confirm_spawn_pushed_to_queue, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        if self.thread and self.thread.is_alive():
            return
        # Forget any stop request left over from a previous run.
        self._stop_requested.clear()
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def stop(self) -> None:
        """
        Ask the server loop to shut down and wait up to 2 seconds for its thread.

        Safe to call before ``start()``, while the thread is still starting up,
        and more than once.

        [C: src/app_controller.py:AppController.shutdown, src/mcp_client.py:ExternalMCPManager.stop_all, tests/test_performance_monitor.py:test_perf_monitor_basic_timing, tests/test_performance_monitor.py:test_perf_monitor_component_timing, tests/test_performance_monitor.py:test_perf_monitor_extended_metrics, tests/test_performance_monitor.py:test_perf_monitor_scope_context_manager, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        # Set the flag first: if the worker has not published its loop yet, it
        # will see the flag as soon as it does.
        self._stop_requested.set()
        loop, stop_event = self.loop, self._stop_event
        if loop is not None and stop_event is not None:
            try:
                loop.call_soon_threadsafe(stop_event.set)
            except RuntimeError:
                # The loop was closed between the read above and this call,
                # so the server has already shut down.
                pass
        if self.thread:
            self.thread.join(timeout=2.0)

    def broadcast(self, channel: str, payload: dict[str, Any]) -> None:
        """
        Queue ``{"channel": channel, "payload": payload}`` for every subscriber.

        Callable from any thread. Returns without doing anything when the
        server loop is not available or the channel is unknown. Sends are
        fire-and-forget; their results are not inspected.

        [C: src/app_controller.py:AppController._process_pending_gui_tasks, src/events.py:AsyncEventQueue.put, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
        """
        loop = self.loop
        if loop is None or channel not in self.clients:
            return
        message = json.dumps({"channel": channel, "payload": payload})
        # Iterate over a snapshot: the loop thread mutates the set as clients come and go.
        for ws in list(self.clients[channel]):
            pending_send = ws.send(message)
            try:
                asyncio.run_coroutine_threadsafe(pending_send, loop)
            except RuntimeError:
                # Loop closed while broadcasting: discard the un-awaited
                # coroutine and stop, since no further send can succeed.
                pending_send.close()
                return
|