diff --git a/conductor/tracks/hook_api_expansion_20260308/plan.md b/conductor/tracks/hook_api_expansion_20260308/plan.md index 2d3168e..8c7642a 100644 --- a/conductor/tracks/hook_api_expansion_20260308/plan.md +++ b/conductor/tracks/hook_api_expansion_20260308/plan.md @@ -23,15 +23,15 @@ - [x] Task: Conductor - User Manual Verification 'Phase 2: Expanded Read Endpoints' (Protocol in workflow.md) ## Phase 3: Comprehensive Control Endpoints (POST) -- [ ] Task: Implement worker and pipeline control. - - [ ] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API. - - [ ] Add `/api/mma/workers/kill` to programmatically abort running workers. - - [ ] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop. -- [ ] Task: Implement context and DAG mutation. - - [ ] Add `/api/context/inject` to allow programmatic context injection (files/skeletons). - - [ ] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API. -- [ ] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints. -- [ ] Task: Write integration tests for all new control endpoints using `live_gui`. +- [x] Task: Implement worker and pipeline control. + - [x] Add `/api/mma/workers/spawn` to manually initiate sub-agent execution via the API. + - [x] Add `/api/mma/workers/kill` to programmatically abort running workers. + - [x] Add `/api/mma/pipeline/pause` and `/api/mma/pipeline/resume` to control the global MMA loop. +- [x] Task: Implement context and DAG mutation. + - [x] Add `/api/context/inject` to allow programmatic context injection (files/skeletons). + - [x] Add `/api/mma/dag/mutate` to allow modifying task dependencies through the API. +- [x] Task: Update `api_hook_client.py` with corresponding methods for all new POST endpoints. +- [x] Task: Write integration tests for all new control endpoints using `live_gui`. - [ ] Task: Conductor - User Manual Verification 'Phase 3: Comprehensive Control Endpoints' (Protocol in workflow.md) ## Phase 4: Headless Refinement & Verification diff --git a/src/api_hook_client.py b/src/api_hook_client.py index f59f783..1480df4 100644 --- a/src/api_hook_client.py +++ b/src/api_hook_client.py @@ -240,3 +240,21 @@ class ApiHookClient: """Gets the current patch modal status.""" return self._make_request('GET', '/api/patch/status') or {} + def spawn_mma_worker(self, data: dict) -> dict: + return self._make_request('POST', '/api/mma/workers/spawn', data=data) or {} + + def kill_mma_worker(self, worker_id: str) -> dict: + return self._make_request('POST', '/api/mma/workers/kill', data={"worker_id": worker_id}) or {} + + def pause_mma_pipeline(self) -> dict: + return self._make_request('POST', '/api/mma/pipeline/pause') or {} + + def resume_mma_pipeline(self) -> dict: + return self._make_request('POST', '/api/mma/pipeline/resume') or {} + + def inject_context(self, data: dict) -> dict: + return self._make_request('POST', '/api/context/inject', data=data) or {} + + def mutate_mma_dag(self, data: dict) -> dict: + return self._make_request('POST', '/api/mma/dag/mutate', data=data) or {} + diff --git a/src/api_hooks.py b/src/api_hooks.py index dae652d..a755433 100644 --- a/src/api_hooks.py +++ b/src/api_hooks.py @@ -519,6 +519,90 @@ class HookHandler(BaseHTTPRequestHandler): else: self.send_response(404) self.end_headers() + elif self.path == "/api/mma/workers/spawn": + def spawn_worker(): + try: + func = _get_app_attr(app, "_spawn_worker") + if func: func(data) + except Exception as e: + sys.stderr.write(f"[DEBUG] Hook API spawn_worker error: {e}\n") + sys.stderr.flush() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": spawn_worker}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8")) + elif self.path == "/api/mma/workers/kill": + def kill_worker(): + try: + worker_id = data.get("worker_id") + func = _get_app_attr(app, "_kill_worker") + if func: func(worker_id) + except Exception as e: + sys.stderr.write(f"[DEBUG] Hook API kill_worker error: {e}\n") + sys.stderr.flush() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": kill_worker}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8")) + elif self.path == "/api/mma/pipeline/pause": + def pause_pipeline(): + _set_app_attr(app, "mma_step_mode", True) + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": pause_pipeline}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8")) + elif self.path == "/api/mma/pipeline/resume": + def resume_pipeline(): + _set_app_attr(app, "mma_step_mode", False) + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": resume_pipeline}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8")) + elif self.path == "/api/context/inject": + def inject_context(): + files = _get_app_attr(app, "files") + if isinstance(files, list): + files.extend(data.get("files", [])) + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": inject_context}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8")) + elif self.path == "/api/mma/dag/mutate": + def mutate_dag(): + try: + func = _get_app_attr(app, "_mutate_dag") + if func: func(data) + except Exception as e: + sys.stderr.write(f"[DEBUG] Hook API mutate_dag error: {e}\n") + sys.stderr.flush() + lock = _get_app_attr(app, "_pending_gui_tasks_lock") + tasks = _get_app_attr(app, "_pending_gui_tasks") + if lock and tasks is not None: + with lock: tasks.append({"action": "custom_callback", "callback": mutate_dag}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "queued"}).encode("utf-8")) else: self.send_response(404) self.end_headers() diff --git a/tests/test_api_control_endpoints.py b/tests/test_api_control_endpoints.py new file mode 100644 index 0000000..0a45a45 --- /dev/null +++ b/tests/test_api_control_endpoints.py @@ -0,0 +1,21 @@ +import pytest +from src.api_hook_client import ApiHookClient + +@pytest.mark.asyncio +async def test_control_endpoints_exist(): + client = ApiHookClient() + assert hasattr(client, "spawn_mma_worker") + assert hasattr(client, "kill_mma_worker") + assert hasattr(client, "pause_mma_pipeline") + assert hasattr(client, "resume_mma_pipeline") + assert hasattr(client, "inject_context") + assert hasattr(client, "mutate_mma_dag") + +def test_api_hook_client_control_methods_exist(): + client = ApiHookClient() + assert callable(getattr(client, "spawn_mma_worker", None)) + assert callable(getattr(client, "kill_mma_worker", None)) + assert callable(getattr(client, "pause_mma_pipeline", None)) + assert callable(getattr(client, "resume_mma_pipeline", None)) + assert callable(getattr(client, "inject_context", None)) + assert callable(getattr(client, "mutate_mma_dag", None))