# Implementation Plan: MMA Pipeline Fix & Worker Stream Verification ## Phase 1: Diagnose & Fix Worker Stream Pipeline - [ ] Task 1.1: Add diagnostic logging to `run_worker_lifecycle` (multi_agent_conductor.py:280-290). Before the `_queue_put` call, add `print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")`. Also add a `print` inside the `except Exception as e` block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) `loop` is passed correctly, (c) any exceptions are being swallowed. - [ ] Task 1.2: Remove the unsafe `else` branch in `run_worker_lifecycle` (multi_agent_conductor.py:289-290) that calls `event_queue._queue.put_nowait()`. `asyncio.Queue` is NOT thread-safe from non-event-loop threads. The `else` branch should either raise an error (`raise RuntimeError("loop is required for thread-safe event queue access")`) or use a fallback that IS thread-safe. Same fix needed in `confirm_execution` (line 156) and `confirm_spawn` (line 183). - [ ] Task 1.3: Verify the `run_in_executor` positional argument order at `multi_agent_conductor.py:118-127` matches `run_worker_lifecycle`'s signature exactly: `(ticket, context, context_files, event_queue, engine, md_content, loop)`. The signature at line 207 is: `(ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None)`. Positional args must be in this exact order. If any are swapped, fix the call site. - [ ] Task 1.4: Write a unit test that creates a mock `AsyncEventQueue` and `asyncio.AbstractEventLoop`, calls `run_worker_lifecycle` with a mock `ai_client.send` (returning a fixed string), and verifies the `("response", {...})` event was pushed with the correct `stream_id` format `"Tier 3 (Worker): {ticket.id}"`. ## Phase 2: Fix Token Usage Tracking - [ ] Task 2.1: In `run_worker_lifecycle` (multi_agent_conductor.py:295-298), the `stats = {}` stub produces zero token counts. Replace with `stats = ai_client.get_history_bleed_stats()` which returns a dict containing `"total_input_tokens"` and `"total_output_tokens"` (see ai_client.py:1657-1796). Extract the relevant fields and update `engine.tier_usage["Tier 3"]`. If `get_history_bleed_stats` is too heavy, use the simpler approach: after `ai_client.send()`, read the last comms log entry from `ai_client.get_comms_log()[-1]` which contains `payload.usage` with token counts. - [ ] Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In `_cb_plan_epic` (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure `mma_tier_usage` is updated with actual token counts from comms log entries. ## Phase 3: End-to-End Verification - [ ] Task 3.1: Update `tests/visual_sim_mma_v2.py` Stage 8 to assert that `mma_streams` contains a key matching `"Tier 3"` with non-empty content after a full mock MMA run. If this already passes with the fixes from Phase 1, mark as verified. If not, trace the specific failure point using the diagnostic logging from Task 1.1. - [ ] Task 3.2: Conductor - User Manual Verification 'Phase 3: End-to-End Verification' (Protocol in workflow.md)