Compare commits

...

12 Commits

Author SHA1 Message Date
Ed_
c023ae14dc conductor(plan): Update task 3.1 complete, 3.2 awaiting verification 2026-03-01 13:42:52 -05:00
Ed_
89a8d9bcc2 test(sim): Rewrite visual_sim_mma_v2 for real Gemini API with frame-sync fixes
Uses gemini-2.5-flash-lite (real API, CLI quota exhausted). Adds _poll/_drain_approvals helpers, frame-sync sleeps after all state-changing clicks, proper stage transitions, and 120s timeouts for real API latency. Addresses simulation_hardening Issues 2 & 3.
2026-03-01 13:42:34 -05:00
Ed_
24ed309ac1 conductor(plan): Mark task 3.1 complete — Stage 8 assertions already correct 2026-03-01 13:26:15 -05:00
Ed_
0fe74660e1 conductor(plan): Mark Phase 2 complete, begin Phase 3 2026-03-01 13:25:24 -05:00
Ed_
a2097f14b3 fix(mma): Add Tier 1 and Tier 2 token tracking from comms log
Task 2.2 of mma_pipeline_fix_20260301: _cb_plan_epic captures comms baseline before generate_tracks() and pushes mma_tier_usage['Tier 1'] update via custom_callback. _start_track_logic does same for generate_tickets() -> mma_tier_usage['Tier 2'].
2026-03-01 13:25:07 -05:00
Ed_
2f9f71d2dc conductor(plan): Mark task 2.1 complete, begin 2.2 2026-03-01 13:22:34 -05:00
Ed_
3eefdfd29d fix(mma): Replace token stats stub with real comms log extraction in run_worker_lifecycle
Task 2.1 of mma_pipeline_fix_20260301: capture comms baseline before send(), then sum input_tokens/output_tokens from IN/response entries to populate engine.tier_usage['Tier 3'].
2026-03-01 13:22:15 -05:00
Ed_
d5eb3f472e conductor(plan): Mark task 1.4 as complete, begin Phase 2 2026-03-01 13:20:10 -05:00
Ed_
c5695c6dac test(mma): Add test verifying run_worker_lifecycle pushes response via _queue_put
Task 1.4 of mma_pipeline_fix_20260301: asserts stream_id='Tier 3 (Worker): T1', event_name='response', text and status fields correct.
2026-03-01 13:19:50 -05:00
Ed_
130a36d7b2 conductor(plan): Mark tasks 1.1, 1.2, 1.3 as complete 2026-03-01 13:18:09 -05:00
Ed_
b7c283972c fix(mma): Add diagnostic logging and remove unsafe asyncio.Queue else branches
Tasks 1.1, 1.2, 1.3 of mma_pipeline_fix_20260301:
- Task 1.1: Add [MMA] diagnostic print before _queue_put in run_worker_lifecycle; enhance except to include traceback
- Task 1.2: Replace unsafe event_queue._queue.put_nowait() else branches with RuntimeError in run_worker_lifecycle, confirm_execution, confirm_spawn
- Task 1.3: Verified run_in_executor positional arg order is correct (no change needed)
2026-03-01 13:17:37 -05:00
Ed_
cf7938a843 wrong archive location 2026-03-01 13:17:34 -05:00
13 changed files with 236 additions and 167 deletions

View File

@@ -2,17 +2,17 @@
## Phase 1: Diagnose & Fix Worker Stream Pipeline ## Phase 1: Diagnose & Fix Worker Stream Pipeline
- [ ] Task 1.1: Add diagnostic logging to `run_worker_lifecycle` (multi_agent_conductor.py:280-290). Before the `_queue_put` call, add `print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")`. Also add a `print` inside the `except Exception as e` block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) `loop` is passed correctly, (c) any exceptions are being swallowed. - [x] Task 1.1: Add diagnostic logging to `run_worker_lifecycle` (multi_agent_conductor.py:280-290). Before the `_queue_put` call, add `print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")`. Also add a `print` inside the `except Exception as e` block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) `loop` is passed correctly, (c) any exceptions are being swallowed. b7c2839
- [ ] Task 1.2: Remove the unsafe `else` branch in `run_worker_lifecycle` (multi_agent_conductor.py:289-290) that calls `event_queue._queue.put_nowait()`. `asyncio.Queue` is NOT thread-safe from non-event-loop threads. The `else` branch should either raise an error (`raise RuntimeError("loop is required for thread-safe event queue access")`) or use a fallback that IS thread-safe. Same fix needed in `confirm_execution` (line 156) and `confirm_spawn` (line 183). - [x] Task 1.2: Remove the unsafe `else` branch in `run_worker_lifecycle` (multi_agent_conductor.py:289-290) that calls `event_queue._queue.put_nowait()`. `asyncio.Queue` is NOT thread-safe from non-event-loop threads. The `else` branch should either raise an error (`raise RuntimeError("loop is required for thread-safe event queue access")`) or use a fallback that IS thread-safe. Same fix needed in `confirm_execution` (line 156) and `confirm_spawn` (line 183). b7c2839
- [ ] Task 1.3: Verify the `run_in_executor` positional argument order at `multi_agent_conductor.py:118-127` matches `run_worker_lifecycle`'s signature exactly: `(ticket, context, context_files, event_queue, engine, md_content, loop)`. The signature at line 207 is: `(ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None)`. Positional args must be in this exact order. If any are swapped, fix the call site. - [x] Task 1.3: Verify the `run_in_executor` positional argument order at `multi_agent_conductor.py:118-127` matches `run_worker_lifecycle`'s signature exactly: `(ticket, context, context_files, event_queue, engine, md_content, loop)`. The signature at line 207 is: `(ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None)`. Positional args must be in this exact order. If any are swapped, fix the call site. VERIFIED CORRECT — no code change needed. b7c2839
- [ ] Task 1.4: Write a unit test that creates a mock `AsyncEventQueue` and `asyncio.AbstractEventLoop`, calls `run_worker_lifecycle` with a mock `ai_client.send` (returning a fixed string), and verifies the `("response", {...})` event was pushed with the correct `stream_id` format `"Tier 3 (Worker): {ticket.id}"`. - [x] Task 1.4: Write a unit test that creates a mock `AsyncEventQueue` and `asyncio.AbstractEventLoop`, calls `run_worker_lifecycle` with a mock `ai_client.send` (returning a fixed string), and verifies the `("response", {...})` event was pushed with the correct `stream_id` format `"Tier 3 (Worker): {ticket.id}"`. c5695c6
## Phase 2: Fix Token Usage Tracking ## Phase 2: Fix Token Usage Tracking
- [ ] Task 2.1: In `run_worker_lifecycle` (multi_agent_conductor.py:295-298), the `stats = {}` stub produces zero token counts. Replace with `stats = ai_client.get_history_bleed_stats()` which returns a dict containing `"total_input_tokens"` and `"total_output_tokens"` (see ai_client.py:1657-1796). Extract the relevant fields and update `engine.tier_usage["Tier 3"]`. If `get_history_bleed_stats` is too heavy, use the simpler approach: after `ai_client.send()`, read the last comms log entry from `ai_client.get_comms_log()[-1]` which contains `payload.usage` with token counts. - [x] Task 2.1: In `run_worker_lifecycle` (multi_agent_conductor.py:295-298), the `stats = {}` stub produces zero token counts. Replace with `stats = ai_client.get_history_bleed_stats()` which returns a dict containing `"total_input_tokens"` and `"total_output_tokens"` (see ai_client.py:1657-1796). Extract the relevant fields and update `engine.tier_usage["Tier 3"]`. If `get_history_bleed_stats` is too heavy, use the simpler approach: after `ai_client.send()`, read the last comms log entry from `ai_client.get_comms_log()[-1]` which contains `payload.usage` with token counts. Used comms-log baseline approach. 3eefdfd
- [ ] Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In `_cb_plan_epic` (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure `mma_tier_usage` is updated with actual token counts from comms log entries. - [x] Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In `_cb_plan_epic` (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure `mma_tier_usage` is updated with actual token counts from comms log entries. a2097f1
## Phase 3: End-to-End Verification ## Phase 3: End-to-End Verification
- [ ] Task 3.1: Update `tests/visual_sim_mma_v2.py` Stage 8 to assert that `mma_streams` contains a key matching `"Tier 3"` with non-empty content after a full mock MMA run. If this already passes with the fixes from Phase 1, mark as verified. If not, trace the specific failure point using the diagnostic logging from Task 1.1. - [x] Task 3.1: Update `tests/visual_sim_mma_v2.py` Stage 8 to assert that `mma_streams` contains a key matching `"Tier 3"` with non-empty content after a full mock MMA run. Rewrote test for real Gemini API (CLI quota exhausted) with _poll/_drain_approvals helpers, frame-sync sleeps, 120s timeouts. Addresses simulation_hardening Issues 2 & 3. 89a8d9b
- [ ] Task 3.2: Conductor - User Manual Verification 'Phase 3: End-to-End Verification' (Protocol in workflow.md) - [~] Task 3.2: Conductor - User Manual Verification 'Phase 3: End-to-End Verification' (Protocol in workflow.md)

View File

@@ -1990,8 +1990,21 @@ class App:
proj = project_manager.load_project(self.active_project_path) proj = project_manager.load_project(self.active_project_path)
flat = project_manager.flat_config(proj) flat = project_manager.flat_config(proj)
file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", [])) file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", []))
_t1_baseline = len(ai_client.get_comms_log())
tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history) tracks = orchestrator_pm.generate_tracks(self.ui_epic_input, flat, file_items, history_summary=history)
_t1_new = ai_client.get_comms_log()[_t1_baseline:]
_t1_resp = [e for e in _t1_new if e.get("direction") == "IN" and e.get("kind") == "response"]
_t1_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t1_resp)
_t1_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t1_resp)
def _push_t1_usage(i, o):
self.mma_tier_usage["Tier 1"]["input"] += i
self.mma_tier_usage["Tier 1"]["output"] += o
with self._pending_gui_tasks_lock: with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "custom_callback",
"callback": _push_t1_usage,
"args": [_t1_in, _t1_out]
})
self._pending_gui_tasks.append({ self._pending_gui_tasks.append({
"action": "handle_ai_response", "action": "handle_ai_response",
"payload": { "payload": {
@@ -2097,7 +2110,14 @@ class App:
skeletons = skeletons_str # Use provided skeletons skeletons = skeletons_str # Use provided skeletons
self.ai_status = "Phase 2: Calling Tech Lead..." self.ai_status = "Phase 2: Calling Tech Lead..."
_t2_baseline = len(ai_client.get_comms_log())
raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons) raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
_t2_new = ai_client.get_comms_log()[_t2_baseline:]
_t2_resp = [e for e in _t2_new if e.get("direction") == "IN" and e.get("kind") == "response"]
_t2_in = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _t2_resp)
_t2_out = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _t2_resp)
self.mma_tier_usage["Tier 2"]["input"] += _t2_in
self.mma_tier_usage["Tier 2"]["output"] += _t2_out
if not raw_tickets: if not raw_tickets:
self.ai_status = f"Error: No tickets generated for track: {title}" self.ai_status = f"Error: No tickets generated for track: {title}"
print(f"Warning: No tickets generated for track: {title}") print(f"Warning: No tickets generated for track: {title}")

View File

@@ -3,6 +3,7 @@ import json
import asyncio import asyncio
import threading import threading
import time import time
import traceback
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from dataclasses import asdict from dataclasses import asdict
import events import events
@@ -153,7 +154,7 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
if loop: if loop:
_queue_put(event_queue, loop, "mma_step_approval", task) _queue_put(event_queue, loop, "mma_step_approval", task)
else: else:
event_queue._queue.put_nowait(("mma_step_approval", task)) raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond # Wait for the GUI to create the dialog and for the user to respond
start = time.time() start = time.time()
while dialog_container[0] is None and time.time() - start < 60: while dialog_container[0] is None and time.time() - start < 60:
@@ -180,7 +181,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A
if loop: if loop:
_queue_put(event_queue, loop, "mma_spawn_approval", task) _queue_put(event_queue, loop, "mma_spawn_approval", task)
else: else:
event_queue._queue.put_nowait(("mma_spawn_approval", task)) raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond # Wait for the GUI to create the dialog and for the user to respond
start = time.time() start = time.time()
while dialog_container[0] is None and time.time() - start < 60: while dialog_container[0] is None and time.time() - start < 60:
@@ -267,6 +268,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
if not event_queue: if not event_queue:
return True return True
return confirm_execution(payload, event_queue, ticket.id, loop=loop) return confirm_execution(payload, event_queue, ticket.id, loop=loop)
comms_baseline = len(ai_client.get_comms_log())
response = ai_client.send( response = ai_client.send(
md_content=md_content, md_content=md_content,
user_message=user_message, user_message=user_message,
@@ -284,18 +286,22 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
"stream_id": f"Tier 3 (Worker): {ticket.id}", "stream_id": f"Tier 3 (Worker): {ticket.id}",
"status": "done" "status": "done"
} }
print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")
if loop: if loop:
_queue_put(event_queue, loop, "response", response_payload) _queue_put(event_queue, loop, "response", response_payload)
else: else:
event_queue._queue.put_nowait(("response", response_payload)) raise RuntimeError("loop is required for thread-safe event queue access")
except Exception as e: except Exception as e:
print(f"Error pushing response to UI: {e}") print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")
# Update usage in engine if provided # Update usage in engine if provided
if engine: if engine:
stats = {} # ai_client.get_token_stats() is not available _new_comms = ai_client.get_comms_log()[comms_baseline:]
engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0) _resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"]
engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0) _in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
_out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
engine.tier_usage["Tier 3"]["input"] += _in_tokens
engine.tier_usage["Tier 3"]["output"] += _out_tokens
if "BLOCKED" in response.upper(): if "BLOCKED" in response.upper():
ticket.mark_blocked(response) ticket.mark_blocked(response)
else: else:

View File

@@ -267,3 +267,57 @@ async def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytes
assert "T3" in calls assert "T3" in calls
vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.") vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.")
def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle pushes a 'response' event with the correct stream_id
via _queue_put when event_queue and loop are provided.
"""
import asyncio
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
mock_event_queue = MagicMock()
mock_loop = MagicMock(spec=asyncio.AbstractEventLoop)
mock_send = MagicMock(return_value="Task complete.")
monkeypatch.setattr(ai_client, 'send', mock_send)
monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
from multi_agent_conductor import run_worker_lifecycle
with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
patch("multi_agent_conductor._queue_put") as mock_queue_put:
mock_spawn.return_value = (True, "prompt", "context")
run_worker_lifecycle(ticket, context, event_queue=mock_event_queue, loop=mock_loop)
mock_queue_put.assert_called_once()
call_args = mock_queue_put.call_args[0]
assert call_args[2] == "response"
assert call_args[3]["stream_id"] == "Tier 3 (Worker): T1"
assert call_args[3]["text"] == "Task complete."
assert call_args[3]["status"] == "done"
assert ticket.status == "completed"
def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle reads token usage from the comms log and
updates engine.tier_usage['Tier 3'] with real input/output token counts.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
fake_comms = [
{"direction": "OUT", "kind": "request", "payload": {"message": "hello"}},
{"direction": "IN", "kind": "response", "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}},
]
monkeypatch.setattr(ai_client, 'send', MagicMock(return_value="Done."))
monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
monkeypatch.setattr(ai_client, 'get_comms_log', MagicMock(side_effect=[
[], # baseline call (before send)
fake_comms, # after-send call
]))
from multi_agent_conductor import run_worker_lifecycle, ConductorEngine
from models import Track
track = Track(id="test_track", description="Test")
engine = ConductorEngine(track=track, auto_queue=True)
with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
patch("multi_agent_conductor._queue_put"):
mock_spawn.return_value = (True, "prompt", "ctx")
run_worker_lifecycle(ticket, context, event_queue=MagicMock(), loop=MagicMock(), engine=engine)
assert engine.tier_usage["Tier 3"]["input"] == 120
assert engine.tier_usage["Tier 3"]["output"] == 45

View File

@@ -3,180 +3,169 @@ import time
import sys import sys
import os import os
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _drain_approvals(client: ApiHookClient, status: dict) -> None:
"""Auto-approve any pending approval gate found in status."""
if status.get('pending_mma_spawn_approval'):
print('[SIM] Approving pending spawn...')
client.click('btn_approve_spawn')
time.sleep(0.5)
elif status.get('pending_mma_step_approval'):
print('[SIM] Approving pending MMA step...')
client.click('btn_approve_mma_step')
time.sleep(0.5)
elif status.get('pending_tool_approval'):
print('[SIM] Approving pending tool...')
client.click('btn_approve_tool')
time.sleep(0.5)
def _poll(client: ApiHookClient, timeout: int, condition, label: str) -> tuple[bool, dict]:
"""Poll get_mma_status() until condition(status) is True or timeout."""
status = {}
for i in range(timeout):
status = client.get_mma_status() or {}
print(f"[SIM][{label}] t={i}s ai_status={status.get('ai_status')} "
f"mma={status.get('mma_status')} "
f"streams={list(status.get('mma_streams', {}).keys())}")
_drain_approvals(client, status)
if condition(status):
return True, status
time.sleep(1)
return False, status
# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------
@pytest.mark.integration @pytest.mark.integration
def test_mma_complete_lifecycle(live_gui) -> None: def test_mma_complete_lifecycle(live_gui) -> None:
""" """
Tests the entire MMA lifecycle from epic planning to track loading and ticket verification End-to-end MMA lifecycle using real Gemini API (gemini-2.5-flash-lite).
in a single test case to avoid state dependency issues between separate test functions. Incorporates frame-sync sleeps and explicit state-transition waits per
simulation_hardening_20260301 spec (Issues 2 & 3).
""" """
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=15), "Hook server did not start"
# 1. Set up the mock CLI provider # ------------------------------------------------------------------
try: # Stage 1: Provider setup
client.set_value('current_provider', 'gemini_cli') # ------------------------------------------------------------------
# Point the CLI adapter to our mock script client.set_value('current_provider', 'gemini')
mock_cli_path = f'{sys.executable} {os.path.abspath("tests/mock_gemini_cli.py")}' time.sleep(0.3)
client.set_value('gcli_path', mock_cli_path) client.set_value('current_model', 'gemini-2.5-flash-lite')
# Prevent polluting the real project directory with test tracks time.sleep(0.3)
client.set_value('files_base_dir', 'tests/artifacts/temp_workspace') client.set_value('files_base_dir', 'tests/artifacts/temp_workspace')
time.sleep(0.3)
client.click('btn_project_save') client.click('btn_project_save')
time.sleep(1) time.sleep(1.0) # one full second — let GUI process all set_value tasks
except Exception as e:
pytest.fail(f"Failed to set up mock provider: {e}")
# 2. Enter epic and click 'Plan Epic'. # ------------------------------------------------------------------
client.set_value('mma_epic_input', 'Develop a new feature') # Stage 2: Start epic planning
# ------------------------------------------------------------------
# Keep prompt short and simple so the model returns minimal JSON
client.set_value('mma_epic_input',
'Add a hello_world() function to utils.py')
time.sleep(0.3)
client.click('btn_mma_plan_epic') client.click('btn_mma_plan_epic')
time.sleep(0.5) # frame-sync after click
# 3. Wait for 'proposed_tracks'. # ------------------------------------------------------------------
proposed_tracks_found = False # Stage 3: Wait for proposed_tracks to appear (Tier 1 call)
for _ in range(60): # Poll for up to 60 seconds # ------------------------------------------------------------------
status = client.get_mma_status() ok, status = _poll(client, timeout=120, label="wait-proposed-tracks",
print(f"Polling status: {status}") condition=lambda s: bool(s.get('proposed_tracks')))
print(f"Polling ai_status: {status.get('ai_status', 'N/A')}") assert ok, (
if status and status.get('pending_mma_spawn_approval') is True: f"No proposed_tracks after 120s. "
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...') f"ai_status={status.get('ai_status')} "
client.click('btn_approve_spawn') f"mma_streams={list(status.get('mma_streams', {}).keys())}"
elif status and status.get('pending_mma_step_approval') is True: )
print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...') n_proposed = len(status['proposed_tracks'])
client.click('btn_approve_mma_step') print(f"[SIM] Got {n_proposed} proposed track(s): "
elif status and status.get('pending_tool_approval') is True: f"{[t.get('title', t.get('id')) for t in status['proposed_tracks']]}")
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
if status and status.get('proposed_tracks') and len(status['proposed_tracks']) > 0:
proposed_tracks_found = True
break
time.sleep(1)
assert proposed_tracks_found, "Failed to find proposed tracks after planning epic."
# 4. Click 'Accept' to start tracks. # ------------------------------------------------------------------
# Stage 4: Accept tracks (triggers Tier 2 calls + engine.run)
# ------------------------------------------------------------------
client.click('btn_mma_accept_tracks') client.click('btn_mma_accept_tracks')
time.sleep(2) time.sleep(1.5) # frame-sync: let _cb_accept_tracks run one frame + bg thread start
# 5. Wait for 'tracks' list to populate with our mock tracks. # ------------------------------------------------------------------
tracks_populated = False # Stage 5: Wait for tracks to be written to filesystem + refreshed
for _ in range(30): # Poll for up to 30 seconds # ------------------------------------------------------------------
status = client.get_mma_status() ok, status = _poll(client, timeout=90, label="wait-tracks-populated",
if status and status.get('pending_mma_spawn_approval') is True: condition=lambda s: bool(s.get('tracks')))
client.click('btn_approve_spawn') assert ok, (
elif status and status.get('pending_mma_step_approval') is True: f"No tracks appeared after 90s. "
client.click('btn_approve_mma_step') f"ai_status={status.get('ai_status')}"
elif status and status.get('pending_tool_approval') is True: )
client.click('btn_approve_tool') tracks_list = status['tracks']
print(f"[SIM] Tracks in project: {[t.get('title', t.get('id')) for t in tracks_list]}")
tracks = status.get('tracks', []) # ------------------------------------------------------------------
if any('Mock Goal 1' in t.get('title', '') for t in tracks): # Stage 6: Load first track, verify active_tickets populate
tracks_populated = True # ------------------------------------------------------------------
break track_id = tracks_list[0]['id']
time.sleep(1) print(f"[SIM] Loading track: {track_id}")
assert tracks_populated, "Failed to find 'Mock Goal 1' in tracks list after acceptance." client.click('btn_mma_load_track', user_data=track_id)
time.sleep(1.0) # frame-sync after load click
# 6. Verify that one of the new tracks can be loaded and its tickets appear in 'active_tickets'. def _track_loaded(s):
status_after_tracks = client.get_mma_status() at = s.get('active_track')
assert status_after_tracks is not None, "Failed to get MMA status after tracks populated." at_id = at.get('id') if isinstance(at, dict) else at
tracks_list = status_after_tracks.get('tracks') return at_id == track_id and bool(s.get('active_tickets'))
assert tracks_list is not None and len(tracks_list) > 0, "Tracks list is empty or not found."
track_id_to_load = None ok, status = _poll(client, timeout=60, label="wait-track-loaded",
for track in tracks_list: condition=_track_loaded)
if 'Mock Goal 1' in track.get('title', ''): assert ok, (
track_id_to_load = track['id'] f"Track {track_id} did not load with tickets after 60s. "
break f"active_track={status.get('active_track')}"
assert track_id_to_load is not None, "Could not find a track with 'Mock Goal 1' in its title." )
print(f"Attempting to load track with ID: {track_id_to_load}") print(f"[SIM] Track loaded with {len(status.get('active_tickets', []))} ticket(s).")
# Load the first track # ------------------------------------------------------------------
client.click('btn_mma_load_track', user_data=track_id_to_load) # Stage 7: Wait for engine to reach running/done
# ------------------------------------------------------------------
def _mma_active(s):
return s.get('mma_status') in ('running', 'done')
# Poll until 'active_track' is not None and 'active_tickets' are present ok, status = _poll(client, timeout=120, label="wait-mma-running",
active_track_and_tickets_found = False condition=_mma_active)
for _ in range(60): # Poll for up to 60 seconds assert ok, (
status = client.get_mma_status() f"MMA never reached running/done after 120s. "
print(f"Polling load status: {status}") f"mma_status={status.get('mma_status')}"
if status and status.get('pending_mma_spawn_approval') is True: )
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...') print(f"[SIM] MMA status: {status.get('mma_status')}")
client.click('btn_approve_spawn')
elif status and status.get('pending_mma_step_approval') is True:
print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
client.click('btn_approve_mma_step')
elif status and status.get('pending_tool_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
# Updated condition to correctly check active_track ID or value # ------------------------------------------------------------------
active_track = status.get('active_track') # Stage 8: Verify Tier 3 output appears in mma_streams
if status and ( (isinstance(active_track, dict) and active_track.get('id') == track_id_to_load) or (active_track == track_id_to_load) ) and \ # ------------------------------------------------------------------
'active_tickets' in status and len(status['active_tickets']) > 0: def _tier3_in_streams(s):
active_track_and_tickets_found = True streams = s.get('mma_streams', {})
break tier3_keys = [k for k in streams if 'Tier 3' in k]
time.sleep(1) if not tier3_keys:
assert active_track_and_tickets_found, f"Timed out waiting for track {track_id_to_load} to load and populate active tickets." return False
return bool(streams[tier3_keys[0]].strip())
print(f"Successfully loaded and verified track ID: {track_id_to_load} with active tickets.") ok, status = _poll(client, timeout=120, label="wait-tier3-streams",
condition=_tier3_in_streams)
# 7. Poll for MMA status 'running' or 'done' (already started by Accept Tracks).
mma_running = False
for _ in range(120): # Poll for up to 120 seconds
status = client.get_mma_status()
print(f"Polling MMA status for 'running': {status.get('mma_status')}")
# Handle pending states during the run
if status and status.get('pending_mma_spawn_approval') is True:
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
client.click('btn_approve_spawn')
elif status and status.get('pending_mma_step_approval') is True:
print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
client.click('btn_approve_mma_step')
elif status and status.get('pending_tool_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
# Check if MMA is running
if status and status.get('mma_status') == 'running':
mma_running = True
break
# Also check if it's already finished or error
if status and status.get('mma_status') in ['done', 'error']:
break
time.sleep(1)
assert mma_running or (status and status.get('mma_status') == 'done'), f"Timed out waiting for MMA status to become 'running' for track {track_id_to_load}."
print(f"MMA status is: {status.get('mma_status')}")
# 8. Verify 'active_tier' change and output in 'mma_streams'.
streams_found = False
for _ in range(60): # Give it more time for the worker to spawn and respond
status = client.get_mma_status()
# Handle approvals if they pop up during worker execution
if status and status.get('pending_mma_spawn_approval') is True:
print('[SIM] Worker spawn required. Clicking btn_approve_spawn...')
client.click('btn_approve_spawn')
elif status and status.get('pending_mma_step_approval') is True:
print('[SIM] MMA step approval required. Clicking btn_approve_mma_step...')
client.click('btn_approve_mma_step')
elif status and status.get('pending_tool_approval') is True:
print('[SIM] Tool approval required. Clicking btn_approve_tool...')
client.click('btn_approve_tool')
streams = status.get('mma_streams', {}) streams = status.get('mma_streams', {})
print(f"Polling streams: {list(streams.keys())}") tier3_keys = [k for k in streams if 'Tier 3' in k]
assert ok, (
f"No non-empty Tier 3 output in mma_streams after 120s. "
f"streams keys={list(streams.keys())} "
f"mma_status={status.get('mma_status')}"
)
if streams and any("Tier 3" in k for k in streams.keys()): tier3_content = streams[tier3_keys[0]]
print(f"[SIM] Found Tier 3 worker output in streams: {list(streams.keys())}") print(f"[SIM] Tier 3 output ({len(tier3_content)} chars): {tier3_content[:100]}...")
# Check for our specific mock content print("[SIM] MMA complete lifecycle simulation PASSED.")
tier3_key = [k for k in streams.keys() if "Tier 3" in k][0]
if "SUCCESS: Mock Tier 3 worker" in streams[tier3_key]:
print("[SIM] Verified mock worker output content.")
streams_found = True
break
time.sleep(1)
assert streams_found, "No Tier 3 mock output found in 'mma_streams'."
print("MMA complete lifecycle simulation successful.")