"""Live-GUI integration test driving a complete MMA lifecycle via the hook API."""

import os
import sys
import time
from collections.abc import Callable

import pytest

# The hook client lives one directory above this test module; extend the
# import path before importing it.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Approval gates in priority order: (status flag, button to click, log line).
# Only the first pending gate is approved per call, so order matters.
_APPROVAL_GATES = (
    ('pending_mma_spawn_approval', 'btn_approve_spawn',
     '[SIM] Approving pending spawn...'),
    ('pending_mma_step_approval', 'btn_approve_mma_step',
     '[SIM] Approving pending MMA step...'),
    ('pending_tool_approval', 'btn_approve_tool',
     '[SIM] Approving pending tool...'),
    ('pending_script_approval', 'btn_approve_script',
     '[SIM] Approving pending PowerShell script...'),
)


def _drain_approvals(client: ApiHookClient, status: dict) -> None:
    """Auto-approve the first pending approval gate found in ``status``.

    At most one gate is approved per call; the remaining gates are picked up
    by later polls. After clicking, sleeps 0.5 s before returning.

    Args:
        client: Hook client used to click the approval button.
        status: Status dict as returned by ``client.get_mma_status()``.
    """
    for flag, button, message in _APPROVAL_GATES:
        if status.get(flag):
            print(message)
            client.click(button)
            time.sleep(0.5)
            return


def _poll(client: ApiHookClient, timeout: int,
          condition: Callable[[dict], bool], label: str) -> tuple[bool, dict]:
    """Poll ``get_mma_status()`` until ``condition(status)`` is true or timeout.

    Each iteration logs a status summary, approves one pending gate (if any)
    and then evaluates ``condition`` on that same snapshot.

    Args:
        client: Hook client to query.
        timeout: Maximum number of polls. Each unsuccessful poll is followed
            by a 1 s sleep (plus 0.5 s whenever an approval was clicked), so
            the wall-clock wait can exceed ``timeout`` seconds.
        condition: Predicate evaluated against each status snapshot.
        label: Tag included in the per-poll log line.

    Returns:
        ``(True, status)`` for the first snapshot satisfying ``condition``,
        otherwise ``(False, last_status)``; the status is ``{}`` when no poll
        was made.
    """
    status = {}
    for i in range(timeout):
        status = client.get_mma_status() or {}
        # 'mma_streams' may be absent or None; never let logging raise.
        stream_names = list(status.get('mma_streams') or {})
        print(f"[SIM][{label}] t={i}s ai_status={status.get('ai_status')} "
              f"mma={status.get('mma_status')} "
              f"streams={stream_names}")
        _drain_approvals(client, status)
        if condition(status):
            return True, status
        time.sleep(1)
    return False, status


def _first_tier3_output(streams: dict) -> str:
    """Return the first non-blank stream whose key contains 'Tier 3', else ''."""
    for key, text in streams.items():
        if 'Tier 3' in key and isinstance(text, str) and text.strip():
            return text
    return ''


# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------

# NOTE: the per-stage poll budgets below sum to 540 polls, which is more than
# the 300 s pytest timeout; the timeout mark is the effective overall cap.
@pytest.mark.integration
@pytest.mark.timeout(300)
def test_mma_complete_lifecycle(live_gui) -> None:
    """
    End-to-end MMA lifecycle using real Gemini API (gemini-2.5-flash-lite).
    Incorporates frame-sync sleeps and explicit state-transition waits per
    simulation_hardening_20260301 spec (Issues 2 & 3).
    """
    client = ApiHookClient()
    assert client.wait_for_server(timeout=15), "Hook server did not start"

    # ------------------------------------------------------------------
    # Stage 1: Provider setup
    # ------------------------------------------------------------------
    client.set_value('current_provider', 'gemini')
    time.sleep(0.3)
    client.set_value('current_model', 'gemini-2.5-flash-lite')
    time.sleep(0.3)
    client.set_value('files_base_dir', 'tests/artifacts/temp_workspace')
    time.sleep(0.3)
    client.click('btn_project_save')
    time.sleep(1.0)  # one full second — let GUI process all set_value tasks

    # ------------------------------------------------------------------
    # Stage 2: Start epic planning
    # ------------------------------------------------------------------
    # Keep prompt short and simple so the model returns minimal JSON
    client.set_value('mma_epic_input',
                     'Add a hello_world greeting function to the project')
    time.sleep(0.3)
    client.click('btn_mma_plan_epic')
    time.sleep(0.5)  # frame-sync after click

    # ------------------------------------------------------------------
    # Stage 3: Wait for proposed_tracks to appear (Tier 1 call)
    # ------------------------------------------------------------------
    ok, status = _poll(client, timeout=120, label="wait-proposed-tracks",
                       condition=lambda s: bool(s.get('proposed_tracks')))
    assert ok, (
        f"No proposed_tracks after 120s. "
        f"ai_status={status.get('ai_status')} "
        f"mma_streams={list(status.get('mma_streams') or {})}"
    )
    proposed = status['proposed_tracks']
    n_proposed = len(proposed)
    print(f"[SIM] Got {n_proposed} proposed track(s): "
          f"{[t.get('title', t.get('id')) for t in proposed]}")

    # ------------------------------------------------------------------
    # Stage 4: Accept tracks (triggers Tier 2 calls + engine.run)
    # ------------------------------------------------------------------
    client.click('btn_mma_accept_tracks')
    time.sleep(1.5)  # frame-sync: let _cb_accept_tracks run one frame + bg thread start

    # ------------------------------------------------------------------
    # Stage 5: Wait for tracks to be written to filesystem + refreshed
    # ------------------------------------------------------------------
    ok, status = _poll(client, timeout=90, label="wait-tracks-populated",
                       condition=lambda s: bool(s.get('tracks')))
    assert ok, (
        f"No tracks appeared after 90s. "
        f"ai_status={status.get('ai_status')}"
    )
    tracks_list = status['tracks']
    print(f"[SIM] Tracks in project: {[t.get('title', t.get('id')) for t in tracks_list]}")

    # ------------------------------------------------------------------
    # Stage 6: Load first track, verify active_tickets populate
    # ------------------------------------------------------------------
    track_id = tracks_list[0]['id']
    print(f"[SIM] Loading track: {track_id}")
    client.click('btn_mma_load_track', user_data=track_id)
    time.sleep(1.0)  # frame-sync after load click

    def _track_loaded(s):
        # 'active_track' may be a dict carrying an 'id' or the bare id itself.
        at = s.get('active_track')
        at_id = at.get('id') if isinstance(at, dict) else at
        return at_id == track_id and bool(s.get('active_tickets'))

    ok, status = _poll(client, timeout=60, label="wait-track-loaded",
                       condition=_track_loaded)
    assert ok, (
        f"Track {track_id} did not load with tickets after 60s. "
        f"active_track={status.get('active_track')}"
    )
    print(f"[SIM] Track loaded with {len(status.get('active_tickets', []))} ticket(s).")

    # ------------------------------------------------------------------
    # Stage 7: Wait for engine to reach running/done
    # ------------------------------------------------------------------
    def _mma_active(s):
        return s.get('mma_status') in ('running', 'done')

    ok, status = _poll(client, timeout=120, label="wait-mma-running",
                       condition=_mma_active)
    assert ok, (
        f"MMA never reached running/done after 120s. "
        f"mma_status={status.get('mma_status')}"
    )
    print(f"[SIM] MMA status: {status.get('mma_status')}")

    # ------------------------------------------------------------------
    # Stage 8: Verify Tier 3 output appears in mma_streams
    # ------------------------------------------------------------------
    def _tier3_in_streams(s):
        # Any non-blank Tier 3 stream counts, not just the first matching key.
        return bool(_first_tier3_output(s.get('mma_streams') or {}))

    ok, status = _poll(client, timeout=120, label="wait-tier3-streams",
                       condition=_tier3_in_streams)
    streams = status.get('mma_streams') or {}
    assert ok, (
        f"No non-empty Tier 3 output in mma_streams after 120s. "
        f"streams keys={list(streams.keys())} "
        f"mma_status={status.get('mma_status')}"
    )
    # Report the same stream that satisfied the wait condition.
    tier3_content = _first_tier3_output(streams)
    print(f"[SIM] Tier 3 output ({len(tier3_content)} chars): {tier3_content[:100]}...")

    # ------------------------------------------------------------------
    # Stage 9: Wait (non-blocking) for mma_tier_usage Tier 3 to be non-zero
    # ------------------------------------------------------------------
    def _tier3_usage_nonzero(s):
        # Tolerate missing/None usage data: this stage must never hard-fail.
        usage = s.get('mma_tier_usage') or {}
        t3 = usage.get('Tier 3') or {}
        return (t3.get('input') or 0) > 0 or (t3.get('output') or 0) > 0

    ok, status = _poll(client, timeout=30, label="wait-tier3-usage",
                       condition=_tier3_usage_nonzero)
    # Non-blocking: if tier_usage isn't wired yet, just log and continue
    tier_usage = status.get('mma_tier_usage', {})
    print(f"[SIM] Tier usage: {tier_usage}")
    if not ok:
        print("[SIM] WARNING: mma_tier_usage Tier 3 still zero after 30s — may not be wired to hook API yet")

    print("[SIM] MMA complete lifecycle simulation PASSED.")