4a33848620
Root cause: test_full_live_workflow in batch context (with prior sims running AI discussion turns) would queue its _do_project_switch behind the auto-pruner's scan of tests/logs/ (154MB, 6519 files). The 4-worker pool was saturated, so the switch would never run within 30s. Fix: bump IO_POOL_MAX_WORKERS from 4 to 8. This gives the pool enough capacity to run: 2 pruners + the project switch + 5 spare. Also: add /api/io_pool_status endpoint + get_io_pool_status + wait_io_pool_idle helpers (kept in api_hooks.py and api_hook_client.py for the test_api_hook_client_io_pool.py tests, even though the test itself no longer uses them - they remain useful for future tests that want to assert pool state directly). Also: add wait_for_warmup at the start of test_full_live_workflow to ensure SDK modules are loaded before AI ops. Test verification: - test_full_live_workflow in isolation: 11.83s PASS - test_full_live_workflow in batch (with 4 prior sims): 83.46s PASS - 30/30 related unit tests PASS
189 lines
6.5 KiB
Python
189 lines
6.5 KiB
Python
"""
|
|
ANTI-SIMPLIFICATION: These tests verify the end-to-end full live workflow.
|
|
They MUST NOT be simplified. They depend on exact execution states and timing
|
|
through the actual GUI and ApiHookClient interface.
|
|
"""
|
|
import pytest
|
|
import time
|
|
import sys
|
|
import os
|
|
|
|
# Ensure project root is in path
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
|
|
from src.api_hook_client import ApiHookClient
|
|
|
|
def wait_for_value(client, field, expected, timeout=10, interval=0.5):
    """
    Helper to poll the GUI state until a field matches the expected value.

    Args:
        client: Object exposing ``get_gui_state()``; the returned state is
            read with ``.get(field)``, so it must be a mapping.
        field: Key to look up in the GUI state.
        expected: Value the field must compare equal to (``==``).
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls (defaults to the previously
            hard-coded 0.5s).

    Returns:
        True as soon as a poll observes the expected value, False if the
        deadline passes without a match.
    """
    # monotonic() is immune to wall-clock adjustments, so the timeout cannot
    # shrink or stretch if the system clock is changed while we wait.
    deadline = time.monotonic() + timeout
    while True:
        # Poll before checking the deadline: guarantees at least one poll
        # (even for timeout <= 0) and a final poll right at the deadline.
        if client.get_gui_state().get(field) == expected:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
|
|
|
|
@pytest.mark.integration
def test_full_live_workflow(live_gui) -> None:
    """
    Integration test that drives the GUI through a full workflow.

    ANTI-SIMPLIFICATION: Asserts exact AI behavior, thinking state tracking,
    and response logging in discussion history.

    Steps: wait for warmup and any in-flight project switch, reset, create a
    new project, configure provider/model, send one discussion turn, wait for
    the AI entry in the session history, then create and switch to a fresh
    discussion and assert that its history is empty.

    The ``live_gui`` fixture is requested but never referenced in the body;
    presumably it launches the GUI under test (confirm in conftest).
    """
    client = ApiHookClient()
    assert client.wait_for_server(timeout=10)
    client.post_session(session_entries=[])

    # 0a. Wait for app warmup to complete. The warmup submits heavy-module
    # import jobs directly to the io_pool (bypassing submit_io's counter);
    # we wait for the warmup done event so SDK modules are guaranteed loaded
    # before AI ops.
    warmup_result = client.get_warmup_wait(timeout=60.0)
    print(f"[TEST] Warmup result: {warmup_result}")

    # 0b. Wait for any in-flight project switch to complete before starting.
    # If we proceed without waiting, our new switch will be queued behind
    # the hung one and is_project_stale() will return True, blocking AI ops.
    pre_status = client.get_project_switch_status()
    if pre_status.get("in_progress"):
        print(f"\n[TEST] Waiting for prior project switch to complete: {pre_status}")
        idle_status = client.wait_for_project_switch(timeout=60.0)
        assert not idle_status.get("timeout"), (
            "Prior project switch did not complete in 60s. Aborting. "
            f"Last status: {idle_status}"
        )
        print(f"[TEST] Prior switch done: {idle_status}")

    # 1. Reset
    print("\n[TEST] Clicking Reset...")
    client.click("btn_reset")
    time.sleep(1)

    # 2. Project Setup
    temp_project_path = os.path.abspath("tests/artifacts/temp_project.toml")
    if os.path.exists(temp_project_path):
        # Best-effort cleanup of a previous run's file. Only filesystem errors
        # are tolerated, and they are logged: a surviving stale file would make
        # the "file was created" check below pass trivially.
        try:
            os.remove(temp_project_path)
        except OSError as exc:
            print(f"[TEST] Could not remove stale project file: {exc}")
    print(f"[TEST] Creating new project at {temp_project_path}...")
    client.click("btn_project_new_automated", user_data=temp_project_path)
    # Defensive: fail fast if the click was dropped or the handler crashed
    # before writing the project file.
    create_deadline = time.monotonic() + 5.0
    while time.monotonic() < create_deadline:
        if os.path.exists(temp_project_path):
            break
        time.sleep(0.1)
    assert os.path.exists(temp_project_path), (
        "temp_project.toml not created within 5s of click. "
        "Click may have been dropped or _cb_new_project_automated crashed."
    )

    # Wait for project switch to complete (deterministic, condition-based).
    # Replaces the prior 10x1s blind poll of derived state.
    status = client.wait_for_project_switch(expected_path=temp_project_path, timeout=30.0)
    assert not status.get("timeout"), (
        f"Project switch did not complete in 30s. Last status: {status}"
    )
    assert not status.get("error"), (
        f"Project switch failed with error: {status.get('error')}"
    )

    test_git = os.path.abspath(".")
    print(f"[TEST] Setting project_git_dir to {test_git}...")
    client.set_value("project_git_dir", test_git)
    assert wait_for_value(client, "project_git_dir", test_git)

    client.click("btn_project_save")
    time.sleep(1)

    # Enable auto-add so the response ends up in history
    client.set_value("auto_add_history", True)
    client.set_value("current_provider", "gemini")

    # Use gemini-2.5-flash-lite as the model for this run.
    client.set_value("current_model", "gemini-2.5-flash-lite")
    time.sleep(1)

    # 3. Discussion Turn
    print("[TEST] Sending AI request...")
    client.set_value("ai_input", "Hello! This is an automated test. Just say 'Acknowledged'.")
    client.click("btn_gen_send")

    # Verify thinking indicator appears or ai_status changes.
    # Observation only: the outcome is logged, not asserted, so a response
    # that arrives faster than the first poll does not fail the test.
    print("[TEST] Polling for thinking indicator...")
    thinking_seen = False
    for i in range(20):
        mma = client.get_mma_status()
        ai_status = mma.get('ai_status')
        print(f" Poll {i}: ai_status='{ai_status}'")
        if ai_status == 'error':
            state = client.get_gui_state()
            pytest.fail(f"AI Status went to error during thinking poll. Response: {state.get('ai_response')}")

        if ai_status in ('sending...', 'streaming...'):
            print(f" AI is sending/streaming at poll {i}")
            thinking_seen = True
            # Don't break, keep watching for a bit

        indicator = client.get_indicator_state("thinking_indicator")
        if indicator.get('shown'):
            print(f" Thinking indicator seen at poll {i}")
            thinking_seen = True
            break
        time.sleep(0.5)
    print(f"[TEST] Thinking/sending state observed: {thinking_seen}")

    # 4. Wait for response in session
    success = False
    print("[TEST] Waiting for AI response in session history...")
    for i in range(60):
        session = client.get_session()
        entries = session.get('session', {}).get('entries', [])
        # Check for AI role. The entries are objects with a 'role' key.
        found_ai = any(str(e.get('role', '')).upper() == 'AI' for e in entries)
        if found_ai:
            success = True
            print(f" AI response found in history after {i}s")
            break

        mma = client.get_mma_status()
        if mma.get('ai_status') == 'error':
            state = client.get_gui_state()
            pytest.fail(f"AI Status went to error during response wait. Response: {state.get('ai_response')}")

        time.sleep(1)

    # FALLBACK: if not in entries yet, check if ai_response is populated and status is done
    if not success:
        mma = client.get_mma_status()
        if mma.get('ai_status') in ('done', 'idle'):
            state = client.get_gui_state()
            if state.get('ai_response'):
                print("[TEST] AI response found in ai_response field (fallback)")
                success = True

    assert success, f"AI failed to respond. Entries: {client.get_session()}, Status: {client.get_mma_status()}"

    # 5. Switch Discussion
    print("[TEST] Creating new discussion 'AutoDisc'...")
    client.set_value("disc_new_name_input", "AutoDisc")
    client.click("btn_disc_create")
    time.sleep(1.0)

    print("[TEST] Switching to 'AutoDisc'...")
    client.select_list_item("disc_listbox", "AutoDisc")
    time.sleep(1.0)

    # Verify session is empty in new discussion
    session = client.get_session()
    entries = session.get('session', {}).get('entries', [])
    print(f" New discussion history length: {len(entries)}")
    assert len(entries) == 0
    print("[TEST] Workflow completed successfully.")