import json
import time

import requests


class ApiHookClient:
    """HTTP/JSON client for the API hook server.

    The client talks to the server at ``base_url`` (default
    ``http://127.0.0.1:8999``) using the endpoints ``/status``,
    ``/api/project``, ``/api/session``, ``/api/performance``, ``/api/gui``,
    ``/api/gui/value``, ``/api/gui/diagnostics`` and ``/api/events``.

    Class attributes (override on a subclass or an instance to tune):
        REQUEST_TIMEOUT: per-request timeout in seconds for ``_make_request``.
        STATUS_TIMEOUT: timeout in seconds for the ``/status`` probe.
        POLL_INTERVAL: delay in seconds between polls in the ``wait_for_*``
            helpers.
    """

    REQUEST_TIMEOUT = 0.5
    STATUS_TIMEOUT = 0.2
    POLL_INTERVAL = 0.1

    # Maps GUI indicator tags to the keys used by /api/gui/diagnostics.
    # Shared by get_value() and get_indicator_state() so they cannot drift.
    _INDICATOR_DIAG_KEYS = {
        "thinking_indicator": "thinking",
        "operations_live_indicator": "live",
        "prior_session_indicator": "prior",
    }

    def __init__(self, base_url="http://127.0.0.1:8999", max_retries=2, retry_delay=0.1):
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Scheme, host and port of the hook server.
            max_retries: Number of *additional* attempts made by
                ``_make_request`` after a timeout or connection error.
            retry_delay: Seconds to sleep between those attempts.
        """
        self.base_url = base_url
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def _url(self, endpoint):
        """Join ``base_url`` and ``endpoint`` without producing a double slash."""
        return f"{self.base_url.rstrip('/')}{endpoint}"

    def wait_for_server(self, timeout=3):
        """Poll the /status endpoint until the server is ready or timeout is reached.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Returns:
            True as soon as ``get_status()`` reports ``{'status': 'ok'}``,
            False if that does not happen within ``timeout`` seconds.
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                if self.get_status().get('status') == 'ok':
                    return True
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Server not reachable yet; fall through to the sleep and retry.
                pass
            # Sleep after *every* unsuccessful poll, including a reachable
            # server that is not 'ok' yet, so we never spin in a busy loop.
            time.sleep(self.POLL_INTERVAL)
        return False

    def _make_request(self, method, endpoint, data=None):
        """Send a GET or POST request and return the decoded JSON body.

        Timeouts and connection errors are retried up to ``max_retries``
        times with ``retry_delay`` seconds between attempts. HTTP error
        statuses and undecodable bodies are not retried.

        Args:
            method: ``'GET'`` or ``'POST'``.
            endpoint: Path appended to ``base_url`` (e.g. ``'/api/project'``).
            data: JSON-serialisable payload; only sent for POST.

        Returns:
            The parsed JSON response.

        Raises:
            ValueError: Unsupported ``method``, or the body is not valid JSON.
            requests.exceptions.Timeout: Every attempt timed out.
            requests.exceptions.ConnectionError: Every attempt failed to connect.
            requests.exceptions.HTTPError: The server answered with 4xx/5xx.
        """
        # Validate up front: a bad method is a caller error, not something to retry.
        if method not in ('GET', 'POST'):
            raise ValueError(f"Unsupported HTTP method: {method}")

        url = self._url(endpoint)
        headers = {'Content-Type': 'application/json'}
        # Always make at least one attempt, even if max_retries is negative;
        # otherwise the loop would not run and the call would return None.
        attempts = max(self.max_retries, 0) + 1

        for attempt in range(attempts):
            try:
                if method == 'GET':
                    response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
                else:
                    response = requests.post(
                        url, json=data, headers=headers, timeout=self.REQUEST_TIMEOUT
                    )
                response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                if attempt < attempts - 1:
                    time.sleep(self.retry_delay)
                    continue
                if isinstance(e, requests.exceptions.Timeout):
                    raise requests.exceptions.Timeout(
                        f"Request to {endpoint} timed out after {self.max_retries} retries."
                    ) from e
                raise requests.exceptions.ConnectionError(
                    f"Could not connect to API hook server at {self.base_url} after {self.max_retries} retries."
                ) from e
            except requests.exceptions.HTTPError as e:
                # Keep the original response attached so callers can still
                # inspect e.response on the re-raised exception.
                raise requests.exceptions.HTTPError(
                    f"HTTP error {e.response.status_code} for {endpoint}: {e.response.text}",
                    response=e.response,
                ) from e

            try:
                return response.json()
            except ValueError as e:
                # ValueError is the common base of the JSON decode errors that
                # requests can raise (stdlib json or simplejson backed).
                raise ValueError(
                    f"Failed to decode JSON from response for {endpoint}: {response.text}"
                ) from e

    def get_status(self):
        """Checks the health of the hook server.

        Returns:
            The parsed JSON body of ``GET /status``.

        Raises:
            requests.exceptions.ConnectionError: On *any* failure (network
                error, HTTP error status, invalid JSON). The original error
                is chained as ``__cause__``.
        """
        url = self._url('/status')
        try:
            response = requests.get(url, timeout=self.STATUS_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberately broad: wait_for_server() relies on every failure
            # mode surfacing as ConnectionError.
            raise requests.exceptions.ConnectionError(
                f"Could not reach /status at {self.base_url}"
            ) from e

    def get_project(self):
        """Return the JSON response of ``GET /api/project``."""
        return self._make_request('GET', '/api/project')

    def post_project(self, project_data):
        """POST ``{'project': project_data}`` to ``/api/project``."""
        return self._make_request('POST', '/api/project', data={'project': project_data})

    def get_session(self):
        """Return the JSON response of ``GET /api/session``."""
        return self._make_request('GET', '/api/session')

    def get_performance(self):
        """Retrieves UI performance metrics."""
        return self._make_request('GET', '/api/performance')

    def post_session(self, session_entries):
        """POST ``{'session': {'entries': session_entries}}`` to ``/api/session``."""
        return self._make_request(
            'POST', '/api/session', data={'session': {'entries': session_entries}}
        )

    def post_gui(self, gui_data):
        """POST an arbitrary GUI action payload to ``/api/gui``."""
        return self._make_request('POST', '/api/gui', data=gui_data)

    def select_tab(self, tab_bar, tab):
        """Tells the GUI to switch to a specific tab in a tab bar."""
        return self.post_gui({
            "action": "select_tab",
            "tab_bar": tab_bar,
            "tab": tab
        })

    def select_list_item(self, listbox, item_value):
        """Tells the GUI to select an item in a listbox by its value."""
        return self.post_gui({
            "action": "select_list_item",
            "listbox": listbox,
            "item_value": item_value
        })

    def set_value(self, item, value):
        """Sets the value of a GUI item."""
        return self.post_gui({
            "action": "set_value",
            "item": item,
            "value": value
        })

    def _try_get_value(self, method, endpoint, data=None):
        """Return the ``"value"`` field of a response, or None on any failure."""
        try:
            res = self._make_request(method, endpoint, data=data)
            if res and "value" in res:
                return res.get("value")
        except Exception:
            # Best-effort lookup: the caller falls back to another endpoint.
            pass
        return None

    def get_value(self, item):
        """Gets the value of a GUI item via its mapped field.

        Three sources are tried in order, and the first non-None result wins:
        ``POST /api/gui/value``, ``GET /api/gui/value/<item>``, and finally
        ``GET /api/gui/diagnostics`` (looked up by ``item`` itself, then by
        its mapped indicator key).

        Returns:
            The value, or None if no source produced one. Errors from the
            individual requests are swallowed on purpose.
        """
        # First try direct field querying via POST
        value = self._try_get_value('POST', '/api/gui/value', data={"field": item})
        if value is not None:
            return value

        # Try GET fallback
        value = self._try_get_value('GET', f'/api/gui/value/{item}')
        if value is not None:
            return value

        try:
            # Fallback for thinking/live/prior which are in diagnostics
            diag = self._make_request('GET', '/api/gui/diagnostics')
            if item in diag:
                return diag[item]
            # Map common indicator tags to diagnostics keys
            key = self._INDICATOR_DIAG_KEYS.get(item)
            if key and key in diag:
                return diag[key]
        except Exception:
            # Best-effort: an unreachable or malformed diagnostics endpoint
            # simply means "no value".
            pass
        return None

    def click(self, item, *args, **kwargs):
        """Simulates a click on a GUI button or item.

        ``user_data`` is taken out of ``kwargs`` and sent as its own field;
        the remaining positional and keyword arguments are forwarded as
        ``args`` / ``kwargs`` in the payload.
        """
        user_data = kwargs.pop('user_data', None)
        return self.post_gui({
            "action": "click",
            "item": item,
            "args": args,
            "kwargs": kwargs,
            "user_data": user_data
        })

    def get_indicator_state(self, tag):
        """Checks if an indicator is shown using the diagnostics endpoint.

        Returns:
            ``{"tag": tag, "shown": <value>}`` where the value comes from the
            diagnostics response (False if the key is missing). If the
            request fails, ``"shown"`` is False and an ``"error"`` string is
            added.
        """
        # Unknown tags are looked up in diagnostics under their own name.
        key = self._INDICATOR_DIAG_KEYS.get(tag, tag)
        try:
            diag = self._make_request('GET', '/api/gui/diagnostics')
            return {"tag": tag, "shown": diag.get(key, False)}
        except Exception as e:
            return {"tag": tag, "shown": False, "error": str(e)}

    def get_events(self):
        """Fetches the event queue from ``GET /api/events``.

        NOTE(review): the original docstring says the server also clears the
        queue on read; that cannot be confirmed from the client side.

        Returns:
            The ``"events"`` list from the response, or ``[]`` if the key is
            missing or the request fails.
        """
        try:
            return self._make_request('GET', '/api/events').get("events", [])
        except Exception:
            return []

    def wait_for_event(self, event_type, timeout=5):
        """Polls for a specific event type.

        Returns:
            The first fetched event whose ``"type"`` equals ``event_type``,
            or None if none arrives within ``timeout`` seconds. Events of
            other types fetched while waiting are not returned.
        """
        start = time.time()
        while time.time() - start < timeout:
            for ev in self.get_events():
                if ev.get("type") == event_type:
                    return ev
            time.sleep(self.POLL_INTERVAL)  # Fast poll
        return None

    def wait_for_value(self, item, expected, timeout=5):
        """Polls until get_value(item) == expected.

        Returns:
            True once the values match, False after ``timeout`` seconds.
        """
        start = time.time()
        while time.time() - start < timeout:
            if self.get_value(item) == expected:
                return True
            time.sleep(self.POLL_INTERVAL)  # Fast poll
        return False

    def reset_session(self):
        """Simulates clicking the 'Reset Session' button in the GUI."""
        return self.click("btn_reset")