Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 036c2f360a | |||
| 930b833055 | |||
| 4777dd957a | |||
| e88f0f1831 | |||
| 1be576a9a0 | |||
| e8303b819b | |||
| 02e0fce548 |
+1
-1
@@ -22,7 +22,7 @@ This file tracks all major tracks for the project. Each track has its own detail
|
|||||||
*Link: [./tracks/tool_bias_tuning_20260308/](./tracks/tool_bias_tuning_20260308/)*
|
*Link: [./tracks/tool_bias_tuning_20260308/](./tracks/tool_bias_tuning_20260308/)*
|
||||||
*Goal: Influence agent tool selection via a weighting system. Implement semantic nudges in tool descriptions and a dynamic "Tooling Strategy" section in the system prompt. Includes GUI badges and sliders for weight adjustment.*
|
*Goal: Influence agent tool selection via a weighting system. Implement semantic nudges in tool descriptions and a dynamic "Tooling Strategy" section in the system prompt. Includes GUI badges and sliders for weight adjustment.*
|
||||||
|
|
||||||
4. [ ] **Track: Expanded Hook API & Headless Orchestration**
|
4. [~] **Track: Expanded Hook API & Headless Orchestration**
|
||||||
*Link: [./tracks/hook_api_expansion_20260308/](./tracks/hook_api_expansion_20260308/)*
|
*Link: [./tracks/hook_api_expansion_20260308/](./tracks/hook_api_expansion_20260308/)*
|
||||||
*Goal: Maximize internal state exposure and provide comprehensive control endpoints (worker spawn/kill, pipeline pause/resume, DAG mutation) via the Hook API. Implement WebSocket-based real-time event streaming.*
|
*Goal: Maximize internal state exposure and provide comprehensive control endpoints (worker spawn/kill, pipeline pause/resume, DAG mutation) via the Hook API. Implement WebSocket-based real-time event streaming.*
|
||||||
|
|
||||||
|
|||||||
@@ -1,44 +1,44 @@
|
|||||||
# Implementation Plan: Expanded Hook API & Headless Orchestration
|
# Implementation Plan: Expanded Hook API & Headless Orchestration
|
||||||
|
|
||||||
## Phase 1: WebSocket Infrastructure & Event Streaming
|
## Phase 1: WebSocket Infrastructure & Event Streaming
|
||||||
- [ ] Task: Implement the WebSocket gateway.
|
- [x] Task: Implement the WebSocket gateway.
|
||||||
- [ ] Integrate a lightweight WebSocket library (e.g., `websockets` or `simple-websocket`).
|
- [x] Integrate a lightweight WebSocket library (e.g., `websockets` or `simple-websocket`).
|
||||||
- [ ] Create a dedicated `WebSocketServer` class in `src/api_hooks.py` that runs on a separate port (e.g., 9000).
|
- [x] Create a dedicated `WebSocketServer` class in `src/api_hooks.py` that runs on a separate port (e.g., 9000).
|
||||||
- [ ] Implement a basic subscription mechanism for different event channels.
|
- [x] Implement a basic subscription mechanism for different event channels.
|
||||||
- [ ] Task: Connect the event queue to the WebSocket stream.
|
- [x] Task: Connect the event queue to the WebSocket stream.
|
||||||
- [ ] Update `AsyncEventQueue` to broadcast events to connected WebSocket clients.
|
- [x] Update `AsyncEventQueue` to broadcast events to connected WebSocket clients.
|
||||||
- [ ] Add high-frequency telemetry (FPS, CPU) to the event stream.
|
- [x] Add high-frequency telemetry (FPS, CPU) to the event stream.
|
||||||
- [ ] Task: Write unit tests for WebSocket connection and event broadcasting.
|
- [x] Task: Write unit tests for WebSocket connection and event broadcasting.
|
||||||
- [ ] Task: Conductor - User Manual Verification 'Phase 1: WebSocket Infrastructure' (Protocol in workflow.md)
|
- [x] Task: Conductor - User Manual Verification 'Phase 1: WebSocket Infrastructure' (Protocol in workflow.md)
|
||||||
|
|
||||||
## Phase 2: Expanded Read Endpoints (GET)
|
## Phase 2: Expanded Read Endpoints (GET)
|
||||||
- [ ] Task: Implement detailed state exposure endpoints.
|
- [x] Task: Implement detailed state exposure endpoints.
|
||||||
- [ ] Add `/api/mma/workers` to return the status, logs, and traces of all active sub-agents.
|
- [x] Add `/api/mma/workers` to return the status, logs, and traces of all active sub-agents.
|
||||||
- [ ] Add `/api/context/state` to expose AST cache metadata and file aggregation status.
|
- [x] Add `/api/context/state` to expose AST cache metadata and file aggregation status.
|
||||||
- [ ] Add `/api/metrics/financial` to return track-specific token usage and cost data.
|
- [x] Add `/api/metrics/financial` to return track-specific token usage and cost data.
|
||||||
- [ ] Add `/api/system/telemetry` to expose internal thread and queue metrics.
|
- [x] Add `/api/system/telemetry` to expose internal thread and queue metrics.
|
||||||
- [ ] Task: Enhance `/api/gui/state` to provide a truly exhaustive JSON dump of all internal managers.
|
- [x] Task: Enhance `/api/gui/state` to provide a truly exhaustive JSON dump of all internal managers.
|
||||||
- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new GET endpoints.
|
- [x] Task: Update `api_hook_client.py` with corresponding methods for all new GET endpoints.
|
||||||
- [ ] Task: Write integration tests for all new GET endpoints using `live_gui`.
|
- [x] Task: Write integration tests for all new GET endpoints using `live_gui`.
|
||||||
- [ ] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md)
|
- [x] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md)
|
||||||
|
|
||||||
## Phase 3: Comprehensive Control Endpoints (POST)
|
## Phase 3: Comprehensive Control Endpoints (POST)
|
||||||
- [ ] Task: Implement worker and pipeline control.
|
- [x] Task: Implement worker and pipeline control.
|
||||||
- [ ] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API.
|
- [x] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API.
|
||||||
- [ ] Add `/api/mma/workers/kill` to programmatically abort running workers.
|
- [x] Add `/api/mma/workers/kill` to programmatically abort running workers.
|
||||||
- [ ] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop.
|
- [x] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop.
|
||||||
- [ ] Task: Implement context and DAG mutation.
|
- [x] Task: Implement context and DAG mutation.
|
||||||
- [ ] Add `/api/context/inject` to allow programmatic context injection (files/skeletons).
|
- [x] Add `/api/context/inject` to allow programmatic context injection (files/skeletons).
|
||||||
- [ ] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API.
|
- [x] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API.
|
||||||
- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints.
|
- [x] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints.
|
||||||
- [ ] Task: Write integration tests for all new control endpoints using `live_gui`.
|
- [x] Task: Write integration tests for all new control endpoints using `live_gui`.
|
||||||
- [ ] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md)
|
- [x] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md)
|
||||||
|
|
||||||
## Phase 4: Headless Refinement & Verification
|
## Phase 4: Headless Refinement & Verification
|
||||||
- [ ] Task: Improve error reporting.
|
- [x] Task: Improve error reporting.
|
||||||
- [ ] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses.
|
- [x] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses.
|
||||||
- [ ] Task: Conduct a full headless simulation.
|
- [x] Task: Conduct a full headless simulation.
|
||||||
- [ ] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API.
|
- [x] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API.
|
||||||
- [ ] Task: Final performance audit.
|
- [x] Task: Final performance audit.
|
||||||
- [ ] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops.
|
- [x] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops.
|
||||||
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Headless Refinement & Verification' (Protocol in workflow.md)
|
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Headless Refinement & Verification' (Protocol in workflow.md)
|
||||||
|
|||||||
+35
-1
@@ -116,7 +116,7 @@ class ApiHookClient:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def post_gui(self, payload: dict) -> dict[str, Any]:
|
def post_gui(self, payload: dict) -> dict[str, Any]:
|
||||||
"""Pushes an event to the GUI's SyncEventQueue via the /api/gui endpoint."""
|
"""Pushes an event to the GUI's AsyncEventQueue via the /api/gui endpoint."""
|
||||||
return self._make_request('POST', '/api/gui', data=payload) or {}
|
return self._make_request('POST', '/api/gui', data=payload) or {}
|
||||||
|
|
||||||
def push_event(self, action: str, payload: dict) -> dict[str, Any]:
|
def push_event(self, action: str, payload: dict) -> dict[str, Any]:
|
||||||
@@ -186,6 +186,22 @@ class ApiHookClient:
|
|||||||
"""Retrieves the dedicated MMA engine status."""
|
"""Retrieves the dedicated MMA engine status."""
|
||||||
return self._make_request('GET', '/api/gui/mma_status') or {}
|
return self._make_request('GET', '/api/gui/mma_status') or {}
|
||||||
|
|
||||||
|
def get_mma_workers(self) -> dict[str, Any]:
|
||||||
|
"""Retrieves status for all active MMA workers."""
|
||||||
|
return self._make_request('GET', '/api/mma/workers') or {}
|
||||||
|
|
||||||
|
def get_context_state(self) -> dict[str, Any]:
|
||||||
|
"""Retrieves the current file and screenshot context state."""
|
||||||
|
return self._make_request('GET', '/api/context/state') or {}
|
||||||
|
|
||||||
|
def get_financial_metrics(self) -> dict[str, Any]:
|
||||||
|
"""Retrieves token usage and estimated financial cost metrics."""
|
||||||
|
return self._make_request('GET', '/api/metrics/financial') or {}
|
||||||
|
|
||||||
|
def get_system_telemetry(self) -> dict[str, Any]:
|
||||||
|
"""Retrieves system-level telemetry including thread status and event queue size."""
|
||||||
|
return self._make_request('GET', '/api/system/telemetry') or {}
|
||||||
|
|
||||||
def get_node_status(self, node_id: str) -> dict[str, Any]:
|
def get_node_status(self, node_id: str) -> dict[str, Any]:
|
||||||
"""Retrieves status for a specific node in the MMA DAG."""
|
"""Retrieves status for a specific node in the MMA DAG."""
|
||||||
return self._make_request('GET', f'/api/mma/node/{node_id}') or {}
|
return self._make_request('GET', f'/api/mma/node/{node_id}') or {}
|
||||||
@@ -224,3 +240,21 @@ class ApiHookClient:
|
|||||||
"""Gets the current patch modal status."""
|
"""Gets the current patch modal status."""
|
||||||
return self._make_request('GET', '/api/patch/status') or {}
|
return self._make_request('GET', '/api/patch/status') or {}
|
||||||
|
|
||||||
|
def spawn_mma_worker(self, data: dict) -> dict:
|
||||||
|
return self._make_request('POST', '/api/mma/workers/spawn', data=data) or {}
|
||||||
|
|
||||||
|
def kill_mma_worker(self, worker_id: str) -> dict:
|
||||||
|
return self._make_request('POST', '/api/mma/workers/kill', data={"worker_id": worker_id}) or {}
|
||||||
|
|
||||||
|
def pause_mma_pipeline(self) -> dict:
|
||||||
|
return self._make_request('POST', '/api/mma/pipeline/pause') or {}
|
||||||
|
|
||||||
|
def resume_mma_pipeline(self) -> dict:
|
||||||
|
return self._make_request('POST', '/api/mma/pipeline/resume') or {}
|
||||||
|
|
||||||
|
def inject_context(self, data: dict) -> dict:
|
||||||
|
return self._make_request('POST', '/api/context/inject', data=data) or {}
|
||||||
|
|
||||||
|
def mutate_mma_dag(self, data: dict) -> dict:
|
||||||
|
return self._make_request('POST', '/api/mma/dag/mutate', data=data) or {}
|
||||||
|
|
||||||
|
|||||||
+340
-140
@@ -3,10 +3,14 @@ import json
|
|||||||
import threading
|
import threading
|
||||||
import uuid
|
import uuid
|
||||||
import sys
|
import sys
|
||||||
|
import asyncio
|
||||||
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
||||||
from typing import Any
|
from typing import Any
|
||||||
import logging
|
import logging
|
||||||
|
import websockets
|
||||||
|
from websockets.asyncio.server import serve
|
||||||
from src import session_logger
|
from src import session_logger
|
||||||
|
from src import cost_tracker
|
||||||
"""
|
"""
|
||||||
API Hooks - REST API for external automation and state inspection.
|
API Hooks - REST API for external automation and state inspection.
|
||||||
|
|
||||||
@@ -77,165 +81,207 @@ def _serialize_for_api(obj: Any) -> Any:
|
|||||||
class HookHandler(BaseHTTPRequestHandler):
|
class HookHandler(BaseHTTPRequestHandler):
|
||||||
"""Handles incoming HTTP requests for the API hooks."""
|
"""Handles incoming HTTP requests for the API hooks."""
|
||||||
def do_GET(self) -> None:
|
def do_GET(self) -> None:
|
||||||
app = self.server.app
|
try:
|
||||||
session_logger.log_api_hook("GET", self.path, "")
|
app = self.server.app
|
||||||
if self.path == "/status":
|
session_logger.log_api_hook("GET", self.path, "")
|
||||||
self.send_response(200)
|
if self.path == "/status":
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_response(200)
|
||||||
self.end_headers()
|
self.send_header("Content-Type", "application/json")
|
||||||
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
|
self.end_headers()
|
||||||
elif self.path == "/api/project":
|
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
|
||||||
from src import project_manager
|
elif self.path == "/api/project":
|
||||||
self.send_response(200)
|
from src import project_manager
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_response(200)
|
||||||
self.end_headers()
|
self.send_header("Content-Type", "application/json")
|
||||||
flat = project_manager.flat_config(_get_app_attr(app, "project"))
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
|
flat = project_manager.flat_config(_get_app_attr(app, "project"))
|
||||||
elif self.path == "/api/session":
|
self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
|
||||||
self.send_response(200)
|
elif self.path == "/api/session":
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_response(200)
|
||||||
self.end_headers()
|
self.send_header("Content-Type", "application/json")
|
||||||
lock = _get_app_attr(app, "_disc_entries_lock")
|
self.end_headers()
|
||||||
entries = _get_app_attr(app, "disc_entries", [])
|
lock = _get_app_attr(app, "_disc_entries_lock")
|
||||||
if lock:
|
entries = _get_app_attr(app, "disc_entries", [])
|
||||||
with lock: entries_snapshot = list(entries)
|
|
||||||
else:
|
|
||||||
entries_snapshot = list(entries)
|
|
||||||
self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
|
|
||||||
elif self.path == "/api/performance":
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header("Content-Type", "application/json")
|
|
||||||
self.end_headers()
|
|
||||||
metrics = {}
|
|
||||||
perf = _get_app_attr(app, "perf_monitor")
|
|
||||||
if perf: metrics = perf.get_metrics()
|
|
||||||
self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
|
|
||||||
elif self.path == "/api/events":
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header("Content-Type", "application/json")
|
|
||||||
self.end_headers()
|
|
||||||
events = []
|
|
||||||
if _has_app_attr(app, "_api_event_queue"):
|
|
||||||
lock = _get_app_attr(app, "_api_event_queue_lock")
|
|
||||||
queue = _get_app_attr(app, "_api_event_queue")
|
|
||||||
if lock:
|
if lock:
|
||||||
with lock:
|
with lock: entries_snapshot = list(entries)
|
||||||
|
else:
|
||||||
|
entries_snapshot = list(entries)
|
||||||
|
self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/performance":
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
metrics = {}
|
||||||
|
perf = _get_app_attr(app, "perf_monitor")
|
||||||
|
if perf: metrics = perf.get_metrics()
|
||||||
|
self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/events":
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
events = []
|
||||||
|
if _has_app_attr(app, "_api_event_queue"):
|
||||||
|
lock = _get_app_attr(app, "_api_event_queue_lock")
|
||||||
|
queue = _get_app_attr(app, "_api_event_queue")
|
||||||
|
if lock:
|
||||||
|
with lock:
|
||||||
|
events = list(queue)
|
||||||
|
queue.clear()
|
||||||
|
else:
|
||||||
events = list(queue)
|
events = list(queue)
|
||||||
queue.clear()
|
queue.clear()
|
||||||
|
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
|
||||||
|
elif self.path.startswith("/api/gui/value/"):
|
||||||
|
field_tag = self.path.split("/")[-1]
|
||||||
|
event = threading.Event()
|
||||||
|
result = {"value": None}
|
||||||
|
def get_val():
|
||||||
|
try:
|
||||||
|
settable = _get_app_attr(app, "_settable_fields", {})
|
||||||
|
gettable = _get_app_attr(app, "_gettable_fields", {})
|
||||||
|
combined = {**settable, **gettable}
|
||||||
|
if field_tag in combined:
|
||||||
|
attr = combined[field_tag]
|
||||||
|
result["value"] = _get_app_attr(app, attr, None)
|
||||||
|
else:
|
||||||
|
sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
|
||||||
|
sys.stderr.flush()
|
||||||
|
finally: event.set()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
|
||||||
|
if event.wait(timeout=10):
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
||||||
else:
|
else:
|
||||||
events = list(queue)
|
self.send_response(504)
|
||||||
queue.clear()
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
|
elif self.path == "/api/gui/mma_status":
|
||||||
elif self.path.startswith("/api/gui/value/"):
|
event = threading.Event()
|
||||||
field_tag = self.path.split("/")[-1]
|
result = {}
|
||||||
event = threading.Event()
|
def get_mma():
|
||||||
result = {"value": None}
|
try:
|
||||||
def get_val():
|
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
|
||||||
try:
|
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
|
||||||
settable = _get_app_attr(app, "_settable_fields", {})
|
result["active_tier"] = _get_app_attr(app, "active_tier", None)
|
||||||
gettable = _get_app_attr(app, "_gettable_fields", {})
|
at = _get_app_attr(app, "active_track", None)
|
||||||
combined = {**settable, **gettable}
|
result["active_track"] = at.id if hasattr(at, "id") else at
|
||||||
if field_tag in combined:
|
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
|
||||||
attr = combined[field_tag]
|
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
|
||||||
result["value"] = _get_app_attr(app, attr, None)
|
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
|
||||||
else:
|
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
|
||||||
sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
|
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
|
||||||
sys.stderr.flush()
|
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
|
||||||
finally: event.set()
|
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
|
||||||
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
result["pending_spawn"] = result["pending_mma_spawn_approval"]
|
||||||
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
result["tracks"] = _get_app_attr(app, "tracks", [])
|
||||||
if lock and tasks is not None:
|
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
|
||||||
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
|
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
|
||||||
if event.wait(timeout=10):
|
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
|
||||||
|
finally: event.set()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
|
||||||
|
if event.wait(timeout=10):
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
||||||
|
else:
|
||||||
|
self.send_response(504)
|
||||||
|
self.end_headers()
|
||||||
|
elif self.path == "/api/gui/diagnostics":
|
||||||
|
event = threading.Event()
|
||||||
|
result = {}
|
||||||
|
def check_all():
|
||||||
|
try:
|
||||||
|
status = _get_app_attr(app, "ai_status", "idle")
|
||||||
|
result["thinking"] = status in ["sending...", "running powershell..."]
|
||||||
|
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
|
||||||
|
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
|
||||||
|
finally: event.set()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": check_all})
|
||||||
|
if event.wait(timeout=10):
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
||||||
|
else:
|
||||||
|
self.send_response(504)
|
||||||
|
self.end_headers()
|
||||||
|
elif self.path == '/api/gui/state':
|
||||||
|
event = threading.Event()
|
||||||
|
result = {}
|
||||||
|
def get_state():
|
||||||
|
try:
|
||||||
|
gettable = _get_app_attr(app, "_gettable_fields", {})
|
||||||
|
for key, attr in gettable.items():
|
||||||
|
val = _get_app_attr(app, attr, None)
|
||||||
|
result[key] = _serialize_for_api(val)
|
||||||
|
finally: event.set()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": get_state})
|
||||||
|
if event.wait(timeout=10):
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps(result).encode("utf-8"))
|
||||||
|
else:
|
||||||
|
self.send_response(504)
|
||||||
|
self.end_headers()
|
||||||
|
elif self.path == "/api/mma/workers":
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_header("Content-Type", "application/json")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps(result).encode("utf-8"))
|
mma_streams = _get_app_attr(app, "mma_streams", {})
|
||||||
else:
|
self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8"))
|
||||||
self.send_response(504)
|
elif self.path == "/api/context/state":
|
||||||
self.end_headers()
|
|
||||||
elif self.path == "/api/gui/mma_status":
|
|
||||||
event = threading.Event()
|
|
||||||
result = {}
|
|
||||||
def get_mma():
|
|
||||||
try:
|
|
||||||
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
|
|
||||||
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
|
|
||||||
result["active_tier"] = _get_app_attr(app, "active_tier", None)
|
|
||||||
at = _get_app_attr(app, "active_track", None)
|
|
||||||
result["active_track"] = at.id if hasattr(at, "id") else at
|
|
||||||
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
|
|
||||||
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
|
|
||||||
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
|
|
||||||
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
|
|
||||||
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
|
|
||||||
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
|
|
||||||
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
|
|
||||||
result["pending_spawn"] = result["pending_mma_spawn_approval"]
|
|
||||||
result["tracks"] = _get_app_attr(app, "tracks", [])
|
|
||||||
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
|
|
||||||
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
|
|
||||||
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
|
|
||||||
finally: event.set()
|
|
||||||
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
||||||
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
||||||
if lock and tasks is not None:
|
|
||||||
with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
|
|
||||||
if event.wait(timeout=10):
|
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_header("Content-Type", "application/json")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps(result).encode("utf-8"))
|
files = _get_app_attr(app, "files", [])
|
||||||
else:
|
screenshots = _get_app_attr(app, "screenshots", [])
|
||||||
self.send_response(504)
|
self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8"))
|
||||||
self.end_headers()
|
elif self.path == "/api/metrics/financial":
|
||||||
elif self.path == "/api/gui/diagnostics":
|
|
||||||
event = threading.Event()
|
|
||||||
result = {}
|
|
||||||
def check_all():
|
|
||||||
try:
|
|
||||||
status = _get_app_attr(app, "ai_status", "idle")
|
|
||||||
result["thinking"] = status in ["sending...", "running powershell..."]
|
|
||||||
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
|
|
||||||
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
|
|
||||||
finally: event.set()
|
|
||||||
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
||||||
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
||||||
if lock and tasks is not None:
|
|
||||||
with lock: tasks.append({"action": "custom_callback", "callback": check_all})
|
|
||||||
if event.wait(timeout=10):
|
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_header("Content-Type", "application/json")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps(result).encode("utf-8"))
|
usage = _get_app_attr(app, "mma_tier_usage", {})
|
||||||
else:
|
metrics = {}
|
||||||
self.send_response(504)
|
for tier, data in usage.items():
|
||||||
self.end_headers()
|
model = data.get("model", "")
|
||||||
elif self.path == '/api/gui/state':
|
in_t = data.get("input", 0)
|
||||||
event = threading.Event()
|
out_t = data.get("output", 0)
|
||||||
result = {}
|
cost = cost_tracker.estimate_cost(model, in_t, out_t)
|
||||||
def get_state():
|
metrics[tier] = {**data, "estimated_cost": cost}
|
||||||
try:
|
self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8"))
|
||||||
gettable = _get_app_attr(app, "_gettable_fields", {})
|
elif self.path == "/api/system/telemetry":
|
||||||
for key, attr in gettable.items():
|
|
||||||
val = _get_app_attr(app, attr, None)
|
|
||||||
result[key] = _serialize_for_api(val)
|
|
||||||
finally: event.set()
|
|
||||||
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
|
||||||
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
|
||||||
if lock and tasks is not None:
|
|
||||||
with lock: tasks.append({"action": "custom_callback", "callback": get_state})
|
|
||||||
if event.wait(timeout=10):
|
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.send_header("Content-Type", "application/json")
|
self.send_header("Content-Type", "application/json")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps(result).encode("utf-8"))
|
threads = [t.name for t in threading.enumerate()]
|
||||||
|
queue_size = 0
|
||||||
|
if _has_app_attr(app, "_api_event_queue"):
|
||||||
|
queue = _get_app_attr(app, "_api_event_queue")
|
||||||
|
if queue: queue_size = len(queue)
|
||||||
|
self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8"))
|
||||||
else:
|
else:
|
||||||
self.send_response(504)
|
self.send_response(404)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
else:
|
except Exception as e:
|
||||||
self.send_response(404)
|
self.send_response(500)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))
|
||||||
|
|
||||||
def do_POST(self) -> None:
|
def do_POST(self) -> None:
|
||||||
app = self.server.app
|
app = self.server.app
|
||||||
@@ -479,6 +525,90 @@ class HookHandler(BaseHTTPRequestHandler):
|
|||||||
else:
|
else:
|
||||||
self.send_response(404)
|
self.send_response(404)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
|
elif self.path == "/api/mma/workers/spawn":
|
||||||
|
def spawn_worker():
|
||||||
|
try:
|
||||||
|
func = _get_app_attr(app, "_spawn_worker")
|
||||||
|
if func: func(data)
|
||||||
|
except Exception as e:
|
||||||
|
sys.stderr.write(f"[DEBUG] Hook API spawn_worker error: {e}\n")
|
||||||
|
sys.stderr.flush()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": spawn_worker})
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/mma/workers/kill":
|
||||||
|
def kill_worker():
|
||||||
|
try:
|
||||||
|
worker_id = data.get("worker_id")
|
||||||
|
func = _get_app_attr(app, "_kill_worker")
|
||||||
|
if func: func(worker_id)
|
||||||
|
except Exception as e:
|
||||||
|
sys.stderr.write(f"[DEBUG] Hook API kill_worker error: {e}\n")
|
||||||
|
sys.stderr.flush()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": kill_worker})
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/mma/pipeline/pause":
|
||||||
|
def pause_pipeline():
|
||||||
|
_set_app_attr(app, "mma_step_mode", True)
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": pause_pipeline})
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/mma/pipeline/resume":
|
||||||
|
def resume_pipeline():
|
||||||
|
_set_app_attr(app, "mma_step_mode", False)
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": resume_pipeline})
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/context/inject":
|
||||||
|
def inject_context():
|
||||||
|
files = _get_app_attr(app, "files")
|
||||||
|
if isinstance(files, list):
|
||||||
|
files.extend(data.get("files", []))
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": inject_context})
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
||||||
|
elif self.path == "/api/mma/dag/mutate":
|
||||||
|
def mutate_dag():
|
||||||
|
try:
|
||||||
|
func = _get_app_attr(app, "_mutate_dag")
|
||||||
|
if func: func(data)
|
||||||
|
except Exception as e:
|
||||||
|
sys.stderr.write(f"[DEBUG] Hook API mutate_dag error: {e}\n")
|
||||||
|
sys.stderr.flush()
|
||||||
|
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
|
||||||
|
tasks = _get_app_attr(app, "_pending_gui_tasks")
|
||||||
|
if lock and tasks is not None:
|
||||||
|
with lock: tasks.append({"action": "custom_callback", "callback": mutate_dag})
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header("Content-Type", "application/json")
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8"))
|
||||||
else:
|
else:
|
||||||
self.send_response(404)
|
self.send_response(404)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
@@ -498,6 +628,7 @@ class HookServer:
|
|||||||
self.port = port
|
self.port = port
|
||||||
self.server = None
|
self.server = None
|
||||||
self.thread = None
|
self.thread = None
|
||||||
|
self.websocket_server: WebSocketServer | None = None
|
||||||
|
|
||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
if self.thread and self.thread.is_alive():
|
if self.thread and self.thread.is_alive():
|
||||||
@@ -511,12 +642,22 @@ class HookServer:
|
|||||||
if not _has_app_attr(self.app, '_ask_responses'): _set_app_attr(self.app, '_ask_responses', {})
|
if not _has_app_attr(self.app, '_ask_responses'): _set_app_attr(self.app, '_ask_responses', {})
|
||||||
if not _has_app_attr(self.app, '_api_event_queue'): _set_app_attr(self.app, '_api_event_queue', [])
|
if not _has_app_attr(self.app, '_api_event_queue'): _set_app_attr(self.app, '_api_event_queue', [])
|
||||||
if not _has_app_attr(self.app, '_api_event_queue_lock'): _set_app_attr(self.app, '_api_event_queue_lock', threading.Lock())
|
if not _has_app_attr(self.app, '_api_event_queue_lock'): _set_app_attr(self.app, '_api_event_queue_lock', threading.Lock())
|
||||||
|
|
||||||
|
self.websocket_server = WebSocketServer(self.app)
|
||||||
|
self.websocket_server.start()
|
||||||
|
|
||||||
|
eq = _get_app_attr(self.app, 'event_queue')
|
||||||
|
if eq:
|
||||||
|
eq.websocket_server = self.websocket_server
|
||||||
|
|
||||||
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
|
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
|
||||||
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
logging.info(f"Hook server started on port {self.port}")
|
logging.info(f"Hook server started on port {self.port}")
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
|
if self.websocket_server:
|
||||||
|
self.websocket_server.stop()
|
||||||
if self.server:
|
if self.server:
|
||||||
self.server.shutdown()
|
self.server.shutdown()
|
||||||
self.server.server_close()
|
self.server.server_close()
|
||||||
@@ -524,3 +665,62 @@ class HookServer:
|
|||||||
self.thread.join()
|
self.thread.join()
|
||||||
logging.info("Hook server stopped")
|
logging.info("Hook server stopped")
|
||||||
|
|
||||||
|
class WebSocketServer:
|
||||||
|
"""WebSocket gateway for real-time event streaming."""
|
||||||
|
def __init__(self, app: Any, port: int = 9000) -> None:
|
||||||
|
self.app = app
|
||||||
|
self.port = port
|
||||||
|
self.clients: dict[str, set] = {"events": set(), "telemetry": set()}
|
||||||
|
self.loop: asyncio.AbstractEventLoop | None = None
|
||||||
|
self.thread: threading.Thread | None = None
|
||||||
|
self.server = None
|
||||||
|
self._stop_event: asyncio.Event | None = None
|
||||||
|
|
||||||
|
async def _handler(self, websocket) -> None:
|
||||||
|
try:
|
||||||
|
async for message in websocket:
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
if data.get("action") == "subscribe":
|
||||||
|
channel = data.get("channel")
|
||||||
|
if channel in self.clients:
|
||||||
|
self.clients[channel].add(websocket)
|
||||||
|
await websocket.send(json.dumps({"type": "subscription_confirmed", "channel": channel}))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
pass
|
||||||
|
except websockets.exceptions.ConnectionClosed:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
for channel in self.clients:
|
||||||
|
if websocket in self.clients[channel]:
|
||||||
|
self.clients[channel].remove(websocket)
|
||||||
|
|
||||||
|
def _run_loop(self) -> None:
|
||||||
|
self.loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(self.loop)
|
||||||
|
self._stop_event = asyncio.Event()
|
||||||
|
async def main():
|
||||||
|
async with serve(self._handler, "127.0.0.1", self.port) as server:
|
||||||
|
self.server = server
|
||||||
|
await self._stop_event.wait()
|
||||||
|
self.loop.run_until_complete(main())
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
if self.thread and self.thread.is_alive():
|
||||||
|
return
|
||||||
|
self.thread = threading.Thread(target=self._run_loop, daemon=True)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
if self.loop and self._stop_event:
|
||||||
|
self.loop.call_soon_threadsafe(self._stop_event.set)
|
||||||
|
if self.thread:
|
||||||
|
self.thread.join(timeout=2.0)
|
||||||
|
|
||||||
|
def broadcast(self, channel: str, payload: dict[str, Any]) -> None:
|
||||||
|
if not self.loop or channel not in self.clients:
|
||||||
|
return
|
||||||
|
message = json.dumps({"channel": channel, "payload": payload})
|
||||||
|
for ws in list(self.clients[channel]):
|
||||||
|
asyncio.run_coroutine_threadsafe(ws.send(message), self.loop)
|
||||||
|
|
||||||
|
|||||||
+10
-1
@@ -150,7 +150,7 @@ class AppController:
|
|||||||
self.disc_roles: List[str] = []
|
self.disc_roles: List[str] = []
|
||||||
self.files: List[str] = []
|
self.files: List[str] = []
|
||||||
self.screenshots: List[str] = []
|
self.screenshots: List[str] = []
|
||||||
self.event_queue: events.SyncEventQueue = events.SyncEventQueue()
|
self.event_queue: events.AsyncEventQueue = events.AsyncEventQueue()
|
||||||
self._loop_thread: Optional[threading.Thread] = None
|
self._loop_thread: Optional[threading.Thread] = None
|
||||||
self.tracks: List[Dict[str, Any]] = []
|
self.tracks: List[Dict[str, Any]] = []
|
||||||
self.active_track: Optional[models.Track] = None
|
self.active_track: Optional[models.Track] = None
|
||||||
@@ -188,6 +188,7 @@ class AppController:
|
|||||||
"Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite", "tool_preset": None},
|
"Tier 4": {"input": 0, "output": 0, "provider": "gemini", "model": "gemini-2.5-flash-lite", "tool_preset": None},
|
||||||
}
|
}
|
||||||
self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor()
|
self.perf_monitor: performance_monitor.PerformanceMonitor = performance_monitor.PerformanceMonitor()
|
||||||
|
self._last_telemetry_time: float = 0.0
|
||||||
self._pending_gui_tasks: List[Dict[str, Any]] = []
|
self._pending_gui_tasks: List[Dict[str, Any]] = []
|
||||||
self._api_event_queue: List[Dict[str, Any]] = []
|
self._api_event_queue: List[Dict[str, Any]] = []
|
||||||
# Pending dialogs state moved from App
|
# Pending dialogs state moved from App
|
||||||
@@ -522,6 +523,14 @@ class AppController:
|
|||||||
})
|
})
|
||||||
|
|
||||||
def _process_pending_gui_tasks(self) -> None:
|
def _process_pending_gui_tasks(self) -> None:
|
||||||
|
# Periodic telemetry broadcast
|
||||||
|
now = time.time()
|
||||||
|
if hasattr(self, 'event_queue') and hasattr(self.event_queue, 'websocket_server') and self.event_queue.websocket_server:
|
||||||
|
if now - self._last_telemetry_time >= 1.0:
|
||||||
|
self._last_telemetry_time = now
|
||||||
|
metrics = self.perf_monitor.get_metrics()
|
||||||
|
self.event_queue.websocket_server.broadcast("telemetry", metrics)
|
||||||
|
|
||||||
if not self._pending_gui_tasks:
|
if not self._pending_gui_tasks:
|
||||||
return
|
return
|
||||||
sys.stderr.write(f"[DEBUG] _process_pending_gui_tasks: processing {len(self._pending_gui_tasks)} tasks\n")
|
sys.stderr.write(f"[DEBUG] _process_pending_gui_tasks: processing {len(self._pending_gui_tasks)} tasks\n")
|
||||||
|
|||||||
+13
-7
@@ -9,7 +9,7 @@ between the GUI main thread and background workers:
|
|||||||
- Thread-safe: Callbacks execute on emitter's thread
|
- Thread-safe: Callbacks execute on emitter's thread
|
||||||
- Example: ai_client.py emits 'request_start' and 'response_received' events
|
- Example: ai_client.py emits 'request_start' and 'response_received' events
|
||||||
|
|
||||||
2. SyncEventQueue: Producer-consumer pattern via queue.Queue
|
2. AsyncEventQueue: Producer-consumer pattern via queue.Queue
|
||||||
- Used for: Decoupled task submission where consumer polls at its own pace
|
- Used for: Decoupled task submission where consumer polls at its own pace
|
||||||
- Thread-safe: Built on Python's thread-safe queue.Queue
|
- Thread-safe: Built on Python's thread-safe queue.Queue
|
||||||
- Example: Background workers submit tasks, main thread drains queue
|
- Example: Background workers submit tasks, main thread drains queue
|
||||||
@@ -21,16 +21,16 @@ between the GUI main thread and background workers:
|
|||||||
Integration Points:
|
Integration Points:
|
||||||
- ai_client.py: EventEmitter for API lifecycle events
|
- ai_client.py: EventEmitter for API lifecycle events
|
||||||
- gui_2.py: Consumes events via _process_event_queue()
|
- gui_2.py: Consumes events via _process_event_queue()
|
||||||
- multi_agent_conductor.py: Uses SyncEventQueue for state updates
|
- multi_agent_conductor.py: Uses AsyncEventQueue for state updates
|
||||||
- api_hooks.py: Pushes events to _api_event_queue for external visibility
|
- api_hooks.py: Pushes events to _api_event_queue for external visibility
|
||||||
|
|
||||||
Thread Safety:
|
Thread Safety:
|
||||||
- EventEmitter: NOT thread-safe for concurrent on/emit (use from single thread)
|
- EventEmitter: NOT thread-safe for concurrent on/emit (use from single thread)
|
||||||
- SyncEventQueue: FULLY thread-safe (built on queue.Queue)
|
- AsyncEventQueue: FULLY thread-safe (built on queue.Queue)
|
||||||
- UserRequestEvent: Immutable, safe for concurrent access
|
- UserRequestEvent: Immutable, safe for concurrent access
|
||||||
"""
|
"""
|
||||||
import queue
|
import queue
|
||||||
from typing import Callable, Any, Dict, List, Tuple
|
from typing import Callable, Any, Dict, List, Tuple, Optional
|
||||||
|
|
||||||
class EventEmitter:
|
class EventEmitter:
|
||||||
"""
|
"""
|
||||||
@@ -70,14 +70,16 @@ class EventEmitter:
|
|||||||
"""Clears all registered listeners."""
|
"""Clears all registered listeners."""
|
||||||
self._listeners.clear()
|
self._listeners.clear()
|
||||||
|
|
||||||
class SyncEventQueue:
|
class AsyncEventQueue:
|
||||||
"""
|
"""
|
||||||
Synchronous event queue for decoupled communication using queue.Queue.
|
Synchronous event queue for decoupled communication using queue.Queue.
|
||||||
|
(Named AsyncEventQueue for architectural consistency, but is synchronous)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
"""Initializes the SyncEventQueue with an internal queue.Queue."""
|
"""Initializes the AsyncEventQueue with an internal queue.Queue."""
|
||||||
self._queue: queue.Queue[Tuple[str, Any]] = queue.Queue()
|
self._queue: queue.Queue[Tuple[str, Any]] = queue.Queue()
|
||||||
|
self.websocket_server: Optional[Any] = None
|
||||||
|
|
||||||
def put(self, event_name: str, payload: Any = None) -> None:
|
def put(self, event_name: str, payload: Any = None) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -88,6 +90,8 @@ class SyncEventQueue:
|
|||||||
payload: Optional data associated with the event.
|
payload: Optional data associated with the event.
|
||||||
"""
|
"""
|
||||||
self._queue.put((event_name, payload))
|
self._queue.put((event_name, payload))
|
||||||
|
if self.websocket_server:
|
||||||
|
self.websocket_server.broadcast("events", {"event": event_name, "payload": payload})
|
||||||
|
|
||||||
def get(self) -> Tuple[str, Any]:
|
def get(self) -> Tuple[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -106,6 +110,9 @@ class SyncEventQueue:
|
|||||||
"""Blocks until all items in the queue have been gotten and processed."""
|
"""Blocks until all items in the queue have been gotten and processed."""
|
||||||
self._queue.join()
|
self._queue.join()
|
||||||
|
|
||||||
|
# Alias for backward compatibility
|
||||||
|
SyncEventQueue = AsyncEventQueue
|
||||||
|
|
||||||
class UserRequestEvent:
|
class UserRequestEvent:
|
||||||
"""
|
"""
|
||||||
Payload for a user request event.
|
Payload for a user request event.
|
||||||
@@ -126,4 +133,3 @@ class UserRequestEvent:
|
|||||||
"disc_text": self.disc_text,
|
"disc_text": self.disc_text,
|
||||||
"base_dir": self.base_dir
|
"base_dir": self.base_dir
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ Key Components:
|
|||||||
|
|
||||||
Architecture Integration:
|
Architecture Integration:
|
||||||
- Uses TrackDAG and ExecutionEngine from dag_engine.py
|
- Uses TrackDAG and ExecutionEngine from dag_engine.py
|
||||||
- Communicates with GUI via SyncEventQueue
|
- Communicates with GUI via AsyncEventQueue
|
||||||
- Manages tier-specific token usage via update_usage()
|
- Manages tier-specific token usage via update_usage()
|
||||||
|
|
||||||
Thread Safety:
|
Thread Safety:
|
||||||
@@ -45,7 +45,7 @@ Thread Safety:
|
|||||||
- Abort events enable per-ticket cancellation
|
- Abort events enable per-ticket cancellation
|
||||||
|
|
||||||
Integration:
|
Integration:
|
||||||
- Uses SyncEventQueue for state updates to the GUI
|
- Uses AsyncEventQueue for state updates to the GUI
|
||||||
- Uses ai_client.send() for LLM communication
|
- Uses ai_client.send() for LLM communication
|
||||||
- Uses mcp_client for tool dispatch
|
- Uses mcp_client for tool dispatch
|
||||||
|
|
||||||
@@ -123,7 +123,7 @@ class ConductorEngine:
|
|||||||
Orchestrates the execution of tickets within a track.
|
Orchestrates the execution of tickets within a track.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, track: Track, event_queue: Optional[events.SyncEventQueue] = None, auto_queue: bool = False) -> None:
|
def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
|
||||||
self.track = track
|
self.track = track
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
self.tier_usage = {
|
self.tier_usage = {
|
||||||
@@ -343,12 +343,12 @@ class ConductorEngine:
|
|||||||
self._push_state(active_tier="Tier 2 (Tech Lead)")
|
self._push_state(active_tier="Tier 2 (Tech Lead)")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
def _queue_put(event_queue: events.SyncEventQueue, event_name: str, payload) -> None:
|
def _queue_put(event_queue: events.AsyncEventQueue, event_name: str, payload) -> None:
|
||||||
"""Thread-safe helper to push an event to the SyncEventQueue from a worker thread."""
|
"""Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
|
||||||
if event_queue is not None:
|
if event_queue is not None:
|
||||||
event_queue.put(event_name, payload)
|
event_queue.put(event_name, payload)
|
||||||
|
|
||||||
def confirm_execution(payload: str, event_queue: events.SyncEventQueue, ticket_id: str) -> bool:
|
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Pushes an approval request to the GUI and waits for response.
|
Pushes an approval request to the GUI and waits for response.
|
||||||
"""
|
"""
|
||||||
@@ -370,7 +370,7 @@ def confirm_execution(payload: str, event_queue: events.SyncEventQueue, ticket_i
|
|||||||
return approved
|
return approved
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.SyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
|
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
|
||||||
"""
|
"""
|
||||||
Pushes a spawn approval request to the GUI and waits for response.
|
Pushes a spawn approval request to the GUI and waits for response.
|
||||||
Returns (approved, modified_prompt, modified_context)
|
Returns (approved, modified_prompt, modified_context)
|
||||||
@@ -409,7 +409,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.S
|
|||||||
return approved, modified_prompt, modified_context
|
return approved, modified_prompt, modified_context
|
||||||
return False, prompt, context_md
|
return False, prompt, context_md
|
||||||
|
|
||||||
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.SyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
|
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
|
||||||
"""
|
"""
|
||||||
Simulates the lifecycle of a single agent working on a ticket.
|
Simulates the lifecycle of a single agent working on a ticket.
|
||||||
Calls the AI client and updates the ticket status based on the response.
|
Calls the AI client and updates the ticket status based on the response.
|
||||||
|
|||||||
@@ -0,0 +1,21 @@
|
|||||||
|
import pytest
|
||||||
|
from src.api_hook_client import ApiHookClient
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_control_endpoints_exist():
|
||||||
|
client = ApiHookClient()
|
||||||
|
assert hasattr(client, "spawn_mma_worker")
|
||||||
|
assert hasattr(client, "kill_mma_worker")
|
||||||
|
assert hasattr(client, "pause_mma_pipeline")
|
||||||
|
assert hasattr(client, "resume_mma_pipeline")
|
||||||
|
assert hasattr(client, "inject_context")
|
||||||
|
assert hasattr(client, "mutate_mma_dag")
|
||||||
|
|
||||||
|
def test_api_hook_client_control_methods_exist():
|
||||||
|
client = ApiHookClient()
|
||||||
|
assert callable(getattr(client, "spawn_mma_worker", None))
|
||||||
|
assert callable(getattr(client, "kill_mma_worker", None))
|
||||||
|
assert callable(getattr(client, "pause_mma_pipeline", None))
|
||||||
|
assert callable(getattr(client, "resume_mma_pipeline", None))
|
||||||
|
assert callable(getattr(client, "inject_context", None))
|
||||||
|
assert callable(getattr(client, "mutate_mma_dag", None))
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
import pytest
|
||||||
|
from src.api_hook_client import ApiHookClient
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_app():
|
||||||
|
class MockApp:
|
||||||
|
def __init__(self):
|
||||||
|
self.mma_streams = {"W1": {"status": "running", "logs": ["started"]}}
|
||||||
|
self.active_tickets = []
|
||||||
|
self.files = ["file1.py", "file2.py"]
|
||||||
|
self.mma_tier_usage = {"Tier 1": {"input": 100, "output": 50, "model": "gemini"}}
|
||||||
|
self.event_queue = type("MockQueue", (), {"_queue": type("Q", (), {"qsize": lambda s: 5})()})()
|
||||||
|
self._gettable_fields = {"test_field": "test_attr"}
|
||||||
|
self.test_attr = "hello"
|
||||||
|
self.test_hooks_enabled = True
|
||||||
|
self._pending_gui_tasks = []
|
||||||
|
self._pending_gui_tasks_lock = None
|
||||||
|
self.ai_status = "idle"
|
||||||
|
return MockApp()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_mma_workers():
|
||||||
|
client = ApiHookClient()
|
||||||
|
# Set up client to talk to a locally mocked server or use a live fixture if available
|
||||||
|
# For now, just test that the methods exist on ApiHookClient
|
||||||
|
assert hasattr(client, "get_mma_workers")
|
||||||
|
assert hasattr(client, "get_context_state")
|
||||||
|
assert hasattr(client, "get_financial_metrics")
|
||||||
|
assert hasattr(client, "get_system_telemetry")
|
||||||
|
|
||||||
|
def test_api_hook_client_methods_exist():
|
||||||
|
client = ApiHookClient()
|
||||||
|
assert callable(getattr(client, "get_mma_workers", None))
|
||||||
|
assert callable(getattr(client, "get_context_state", None))
|
||||||
|
assert callable(getattr(client, "get_financial_metrics", None))
|
||||||
|
assert callable(getattr(client, "get_system_telemetry", None))
|
||||||
@@ -0,0 +1,117 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from src.api_hook_client import ApiHookClient
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_mma_track_lifecycle_simulation():
|
||||||
|
"""
|
||||||
|
This test simulates the sequence of API calls an external orchestrator
|
||||||
|
would make to manage an MMA track lifecycle via the Hook API.
|
||||||
|
It verifies that ApiHookClient correctly routes requests to the
|
||||||
|
corresponding endpoints in src/api_hooks.py.
|
||||||
|
"""
|
||||||
|
|
||||||
|
client = ApiHookClient("http://localhost:8999")
|
||||||
|
|
||||||
|
with patch('requests.get') as mock_get, patch('requests.post') as mock_post:
|
||||||
|
# --- PHASE 1: Initialization & Discovery ---
|
||||||
|
|
||||||
|
# Mock successful status check
|
||||||
|
mock_get.return_value.status_code = 200
|
||||||
|
mock_get.return_value.json.return_value = {"status": "ok"}
|
||||||
|
assert client.get_status()["status"] == "ok"
|
||||||
|
|
||||||
|
# Mock project state retrieval
|
||||||
|
mock_get.return_value.json.return_value = {"project": {"name": "test_project"}}
|
||||||
|
project = client.get_project()
|
||||||
|
assert project["project"]["name"] == "test_project"
|
||||||
|
|
||||||
|
# --- PHASE 2: Track Planning & Initialization ---
|
||||||
|
|
||||||
|
# Inject some files into context for the AI to work with
|
||||||
|
mock_post.return_value.status_code = 200
|
||||||
|
mock_post.return_value.json.return_value = {"status": "queued"}
|
||||||
|
inject_data = {"files": ["src/app_controller.py", "tests/test_basic.py"]}
|
||||||
|
res = client.inject_context(inject_data)
|
||||||
|
assert res["status"] == "queued"
|
||||||
|
mock_post.assert_called_with("http://localhost:8999/api/context/inject", json=inject_data, headers={}, timeout=5.0)
|
||||||
|
|
||||||
|
# --- PHASE 3: Worker Spawn & Execution ---
|
||||||
|
|
||||||
|
# Spawn a worker to start a ticket
|
||||||
|
spawn_data = {
|
||||||
|
"track_id": "track_20260311",
|
||||||
|
"ticket_id": "TKT-001",
|
||||||
|
"role": "tier3-worker",
|
||||||
|
"prompt": "Implement the new logging feature"
|
||||||
|
}
|
||||||
|
res = client.spawn_mma_worker(spawn_data)
|
||||||
|
assert res["status"] == "queued"
|
||||||
|
mock_post.assert_called_with("http://localhost:8999/api/mma/workers/spawn", json=spawn_data, headers={}, timeout=5.0)
|
||||||
|
|
||||||
|
# --- PHASE 4: DAG Mutation & Dependency Management ---
|
||||||
|
|
||||||
|
# Add a second ticket that depends on the first one
|
||||||
|
dag_mutation = {
|
||||||
|
"action": "add_ticket",
|
||||||
|
"ticket": {
|
||||||
|
"id": "TKT-002",
|
||||||
|
"deps": ["TKT-001"],
|
||||||
|
"role": "tier4-qa",
|
||||||
|
"prompt": "Verify the logging feature"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res = client.mutate_mma_dag(dag_mutation)
|
||||||
|
assert res["status"] == "queued"
|
||||||
|
mock_post.assert_called_with("http://localhost:8999/api/mma/dag/mutate", json=dag_mutation, headers={}, timeout=5.0)
|
||||||
|
|
||||||
|
# --- PHASE 5: Monitoring & Status Polling ---
|
||||||
|
|
||||||
|
# Poll for MMA status
|
||||||
|
mock_get.return_value.json.return_value = {
|
||||||
|
"mma_status": "running",
|
||||||
|
"active_tickets": ["TKT-001"],
|
||||||
|
"active_tier": "Tier 3",
|
||||||
|
"tracks": [{"id": "track_20260311", "status": "active"}]
|
||||||
|
}
|
||||||
|
mma_status = client.get_mma_status()
|
||||||
|
assert mma_status["mma_status"] == "running"
|
||||||
|
assert "TKT-001" in mma_status["active_tickets"]
|
||||||
|
|
||||||
|
# Check worker stream status
|
||||||
|
mock_get.return_value.json.return_value = {
|
||||||
|
"workers": {
|
||||||
|
"TKT-001": {"status": "running", "output": "Starting work..."}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
workers = client.get_mma_workers()
|
||||||
|
assert workers["workers"]["TKT-001"]["status"] == "running"
|
||||||
|
|
||||||
|
# --- PHASE 6: Human-in-the-Loop Interaction ---
|
||||||
|
|
||||||
|
# Mock a tool approval request
|
||||||
|
# In a real scenario, this would block until a POST to /api/ask/respond occurs
|
||||||
|
mock_post.return_value.json.return_value = {"status": "ok", "response": True}
|
||||||
|
approved = client.request_confirmation("run_powershell", {"script": "ls -Recurse"})
|
||||||
|
assert approved is True
|
||||||
|
|
||||||
|
# --- PHASE 7: Completion & Cleanup ---
|
||||||
|
|
||||||
|
# Mock completion status
|
||||||
|
mock_get.return_value.json.return_value = {
|
||||||
|
"mma_status": "idle",
|
||||||
|
"active_tickets": [],
|
||||||
|
"tracks": [{"id": "track_20260311", "status": "completed"}]
|
||||||
|
}
|
||||||
|
final_status = client.get_mma_status()
|
||||||
|
assert final_status["mma_status"] == "idle"
|
||||||
|
assert len(final_status["active_tickets"]) == 0
|
||||||
|
|
||||||
|
# Reset session to clean up
|
||||||
|
client.reset_session()
|
||||||
|
# Verify reset click was pushed
|
||||||
|
mock_post.assert_called_with("http://localhost:8999/api/gui", json={"action": "click", "item": "btn_reset", "user_data": None}, headers={}, timeout=5.0)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import asyncio
|
||||||
|
asyncio.run(test_mma_track_lifecycle_simulation())
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
import pytest
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import websockets
|
||||||
|
from src.api_hooks import WebSocketServer
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_websocket_subscription_and_broadcast():
|
||||||
|
# Mock app
|
||||||
|
app = type("MockApp", (), {"test_hooks_enabled": True})()
|
||||||
|
|
||||||
|
# Start server on a specific port
|
||||||
|
port = 9005
|
||||||
|
server = WebSocketServer(app, port=port)
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
# Wait for server to start
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
try:
|
||||||
|
uri = f"ws://127.0.0.1:{port}"
|
||||||
|
async with websockets.connect(uri) as websocket:
|
||||||
|
# Subscribe to events channel
|
||||||
|
subscribe_msg = {"action": "subscribe", "channel": "events"}
|
||||||
|
await websocket.send(json.dumps(subscribe_msg))
|
||||||
|
|
||||||
|
# Receive confirmation
|
||||||
|
response = await websocket.recv()
|
||||||
|
data = json.loads(response)
|
||||||
|
assert data["type"] == "subscription_confirmed"
|
||||||
|
assert data["channel"] == "events"
|
||||||
|
|
||||||
|
# Broadcast an event from the server
|
||||||
|
event_payload = {"event": "test_event", "data": "hello"}
|
||||||
|
server.broadcast("events", event_payload)
|
||||||
|
|
||||||
|
# Receive the broadcast
|
||||||
|
broadcast_response = await websocket.recv()
|
||||||
|
broadcast_data = json.loads(broadcast_response)
|
||||||
|
assert broadcast_data["channel"] == "events"
|
||||||
|
assert broadcast_data["payload"] == event_payload
|
||||||
|
|
||||||
|
finally:
|
||||||
|
server.stop()
|
||||||
Reference in New Issue
Block a user