fix(mma): Unblock visual simulation - event routing, loop passing, adapter preservation

Three independent root causes fixed:
- gui_2.py: Route mma_spawn_approval/mma_step_approval events in _process_event_queue
- multi_agent_conductor.py: Pass asyncio loop from ConductorEngine.run() through to
  thread-pool workers for thread-safe event queue access; add _queue_put helper
- ai_client.py: Preserve GeminiCliAdapter in reset_session() instead of nulling it

Test: visual_sim_mma_v2::test_mma_complete_lifecycle passes in ~8s

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 08:32:31 -05:00
parent db32a874fd
commit da21ed543d
11 changed files with 144 additions and 122 deletions

View File

@@ -80,6 +80,7 @@ class ConductorEngine:
md_content: The full markdown context (history + files) for AI workers.
"""
await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
loop = asyncio.get_event_loop()
while True:
# 1. Identify ready tasks
ready_tasks = self.engine.tick()
@@ -99,7 +100,6 @@ class ConductorEngine:
await self._push_state(status="blocked", active_tier=None)
break
# 3. Process ready tasks
loop = asyncio.get_event_loop()
for ticket in ready_tasks:
# If auto_queue is on and step_mode is off, engine.tick() already marked it 'in_progress'
# but we need to verify and handle the lifecycle.
@@ -108,38 +108,41 @@ class ConductorEngine:
print(f"Executing ticket {ticket.id}: {ticket.description}")
await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
context = WorkerContext(
ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite",
ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite",
messages=[]
)
# Offload the blocking lifecycle call to a thread to avoid blocking the async event loop.
# We pass the md_content so the worker has full context.
context_files = ticket.context_requirements if ticket.context_requirements else None
await loop.run_in_executor(
None,
run_worker_lifecycle,
ticket,
context,
context_files,
self.event_queue,
None,
run_worker_lifecycle,
ticket,
context,
context_files,
self.event_queue,
self,
md_content
md_content,
loop
)
await self._push_state(active_tier="Tier 2 (Tech Lead)")
elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
# Task is ready but needs approval
print(f"Ticket {ticket.id} is ready and awaiting approval.")
await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
# In a real UI, this would wait for a user event.
# In a real UI, this would wait for a user event.
# For now, we'll treat it as a pause point if not auto-queued.
pass
await asyncio.sleep(1)
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
def _queue_put(event_queue: events.AsyncEventQueue, loop: asyncio.AbstractEventLoop, event_name: str, payload) -> None:
"""Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
asyncio.run_coroutine_threadsafe(event_queue.put(event_name, payload), loop)
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> bool:
"""
Pushes an approval request to the GUI and waits for response.
"""
# We use a list container so the GUI can inject the actual Dialog object back to us
# since the dialog is created in the GUI thread.
dialog_container = [None]
task = {
"action": "mma_step_approval",
@@ -147,15 +150,9 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
"payload": payload,
"dialog_container": dialog_container
}
# Push to queue
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.run_coroutine_threadsafe(event_queue.put("mma_step_approval", task), loop)
else:
event_queue._queue.put_nowait(("mma_step_approval", task))
except Exception:
# Fallback if no loop
if loop:
_queue_put(event_queue, loop, "mma_step_approval", task)
else:
event_queue._queue.put_nowait(("mma_step_approval", task))
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
@@ -166,7 +163,7 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
return approved
return False
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> Tuple[bool, str, str]:
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> Tuple[bool, str, str]:
"""
Pushes a spawn approval request to the GUI and waits for response.
Returns (approved, modified_prompt, modified_context)
@@ -180,15 +177,9 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A
"context_md": context_md,
"dialog_container": dialog_container
}
# Push to queue
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.run_coroutine_threadsafe(event_queue.put("mma_spawn_approval", task), loop)
else:
event_queue._queue.put_nowait(("mma_spawn_approval", task))
except Exception:
# Fallback if no loop
if loop:
_queue_put(event_queue, loop, "mma_spawn_approval", task)
else:
event_queue._queue.put_nowait(("mma_spawn_approval", task))
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
@@ -213,7 +204,7 @@ def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.A
return approved, modified_prompt, modified_context
return False, prompt, context_md
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "") -> None:
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "", loop: asyncio.AbstractEventLoop = None) -> None:
"""
Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response.
@@ -224,6 +215,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
event_queue: Queue for pushing state updates and receiving approvals.
engine: The conductor engine.
md_content: The markdown context (history + files) for AI workers.
loop: The main asyncio event loop (required for thread-safe queue access).
"""
# Enforce Context Amnesia: each ticket starts with a clean slate.
ai_client.reset_session()
@@ -261,7 +253,8 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
prompt=user_message,
context_md=md_content,
event_queue=event_queue,
ticket_id=ticket.id
ticket_id=ticket.id,
loop=loop
)
if not approved:
ticket.mark_blocked("Spawn rejected by user.")
@@ -273,14 +266,31 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
def clutch_callback(payload: str) -> bool:
if not event_queue:
return True
return confirm_execution(payload, event_queue, ticket.id)
return confirm_execution(payload, event_queue, ticket.id, loop=loop)
response = ai_client.send(
md_content=md_content,
md_content=md_content,
user_message=user_message,
base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis
)
if event_queue:
# Push via "response" event type — _process_event_queue wraps this
# as {"action": "handle_ai_response", "payload": ...} for the GUI.
try:
response_payload = {
"text": response,
"stream_id": f"Tier 3 (Worker): {ticket.id}",
"status": "done"
}
if loop:
_queue_put(event_queue, loop, "response", response_payload)
else:
event_queue._queue.put_nowait(("response", response_payload))
except Exception as e:
print(f"Error pushing response to UI: {e}")
# Update usage in engine if provided
if engine:
stats = {} # ai_client.get_token_stats() is not available