Files
manual_slop/tests/test_mma_concurrent_tracks_stress_sim.py
T
ed b043d06771 chore: add standard STATUS markers to worker streams and optimize test polling
This fixes the 'stuck' behavior in concurrent tests by ensuring the tests look for standard completion markers and don't wait for unnecessary timeouts.
2026-05-07 18:37:19 -04:00

99 lines
3.8 KiB
Python

import pytest
import time
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from src import api_hook_client
def _poll_mma_workers(client: api_hook_client.ApiHookClient, timeout: int, condition, label: str) -> tuple[bool, dict]:
"""Poll get_mma_workers() until condition(workers) is True or timeout."""
workers = {}
for i in range(timeout):
res = client.get_mma_workers() or {}
workers = res.get('workers', {})
print(f"[SIM][{label}] t={i}s active_workers={list(workers.keys())}")
if condition(workers):
return True, workers
time.sleep(1)
return False, workers
@pytest.mark.integration
@pytest.mark.timeout(600)
def test_mma_concurrent_tracks_stress(live_gui) -> None:
"""
Stress test: Start two tracks concurrently and verify they both progress
without crashing the GUI or losing state.
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
# 1. Setup mock provider
client.set_value('current_provider', 'gemini_cli')
client.set_value('gcli_path', f'"{sys.executable}" "{os.path.abspath("tests/mock_concurrent_mma.py")}"')
client.click('btn_project_save')
time.sleep(1.0)
# 2. Generate two tracks via Epic
client.set_value('mma_epic_input', 'STRESS TEST: TRACK A AND TRACK B')
client.click('btn_mma_plan_epic')
# Wait for tracks to be generated and accepted
# Note: Epic planning usually generates multiple tracks.
start = time.time()
while time.time() - start < 60:
status = client.get_mma_status()
if status.get('proposed_tracks'):
break
time.sleep(1)
client.click('btn_mma_accept_tracks')
time.sleep(2.0)
# 3. Get track IDs
status = client.get_mma_status()
tracks = status.get('tracks', [])
assert len(tracks) >= 2, f"Need at least 2 tracks for stress test, found {len(tracks)}"
track_id_a = tracks[0]['id']
track_id_b = tracks[1]['id']
# 4. Start both tracks
print(f"[SIM] Starting Track A: {track_id_a}")
client.click('btn_mma_load_track', user_data=track_id_a)
time.sleep(0.5)
client.click('btn_mma_start_track', user_data=track_id_a)
print(f"[SIM] Starting Track B: {track_id_b}")
client.click('btn_mma_load_track', user_data=track_id_b)
time.sleep(0.5)
client.click('btn_mma_start_track', user_data=track_id_b)
# 5. Verify workers from both tracks appear
# Workers are named "Tier 3 (Worker): <ticket_id>"
ok, workers = _poll_mma_workers(client, timeout=30, label="wait-workers",
condition=lambda w: any("ticket-A-1" in str(k) for k in w) and any("ticket-B-1" in str(k) for k in w))
# Note: ticket_id might not contain track_id directly, but the mock epic generator
# usually includes some identifier. If not, we just check for multiple Tier 3 workers.
if not ok:
print("[SIM] WARNING: Could not explicitly find both tracks in worker names, checking for multiple workers instead.")
ok, workers = _poll_mma_workers(client, timeout=30, label="wait-multi-workers",
condition=lambda w: sum(1 for k in w if "Tier 3" in k) >= 2)
assert ok, f"Did not see concurrent workers starting. Active: {list(workers.keys())}"
# 6. Wait for completion
print("[SIM] Waiting for all workers to finish...")
ok, workers = _poll_mma_workers(client, timeout=120, label="wait-completion",
condition=lambda w: all("[STATUS] COMPLETED" in str(v) or "FAILED" in str(v) or "BLOCKED" in str(v)
for k, v in w.items() if "Tier 3" in k) if any("Tier 3" in k for k in w) else False)
assert ok, f"Workers did not complete. Active: {list(workers.keys())}"
# Final check: GUI should still be responsive
res = client.get_status()
assert res.get('status') == 'ok', "GUI crashed during concurrent execution"
print("[SIM] MMA Concurrent Tracks stress test PASSED.")