From b7c283972c64dcc225949753094f83a4e296a6fc Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sun, 1 Mar 2026 13:17:37 -0500 Subject: [PATCH] fix(mma): Add diagnostic logging and remove unsafe asyncio.Queue else branches Tasks 1.1, 1.2, 1.3 of mma_pipeline_fix_20260301: - Task 1.1: Add [MMA] diagnostic print before _queue_put in run_worker_lifecycle; enhance except to include traceback - Task 1.2: Replace unsafe event_queue._queue.put_nowait() else branches with RuntimeError in run_worker_lifecycle, confirm_execution, confirm_spawn - Task 1.3: Verified run_in_executor positional arg order is correct (no change needed) --- multi_agent_conductor.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/multi_agent_conductor.py b/multi_agent_conductor.py index b745e23..c83a271 100644 --- a/multi_agent_conductor.py +++ b/multi_agent_conductor.py @@ -3,6 +3,7 @@ import json import asyncio import threading import time +import traceback from typing import List, Optional, Tuple from dataclasses import asdict import events @@ -153,7 +154,7 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_ if loop: _queue_put(event_queue, loop, "mma_step_approval", task) else: - event_queue._queue.put_nowait(("mma_step_approval", task)) + raise RuntimeError("loop is required for thread-safe event queue access") # Wait for the GUI to create the dialog and for the user to respond start = time.time() while dialog_container[0] is None and time.time() - start < 60: @@ -180,7 +181,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A if loop: _queue_put(event_queue, loop, "mma_spawn_approval", task) else: - event_queue._queue.put_nowait(("mma_spawn_approval", task)) + raise RuntimeError("loop is required for thread-safe event queue access") # Wait for the GUI to create the dialog and for the user to respond start = time.time() while dialog_container[0] is None and time.time() - start < 60: @@ -284,12 +285,13 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "done" } + print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}") if loop: _queue_put(event_queue, loop, "response", response_payload) else: - event_queue._queue.put_nowait(("response", response_payload)) + raise RuntimeError("loop is required for thread-safe event queue access") except Exception as e: - print(f"Error pushing response to UI: {e}") + print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}") # Update usage in engine if provided if engine: