Compare commits
12 Commits
3d398f1905 ... c023ae14dc

| Author | SHA1 | Date |
|---|---|---|
| | c023ae14dc | |
| | 89a8d9bcc2 | |
| | 24ed309ac1 | |
| | 0fe74660e1 | |
| | a2097f14b3 | |
| | 2f9f71d2dc | |
| | 3eefdfd29d | |
| | d5eb3f472e | |
| | c5695c6dac | |
| | 130a36d7b2 | |
| | b7c283972c | |
| | cf7938a843 | |
@@ -2,17 +2,17 @@

## Phase 1: Diagnose & Fix Worker Stream Pipeline

- - [ ] Task 1.1: Add diagnostic logging to `run_worker_lifecycle` (multi_agent_conductor.py:280-290). Before the `_queue_put` call, add `print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")`. Also add a `print` inside the `except Exception as e` block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) `loop` is passed correctly, (c) any exceptions are being swallowed.
- - [ ] Task 1.2: Remove the unsafe `else` branch in `run_worker_lifecycle` (multi_agent_conductor.py:289-290) that calls `event_queue._queue.put_nowait()`. `asyncio.Queue` is NOT thread-safe from non-event-loop threads. The `else` branch should either raise an error (`raise RuntimeError("loop is required for thread-safe event queue access")`) or use a fallback that IS thread-safe. Same fix needed in `confirm_execution` (line 156) and `confirm_spawn` (line 183).
- - [ ] Task 1.3: Verify the `run_in_executor` positional argument order at `multi_agent_conductor.py:118-127` matches `run_worker_lifecycle`'s signature exactly: `(ticket, context, context_files, event_queue, engine, md_content, loop)`. The signature at line 207 is: `(ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None)`. Positional args must be in this exact order. If any are swapped, fix the call site.
- - [ ] Task 1.4: Write a unit test that creates a mock `AsyncEventQueue` and `asyncio.AbstractEventLoop`, calls `run_worker_lifecycle` with a mock `ai_client.send` (returning a fixed string), and verifies the `("response", {...})` event was pushed with the correct `stream_id` format `"Tier 3 (Worker): {ticket.id}"`.
+ - [x] Task 1.1: Add diagnostic logging to `run_worker_lifecycle` (multi_agent_conductor.py:280-290). Before the `_queue_put` call, add `print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")`. Also add a `print` inside the `except Exception as e` block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) `loop` is passed correctly, (c) any exceptions are being swallowed. b7c2839
+ - [x] Task 1.2: Remove the unsafe `else` branch in `run_worker_lifecycle` (multi_agent_conductor.py:289-290) that calls `event_queue._queue.put_nowait()`. `asyncio.Queue` is NOT thread-safe from non-event-loop threads. The `else` branch should either raise an error (`raise RuntimeError("loop is required for thread-safe event queue access")`) or use a fallback that IS thread-safe. Same fix needed in `confirm_execution` (line 156) and `confirm_spawn` (line 183). b7c2839
+ - [x] Task 1.3: Verify the `run_in_executor` positional argument order at `multi_agent_conductor.py:118-127` matches `run_worker_lifecycle`'s signature exactly: `(ticket, context, context_files, event_queue, engine, md_content, loop)`. The signature at line 207 is: `(ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None)`. Positional args must be in this exact order. If any are swapped, fix the call site. VERIFIED CORRECT — no code change needed. b7c2839
+ - [x] Task 1.4: Write a unit test that creates a mock `AsyncEventQueue` and `asyncio.AbstractEventLoop`, calls `run_worker_lifecycle` with a mock `ai_client.send` (returning a fixed string), and verifies the `("response", {...})` event was pushed with the correct `stream_id` format `"Tier 3 (Worker): {ticket.id}"`. c5695c6
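Task 1.2's rationale can be seen in a minimal, runnable sketch: `asyncio.Queue` must only be fed from its owning event loop's thread, so a worker thread hands the put over with `loop.call_soon_threadsafe`. The helper name `_queue_put_sketch` and the `(kind, payload)` tuple shape are illustrative assumptions; the real `_queue_put` helper's body is not shown in this diff.

```python
import asyncio
import threading

def _queue_put_sketch(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop,
                      kind: str, payload: dict) -> None:
    # Never call queue.put_nowait() directly from a worker thread:
    # asyncio.Queue is not thread-safe. Schedule the put on the loop's thread.
    loop.call_soon_threadsafe(queue.put_nowait, (kind, payload))

async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    worker = threading.Thread(
        target=_queue_put_sketch,
        args=(queue, loop, "response", {"stream_id": "demo"}),
    )
    worker.start()
    worker.join()
    # The scheduled put runs as soon as the loop regains control.
    return await asyncio.wait_for(queue.get(), timeout=1)

print(asyncio.run(main()))  # ('response', {'stream_id': 'demo'})
```

The same pattern is why the `else` branch is replaced with a `RuntimeError`: without a loop reference there is no safe way to touch the queue from the worker thread.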
## Phase 2: Fix Token Usage Tracking

- - [ ] Task 2.1: In `run_worker_lifecycle` (multi_agent_conductor.py:295-298), the `stats = {}` stub produces zero token counts. Replace with `stats = ai_client.get_history_bleed_stats()` which returns a dict containing `"total_input_tokens"` and `"total_output_tokens"` (see ai_client.py:1657-1796). Extract the relevant fields and update `engine.tier_usage["Tier 3"]`. If `get_history_bleed_stats` is too heavy, use the simpler approach: after `ai_client.send()`, read the last comms log entry from `ai_client.get_comms_log()[-1]` which contains `payload.usage` with token counts.
- - [ ] Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In `_cb_plan_epic` (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure `mma_tier_usage` is updated with actual token counts from comms log entries.
+ - [x] Task 2.1: In `run_worker_lifecycle` (multi_agent_conductor.py:295-298), the `stats = {}` stub produces zero token counts. Replace with `stats = ai_client.get_history_bleed_stats()` which returns a dict containing `"total_input_tokens"` and `"total_output_tokens"` (see ai_client.py:1657-1796). Extract the relevant fields and update `engine.tier_usage["Tier 3"]`. If `get_history_bleed_stats` is too heavy, use the simpler approach: after `ai_client.send()`, read the last comms log entry from `ai_client.get_comms_log()[-1]` which contains `payload.usage` with token counts. Used comms-log baseline approach. 3eefdfd
+ - [x] Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In `_cb_plan_epic` (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure `mma_tier_usage` is updated with actual token counts from comms log entries. a2097f1
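The comms-log baseline approach adopted for Tasks 2.1 and 2.2 reduces to a small pure function: record the log length before the call, then sum usage over entries recorded after that point. The entry shape (`direction`, `kind`, `payload.usage`) follows the task descriptions; the `sum_usage` name is illustrative.

```python
def sum_usage(comms_log: list, baseline: int) -> tuple:
    """Sum token usage from comms-log entries recorded after `baseline`."""
    new_entries = comms_log[baseline:]
    responses = [e for e in new_entries
                 if e.get("direction") == "IN" and e.get("kind") == "response"]
    tokens_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0)
                    for e in responses)
    tokens_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0)
                     for e in responses)
    return tokens_in, tokens_out

log = [
    {"direction": "OUT", "kind": "request", "payload": {}},
    {"direction": "IN", "kind": "response",
     "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}},
    {"direction": "IN", "kind": "response",
     "payload": {"usage": {"input_tokens": 30, "output_tokens": 10}}},
]
print(sum_usage(log, 0))  # (150, 55)
```

Slicing from a baseline rather than reading only `[-1]` keeps the count correct when one call produces several response entries.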
## Phase 3: End-to-End Verification

- - [ ] Task 3.1: Update `tests/visual_sim_mma_v2.py` Stage 8 to assert that `mma_streams` contains a key matching `"Tier 3"` with non-empty content after a full mock MMA run. If this already passes with the fixes from Phase 1, mark as verified. If not, trace the specific failure point using the diagnostic logging from Task 1.1.
- - [ ] Task 3.2: Conductor - User Manual Verification 'Phase 3: End-to-End Verification' (Protocol in workflow.md)
+ - [x] Task 3.1: Update `tests/visual_sim_mma_v2.py` Stage 8 to assert that `mma_streams` contains a key matching `"Tier 3"` with non-empty content after a full mock MMA run. Rewrote test for real Gemini API (CLI quota exhausted) with _poll/_drain_approvals helpers, frame-sync sleeps, 120s timeouts. Addresses simulation_hardening Issues 2 & 3. 89a8d9b
+ - [~] Task 3.2: Conductor - User Manual Verification 'Phase 3: End-to-End Verification' (Protocol in workflow.md)
gui_2.py
@@ -1990,8 +1990,21 @@ class App:
         proj = project_manager.load_project(self.active_project_path)
         flat = project_manager.flat_config(proj)
         file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", []))
+        _t1_baseline = len(ai_client.get_comms_log())
         tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
+        _t1_new = ai_client.get_comms_log()[_t1_baseline:]
+        _t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"]
+        _t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp)
+        _t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp)
+        def _push_t1_usage(i, o):
+            self.mma_tier_usage["Tier 1"]["input"] += i
+            self.mma_tier_usage["Tier 1"]["output"] += o
+        with self._pending_gui_tasks_lock:
+            self._pending_gui_tasks.append({
+                "action": "custom_callback",
+                "callback": _push_t1_usage,
+                "args": [_t1_in, _t1_out]
+            })
             self._pending_gui_tasks.append({
                 "action": "handle_ai_response",
                 "payload": {
@@ -2097,7 +2110,14 @@ class App:
         skeletons = skeletons_str  # Use provided skeletons

         self.ai_status = "Phase 2: Calling Tech Lead..."
+        _t2_baseline = len(ai_client.get_comms_log())
         raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
+        _t2_new = ai_client.get_comms_log()[_t2_baseline:]
+        _t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"]
+        _t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp)
+        _t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp)
+        self.mma_tier_usage["Tier 2"]["input"] += _t2_in
+        self.mma_tier_usage["Tier 2"]["output"] += _t2_out
         if not raw_tickets:
             self.ai_status = f"Error: No tickets generated for track: {title}"
             print(f"Warning: No tickets generated for track: {title}")
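The `_pending_gui_tasks` handoff in the diff above follows a common worker-to-GUI pattern: background threads append task dicts under a lock, and the GUI thread drains and executes them once per frame. A self-contained sketch of that pattern (class and field names here are illustrative, not the actual gui_2.py API):

```python
import threading

class PendingGuiTasks:
    """Sketch of a worker-to-GUI task queue (names illustrative)."""
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._tasks: list = []

    def push(self, task: dict) -> None:
        # Called from background worker threads.
        with self._lock:
            self._tasks.append(task)

    def drain(self) -> list:
        # Called once per GUI frame, on the GUI thread only.
        with self._lock:
            tasks, self._tasks = self._tasks, []
        return tasks

usage = {"input": 0, "output": 0}

def _push_usage(i: int, o: int) -> None:
    usage["input"] += i
    usage["output"] += o

pending = PendingGuiTasks()
worker = threading.Thread(target=pending.push, args=(
    {"action": "custom_callback", "callback": _push_usage, "args": [120, 45]},
))
worker.start()
worker.join()

# GUI-frame side: execute whatever the workers queued.
for task in pending.drain():
    if task["action"] == "custom_callback":
        task["callback"](*task["args"])
print(usage)  # {'input': 120, 'output': 45}
```

The swap-under-lock in `drain` keeps the critical section short: callbacks run outside the lock, so a slow callback cannot block worker threads.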
multi_agent_conductor.py
@@ -3,6 +3,7 @@ import json
 import asyncio
 import threading
 import time
+import traceback
 from typing import List, Optional, Tuple
 from dataclasses import asdict
 import events
@@ -153,7 +154,7 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
     if loop:
         _queue_put(event_queue, loop, "mma_step_approval", task)
     else:
-        event_queue._queue.put_nowait(("mma_step_approval", task))
+        raise RuntimeError("loop is required for thread-safe event queue access")
     # Wait for the GUI to create the dialog and for the user to respond
     start = time.time()
     while dialog_container[0] is None and time.time() - start < 60:
@@ -180,7 +181,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A
     if loop:
         _queue_put(event_queue, loop, "mma_spawn_approval", task)
     else:
-        event_queue._queue.put_nowait(("mma_spawn_approval", task))
+        raise RuntimeError("loop is required for thread-safe event queue access")
    # Wait for the GUI to create the dialog and for the user to respond
     start = time.time()
     while dialog_container[0] is None and time.time() - start < 60:
@@ -267,6 +268,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
         if not event_queue:
             return True
         return confirm_execution(payload, event_queue, ticket.id, loop=loop)
+    comms_baseline = len(ai_client.get_comms_log())
     response = ai_client.send(
         md_content=md_content,
         user_message=user_message,
@@ -284,18 +286,22 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
             "stream_id": f"Tier 3 (Worker): {ticket.id}",
             "status": "done"
         }
+        print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")
         if loop:
             _queue_put(event_queue, loop, "response", response_payload)
         else:
-            event_queue._queue.put_nowait(("response", response_payload))
+            raise RuntimeError("loop is required for thread-safe event queue access")
     except Exception as e:
-        print(f"Error pushing response to UI: {e}")
+        print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")

     # Update usage in engine if provided
     if engine:
-        stats = {}  # ai_client.get_token_stats() is not available
-        engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0)
-        engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0)
+        _new_comms = ai_client.get_comms_log()[comms_baseline:]
+        _resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"]
+        _in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
+        _out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
+        engine.tier_usage["Tier 3"]["input"] += _in_tokens
+        engine.tier_usage["Tier 3"]["output"] += _out_tokens
     if "BLOCKED" in response.upper():
         ticket.mark_blocked(response)
     else:
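Task 1.3's verification hinges on `loop.run_in_executor` forwarding positional arguments in declaration order. A toy worker mirroring the documented signature (a stand-in, not the real `run_worker_lifecycle`) makes the check runnable:

```python
import asyncio

def worker(ticket, context, context_files=None, event_queue=None,
           engine=None, md_content="", loop=None):
    # Echo the received arguments so call-site ordering can be checked.
    return (ticket, context, context_files, event_queue, engine, md_content, loop)

async def main() -> tuple:
    loop = asyncio.get_running_loop()
    # Arguments after the callable are forwarded positionally, in order.
    return await loop.run_in_executor(
        None, worker, "T1", "ctx", ["a.py"], "queue", "engine", "# md", loop)

result = asyncio.run(main())
print(result[:3])  # ('T1', 'ctx', ['a.py'])
```

Because `run_in_executor` accepts only positional arguments after the callable, a swapped pair at the call site would silently bind to the wrong parameter, which is exactly what Task 1.3 rules out.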
@@ -267,3 +267,57 @@ async def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytes
     assert "T3" in calls
     vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.")
+
+
+def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
+    """
+    Test that run_worker_lifecycle pushes a 'response' event with the correct stream_id
+    via _queue_put when event_queue and loop are provided.
+    """
+    import asyncio
+    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
+    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
+    mock_event_queue = MagicMock()
+    mock_loop = MagicMock(spec=asyncio.AbstractEventLoop)
+    mock_send = MagicMock(return_value="Task complete.")
+    monkeypatch.setattr(ai_client, 'send', mock_send)
+    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
+    from multi_agent_conductor import run_worker_lifecycle
+    with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
+         patch("multi_agent_conductor._queue_put") as mock_queue_put:
+        mock_spawn.return_value = (True, "prompt", "context")
+        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue, loop=mock_loop)
+    mock_queue_put.assert_called_once()
+    call_args = mock_queue_put.call_args[0]
+    assert call_args[2] == "response"
+    assert call_args[3]["stream_id"] == "Tier 3 (Worker): T1"
+    assert call_args[3]["text"] == "Task complete."
+    assert call_args[3]["status"] == "done"
+    assert ticket.status == "completed"
+
+
+def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.MonkeyPatch) -> None:
+    """
+    Test that run_worker_lifecycle reads token usage from the comms log and
+    updates engine.tier_usage['Tier 3'] with real input/output token counts.
+    """
+    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
+    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
+    fake_comms = [
+        {"direction": "OUT", "kind": "request", "payload": {"message": "hello"}},
+        {"direction": "IN", "kind": "response", "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}},
+    ]
+    monkeypatch.setattr(ai_client, 'send', MagicMock(return_value="Done."))
+    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
+    monkeypatch.setattr(ai_client, 'get_comms_log', MagicMock(side_effect=[
+        [],          # baseline call (before send)
+        fake_comms,  # after-send call
+    ]))
+    from multi_agent_conductor import run_worker_lifecycle, ConductorEngine
+    from models import Track
+    track = Track(id="test_track", description="Test")
+    engine = ConductorEngine(track=track, auto_queue=True)
+    with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
+         patch("multi_agent_conductor._queue_put"):
+        mock_spawn.return_value = (True, "prompt", "ctx")
+        run_worker_lifecycle(ticket, context, event_queue=MagicMock(), loop=MagicMock(), engine=engine)
+    assert engine.tier_usage["Tier 3"]["input"] == 120
+    assert engine.tier_usage["Tier 3"]["output"] == 45
tests/visual_sim_mma_v2.py
@@ -3,180 +3,169 @@ import time
 import sys
 import os

 # Ensure project root is in path
 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

 from api_hook_client import ApiHookClient

+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _drain_approvals(client: ApiHookClient, status: dict) -> None:
+    """Auto-approve any pending approval gate found in status."""
+    if status.get('pending_mma_spawn_approval'):
+        print('[SIM] Approving pending spawn...')
+        client.click('btn_approve_spawn')
+        time.sleep(0.5)
+    elif status.get('pending_mma_step_approval'):
+        print('[SIM] Approving pending MMA step...')
+        client.click('btn_approve_mma_step')
+        time.sleep(0.5)
+    elif status.get('pending_tool_approval'):
+        print('[SIM] Approving pending tool...')
+        client.click('btn_approve_tool')
+        time.sleep(0.5)
+
+
+def _poll(client: ApiHookClient, timeout: int, condition, label: str) -> tuple[bool, dict]:
+    """Poll get_mma_status() until condition(status) is True or timeout."""
+    status = {}
+    for i in range(timeout):
+        status = client.get_mma_status() or {}
+        print(f"[SIM][{label}] t={i}s ai_status={status.get('ai_status')} "
+              f"mma={status.get('mma_status')} "
+              f"streams={list(status.get('mma_streams', {}).keys())}")
+        _drain_approvals(client, status)
+        if condition(status):
+            return True, status
+        time.sleep(1)
+    return False, status
+
+
+# ---------------------------------------------------------------------------
+# Test
+# ---------------------------------------------------------------------------
+
 @pytest.mark.integration
 def test_mma_complete_lifecycle(live_gui) -> None:
     """
-    Tests the entire MMA lifecycle from epic planning to track loading and ticket verification
-    in a single test case to avoid state dependency issues between separate test functions.
+    End-to-end MMA lifecycle using real Gemini API (gemini-2.5-flash-lite).
+    Incorporates frame-sync sleeps and explicit state-transition waits per
+    simulation_hardening_20260301 spec (Issues 2 & 3).
     """
     client = ApiHookClient()
-    assert client.wait_for_server(timeout=10)
+    assert client.wait_for_server(timeout=15), "Hook server did not start"

-    # 1. Set up the mock CLI provider
-    try:
-        client.set_value('current_provider', 'gemini_cli')
-        # Point the CLI adapter to our mock script
-        mock_cli_path = f'{sys.executable} {os.path.abspath("tests/mock_gemini_cli.py")}'
-        client.set_value('gcli_path', mock_cli_path)
-        # Prevent polluting the real project directory with test tracks
+    # ------------------------------------------------------------------
+    # Stage 1: Provider setup
+    # ------------------------------------------------------------------
+    client.set_value('current_provider', 'gemini')
+    time.sleep(0.3)
+    client.set_value('current_model', 'gemini-2.5-flash-lite')
+    time.sleep(0.3)
+    client.set_value('files_base_dir', 'tests/artifacts/temp_workspace')
+    time.sleep(0.3)
+    client.click('btn_project_save')
+    time.sleep(1)
-    except Exception as e:
-        pytest.fail(f"Failed to set up mock provider: {e}")
+    time.sleep(1.0)  # one full second — let GUI process all set_value tasks

-    # 2. Enter epic and click 'Plan Epic'.
-    client.set_value('mma_epic_input', 'Develop a new feature')
+    # ------------------------------------------------------------------
+    # Stage 2: Start epic planning
+    # ------------------------------------------------------------------
+    # Keep prompt short and simple so the model returns minimal JSON
+    client.set_value('mma_epic_input',
+                     'Add a hello_world() function to utils.py')
+    time.sleep(0.3)
     client.click('btn_mma_plan_epic')
+    time.sleep(0.5)  # frame-sync after click

-    # 3. Wait for 'proposed_tracks'.
-    proposed_tracks_found = False
-    for _ in range(60):  # Poll for up to 60 seconds
-        status = client.get_mma_status()
-        print(f"Polling status: {status}")
-        print(f"Polling ai_status: {status.get('ai_status', 'N/A')}")
-        if status and status.get('pending_mma_spawn_approval') is True:
-            print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
-            client.click('btn_approve_spawn')
-        elif status and status.get('pending_mma_step_approval') is True:
-            print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
-            client.click('btn_approve_mma_step')
-        elif status and status.get('pending_tool_approval') is True:
-            print('[SIM] Tool approval required. Clicking btn_approve_tool...')
-            client.click('btn_approve_tool')
-        if status and status.get('proposed_tracks') and len(status['proposed_tracks']) > 0:
-            proposed_tracks_found = True
-            break
-        time.sleep(1)
-    assert proposed_tracks_found, "Failed to find proposed tracks after planning epic."
+    # ------------------------------------------------------------------
+    # Stage 3: Wait for proposed_tracks to appear (Tier 1 call)
+    # ------------------------------------------------------------------
+    ok, status = _poll(client, timeout=120, label="wait-proposed-tracks",
+                       condition=lambda s: bool(s.get('proposed_tracks')))
+    assert ok, (
+        f"No proposed_tracks after 120s. "
+        f"ai_status={status.get('ai_status')} "
+        f"mma_streams={list(status.get('mma_streams', {}).keys())}"
+    )
+    n_proposed = len(status['proposed_tracks'])
+    print(f"[SIM] Got {n_proposed} proposed track(s): "
+          f"{[t.get('title', t.get('id')) for t in status['proposed_tracks']]}")

-    # 4. Click 'Accept' to start tracks.
+    # ------------------------------------------------------------------
+    # Stage 4: Accept tracks (triggers Tier 2 calls + engine.run)
+    # ------------------------------------------------------------------
     client.click('btn_mma_accept_tracks')
-    time.sleep(2)
+    time.sleep(1.5)  # frame-sync: let _cb_accept_tracks run one frame + bg thread start

-    # 5. Wait for 'tracks' list to populate with our mock tracks.
-    tracks_populated = False
-    for _ in range(30):  # Poll for up to 30 seconds
-        status = client.get_mma_status()
-        if status and status.get('pending_mma_spawn_approval') is True:
-            client.click('btn_approve_spawn')
-        elif status and status.get('pending_mma_step_approval') is True:
-            client.click('btn_approve_mma_step')
-        elif status and status.get('pending_tool_approval') is True:
-            client.click('btn_approve_tool')
+    # ------------------------------------------------------------------
+    # Stage 5: Wait for tracks to be written to filesystem + refreshed
+    # ------------------------------------------------------------------
+    ok, status = _poll(client, timeout=90, label="wait-tracks-populated",
+                       condition=lambda s: bool(s.get('tracks')))
+    assert ok, (
+        f"No tracks appeared after 90s. "
+        f"ai_status={status.get('ai_status')}"
+    )
+    tracks_list = status['tracks']
+    print(f"[SIM] Tracks in project: {[t.get('title', t.get('id')) for t in tracks_list]}")

-        tracks = status.get('tracks', [])
-        if any('Mock Goal 1' in t.get('title', '') for t in tracks):
-            tracks_populated = True
-            break
-        time.sleep(1)
-    assert tracks_populated, "Failed to find 'Mock Goal 1' in tracks list after acceptance."
+    # ------------------------------------------------------------------
+    # Stage 6: Load first track, verify active_tickets populate
+    # ------------------------------------------------------------------
+    track_id = tracks_list[0]['id']
+    print(f"[SIM] Loading track: {track_id}")
+    client.click('btn_mma_load_track', user_data=track_id)
+    time.sleep(1.0)  # frame-sync after load click

-    # 6. Verify that one of the new tracks can be loaded and its tickets appear in 'active_tickets'.
-    status_after_tracks = client.get_mma_status()
-    assert status_after_tracks is not None, "Failed to get MMA status after tracks populated."
-    tracks_list = status_after_tracks.get('tracks')
-    assert tracks_list is not None and len(tracks_list) > 0, "Tracks list is empty or not found."
+    def _track_loaded(s):
+        at = s.get('active_track')
+        at_id = at.get('id') if isinstance(at, dict) else at
+        return at_id == track_id and bool(s.get('active_tickets'))

-    track_id_to_load = None
-    for track in tracks_list:
-        if 'Mock Goal 1' in track.get('title', ''):
-            track_id_to_load = track['id']
-            break
-    assert track_id_to_load is not None, "Could not find a track with 'Mock Goal 1' in its title."
-    print(f"Attempting to load track with ID: {track_id_to_load}")
+    ok, status = _poll(client, timeout=60, label="wait-track-loaded",
+                       condition=_track_loaded)
+    assert ok, (
+        f"Track {track_id} did not load with tickets after 60s. "
+        f"active_track={status.get('active_track')}"
+    )
+    print(f"[SIM] Track loaded with {len(status.get('active_tickets', []))} ticket(s).")

-    # Load the first track
-    client.click('btn_mma_load_track', user_data=track_id_to_load)
+    # ------------------------------------------------------------------
+    # Stage 7: Wait for engine to reach running/done
+    # ------------------------------------------------------------------
+    def _mma_active(s):
+        return s.get('mma_status') in ('running', 'done')

-    # Poll until 'active_track' is not None and 'active_tickets' are present
-    active_track_and_tickets_found = False
-    for _ in range(60):  # Poll for up to 60 seconds
-        status = client.get_mma_status()
-        print(f"Polling load status: {status}")
-        if status and status.get('pending_mma_spawn_approval') is True:
-            print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
-            client.click('btn_approve_spawn')
-        elif status and status.get('pending_mma_step_approval') is True:
-            print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
-            client.click('btn_approve_mma_step')
-        elif status and status.get('pending_tool_approval') is True:
-            print('[SIM] Tool approval required. Clicking btn_approve_tool...')
-            client.click('btn_approve_tool')
+    ok, status = _poll(client, timeout=120, label="wait-mma-running",
+                       condition=_mma_active)
+    assert ok, (
+        f"MMA never reached running/done after 120s. "
+        f"mma_status={status.get('mma_status')}"
+    )
+    print(f"[SIM] MMA status: {status.get('mma_status')}")

-        # Updated condition to correctly check active_track ID or value
-        active_track = status.get('active_track')
-        if status and ((isinstance(active_track, dict) and active_track.get('id') == track_id_to_load) or (active_track == track_id_to_load)) and \
-           'active_tickets' in status and len(status['active_tickets']) > 0:
-            active_track_and_tickets_found = True
-            break
-        time.sleep(1)
-    assert active_track_and_tickets_found, f"Timed out waiting for track {track_id_to_load} to load and populate active tickets."
+    # ------------------------------------------------------------------
+    # Stage 8: Verify Tier 3 output appears in mma_streams
+    # ------------------------------------------------------------------
+    def _tier3_in_streams(s):
+        streams = s.get('mma_streams', {})
+        tier3_keys = [k for k in streams if 'Tier 3' in k]
+        if not tier3_keys:
+            return False
+        return bool(streams[tier3_keys[0]].strip())

-    print(f"Successfully loaded and verified track ID: {track_id_to_load} with active tickets.")
-
-    # 7. Poll for MMA status 'running' or 'done' (already started by Accept Tracks).
-    mma_running = False
-    for _ in range(120):  # Poll for up to 120 seconds
-        status = client.get_mma_status()
-        print(f"Polling MMA status for 'running': {status.get('mma_status')}")
-
-        # Handle pending states during the run
-        if status and status.get('pending_mma_spawn_approval') is True:
-            print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
-            client.click('btn_approve_spawn')
-        elif status and status.get('pending_mma_step_approval') is True:
-            print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
-            client.click('btn_approve_mma_step')
-        elif status and status.get('pending_tool_approval') is True:
-            print('[SIM] Tool approval required. Clicking btn_approve_tool...')
-            client.click('btn_approve_tool')
-
-        # Check if MMA is running
-        if status and status.get('mma_status') == 'running':
-            mma_running = True
-            break
-        # Also check if it's already finished or error
-        if status and status.get('mma_status') in ['done', 'error']:
-            break
-        time.sleep(1)
-    assert mma_running or (status and status.get('mma_status') == 'done'), f"Timed out waiting for MMA status to become 'running' for track {track_id_to_load}."
-
-    print(f"MMA status is: {status.get('mma_status')}")
-    # 8. Verify 'active_tier' change and output in 'mma_streams'.
-    streams_found = False
-    for _ in range(60):  # Give it more time for the worker to spawn and respond
-        status = client.get_mma_status()
-
-        # Handle approvals if they pop up during worker execution
-        if status and status.get('pending_mma_spawn_approval') is True:
-            print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
-            client.click('btn_approve_spawn')
-        elif status and status.get('pending_mma_step_approval') is True:
-            print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
-            client.click('btn_approve_mma_step')
-        elif status and status.get('pending_tool_approval') is True:
-            print('[SIM] Tool approval required. Clicking btn_approve_tool...')
-            client.click('btn_approve_tool')
+    ok, status = _poll(client, timeout=120, label="wait-tier3-streams",
+                       condition=_tier3_in_streams)

-        streams = status.get('mma_streams', {})
-        print(f"Polling streams: {list(streams.keys())}")
-        if streams and any("Tier 3" in k for k in streams.keys()):
-            print(f"[SIM] Found Tier 3 worker output in streams: {list(streams.keys())}")
-            # Check for our specific mock content
-            tier3_key = [k for k in streams.keys() if "Tier 3" in k][0]
-            if "SUCCESS: Mock Tier 3 worker" in streams[tier3_key]:
-                print("[SIM] Verified mock worker output content.")
-                streams_found = True
-                break
-
-        time.sleep(1)
-
-    assert streams_found, "No Tier 3 mock output found in 'mma_streams'."
-    print("MMA complete lifecycle simulation successful.")
+    streams = status.get('mma_streams', {})
+    tier3_keys = [k for k in streams if 'Tier 3' in k]
+    assert ok, (
+        f"No non-empty Tier 3 output in mma_streams after 120s. "
+        f"streams keys={list(streams.keys())} "
+        f"mma_status={status.get('mma_status')}"
+    )
+    tier3_content = streams[tier3_keys[0]]
+    print(f"[SIM] Tier 3 output ({len(tier3_content)} chars): {tier3_content[:100]}...")
+    print("[SIM] MMA complete lifecycle simulation PASSED.")
||||