# Implementation Plan: MMA Pipeline Fix & Worker Stream Verification

## Phase 1: Diagnose & Fix Worker Stream Pipeline

- [x] Task 1.1: Add diagnostic logging to `run_worker_lifecycle` (multi_agent_conductor.py:280-290). Before the `_queue_put` call, add `print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")`. Also add a `print` inside the `except Exception as e` block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) `loop` is passed correctly, (c) any exceptions are being swallowed. b7c2839
- [x] Task 1.2: Remove the unsafe `else` branch in `run_worker_lifecycle` (multi_agent_conductor.py:289-290) that calls `event_queue._queue.put_nowait()`. `asyncio.Queue` is NOT thread-safe from non-event-loop threads. The `else` branch should either raise an error (`raise RuntimeError("loop is required for thread-safe event queue access")`) or use a fallback that IS thread-safe. Same fix needed in `confirm_execution` (line 156) and `confirm_spawn` (line 183). b7c2839
- [x] Task 1.3: Verify the `run_in_executor` positional argument order at `multi_agent_conductor.py:118-127` matches `run_worker_lifecycle`'s signature exactly: `(ticket, context, context_files, event_queue, engine, md_content, loop)`. The signature at line 207 is: `(ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None)`. Positional args must be in this exact order. If any are swapped, fix the call site. VERIFIED CORRECT; no code change needed. b7c2839
- [x] Task 1.4: Write a unit test that creates a mock `AsyncEventQueue` and `asyncio.AbstractEventLoop`, calls `run_worker_lifecycle` with a mock `ai_client.send` (returning a fixed string), and verifies the `("response", {...})` event was pushed with the correct `stream_id` format `"Tier 3 (Worker): {ticket.id}"`. c5695c6

## Phase 2: Fix Token Usage Tracking

- [x] Task 2.1: In `run_worker_lifecycle` (multi_agent_conductor.py:295-298), the `stats = {}` stub produces zero token counts. Replace with `stats = ai_client.get_history_bleed_stats()` which returns a dict containing `"total_input_tokens"` and `"total_output_tokens"` (see ai_client.py:1657-1796). Extract the relevant fields and update `engine.tier_usage["Tier 3"]`. If `get_history_bleed_stats` is too heavy, use the simpler approach: after `ai_client.send()`, read the last comms log entry from `ai_client.get_comms_log()[-1]` which contains `payload.usage` with token counts. Used comms-log baseline approach. 3eefdfd
- [x] Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In `_cb_plan_epic` (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure `mma_tier_usage` is updated with actual token counts from comms log entries. a2097f1

## Phase 3: End-to-End Verification

- [x] Task 3.1: Update `tests/visual_sim_mma_v2.py` Stage 8 to assert that `mma_streams` contains a key matching `"Tier 3"` with non-empty content after a full mock MMA run. Rewrote test for real Gemini API (CLI quota exhausted) with `_poll`/`_drain_approvals` helpers, frame-sync sleeps, 120s timeouts. Addresses simulation_hardening Issues 2 & 3. 89a8d9b
- [x] Task 3.2: Fix Tier 1 tool-use bug (`enable_tools=False` in `generate_tracks`), rerun sim test: PASSED in 11s. ce5b6d2
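
## Appendix: Thread-Safe Queue Push (sketch)

The thread-safety concern in Task 1.2 can be illustrated with a minimal sketch. It assumes a plain `asyncio.Queue` rather than the project's `AsyncEventQueue` wrapper, and the names `push_event`, `worker`, `demo`, `run_demo`, and the ticket id `"T-001"` are hypothetical, introduced only for illustration. The idea is to hand `put_nowait` to the loop via `loop.call_soon_threadsafe` so that it runs on the event-loop thread, and to fail loudly when no loop was passed instead of falling back to a direct call.

```python
import asyncio


def push_event(queue, loop, event):
    """Hypothetical helper: enqueue `event` from a non-event-loop thread."""
    if loop is None:
        # Fail loudly, as Task 1.2 suggests, instead of calling put_nowait directly.
        raise RuntimeError("loop is required for thread-safe event queue access")
    # call_soon_threadsafe schedules put_nowait to run on the loop's own thread.
    loop.call_soon_threadsafe(queue.put_nowait, event)


def worker(queue, loop, ticket_id):
    """Stand-in for a worker function executed via run_in_executor."""
    payload = {"stream_id": f"Tier 3 (Worker): {ticket_id}", "text": "done"}
    push_event(queue, loop, ("response", payload))


async def demo():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    # Run the worker on the default thread pool, passing the loop explicitly.
    await loop.run_in_executor(None, worker, queue, loop, "T-001")
    # The scheduled put_nowait runs on the loop; wait for the event to arrive.
    return await asyncio.wait_for(queue.get(), timeout=5)


def run_demo():
    return asyncio.run(demo())
```

Calling `run_demo()` returns the `("response", {...})` tuple pushed by the worker thread. Passing `loop=None` to `push_event` raises `RuntimeError`, which makes a missing loop argument visible at the call site rather than producing a silent, unsafe write.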