2 Commits

5 changed files with 276 additions and 28 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ This file tracks all major tracks for the project. Each track has its own detail
---
- [~] **Track: MMA Core Engine Implementation**
- [x] **Track: MMA Core Engine Implementation**
*Link: [./tracks/mma_core_engine_20260224/](./tracks/mma_core_engine_20260224/)*
---
@@ -59,14 +59,14 @@
- [x] Verify that no regressions were introduced in existing functionality.
## Phase 7: MMA Observability & UX
- [~] Task: MMA Dashboard Implementation
- [ ] Create a new dockable panel in `gui_2.py` for "MMA Dashboard".
- [ ] Display active `Track` and `Ticket` queue status.
- [ ] Task: Execution Clutch UI
- [ ] Implement Step Mode toggle and Pause Points logic in the GUI.
- [ ] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution.
- [ ] Task: Memory Mutator Modal
- [ ] Create a modal for editing raw JSON conversation history of paused workers.
- [ ] Task: Tiered Metrics & Log Links
- [ ] Add visual indicators for the active model Tier.
- [ ] Provide clickable links to sub-agent logs.
- [x] Task: MMA Dashboard Implementation
- [x] Create a new dockable panel in `gui_2.py` for "MMA Dashboard".
- [x] Display active `Track` and `Ticket` queue status.
- [x] Task: Execution Clutch UI
- [x] Implement Step Mode toggle and Pause Points logic in the GUI.
- [x] Add `[Approve]`, `[Edit Payload]`, and `[Abort]` buttons for tool execution.
- [x] Task: Memory Mutator Modal
- [x] Create a modal for editing raw JSON conversation history of paused workers.
- [x] Task: Tiered Metrics & Log Links
- [x] Add visual indicators for the active model Tier.
- [x] Provide clickable links to sub-agent logs.
+174
View File
@@ -111,6 +111,21 @@ class ConfirmDialog:
return self._approved, self._script
class MMAApprovalDialog:
def __init__(self, ticket_id: str, payload: str):
self._ticket_id = ticket_id
self._payload = payload
self._condition = threading.Condition()
self._done = False
self._approved = False
def wait(self) -> tuple[bool, str]:
with self._condition:
while not self._done:
self._condition.wait(timeout=0.1)
return self._approved, self._payload
class App:
"""The main ImGui interface orchestrator for Manual Slop."""
@@ -185,6 +200,7 @@ class App:
"Context Hub": True,
"Files & Media": True,
"AI Settings": True,
"MMA Dashboard": True,
"Discussion Hub": True,
"Operations Hub": True,
"Theme": True,
@@ -209,6 +225,19 @@ class App:
self._ask_request_id = None
self._ask_tool_data = None
# MMA State
self.mma_step_mode = False
self.active_track = None
self.active_tickets = []
self.active_tier = None # "Tier 1", "Tier 2", etc.
self.mma_status = "idle"
# MMA-specific approval state
self._pending_mma_approval = None
self._mma_approval_open = False
self._mma_approval_edit_mode = False
self._mma_approval_payload = ""
self._tool_log: list[tuple[str, str]] = []
self._comms_log: list[dict] = []
@@ -827,6 +856,13 @@ class App:
"ts": project_manager.now_ts()
})
elif action == "mma_state_update":
payload = task.get("payload", {})
self.mma_status = payload.get("status", "idle")
self.active_tier = payload.get("active_tier")
self.active_track = payload.get("track")
self.active_tickets = payload.get("tickets", [])
elif action == "set_value":
item = task.get("item")
value = task.get("value")
@@ -874,6 +910,12 @@ class App:
elif cb in self._predefined_callbacks:
self._predefined_callbacks[cb](*args)
elif action == "mma_step_approval":
dlg = MMAApprovalDialog(task.get("ticket_id"), task.get("payload"))
self._pending_mma_approval = task
if "dialog_container" in task:
task["dialog_container"][0] = dlg
except Exception as e:
print(f"Error executing GUI task: {e}")
@@ -905,6 +947,18 @@ class App:
else:
print("[DEBUG] No pending dialog to reject")
def _handle_mma_respond(self, approved: bool, payload: str = None):
if self._pending_mma_approval:
dlg = self._pending_mma_approval.get("dialog_container", [None])[0]
if dlg:
with dlg._condition:
dlg._approved = approved
if payload is not None:
dlg._payload = payload
dlg._done = True
dlg._condition.notify_all()
self._pending_mma_approval = None
def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return
@@ -1014,6 +1068,12 @@ class App:
"action": "handle_ai_response",
"payload": payload
})
elif event_name == "mma_state_update":
with self._pending_gui_tasks_lock:
self._pending_gui_tasks.append({
"action": "mma_state_update",
"payload": payload
})
def _handle_request_event(self, event: events.UserRequestEvent):
"""Processes a UserRequestEvent by calling the AI client."""
@@ -1429,6 +1489,12 @@ class App:
if imgui.collapsing_header("System Prompts"):
self._render_system_prompts_panel()
imgui.end()
if self.show_windows.get("MMA Dashboard", False):
exp, self.show_windows["MMA Dashboard"] = imgui.begin("MMA Dashboard", self.show_windows["MMA Dashboard"])
if exp:
self._render_mma_dashboard()
imgui.end()
if self.show_windows.get("Theme", False):
self._render_theme_panel()
@@ -1606,6 +1672,48 @@ class App:
imgui.close_current_popup()
imgui.end_popup()
# MMA Step Approval Modal
if self._pending_mma_approval:
if not self._mma_approval_open:
imgui.open_popup("MMA Step Approval")
self._mma_approval_open = True
self._mma_approval_edit_mode = False
self._mma_approval_payload = self._pending_mma_approval.get("payload", "")
else:
self._mma_approval_open = False
if imgui.begin_popup_modal("MMA Step Approval", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_mma_approval:
imgui.close_current_popup()
else:
ticket_id = self._pending_mma_approval.get("ticket_id", "??")
imgui.text(f"Ticket {ticket_id} is waiting for tool execution approval.")
imgui.separator()
if self._mma_approval_edit_mode:
imgui.text("Edit Raw Payload (Manual Memory Mutation):")
_, self._mma_approval_payload = imgui.input_text_multiline("##mma_payload", self._mma_approval_payload, imgui.ImVec2(600, 400))
else:
imgui.text("Proposed Tool Call:")
imgui.begin_child("mma_preview", imgui.ImVec2(600, 300), True)
imgui.text_unformatted(self._pending_mma_approval.get("payload", ""))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=True, payload=self._mma_approval_payload)
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Edit Payload" if not self._mma_approval_edit_mode else "Show Original", imgui.ImVec2(120, 0)):
self._mma_approval_edit_mode = not self._mma_approval_edit_mode
imgui.same_line()
if imgui.button("Abort Ticket", imgui.ImVec2(120, 0)):
self._handle_mma_respond(approved=False)
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False
@@ -2257,6 +2365,72 @@ class App:
if is_blinking:
imgui.pop_style_color(2)
def _render_mma_dashboard(self):
# 1. Global Controls
changed, self.mma_step_mode = imgui.checkbox("Step Mode (HITL)", self.mma_step_mode)
if changed:
# We could push an event here if the engine needs to know immediately
pass
imgui.same_line()
imgui.text(f"Status: {self.mma_status.upper()}")
if self.active_tier:
imgui.same_line()
imgui.text_colored(C_VAL, f"| Active: {self.active_tier}")
imgui.separator()
# 2. Active Track Info
if self.active_track:
imgui.text(f"Track: {self.active_track.get('title', 'Unknown')}")
# Progress bar
tickets = self.active_tickets
total = len(tickets)
if total > 0:
complete = sum(1 for t in tickets if t.get('status') == 'complete')
progress = complete / total
imgui.progress_bar(progress, imgui.ImVec2(-1, 0), f"{complete}/{total} Tickets")
else:
imgui.text_disabled("No active MMA track.")
imgui.separator()
# 3. Ticket Queue
imgui.text("Ticket Queue")
if imgui.begin_table("mma_tickets", 3, imgui.TableFlags_.borders_inner_h | imgui.TableFlags_.resizable):
imgui.table_setup_column("ID", imgui.TableColumnFlags_.width_fixed, 80)
imgui.table_setup_column("Target", imgui.TableColumnFlags_.width_stretch)
imgui.table_setup_column("Status", imgui.TableColumnFlags_.width_fixed, 100)
imgui.table_headers_row()
for t in self.active_tickets:
imgui.table_next_row()
imgui.table_next_column()
imgui.text(str(t.get('id', '??')))
imgui.table_next_column()
imgui.text(str(t.get('target_file', 'general')))
imgui.table_next_column()
status = t.get('status', 'pending').upper()
if status == 'RUNNING':
imgui.push_style_color(imgui.Col_.text, vec4(255, 255, 0)) # Yellow
elif status == 'COMPLETE':
imgui.push_style_color(imgui.Col_.text, vec4(0, 255, 0)) # Green
elif status == 'BLOCKED' or status == 'ERROR':
imgui.push_style_color(imgui.Col_.text, vec4(255, 0, 0)) # Red
elif status == 'PAUSED':
imgui.push_style_color(imgui.Col_.text, vec4(255, 165, 0)) # Orange
imgui.text(status)
if status in ['RUNNING', 'COMPLETE', 'BLOCKED', 'ERROR', 'PAUSED']:
imgui.pop_style_color()
imgui.end_table()
def _render_tool_calls_panel(self):
imgui.text("Tool call history")
imgui.same_line()
+76 -9
View File
@@ -1,6 +1,9 @@
import ai_client
import json
import asyncio
from typing import List, Optional
from dataclasses import asdict
import events
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
@@ -8,8 +11,24 @@ class ConductorEngine:
"""
Orchestrates the execution of tickets within a track.
"""
def __init__(self, track: Track):
def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None):
self.track = track
self.event_queue = event_queue
async def _push_state(self, status: str = "running", active_tier: str = None):
if not self.event_queue:
return
payload = {
"status": status,
"active_tier": active_tier,
"track": {
"id": self.track.id,
"title": self.track.description,
},
"tickets": [asdict(t) for t in self.track.tickets]
}
await self.event_queue.put("mma_state_update", payload)
def parse_json_tickets(self, json_str: str):
"""
@@ -38,13 +57,15 @@ class ConductorEngine:
except KeyError as e:
print(f"Missing required field in ticket definition: {e}")
def run_linear(self):
async def run_linear(self):
"""
Executes tickets sequentially according to their dependencies.
Iterates through the track's executable tickets until no more can be run.
Supports dynamic execution as tickets added during runtime will be picked up
in the next iteration of the main loop.
"""
await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
while True:
executable = self.track.get_executable_tickets()
if not executable:
@@ -52,14 +73,17 @@ class ConductorEngine:
all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done:
print("Track completed successfully.")
await self._push_state(status="done", active_tier=None)
else:
# If we have no executable tickets but some are not completed, we might be blocked
# or there are simply no more tickets to run at this moment.
incomplete = [t for t in self.track.tickets if t.status != "completed"]
if not incomplete:
print("Track completed successfully.")
await self._push_state(status="done", active_tier=None)
else:
print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.")
await self._push_state(status="blocked", active_tier=None)
break
for ticket in executable:
@@ -69,22 +93,59 @@ class ConductorEngine:
continue
print(f"Executing ticket {ticket.id}: {ticket.description}")
ticket.status = "running"
await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
# For now, we use a default model name or take it from config
context = WorkerContext(
ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite",
messages=[]
)
run_worker_lifecycle(ticket, context)
run_worker_lifecycle(ticket, context, event_queue=self.event_queue)
await self._push_state(active_tier="Tier 2 (Tech Lead)")
def confirm_execution(payload: str) -> bool:
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
"""
Placeholder for external confirmation function.
In a real scenario, this might trigger a UI prompt.
Pushes an approval request to the GUI and waits for response.
"""
return True
import threading
import time
import asyncio
# We use a list container so the GUI can inject the actual Dialog object back to us
# since the dialog is created in the GUI thread.
dialog_container = [None]
task = {
"action": "mma_step_approval",
"ticket_id": ticket_id,
"payload": payload,
"dialog_container": dialog_container
}
# Push to queue
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.run_coroutine_threadsafe(event_queue.put("mma_step_approval", task), loop)
else:
event_queue._queue.put_nowait(("mma_step_approval", task))
except Exception:
# Fallback if no loop
event_queue._queue.put_nowait(("mma_step_approval", task))
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None):
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
approved, final_payload = dialog_container[0].wait()
return approved
return False
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None):
"""
Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response.
@@ -122,11 +183,17 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
# In a real scenario, we would pass md_content from the aggregator
# and manage the conversation history in the context.
# HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool:
if not event_queue:
return True
return confirm_execution(payload, event_queue, ticket.id)
response = ai_client.send(
md_content="",
user_message=user_message,
base_dir=".",
pre_tool_callback=confirm_execution if ticket.step_mode else None,
pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis
)
+14 -7
View File
@@ -5,7 +5,8 @@ from multi_agent_conductor import ConductorEngine
import ai_client
import json
def test_headless_verification_full_run():
@pytest.mark.asyncio
async def test_headless_verification_full_run():
"""
1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets.
2. Simulate a full execution run using engine.run_linear().
@@ -15,7 +16,10 @@ def test_headless_verification_full_run():
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2])
engine = ConductorEngine(track=track)
from events import AsyncEventQueue
queue = AsyncEventQueue()
engine = ConductorEngine(track=track, event_queue=queue)
with patch("ai_client.send") as mock_send, \
patch("ai_client.reset_session") as mock_reset:
@@ -23,7 +27,7 @@ def test_headless_verification_full_run():
# We need mock_send to return something that doesn't contain "BLOCKED"
mock_send.return_value = "Task completed successfully."
engine.run_linear()
await engine.run_linear()
# Verify both tickets are completed
assert t1.status == "completed"
@@ -33,17 +37,20 @@ def test_headless_verification_full_run():
assert mock_send.call_count == 2
# Verify Context Amnesia: reset_session should be called for each ticket
# This confirms that each worker call starts with a clean slate.
assert mock_reset.call_count == 2
def test_headless_verification_error_and_qa_interceptor():
@pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor():
"""
5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
and its summary is injected into the worker's history for the next retry.
"""
t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
track = Track(id="track_error", description="Error Track", tickets=[t1])
engine = ConductorEngine(track=track)
from events import AsyncEventQueue
queue = AsyncEventQueue()
engine = ConductorEngine(track=track, event_queue=queue)
# We need to simulate the tool loop inside ai_client._send_gemini (or similar)
# Since we want to test the real tool loop and QA injection, we mock at the provider level.
@@ -108,7 +115,7 @@ QA ANALYSIS:
mock_run.side_effect = run_side_effect
mock_qa.return_value = "FIX: Check if path exists."
engine.run_linear()
await engine.run_linear()
# Verify QA analysis was triggered
mock_qa.assert_called_once_with("Error: file not found")