7 Commits

11 changed files with 659 additions and 192 deletions
+1 -1
@@ -22,7 +22,7 @@ This file tracks all major tracks for the project. Each track has its own detail
 *Link: [./tracks/tool_bias_tuning_20260308/](./tracks/tool_bias_tuning_20260308/)*
 *Goal: Influence agent tool selection via a weighting system. Implement semantic nudges in tool descriptions and a dynamic "Tooling Strategy" section in the system prompt. Includes GUI badges and sliders for weight adjustment.*
-4. [ ] **Track: Expanded Hook API & Headless Orchestration**
+4. [~] **Track: Expanded Hook API & Headless Orchestration**
 *Link: [./tracks/hook_api_expansion_20260308/](./tracks/hook_api_expansion_20260308/)*
 *Goal: Maximize internal state exposure and provide comprehensive control endpoints (worker spawn/kill, pipeline pause/resume, DAG mutation) via the Hook API. Implement WebSocket-based real-time event streaming.*
@@ -1,44 +1,44 @@
 # Implementation Plan: Expanded Hook API & Headless Orchestration
 ## Phase 1: WebSocket Infrastructure & Event Streaming
-- [ ] Task: Implement the WebSocket gateway.
+- [x] Task: Implement the WebSocket gateway.
-- [ ] Integrate a lightweight WebSocket library (e.g., `websockets` or `simple-websocket`).
+- [x] Integrate a lightweight WebSocket library (e.g., `websockets` or `simple-websocket`).
-- [ ] Create a dedicated `WebSocketServer` class in `src/api_hooks.py` that runs on a separate port (e.g., 9000).
+- [x] Create a dedicated `WebSocketServer` class in `src/api_hooks.py` that runs on a separate port (e.g., 9000).
-- [ ] Implement a basic subscription mechanism for different event channels.
+- [x] Implement a basic subscription mechanism for different event channels.
-- [ ] Task: Connect the event queue to the WebSocket stream.
+- [x] Task: Connect the event queue to the WebSocket stream.
-- [ ] Update `AsyncEventQueue` to broadcast events to connected WebSocket clients.
+- [x] Update `AsyncEventQueue` to broadcast events to connected WebSocket clients.
-- [ ] Add high-frequency telemetry (FPS, CPU) to the event stream.
+- [x] Add high-frequency telemetry (FPS, CPU) to the event stream.
-- [ ] Task: Write unit tests for WebSocket connection and event broadcasting.
+- [x] Task: Write unit tests for WebSocket connection and event broadcasting.
-- [ ] Task: Conductor - User Manual Verification 'Phase 1: WebSocket Infrastructure' (Protocol in workflow.md)
+- [x] Task: Conductor - User Manual Verification 'Phase 1: WebSocket Infrastructure' (Protocol in workflow.md)
 ## Phase 2: Expanded Read Endpoints (GET)
-- [ ] Task: Implement detailed state exposure endpoints.
+- [x] Task: Implement detailed state exposure endpoints.
-- [ ] Add `/api/mma/workers` to return the status, logs, and traces of all active sub-agents.
+- [x] Add `/api/mma/workers` to return the status, logs, and traces of all active sub-agents.
-- [ ] Add `/api/context/state` to expose AST cache metadata and file aggregation status.
+- [x] Add `/api/context/state` to expose AST cache metadata and file aggregation status.
-- [ ] Add `/api/metrics/financial` to return track-specific token usage and cost data.
+- [x] Add `/api/metrics/financial` to return track-specific token usage and cost data.
-- [ ] Add `/api/system/telemetry` to expose internal thread and queue metrics.
+- [x] Add `/api/system/telemetry` to expose internal thread and queue metrics.
-- [ ] Task: Enhance `/api/gui/state` to provide a truly exhaustive JSON dump of all internal managers.
+- [x] Task: Enhance `/api/gui/state` to provide a truly exhaustive JSON dump of all internal managers.
-- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new GET endpoints.
+- [x] Task: Update `api_hook_client.py` with corresponding methods for all new GET endpoints.
-- [ ] Task: Write integration tests for all new GET endpoints using `live_gui`.
+- [x] Task: Write integration tests for all new GET endpoints using `live_gui`.
-- [ ] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md)
+- [x] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md)
 ## Phase 3: Comprehensive Control Endpoints (POST)
-- [ ] Task: Implement worker and pipeline control.
+- [x] Task: Implement worker and pipeline control.
-- [ ] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API.
+- [x] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API.
-- [ ] Add `/api/mma/workers/kill` to programmatically abort running workers.
+- [x] Add `/api/mma/workers/kill` to programmatically abort running workers.
-- [ ] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop.
+- [x] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop.
-- [ ] Task: Implement context and DAG mutation.
+- [x] Task: Implement context and DAG mutation.
-- [ ] Add `/api/context/inject` to allow programmatic context injection (files/skeletons).
+- [x] Add `/api/context/inject` to allow programmatic context injection (files/skeletons).
-- [ ] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API.
+- [x] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API.
-- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints.
+- [x] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints.
-- [ ] Task: Write integration tests for all new control endpoints using `live_gui`.
+- [x] Task: Write integration tests for all new control endpoints using `live_gui`.
-- [ ] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md)
+- [x] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md)
 ## Phase 4: Headless Refinement & Verification
-- [ ] Task: Improve error reporting.
+- [x] Task: Improve error reporting.
-- [ ] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses.
+- [x] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses.
-- [ ] Task: Conduct a full headless simulation.
+- [x] Task: Conduct a full headless simulation.
-- [ ] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API.
+- [x] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API.
-- [ ] Task: Final performance audit.
+- [x] Task: Final performance audit.
-- [ ] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops.
+- [x] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops.
 - [ ] Task: Conductor - User Manual Verification 'Phase 4: Headless Refinement & Verification' (Protocol in workflow.md)
+35 -1
@@ -116,7 +116,7 @@ class ApiHookClient:
         return None
     def post_gui(self, payload: dict) -> dict[str, Any]:
-        """Pushes an event to the GUI's SyncEventQueue via the /api/gui endpoint."""
+        """Pushes an event to the GUI's AsyncEventQueue via the /api/gui endpoint."""
         return self._make_request('POST', '/api/gui', data=payload) or {}
     def push_event(self, action: str, payload: dict) -> dict[str, Any]:
@@ -186,6 +186,22 @@ class ApiHookClient:
         """Retrieves the dedicated MMA engine status."""
         return self._make_request('GET', '/api/gui/mma_status') or {}
+    def get_mma_workers(self) -> dict[str, Any]:
+        """Retrieves status for all active MMA workers."""
+        return self._make_request('GET', '/api/mma/workers') or {}
+    def get_context_state(self) -> dict[str, Any]:
+        """Retrieves the current file and screenshot context state."""
+        return self._make_request('GET', '/api/context/state') or {}
+    def get_financial_metrics(self) -> dict[str, Any]:
+        """Retrieves token usage and estimated financial cost metrics."""
+        return self._make_request('GET', '/api/metrics/financial') or {}
+    def get_system_telemetry(self) -> dict[str, Any]:
+        """Retrieves system-level telemetry including thread status and event queue size."""
+        return self._make_request('GET', '/api/system/telemetry') or {}
     def get_node_status(self, node_id: str) -> dict[str, Any]:
         """Retrieves status for a specific node in the MMA DAG."""
         return self._make_request('GET', f'/api/mma/node/{node_id}') or {}
@@ -224,3 +240,21 @@ class ApiHookClient:
         """Gets the current patch modal status."""
         return self._make_request('GET', '/api/patch/status') or {}
+    def spawn_mma_worker(self, data: dict) -> dict:
+        return self._make_request('POST', '/api/mma/workers/spawn', data=data) or {}
+    def kill_mma_worker(self, worker_id: str) -> dict:
+        return self._make_request('POST', '/api/mma/workers/kill', data={"worker_id": worker_id}) or {}
+    def pause_mma_pipeline(self) -> dict:
+        return self._make_request('POST', '/api/mma/pipeline/pause') or {}
+    def resume_mma_pipeline(self) -> dict:
+        return self._make_request('POST', '/api/mma/pipeline/resume') or {}
+    def inject_context(self, data: dict) -> dict:
+        return self._make_request('POST', '/api/context/inject', data=data) or {}
+    def mutate_mma_dag(self, data: dict) -> dict:
+        return self._make_request('POST', '/api/mma/dag/mutate', data=data) or {}
+200
@@ -3,10 +3,14 @@ import json
 import threading
 import uuid
 import sys
+import asyncio
 from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
 from typing import Any
 import logging
+import websockets
+from websockets.asyncio.server import serve
 from src import session_logger
+from src import cost_tracker
 """
 API Hooks - REST API for external automation and state inspection.
@@ -77,6 +81,7 @@ def _serialize_for_api(obj: Any) -> Any:
 class HookHandler(BaseHTTPRequestHandler):
     """Handles incoming HTTP requests for the API hooks."""
     def do_GET(self) -> None:
+        try:
             app = self.server.app
             session_logger.log_api_hook("GET", self.path, "")
             if self.path == "/status":
@@ -233,9 +238,50 @@ class HookHandler(BaseHTTPRequestHandler):
                 else:
                     self.send_response(504)
                     self.end_headers()
+            elif self.path == "/api/mma/workers":
+                self.send_response(200)
+                self.send_header("Content-Type", "application/json")
+                self.end_headers()
+                mma_streams = _get_app_attr(app, "mma_streams", {})
+                self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8"))
+            elif self.path == "/api/context/state":
+                self.send_response(200)
+                self.send_header("Content-Type", "application/json")
+                self.end_headers()
+                files = _get_app_attr(app, "files", [])
+                screenshots = _get_app_attr(app, "screenshots", [])
+                self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8"))
+            elif self.path == "/api/metrics/financial":
+                self.send_response(200)
+                self.send_header("Content-Type", "application/json")
+                self.end_headers()
+                usage = _get_app_attr(app, "mma_tier_usage", {})
+                metrics = {}
+                for tier, data in usage.items():
+                    model = data.get("model", "")
+                    in_t = data.get("input", 0)
+                    out_t = data.get("output", 0)
+                    cost = cost_tracker.estimate_cost(model, in_t, out_t)
+                    metrics[tier] = {**data, "estimated_cost": cost}
+                self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8"))
+            elif self.path == "/api/system/telemetry":
+                self.send_response(200)
+                self.send_header("Content-Type", "application/json")
+                self.end_headers()
+                threads = [t.name for t in threading.enumerate()]
+                queue_size = 0
+                if _has_app_attr(app, "_api_event_queue"):
+                    queue = _get_app_attr(app, "_api_event_queue")
+                    if queue:
+                        queue_size = len(queue)
+                self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8"))
             else:
                 self.send_response(404)
                 self.end_headers()
+        except Exception as e:
+            self.send_response(500)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))
     def do_POST(self) -> None:
         app = self.server.app
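Review note: the `/api/metrics/financial` branch above reduces to a small fold over the per-tier usage dict. A self-contained sketch of that aggregation, with `estimate_cost` as a stand-in whose rates are made-up placeholders (the real pricing table lives in the project's `cost_tracker` module):

```python
# Stand-in for cost_tracker.estimate_cost; the $/1M-token rates below are
# invented for illustration, not the project's actual pricing.
def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    rates = {"gemini-2.5-flash-lite": (0.10, 0.40)}  # assumed (input, output) rates
    in_rate, out_rate = rates.get(model, (0.0, 0.0))
    return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000

# Same shape as app.mma_tier_usage in the handler above.
usage = {"Tier 4": {"model": "gemini-2.5-flash-lite", "input": 200_000, "output": 50_000}}

# The endpoint's aggregation: copy each tier's usage and attach the estimate.
metrics = {}
for tier, data in usage.items():
    cost = estimate_cost(data["model"], data["input"], data["output"])
    metrics[tier] = {**data, "estimated_cost": cost}
```

With these placeholder rates, Tier 4's estimate comes out to 0.02 + 0.02 = 0.04 dollars; the real figures depend entirely on `cost_tracker`'s table.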
@@ -479,6 +525,90 @@ class HookHandler(BaseHTTPRequestHandler):
         else:
             self.send_response(404)
             self.end_headers()
+        elif self.path == "/api/mma/workers/spawn":
+            def spawn_worker():
+                try:
+                    func = _get_app_attr(app, "_spawn_worker")
+                    if func:
+                        func(data)
+                except Exception as e:
+                    sys.stderr.write(f"[DEBUG] Hook API spawn_worker error: {e}\n")
+                    sys.stderr.flush()
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
+            if lock and tasks is not None:
+                with lock:
+                    tasks.append({"action": "custom_callback", "callback": spawn_worker})
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
+        elif self.path == "/api/mma/workers/kill":
+            def kill_worker():
+                try:
+                    worker_id = data.get("worker_id")
+                    func = _get_app_attr(app, "_kill_worker")
+                    if func:
+                        func(worker_id)
+                except Exception as e:
+                    sys.stderr.write(f"[DEBUG] Hook API kill_worker error: {e}\n")
+                    sys.stderr.flush()
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
+            if lock and tasks is not None:
+                with lock:
+                    tasks.append({"action": "custom_callback", "callback": kill_worker})
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
+        elif self.path == "/api/mma/pipeline/pause":
+            def pause_pipeline():
+                _set_app_attr(app, "mma_step_mode", True)
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
+            if lock and tasks is not None:
+                with lock:
+                    tasks.append({"action": "custom_callback", "callback": pause_pipeline})
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
+        elif self.path == "/api/mma/pipeline/resume":
+            def resume_pipeline():
+                _set_app_attr(app, "mma_step_mode", False)
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
+            if lock and tasks is not None:
+                with lock:
+                    tasks.append({"action": "custom_callback", "callback": resume_pipeline})
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
+        elif self.path == "/api/context/inject":
+            def inject_context():
+                files = _get_app_attr(app, "files")
+                if isinstance(files, list):
+                    files.extend(data.get("files", []))
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
+            if lock and tasks is not None:
+                with lock:
+                    tasks.append({"action": "custom_callback", "callback": inject_context})
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
+        elif self.path == "/api/mma/dag/mutate":
+            def mutate_dag():
+                try:
+                    func = _get_app_attr(app, "_mutate_dag")
+                    if func:
+                        func(data)
+                except Exception as e:
+                    sys.stderr.write(f"[DEBUG] Hook API mutate_dag error: {e}\n")
+                    sys.stderr.flush()
+            lock = _get_app_attr(app, "_pending_gui_tasks_lock")
+            tasks = _get_app_attr(app, "_pending_gui_tasks")
+            if lock and tasks is not None:
+                with lock:
+                    tasks.append({"action": "custom_callback", "callback": mutate_dag})
+            self.send_response(200)
+            self.send_header("Content-Type", "application/json")
+            self.end_headers()
+            self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
         else:
             self.send_response(404)
             self.end_headers()
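Review note: all six control endpoints above share one pattern: the HTTP handler thread never mutates app state directly; it appends a callback to `_pending_gui_tasks` under a lock and immediately answers `{"status": "queued"}`, and the GUI main thread later drains the queue and runs the callbacks. A minimal sketch of that hand-off, reduced to plain functions (names follow the diff; the `state` dict stands in for the app object):

```python
import threading

pending_tasks = []                      # stands in for app._pending_gui_tasks
lock = threading.Lock()                 # stands in for app._pending_gui_tasks_lock
state = {"mma_step_mode": False}        # stands in for the app object

def handle_pause_request() -> dict:
    # Runs on the HTTP handler thread: queue the mutation, respond at once.
    def pause_pipeline():
        state["mma_step_mode"] = True
    with lock:
        pending_tasks.append({"action": "custom_callback", "callback": pause_pipeline})
    return {"status": "queued"}         # the response does NOT mean "applied"

def process_pending_gui_tasks() -> None:
    # Runs on the GUI main thread: drain the queue, then execute callbacks
    # outside the lock so slow callbacks cannot block producers.
    with lock:
        tasks = list(pending_tasks)
        pending_tasks.clear()
    for task in tasks:
        task["callback"]()

response = handle_pause_request()       # state is still unchanged here
process_pending_gui_tasks()             # now the mutation takes effect
```

The design choice is that "queued" is the only guarantee an API caller gets; a client that needs confirmation must poll a read endpoint (or the WebSocket stream) for the resulting state change.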
@@ -498,6 +628,7 @@ class HookServer:
         self.port = port
         self.server = None
         self.thread = None
+        self.websocket_server: WebSocketServer | None = None
     def start(self) -> None:
         if self.thread and self.thread.is_alive():
@@ -511,12 +642,22 @@ class HookServer:
         if not _has_app_attr(self.app, '_ask_responses'): _set_app_attr(self.app, '_ask_responses', {})
         if not _has_app_attr(self.app, '_api_event_queue'): _set_app_attr(self.app, '_api_event_queue', [])
         if not _has_app_attr(self.app, '_api_event_queue_lock'): _set_app_attr(self.app, '_api_event_queue_lock', threading.Lock())
+        self.websocket_server = WebSocketServer(self.app)
+        self.websocket_server.start()
+        eq = _get_app_attr(self.app, 'event_queue')
+        if eq:
+            eq.websocket_server = self.websocket_server
         self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
         self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
         self.thread.start()
         logging.info(f"Hook server started on port {self.port}")
     def stop(self) -> None:
+        if self.websocket_server:
+            self.websocket_server.stop()
         if self.server:
             self.server.shutdown()
             self.server.server_close()
@@ -524,3 +665,62 @@ class HookServer:
             self.thread.join()
         logging.info("Hook server stopped")
+class WebSocketServer:
+    """WebSocket gateway for real-time event streaming."""
+    def __init__(self, app: Any, port: int = 9000) -> None:
+        self.app = app
+        self.port = port
+        self.clients: dict[str, set] = {"events": set(), "telemetry": set()}
+        self.loop: asyncio.AbstractEventLoop | None = None
+        self.thread: threading.Thread | None = None
+        self.server = None
+        self._stop_event: asyncio.Event | None = None
+    async def _handler(self, websocket) -> None:
+        try:
+            async for message in websocket:
+                try:
+                    data = json.loads(message)
+                    if data.get("action") == "subscribe":
+                        channel = data.get("channel")
+                        if channel in self.clients:
+                            self.clients[channel].add(websocket)
+                            await websocket.send(json.dumps({"type": "subscription_confirmed", "channel": channel}))
+                except json.JSONDecodeError:
+                    pass
+        except websockets.exceptions.ConnectionClosed:
+            pass
+        finally:
+            for channel in self.clients:
+                if websocket in self.clients[channel]:
+                    self.clients[channel].remove(websocket)
+    def _run_loop(self) -> None:
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
+        self._stop_event = asyncio.Event()
+        async def main():
+            async with serve(self._handler, "127.0.0.1", self.port) as server:
+                self.server = server
+                await self._stop_event.wait()
+        self.loop.run_until_complete(main())
+    def start(self) -> None:
+        if self.thread and self.thread.is_alive():
+            return
+        self.thread = threading.Thread(target=self._run_loop, daemon=True)
+        self.thread.start()
+    def stop(self) -> None:
+        if self.loop and self._stop_event:
+            self.loop.call_soon_threadsafe(self._stop_event.set)
+        if self.thread:
+            self.thread.join(timeout=2.0)
+    def broadcast(self, channel: str, payload: dict[str, Any]) -> None:
+        if not self.loop or channel not in self.clients:
+            return
+        message = json.dumps({"channel": channel, "payload": payload})
+        for ws in list(self.clients[channel]):
+            asyncio.run_coroutine_threadsafe(ws.send(message), self.loop)
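Review note: the subscription protocol in `_handler` is small enough to verify in isolation. The sketch below re-creates just the subscribe branch against a fake socket (`FakeSocket` is a test stand-in for a real `websockets` connection object; the message shapes are taken from the diff above):

```python
import asyncio
import json

class FakeSocket:
    """Records messages instead of sending them over a real connection."""
    def __init__(self):
        self.sent = []
    async def send(self, msg):
        self.sent.append(msg)

# Same channel registry shape as WebSocketServer.clients.
clients = {"events": set(), "telemetry": set()}

async def handle_message(ws, raw: str) -> None:
    # Mirrors the subscribe branch of WebSocketServer._handler: register the
    # socket on a known channel and confirm the subscription.
    data = json.loads(raw)
    if data.get("action") == "subscribe" and data.get("channel") in clients:
        clients[data["channel"]].add(ws)
        await ws.send(json.dumps({"type": "subscription_confirmed",
                                  "channel": data["channel"]}))

ws = FakeSocket()
asyncio.run(handle_message(ws, json.dumps({"action": "subscribe", "channel": "telemetry"})))
```

Unknown channels are silently ignored here, matching the server code; a production gateway might instead reply with an error frame so misbehaving clients can be diagnosed.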
+10 -1
@@ -150,7 +150,7 @@ class AppController:
         self.disc_roles: List[str] = []
         self.files: List[str] = []
         self.screenshots: List[str] = []
-        self.event_queue: events.SyncEventQueue = events.SyncEventQueue()
+        self.event_queue: events.AsyncEventQueue = events.AsyncEventQueue()
         self._loop_thread: Optional[threading.Thread] = None
         self.tracks: List[Dict[str, Any]] = []
         self.active_track: Optional[models.Track] = None
@@ -188,6 +188,7 @@ class AppController:
             "Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite", "tool_preset": None},
         }
         self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor()
+        self._last_telemetry_time: float = 0.0
         self._pending_gui_tasks: List[Dict[str, Any]] = []
         self._api_event_queue: List[Dict[str, Any]] = []
         # Pending dialogs state moved from App
@@ -522,6 +523,14 @@ class AppController:
         })
     def _process_pending_gui_tasks(self) -> None:
+        # Periodic telemetry broadcast
+        now = time.time()
+        if hasattr(self, 'event_queue') and hasattr(self.event_queue, 'websocket_server') and self.event_queue.websocket_server:
+            if now - self._last_telemetry_time >= 1.0:
+                self._last_telemetry_time = now
+                metrics = self.perf_monitor.get_metrics()
+                self.event_queue.websocket_server.broadcast("telemetry", metrics)
         if not self._pending_gui_tasks:
             return
         sys.stderr.write(f"[DEBUG] _process_pending_gui_tasks: processing {len(self._pending_gui_tasks)} tasks\n")
+13 -7
@@ -9,7 +9,7 @@ between the GUI main thread and background workers:
 - Thread-safe: Callbacks execute on emitter's thread
 - Example: ai_client.py emits 'request_start' and 'response_received' events
-2. SyncEventQueue: Producer-consumer pattern via queue.Queue
+2. AsyncEventQueue: Producer-consumer pattern via queue.Queue
 - Used for: Decoupled task submission where consumer polls at its own pace
 - Thread-safe: Built on Python's thread-safe queue.Queue
 - Example: Background workers submit tasks, main thread drains queue
@@ -21,16 +21,16 @@ between the GUI main thread and background workers:
 Integration Points:
 - ai_client.py: EventEmitter for API lifecycle events
 - gui_2.py: Consumes events via _process_event_queue()
-- multi_agent_conductor.py: Uses SyncEventQueue for state updates
+- multi_agent_conductor.py: Uses AsyncEventQueue for state updates
 - api_hooks.py: Pushes events to _api_event_queue for external visibility
 Thread Safety:
 - EventEmitter: NOT thread-safe for concurrent on/emit (use from single thread)
-- SyncEventQueue: FULLY thread-safe (built on queue.Queue)
+- AsyncEventQueue: FULLY thread-safe (built on queue.Queue)
 - UserRequestEvent: Immutable, safe for concurrent access
 """
 import queue
-from typing import Callable, Any, Dict, List, Tuple
+from typing import Callable, Any, Dict, List, Tuple, Optional
 class EventEmitter:
     """
@@ -70,14 +70,16 @@ class EventEmitter:
         """Clears all registered listeners."""
         self._listeners.clear()
-class SyncEventQueue:
+class AsyncEventQueue:
     """
     Synchronous event queue for decoupled communication using queue.Queue.
+    (Named AsyncEventQueue for architectural consistency, but is synchronous)
     """
     def __init__(self) -> None:
-        """Initializes the SyncEventQueue with an internal queue.Queue."""
+        """Initializes the AsyncEventQueue with an internal queue.Queue."""
         self._queue: queue.Queue[Tuple[str, Any]] = queue.Queue()
+        self.websocket_server: Optional[Any] = None
     def put(self, event_name: str, payload: Any = None) -> None:
         """
@@ -88,6 +90,8 @@ class SyncEventQueue:
         payload: Optional data associated with the event.
         """
         self._queue.put((event_name, payload))
+        if self.websocket_server:
+            self.websocket_server.broadcast("events", {"event": event_name, "payload": payload})
     def get(self) -> Tuple[str, Any]:
         """
@@ -106,6 +110,9 @@ class SyncEventQueue:
         """Blocks until all items in the queue have been gotten and processed."""
         self._queue.join()
+# Alias for backward compatibility
+SyncEventQueue = AsyncEventQueue
 class UserRequestEvent:
     """
     Payload for a user request event.
@@ -126,4 +133,3 @@ class UserRequestEvent:
             "disc_text": self.disc_text,
             "base_dir": self.base_dir
         }
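Review note: the `put()` change above is the whole bridge between the GUI's internal queue and the WebSocket stream: every event is still enqueued as before, and is additionally mirrored to `websocket_server.broadcast("events", ...)` when a server is attached. A self-contained re-creation of that behavior with a recording stub in place of the real `WebSocketServer`:

```python
import queue

class AsyncEventQueue:
    """Mirror of the diff's change: put() optionally fans out to a websocket server."""
    def __init__(self):
        self._queue = queue.Queue()
        self.websocket_server = None  # attached later by HookServer.start()

    def put(self, event_name, payload=None):
        self._queue.put((event_name, payload))   # normal consumers are unaffected
        if self.websocket_server:
            self.websocket_server.broadcast(
                "events", {"event": event_name, "payload": payload})

class RecordingServer:
    """Test stand-in for WebSocketServer; just records broadcast calls."""
    def __init__(self):
        self.messages = []
    def broadcast(self, channel, payload):
        self.messages.append((channel, payload))

q = AsyncEventQueue()
q.put("no_server_yet")                 # safe before a server is attached
q.websocket_server = RecordingServer()
q.put("state_update", {"tier": "Tier 2"})
```

One consequence worth noting in review: `broadcast` runs on the producer's thread inside `put()`, so the real implementation's decision to hand the send off via `run_coroutine_threadsafe` (rather than sending inline) is what keeps producers from blocking on slow WebSocket clients.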
+8 -8
@@ -11,7 +11,7 @@ Key Components:
 Architecture Integration:
 - Uses TrackDAG and ExecutionEngine from dag_engine.py
-- Communicates with GUI via SyncEventQueue
+- Communicates with GUI via AsyncEventQueue
 - Manages tier-specific token usage via update_usage()
 Thread Safety:
@@ -45,7 +45,7 @@ Thread Safety:
 - Abort events enable per-ticket cancellation
 Integration:
-- Uses SyncEventQueue for state updates to the GUI
+- Uses AsyncEventQueue for state updates to the GUI
 - Uses ai_client.send() for LLM communication
 - Uses mcp_client for tool dispatch
@@ -123,7 +123,7 @@ class ConductorEngine:
     Orchestrates the execution of tickets within a track.
     """
-    def __init__(self, track: Track, event_queue: Optional[events.SyncEventQueue] = None, auto_queue: bool = False) -> None:
+    def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
         self.track = track
         self.event_queue = event_queue
         self.tier_usage = {
@@ -343,12 +343,12 @@ class ConductorEngine:
         self._push_state(active_tier="Tier 2 (Tech Lead)")
         time.sleep(1)
-def _queue_put(event_queue: events.SyncEventQueue, event_name: str, payload) -> None:
-    """Thread-safe helper to push an event to the SyncEventQueue from a worker thread."""
+def _queue_put(event_queue: events.AsyncEventQueue, event_name: str, payload) -> None:
+    """Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
     if event_queue is not None:
         event_queue.put(event_name, payload)
-def confirm_execution(payload: str, event_queue: events.SyncEventQueue, ticket_id: str) -> bool:
+def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
     """
     Pushes an approval request to the GUI and waits for response.
     """
@@ -370,7 +370,7 @@ def confirm_execution(payload: str, event_queue: events.SyncEventQueue, ticket_i
             return approved
     return False
-def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.SyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
+def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
     """
     Pushes a spawn approval request to the GUI and waits for response.
     Returns (approved, modified_prompt, modified_context)
@@ -409,7 +409,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.S
             return approved, modified_prompt, modified_context
     return False, prompt, context_md
-def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.SyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
+def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
     """
     Simulates the lifecycle of a single agent working on a ticket.
     Calls the AI client and updates the ticket status based on the response.
+21
View File
@@ -0,0 +1,21 @@
import pytest

from src.api_hook_client import ApiHookClient


@pytest.mark.asyncio
async def test_control_endpoints_exist():
    client = ApiHookClient()
    assert hasattr(client, "spawn_mma_worker")
    assert hasattr(client, "kill_mma_worker")
    assert hasattr(client, "pause_mma_pipeline")
    assert hasattr(client, "resume_mma_pipeline")
    assert hasattr(client, "inject_context")
    assert hasattr(client, "mutate_mma_dag")


def test_api_hook_client_control_methods_exist():
    client = ApiHookClient()
    assert callable(getattr(client, "spawn_mma_worker", None))
    assert callable(getattr(client, "kill_mma_worker", None))
    assert callable(getattr(client, "pause_mma_pipeline", None))
    assert callable(getattr(client, "resume_mma_pipeline", None))
    assert callable(getattr(client, "inject_context", None))
    assert callable(getattr(client, "mutate_mma_dag", None))
+36
View File
@@ -0,0 +1,36 @@
import pytest

from src.api_hook_client import ApiHookClient


@pytest.fixture
def mock_app():
    class MockApp:
        def __init__(self):
            self.mma_streams = {"W1": {"status": "running", "logs": ["started"]}}
            self.active_tickets = []
            self.files = ["file1.py", "file2.py"]
            self.mma_tier_usage = {"Tier 1": {"input": 100, "output": 50, "model": "gemini"}}
            self.event_queue = type("MockQueue", (), {"_queue": type("Q", (), {"qsize": lambda s: 5})()})()
            self._gettable_fields = {"test_field": "test_attr"}
            self.test_attr = "hello"
            self.test_hooks_enabled = True
            self._pending_gui_tasks = []
            self._pending_gui_tasks_lock = None
            self.ai_status = "idle"

    return MockApp()


@pytest.mark.asyncio
async def test_get_mma_workers():
    client = ApiHookClient()
    # Set up client to talk to a locally mocked server or use a live fixture if available.
    # For now, just test that the methods exist on ApiHookClient.
    assert hasattr(client, "get_mma_workers")
    assert hasattr(client, "get_context_state")
    assert hasattr(client, "get_financial_metrics")
    assert hasattr(client, "get_system_telemetry")


def test_api_hook_client_methods_exist():
    client = ApiHookClient()
    assert callable(getattr(client, "get_mma_workers", None))
    assert callable(getattr(client, "get_context_state", None))
    assert callable(getattr(client, "get_financial_metrics", None))
    assert callable(getattr(client, "get_system_telemetry", None))
+117
View File
@@ -0,0 +1,117 @@
import pytest
from unittest.mock import patch, MagicMock

from src.api_hook_client import ApiHookClient


@pytest.mark.asyncio
async def test_mma_track_lifecycle_simulation():
    """
    This test simulates the sequence of API calls an external orchestrator
    would make to manage an MMA track lifecycle via the Hook API.
    It verifies that ApiHookClient correctly routes requests to the
    corresponding endpoints in src/api_hooks.py.
    """
    client = ApiHookClient("http://localhost:8999")

    with patch('requests.get') as mock_get, patch('requests.post') as mock_post:
        # --- PHASE 1: Initialization & Discovery ---
        # Mock successful status check
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = {"status": "ok"}
        assert client.get_status()["status"] == "ok"

        # Mock project state retrieval
        mock_get.return_value.json.return_value = {"project": {"name": "test_project"}}
        project = client.get_project()
        assert project["project"]["name"] == "test_project"

        # --- PHASE 2: Track Planning & Initialization ---
        # Inject some files into context for the AI to work with
        mock_post.return_value.status_code = 200
        mock_post.return_value.json.return_value = {"status": "queued"}
        inject_data = {"files": ["src/app_controller.py", "tests/test_basic.py"]}
        res = client.inject_context(inject_data)
        assert res["status"] == "queued"
        mock_post.assert_called_with("http://localhost:8999/api/context/inject", json=inject_data, headers={}, timeout=5.0)

        # --- PHASE 3: Worker Spawn & Execution ---
        # Spawn a worker to start a ticket
        spawn_data = {
            "track_id": "track_20260311",
            "ticket_id": "TKT-001",
            "role": "tier3-worker",
            "prompt": "Implement the new logging feature"
        }
        res = client.spawn_mma_worker(spawn_data)
        assert res["status"] == "queued"
        mock_post.assert_called_with("http://localhost:8999/api/mma/workers/spawn", json=spawn_data, headers={}, timeout=5.0)

        # --- PHASE 4: DAG Mutation & Dependency Management ---
        # Add a second ticket that depends on the first one
        dag_mutation = {
            "action": "add_ticket",
            "ticket": {
                "id": "TKT-002",
                "deps": ["TKT-001"],
                "role": "tier4-qa",
                "prompt": "Verify the logging feature"
            }
        }
        res = client.mutate_mma_dag(dag_mutation)
        assert res["status"] == "queued"
        mock_post.assert_called_with("http://localhost:8999/api/mma/dag/mutate", json=dag_mutation, headers={}, timeout=5.0)

        # --- PHASE 5: Monitoring & Status Polling ---
        # Poll for MMA status
        mock_get.return_value.json.return_value = {
            "mma_status": "running",
            "active_tickets": ["TKT-001"],
            "active_tier": "Tier 3",
            "tracks": [{"id": "track_20260311", "status": "active"}]
        }
        mma_status = client.get_mma_status()
        assert mma_status["mma_status"] == "running"
        assert "TKT-001" in mma_status["active_tickets"]

        # Check worker stream status
        mock_get.return_value.json.return_value = {
            "workers": {
                "TKT-001": {"status": "running", "output": "Starting work..."}
            }
        }
        workers = client.get_mma_workers()
        assert workers["workers"]["TKT-001"]["status"] == "running"

        # --- PHASE 6: Human-in-the-Loop Interaction ---
        # Mock a tool approval request.
        # In a real scenario, this would block until a POST to /api/ask/respond occurs.
        mock_post.return_value.json.return_value = {"status": "ok", "response": True}
        approved = client.request_confirmation("run_powershell", {"script": "ls -Recurse"})
        assert approved is True

        # --- PHASE 7: Completion & Cleanup ---
        # Mock completion status
        mock_get.return_value.json.return_value = {
            "mma_status": "idle",
            "active_tickets": [],
            "tracks": [{"id": "track_20260311", "status": "completed"}]
        }
        final_status = client.get_mma_status()
        assert final_status["mma_status"] == "idle"
        assert len(final_status["active_tickets"]) == 0

        # Reset session to clean up
        client.reset_session()
        # Verify reset click was pushed
        mock_post.assert_called_with("http://localhost:8999/api/gui", json={"action": "click", "item": "btn_reset", "user_data": None}, headers={}, timeout=5.0)


if __name__ == "__main__":
    import asyncio
    asyncio.run(test_mma_track_lifecycle_simulation())
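Against a live server, the status polling in PHASE 5 and PHASE 7 would normally sit in a loop rather than a single call. A hedged sketch of such a helper (`wait_for_idle` is illustrative, not part of `ApiHookClient`; it assumes only the `get_mma_status()` method shown above):

```python
import time
from unittest.mock import MagicMock


def wait_for_idle(client, timeout: float = 60.0, interval: float = 1.0) -> dict:
    # Hypothetical helper: poll get_mma_status() until the pipeline reports
    # idle, or raise once the deadline passes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_mma_status()
        if status.get("mma_status") == "idle":
            return status
        time.sleep(interval)
    raise TimeoutError("MMA pipeline did not reach idle before the deadline")


# Exercised against a mock, mirroring the completion payload above:
client = MagicMock()
client.get_mma_status.return_value = {"mma_status": "idle", "active_tickets": []}
print(wait_for_idle(client)["mma_status"])  # idle
```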
+44
View File
@@ -0,0 +1,44 @@
import asyncio
import json

import pytest
import websockets

from src.api_hooks import WebSocketServer


@pytest.mark.asyncio
async def test_websocket_subscription_and_broadcast():
    # Mock app
    app = type("MockApp", (), {"test_hooks_enabled": True})()

    # Start server on a specific port
    port = 9005
    server = WebSocketServer(app, port=port)
    server.start()

    # Wait for server to start
    await asyncio.sleep(0.5)

    try:
        uri = f"ws://127.0.0.1:{port}"
        async with websockets.connect(uri) as websocket:
            # Subscribe to events channel
            subscribe_msg = {"action": "subscribe", "channel": "events"}
            await websocket.send(json.dumps(subscribe_msg))

            # Receive confirmation
            response = await websocket.recv()
            data = json.loads(response)
            assert data["type"] == "subscription_confirmed"
            assert data["channel"] == "events"

            # Broadcast an event from the server
            event_payload = {"event": "test_event", "data": "hello"}
            server.broadcast("events", event_payload)

            # Receive the broadcast
            broadcast_response = await websocket.recv()
            broadcast_data = json.loads(broadcast_response)
            assert broadcast_data["channel"] == "events"
            assert broadcast_data["payload"] == event_payload
    finally:
        server.stop()
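The test above pins down a small wire protocol: a subscribe request is answered with a `subscription_confirmed` frame, and broadcasts arrive wrapped in a `{channel, payload}` envelope. A sketch of the framing logic this implies (`handle_client_message` and `broadcast_frame` are illustrative names, not the actual `WebSocketServer` internals):

```python
import json


def handle_client_message(raw: str) -> str:
    # Answer a subscribe request with the confirmation frame the test expects.
    msg = json.loads(raw)
    if msg.get("action") == "subscribe":
        return json.dumps({"type": "subscription_confirmed", "channel": msg["channel"]})
    return json.dumps({"type": "error", "reason": "unknown action"})


def broadcast_frame(channel: str, payload: dict) -> str:
    # Wrap a payload in the {channel, payload} envelope subscribers receive.
    return json.dumps({"channel": channel, "payload": payload})


reply = json.loads(handle_client_message('{"action": "subscribe", "channel": "events"}'))
print(reply["type"])  # subscription_confirmed
```

Keeping the framing in pure functions like these makes the protocol testable without opening a socket; the transport layer then only moves strings.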