manual_slop/tests/test_live_workflow.py
Ted 51ecace464 test(live_workflow): pre-flight health check fails fast on dirty state
PR3 of the test_full_live_workflow_imgui_assert fix sequence.

When a prior live_gui test in the same session crashes the GUI (e.g.
via an ImGui IM_ASSERT from cumulative panel state), the controller's
_io_pool gets shut down. The next test starts in a degraded state
but only discovers this 120s later when its project switch times
out with a confusing 'cannot schedule new futures after shutdown'
error.
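That error text comes from Python's `concurrent.futures`: once an executor has been shut down, every later `submit()` raises `RuntimeError`. The sketch below reproduces it. It assumes `_io_pool` is a standard `ThreadPoolExecutor`, which this commit does not state explicitly. `io_pool` and `try_submit` are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the controller's _io_pool
# (assumed to be a plain ThreadPoolExecutor with 8 workers).
io_pool = ThreadPoolExecutor(max_workers=8)

# Simulate the crash path shutting the pool down.
io_pool.shutdown(wait=True)


def try_submit(pool):
    """Return the error text if the pool rejects new work, else None."""
    try:
        pool.submit(lambda: None)  # e.g. a queued project-switch job
    except RuntimeError as exc:
        return str(exc)
    return None


error = try_submit(io_pool)
print(error)  # cannot schedule new futures after shutdown
```

The rejection happens inside the GUI process, so the test only sees it indirectly through the stalled project switch. That is why probing health up front is cheaper than waiting out the switch timeout.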

This commit adds a /api/gui_health pre-flight check at the start of
test_full_live_workflow. If the GUI is degraded, the test fails
fast (within 1s) with a clear, actionable message that includes:
- The exact RuntimeError that caused the degradation
- The full traceback of the last ImGui scope mismatch
- A note that the new test cannot proceed with a dirty state

Per user feedback 2026-06-08: 'I don't want a batch to be too fragile
where I can't restart the app and continue with the next test file
if it fails. Just has to note that the new file didn't get to deal
with a dirty state.'

Also includes the planning documents written earlier in this session:
- TODO_test_full_live_workflow_v2.md (task list)
- test_full_live_workflow_imgui_assert_20260608.md (root cause report)
- test_full_live_workflow_propagation_digest_20260608.md (solutions digest)
- batch_resilience_plan_20260608.md (batch resilience plan)

Verification:
- test_full_live_workflow in isolation: 13.45s PASS (health=True, no degrade)
- 4 sims + test_full_live_workflow in batch: 76.46s (1 FAIL fast, 4 sims PASS)
  - Without PR3 fix: 200s FAIL with confusing 120s timeout
  - With PR3 fix: 76s FAIL with clear 'GUI is degraded' message
- The fast-fail is observable, not silent (per user's 'wrap might be
  worth it if that properly lets us handle the assert')
2026-06-08 21:17:54 -04:00

207 lines
7.4 KiB
Python

"""
ANTI-SIMPLIFICATION: These tests verify the end-to-end full live workflow.
They MUST NOT be simplified. They depend on exact execution states and timing
through the actual GUI and ApiHookClient interface.
"""
import pytest
import time
import sys
import os
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.api_hook_client import ApiHookClient


def wait_for_value(client, field, expected, timeout=10):
    """
    Helper to poll the GUI state until a field matches the expected value.
    """
    start = time.time()
    while time.time() - start < timeout:
        state = client.get_gui_state()
        val = state.get(field)
        if val == expected:
            return True
        time.sleep(0.5)
    return False


@pytest.mark.integration
def test_full_live_workflow(live_gui) -> None:
    """
    Integration test that drives the GUI through a full workflow.
    ANTI-SIMPLIFICATION: Asserts exact AI behavior, thinking state tracking,
    and response logging in discussion history.
    """
    client = ApiHookClient()
    assert client.wait_for_server(timeout=10)

    # 00. Pre-flight health check. If the live_gui subprocess was left in
    # a degraded state by a prior test (e.g. an ImGui IM_ASSERT crashed
    # the GUI main loop, shutting down the controller's _io_pool), fail
    # fast with a clear message instead of waiting 120s for a switch
    # that can never complete. Per user feedback 2026-06-08: the test
    # should "note that the new file didn't get to deal with a dirty state"
    # rather than silently time out.
    health = client.get_gui_health()
    if not health.get("healthy"):
        pytest.fail(
            "GUI is degraded before test starts. "
            f"degraded_reason={health.get('degraded_reason')!r}, "
            f"last_assert={health.get('last_assert')!r}. "
            "This is likely caused by a prior test in the same live_gui session "
            "crashing the GUI. The new test cannot proceed with a dirty state."
        )
    client.post_session(session_entries=[])

    # 0a. Wait for app warmup to complete. The warmup submits heavy-module
    # import jobs directly to the io_pool (bypassing submit_io's counter),
    # so we wait for the warmup-done event to guarantee the SDK modules are
    # loaded before any AI operations.
    warmup_result = client.get_warmup_wait(timeout=60.0)
    print(f"[TEST] Warmup result: {warmup_result}")

    # 0b. Wait for any in-flight project switch to complete before starting.
    # If we proceed without waiting, our new switch is queued behind the
    # hung one and is_project_stale() returns True, blocking AI operations.
    pre_status = client.get_project_switch_status()
    if pre_status.get("in_progress"):
        print(f"\n[TEST] Waiting for prior project switch to complete: {pre_status}")
        idle_status = client.wait_for_project_switch(timeout=60.0)
        assert not idle_status.get("timeout"), (
            "Prior project switch did not complete in 60s. Aborting. "
            f"Last status: {idle_status}"
        )
        print(f"[TEST] Prior switch done: {idle_status}")

    # 1. Reset
    print("\n[TEST] Clicking Reset...")
    client.click("btn_reset")
    time.sleep(1)

    # 2. Project Setup
    temp_project_path = os.path.abspath("tests/artifacts/temp_project.toml")
    if os.path.exists(temp_project_path):
        try:
            os.remove(temp_project_path)
        except OSError:
            pass
    print(f"[TEST] Creating new project at {temp_project_path}...")
    client.click("btn_project_new_automated", user_data=temp_project_path)
    # Defensive: fail fast if the click was dropped or the handler crashed
    # before writing the project file.
    start = time.time()
    while time.time() - start < 5.0:
        if os.path.exists(temp_project_path):
            break
        time.sleep(0.1)
    assert os.path.exists(temp_project_path), (
        "temp_project.toml not created within 5s of click. "
        "Click may have been dropped or _cb_new_project_automated crashed."
    )
    # Wait for the project switch to complete (deterministic, condition-based).
    # Replaces the prior 10x1s blind poll of derived state.
    # Timeout is 120s: in a batch, prior sims' AI discussion-turn workers
    # can saturate the 8-worker io_pool, queueing this switch for tens of seconds.
    status = client.wait_for_project_switch(expected_path=temp_project_path, timeout=120.0)
    assert not status.get("timeout"), (
        f"Project switch did not complete in 120s. Last status: {status}"
    )
    assert not status.get("error"), (
        f"Project switch failed with error: {status.get('error')}"
    )
    test_git = os.path.abspath(".")
    print(f"[TEST] Setting project_git_dir to {test_git}...")
    client.set_value("project_git_dir", test_git)
    assert wait_for_value(client, "project_git_dir", test_git)
    client.click("btn_project_save")
    time.sleep(1)
    # Enable auto-add so the response ends up in history
    client.set_value("auto_add_history", True)
    client.set_value("current_provider", "gemini")
    # Use gemini-2.5-flash-lite (actual current model)
    client.set_value("current_model", "gemini-2.5-flash-lite")
    time.sleep(1)

    # 3. Discussion Turn
    print("[TEST] Sending AI request...")
    client.set_value("ai_input", "Hello! This is an automated test. Just say 'Acknowledged'.")
    client.click("btn_gen_send")
    # Watch for the thinking indicator or an ai_status change. This poll is
    # informational: it fails early only if ai_status goes to 'error', and
    # its `success` flag is reset before step 4 rather than asserted.
    print("[TEST] Polling for thinking indicator...")
    success = False
    for i in range(20):
        mma = client.get_mma_status()
        ai_status = mma.get('ai_status')
        print(f" Poll {i}: ai_status='{ai_status}'")
        if ai_status == 'error':
            state = client.get_gui_state()
            pytest.fail(f"AI Status went to error during thinking poll. Response: {state.get('ai_response')}")
        if ai_status in ('sending...', 'streaming...'):
            print(f" AI is sending/streaming at poll {i}")
            success = True
            # Don't break, keep watching for a bit
        indicator = client.get_indicator_state("thinking_indicator")
        if indicator.get('shown'):
            print(f" Thinking indicator seen at poll {i}")
            success = True
            break
        time.sleep(0.5)

    # 4. Wait for response in session
    success = False
    print("[TEST] Waiting for AI response in session history...")
    for i in range(60):
        session = client.get_session()
        entries = session.get('session', {}).get('entries', [])
        # Check for AI role. The entries are objects with a 'role' key.
        found_ai = any(str(e.get('role', '')).upper() == 'AI' for e in entries)
        if found_ai:
            success = True
            print(f" AI response found in history after {i}s")
            break
        mma = client.get_mma_status()
        if mma.get('ai_status') == 'error':
            state = client.get_gui_state()
            pytest.fail(f"AI Status went to error during response wait. Response: {state.get('ai_response')}")
        time.sleep(1)
    # FALLBACK: if not in entries yet, check if ai_response is populated and status is done
    if not success:
        mma = client.get_mma_status()
        if mma.get('ai_status') in ('done', 'idle'):
            state = client.get_gui_state()
            if state.get('ai_response'):
                print("[TEST] AI response found in ai_response field (fallback)")
                success = True
    assert success, f"AI failed to respond. Entries: {client.get_session()}, Status: {client.get_mma_status()}"

    # 5. Switch Discussion
    print("[TEST] Creating new discussion 'AutoDisc'...")
    client.set_value("disc_new_name_input", "AutoDisc")
    client.click("btn_disc_create")
    time.sleep(1.0)
    print("[TEST] Switching to 'AutoDisc'...")
    client.select_list_item("disc_listbox", "AutoDisc")
    time.sleep(1.0)
    # Verify session is empty in new discussion
    session = client.get_session()
    entries = session.get('session', {}).get('entries', [])
    print(f" New discussion history length: {len(entries)}")
    assert len(entries) == 0
    print("[TEST] Workflow completed successfully.")
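Several steps in this test share one pattern. Each one polls a probe until a condition holds or a deadline passes: `wait_for_value`, the 5s project-file check, and the session-history wait. The sketch below is a generic version of that pattern. `poll_until` is a hypothetical helper, not part of this repository, and it assumes nothing about `ApiHookClient` beyond the callables the caller supplies. It uses `time.monotonic()` so the deadline is unaffected by wall-clock adjustments. It returns the last observed value so assertion messages can include it.

```python
import time
from typing import Callable, Optional, Tuple, TypeVar

T = TypeVar("T")


def poll_until(
    probe: Callable[[], T],
    done: Callable[[T], bool],
    timeout: float,
    interval: float = 0.5,
) -> Tuple[bool, Optional[T]]:
    """Call `probe` until `done(result)` is true or `timeout` seconds pass.

    The probe always runs at least once, and once more after the final sleep,
    so a condition that becomes true right at the deadline is still seen.
    Returns (matched, last_result).
    """
    deadline = time.monotonic() + timeout
    while True:
        last = probe()
        if done(last):
            return True, last
        if time.monotonic() >= deadline:
            return False, last
        time.sleep(interval)


# Demo with a fake probe that succeeds on its third call.
calls = {"n": 0}


def fake_probe() -> int:
    calls["n"] += 1
    return calls["n"]


matched, last = poll_until(fake_probe, lambda n: n >= 3, timeout=1.0, interval=0.01)
print(matched, last)  # True 3

# A probe that never reaches the expected state times out.
matched2, last2 = poll_until(lambda: "pending", lambda s: s == "done", timeout=0.05, interval=0.01)
print(matched2, last2)  # False pending
```

For example, the file-existence check could be written as `poll_until(lambda: os.path.exists(temp_project_path), bool, timeout=5.0, interval=0.1)`, with `last` reported on failure.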