from __future__ import annotations

import json  # noqa: F401  (existing file-level import, intentionally retained)
import logging
import time
from typing import Any

import requests

logger = logging.getLogger(__name__)


class ApiHookClient:
    """Small HTTP client for the API hook server.

    Every call is a JSON-over-HTTP request against ``base_url``. Requests
    issued through ``_make_request`` are retried on connection errors and
    timeouts; ``get_status`` is a single un-retried probe.
    """

    # Indicator tags mapped to the keys used by the /api/gui/diagnostics body.
    _INDICATOR_KEYS: dict[str, str] = {
        "thinking_indicator": "thinking",
        "operations_live_indicator": "live",
        "prior_session_indicator": "prior",
    }
    # Per-request timeout (seconds) used when the caller does not pass one.
    _DEFAULT_TIMEOUT: float = 2.0
    # Pause (seconds) between iterations of the wait_for_* polling loops.
    _POLL_INTERVAL: float = 0.1

    def __init__(
        self,
        base_url: str = "http://127.0.0.1:8999",
        max_retries: int = 5,
        retry_delay: float = 0.2,
    ) -> None:
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Scheme, host and port of the hook server.
            max_retries: Number of retries after the first failed attempt.
            retry_delay: Seconds to sleep between attempts.
        """
        self.base_url = base_url
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    # ------------------------------------------------------------------
    # Transport helpers
    # ------------------------------------------------------------------

    def _url(self, endpoint: str) -> str:
        """Join ``base_url`` and ``endpoint`` without producing a double slash."""
        return f"{self.base_url.rstrip('/')}{endpoint}"

    def _make_request(
        self,
        method: str,
        endpoint: str,
        data: dict | None = None,
        timeout: float | None = None,
    ) -> dict | None:
        """Send a GET or POST request and return the decoded JSON body.

        Args:
            method: ``'GET'`` or ``'POST'``.
            endpoint: Path appended to ``base_url`` (e.g. ``'/api/project'``).
            data: JSON payload for POST requests; ignored for GET.
            timeout: Per-attempt timeout in seconds; defaults to 2.0.

        Returns:
            The decoded JSON body, or ``None`` if no attempt was made
            (only possible when ``max_retries`` is negative).

        Raises:
            ValueError: Unsupported method, or the body is not valid JSON.
            requests.exceptions.Timeout: Every attempt timed out.
            requests.exceptions.ConnectionError: Every attempt failed to connect.
            requests.exceptions.HTTPError: The server answered with 4xx/5xx.
        """
        # Validate up front so this ValueError can never be confused with the
        # JSON-decoding ValueError handled further down.
        if method not in ('GET', 'POST'):
            raise ValueError(f"Unsupported HTTP method: {method}")

        url = self._url(endpoint)
        req_timeout = timeout if timeout is not None else self._DEFAULT_TIMEOUT

        # NOTE(review): POST requests are retried after a timeout as well; if
        # the server already processed the first attempt this repeats the
        # action. Confirm that the endpoints tolerate that.
        for attempt in range(self.max_retries + 1):
            try:
                if method == 'GET':
                    response = requests.get(url, timeout=req_timeout)
                else:
                    response = requests.post(
                        url,
                        json=data,
                        headers={'Content-Type': 'application/json'},
                        timeout=req_timeout,
                    )
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                if attempt < self.max_retries:
                    time.sleep(self.retry_delay)
                    continue
                # Timeout is tested first because ConnectTimeout is both a
                # Timeout and a ConnectionError.
                if isinstance(e, requests.exceptions.Timeout):
                    raise requests.exceptions.Timeout(
                        f"Request to {endpoint} timed out after {self.max_retries} retries."
                    ) from e
                raise requests.exceptions.ConnectionError(
                    f"Could not connect to API hook server at {self.base_url} after {self.max_retries} retries."
                ) from e

            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                # Keep the response attached so callers can still inspect it.
                raise requests.exceptions.HTTPError(
                    f"HTTP error {e.response.status_code} for {endpoint}: {e.response.text}",
                    response=e.response,
                ) from e

            try:
                return response.json()
            except ValueError as e:
                # ValueError is the common base of json.JSONDecodeError,
                # simplejson's decode error and requests' JSONDecodeError.
                raise ValueError(
                    f"Failed to decode JSON from response for {endpoint}: {response.text}"
                ) from e

        return None

    def _try_request(self, method: str, endpoint: str, data: dict | None = None) -> dict | None:
        """Best-effort wrapper around ``_make_request``.

        Returns the decoded body when it is a dict; returns ``None`` on any
        failure or when the body is not a dict. Failures are logged at debug
        level instead of being dropped silently.
        """
        try:
            result = self._make_request(method, endpoint, data=data)
        except Exception as exc:  # deliberate best-effort boundary
            logger.debug("%s %s failed: %s", method, endpoint, exc)
            return None
        return result if isinstance(result, dict) else None

    # ------------------------------------------------------------------
    # Server health
    # ------------------------------------------------------------------

    def wait_for_server(self, timeout: float = 3) -> bool:
        """Poll /status until it reports ``'ok'`` or ``timeout`` seconds elapse.

        Returns:
            ``True`` once the status body has ``status == 'ok'``, otherwise
            ``False`` after the deadline.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                status = self.get_status()
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                status = None
            if isinstance(status, dict) and status.get('status') == 'ok':
                return True
            # Sleep on every unsuccessful iteration, including "reachable but
            # not ready yet", so the loop never spins at full speed.
            time.sleep(self._POLL_INTERVAL)
        return False

    def get_status(self) -> dict:
        """Checks the health of the hook server.

        Performs a single GET on /status with a 5 second timeout and no retries.

        Raises:
            requests.exceptions.ConnectionError: On any failure (network
                error, HTTP error status, or undecodable body).
        """
        try:
            response = requests.get(self._url('/status'), timeout=5.0)
            response.raise_for_status()
            return response.json()
        except Exception as exc:
            raise requests.exceptions.ConnectionError(
                f"Could not reach /status at {self.base_url}"
            ) from exc

    # ------------------------------------------------------------------
    # Project / session / metrics endpoints
    # ------------------------------------------------------------------

    def get_project(self) -> dict | None:
        """GET /api/project and return the decoded body."""
        return self._make_request('GET', '/api/project')

    def post_project(self, project_data: dict) -> dict | None:
        """POST ``project_data`` to /api/project under the ``'project'`` key."""
        return self._make_request('POST', '/api/project', data={'project': project_data})

    def get_session(self) -> dict | None:
        """GET /api/session and return the decoded body."""
        return self._make_request('GET', '/api/session')

    def get_mma_status(self) -> dict | None:
        """Retrieves current MMA status (track, tickets, tier, etc.)"""
        return self._make_request('GET', '/api/gui/mma_status')

    def push_event(self, event_type: str, payload: dict) -> dict | None:
        """Pushes an event to the GUI's AsyncEventQueue via the /api/gui endpoint."""
        return self.post_gui({"action": event_type, "payload": payload})

    def get_performance(self) -> dict | None:
        """Retrieves UI performance metrics."""
        return self._make_request('GET', '/api/performance')

    def post_session(self, session_entries: list) -> dict | None:
        """POST ``session_entries`` to /api/session as ``session.entries``."""
        return self._make_request(
            'POST', '/api/session', data={'session': {'entries': session_entries}}
        )

    # ------------------------------------------------------------------
    # GUI actions
    # ------------------------------------------------------------------

    def post_gui(self, gui_data: dict) -> dict | None:
        """POST an action payload to /api/gui."""
        return self._make_request('POST', '/api/gui', data=gui_data)

    def select_tab(self, tab_bar: str, tab: str) -> dict | None:
        """Tells the GUI to switch to a specific tab in a tab bar."""
        return self.post_gui({"action": "select_tab", "tab_bar": tab_bar, "tab": tab})

    def select_list_item(self, listbox: str, item_value: str) -> dict | None:
        """Tells the GUI to select an item in a listbox by its value."""
        return self.post_gui(
            {"action": "select_list_item", "listbox": listbox, "item_value": item_value}
        )

    def set_value(self, item: str, value: Any) -> dict | None:
        """Sets the value of a GUI item."""
        return self.post_gui({"action": "set_value", "item": item, "value": value})

    def click(self, item: str, *args: Any, **kwargs: Any) -> dict | None:
        """Simulates a click on a GUI button or item.

        ``user_data`` is taken out of ``kwargs`` and sent as its own field;
        the remaining positional and keyword arguments are sent as
        ``args`` / ``kwargs``.
        """
        user_data = kwargs.pop('user_data', None)
        return self.post_gui({
            "action": "click",
            "item": item,
            "args": args,
            "kwargs": kwargs,
            "user_data": user_data,
        })

    def reset_session(self) -> dict | None:
        """Simulates clicking the 'Reset Session' button in the GUI."""
        return self.click("btn_reset")

    # ------------------------------------------------------------------
    # GUI state queries
    # ------------------------------------------------------------------

    def get_value(self, item: str) -> Any:
        """Gets the value of a GUI item via its mapped field.

        Tries, in order: POST /api/gui/value, GET /api/gui/value/<item>, then
        the /api/gui/diagnostics body (by ``item`` and by its indicator
        alias). Each step is best-effort; failures fall through to the next.

        Returns:
            The first value found, or ``None`` when no step yields one.
        """
        lookups = (
            ('POST', '/api/gui/value', {"field": item}),
            ('GET', f'/api/gui/value/{item}', None),
        )
        for method, endpoint, payload in lookups:
            res = self._try_request(method, endpoint, data=payload)
            if res and res.get("value") is not None:
                return res["value"]

        # Fallback for thinking/live/prior which are in diagnostics.
        diag = self._try_request('GET', '/api/gui/diagnostics')
        if diag is None:
            return None
        if item in diag:
            return diag[item]
        key = self._INDICATOR_KEYS.get(item)
        if key and key in diag:
            return diag[key]
        return None

    def get_text_value(self, item_tag: str) -> str | None:
        """Wraps get_value and returns its string representation, or None."""
        val = self.get_value(item_tag)
        return str(val) if val is not None else None

    def get_node_status(self, node_tag: str) -> Any:
        """Wraps get_value for a DAG node or queries the diagnostic endpoint for its status.

        Returns:
            The value from ``get_value`` when it is not ``None``; otherwise
            the entry for ``node_tag`` under the diagnostics ``'nodes'``
            mapping or at the top level of the diagnostics body; otherwise
            ``None``.
        """
        val = self.get_value(node_tag)
        if val is not None:
            return val

        diag = self._try_request('GET', '/api/gui/diagnostics')
        if diag is None:
            return None
        nodes = diag.get('nodes')
        if isinstance(nodes, dict) and node_tag in nodes:
            return nodes[node_tag]
        return diag.get(node_tag)

    def get_indicator_state(self, tag: str) -> dict:
        """Checks if an indicator is shown using the diagnostics endpoint.

        Returns:
            ``{"tag": tag, "shown": <value>}`` on success, where the value
            defaults to ``False`` when the key is absent. On failure the dict
            has ``"shown": False`` plus an ``"error"`` message.
        """
        key = self._INDICATOR_KEYS.get(tag, tag)
        try:
            diag = self._make_request('GET', '/api/gui/diagnostics')
        except Exception as e:
            return {"tag": tag, "shown": False, "error": str(e)}
        if not isinstance(diag, dict):
            return {
                "tag": tag,
                "shown": False,
                "error": "diagnostics response was not a JSON object",
            }
        return {"tag": tag, "shown": diag.get(key, False)}

    # ------------------------------------------------------------------
    # Events and polling
    # ------------------------------------------------------------------

    def get_events(self) -> list:
        """Fetches and clears the event queue from the server.

        Returns the ``'events'`` entry of the /api/events body, or an empty
        list when the request fails or the body has no such entry.
        """
        body = self._try_request('GET', '/api/events')
        if body is None:
            return []
        return body.get("events", [])

    def wait_for_event(self, event_type: str, timeout: float = 5) -> dict | None:
        """Polls for a specific event type.

        Returns the first event whose ``'type'`` equals ``event_type``, or
        ``None`` once ``timeout`` seconds have elapsed. Events of other types
        fetched while polling are not retained by this client.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            for ev in self.get_events():
                if ev.get("type") == event_type:
                    return ev
            time.sleep(self._POLL_INTERVAL)  # Fast poll
        return None

    def wait_for_value(self, item: str, expected: Any, timeout: float = 5) -> bool:
        """Polls until get_value(item) == expected.

        Returns ``True`` on a match, ``False`` once ``timeout`` seconds elapse.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.get_value(item) == expected:
                return True
            time.sleep(self._POLL_INTERVAL)  # Fast poll
        return False

    # ------------------------------------------------------------------
    # Confirmation
    # ------------------------------------------------------------------

    def request_confirmation(self, tool_name: str, args: dict) -> Any:
        """Asks the user for confirmation via the GUI (blocking call).

        POSTs a ``tool_approval`` request to /api/ask with a 60 second
        per-attempt timeout, since the call waits for human input.

        Returns:
            The ``'response'`` entry of the reply, or ``None`` when the reply
            is missing, is not a JSON object, or has no such entry.
        """
        res = self._make_request(
            'POST',
            '/api/ask',
            data={'type': 'tool_approval', 'tool': tool_name, 'args': args},
            timeout=60.0,
        )
        if not isinstance(res, dict):
            return None
        return res.get('response')