feat(simulation): stabilize IPC layer and verify full workflow

This commit is contained in:
2026-02-23 19:53:32 -05:00
parent ba97ccda3c
commit 8bd280efc1
5 changed files with 205 additions and 32 deletions

View File

@@ -3,7 +3,7 @@ import json
import time
class ApiHookClient:
def __init__(self, base_url="http://127.0.0.1:8999", max_retries=3, retry_delay=1):
def __init__(self, base_url="http://127.0.0.1:8999", max_retries=5, retry_delay=2):
self.base_url = base_url
self.max_retries = max_retries
self.retry_delay = retry_delay
@@ -29,9 +29,9 @@ class ApiHookClient:
for attempt in range(self.max_retries + 1):
try:
if method == 'GET':
response = requests.get(url, timeout=2)
response = requests.get(url, timeout=5)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=2)
response = requests.post(url, json=data, headers=headers, timeout=5)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
@@ -108,15 +108,28 @@ class ApiHookClient:
"value": value
})
def click(self, item):
def click(self, item, *args, **kwargs):
"""Simulates a click on a GUI button or item."""
user_data = kwargs.pop('user_data', None)
return self.post_gui({
"action": "click",
"item": item
"item": item,
"args": args,
"kwargs": kwargs,
"user_data": user_data
})
def get_indicator_state(self, tag):
"""Checks if an indicator is shown."""
# This requires a new API endpoint or using an existing one that returns GUI state.
# Let's check if /api/gui GET exists.
return self._make_request('GET', f'/api/gui/state/{tag}')
"""Checks if an indicator is shown using the diagnostics endpoint."""
# Mapping tag to the keys used in diagnostics endpoint
mapping = {
"thinking_indicator": "thinking",
"operations_live_indicator": "live",
"prior_session_indicator": "prior"
}
key = mapping.get(tag, tag)
try:
diag = self._make_request('GET', '/api/gui/diagnostics')
return {"tag": tag, "shown": diag.get(key, False)}
except Exception as e:
return {"tag": tag, "shown": False, "error": str(e)}