b043d06771
This fixes the 'stuck' behavior in concurrent tests by ensuring the tests look for standard completion markers and don't wait for unnecessary timeouts.
136 lines
4.5 KiB
Python
136 lines
4.5 KiB
Python
import pytest
|
|
import time
|
|
import sys
|
|
import os
|
|
|
|
# Ensure project root is in path
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
|
|
|
|
from src import api_hook_client
|
|
|
|
def _poll_mma_status(client, timeout, condition, label):
|
|
"""Poll get_mma_status() until condition(status) is True or timeout."""
|
|
last_status = {}
|
|
for i in range(timeout):
|
|
status = client.get_mma_status() or {}
|
|
if condition(status):
|
|
return True, status
|
|
last_status = status
|
|
time.sleep(1)
|
|
return False, last_status
|
|
|
|
@pytest.mark.integration
|
|
@pytest.mark.timeout(300)
|
|
def test_mma_concurrent_tracks_execution(live_gui) -> None:
|
|
"""
|
|
Stress test for concurrent MMA track execution.
|
|
Verifies that starting multiple tracks simultaneously doesn't cause crashes
|
|
and that workers from both tracks are processed.
|
|
"""
|
|
client = api_hook_client.ApiHookClient()
|
|
assert client.wait_for_server(timeout=15), "Hook server did not start"
|
|
|
|
# 1. Setup provider to custom mock
|
|
mock_path = os.path.abspath("tests/mock_concurrent_mma.py")
|
|
client.set_value('current_provider', 'gemini_cli')
|
|
client.set_value('gcli_path', f'"{sys.executable}" "{mock_path}"')
|
|
client.click('btn_project_save')
|
|
time.sleep(1.0)
|
|
|
|
# 2. Plan Epic to generate tracks
|
|
print("[SIM] Planning Epic...")
|
|
client.set_value('mma_epic_input', 'PATH: Epic Initialization')
|
|
client.click('btn_mma_plan_epic')
|
|
|
|
ok, status = _poll_mma_status(client, 60,
|
|
lambda s: len(s.get('proposed_tracks', [])) >= 2, "plan-epic")
|
|
assert ok, f"Proposed tracks not found: {status.get('proposed_tracks')}"
|
|
print(f"[SIM] Found {len(status['proposed_tracks'])} proposed tracks.")
|
|
|
|
# 3. Accept tracks and wait for them to be created
|
|
print("[SIM] Accepting tracks...")
|
|
client.click('btn_mma_accept_tracks')
|
|
ok, status = _poll_mma_status(client, 30,
|
|
lambda s: len(s.get('tracks', [])) >= 2, "accept-tracks")
|
|
assert ok, "Tracks not created in project"
|
|
|
|
tracks = status.get('tracks', [])
|
|
track_a_id = next(t['id'] for t in tracks if 'track-a' in t['id'] or 'Track A' in t['title'])
|
|
track_b_id = next(t['id'] for t in tracks if 'track-b' in t['id'] or 'Track B' in t['title'])
|
|
print(f"[SIM] Track IDs: A={track_a_id}, B={track_b_id}")
|
|
|
|
# 4. Start BOTH tracks concurrently
|
|
print(f"[SIM] Starting Track A ({track_a_id})...")
|
|
client.click('btn_mma_start_track', user_data=track_a_id)
|
|
|
|
# Tiny sleep to allow the first one to initialize its engine
|
|
time.sleep(0.5)
|
|
|
|
print(f"[SIM] Starting Track B ({track_b_id})...")
|
|
client.click('btn_mma_start_track', user_data=track_b_id)
|
|
|
|
# 5. Verify workers from BOTH tracks appear in mma_streams
|
|
print("[SIM] Verifying concurrent worker activity...")
|
|
seen_a = False
|
|
seen_b = False
|
|
|
|
for i in range(40):
|
|
res = client.get_mma_workers() or {}
|
|
workers = res.get('workers', {})
|
|
stream_ids = workers.keys()
|
|
|
|
if any("ticket-A-1" in sid for sid in stream_ids):
|
|
seen_a = True
|
|
if any("ticket-B-1" in sid for sid in stream_ids):
|
|
seen_b = True
|
|
|
|
if seen_a and seen_b:
|
|
print(f"[SIM] SUCCESS: Observed workers from both tracks at t={i}s")
|
|
break
|
|
|
|
time.sleep(1)
|
|
if i % 5 == 0:
|
|
print(f"[SIM] t={i}s: seen_a={seen_a}, seen_b={seen_b}, active_streams={list(stream_ids)}")
|
|
|
|
assert seen_a, "Worker from Track A never appeared in mma_streams"
|
|
assert seen_b, "Worker from Track B never appeared in mma_streams"
|
|
|
|
# 6. Wait for both tracks to complete
|
|
print("[SIM] Waiting for tracks to complete...")
|
|
# Each track has 1 ticket in our mock.
|
|
completed_a = False
|
|
completed_b = False
|
|
|
|
for i in range(60):
|
|
res = client.get_mma_workers() or {}
|
|
workers = res.get('workers', {})
|
|
|
|
# Check stream content for completion marker from our mock
|
|
for sid, text in workers.items():
|
|
if "ticket-A-1" in sid:
|
|
if "Done." in text or "[STATUS] COMPLETED" in text:
|
|
completed_a = True
|
|
else:
|
|
if i % 10 == 0: print(f"[SIM] t={i}s: Stream A: {text[:50]}...")
|
|
if "ticket-B-1" in sid:
|
|
if "Done." in text or "[STATUS] COMPLETED" in text:
|
|
completed_b = True
|
|
else:
|
|
if i % 10 == 0: print(f"[SIM] t={i}s: Stream B: {text[:50]}...")
|
|
|
|
if completed_a and completed_b:
|
|
print(f"[SIM] Both tracks completed at t={i}s")
|
|
break
|
|
time.sleep(1)
|
|
|
|
assert completed_a, "Track A did not complete"
|
|
assert completed_b, "Track B did not complete"
|
|
|
|
# 7. Final stability check - ensure no crashes occurred in Hook Server
|
|
status = client.get_mma_status()
|
|
assert status is not None
|
|
assert status.get('mma_status') in ['done', 'idle', 'running']
|
|
|
|
print("[SIM] Concurrent MMA tracks stress test PASSED.")
|