diff --git a/conductor/tracks.md b/conductor/tracks.md index c719cd0..39fbffb 100644 --- a/conductor/tracks.md +++ b/conductor/tracks.md @@ -30,7 +30,7 @@ This file tracks all major tracks for the project. Each track has its own detail --- -- [~] **Track: MMA Core Engine Implementation** +- [x] **Track: MMA Core Engine Implementation** *Link: [./tracks/mma_core_engine_20260224/](./tracks/mma_core_engine_20260224/)* --- diff --git a/conductor/tracks/mma_core_engine_20260224/plan.md b/conductor/tracks/mma_core_engine_20260224/plan.md index 6fddcba..57c7579 100644 --- a/conductor/tracks/mma_core_engine_20260224/plan.md +++ b/conductor/tracks/mma_core_engine_20260224/plan.md @@ -59,14 +59,14 @@ - [x] Verify that no regressions were introduced in existing functionality. ## Phase 7: MMA Observability & UX -- [~] Task: MMA Dashboard Implementation - - [ ] Create a new dockable panel in `gui_2.py` for "MMA Dashboard". - - [ ] Display active `Track` and `Ticket` queue status. -- [ ] Task: Execution Clutch UI - - [ ] Implement Step Mode toggle and Pause Points logic in the GUI. - - [ ] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution. -- [ ] Task: Memory Mutator Modal - - [ ] Create a modal for editing raw JSON conversation history of paused workers. -- [ ] Task: Tiered Metrics & Log Links - - [ ] Add visual indicators for the active model Tier. - - [ ] Provide clickable links to sub-agent logs. \ No newline at end of file +- [x] Task: MMA Dashboard Implementation + - [x] Create a new dockable panel in `gui_2.py` for "MMA Dashboard". + - [x] Display active `Track` and `Ticket` queue status. +- [x] Task: Execution Clutch UI + - [x] Implement Step Mode toggle and Pause Points logic in the GUI. + - [x] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution. +- [x] Task: Memory Mutator Modal + - [x] Create a modal for editing raw JSON conversation history of paused workers. +- [x] Task: Tiered Metrics & Log Links + - [x] Add visual indicators for the active model Tier. + - [x] Provide clickable links to sub-agent logs. \ No newline at end of file diff --git a/gui_2.py b/gui_2.py index 1716ec5..e00708b 100644 --- a/gui_2.py +++ b/gui_2.py @@ -111,6 +111,21 @@ class ConfirmDialog: return self._approved, self._script +class MMAApprovalDialog: + def __init__(self, ticket_id: str, payload: str): + self._ticket_id = ticket_id + self._payload = payload + self._condition = threading.Condition() + self._done = False + self._approved = False + + def wait(self) -> tuple[bool, str]: + with self._condition: + while not self._done: + self._condition.wait(timeout=0.1) + return self._approved, self._payload + + class App: """The main ImGui interface orchestrator for Manual Slop.""" @@ -895,6 +910,12 @@ class App: elif cb in self._predefined_callbacks: self._predefined_callbacks[cb](*args) + elif action == "mma_step_approval": + dlg = MMAApprovalDialog(task.get("ticket_id"), task.get("payload")) + self._pending_mma_approval = task + if "dialog_container" in task: + task["dialog_container"][0] = dlg + except Exception as e: print(f"Error executing GUI task: {e}") @@ -926,6 +947,18 @@ class App: else: print("[DEBUG] No pending dialog to reject") + def _handle_mma_respond(self, approved: bool, payload: str = None): + if self._pending_mma_approval: + dlg = self._pending_mma_approval.get("dialog_container", [None])[0] + if dlg: + with dlg._condition: + dlg._approved = approved + if payload is not None: + dlg._payload = payload + dlg._done = True + dlg._condition.notify_all() + self._pending_mma_approval = None + def _handle_approve_ask(self): """Responds with approval for a pending /api/ask request.""" if not self._ask_request_id: return diff --git a/multi_agent_conductor.py b/multi_agent_conductor.py index b97c171..6e20bae 100644 --- a/multi_agent_conductor.py +++ b/multi_agent_conductor.py @@ -102,17 +102,50 @@ class ConductorEngine: model_name="gemini-2.5-flash-lite", messages=[] ) - run_worker_lifecycle(ticket, context) + run_worker_lifecycle(ticket, context, event_queue=self.event_queue) await self._push_state(active_tier="Tier 2 (Tech Lead)") -def confirm_execution(payload: str) -> bool: +def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool: """ - Placeholder for external confirmation function. - In a real scenario, this might trigger a UI prompt. + Pushes an approval request to the GUI and waits for response. """ - return True + import threading + import time + import asyncio + # We use a list container so the GUI can inject the actual Dialog object back to us + # since the dialog is created in the GUI thread. + dialog_container = [None] + + task = { + "action": "mma_step_approval", + "ticket_id": ticket_id, + "payload": payload, + "dialog_container": dialog_container + } + + # Push to queue + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.run_coroutine_threadsafe(event_queue.put("mma_step_approval", task), loop) + else: + event_queue._queue.put_nowait(("mma_step_approval", task)) + except Exception: + # Fallback if no loop + event_queue._queue.put_nowait(("mma_step_approval", task)) -def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None): + # Wait for the GUI to create the dialog and for the user to respond + start = time.time() + while dialog_container[0] is None and time.time() - start < 60: + time.sleep(0.1) + + if dialog_container[0]: + approved, final_payload = dialog_container[0].wait() + return approved + + return False + +def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None): """ Simulates the lifecycle of a single agent working on a ticket. Calls the AI client and updates the ticket status based on the response. @@ -150,11 +183,17 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: # In a real scenario, we would pass md_content from the aggregator # and manage the conversation history in the context. + # HITL Clutch: pass the queue and ticket_id to confirm_execution + def clutch_callback(payload: str) -> bool: + if not event_queue: + return True + return confirm_execution(payload, event_queue, ticket.id) + response = ai_client.send( md_content="", user_message=user_message, base_dir=".", - pre_tool_callback=confirm_execution if ticket.step_mode else None, + pre_tool_callback=clutch_callback if ticket.step_mode else None, qa_callback=ai_client.run_tier4_analysis ) diff --git a/tests/test_headless_verification.py b/tests/test_headless_verification.py index a28902a..0b39255 100644 --- a/tests/test_headless_verification.py +++ b/tests/test_headless_verification.py @@ -5,7 +5,8 @@ from multi_agent_conductor import ConductorEngine import ai_client import json -def test_headless_verification_full_run(): +@pytest.mark.asyncio +async def test_headless_verification_full_run(): """ 1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets. 2. Simulate a full execution run using engine.run_linear(). @@ -15,7 +16,10 @@ def test_headless_verification_full_run(): t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1") t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"]) track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2]) - engine = ConductorEngine(track=track) + + from events import AsyncEventQueue + queue = AsyncEventQueue() + engine = ConductorEngine(track=track, event_queue=queue) with patch("ai_client.send") as mock_send, \ patch("ai_client.reset_session") as mock_reset: @@ -23,7 +27,7 @@ def test_headless_verification_full_run(): # We need mock_send to return something that doesn't contain "BLOCKED" mock_send.return_value = "Task completed successfully." - engine.run_linear() + await engine.run_linear() # Verify both tickets are completed assert t1.status == "completed" @@ -33,17 +37,20 @@ def test_headless_verification_full_run(): assert mock_send.call_count == 2 # Verify Context Amnesia: reset_session should be called for each ticket - # This confirms that each worker call starts with a clean slate. assert mock_reset.call_count == 2 -def test_headless_verification_error_and_qa_interceptor(): +@pytest.mark.asyncio +async def test_headless_verification_error_and_qa_interceptor(): """ 5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered and its summary is injected into the worker's history for the next retry. """ t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1") track = Track(id="track_error", description="Error Track", tickets=[t1]) - engine = ConductorEngine(track=track) + + from events import AsyncEventQueue + queue = AsyncEventQueue() + engine = ConductorEngine(track=track, event_queue=queue) # We need to simulate the tool loop inside ai_client._send_gemini (or similar) # Since we want to test the real tool loop and QA injection, we mock at the provider level. @@ -108,7 +115,7 @@ QA ANALYSIS: mock_run.side_effect = run_side_effect mock_qa.return_value = "FIX: Check if path exists." - engine.run_linear() + await engine.run_linear() # Verify QA analysis was triggered mock_qa.assert_called_once_with("Error: file not found")