Private
Public Access
0
0
Files
manual_slop/tests/test_live_workflow.py
T
ed c9a991bbb8 test(live_workflow): bump project switch wait timeout 30s -> 120s
The 30s wait_for_project_switch timeout was an excessive constraint.
In batch context, prior sims' AI discussion turn workers saturate the
8-worker io_pool, queueing this switch for tens of seconds. The other
defensive waits in the test (warmup 60s, prior switch 60s) already use
60s+, so 30s was the inconsistent outlier.

User confirmed: 'I think not completing in 30s is an excessive constraint
if thats whats going on.'

Verification:
- test_full_live_workflow isolation: 11.69s PASS
- 7-test batch (test_full_live_workflow + 4 extended sims + 2 markdown): 85.83s PASS
2026-06-08 18:14:18 -04:00

191 lines
6.6 KiB
Python

"""
ANTI-SIMPLIFICATION: These tests verify the end-to-end full live workflow.
They MUST NOT be simplified. They depend on exact execution states and timing
through the actual GUI and ApiHookClient interface.
"""
import pytest
import time
import sys
import os
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.api_hook_client import ApiHookClient
def wait_for_value(client, field, expected, timeout=10):
"""
Helper to poll the GUI state until a field matches the expected value.
"""
start = time.time()
while time.time() - start < timeout:
state = client.get_gui_state()
val = state.get(field)
if val == expected:
return True
time.sleep(0.5)
return False
@pytest.mark.integration
def test_full_live_workflow(live_gui) -> None:
"""
Integration test that drives the GUI through a full workflow.
ANTI-SIMPLIFICATION: Asserts exact AI behavior, thinking state tracking,
and response logging in discussion history.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
client.post_session(session_entries=[])
# 0a. Wait for app warmup to complete. The warmup submits heavy-module
# import jobs directly to the io_pool (bypassing submit_io's counter);
# we wait for the warmup done event so SDK modules are guaranteed loaded
# before AI ops.
warmup_result = client.get_warmup_wait(timeout=60.0)
print(f"[TEST] Warmup result: {warmup_result}")
# 0b. Wait for any in-flight project switch to complete before starting.
# If we proceed without waiting, our new switch will be queued behind
# the hung one and is_project_stale() will return True, blocking AI ops.
pre_status = client.get_project_switch_status()
if pre_status.get("in_progress"):
print(f"\n[TEST] Waiting for prior project switch to complete: {pre_status}")
idle_status = client.wait_for_project_switch(timeout=60.0)
assert not idle_status.get("timeout"), (
f"Prior project switch did not complete in 60s. Aborting. "
f"Last status: {idle_status}"
)
print(f"[TEST] Prior switch done: {idle_status}")
# 1. Reset
print("\n[TEST] Clicking Reset...")
client.click("btn_reset")
time.sleep(1)
# 2. Project Setup
temp_project_path = os.path.abspath("tests/artifacts/temp_project.toml")
if os.path.exists(temp_project_path):
try: os.remove(temp_project_path)
except: pass
print(f"[TEST] Creating new project at {temp_project_path}...")
client.click("btn_project_new_automated", user_data=temp_project_path)
# Defensive: fail fast if the click was dropped or the handler crashed
# before writing the project file.
import time as _time
_start = _time.time()
while _time.time() - _start < 5.0:
if os.path.exists(temp_project_path):
break
_time.sleep(0.1)
assert os.path.exists(temp_project_path), (
f"temp_project.toml not created within 5s of click. "
f"Click may have been dropped or _cb_new_project_automated crashed."
)
# Wait for project switch to complete (deterministic, condition-based).
# Replaces the prior 10x1s blind poll of derived state.
# Timeout is 120s: in batch context, prior sims' AI discussion turn workers
# can saturate the 8-worker io_pool, queueing this switch for tens of seconds.
status = client.wait_for_project_switch(expected_path=temp_project_path, timeout=120.0)
assert not status.get("timeout"), (
f"Project switch did not complete in 30s. Last status: {status}"
)
assert not status.get("error"), (
f"Project switch failed with error: {status.get('error')}"
)
test_git = os.path.abspath(".")
print(f"[TEST] Setting project_git_dir to {test_git}...")
client.set_value("project_git_dir", test_git)
assert wait_for_value(client, "project_git_dir", test_git)
client.click("btn_project_save")
time.sleep(1)
# Enable auto-add so the response ends up in history
client.set_value("auto_add_history", True)
client.set_value("current_provider", "gemini")
# USE gemini-2.0-flash-lite (Actual current model)
client.set_value("current_model", "gemini-2.5-flash-lite")
time.sleep(1)
# 3. Discussion Turn
print("[TEST] Sending AI request...")
client.set_value("ai_input", "Hello! This is an automated test. Just say 'Acknowledged'.")
client.click("btn_gen_send")
# Verify thinking indicator appears or ai_status changes
print("[TEST] Polling for thinking indicator...")
success = False
for i in range(20):
mma = client.get_mma_status()
ai_status = mma.get('ai_status')
print(f" Poll {i}: ai_status='{ai_status}'")
if ai_status == 'error':
state = client.get_gui_state()
pytest.fail(f"AI Status went to error during thinking poll. Response: {state.get('ai_response')}")
if ai_status == 'sending...' or ai_status == 'streaming...':
print(f" AI is sending/streaming at poll {i}")
success = True
# Don't break, keep watching for a bit
indicator = client.get_indicator_state("thinking_indicator")
if indicator.get('shown'):
print(f" Thinking indicator seen at poll {i}")
success = True
break
time.sleep(0.5)
# 4. Wait for response in session
success = False
print("[TEST] Waiting for AI response in session history...")
for i in range(60):
session = client.get_session()
entries = session.get('session', {}).get('entries', [])
# Check for AI role. The entries are objects with a 'role' key.
found_ai = any(str(e.get('role', '')).upper() == 'AI' for e in entries)
if found_ai:
success = True
print(f" AI response found in history after {i}s")
break
mma = client.get_mma_status()
if mma.get('ai_status') == 'error':
state = client.get_gui_state()
pytest.fail(f"AI Status went to error during response wait. Response: {state.get('ai_response')}")
time.sleep(1)
# FALLBACK: if not in entries yet, check if ai_response is populated and status is done
if not success:
mma = client.get_mma_status()
if mma.get('ai_status') == 'done' or mma.get('ai_status') == 'idle':
state = client.get_gui_state()
if state.get('ai_response'):
print("[TEST] AI response found in ai_response field (fallback)")
success = True
assert success, f"AI failed to respond. Entries: {client.get_session()}, Status: {client.get_mma_status()}"
# 5. Switch Discussion
print("[TEST] Creating new discussion 'AutoDisc'...")
client.set_value("disc_new_name_input", "AutoDisc")
client.click("btn_disc_create")
time.sleep(1.0)
print("[TEST] Switching to 'AutoDisc'...")
client.select_list_item("disc_listbox", "AutoDisc")
time.sleep(1.0)
# Verify session is empty in new discussion
session = client.get_session()
entries = session.get('session', {}).get('entries', [])
print(f" New discussion history length: {len(entries)}")
assert len(entries) == 0
print("[TEST] Workflow completed successfully.")