manual_slop/conductor/tracks/mma_pipeline_fix_20260301/plan.md


Implementation Plan: MMA Pipeline Fix & Worker Stream Verification

Phase 1: Diagnose & Fix Worker Stream Pipeline

  • Task 1.1: Add diagnostic logging to run_worker_lifecycle (multi_agent_conductor.py:280-290). Before the _queue_put call, add print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}"). Also add a print inside the except Exception as e block that currently silently swallows errors. This will reveal whether (a) the function reaches the push point, (b) loop is passed correctly, (c) any exceptions are being swallowed. b7c2839
  • Task 1.2: Remove the unsafe else branch in run_worker_lifecycle (multi_agent_conductor.py:289-290) that calls event_queue._queue.put_nowait(). asyncio.Queue is not thread-safe: calling put_nowait() from a worker thread (any thread other than the one running the event loop) bypasses the loop's scheduling and may leave waiting consumers un-woken. The else branch should either raise an error (raise RuntimeError("loop is required for thread-safe event queue access")) or use a fallback that is thread-safe. The same fix is needed in confirm_execution (line 156) and confirm_spawn (line 183). b7c2839
  • Task 1.3: Verify the run_in_executor positional argument order at multi_agent_conductor.py:118-127 matches run_worker_lifecycle's signature exactly: (ticket, context, context_files, event_queue, engine, md_content, loop). The signature at line 207 is: (ticket, context, context_files=None, event_queue=None, engine=None, md_content="", loop=None). Positional args must be in this exact order. If any are swapped, fix the call site. VERIFIED CORRECT — no code change needed. b7c2839
  • [~] Task 1.4: Write a unit test that creates a mock AsyncEventQueue and asyncio.AbstractEventLoop, calls run_worker_lifecycle with a mock ai_client.send (returning a fixed string), and verifies the ("response", {...}) event was pushed with the correct stream_id format "Tier 3 (Worker): {ticket.id}".
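
The thread-safety fix from Task 1.2 and the event shape checked in Task 1.4 can be sketched together as follows. This is a minimal illustration, not the project's code: push_event_threadsafe is a hypothetical helper, a bare asyncio.Queue stands in for the real event queue, and the ticket id "T-001" and the "text" field are invented for the demo. Only the RuntimeError message and the stream_id format come from the tasks above.

```python
import asyncio
import threading


def push_event_threadsafe(queue, loop, event_name, payload):
    """Hypothetical helper: hand the put to the loop thread, never touch the queue directly."""
    if loop is None:
        # Mirrors the error proposed in Task 1.2 instead of a silent unsafe fallback.
        raise RuntimeError("loop is required for thread-safe event queue access")
    # call_soon_threadsafe is the documented way to schedule a callback from another thread.
    loop.call_soon_threadsafe(queue.put_nowait, (event_name, payload))


async def demo():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    # "T-001" is an illustrative ticket id; the format follows Task 1.4.
    payload = {"stream_id": "Tier 3 (Worker): T-001", "text": "done"}
    worker = threading.Thread(
        target=push_event_threadsafe,
        args=(queue, loop, "response", payload),
    )
    worker.start()
    # The consumer wakes up because the put ran on the loop thread.
    event = await asyncio.wait_for(queue.get(), timeout=5)
    worker.join()
    return event


result = asyncio.run(demo())
print(result)
```

A unit test for Task 1.4 could take the same shape: run the worker in a thread, await the queue with a timeout, and assert on the event name and stream_id.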

Phase 2: Fix Token Usage Tracking

  • Task 2.1: In run_worker_lifecycle (multi_agent_conductor.py:295-298), the stats = {} stub produces zero token counts. Replace with stats = ai_client.get_history_bleed_stats() which returns a dict containing "total_input_tokens" and "total_output_tokens" (see ai_client.py:1657-1796). Extract the relevant fields and update engine.tier_usage["Tier 3"]. If get_history_bleed_stats is too heavy, use the simpler approach: after ai_client.send(), read the last comms log entry from ai_client.get_comms_log()[-1] which contains payload.usage with token counts.
  • Task 2.2: Similarly fix Tier 1 and Tier 2 token tracking. In _cb_plan_epic (gui_2.py:1985-2010) and wherever Tier 2 calls happen, ensure mma_tier_usage is updated with actual token counts from comms log entries.
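
The simpler comms-log approach from Task 2.1 might look roughly like the sketch below. The entry layout is an assumption: the plan only says the last entry contains payload.usage with token counts, so the key names "input_tokens" and "output_tokens", the {"input", "output"} shape of the usage bucket, and both helper names are hypothetical and should be checked against ai_client.py before use.

```python
def usage_from_comms_entry(entry):
    """Return (input_tokens, output_tokens) from one comms log entry, defaulting to 0."""
    # Assumed layout: {"payload": {"usage": {"input_tokens": ..., "output_tokens": ...}}}
    usage = (entry.get("payload") or {}).get("usage") or {}
    return int(usage.get("input_tokens", 0)), int(usage.get("output_tokens", 0))


def record_tier_usage(tier_usage, tier, comms_log):
    """Add the token counts of the most recent comms entry to the given tier's totals."""
    if not comms_log:
        return  # Nothing was sent, so there is nothing to record.
    tokens_in, tokens_out = usage_from_comms_entry(comms_log[-1])
    bucket = tier_usage.setdefault(tier, {"input": 0, "output": 0})
    bucket["input"] += tokens_in
    bucket["output"] += tokens_out


# Illustrative data only; real entries come from ai_client.get_comms_log().
tier_usage = {"Tier 3": {"input": 0, "output": 0}}
comms_log = [{"payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}}]
record_tier_usage(tier_usage, "Tier 3", comms_log)
record_tier_usage(tier_usage, "Tier 3", [])  # Empty log leaves totals unchanged.
print(tier_usage)
```

Accumulating into a per-tier bucket, rather than overwriting it, keeps the totals correct when a tier makes several calls in one run; the same helper could serve Tier 1 and Tier 2 in Task 2.2.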

Phase 3: End-to-End Verification

  • Task 3.1: Update tests/visual_sim_mma_v2.py Stage 8 to assert that mma_streams contains a key matching "Tier 3" with non-empty content after a full mock MMA run. If this already passes with the fixes from Phase 1, mark as verified. If not, trace the specific failure point using the diagnostic logging from Task 1.1.
  • Task 3.2: Conductor - User Manual Verification 'Phase 3: End-to-End Verification' (Protocol in workflow.md)
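
The Stage 8 assertion described in Task 3.1 could be expressed along these lines. This is only a sketch: it assumes mma_streams is a dict mapping stream ids to text, and the helper name and sample data are hypothetical rather than taken from tests/visual_sim_mma_v2.py.

```python
def tier3_streams_with_content(mma_streams):
    """Return the stream ids that mention "Tier 3" and hold non-blank content."""
    return [
        stream_id
        for stream_id, content in mma_streams.items()
        if "Tier 3" in stream_id and str(content).strip()
    ]


# Illustrative snapshot; a real run would read this from the application state.
streams = {
    "Tier 1": "plan",
    "Tier 3 (Worker): T-001": "patched file",
    "Tier 3 (Worker): T-002": "   ",  # Whitespace only, so it does not count.
}
matches = tier3_streams_with_content(streams)
assert matches, "expected at least one non-empty Tier 3 stream after the mock MMA run"
print(matches)
```

Treating whitespace-only content as empty avoids a false pass when a stream key is created but no worker output ever arrives.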