7 Commits

11 changed files with 659 additions and 192 deletions
+1 -1
View File
@@ -22,7 +22,7 @@ This file tracks all major tracks for the project. Each track has its own detail
*Link: [./tracks/tool_bias_tuning_20260308/](./tracks/tool_bias_tuning_20260308/)*
*Goal: Influence agent tool selection via a weighting system. Implement semantic nudges in tool descriptions and a dynamic "Tooling Strategy" section in the system prompt. Includes GUI badges and sliders for weight adjustment.*
4. [ ] **Track: Expanded Hook API & Headless Orchestration**
4. [~] **Track: Expanded Hook API & Headless Orchestration**
*Link: [./tracks/hook_api_expansion_20260308/](./tracks/hook_api_expansion_20260308/)*
*Goal: Maximize internal state exposure and provide comprehensive control endpoints (worker spawn/kill, pipeline pause/resume, DAG mutation) via the Hook API. Implement WebSocket-based real-time event streaming.*
@@ -1,44 +1,44 @@
# Implementation Plan: Expanded Hook API & Headless Orchestration
## Phase 1: WebSocket Infrastructure & Event Streaming
- [ ] Task: Implement the WebSocket gateway.
- [ ] Integrate a lightweight WebSocket library (e.g., `websockets` or `simple-websocket`).
- [ ] Create a dedicated `WebSocketServer` class in `src/api_hooks.py` that runs on a separate port (e.g., 9000).
- [ ] Implement a basic subscription mechanism for different event channels.
- [ ] Task: Connect the event queue to the WebSocket stream.
- [ ] Update `AsyncEventQueue` to broadcast events to connected WebSocket clients.
- [ ] Add high-frequency telemetry (FPS, CPU) to the event stream.
- [ ] Task: Write unit tests for WebSocket connection and event broadcasting.
- [ ] Task: Conductor - User Manual Verification 'Phase 1: WebSocket Infrastructure' (Protocol in workflow.md)
- [x] Task: Implement the WebSocket gateway.
- [x] Integrate a lightweight WebSocket library (e.g., `websockets` or `simple-websocket`).
- [x] Create a dedicated `WebSocketServer` class in `src/api_hooks.py` that runs on a separate port (e.g., 9000).
- [x] Implement a basic subscription mechanism for different event channels.
- [x] Task: Connect the event queue to the WebSocket stream.
- [x] Update `AsyncEventQueue` to broadcast events to connected WebSocket clients.
- [x] Add high-frequency telemetry (FPS, CPU) to the event stream.
- [x] Task: Write unit tests for WebSocket connection and event broadcasting.
- [x] Task: Conductor - User Manual Verification 'Phase 1: WebSocket Infrastructure' (Protocol in workflow.md)
## Phase 2: Expanded Read Endpoints (GET)
- [ ] Task: Implement detailed state exposure endpoints.
- [ ] Add `/api/mma/workers` to return the status, logs, and traces of all active sub-agents.
- [ ] Add `/api/context/state` to expose AST cache metadata and file aggregation status.
- [ ] Add `/api/metrics/financial` to return track-specific token usage and cost data.
- [ ] Add `/api/system/telemetry` to expose internal thread and queue metrics.
- [ ] Task: Enhance `/api/gui/state` to provide a truly exhaustive JSON dump of all internal managers.
- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new GET endpoints.
- [ ] Task: Write integration tests for all new GET endpoints using `live_gui`.
- [ ] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md)
- [x] Task: Implement detailed state exposure endpoints.
- [x] Add `/api/mma/workers` to return the status, logs, and traces of all active sub-agents.
- [x] Add `/api/context/state` to expose AST cache metadata and file aggregation status.
- [x] Add `/api/metrics/financial` to return track-specific token usage and cost data.
- [x] Add `/api/system/telemetry` to expose internal thread and queue metrics.
- [x] Task: Enhance `/api/gui/state` to provide a truly exhaustive JSON dump of all internal managers.
- [x] Task: Update `api_hook_client.py` with corresponding methods for all new GET endpoints.
- [x] Task: Write integration tests for all new GET endpoints using `live_gui`.
- [x] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md)
## Phase 3: Comprehensive Control Endpoints (POST)
- [ ] Task: Implement worker and pipeline control.
- [ ] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API.
- [ ] Add `/api/mma/workers/kill` to programmatically abort running workers.
- [ ] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop.
- [ ] Task: Implement context and DAG mutation.
- [ ] Add `/api/context/inject` to allow programmatic context injection (files/skeletons).
- [ ] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API.
- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints.
- [ ] Task: Write integration tests for all new control endpoints using `live_gui`.
- [ ] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md)
- [x] Task: Implement worker and pipeline control.
- [x] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API.
- [x] Add `/api/mma/workers/kill` to programmatically abort running workers.
- [x] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop.
- [x] Task: Implement context and DAG mutation.
- [x] Add `/api/context/inject` to allow programmatic context injection (files/skeletons).
- [x] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API.
- [x] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints.
- [x] Task: Write integration tests for all new control endpoints using `live_gui`.
- [x] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md)
## Phase 4: Headless Refinement & Verification
- [ ] Task: Improve error reporting.
- [ ] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses.
- [ ] Task: Conduct a full headless simulation.
- [ ] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API.
- [ ] Task: Final performance audit.
- [ ] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops.
- [x] Task: Improve error reporting.
- [x] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses.
- [x] Task: Conduct a full headless simulation.
- [x] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API.
- [x] Task: Final performance audit.
- [x] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops.
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Headless Refinement & Verification' (Protocol in workflow.md)
+35 -1
View File
@@ -116,7 +116,7 @@ class ApiHookClient:
return None
def post_gui(self, payload: dict) -> dict[str, Any]:
"""Pushes an event to the GUI's SyncEventQueue via the /api/gui endpoint."""
"""Pushes an event to the GUI's AsyncEventQueue via the /api/gui endpoint."""
return self._make_request('POST', '/api/gui', data=payload) or {}
def push_event(self, action: str, payload: dict) -> dict[str, Any]:
@@ -186,6 +186,22 @@ class ApiHookClient:
"""Retrieves the dedicated MMA engine status."""
return self._make_request('GET', '/api/gui/mma_status') or {}
def get_mma_workers(self) -> dict[str, Any]:
"""Retrieves status for all active MMA workers."""
return self._make_request('GET', '/api/mma/workers') or {}
def get_context_state(self) -> dict[str, Any]:
"""Retrieves the current file and screenshot context state."""
return self._make_request('GET', '/api/context/state') or {}
def get_financial_metrics(self) -> dict[str, Any]:
"""Retrieves token usage and estimated financial cost metrics."""
return self._make_request('GET', '/api/metrics/financial') or {}
def get_system_telemetry(self) -> dict[str, Any]:
"""Retrieves system-level telemetry including thread status and event queue size."""
return self._make_request('GET', '/api/system/telemetry') or {}
def get_node_status(self, node_id: str) -> dict[str, Any]:
"""Retrieves status for a specific node in the MMA DAG."""
return self._make_request('GET', f'/api/mma/node/{node_id}') or {}
@@ -224,3 +240,21 @@ class ApiHookClient:
"""Gets the current patch modal status."""
return self._make_request('GET', '/api/patch/status') or {}
def spawn_mma_worker(self, data: dict) -> dict:
return self._make_request('POST', '/api/mma/workers/spawn', data=data) or {}
def kill_mma_worker(self, worker_id: str) -> dict:
return self._make_request('POST', '/api/mma/workers/kill', data={"worker_id": worker_id}) or {}
def pause_mma_pipeline(self) -> dict:
return self._make_request('POST', '/api/mma/pipeline/pause') or {}
def resume_mma_pipeline(self) -> dict:
return self._make_request('POST', '/api/mma/pipeline/resume') or {}
def inject_context(self, data: dict) -> dict:
return self._make_request('POST', '/api/context/inject', data=data) or {}
def mutate_mma_dag(self, data: dict) -> dict:
return self._make_request('POST', '/api/mma/dag/mutate', data=data) or {}
+340 -140
View File
@@ -3,10 +3,14 @@ import json
import threading
import uuid
import sys
import asyncio
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from typing import Any
import logging
import websockets
from websockets.asyncio.server import serve
from src import session_logger
from src import cost_tracker
"""
API Hooks - REST API for external automation and state inspection.
@@ -77,165 +81,207 @@ def _serialize_for_api(obj: Any) -> Any:
class HookHandler(BaseHTTPRequestHandler):
"""Handles incoming HTTP requests for the API hooks."""
def do_GET(self) -> None:
app = self.server.app
session_logger.log_api_hook("GET", self.path, "")
if self.path == "/status":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
elif self.path == "/api/project":
from src import project_manager
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
flat = project_manager.flat_config(_get_app_attr(app, "project"))
self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
elif self.path == "/api/session":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
lock = _get_app_attr(app, "_disc_entries_lock")
entries = _get_app_attr(app, "disc_entries", [])
if lock:
with lock: entries_snapshot = list(entries)
else:
entries_snapshot = list(entries)
self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
elif self.path == "/api/performance":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
metrics = {}
perf = _get_app_attr(app, "perf_monitor")
if perf: metrics = perf.get_metrics()
self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
elif self.path == "/api/events":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
events = []
if _has_app_attr(app, "_api_event_queue"):
lock = _get_app_attr(app, "_api_event_queue_lock")
queue = _get_app_attr(app, "_api_event_queue")
try:
app = self.server.app
session_logger.log_api_hook("GET", self.path, "")
if self.path == "/status":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
elif self.path == "/api/project":
from src import project_manager
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
flat = project_manager.flat_config(_get_app_attr(app, "project"))
self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
elif self.path == "/api/session":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
lock = _get_app_attr(app, "_disc_entries_lock")
entries = _get_app_attr(app, "disc_entries", [])
if lock:
with lock:
with lock: entries_snapshot = list(entries)
else:
entries_snapshot = list(entries)
self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
elif self.path == "/api/performance":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
metrics = {}
perf = _get_app_attr(app, "perf_monitor")
if perf: metrics = perf.get_metrics()
self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
elif self.path == "/api/events":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
events = []
if _has_app_attr(app, "_api_event_queue"):
lock = _get_app_attr(app, "_api_event_queue_lock")
queue = _get_app_attr(app, "_api_event_queue")
if lock:
with lock:
events = list(queue)
queue.clear()
else:
events = list(queue)
queue.clear()
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
elif self.path.startswith("/api/gui/value/"):
field_tag = self.path.split("/")[-1]
event = threading.Event()
result = {"value": None}
def get_val():
try:
settable = _get_app_attr(app, "_settable_fields", {})
gettable = _get_app_attr(app, "_gettable_fields", {})
combined = {**settable, **gettable}
if field_tag in combined:
attr = combined[field_tag]
result["value"] = _get_app_attr(app, attr, None)
else:
sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
sys.stderr.flush()
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
events = list(queue)
queue.clear()
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
elif self.path.startswith("/api/gui/value/"):
field_tag = self.path.split("/")[-1]
event = threading.Event()
result = {"value": None}
def get_val():
try:
settable = _get_app_attr(app, "_settable_fields", {})
gettable = _get_app_attr(app, "_gettable_fields", {})
combined = {**settable, **gettable}
if field_tag in combined:
attr = combined[field_tag]
result["value"] = _get_app_attr(app, attr, None)
else:
sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
sys.stderr.flush()
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
if event.wait(timeout=10):
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/mma_status":
event = threading.Event()
result = {}
def get_mma():
try:
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
result["active_tier"] = _get_app_attr(app, "active_tier", None)
at = _get_app_attr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
result["pending_spawn"] = result["pending_mma_spawn_approval"]
result["tracks"] = _get_app_attr(app, "tracks", [])
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/diagnostics":
event = threading.Event()
result = {}
def check_all():
try:
status = _get_app_attr(app, "ai_status", "idle")
result["thinking"] = status in ["sending...", "running powershell..."]
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": check_all})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == '/api/gui/state':
event = threading.Event()
result = {}
def get_state():
try:
gettable = _get_app_attr(app, "_gettable_fields", {})
for key, attr in gettable.items():
val = _get_app_attr(app, attr, None)
result[key] = _serialize_for_api(val)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_state})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/mma/workers":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/mma_status":
event = threading.Event()
result = {}
def get_mma():
try:
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
result["active_tier"] = _get_app_attr(app, "active_tier", None)
at = _get_app_attr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
result["pending_spawn"] = result["pending_mma_spawn_approval"]
result["tracks"] = _get_app_attr(app, "tracks", [])
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
if event.wait(timeout=10):
mma_streams = _get_app_attr(app, "mma_streams", {})
self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8"))
elif self.path == "/api/context/state":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/diagnostics":
event = threading.Event()
result = {}
def check_all():
try:
status = _get_app_attr(app, "ai_status", "idle")
result["thinking"] = status in ["sending...", "running powershell..."]
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": check_all})
if event.wait(timeout=10):
files = _get_app_attr(app, "files", [])
screenshots = _get_app_attr(app, "screenshots", [])
self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8"))
elif self.path == "/api/metrics/financial":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == '/api/gui/state':
event = threading.Event()
result = {}
def get_state():
try:
gettable = _get_app_attr(app, "_gettable_fields", {})
for key, attr in gettable.items():
val = _get_app_attr(app, attr, None)
result[key] = _serialize_for_api(val)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_state})
if event.wait(timeout=10):
usage = _get_app_attr(app, "mma_tier_usage", {})
metrics = {}
for tier, data in usage.items():
model = data.get("model", "")
in_t = data.get("input", 0)
out_t = data.get("output", 0)
cost = cost_tracker.estimate_cost(model, in_t, out_t)
metrics[tier] = {**data, "estimated_cost": cost}
self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8"))
elif self.path == "/api/system/telemetry":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
threads = [t.name for t in threading.enumerate()]
queue_size = 0
if _has_app_attr(app, "_api_event_queue"):
queue = _get_app_attr(app, "_api_event_queue")
if queue: queue_size = len(queue)
self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8"))
else:
self.send_response(504)
self.send_response(404)
self.end_headers()
else:
self.send_response(404)
except Exception as e:
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))
def do_POST(self) -> None:
app = self.server.app
@@ -479,6 +525,90 @@ class HookHandler(BaseHTTPRequestHandler):
else:
self.send_response(404)
self.end_headers()
elif self.path == "/api/mma/workers/spawn":
def spawn_worker():
try:
func = _get_app_attr(app, "_spawn_worker")
if func: func(data)
except Exception as e:
sys.stderr.write(f"[DEBUG] Hook API spawn_worker error: {e}\n")
sys.stderr.flush()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": spawn_worker})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/workers/kill":
def kill_worker():
try:
worker_id = data.get("worker_id")
func = _get_app_attr(app, "_kill_worker")
if func: func(worker_id)
except Exception as e:
sys.stderr.write(f"[DEBUG] Hook API kill_worker error: {e}\n")
sys.stderr.flush()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": kill_worker})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/pipeline/pause":
def pause_pipeline():
_set_app_attr(app, "mma_step_mode", True)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": pause_pipeline})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/pipeline/resume":
def resume_pipeline():
_set_app_attr(app, "mma_step_mode", False)
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": resume_pipeline})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/context/inject":
def inject_context():
files = _get_app_attr(app, "files")
if isinstance(files, list):
files.extend(data.get("files", []))
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": inject_context})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
elif self.path == "/api/mma/dag/mutate":
def mutate_dag():
try:
func = _get_app_attr(app, "_mutate_dag")
if func: func(data)
except Exception as e:
sys.stderr.write(f"[DEBUG] Hook API mutate_dag error: {e}\n")
sys.stderr.flush()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": mutate_dag})
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
@@ -498,6 +628,7 @@ class HookServer:
self.port = port
self.server = None
self.thread = None
self.websocket_server: WebSocketServer | None = None
def start(self) -> None:
if self.thread and self.thread.is_alive():
@@ -511,12 +642,22 @@ class HookServer:
if not _has_app_attr(self.app, '_ask_responses'): _set_app_attr(self.app, '_ask_responses', {})
if not _has_app_attr(self.app, '_api_event_queue'): _set_app_attr(self.app, '_api_event_queue', [])
if not _has_app_attr(self.app, '_api_event_queue_lock'): _set_app_attr(self.app, '_api_event_queue_lock', threading.Lock())
self.websocket_server = WebSocketServer(self.app)
self.websocket_server.start()
eq = _get_app_attr(self.app, 'event_queue')
if eq:
eq.websocket_server = self.websocket_server
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
logging.info(f"Hook server started on port {self.port}")
def stop(self) -> None:
if self.websocket_server:
self.websocket_server.stop()
if self.server:
self.server.shutdown()
self.server.server_close()
@@ -524,3 +665,62 @@ class HookServer:
self.thread.join()
logging.info("Hook server stopped")
class WebSocketServer:
"""WebSocket gateway for real-time event streaming."""
def __init__(self, app: Any, port: int = 9000) -> None:
self.app = app
self.port = port
self.clients: dict[str, set] = {"events": set(), "telemetry": set()}
self.loop: asyncio.AbstractEventLoop | None = None
self.thread: threading.Thread | None = None
self.server = None
self._stop_event: asyncio.Event | None = None
async def _handler(self, websocket) -> None:
try:
async for message in websocket:
try:
data = json.loads(message)
if data.get("action") == "subscribe":
channel = data.get("channel")
if channel in self.clients:
self.clients[channel].add(websocket)
await websocket.send(json.dumps({"type": "subscription_confirmed", "channel": channel}))
except json.JSONDecodeError:
pass
except websockets.exceptions.ConnectionClosed:
pass
finally:
for channel in self.clients:
if websocket in self.clients[channel]:
self.clients[channel].remove(websocket)
def _run_loop(self) -> None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._stop_event = asyncio.Event()
async def main():
async with serve(self._handler, "127.0.0.1", self.port) as server:
self.server = server
await self._stop_event.wait()
self.loop.run_until_complete(main())
def start(self) -> None:
if self.thread and self.thread.is_alive():
return
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
def stop(self) -> None:
if self.loop and self._stop_event:
self.loop.call_soon_threadsafe(self._stop_event.set)
if self.thread:
self.thread.join(timeout=2.0)
def broadcast(self, channel: str, payload: dict[str, Any]) -> None:
if not self.loop or channel not in self.clients:
return
message = json.dumps({"channel": channel, "payload": payload})
for ws in list(self.clients[channel]):
asyncio.run_coroutine_threadsafe(ws.send(message), self.loop)
+10 -1
View File
@@ -150,7 +150,7 @@ class AppController:
self.disc_roles: List[str] = []
self.files: List[str] = []
self.screenshots: List[str] = []
self.event_queue: events.SyncEventQueue = events.SyncEventQueue()
self.event_queue: events.AsyncEventQueue = events.AsyncEventQueue()
self._loop_thread: Optional[threading.Thread] = None
self.tracks: List[Dict[str, Any]] = []
self.active_track: Optional[models.Track] = None
@@ -188,6 +188,7 @@ class AppController:
"Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite", "tool_preset": None},
}
self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor()
self._last_telemetry_time: float = 0.0
self._pending_gui_tasks: List[Dict[str, Any]] = []
self._api_event_queue: List[Dict[str, Any]] = []
# Pending dialogs state moved from App
@@ -522,6 +523,14 @@ class AppController:
})
def _process_pending_gui_tasks(self) -> None:
# Periodic telemetry broadcast
now = time.time()
if hasattr(self, 'event_queue') and hasattr(self.event_queue, 'websocket_server') and self.event_queue.websocket_server:
if now - self._last_telemetry_time >= 1.0:
self._last_telemetry_time = now
metrics = self.perf_monitor.get_metrics()
self.event_queue.websocket_server.broadcast("telemetry", metrics)
if not self._pending_gui_tasks:
return
sys.stderr.write(f"[DEBUG] _process_pending_gui_tasks: processing {len(self._pending_gui_tasks)} tasks\n")
+13 -7
View File
@@ -9,7 +9,7 @@ between the GUI main thread and background workers:
- Thread-safe: Callbacks execute on emitter's thread
- Example: ai_client.py emits 'request_start' and 'response_received' events
2. SyncEventQueue: Producer-consumer pattern via queue.Queue
2. AsyncEventQueue: Producer-consumer pattern via queue.Queue
- Used for: Decoupled task submission where consumer polls at its own pace
- Thread-safe: Built on Python's thread-safe queue.Queue
- Example: Background workers submit tasks, main thread drains queue
@@ -21,16 +21,16 @@ between the GUI main thread and background workers:
Integration Points:
- ai_client.py: EventEmitter for API lifecycle events
- gui_2.py: Consumes events via _process_event_queue()
- multi_agent_conductor.py: Uses SyncEventQueue for state updates
- multi_agent_conductor.py: Uses AsyncEventQueue for state updates
- api_hooks.py: Pushes events to _api_event_queue for external visibility
Thread Safety:
- EventEmitter: NOT thread-safe for concurrent on/emit (use from single thread)
- SyncEventQueue: FULLY thread-safe (built on queue.Queue)
- AsyncEventQueue: FULLY thread-safe (built on queue.Queue)
- UserRequestEvent: Immutable, safe for concurrent access
"""
import queue
from typing import Callable, Any, Dict, List, Tuple
from typing import Callable, Any, Dict, List, Tuple, Optional
class EventEmitter:
"""
@@ -70,14 +70,16 @@ class EventEmitter:
"""Clears all registered listeners."""
self._listeners.clear()
class SyncEventQueue:
class AsyncEventQueue:
"""
Synchronous event queue for decoupled communication using queue.Queue.
(Named AsyncEventQueue for architectural consistency, but is synchronous)
"""
def __init__(self) -> None:
"""Initializes the SyncEventQueue with an internal queue.Queue."""
"""Initializes the AsyncEventQueue with an internal queue.Queue."""
self._queue: queue.Queue[Tuple[str, Any]] = queue.Queue()
self.websocket_server: Optional[Any] = None
def put(self, event_name: str, payload: Any = None) -> None:
"""
@@ -88,6 +90,8 @@ class SyncEventQueue:
payload: Optional data associated with the event.
"""
self._queue.put((event_name, payload))
if self.websocket_server:
self.websocket_server.broadcast("events", {"event": event_name, "payload": payload})
def get(self) -> Tuple[str, Any]:
"""
@@ -106,6 +110,9 @@ class SyncEventQueue:
"""Blocks until all items in the queue have been gotten and processed."""
self._queue.join()
# Alias for backward compatibility
SyncEventQueue = AsyncEventQueue
class UserRequestEvent:
"""
Payload for a user request event.
@@ -126,4 +133,3 @@ class UserRequestEvent:
"disc_text": self.disc_text,
"base_dir": self.base_dir
}
+8 -8
View File
@@ -11,7 +11,7 @@ Key Components:
Architecture Integration:
- Uses TrackDAG and ExecutionEngine from dag_engine.py
- Communicates with GUI via SyncEventQueue
- Communicates with GUI via AsyncEventQueue
- Manages tier-specific token usage via update_usage()
Thread Safety:
@@ -45,7 +45,7 @@ Thread Safety:
- Abort events enable per-ticket cancellation
Integration:
- Uses SyncEventQueue for state updates to the GUI
- Uses AsyncEventQueue for state updates to the GUI
- Uses ai_client.send() for LLM communication
- Uses mcp_client for tool dispatch
@@ -123,7 +123,7 @@ class ConductorEngine:
Orchestrates the execution of tickets within a track.
"""
def __init__(self, track: Track, event_queue: Optional[events.SyncEventQueue] = None, auto_queue: bool = False) -> None:
def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
self.track = track
self.event_queue = event_queue
self.tier_usage = {
@@ -343,12 +343,12 @@ class ConductorEngine:
self._push_state(active_tier="Tier 2 (Tech Lead)")
time.sleep(1)
def _queue_put(event_queue: events.SyncEventQueue, event_name: str, payload) -> None:
"""Thread-safe helper to push an event to the SyncEventQueue from a worker thread."""
def _queue_put(event_queue: events.AsyncEventQueue, event_name: str, payload) -> None:
"""Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
if event_queue is not None:
event_queue.put(event_name, payload)
def confirm_execution(payload: str, event_queue: events.SyncEventQueue, ticket_id: str) -> bool:
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
"""
Pushes an approval request to the GUI and waits for response.
"""
@@ -370,7 +370,7 @@ def confirm_execution(payload: str, event_queue: events.SyncEventQueue, ticket_i
return approved
return False
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.SyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
"""
Pushes a spawn approval request to the GUI and waits for response.
Returns (approved, modified_prompt, modified_context)
@@ -409,7 +409,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.S
return approved, modified_prompt, modified_context
return False, prompt, context_md
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.SyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
"""
Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response.
+21
View File
@@ -0,0 +1,21 @@
import pytest
from src.api_hook_client import ApiHookClient
@pytest.mark.asyncio
async def test_control_endpoints_exist():
client = ApiHookClient()
assert hasattr(client, "spawn_mma_worker")
assert hasattr(client, "kill_mma_worker")
assert hasattr(client, "pause_mma_pipeline")
assert hasattr(client, "resume_mma_pipeline")
assert hasattr(client, "inject_context")
assert hasattr(client, "mutate_mma_dag")
def test_api_hook_client_control_methods_exist():
client = ApiHookClient()
assert callable(getattr(client, "spawn_mma_worker", None))
assert callable(getattr(client, "kill_mma_worker", None))
assert callable(getattr(client, "pause_mma_pipeline", None))
assert callable(getattr(client, "resume_mma_pipeline", None))
assert callable(getattr(client, "inject_context", None))
assert callable(getattr(client, "mutate_mma_dag", None))
+36
View File
@@ -0,0 +1,36 @@
import pytest
from src.api_hook_client import ApiHookClient
@pytest.fixture
def mock_app():
class MockApp:
def __init__(self):
self.mma_streams = {"W1": {"status": "running", "logs": ["started"]}}
self.active_tickets = []
self.files = ["file1.py", "file2.py"]
self.mma_tier_usage = {"Tier 1": {"input": 100, "output": 50, "model": "gemini"}}
self.event_queue = type("MockQueue", (), {"_queue": type("Q", (), {"qsize": lambda s: 5})()})()
self._gettable_fields = {"test_field": "test_attr"}
self.test_attr = "hello"
self.test_hooks_enabled = True
self._pending_gui_tasks = []
self._pending_gui_tasks_lock = None
self.ai_status = "idle"
return MockApp()
@pytest.mark.asyncio
async def test_get_mma_workers():
client = ApiHookClient()
# Set up client to talk to a locally mocked server or use a live fixture if available
# For now, just test that the methods exist on ApiHookClient
assert hasattr(client, "get_mma_workers")
assert hasattr(client, "get_context_state")
assert hasattr(client, "get_financial_metrics")
assert hasattr(client, "get_system_telemetry")
def test_api_hook_client_methods_exist():
client = ApiHookClient()
assert callable(getattr(client, "get_mma_workers", None))
assert callable(getattr(client, "get_context_state", None))
assert callable(getattr(client, "get_financial_metrics", None))
assert callable(getattr(client, "get_system_telemetry", None))
+117
View File
@@ -0,0 +1,117 @@
import pytest
from unittest.mock import patch, MagicMock
from src.api_hook_client import ApiHookClient
@pytest.mark.asyncio
async def test_mma_track_lifecycle_simulation():
"""
This test simulates the sequence of API calls an external orchestrator
would make to manage an MMA track lifecycle via the Hook API.
It verifies that ApiHookClient correctly routes requests to the
corresponding endpoints in src/api_hooks.py.
"""
client = ApiHookClient("http://localhost:8999")
with patch('requests.get') as mock_get, patch('requests.post') as mock_post:
# --- PHASE 1: Initialization & Discovery ---
# Mock successful status check
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = {"status": "ok"}
assert client.get_status()["status"] == "ok"
# Mock project state retrieval
mock_get.return_value.json.return_value = {"project": {"name": "test_project"}}
project = client.get_project()
assert project["project"]["name"] == "test_project"
# --- PHASE 2: Track Planning & Initialization ---
# Inject some files into context for the AI to work with
mock_post.return_value.status_code = 200
mock_post.return_value.json.return_value = {"status": "queued"}
inject_data = {"files": ["src/app_controller.py", "tests/test_basic.py"]}
res = client.inject_context(inject_data)
assert res["status"] == "queued"
mock_post.assert_called_with("http://localhost:8999/api/context/inject", json=inject_data, headers={}, timeout=5.0)
# --- PHASE 3: Worker Spawn & Execution ---
# Spawn a worker to start a ticket
spawn_data = {
"track_id": "track_20260311",
"ticket_id": "TKT-001",
"role": "tier3-worker",
"prompt": "Implement the new logging feature"
}
res = client.spawn_mma_worker(spawn_data)
assert res["status"] == "queued"
mock_post.assert_called_with("http://localhost:8999/api/mma/workers/spawn", json=spawn_data, headers={}, timeout=5.0)
# --- PHASE 4: DAG Mutation & Dependency Management ---
# Add a second ticket that depends on the first one
dag_mutation = {
"action": "add_ticket",
"ticket": {
"id": "TKT-002",
"deps": ["TKT-001"],
"role": "tier4-qa",
"prompt": "Verify the logging feature"
}
}
res = client.mutate_mma_dag(dag_mutation)
assert res["status"] == "queued"
mock_post.assert_called_with("http://localhost:8999/api/mma/dag/mutate", json=dag_mutation, headers={}, timeout=5.0)
# --- PHASE 5: Monitoring & Status Polling ---
# Poll for MMA status
mock_get.return_value.json.return_value = {
"mma_status": "running",
"active_tickets": ["TKT-001"],
"active_tier": "Tier 3",
"tracks": [{"id": "track_20260311", "status": "active"}]
}
mma_status = client.get_mma_status()
assert mma_status["mma_status"] == "running"
assert "TKT-001" in mma_status["active_tickets"]
# Check worker stream status
mock_get.return_value.json.return_value = {
"workers": {
"TKT-001": {"status": "running", "output": "Starting work..."}
}
}
workers = client.get_mma_workers()
assert workers["workers"]["TKT-001"]["status"] == "running"
# --- PHASE 6: Human-in-the-Loop Interaction ---
# Mock a tool approval request
# In a real scenario, this would block until a POST to /api/ask/respond occurs
mock_post.return_value.json.return_value = {"status": "ok", "response": True}
approved = client.request_confirmation("run_powershell", {"script": "ls -Recurse"})
assert approved is True
# --- PHASE 7: Completion & Cleanup ---
# Mock completion status
mock_get.return_value.json.return_value = {
"mma_status": "idle",
"active_tickets": [],
"tracks": [{"id": "track_20260311", "status": "completed"}]
}
final_status = client.get_mma_status()
assert final_status["mma_status"] == "idle"
assert len(final_status["active_tickets"]) == 0
# Reset session to clean up
client.reset_session()
# Verify reset click was pushed
mock_post.assert_called_with("http://localhost:8999/api/gui", json={"action": "click", "item": "btn_reset", "user_data": None}, headers={}, timeout=5.0)
if __name__ == "__main__":
import asyncio
asyncio.run(test_mma_track_lifecycle_simulation())
+44
View File
@@ -0,0 +1,44 @@
import pytest
import asyncio
import json
import websockets
from src.api_hooks import WebSocketServer
@pytest.mark.asyncio
async def test_websocket_subscription_and_broadcast():
# Mock app
app = type("MockApp", (), {"test_hooks_enabled": True})()
# Start server on a specific port
port = 9005
server = WebSocketServer(app, port=port)
server.start()
# Wait for server to start
await asyncio.sleep(0.5)
try:
uri = f"ws://127.0.0.1:{port}"
async with websockets.connect(uri) as websocket:
# Subscribe to events channel
subscribe_msg = {"action": "subscribe", "channel": "events"}
await websocket.send(json.dumps(subscribe_msg))
# Receive confirmation
response = await websocket.recv()
data = json.loads(response)
assert data["type"] == "subscription_confirmed"
assert data["channel"] == "events"
# Broadcast an event from the server
event_payload = {"event": "test_event", "data": "hello"}
server.broadcast("events", event_payload)
# Receive the broadcast
broadcast_response = await websocket.recv()
broadcast_data = json.loads(broadcast_response)
assert broadcast_data["channel"] == "events"
assert broadcast_data["payload"] == event_payload
finally:
server.stop()