Private
Public Access
0
0
Files
manual_slop/tests/test_live_workflow.py
T
ed 4a33848620 fix(io_pool): increase worker count from 4 to 8 to prevent test hangs
Root cause: test_full_live_workflow in batch context (with prior sims
running AI discussion turns) would queue its _do_project_switch behind
the auto-pruner's scan of tests/logs/ (154MB, 6519 files). The 4-worker
pool was saturated, so the switch would never run within 30s.

Fix: bump IO_POOL_MAX_WORKERS from 4 to 8. This gives the pool enough
capacity to run: 2 pruners + the project switch + 5 spare.

Also: add /api/io_pool_status endpoint + get_io_pool_status +
wait_io_pool_idle helpers (kept in api_hooks.py and api_hook_client.py
for the test_api_hook_client_io_pool.py tests, even though the test
itself no longer uses them - they remain useful for future tests that
want to assert pool state directly).

Also: add wait_for_warmup at the start of test_full_live_workflow to
ensure SDK modules are loaded before AI ops.

Test verification:
- test_full_live_workflow in isolation: 11.83s PASS
- test_full_live_workflow in batch (with 4 prior sims): 83.46s PASS
- 30/30 related unit tests PASS
2026-06-08 17:49:34 -04:00

189 lines
6.5 KiB
Python

"""
ANTI-SIMPLIFICATION: These tests verify the end-to-end full live workflow.
They MUST NOT be simplified. They depend on exact execution states and timing
through the actual GUI and ApiHookClient interface.
"""
import pytest
import time
import sys
import os
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.api_hook_client import ApiHookClient
def wait_for_value(client, field, expected, timeout=10, interval=0.5):
    """
    Helper to poll the GUI state until a field matches the expected value.

    Args:
        client: Object exposing ``get_gui_state()``, which returns a mapping
            of GUI field names to their current values.
        field: Key to look up in the GUI state mapping.
        expected: Value the field must equal (``==``) for the wait to succeed.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between polls.

    Returns:
        True as soon as the field equals ``expected``; False if the timeout
        elapses first. The state is always checked at least once, even when
        ``timeout`` is zero or negative.
    """
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        if client.get_gui_state().get(field) == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
@pytest.mark.integration
def test_full_live_workflow(live_gui) -> None:
    """
    Integration test that drives the GUI through a full workflow.

    ANTI-SIMPLIFICATION: Asserts exact AI behavior, thinking state tracking,
    and response logging in discussion history.

    Sequence: wait for the hook server and app warmup, wait out any in-flight
    project switch, reset, create a temp project and wait for the switch to
    it, configure git dir / provider / model, send one AI request, wait for
    an AI entry in the session history (with an ``ai_response`` fallback),
    then create and switch to a new discussion and check that it is empty.

    Note: the thinking-indicator poll in step 3 is observational. Its result
    is logged but not asserted; only an ``error`` status fails the test there.
    """
    client = ApiHookClient()
    assert client.wait_for_server(timeout=10)
    client.post_session(session_entries=[])

    # 0a. Wait for app warmup to complete. The warmup submits heavy-module
    #     import jobs directly to the io_pool (bypassing submit_io's counter);
    #     we wait for the warmup done event so SDK modules are guaranteed
    #     loaded before AI ops.
    warmup_result = client.get_warmup_wait(timeout=60.0)
    print(f"[TEST] Warmup result: {warmup_result}")

    # 0b. Wait for any in-flight project switch to complete before starting.
    #     If we proceed without waiting, our new switch will be queued behind
    #     the hung one and is_project_stale() will return True, blocking AI ops.
    pre_status = client.get_project_switch_status()
    if pre_status.get("in_progress"):
        print(f"\n[TEST] Waiting for prior project switch to complete: {pre_status}")
        idle_status = client.wait_for_project_switch(timeout=60.0)
        assert not idle_status.get("timeout"), (
            "Prior project switch did not complete in 60s. Aborting. "
            f"Last status: {idle_status}"
        )
        print(f"[TEST] Prior switch done: {idle_status}")

    # 1. Reset
    print("\n[TEST] Clicking Reset...")
    client.click("btn_reset")
    time.sleep(1)

    # 2. Project Setup
    temp_project_path = os.path.abspath("tests/artifacts/temp_project.toml")
    # Best-effort removal of a leftover project file. Only filesystem errors
    # (including "file not found") are ignored, so interrupts still propagate.
    try:
        os.remove(temp_project_path)
    except OSError:
        pass
    print(f"[TEST] Creating new project at {temp_project_path}...")
    client.click("btn_project_new_automated", user_data=temp_project_path)

    # Defensive: fail fast if the click was dropped or the handler crashed
    # before writing the project file.
    file_deadline = time.monotonic() + 5.0
    while time.monotonic() < file_deadline and not os.path.exists(temp_project_path):
        time.sleep(0.1)
    assert os.path.exists(temp_project_path), (
        "temp_project.toml not created within 5s of click. "
        "Click may have been dropped or _cb_new_project_automated crashed."
    )

    # Wait for project switch to complete (deterministic, condition-based).
    # Replaces the prior 10x1s blind poll of derived state.
    status = client.wait_for_project_switch(expected_path=temp_project_path, timeout=30.0)
    assert not status.get("timeout"), (
        f"Project switch did not complete in 30s. Last status: {status}"
    )
    assert not status.get("error"), (
        f"Project switch failed with error: {status.get('error')}"
    )

    test_git = os.path.abspath(".")
    print(f"[TEST] Setting project_git_dir to {test_git}...")
    client.set_value("project_git_dir", test_git)
    assert wait_for_value(client, "project_git_dir", test_git), (
        f"project_git_dir did not become {test_git!r} within the wait timeout"
    )
    client.click("btn_project_save")
    time.sleep(1)

    # Enable auto-add so the response ends up in history
    client.set_value("auto_add_history", True)
    client.set_value("current_provider", "gemini")
    # USE gemini-2.5-flash-lite (Actual current model)
    client.set_value("current_model", "gemini-2.5-flash-lite")
    time.sleep(1)

    # 3. Discussion Turn
    print("[TEST] Sending AI request...")
    client.set_value("ai_input", "Hello! This is an automated test. Just say 'Acknowledged'.")
    client.click("btn_gen_send")

    # Watch for the thinking indicator or an in-flight ai_status. This is
    # logged only: a very fast response may finish before any poll sees it.
    print("[TEST] Polling for thinking indicator...")
    saw_thinking = False
    for i in range(20):
        mma = client.get_mma_status()
        ai_status = mma.get('ai_status')
        print(f" Poll {i}: ai_status='{ai_status}'")
        if ai_status == 'error':
            state = client.get_gui_state()
            pytest.fail(f"AI Status went to error during thinking poll. Response: {state.get('ai_response')}")
        if ai_status in ('sending...', 'streaming...'):
            print(f" AI is sending/streaming at poll {i}")
            saw_thinking = True
            # Don't break, keep watching for a bit
        indicator = client.get_indicator_state("thinking_indicator")
        if indicator.get('shown'):
            print(f" Thinking indicator seen at poll {i}")
            saw_thinking = True
            break
        time.sleep(0.5)
    print(f"[TEST] Thinking/sending state observed: {saw_thinking}")

    # 4. Wait for response in session
    got_response = False
    print("[TEST] Waiting for AI response in session history...")
    for i in range(60):
        session = client.get_session()
        entries = session.get('session', {}).get('entries', [])
        # Check for AI role. The entries are objects with a 'role' key.
        if any(str(e.get('role', '')).upper() == 'AI' for e in entries):
            got_response = True
            print(f" AI response found in history after {i}s")
            break
        mma = client.get_mma_status()
        if mma.get('ai_status') == 'error':
            state = client.get_gui_state()
            pytest.fail(f"AI Status went to error during response wait. Response: {state.get('ai_response')}")
        time.sleep(1)

    # FALLBACK (checked once the polling loop has given up): if not in entries
    # yet, check if ai_response is populated and status is done
    if not got_response:
        mma = client.get_mma_status()
        if mma.get('ai_status') in ('done', 'idle'):
            state = client.get_gui_state()
            if state.get('ai_response'):
                print("[TEST] AI response found in ai_response field (fallback)")
                got_response = True
    assert got_response, f"AI failed to respond. Entries: {client.get_session()}, Status: {client.get_mma_status()}"

    # 5. Switch Discussion
    print("[TEST] Creating new discussion 'AutoDisc'...")
    client.set_value("disc_new_name_input", "AutoDisc")
    client.click("btn_disc_create")
    time.sleep(1.0)
    print("[TEST] Switching to 'AutoDisc'...")
    client.select_list_item("disc_listbox", "AutoDisc")
    time.sleep(1.0)

    # Verify session is empty in new discussion
    session = client.get_session()
    entries = session.get('session', {}).get('entries', [])
    print(f" New discussion history length: {len(entries)}")
    assert len(entries) == 0, (
        f"Expected an empty history in the new discussion, found {len(entries)} entries"
    )
    print("[TEST] Workflow completed successfully.")