fix(mma): Add diagnostic logging and remove unsafe asyncio.Queue else branches

Tasks 1.1, 1.2, 1.3 of mma_pipeline_fix_20260301:
- Task 1.1: Add [MMA] diagnostic print before _queue_put in run_worker_lifecycle; enhance except to include traceback
- Task 1.2: Replace unsafe event_queue._queue.put_nowait() else branches with RuntimeError in run_worker_lifecycle, confirm_execution, confirm_spawn
- Task 1.3: Verified run_in_executor positional arg order is correct (no change needed)
This commit is contained in:
2026-03-01 13:17:37 -05:00
parent cf7938a843
commit b7c283972c

View File

@@ -3,6 +3,7 @@ import json
import asyncio import asyncio
import threading import threading
import time import time
import traceback
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from dataclasses import asdict from dataclasses import asdict
import events import events
@@ -153,7 +154,7 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
if loop: if loop:
_queue_put(event_queue, loop, "mma_step_approval", task) _queue_put(event_queue, loop, "mma_step_approval", task)
else: else:
event_queue._queue.put_nowait(("mma_step_approval", task)) raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond # Wait for the GUI to create the dialog and for the user to respond
start = time.time() start = time.time()
while dialog_container[0] is None and time.time() - start < 60: while dialog_container[0] is None and time.time() - start < 60:
@@ -180,7 +181,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A
if loop: if loop:
_queue_put(event_queue, loop, "mma_spawn_approval", task) _queue_put(event_queue, loop, "mma_spawn_approval", task)
else: else:
event_queue._queue.put_nowait(("mma_spawn_approval", task)) raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond # Wait for the GUI to create the dialog and for the user to respond
start = time.time() start = time.time()
while dialog_container[0] is None and time.time() - start < 60: while dialog_container[0] is None and time.time() - start < 60:
@@ -284,12 +285,13 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
"stream_id": f"Tier 3 (Worker): {ticket.id}", "stream_id": f"Tier 3 (Worker): {ticket.id}",
"status": "done" "status": "done"
} }
print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")
if loop: if loop:
_queue_put(event_queue, loop, "response", response_payload) _queue_put(event_queue, loop, "response", response_payload)
else: else:
event_queue._queue.put_nowait(("response", response_payload)) raise RuntimeError("loop is required for thread-safe event queue access")
except Exception as e: except Exception as e:
print(f"Error pushing response to UI: {e}") print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")
# Update usage in engine if provided # Update usage in engine if provided
if engine: if engine: