feat(simulation): stabilize IPC layer and verify full workflow

This commit is contained in:
2026-02-23 19:53:32 -05:00
parent ba97ccda3c
commit 8bd280efc1
5 changed files with 205 additions and 32 deletions

View File

@@ -3,7 +3,7 @@ import json
import time
class ApiHookClient:
def __init__(self, base_url="http://127.0.0.1:8999", max_retries=3, retry_delay=1):
def __init__(self, base_url="http://127.0.0.1:8999", max_retries=5, retry_delay=2):
self.base_url = base_url
self.max_retries = max_retries
self.retry_delay = retry_delay
@@ -29,9 +29,9 @@ class ApiHookClient:
for attempt in range(self.max_retries + 1):
try:
if method == 'GET':
response = requests.get(url, timeout=2)
response = requests.get(url, timeout=5)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=2)
response = requests.post(url, json=data, headers=headers, timeout=5)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
@@ -108,15 +108,28 @@ class ApiHookClient:
"value": value
})
def click(self, item):
def click(self, item, *args, **kwargs):
"""Simulates a click on a GUI button or item."""
user_data = kwargs.pop('user_data', None)
return self.post_gui({
"action": "click",
"item": item
"item": item,
"args": args,
"kwargs": kwargs,
"user_data": user_data
})
def get_indicator_state(self, tag):
"""Checks if an indicator is shown."""
# This requires a new API endpoint or using an existing one that returns GUI state.
# Let's check if /api/gui GET exists.
return self._make_request('GET', f'/api/gui/state/{tag}')
"""Checks if an indicator is shown using the diagnostics endpoint."""
# Mapping tag to the keys used in diagnostics endpoint
mapping = {
"thinking_indicator": "thinking",
"operations_live_indicator": "live",
"prior_session_indicator": "prior"
}
key = mapping.get(tag, tag)
try:
diag = self._make_request('GET', '/api/gui/diagnostics')
return {"tag": tag, "shown": diag.get(key, False)}
except Exception as e:
return {"tag": tag, "shown": False, "error": str(e)}

View File

@@ -21,11 +21,12 @@ class HookHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
elif self.path == '/api/project':
import project_manager
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(
json.dumps({'project': app.project}).encode('utf-8'))
flat = project_manager.flat_config(app.project)
self.wfile.write(json.dumps({'project': flat}).encode('utf-8'))
elif self.path == '/api/session':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
@@ -41,16 +42,35 @@ class HookHandler(BaseHTTPRequestHandler):
if hasattr(app, 'perf_monitor'):
metrics = app.perf_monitor.get_metrics()
self.wfile.write(json.dumps({'performance': metrics}).encode('utf-8'))
elif self.path.startswith('/api/gui/state/'):
tag = self.path.replace('/api/gui/state/', '')
elif self.path == '/api/gui/diagnostics':
# Safe way to query multiple states at once via the main thread queue
event = threading.Event()
result = {}
def check_all():
import dearpygui.dearpygui as dpg
shown = False
if dpg.does_item_exist(tag):
shown = dpg.is_item_shown(tag)
try:
result["thinking"] = dpg.is_item_shown("thinking_indicator") if dpg.does_item_exist("thinking_indicator") else False
result["live"] = dpg.is_item_shown("operations_live_indicator") if dpg.does_item_exist("operations_live_indicator") else False
result["prior"] = dpg.is_item_shown("prior_session_indicator") if dpg.does_item_exist("prior_session_indicator") else False
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": check_all
})
if event.wait(timeout=2):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'tag': tag, 'shown': shown}).encode('utf-8'))
self.wfile.write(json.dumps(result).encode('utf-8'))
else:
self.send_response(504)
self.end_headers()
self.wfile.write(json.dumps({'error': 'timeout'}).encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
@@ -80,11 +100,6 @@ class HookHandler(BaseHTTPRequestHandler):
self.wfile.write(
json.dumps({'status': 'updated'}).encode('utf-8'))
elif self.path == '/api/gui':
if not hasattr(app, '_pending_gui_tasks'):
app._pending_gui_tasks = []
if not hasattr(app, '_pending_gui_tasks_lock'):
app._pending_gui_tasks_lock = threading.Lock()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append(data)
@@ -115,6 +130,13 @@ class HookServer:
def start(self):
if not getattr(self.app, 'test_hooks_enabled', False):
return
# Ensure the app has the task queue and lock initialized
if not hasattr(self.app, '_pending_gui_tasks'):
self.app._pending_gui_tasks = []
if not hasattr(self.app, '_pending_gui_tasks_lock'):
self.app._pending_gui_tasks_lock = threading.Lock()
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()

52
gui.py
View File

@@ -129,7 +129,7 @@ def _add_text_field(parent: str, label: str, value: str):
if wrap:
with dpg.child_window(height=80, border=True):
# add_input_text for selection
dpg.add_input_text(default_value=value, multiline=True, readonly=True, width=-1, height=-1, border=False)
dpg.add_input_text(default_value=value, multiline=True, readonly=True, width=-1, height=-1)
else:
dpg.add_input_text(
default_value=value,
@@ -140,14 +140,14 @@ def _add_text_field(parent: str, label: str, value: str):
)
else:
# Short selectable text
dpg.add_input_text(default_value=value if value else "(empty)", readonly=True, width=-1, border=False)
dpg.add_input_text(default_value=value if value else "(empty)", readonly=True, width=-1)
def _add_kv_row(parent: str, key: str, val, val_color=None):
"""Single key: value row, horizontally laid out."""
with dpg.group(horizontal=True, parent=parent):
dpg.add_text(f"{key}:", color=_LABEL_COLOR)
dpg.add_input_text(default_value=str(val), readonly=True, width=-1, border=False)
dpg.add_input_text(default_value=str(val), readonly=True, width=-1)
def _render_usage(parent: str, usage: dict):
@@ -1506,6 +1506,28 @@ class App:
self._rebuild_projects_list()
self._update_status(f"created project: {name}")
def _cb_new_project_automated(self, path):
"""Automated version of cb_new_project that doesn't show a dialog."""
if not path:
return
name = Path(path).stem
proj = project_manager.default_project(name)
project_manager.save_project(proj, path)
if path not in self.project_paths:
self.project_paths.append(path)
# Safely queue project switch and list rebuild for the main thread
def main_thread_work():
self._switch_project(path)
self._rebuild_projects_list()
self._update_status(f"created project: {name}")
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "custom_callback",
"callback": main_thread_work
})
def cb_browse_git_dir(self):
root = hide_tk_root()
d = filedialog.askdirectory(title="Select Git Directory")
@@ -1882,6 +1904,9 @@ class App:
no_close=False,
no_collapse=True,
):
with dpg.group(tag="automated_actions_group", show=False):
dpg.add_button(tag="btn_project_new_automated", callback=lambda s, a, u: self._cb_new_project_automated(u))
with dpg.tab_bar():
with dpg.tab(label="Projects"):
proj_meta = self.project.get("project", {})
@@ -2301,10 +2326,24 @@ class App:
dpg.set_value(item, val)
elif action == "click":
item = task.get("item")
args = task.get("args", [])
kwargs = task.get("kwargs", {})
user_data = task.get("user_data")
if item and dpg.does_item_exist(item):
cb = dpg.get_item_callback(item)
if cb:
try:
# DPG callbacks can have (sender, app_data, user_data)
# If we have specific args/kwargs we use them,
# otherwise we try to follow the DPG pattern.
if args or kwargs:
cb(*args, **kwargs)
elif user_data is not None:
cb(item, None, user_data)
else:
cb()
except Exception as e:
print(f"Error in GUI hook callback for {item}: {e}")
elif action == "select_tab":
tab_bar = task.get("tab_bar")
tab = task.get("tab")
@@ -2320,6 +2359,13 @@ class App:
# Dear PyGui callbacks for listbox usually receive (sender, app_data, user_data)
# app_data is the selected value.
cb(listbox, val)
elif action == "custom_callback":
cb = task.get("callback")
if cb:
try:
cb()
except Exception as e:
print(f"Error in custom GUI hook callback: {e}")
elif action == "refresh_api_metrics":
self._refresh_api_metrics(task.get("payload", {}))
except Exception as e:

View File

@@ -32,11 +32,15 @@ def live_gui():
"""
print("\n[Fixture] Starting gui.py --enable-test-hooks...")
# Ensure logs directory exists
os.makedirs("logs", exist_ok=True)
log_file = open("logs/gui_test.log", "w", encoding="utf-8")
# Start gui.py as a subprocess.
process = subprocess.Popen(
["uv", "run", "python", "gui.py", "--enable-test-hooks"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdout=log_file,
stderr=log_file,
text=True,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0
)

View File

@@ -0,0 +1,88 @@
import pytest
import time
import sys
import os
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient
@pytest.mark.integration
def test_full_live_workflow(live_gui):
"""
Integration test that drives the GUI through a full workflow.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
time.sleep(2)
# 1. Reset
client.click("btn_reset")
time.sleep(1)
# 2. Project Setup
temp_project_path = os.path.abspath("tests/temp_project.toml")
if os.path.exists(temp_project_path):
os.remove(temp_project_path)
client.click("btn_project_new_automated", user_data=temp_project_path)
time.sleep(1) # Wait for project creation and switch
# Verify metadata update
proj = client.get_project()
test_git = os.path.abspath(".")
client.set_value("project_git_dir", test_git)
client.click("btn_project_save")
time.sleep(1)
proj = client.get_project()
# flat_config returns {"project": {...}, "output": ...}
# so proj is {"project": {"project": {"git_dir": ...}}}
assert proj['project']['project']['git_dir'] == test_git
# Enable auto-add so the response ends up in history
client.set_value("auto_add_history", True)
time.sleep(0.5)
# 3. Discussion Turn
client.set_value("ai_input", "Hello! This is an automated test. Just say 'Acknowledged'.")
client.click("btn_gen_send")
# Verify thinking indicator appears (might be brief)
thinking_seen = False
print("\nPolling for thinking indicator...")
for i in range(20):
state = client.get_indicator_state("thinking_indicator")
if state.get('shown'):
thinking_seen = True
print(f"Thinking indicator seen at poll {i}")
break
time.sleep(0.5)
# 4. Wait for response in session
success = False
print("Waiting for AI response in session...")
for i in range(60):
session = client.get_session()
entries = session.get('session', {}).get('entries', [])
if any(e.get('role') == 'AI' for e in entries):
success = True
print(f"AI response found at second {i}")
break
time.sleep(1)
assert success, "AI failed to respond within 60 seconds"
# 5. Switch Discussion
client.set_value("disc_new_name_input", "AutoDisc")
client.click("btn_disc_create")
time.sleep(0.5)
client.select_list_item("disc_listbox", "AutoDisc")
time.sleep(0.5)
# Verify session is empty in new discussion
session = client.get_session()
assert len(session.get('session', {}).get('entries', [])) == 0