Files
manual_slop/tests/test_sync_hooks.py

56 lines
1.4 KiB
Python

import threading
import time
import requests
import pytest
from api_hook_client import ApiHookClient
def test_api_ask_client_method(live_gui) -> None:
"""
Tests the request_confirmation method in ApiHookClient.
"""
client = ApiHookClient("http://127.0.0.1:8999")
# Drain existing events
client.get_events()
results = {"response": None, "error": None}
def make_blocking_request() -> None:
try:
# This call should block until we respond
results["response"] = client.request_confirmation(
tool_name="powershell",
args={"command": "echo hello"}
)
except Exception as e:
results["error"] = str(e)
# Start the request in a background thread
t = threading.Thread(target=make_blocking_request)
t.start()
# Poll for the 'ask_received' event
request_id = None
start_time = time.time()
while time.time() - start_time < 5:
events = client.get_events()
for ev in events:
if ev.get("type") == "ask_received":
request_id = ev.get("request_id")
break
if request_id:
break
time.sleep(0.1)
assert request_id is not None, "Timed out waiting for 'ask_received' event"
# Respond
expected_response = {"approved": True}
resp = requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={
"request_id": request_id,
"response": expected_response
}
)
assert resp.status_code == 200
t.join(timeout=5)
assert not t.is_alive()
assert results["error"] is None
assert results["response"] == expected_response