Files
manual_slop/tests/test_mma_concurrent_tracks_sim.py
T
ed b043d06771 chore: add standard STATUS markers to worker streams and optimize test polling
This fixes the 'stuck' behavior in concurrent tests by ensuring the tests look for standard completion markers and don't wait for unnecessary timeouts.
2026-05-07 18:37:19 -04:00

136 lines
4.5 KiB
Python

import pytest
import time
import sys
import os
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from src import api_hook_client
def _poll_mma_status(client, timeout, condition, label):
"""Poll get_mma_status() until condition(status) is True or timeout."""
last_status = {}
for i in range(timeout):
status = client.get_mma_status() or {}
if condition(status):
return True, status
last_status = status
time.sleep(1)
return False, last_status
@pytest.mark.integration
@pytest.mark.timeout(300)
def test_mma_concurrent_tracks_execution(live_gui) -> None:
"""
Stress test for concurrent MMA track execution.
Verifies that starting multiple tracks simultaneously doesn't cause crashes
and that workers from both tracks are processed.
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
# 1. Setup provider to custom mock
mock_path = os.path.abspath("tests/mock_concurrent_mma.py")
client.set_value('current_provider', 'gemini_cli')
client.set_value('gcli_path', f'"{sys.executable}" "{mock_path}"')
client.click('btn_project_save')
time.sleep(1.0)
# 2. Plan Epic to generate tracks
print("[SIM] Planning Epic...")
client.set_value('mma_epic_input', 'PATH: Epic Initialization')
client.click('btn_mma_plan_epic')
ok, status = _poll_mma_status(client, 60,
lambda s: len(s.get('proposed_tracks', [])) >= 2, "plan-epic")
assert ok, f"Proposed tracks not found: {status.get('proposed_tracks')}"
print(f"[SIM] Found {len(status['proposed_tracks'])} proposed tracks.")
# 3. Accept tracks and wait for them to be created
print("[SIM] Accepting tracks...")
client.click('btn_mma_accept_tracks')
ok, status = _poll_mma_status(client, 30,
lambda s: len(s.get('tracks', [])) >= 2, "accept-tracks")
assert ok, "Tracks not created in project"
tracks = status.get('tracks', [])
track_a_id = next(t['id'] for t in tracks if 'track-a' in t['id'] or 'Track A' in t['title'])
track_b_id = next(t['id'] for t in tracks if 'track-b' in t['id'] or 'Track B' in t['title'])
print(f"[SIM] Track IDs: A={track_a_id}, B={track_b_id}")
# 4. Start BOTH tracks concurrently
print(f"[SIM] Starting Track A ({track_a_id})...")
client.click('btn_mma_start_track', user_data=track_a_id)
# Tiny sleep to allow the first one to initialize its engine
time.sleep(0.5)
print(f"[SIM] Starting Track B ({track_b_id})...")
client.click('btn_mma_start_track', user_data=track_b_id)
# 5. Verify workers from BOTH tracks appear in mma_streams
print("[SIM] Verifying concurrent worker activity...")
seen_a = False
seen_b = False
for i in range(40):
res = client.get_mma_workers() or {}
workers = res.get('workers', {})
stream_ids = workers.keys()
if any("ticket-A-1" in sid for sid in stream_ids):
seen_a = True
if any("ticket-B-1" in sid for sid in stream_ids):
seen_b = True
if seen_a and seen_b:
print(f"[SIM] SUCCESS: Observed workers from both tracks at t={i}s")
break
time.sleep(1)
if i % 5 == 0:
print(f"[SIM] t={i}s: seen_a={seen_a}, seen_b={seen_b}, active_streams={list(stream_ids)}")
assert seen_a, "Worker from Track A never appeared in mma_streams"
assert seen_b, "Worker from Track B never appeared in mma_streams"
# 6. Wait for both tracks to complete
print("[SIM] Waiting for tracks to complete...")
# Each track has 1 ticket in our mock.
completed_a = False
completed_b = False
for i in range(60):
res = client.get_mma_workers() or {}
workers = res.get('workers', {})
# Check stream content for completion marker from our mock
for sid, text in workers.items():
if "ticket-A-1" in sid:
if "Done." in text or "[STATUS] COMPLETED" in text:
completed_a = True
else:
if i % 10 == 0: print(f"[SIM] t={i}s: Stream A: {text[:50]}...")
if "ticket-B-1" in sid:
if "Done." in text or "[STATUS] COMPLETED" in text:
completed_b = True
else:
if i % 10 == 0: print(f"[SIM] t={i}s: Stream B: {text[:50]}...")
if completed_a and completed_b:
print(f"[SIM] Both tracks completed at t={i}s")
break
time.sleep(1)
assert completed_a, "Track A did not complete"
assert completed_b, "Track B did not complete"
# 7. Final stability check - ensure no crashes occurred in Hook Server
status = client.get_mma_status()
assert status is not None
assert status.get('mma_status') in ['done', 'idle', 'running']
print("[SIM] Concurrent MMA tracks stress test PASSED.")