ok
This commit is contained in:
109
tests/test_z_negative_flows.py
Normal file
109
tests/test_z_negative_flows.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from src import api_hook_client
|
||||
|
||||
def test_mock_malformed_json(live_gui) -> None:
|
||||
"""Test that the application handles malformed JSON from the provider."""
|
||||
client = api_hook_client.ApiHookClient()
|
||||
assert client.wait_for_server(timeout=15)
|
||||
|
||||
# Reset state
|
||||
client.click("btn_reset")
|
||||
time.sleep(1)
|
||||
|
||||
# Configure mock provider
|
||||
mock_path = Path("tests/mock_gemini_cli.py").absolute()
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
time.sleep(1)
|
||||
client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"')
|
||||
time.sleep(1)
|
||||
|
||||
# Inject MOCK_MODE
|
||||
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'malformed_json']})
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
# Trigger generation
|
||||
client.set_value("system_prompt_input", "Trigger malformed")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# Wait for response
|
||||
event = client.wait_for_event("response", timeout=15)
|
||||
assert event is not None, "Did not receive response event"
|
||||
assert event["payload"]["status"] == "error"
|
||||
assert "JSONDecodeError" in event["payload"]["text"] or "json" in event["payload"]["text"].lower()
|
||||
finally:
|
||||
# Cleanup
|
||||
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'success']})
|
||||
|
||||
|
||||
def test_mock_error_result(live_gui) -> None:
|
||||
"""Test that the application handles explicit error result from the provider."""
|
||||
client = api_hook_client.ApiHookClient()
|
||||
assert client.wait_for_server(timeout=15)
|
||||
|
||||
# Reset state
|
||||
client.click("btn_reset")
|
||||
time.sleep(1)
|
||||
|
||||
# Configure mock provider
|
||||
mock_path = Path("tests/mock_gemini_cli.py").absolute()
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
time.sleep(1)
|
||||
client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"')
|
||||
time.sleep(1)
|
||||
|
||||
# Inject MOCK_MODE
|
||||
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'error_result']})
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
# Trigger generation
|
||||
client.set_value("system_prompt_input", "Trigger error")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# Wait for response
|
||||
event = client.wait_for_event("response", timeout=15)
|
||||
assert event is not None, "Did not receive response event"
|
||||
assert event["payload"]["status"] == "error"
|
||||
assert "Mock simulated error" in event["payload"]["text"]
|
||||
finally:
|
||||
# Cleanup
|
||||
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'success']})
|
||||
|
||||
|
||||
def test_mock_timeout(live_gui) -> None:
|
||||
"""Test that the application handles a subprocess timeout."""
|
||||
client = api_hook_client.ApiHookClient()
|
||||
assert client.wait_for_server(timeout=15)
|
||||
|
||||
# Reset state
|
||||
client.click("btn_reset")
|
||||
time.sleep(1)
|
||||
|
||||
# Configure mock provider
|
||||
mock_path = Path("tests/mock_gemini_cli.py").absolute()
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
time.sleep(1)
|
||||
client.set_value("gcli_path", f'"{sys.executable}" "{mock_path}"')
|
||||
time.sleep(1)
|
||||
|
||||
# Inject MOCK_MODE
|
||||
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'timeout']})
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
# Trigger generation
|
||||
client.set_value("system_prompt_input", "Trigger timeout")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# Wait for response. Note: gemini_cli_adapter has a 60s timeout,
|
||||
# but the mock might not actually hang for 60s if we adjust it or we wait for 65s here.
|
||||
event = client.wait_for_event("response", timeout=70)
|
||||
assert event is not None, "Did not receive response event"
|
||||
assert event["payload"]["status"] == "error"
|
||||
assert "timeout" in event["payload"]["text"].lower()
|
||||
finally:
|
||||
# Cleanup
|
||||
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'success']})
|
||||
Reference in New Issue
Block a user