2 Commits

5 changed files with 276 additions and 28 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ This file tracks all major tracks for the project. Each track has its own detail
--- ---
- [~] **Track: MMA Core Engine Implementation** - [x] **Track: MMA Core Engine Implementation**
*Link: [./tracks/mma_core_engine_20260224/](./tracks/mma_core_engine_20260224/)* *Link: [./tracks/mma_core_engine_20260224/](./tracks/mma_core_engine_20260224/)*
--- ---
@@ -59,14 +59,14 @@
- [x] Verify that no regressions were introduced in existing functionality. - [x] Verify that no regressions were introduced in existing functionality.
## Phase 7: MMA Observability & UX ## Phase 7: MMA Observability & UX
- [~] Task: MMA Dashboard Implementation - [x] Task: MMA Dashboard Implementation
- [ ] Create a new dockable panel in `gui_2.py` for "MMA Dashboard". - [x] Create a new dockable panel in `gui_2.py` for "MMA Dashboard".
- [ ] Display active `Track` and `Ticket` queue status. - [x] Display active `Track` and `Ticket` queue status.
- [ ] Task: Execution Clutch UI - [x] Task: Execution Clutch UI
- [ ] Implement Step Mode toggle and Pause Points logic in the GUI. - [x] Implement Step Mode toggle and Pause Points logic in the GUI.
- [ ] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution. - [x] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution.
- [ ] Task: Memory Mutator Modal - [x] Task: Memory Mutator Modal
- [ ] Create a modal for editing raw JSON conversation history of paused workers. - [x] Create a modal for editing raw JSON conversation history of paused workers.
- [ ] Task: Tiered Metrics & Log Links - [x] Task: Tiered Metrics & Log Links
- [ ] Add visual indicators for the active model Tier. - [x] Add visual indicators for the active model Tier.
- [ ] Provide clickable links to sub-agent logs. - [x] Provide clickable links to sub-agent logs.
+174
View File
@@ -111,6 +111,21 @@ class ConfirmDialog:
return self._approved, self._script return self._approved, self._script
class MMAApprovalDialog:
    """Hand-off object for a single MMA step-approval request.

    One side blocks in :meth:`wait`; the other side fills in ``_approved``
    (and optionally ``_payload``), sets ``_done`` and notifies ``_condition``
    to release the waiter.
    """

    def __init__(self, ticket_id: str, payload: str):
        # The condition guards every field below.
        self._condition = threading.Condition()
        self._ticket_id = ticket_id
        self._payload = payload
        self._approved = False
        self._done = False

    def wait(self) -> tuple[bool, str]:
        """Block until the request is resolved.

        Returns:
            ``(approved, payload)`` as they stand once ``_done`` is set.
        """
        with self._condition:
            while True:
                if self._done:
                    break
                # Wake up every 100 ms to re-check the flag instead of
                # sleeping indefinitely on the condition.
                self._condition.wait(timeout=0.1)
            outcome = (self._approved, self._payload)
        return outcome
class App: class App:
"""The main ImGui interface orchestrator for Manual Slop.""" """The main ImGui interface orchestrator for Manual Slop."""
@@ -185,6 +200,7 @@ class App:
"Context Hub": True, "Context Hub": True,
"Files & Media": True, "Files & Media": True,
"AI Settings": True, "AI Settings": True,
"MMA Dashboard": True,
"Discussion Hub": True, "Discussion Hub": True,
"Operations Hub": True, "Operations Hub": True,
"Theme": True, "Theme": True,
@@ -209,6 +225,19 @@ class App:
self._ask_request_id = None self._ask_request_id = None
self._ask_tool_data = None self._ask_tool_data = None
# MMA State
self.mma_step_mode = False
self.active_track = None
self.active_tickets = []
self.active_tier = None # "Tier 1", "Tier 2", etc.
self.mma_status = "idle"
# MMA-specific approval state
self._pending_mma_approval = None
self._mma_approval_open = False
self._mma_approval_edit_mode = False
self._mma_approval_payload = ""
self._tool_log: list[tuple[str, str]] = [] self._tool_log: list[tuple[str, str]] = []
self._comms_log: list[dict] = [] self._comms_log: list[dict] = []
@@ -827,6 +856,13 @@ class App:
"ts": project_manager.now_ts() "ts": project_manager.now_ts()
}) })
elif action == "mma_state_update":
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
self.active_track = payload.get("track")
self.active_tickets = payload.get("tickets", [])
elif action == "set_value": elif action == "set_value":
item = task.get("item") item = task.get("item")
value = task.get("value") value = task.get("value")
@@ -874,6 +910,12 @@ class App:
elif cb in self._predefined_callbacks: elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args) self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(task.get("ticket_id"), task.get("payload"))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
except Exception as e: except Exception as e:
print(f"Error executing GUI task: {e}") print(f"Error executing GUI task: {e}")
@@ -905,6 +947,18 @@ class App:
else: else:
print("[DEBUG] No pending dialog to reject") print("[DEBUG] No pending dialog to reject")
def _handle_mma_respond(self, approved: bool, payload: str = None):
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
if payload is not None:
dlg._payload = payload
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
def _handle_approve_ask(self): def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request.""" """Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return if not self._ask_request_id: return
@@ -1014,6 +1068,12 @@ class App:
"action": "handle_ai_response", "action": "handle_ai_response",
"payload": payload "payload": payload
}) })
elif event_name == "mma_state_update":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_state_update",
"payload": payload
})
def _handle_request_event(self, event: events.UserRequestEvent): def _handle_request_event(self, event: events.UserRequestEvent):
"""Processes a UserRequestEvent by calling the AI client.""" """Processes a UserRequestEvent by calling the AI client."""
@@ -1429,6 +1489,12 @@ class App:
if imgui.collapsing_header("System Prompts"): if imgui.collapsing_header("System Prompts"):
self._render_system_prompts_panel() self._render_system_prompts_panel()
imgui.end() imgui.end()
if self.show_windows.get("MMA Dashboard", False):
exp, self.show_windows["MMA Dashboard"] = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"])
if exp:
self._render_mma_dashboard()
imgui.end()
if self.show_windows.get("Theme", False): if self.show_windows.get("Theme", False):
self._render_theme_panel() self._render_theme_panel()
@@ -1606,6 +1672,48 @@ class App:
imgui.close_current_popup() imgui.close_current_popup()
imgui.end_popup() imgui.end_popup()
# MMA Step Approval Modal
if self._pending_mma_approval:
if not self._mma_approval_open:
imgui.open_popup("MMA Step Approval")
self._mma_approval_open = True
self._mma_approval_edit_mode = False
self._mma_approval_payload = self._pending_mma_approval.get("payload", "")
else:
self._mma_approval_open = False
if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_approval:
imgui.close_current_popup()
else:
ticket_id = self._pending_mma_approval.get("ticket_id", "??")
imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.")
imgui.separator()
if self._mma_approval_edit_mode:
imgui.text("Edit Raw Payload (Manual Memory Mutation):")
_, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400))
else:
imgui.text("Proposed Tool Call:")
imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(self._pending_mma_approval.get("payload", ""))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, payload=self._mma_approval_payload)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)):
self._mma_approval_edit_mode = not self._mma_approval_edit_mode
imgui.same_line()
if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False)
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output: if self.show_script_output:
if self._trigger_script_blink: if self._trigger_script_blink:
self._trigger_script_blink = False self._trigger_script_blink = False
@@ -2257,6 +2365,72 @@ class App:
if is_blinking: if is_blinking:
imgui.pop_style_color(2) imgui.pop_style_color(2)
def _render_mma_dashboard(self):
    """Draw the contents of the "MMA Dashboard" window.

    Renders three sections: the Step Mode checkbox with the current status
    and active tier, the active track title with a ticket progress bar, and
    a table listing every ticket in ``self.active_tickets``.

    Reads ``mma_step_mode``, ``mma_status``, ``active_tier``,
    ``active_track`` and ``active_tickets``; writes ``mma_step_mode`` back
    from the checkbox.
    """
    # The track engine's completion check compares ticket status against
    # "completed"; "complete" is accepted too so either spelling counts as
    # done for the progress bar and the green highlight.
    done_statuses = ("complete", "completed")
    status_colors = {
        "RUNNING": (255, 255, 0),    # Yellow
        "COMPLETE": (0, 255, 0),     # Green
        "COMPLETED": (0, 255, 0),    # Green
        "BLOCKED": (255, 0, 0),      # Red
        "ERROR": (255, 0, 0),        # Red
        "PAUSED": (255, 165, 0),     # Orange
    }

    # 1. Global Controls
    # The "changed" flag is ignored: nothing is pushed to the engine when the
    # checkbox toggles; the new value is simply stored on self.
    _changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode)

    imgui.same_line()
    imgui.text(f"Status: {self.mma_status.upper()}")
    if self.active_tier:
        imgui.same_line()
        imgui.text_colored(C_VAL, f"| Active: {self.active_tier}")

    imgui.separator()

    # 2. Active Track Info
    if self.active_track:
        imgui.text(f"Track: {self.active_track.get('title', 'Unknown')}")

        # Progress bar
        tickets = self.active_tickets
        total = len(tickets)
        if total > 0:
            complete = sum(1 for t in tickets if t.get('status') in done_statuses)
            progress = complete / total
            imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets")
    else:
        imgui.text_disabled("No active MMA track.")

    imgui.separator()

    # 3. Ticket Queue
    imgui.text("Ticket Queue")
    if imgui.begin_table("mma_tickets", 3, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.resizable):
        imgui.table_setup_column("ID", imgui.TableColumnFlags_.width_fixed, 80)
        imgui.table_setup_column("Target", imgui.TableColumnFlags_.width_stretch)
        imgui.table_setup_column("Status", imgui.TableColumnFlags_.width_fixed, 100)
        imgui.table_headers_row()

        for t in self.active_tickets:
            imgui.table_next_row()
            imgui.table_next_column()
            imgui.text(str(t.get('id', '??')))

            imgui.table_next_column()
            imgui.text(str(t.get('target_file', 'general')))

            imgui.table_next_column()
            status = t.get('status', 'pending').upper()

            # Push and pop are driven by the same lookup so the style stack
            # stays balanced for every status value.
            rgb = status_colors.get(status)
            if rgb is not None:
                imgui.push_style_color(imgui.Col_.text, vec4(*rgb))

            imgui.text(status)

            if rgb is not None:
                imgui.pop_style_color()

        imgui.end_table()
def _render_tool_calls_panel(self): def _render_tool_calls_panel(self):
imgui.text("Tool call history") imgui.text("Tool call history")
imgui.same_line() imgui.same_line()
+76 -9
View File
@@ -1,6 +1,9 @@
import ai_client import ai_client
import json import json
import asyncio
from typing import List, Optional from typing import List, Optional
from dataclasses import asdict
import events
from models import Ticket, Track, WorkerContext from models import Ticket, Track, WorkerContext
from file_cache import ASTParser from file_cache import ASTParser
@@ -8,8 +11,24 @@ class ConductorEngine:
""" """
Orchestrates the execution of tickets within a track. Orchestrates the execution of tickets within a track.
""" """
def __init__(self, track: Track): def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None):
self.track = track self.track = track
self.event_queue = event_queue
async def _push_state(self, status: str = "running", active_tier: str = None):
if not self.event_queue:
return
payload = {
"status": status,
"active_tier": active_tier,
"track": {
"id": self.track.id,
"title": self.track.description,
},
"tickets": [asdict(t) for t in self.track.tickets]
}
await self.event_queue.put("mma_state_update", payload)
def parse_json_tickets(self, json_str: str): def parse_json_tickets(self, json_str: str):
""" """
@@ -38,13 +57,15 @@ class ConductorEngine:
except KeyError as e: except KeyError as e:
print(f"Missing required field in ticket definition: {e}") print(f"Missing required field in ticket definition: {e}")
def run_linear(self): async def run_linear(self):
""" """
Executes tickets sequentially according to their dependencies. Executes tickets sequentially according to their dependencies.
Iterates through the track's executable tickets until no more can be run. Iterates through the track's executable tickets until no more can be run.
Supports dynamic execution as tickets added during runtime will be picked up Supports dynamic execution as tickets added during runtime will be picked up
in the next iteration of the main loop. in the next iteration of the main loop.
""" """
await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
while True: while True:
executable = self.track.get_executable_tickets() executable = self.track.get_executable_tickets()
if not executable: if not executable:
@@ -52,14 +73,17 @@ class ConductorEngine:
all_done = all(t.status == "completed" for t in self.track.tickets) all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done: if all_done:
print("Track completed successfully.") print("Track completed successfully.")
await self._push_state(status="done", active_tier=None)
else: else:
# If we have no executable tickets but some are not completed, we might be blocked # If we have no executable tickets but some are not completed, we might be blocked
# or there are simply no more tickets to run at this moment. # or there are simply no more tickets to run at this moment.
incomplete = [t for t in self.track.tickets if t.status != "completed"] incomplete = [t for t in self.track.tickets if t.status != "completed"]
if not incomplete: if not incomplete:
print("Track completed successfully.") print("Track completed successfully.")
await self._push_state(status="done", active_tier=None)
else: else:
print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.") print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.")
await self._push_state(status="blocked", active_tier=None)
break break
for ticket in executable: for ticket in executable:
@@ -69,22 +93,59 @@ class ConductorEngine:
continue continue
print(f"Executing ticket {ticket.id}: {ticket.description}") print(f"Executing ticket {ticket.id}: {ticket.description}")
ticket.status = "running"
await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
# For now, we use a default model name or take it from config # For now, we use a default model name or take it from config
context = WorkerContext( context = WorkerContext(
ticket_id=ticket.id, ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite", model_name="gemini-2.5-flash-lite",
messages=[] messages=[]
) )
run_worker_lifecycle(ticket, context) run_worker_lifecycle(ticket, context, event_queue=self.event_queue)
await self._push_state(active_tier="Tier 2 (Tech Lead)")
def confirm_execution(payload: str, event_queue: "events.AsyncEventQueue", ticket_id: str, timeout: float = 60.0) -> bool:
    """Push an approval request to the GUI and block until it is answered.

    Args:
        payload: Text of the proposed tool call shown to the user.
        event_queue: Queue the ``"mma_step_approval"`` task is placed on.
        ticket_id: Identifier of the ticket requesting approval.
        timeout: Seconds to wait for the GUI to attach a dialog object
            before giving up.

    Returns:
        The user's decision, or ``False`` when no dialog was attached within
        ``timeout`` seconds.
    """
    import time

    # A one-element list lets the GUI hand the dialog object back to us:
    # the dialog is created on the GUI side and stored in slot 0.
    dialog_container = [None]

    task = {
        "action": "mma_step_approval",
        "ticket_id": ticket_id,
        "payload": payload,
        "dialog_container": dialog_container
    }

    # Enqueue synchronously. Scheduling the put as a coroutine on a loop that
    # is running in this same thread can never execute while we block in the
    # polling loop below, so the request would only be delivered after the
    # timeout had already expired.
    # NOTE(review): this touches the queue's private ``_queue`` from whatever
    # thread the caller is on; confirm the underlying queue tolerates that.
    event_queue._queue.put_nowait(("mma_step_approval", task))

    # Wait for the GUI to create the dialog. A monotonic clock keeps the
    # deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while dialog_container[0] is None and time.monotonic() < deadline:
        time.sleep(0.1)

    dialog = dialog_container[0]
    if dialog is None:
        return False

    # The callback contract is a plain bool, so a payload edited in the GUI
    # is received here but not propagated to the caller.
    approved, _final_payload = dialog.wait()
    return approved
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None):
""" """
Simulates the lifecycle of a single agent working on a ticket. Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response. Calls the AI client and updates the ticket status based on the response.
@@ -122,11 +183,17 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
# In a real scenario, we would pass md_content from the aggregator # In a real scenario, we would pass md_content from the aggregator
# and manage the conversation history in the context. # and manage the conversation history in the context.
# HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool:
if not event_queue:
return True
return confirm_execution(payload, event_queue, ticket.id)
response = ai_client.send( response = ai_client.send(
md_content="", md_content="",
user_message=user_message, user_message=user_message,
base_dir=".", base_dir=".",
pre_tool_callback=confirm_execution if ticket.step_mode else None, pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis qa_callback=ai_client.run_tier4_analysis
) )
+14 -7
View File
@@ -5,7 +5,8 @@ from multi_agent_conductor import ConductorEngine
import ai_client import ai_client
import json import json
def test_headless_verification_full_run(): @pytest.mark.asyncio
async def test_headless_verification_full_run():
""" """
1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets. 1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets.
2. Simulate a full execution run using engine.run_linear(). 2. Simulate a full execution run using engine.run_linear().
@@ -15,7 +16,10 @@ def test_headless_verification_full_run():
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1") t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"]) t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2]) track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2])
engine = ConductorEngine(track=track)
from events import AsyncEventQueue
queue = AsyncEventQueue()
engine = ConductorEngine(track=track, event_queue=queue)
with patch("ai_client.send") as mock_send, \ with patch("ai_client.send") as mock_send, \
patch("ai_client.reset_session") as mock_reset: patch("ai_client.reset_session") as mock_reset:
@@ -23,7 +27,7 @@ def test_headless_verification_full_run():
# We need mock_send to return something that doesn't contain "BLOCKED" # We need mock_send to return something that doesn't contain "BLOCKED"
mock_send.return_value = "Task completed successfully." mock_send.return_value = "Task completed successfully."
engine.run_linear() await engine.run_linear()
# Verify both tickets are completed # Verify both tickets are completed
assert t1.status == "completed" assert t1.status == "completed"
@@ -33,17 +37,20 @@ def test_headless_verification_full_run():
assert mock_send.call_count == 2 assert mock_send.call_count == 2
# Verify Context Amnesia: reset_session should be called for each ticket # Verify Context Amnesia: reset_session should be called for each ticket
# This confirms that each worker call starts with a clean slate.
assert mock_reset.call_count == 2 assert mock_reset.call_count == 2
def test_headless_verification_error_and_qa_interceptor(): @pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor():
""" """
5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered 5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
and its summary is injected into the worker's history for the next retry. and its summary is injected into the worker's history for the next retry.
""" """
t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1") t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
track = Track(id="track_error", description="Error Track", tickets=[t1]) track = Track(id="track_error", description="Error Track", tickets=[t1])
engine = ConductorEngine(track=track)
from events import AsyncEventQueue
queue = AsyncEventQueue()
engine = ConductorEngine(track=track, event_queue=queue)
# We need to simulate the tool loop inside ai_client._send_gemini (or similar) # We need to simulate the tool loop inside ai_client._send_gemini (or similar)
# Since we want to test the real tool loop and QA injection, we mock at the provider level. # Since we want to test the real tool loop and QA injection, we mock at the provider level.
@@ -108,7 +115,7 @@ QA ANALYSIS:
mock_run.side_effect = run_side_effect mock_run.side_effect = run_side_effect
mock_qa.return_value = "FIX: Check if path exists." mock_qa.return_value = "FIX: Check if path exists."
engine.run_linear() await engine.run_linear()
# Verify QA analysis was triggered # Verify QA analysis was triggered
mock_qa.assert_called_once_with("Error: file not found") mock_qa.assert_called_once_with("Error: file not found")