fix(mma): Add diagnostic logging and remove unsafe asyncio.Queue else branches

Tasks 1.1, 1.2, 1.3 of mma_pipeline_fix_20260301:
- Task 1.1: Add [MMA] diagnostic print before _queue_put in run_worker_lifecycle; enhance except to include traceback
- Task 1.2: Replace unsafe event_queue._queue.put_nowait() else branches with RuntimeError in run_worker_lifecycle, confirm_execution, confirm_spawn
- Task 1.3: Verified run_in_executor positional arg order is correct (no change needed)
This commit is contained in:
2026-03-01 13:17:37 -05:00
parent cf7938a843
commit b7c283972c

View File

@@ -3,6 +3,7 @@ import json
import asyncio
import threading
import time
import traceback
from typing import List, Optional, Tuple
from dataclasses import asdict
import events
@@ -153,7 +154,7 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
if loop:
_queue_put(event_queue, loop, "mma_step_approval", task)
else:
event_queue._queue.put_nowait(("mma_step_approval", task))
raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
@@ -180,7 +181,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A
if loop:
_queue_put(event_queue, loop, "mma_spawn_approval", task)
else:
event_queue._queue.put_nowait(("mma_spawn_approval", task))
raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
@@ -284,12 +285,13 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
"stream_id": f"Tier 3 (Worker): {ticket.id}",
"status": "done"
}
print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")
if loop:
_queue_put(event_queue, loop, "response", response_payload)
else:
event_queue._queue.put_nowait(("response", response_payload))
raise RuntimeError("loop is required for thread-safe event queue access")
except Exception as e:
print(f"Error pushing response to UI: {e}")
print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")
# Update usage in engine if provided
if engine: