Files
manual_slop/tests/test_mma_concurrent_tracks_sim.py
T
2026-05-02 19:00:40 -04:00

136 lines
4.5 KiB
Python

import pytest
import time
import sys
import os
# Ensure project root is in path.
# Both the parent of this file's directory and its ``src`` subdirectory are
# appended as absolute paths derived from ``__file__``, so the
# ``from src import ...`` line below does not depend on the directory pytest
# is launched from.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from src import api_hook_client
def _poll_mma_status(client, timeout, condition, label, *, interval=1.0):
    """Poll ``client.get_mma_status()`` until ``condition(status)`` is truthy.

    Args:
        client: Object exposing ``get_mma_status()``. A falsy return value
            is treated as an empty dict.
        timeout: Maximum number of polls. With the default one-second
            interval this is approximately the timeout in seconds.
        condition: Callable that receives the status dict and returns a
            truthy value once the awaited state has been reached.
        label: Short name of the awaited state; reported in the diagnostic
            printed when the wait times out.
        interval: Seconds to sleep between two consecutive polls.

    Returns:
        ``(True, status)`` with the first status that satisfied
        ``condition``, or ``(False, last_status)`` with the most recent
        status seen (``{}`` if nothing was polled) once ``timeout`` polls
        have been used up.
    """
    last_status = {}
    for attempt in range(timeout):
        if attempt:
            # Sleep only *between* polls so that a timeout is not padded
            # with a pointless trailing wait after the final check.
            time.sleep(interval)
        last_status = client.get_mma_status() or {}
        if condition(last_status):
            return True, last_status
    # Surface which wait expired; callers only assert on the returned flag.
    print(f"[SIM] Timed out waiting for '{label}' after {timeout} polls")
    return False, last_status
def _find_track_id(tracks, id_fragment, title_fragment):
    """Return the id of the first track matching by id or title, else None."""
    for track in tracks:
        track_id = track.get('id') or ''
        title = track.get('title') or ''
        if id_fragment in track_id or title_fragment in title:
            return track.get('id')
    return None


@pytest.mark.integration
@pytest.mark.timeout(300)
def test_mma_concurrent_tracks_execution(live_gui) -> None:
    """
    Stress test for concurrent MMA track execution.
    Verifies that starting multiple tracks simultaneously doesn't cause crashes
    and that workers from both tracks are processed.

    The test drives the application through ``ApiHookClient``: it points the
    ``gemini_cli`` provider at a mock script, plans an epic, accepts the
    proposed tracks, starts two tracks back to back, and then polls the worker
    streams until a stream for each track has appeared and contains the
    ``"Done."`` marker.

    Args:
        live_gui: Pytest fixture; not referenced in the body.
            NOTE(review): presumably launches the GUI and its hook server --
            confirm against the fixture definition.
    """
    client = api_hook_client.ApiHookClient()
    assert client.wait_for_server(timeout=15), "Hook server did not start"

    # 1. Setup provider to custom mock
    # NOTE(review): this path is resolved against the current working
    # directory, so the test assumes pytest runs from the project root.
    mock_path = os.path.abspath("tests/mock_concurrent_mma.py")
    client.set_value('current_provider', 'gemini_cli')
    client.set_value('gcli_path', f'"{sys.executable}" "{mock_path}"')
    client.click('btn_project_save')
    time.sleep(1.0)

    # 2. Plan Epic to generate tracks
    print("[SIM] Planning Epic...")
    client.set_value('mma_epic_input', 'PATH: Epic Initialization')
    client.click('btn_mma_plan_epic')
    ok, status = _poll_mma_status(
        client, 60,
        lambda s: len(s.get('proposed_tracks', [])) >= 2, "plan-epic")
    assert ok, f"Proposed tracks not found: {status.get('proposed_tracks')}"
    print(f"[SIM] Found {len(status['proposed_tracks'])} proposed tracks.")

    # 3. Accept tracks and wait for them to be created
    print("[SIM] Accepting tracks...")
    client.click('btn_mma_accept_tracks')
    ok, status = _poll_mma_status(
        client, 30,
        lambda s: len(s.get('tracks', [])) >= 2, "accept-tracks")
    assert ok, "Tracks not created in project"
    tracks = status.get('tracks', [])
    # Look the tracks up defensively: a missing match must fail with a
    # readable assertion instead of a message-less StopIteration/KeyError.
    track_a_id = _find_track_id(tracks, 'track-a', 'Track A')
    track_b_id = _find_track_id(tracks, 'track-b', 'Track B')
    assert track_a_id is not None, f"Track A not found in tracks: {tracks}"
    assert track_b_id is not None, f"Track B not found in tracks: {tracks}"
    print(f"[SIM] Track IDs: A={track_a_id}, B={track_b_id}")

    # 4. Start BOTH tracks concurrently
    print(f"[SIM] Starting Track A ({track_a_id})...")
    client.click('btn_mma_start_track', user_data=track_a_id)
    # Tiny sleep to allow the first one to initialize its engine
    time.sleep(0.5)
    print(f"[SIM] Starting Track B ({track_b_id})...")
    client.click('btn_mma_start_track', user_data=track_b_id)

    # 5. Verify workers from BOTH tracks appear in mma_streams
    print("[SIM] Verifying concurrent worker activity...")
    # The flags are sticky: a stream only has to be observed once, the two
    # streams do not need to be visible in the same poll.
    seen = {"A": False, "B": False}
    for i in range(40):
        res = client.get_mma_workers() or {}
        stream_ids = list(res.get('workers') or {})
        for name in seen:
            if any(f"ticket-{name}-1" in sid for sid in stream_ids):
                seen[name] = True
        if all(seen.values()):
            print(f"[SIM] SUCCESS: Observed workers from both tracks at t={i}s")
            break
        time.sleep(1)
        if i % 5 == 0:
            print(f"[SIM] t={i}s: seen_a={seen['A']}, seen_b={seen['B']}, active_streams={stream_ids}")
    assert seen["A"], "Worker from Track A never appeared in mma_streams"
    assert seen["B"], "Worker from Track B never appeared in mma_streams"

    # 6. Wait for both tracks to complete
    print("[SIM] Waiting for tracks to complete...")
    # Each track has 1 ticket in our mock.
    completed = {"A": False, "B": False}
    for i in range(60):
        res = client.get_mma_workers() or {}
        workers = res.get('workers') or {}
        # Check stream content for completion marker from our mock
        for sid, text in workers.items():
            for name in completed:
                if f"ticket-{name}-1" not in sid:
                    continue
                if "Done." in text:
                    completed[name] = True
                elif i % 10 == 0:
                    # Periodic progress output while the stream is unfinished.
                    print(f"[SIM] t={i}s: Stream {name}: {text[:50]}...")
        if all(completed.values()):
            print(f"[SIM] Both tracks completed at t={i}s")
            break
        time.sleep(1)
    assert completed["A"], "Track A did not complete"
    assert completed["B"], "Track B did not complete"

    # 7. Final stability check - ensure no crashes occurred in Hook Server
    status = client.get_mma_status()
    assert status is not None, "Hook server returned no MMA status after the run"
    final_state = status.get('mma_status')
    assert final_state in ['done', 'idle', 'running'], f"Unexpected mma_status: {final_state!r}"
    print("[SIM] Concurrent MMA tracks stress test PASSED.")