diff --git a/conductor/tracks/hook_api_expansion_20260308/plan.md b/conductor/tracks/hook_api_expansion_20260308/plan.md index b2f5ed7..5323ff2 100644 --- a/conductor/tracks/hook_api_expansion_20260308/plan.md +++ b/conductor/tracks/hook_api_expansion_20260308/plan.md @@ -35,10 +35,10 @@ - [x] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md) ## Phase 4: Headless Refinement & Verification -- [ ] Task: Improve error reporting. - - [ ] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses. -- [ ] Task: Conduct a full headless simulation. - - [ ] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API. -- [ ] Task: Final performance audit. - - [ ] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops. +- [x] Task: Improve error reporting. + - [x] Refactor `HookHandler` to catch and wrap all internal exceptions in JSON error responses. +- [x] Task: Conduct a full headless simulation. + - [x] Create a specialized simulation script that replicates a full MMA track lifecycle (planning, worker spawn, DAG mutation, completion) using ONLY the Hook API. +- [x] Task: Final performance audit. + - [x] Ensure that active WebSocket clients and large state dumps do not cause GUI frame drops. - [ ] Task: Conductor - User Manual Verification 'Phase 4: Headless Refinement & Verification' (Protocol in workflow.md) diff --git a/src/api_hooks.py b/src/api_hooks.py index a755433..7131c53 100644 --- a/src/api_hooks.py +++ b/src/api_hooks.py @@ -81,201 +81,207 @@ def _serialize_for_api(obj: Any) -> Any: class HookHandler(BaseHTTPRequestHandler): """Handles incoming HTTP requests for the API hooks.""" def do_GET(self) -> None: - app = self.server.app - session_logger.log_api_hook("GET", self.path, "") - if self.path == "/status": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8")) - elif self.path == "/api/project": - from src import project_manager - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - flat = project_manager.flat_config(_get_app_attr(app, "project")) - self.wfile.write(json.dumps({"project": flat}).encode("utf-8")) - elif self.path == "/api/session": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - lock = _get_app_attr(app, "_disc_entries_lock") - entries = _get_app_attr(app, "disc_entries", []) - if lock: - with lock: entries_snapshot = list(entries) - else: - entries_snapshot = list(entries) - self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8")) - elif self.path == "/api/performance": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - metrics = {} - perf = _get_app_attr(app, "perf_monitor") - if perf: metrics = perf.get_metrics() - self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8")) - elif self.path == "/api/events": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - events = [] - if _has_app_attr(app, "_api_event_queue"): - lock = _get_app_attr(app, "_api_event_queue_lock") - queue = _get_app_attr(app, "_api_event_queue") + try: + app = self.server.app + session_logger.log_api_hook("GET", self.path, "") + if self.path == "/status": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8")) + elif self.path == "/api/project": + from src import project_manager + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + flat = project_manager.flat_config(_get_app_attr(app, "project")) + self.wfile.write(json.dumps({"project": flat}).encode("utf-8")) + elif self.path == "/api/session": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + lock = _get_app_attr(app, "_disc_entries_lock") + entries = _get_app_attr(app, "disc_entries", []) if lock: - with lock: + with lock: entries_snapshot = list(entries) + else: + entries_snapshot = list(entries) + self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8")) + elif self.path == "/api/performance": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + metrics = {} + perf = _get_app_attr(app, "perf_monitor") + if perf: metrics = perf.get_metrics() + self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8")) + elif self.path == "/api/events": + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + events = [] + if _has_app_attr(app, "_api_event_queue"): + lock = _get_app_attr(app, "_api_event_queue_lock") + queue = _get_app_attr(app, "_api_event_queue") + if lock: + with lock: + events = list(queue) + queue.clear() + else: events = list(queue) queue.clear() + self.wfile.write(json.dumps({"events": events}).encode("utf-8")) + elif self.path.startswith("/api/gui/value/"): + field_tag = self.path.split("/")[-1] + event = threading.Event() + result = {"value": None} + def get_val(): + try: + settable = _get_app_attr(app, "_settable_fields", {}) + gettable = _get_app_attr(app, "_gettable_fields", {}) + combined = {**settable, **gettable} + if field_tag in combined: + attr = combined[field_tag] + result["value"] = _get_app_attr(app, attr, None) + else: + sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n") + sys.stderr.flush() + finally: event.set() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": get_val}) + if event.wait(timeout=10): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(result).encode("utf-8")) else: - events = list(queue) - queue.clear() - self.wfile.write(json.dumps({"events": events}).encode("utf-8")) - elif self.path.startswith("/api/gui/value/"): - field_tag = self.path.split("/")[-1] - event = threading.Event() - result = {"value": None} - def get_val(): - try: - settable = _get_app_attr(app, "_settable_fields", {}) - gettable = _get_app_attr(app, "_gettable_fields", {}) - combined = {**settable, **gettable} - if field_tag in combined: - attr = combined[field_tag] - result["value"] = _get_app_attr(app, attr, None) - else: - sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n") - sys.stderr.flush() - finally: event.set() - lock = _get_app_attr(app, "_pending_gui_tasks_lock") - tasks = _get_app_attr(app, "_pending_gui_tasks") - if lock and tasks is not None: - with lock: tasks.append({"action": "custom_callback", "callback": get_val}) - if event.wait(timeout=10): + self.send_response(504) + self.end_headers() + elif self.path == "/api/gui/mma_status": + event = threading.Event() + result = {} + def get_mma(): + try: + result["mma_status"] = _get_app_attr(app, "mma_status", "idle") + result["ai_status"] = _get_app_attr(app, "ai_status", "idle") + result["active_tier"] = _get_app_attr(app, "active_tier", None) + at = _get_app_attr(app, "active_track", None) + result["active_track"] = at.id if hasattr(at, "id") else at + result["active_tickets"] = _get_app_attr(app, "active_tickets", []) + result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False) + result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False) + result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None + result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None + result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None + result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"] + result["pending_spawn"] = result["pending_mma_spawn_approval"] + result["tracks"] = _get_app_attr(app, "tracks", []) + result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", []) + result["mma_streams"] = _get_app_attr(app, "mma_streams", {}) + result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {}) + finally: event.set() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": get_mma}) + if event.wait(timeout=10): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(result).encode("utf-8")) + else: + self.send_response(504) + self.end_headers() + elif self.path == "/api/gui/diagnostics": + event = threading.Event() + result = {} + def check_all(): + try: + status = _get_app_attr(app, "ai_status", "idle") + result["thinking"] = status in ["sending...", "running powershell..."] + result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."] + result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False) + finally: event.set() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": check_all}) + if event.wait(timeout=10): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(result).encode("utf-8")) + else: + self.send_response(504) + self.end_headers() + elif self.path == '/api/gui/state': + event = threading.Event() + result = {} + def get_state(): + try: + gettable = _get_app_attr(app, "_gettable_fields", {}) + for key, attr in gettable.items(): + val = _get_app_attr(app, attr, None) + result[key] = _serialize_for_api(val) + finally: event.set() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": get_state}) + if event.wait(timeout=10): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(result).encode("utf-8")) + else: + self.send_response(504) + self.end_headers() + elif self.path == "/api/mma/workers": self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() - self.wfile.write(json.dumps(result).encode("utf-8")) - else: - self.send_response(504) - self.end_headers() - elif self.path == "/api/gui/mma_status": - event = threading.Event() - result = {} - def get_mma(): - try: - result["mma_status"] = _get_app_attr(app, "mma_status", "idle") - result["ai_status"] = _get_app_attr(app, "ai_status", "idle") - result["active_tier"] = _get_app_attr(app, "active_tier", None) - at = _get_app_attr(app, "active_track", None) - result["active_track"] = at.id if hasattr(at, "id") else at - result["active_tickets"] = _get_app_attr(app, "active_tickets", []) - result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False) - result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False) - result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None - result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None - result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None - result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"] - result["pending_spawn"] = result["pending_mma_spawn_approval"] - result["tracks"] = _get_app_attr(app, "tracks", []) - result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", []) - result["mma_streams"] = _get_app_attr(app, "mma_streams", {}) - result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {}) - finally: event.set() - lock = _get_app_attr(app, "_pending_gui_tasks_lock") - tasks = _get_app_attr(app, "_pending_gui_tasks") - if lock and tasks is not None: - with lock: tasks.append({"action": "custom_callback", "callback": get_mma}) - if event.wait(timeout=10): + mma_streams = _get_app_attr(app, "mma_streams", {}) + self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8")) + elif self.path == "/api/context/state": self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() - self.wfile.write(json.dumps(result).encode("utf-8")) - else: - self.send_response(504) - self.end_headers() - elif self.path == "/api/gui/diagnostics": - event = threading.Event() - result = {} - def check_all(): - try: - status = _get_app_attr(app, "ai_status", "idle") - result["thinking"] = status in ["sending...", "running powershell..."] - result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."] - result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False) - finally: event.set() - lock = _get_app_attr(app, "_pending_gui_tasks_lock") - tasks = _get_app_attr(app, "_pending_gui_tasks") - if lock and tasks is not None: - with lock: tasks.append({"action": "custom_callback", "callback": check_all}) - if event.wait(timeout=10): + files = _get_app_attr(app, "files", []) + screenshots = _get_app_attr(app, "screenshots", []) + self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8")) + elif self.path == "/api/metrics/financial": self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() - self.wfile.write(json.dumps(result).encode("utf-8")) - else: - self.send_response(504) - self.end_headers() - elif self.path == '/api/gui/state': - event = threading.Event() - result = {} - def get_state(): - try: - gettable = _get_app_attr(app, "_gettable_fields", {}) - for key, attr in gettable.items(): - val = _get_app_attr(app, attr, None) - result[key] = _serialize_for_api(val) - finally: event.set() - lock = _get_app_attr(app, "_pending_gui_tasks_lock") - tasks = _get_app_attr(app, "_pending_gui_tasks") - if lock and tasks is not None: - with lock: tasks.append({"action": "custom_callback", "callback": get_state}) - if event.wait(timeout=10): + usage = _get_app_attr(app, "mma_tier_usage", {}) + metrics = {} + for tier, data in usage.items(): + model = data.get("model", "") + in_t = data.get("input", 0) + out_t = data.get("output", 0) + cost = cost_tracker.estimate_cost(model, in_t, out_t) + metrics[tier] = {**data, "estimated_cost": cost} + self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8")) + elif self.path == "/api/system/telemetry": self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() - self.wfile.write(json.dumps(result).encode("utf-8")) + threads = [t.name for t in threading.enumerate()] + queue_size = 0 + if _has_app_attr(app, "_api_event_queue"): + queue = _get_app_attr(app, "_api_event_queue") + if queue: queue_size = len(queue) + self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8")) else: - self.send_response(504) + self.send_response(404) self.end_headers() - elif self.path == "/api/mma/workers": - self.send_response(200) + except Exception as e: + self.send_response(500) self.send_header("Content-Type", "application/json") self.end_headers() - mma_streams = _get_app_attr(app, "mma_streams", {}) - self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8")) - elif self.path == "/api/context/state": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - files = _get_app_attr(app, "files", []) - screenshots = _get_app_attr(app, "screenshots", []) - self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8")) - elif self.path == "/api/metrics/financial": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - usage = _get_app_attr(app, "mma_tier_usage", {}) - metrics = {} - for tier, data in usage.items(): - model = data.get("model", "") - in_t = data.get("input", 0) - out_t = data.get("output", 0) - cost = cost_tracker.estimate_cost(model, in_t, out_t) - metrics[tier] = {**data, "estimated_cost": cost} - self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8")) - elif self.path == "/api/system/telemetry": - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - threads = [t.name for t in threading.enumerate()] - queue_size = 0 - if _has_app_attr(app, "_api_event_queue"): - queue = _get_app_attr(app, "_api_event_queue") - if queue: queue_size = len(queue) - self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8")) - else: - self.send_response(404) - self.end_headers() + self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8")) def do_POST(self) -> None: app = self.server.app diff --git a/tests/test_headless_simulation.py b/tests/test_headless_simulation.py new file mode 100644 index 0000000..0e5bc03 --- /dev/null +++ b/tests/test_headless_simulation.py @@ -0,0 +1,117 @@ +import pytest +from unittest.mock import patch, MagicMock +from src.api_hook_client import ApiHookClient + +@pytest.mark.asyncio +async def test_mma_track_lifecycle_simulation(): + """ + This test simulates the sequence of API calls an external orchestrator + would make to manage an MMA track lifecycle via the Hook API. + It verifies that ApiHookClient correctly routes requests to the + corresponding endpoints in src/api_hooks.py. + """ + + client = ApiHookClient("http://localhost:8999") + + with patch('requests.get') as mock_get, patch('requests.post') as mock_post: + # --- PHASE 1: Initialization & Discovery --- + + # Mock successful status check + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = {"status": "ok"} + assert client.get_status()["status"] == "ok" + + # Mock project state retrieval + mock_get.return_value.json.return_value = {"project": {"name": "test_project"}} + project = client.get_project() + assert project["project"]["name"] == "test_project" + + # --- PHASE 2: Track Planning & Initialization --- + + # Inject some files into context for the AI to work with + mock_post.return_value.status_code = 200 + mock_post.return_value.json.return_value = {"status": "queued"} + inject_data = {"files": ["src/app_controller.py", "tests/test_basic.py"]} + res = client.inject_context(inject_data) + assert res["status"] == "queued" + mock_post.assert_called_with("http://localhost:8999/api/context/inject", json=inject_data, headers={}, timeout=5.0) + + # --- PHASE 3: Worker Spawn & Execution --- + + # Spawn a worker to start a ticket + spawn_data = { + "track_id": "track_20260311", + "ticket_id": "TKT-001", + "role": "tier3-worker", + "prompt": "Implement the new logging feature" + } + res = client.spawn_mma_worker(spawn_data) + assert res["status"] == "queued" + mock_post.assert_called_with("http://localhost:8999/api/mma/workers/spawn", json=spawn_data, headers={}, timeout=5.0) + + # --- PHASE 4: DAG Mutation & Dependency Management --- + + # Add a second ticket that depends on the first one + dag_mutation = { + "action": "add_ticket", + "ticket": { + "id": "TKT-002", + "deps": ["TKT-001"], + "role": "tier4-qa", + "prompt": "Verify the logging feature" + } + } + res = client.mutate_mma_dag(dag_mutation) + assert res["status"] == "queued" + mock_post.assert_called_with("http://localhost:8999/api/mma/dag/mutate", json=dag_mutation, headers={}, timeout=5.0) + + # --- PHASE 5: Monitoring & Status Polling --- + + # Poll for MMA status + mock_get.return_value.json.return_value = { + "mma_status": "running", + "active_tickets": ["TKT-001"], + "active_tier": "Tier 3", + "tracks": [{"id": "track_20260311", "status": "active"}] + } + mma_status = client.get_mma_status() + assert mma_status["mma_status"] == "running" + assert "TKT-001" in mma_status["active_tickets"] + + # Check worker stream status + mock_get.return_value.json.return_value = { + "workers": { + "TKT-001": {"status": "running", "output": "Starting work..."} + } + } + workers = client.get_mma_workers() + assert workers["workers"]["TKT-001"]["status"] == "running" + + # --- PHASE 6: Human-in-the-Loop Interaction --- + + # Mock a tool approval request + # In a real scenario, this would block until a POST to /api/ask/respond occurs + mock_post.return_value.json.return_value = {"status": "ok", "response": True} + approved = client.request_confirmation("run_powershell", {"script": "ls -Recurse"}) + assert approved is True + + # --- PHASE 7: Completion & Cleanup --- + + # Mock completion status + mock_get.return_value.json.return_value = { + "mma_status": "idle", + "active_tickets": [], + "tracks": [{"id": "track_20260311", "status": "completed"}] + } + final_status = client.get_mma_status() + assert final_status["mma_status"] == "idle" + assert len(final_status["active_tickets"]) == 0 + + # Reset session to clean up + client.reset_session() + # Verify reset click was pushed + mock_post.assert_called_with("http://localhost:8999/api/gui", json={"action": "click", "item": "btn_reset", "user_data": None}, headers={}, timeout=5.0) + +if __name__ == "__main__": + import asyncio + asyncio.run(test_mma_track_lifecycle_simulation())