import pytest import time import sys import os # Ensure project root is in path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src"))) from src import api_hook_client def _poll_mma_status(client, timeout, condition, label): """Poll get_mma_status() until condition(status) is True or timeout.""" last_status = {} for i in range(timeout): status = client.get_mma_status() or {} if condition(status): return True, status last_status = status time.sleep(1) return False, last_status @pytest.mark.integration @pytest.mark.timeout(300) def test_mma_concurrent_tracks_execution(live_gui) -> None: """ Stress test for concurrent MMA track execution. Verifies that starting multiple tracks simultaneously doesn't cause crashes and that workers from both tracks are processed. """ client = api_hook_client.ApiHookClient() assert client.wait_for_server(timeout=15), "Hook server did not start" # 1. Setup provider to custom mock mock_path = os.path.abspath("tests/mock_concurrent_mma.py") client.set_value('current_provider', 'gemini_cli') client.set_value('gcli_path', f'"{sys.executable}" "{mock_path}"') client.click('btn_project_save') time.sleep(1.0) # 2. Plan Epic to generate tracks print("[SIM] Planning Epic...") client.set_value('mma_epic_input', 'PATH: Epic Initialization') client.click('btn_mma_plan_epic') ok, status = _poll_mma_status(client, 60, lambda s: len(s.get('proposed_tracks', [])) >= 2, "plan-epic") assert ok, f"Proposed tracks not found: {status.get('proposed_tracks')}" print(f"[SIM] Found {len(status['proposed_tracks'])} proposed tracks.") # 3. Accept tracks and wait for them to be created print("[SIM] Accepting tracks...") client.click('btn_mma_accept_tracks') ok, status = _poll_mma_status(client, 30, lambda s: len(s.get('tracks', [])) >= 2, "accept-tracks") assert ok, "Tracks not created in project" tracks = status.get('tracks', []) track_a_id = next(t['id'] for t in tracks if 'track-a' in t['id'] or 'Track A' in t['title']) track_b_id = next(t['id'] for t in tracks if 'track-b' in t['id'] or 'Track B' in t['title']) print(f"[SIM] Track IDs: A={track_a_id}, B={track_b_id}") # 4. Start BOTH tracks concurrently print(f"[SIM] Starting Track A ({track_a_id})...") client.click('btn_mma_start_track', user_data=track_a_id) # Tiny sleep to allow the first one to initialize its engine time.sleep(0.5) print(f"[SIM] Starting Track B ({track_b_id})...") client.click('btn_mma_start_track', user_data=track_b_id) # 5. Verify workers from BOTH tracks appear in mma_streams print("[SIM] Verifying concurrent worker activity...") seen_a = False seen_b = False for i in range(40): res = client.get_mma_workers() or {} workers = res.get('workers', {}) stream_ids = workers.keys() if any("ticket-A-1" in sid for sid in stream_ids): seen_a = True if any("ticket-B-1" in sid for sid in stream_ids): seen_b = True if seen_a and seen_b: print(f"[SIM] SUCCESS: Observed workers from both tracks at t={i}s") break time.sleep(1) if i % 5 == 0: print(f"[SIM] t={i}s: seen_a={seen_a}, seen_b={seen_b}, active_streams={list(stream_ids)}") assert seen_a, "Worker from Track A never appeared in mma_streams" assert seen_b, "Worker from Track B never appeared in mma_streams" # 6. Wait for both tracks to complete print("[SIM] Waiting for tracks to complete...") # Each track has 1 ticket in our mock. completed_a = False completed_b = False for i in range(60): res = client.get_mma_workers() or {} workers = res.get('workers', {}) # Check stream content for completion marker from our mock for sid, text in workers.items(): if "ticket-A-1" in sid: if "Done." in text: completed_a = True else: if i % 10 == 0: print(f"[SIM] t={i}s: Stream A: {text[:50]}...") if "ticket-B-1" in sid: if "Done." in text: completed_b = True else: if i % 10 == 0: print(f"[SIM] t={i}s: Stream B: {text[:50]}...") if completed_a and completed_b: print(f"[SIM] Both tracks completed at t={i}s") break time.sleep(1) assert completed_a, "Track A did not complete" assert completed_b, "Track B did not complete" # 7. Final stability check - ensure no crashes occurred in Hook Server status = client.get_mma_status() assert status is not None assert status.get('mma_status') in ['done', 'idle', 'running'] print("[SIM] Concurrent MMA tracks stress test PASSED.")