""" API Hook Client - Python client for the Hook API. This module provides a Python client for interacting with the Hook API exposed by the application on port 8999. It is used for: - Automated GUI testing via the `live_gui` pytest fixture - External tool integration - Remote control of the application Architecture: - Uses requests library for HTTP communication - All methods return dict[str, Any] or None - Handles connection errors gracefully (returns None on failure) Key Method Categories: 1. Connection: wait_for_server, get_status 2. State Query: get_project, get_session. get_performance, get_mma_status 3. GUI Manipulation: click, set_value, select_tab, select_list_item 4. Polling: wait_for_event 5. HITL: request_confirmation Timeout Handling: - Standard operations: 5s timeout - HITL dialogs: 60s timeout (waits for human input) Integration: - Used by simulation tests (tests/visual_sim_mma_v2.py) - Used by external tools for automation See Also: - src/api_hooks.py for the server implementation - docs/guide_tools.md for Hook API documentation """ from __future__ import annotations import requests # type: ignore[import-untyped] import time from typing import Any class ApiHookClient: def __init__(self, base_url: str = "http://127.0.0.1:8999", api_key: str | None = None): self.base_url = base_url.rstrip('/') self.api_key = api_key def _make_request(self, method: str, path: str, data: dict | None = None, timeout: float = 5.0) -> dict[str, Any] | None: """Helper to make HTTP requests to the hook server.""" url = f"{self.base_url}{path}" headers = {} if self.api_key: headers["X-API-KEY"] = self.api_key if method not in ('GET', 'POST', 'DELETE'): raise ValueError(f"Unsupported HTTP method: {method}") try: if method == 'GET': response = requests.get(url, headers=headers, timeout=timeout) elif method == 'POST': response = requests.post(url, json=data, headers=headers, timeout=timeout) elif method == 'DELETE': response = requests.delete(url, headers=headers, timeout=timeout) if response.status_code == 200: return response.json() return None except Exception: # Silently ignore connection errors unless we are in a wait loop return None def wait_for_server(self, timeout: int = 15) -> bool: """Polls the health endpoint until the server responds or timeout occurs.""" start = time.time() while time.time() - start < timeout: status = self.get_status() if status and (status.get("status") == "ok" or "status" in status): return True time.sleep(0.5) return False def get_status(self) -> dict[str, Any]: """Checks the health of the hook server.""" res = self._make_request('GET', '/status') if res is None: # For backward compatibility with tests expecting ConnectionError # But our _make_request handles it. Let's return empty if failed. return {} return res def post_project(self, project_data: dict) -> dict[str, Any]: return self._make_request('POST', '/api/project', data=project_data) or {} def get_project(self) -> dict[str, Any]: """Retrieves the current project state.""" return self._make_request('GET', '/api/project') or {} def get_session(self) -> dict[str, Any]: """Retrieves the current discussion session history.""" return self._make_request('GET', '/api/session') or {} def post_session(self, session_entries: list[dict]) -> dict[str, Any]: """Updates the session history.""" return self._make_request('POST', '/api/session', data={"session": {"entries": session_entries}}) or {} def get_events(self) -> list[dict[str, Any]]: res = self._make_request('GET', '/api/events') return res.get("events", []) if res else [] def clear_events(self) -> list[dict[str, Any]]: return self.get_events() def wait_for_event(self, event_type: str, timeout: int = 5) -> dict[str, Any] | None: start = time.time() while time.time() - start < timeout: events = self.get_events() for ev in events: if ev.get("type") == event_type or ev.get("event_type") == event_type: return ev time.sleep(0.2) return None def post_gui(self, payload: dict) -> dict[str, Any]: """Pushes an event to the GUI's AsyncEventQueue via the /api/gui endpoint.""" return self._make_request('POST', '/api/gui', data=payload) or {} def push_event(self, action: str, payload: dict) -> dict[str, Any]: """Convenience to push a GUI task.""" return self.post_gui({"action": action, **payload}) def click(self, item: str, user_data: Any = None) -> dict[str, Any]: """Simulates a button click.""" return self.post_gui({"action": "click", "item": item, "user_data": user_data}) def set_value(self, item: str, value: Any) -> dict[str, Any]: """Sets the value of a GUI widget.""" return self.post_gui({"action": "set_value", "item": item, "value": value}) def select_tab(self, item: str, value: str) -> dict[str, Any]: """Selects a specific tab in a tab bar.""" return self.set_value(item, value) def select_list_item(self, item: str, value: str) -> dict[str, Any]: """Selects an item in a listbox or combo.""" return self.set_value(item, value) def get_gui_state(self) -> dict[str, Any]: """Returns the full GUI state available via the hook API.""" return self._make_request('GET', '/api/gui/state') or {} def get_value(self, item: str) -> Any: """Gets the value of a GUI item via its mapped field.""" # Try state endpoint first (new preferred way) state = self.get_gui_state() if item in state: return state[item] # Fallback for thinking/live/prior which are in diagnostics diag = self.get_gui_diagnostics() if diag and item in diag: return diag[item] # Map common indicator tags to diagnostics keys mapping = { "thinking_indicator": "thinking", "operations_live_indicator": "live", "prior_session_indicator": "prior" } key = mapping.get(item) if diag and key and key in diag: return diag[key] return None def get_text_value(self, item_tag: str) -> str | None: """Wraps get_value and returns its string representation, or None.""" val = self.get_value(item_tag) return str(val) if val is not None else None def get_indicator_state(self, item_tag: str) -> dict[str, bool]: """Returns the visibility/active state of a status indicator.""" val = self.get_value(item_tag) return {"shown": bool(val)} def get_gui_diagnostics(self) -> dict[str, Any]: """Retrieves performance and diagnostic metrics.""" return self._make_request('GET', '/api/gui/diagnostics') or {} def get_performance(self) -> dict[str, Any]: """Retrieves performance metrics from the dedicated endpoint.""" return self._make_request('GET', '/api/performance') or {} def get_mma_status(self) -> dict[str, Any]: """Retrieves the dedicated MMA engine status.""" return self._make_request('GET', '/api/gui/mma_status') or {} def get_mma_workers(self) -> dict[str, Any]: """Retrieves status for all active MMA workers.""" return self._make_request('GET', '/api/mma/workers') or {} def get_context_state(self) -> dict[str, Any]: """Retrieves the current file and screenshot context state.""" return self._make_request('GET', '/api/context/state') or {} def get_financial_metrics(self) -> dict[str, Any]: """Retrieves token usage and estimated financial cost metrics.""" return self._make_request('GET', '/api/metrics/financial') or {} def get_system_telemetry(self) -> dict[str, Any]: """Retrieves system-level telemetry including thread status and event queue size.""" return self._make_request('GET', '/api/system/telemetry') or {} def get_node_status(self, node_id: str) -> dict[str, Any]: """Retrieves status for a specific node in the MMA DAG.""" return self._make_request('GET', f'/api/mma/node/{node_id}') or {} def request_confirmation(self, tool_name: str, args: dict) -> bool | None: """ Pushes a manual confirmation request and waits for response. Blocks for up to 60 seconds. """ # Long timeout as this waits for human input (60 seconds) res = self._make_request('POST', '/api/ask', data={'type': 'tool_approval', 'tool': tool_name, 'args': args}, timeout=60.0) return res.get('response') if res else None def reset_session(self) -> None: """Resets the current session via button click.""" self.click("btn_reset") def trigger_patch(self, patch_text: str, file_paths: list[str]) -> dict[str, Any]: """Triggers the patch modal to show in the GUI.""" return self._make_request('POST', '/api/patch/trigger', data={ "patch_text": patch_text, "file_paths": file_paths }) or {} def apply_patch(self) -> dict[str, Any]: """Applies the pending patch.""" return self._make_request('POST', '/api/patch/apply') or {} def reject_patch(self) -> dict[str, Any]: """Rejects the pending patch.""" return self._make_request('POST', '/api/patch/reject') or {} def get_patch_status(self) -> dict[str, Any]: """Gets the current patch modal status.""" return self._make_request('GET', '/api/patch/status') or {} def spawn_mma_worker(self, data: dict) -> dict: return self._make_request('POST', '/api/mma/workers/spawn', data=data) or {} def kill_mma_worker(self, worker_id: str) -> dict: return self._make_request('POST', '/api/mma/workers/kill', data={"worker_id": worker_id}) or {} def pause_mma_pipeline(self) -> dict: return self._make_request('POST', '/api/mma/pipeline/pause') or {} def resume_mma_pipeline(self) -> dict: return self._make_request('POST', '/api/mma/pipeline/resume') or {} def inject_context(self, data: dict) -> dict: return self._make_request('POST', '/api/context/inject', data=data) or {} def mutate_mma_dag(self, data: dict) -> dict: return self._make_request('POST', '/api/mma/dag/mutate', data=data) or {}