feat(mma): complete Phase 6 and finalize Comprehensive GUI UX track

- Implement Live Worker Streaming: wire ai_client.comms_log_callback to Tier 3 streams
- Add Parallel DAG Execution using asyncio.gather for non-dependent tickets
- Implement Automatic Retry with Model Escalation (Flash-Lite -> Flash -> Pro)
- Add Tier Model Configuration UI to MMA Dashboard with project TOML persistence
- Fix FPS reporting in PerformanceMonitor to prevent transient 0.0 values
- Update Ticket model with retry_count and dictionary-like access
- Stabilize Gemini CLI integration tests and handle script approval events in simulations
- Finalize and verify all 6 phases of the implementation plan
This commit is contained in:
2026-03-01 22:38:43 -05:00
parent d1ce0eaaeb
commit 9fb01ce5d1
22 changed files with 756 additions and 498 deletions

View File

@@ -60,38 +60,49 @@ def main() -> None:
}), flush=True)
return
# If the prompt contains tool results, provide final answer
if '"role": "tool"' in prompt or '"tool_call_id"' in prompt:
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "I have processed the tool results and here is the final answer."
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",
"stats": {"total_tokens": 100, "input_tokens": 80, "output_tokens": 20},
"session_id": "mock-session-final"
}), flush=True)
return
# Default flow: emit a tool call to test multi-round looping
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "I need to check the directory first."
}), flush=True)
print(json.dumps({
"type": "tool_use",
"name": "list_directory",
"id": "mock-call-1",
"args": {"dir_path": "."}
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",
"stats": {"total_tokens": 10, "input_tokens": 10, "output_tokens": 0},
"session_id": "mock-session-default"
}), flush=True)
# Check for multi-round integration test triggers
is_resume = '--resume' in " ".join(sys.argv) or 'role: tool' in prompt or 'tool_call_id' in prompt
if is_resume or 'Perform multi-round tool test' in prompt or 'Please read test.txt' in prompt or 'Deny me' in prompt:
if not is_resume:
# First round: emit tool call
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "I need to check the directory first."
}), flush=True)
print(json.dumps({
"type": "tool_use",
"name": "run_powershell",
"id": "mock-call-1",
"args": {"script": "Get-ChildItem"}
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",
"stats": {"total_tokens": 10, "input_tokens": 10, "output_tokens": 0},
"session_id": "mock-session-default"
}), flush=True)
return
else:
# Second round
if "USER REJECTED" in prompt:
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "Tool execution was denied. I cannot proceed."
}), flush=True)
else:
print(json.dumps({
"type": "message",
"role": "assistant",
"content": "I have processed the tool results and here is the final answer."
}), flush=True)
print(json.dumps({
"type": "result",
"status": "success",
"stats": {"total_tokens": 100, "input_tokens": 80, "output_tokens": 20},
"session_id": "mock-session-final"
}), flush=True)
return
if __name__ == "__main__":
main()

View File

@@ -19,30 +19,37 @@ class TestGeminiCliAdapter(unittest.TestCase):
@patch('subprocess.Popen')
def test_send_starts_subprocess_with_correct_args(self, mock_popen: Any) -> None:
"""
Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message via stdin using communicate.
"""
Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message via stdin.
"""
# Setup mock process with a minimal valid JSONL termination
process_mock = MagicMock()
stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (stdout_content, "")
jsonl_output = [json.dumps({"type": "result", "usage": {}}) + "\n"]
process_mock.stdout.readline.side_effect = jsonl_output + ['']
process_mock.stderr.read.return_value = ""
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
message = "Hello Gemini CLI"
self.adapter.send(message)
# Verify subprocess.Popen call
mock_popen.assert_called_once()
args, kwargs = mock_popen.call_args
cmd = args[0]
# Check mandatory CLI components
self.assertIn("gemini", cmd)
self.assertIn("--output-format", cmd)
self.assertIn("stream-json", cmd)
# Message should NOT be in cmd now
self.assertNotIn(message, cmd)
# Verify message was sent via communicate
process_mock.communicate.assert_called_once_with(input=message)
# Verify message was written to stdin
process_mock.stdin.write.assert_called_with(message)
# Check process configuration
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
self.assertEqual(kwargs.get('stdin'), subprocess.PIPE)
@@ -51,20 +58,21 @@ class TestGeminiCliAdapter(unittest.TestCase):
@patch('subprocess.Popen')
def test_send_parses_jsonl_output(self, mock_popen: Any) -> None:
"""
Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text.
"""
Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text.
"""
jsonl_output = [
json.dumps({"type": "message", "role": "model", "text": "The quick brown "}),
json.dumps({"type": "message", "role": "model", "text": "fox jumps."}),
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}})
json.dumps({"type": "message", "role": "model", "text": "The quick brown "}) + "\n",
json.dumps({"type": "message", "role": "model", "text": "fox jumps."}) + "\n",
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}}) + "\n"
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "")
process_mock.stdout.readline.side_effect = jsonl_output + ['']
process_mock.stderr.read.return_value = ""
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("test message")
self.assertEqual(result["text"], "The quick brown fox jumps.")
self.assertEqual(result["tool_calls"], [])
@@ -72,21 +80,22 @@ class TestGeminiCliAdapter(unittest.TestCase):
@patch('subprocess.Popen')
def test_send_handles_tool_use_events(self, mock_popen: Any) -> None:
"""
Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event.
"""
Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event.
"""
jsonl_output = [
json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}),
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}),
json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}),
json.dumps({"type": "result", "usage": {}})
json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}) + "\n",
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}) + "\n",
json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}) + "\n",
json.dumps({"type": "result", "usage": {}}) + "\n"
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "")
process_mock.stdout.readline.side_effect = jsonl_output + ['']
process_mock.stderr.read.return_value = ""
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("read test.txt")
# Result should contain the combined text from all 'message' events
self.assertEqual(result["text"], "Calling tool...\nFile read successfully.")
@@ -96,19 +105,20 @@ class TestGeminiCliAdapter(unittest.TestCase):
@patch('subprocess.Popen')
def test_send_captures_usage_metadata(self, mock_popen: Any) -> None:
"""
Verify that usage data is extracted from the 'result' event.
"""
Verify that usage data is extracted from the 'result' event.
"""
usage_data = {"total_tokens": 42}
jsonl_output = [
json.dumps({"type": "message", "text": "Finalizing"}),
json.dumps({"type": "result", "usage": usage_data})
json.dumps({"type": "message", "text": "Finalizing"}) + "\n",
json.dumps({"type": "result", "usage": usage_data}) + "\n"
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "")
process_mock.stdout.readline.side_effect = jsonl_output + ['']
process_mock.stderr.read.return_value = ""
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
self.adapter.send("usage test")
# Verify the usage was captured in the adapter instance
self.assertEqual(self.adapter.last_usage, usage_data)

View File

@@ -91,9 +91,14 @@ else:
approved = False
while time.time() - start_time < timeout:
for ev in client.get_events():
if ev.get("type") == "ask_received":
etype = ev.get("type")
eid = ev.get("request_id") or ev.get("action_id")
if etype == "ask_received":
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
json={"request_id": eid, "response": {"approved": True}})
approved = True
elif etype == "script_confirmation_required":
requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": True})
approved = True
if approved: break
time.sleep(0.5)
@@ -129,9 +134,14 @@ def test_gemini_cli_loop_termination(live_gui: Any) -> None:
approved = False
while time.time() - start_time < timeout:
for ev in client.get_events():
if ev.get("type") == "ask_received":
etype = ev.get("type")
eid = ev.get("request_id") or ev.get("action_id")
if etype == "ask_received":
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
json={"request_id": eid, "response": {"approved": True}})
approved = True
elif etype == "script_confirmation_required":
requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": True})
approved = True
if approved: break
time.sleep(0.5)

View File

@@ -1,4 +1,4 @@
from typing import Any
from typing import Any
import pytest
import time
import os
@@ -95,14 +95,19 @@ def test_gemini_cli_rejection_and_history(live_gui: Any) -> None:
while time.time() - start_time < timeout:
for ev in client.get_events():
etype = ev.get("type")
eid = ev.get("request_id")
print(f"[TEST] Received event: {etype}")
eid = ev.get("request_id") or ev.get("action_id")
print(f"[TEST] Received event: {etype} (ID: {eid})")
if etype == "ask_received":
print(f"[TEST] Denying request {eid}")
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": eid, "response": {"approved": False}})
denied = True
break
elif etype == "script_confirmation_required":
print(f"[TEST] Denying script {eid}")
requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": False})
denied = True
break
if denied: break
time.sleep(0.5)
assert denied, "No ask_received event to deny"

View File

@@ -83,7 +83,12 @@ def test_create_track(app_instance, tmp_path):
with patch('gui_2.project_manager.get_all_tracks', return_value=[]):
app_instance._cb_create_track("Test Track", "Test Description", "feature")
track_dir = Path("conductor/tracks/test_track")
# Search for a directory starting with 'test_track' in 'conductor/tracks/'
tracks_root = Path("conductor/tracks")
matching_dirs = [d for d in tracks_root.iterdir() if d.is_dir() and d.name.startswith("test_track")]
assert len(matching_dirs) == 1
track_dir = matching_dirs[0]
assert track_dir.exists()
assert (track_dir / "spec.md").exists()
assert (track_dir / "plan.md").exists()
@@ -93,6 +98,6 @@ def test_create_track(app_instance, tmp_path):
data = json.load(f)
assert data['title'] == "Test Track"
assert data['type'] == "feature"
assert data['id'] == "test_track"
assert data['id'] == track_dir.name
finally:
os.chdir(old_cwd)

View File

@@ -102,3 +102,35 @@ def test_handle_ai_response_resets_stream(app_instance: App):
# (sometimes streaming chunks don't perfectly match final text if there are
# tool calls or specific SDK behaviors).
assert app_instance.mma_streams[stream_id] == "Final complete response."
def test_handle_ai_response_streaming(app_instance: App):
    """Streaming-status 'handle_ai_response' payloads must accumulate in mma_streams."""
    stream_id = "Tier 3 (Worker): T-001"

    def push_chunk(text: str) -> None:
        # Enqueue a streaming payload exactly as the worker thread would,
        # then drain the GUI task queue on this (test) thread.
        task = {
            "action": "handle_ai_response",
            "payload": {
                "stream_id": stream_id,
                "text": text,
                "status": "streaming...",
            },
        }
        with app_instance._pending_gui_tasks_lock:
            app_instance._pending_gui_tasks.append(task)
        app_instance._process_pending_gui_tasks()

    # First chunk seeds the stream buffer.
    push_chunk("Chunk 1. ")
    assert app_instance.mma_streams[stream_id] == "Chunk 1. "

    # Second chunk must append, not replace.
    push_chunk("Chunk 2.")
    assert app_instance.mma_streams[stream_id] == "Chunk 1. Chunk 2."

View File

@@ -0,0 +1,98 @@
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import asyncio
import json
import multi_agent_conductor
from multi_agent_conductor import ConductorEngine, run_worker_lifecycle
from models import Ticket, Track, WorkerContext
def test_worker_streaming_intermediate():
    """Intermediate tool activity on the comms log must surface as worker 'response' events."""
    tkt = Ticket(id="T-001", description="Test", status="todo", assigned_to="worker")
    ctx = WorkerContext(ticket_id="T-001", model_name="test-model", messages=[])
    queue = MagicMock()
    queue.put = AsyncMock()
    fake_loop = MagicMock()

    def fake_send(*_args, **_kwargs):
        # Drive the registered comms-log callback the way the real client would
        # for one tool call/result pair, then finish the turn.
        import ai_client
        callback = ai_client.comms_log_callback
        if callback:
            callback({"kind": "tool_call", "payload": {"name": "test_tool", "script": "echo hello"}})
            callback({"kind": "tool_result", "payload": {"name": "test_tool", "output": "hello"}})
        return "DONE"

    with (
        patch("ai_client.send", side_effect=fake_send),
        patch("multi_agent_conductor._queue_put") as queue_put_mock,
        patch("multi_agent_conductor.confirm_spawn", return_value=(True, "p", "c")),
        patch("ai_client.reset_session"),
        patch("ai_client.set_provider"),
        patch("ai_client.get_provider"),
        patch("ai_client.get_comms_log", return_value=[]),
    ):
        run_worker_lifecycle(tkt, ctx, event_queue=queue, loop=fake_loop)

    # Collect the text of every queued "response" event.
    texts = [
        call.args[3].get("text", "")
        for call in queue_put_mock.call_args_list
        if call.args[2] == "response"
    ]
    assert any("[TOOL CALL]" in t for t in texts)
    assert any("[TOOL RESULT]" in t for t in texts)
def test_per_tier_model_persistence():
    """Selecting a tier model must be reflected in the project's persisted tier_models dict."""
    # Stub the imgui stack so gui_2 can be imported without a display.
    imgui_stub = MagicMock()
    module_stubs = {
        "imgui_bundle": MagicMock(),
        "imgui_bundle.imgui": imgui_stub,
        "imgui_bundle.hello_imgui": MagicMock(),
        "imgui_bundle.immapp": MagicMock(),
    }
    with patch.dict("sys.modules", module_stubs):
        from gui_2 import App
        with (
            patch("gui_2.project_manager.load_project", return_value={}),
            patch("gui_2.project_manager.migrate_from_legacy_config", return_value={}),
            patch("gui_2.project_manager.save_project"),
            patch("gui_2.save_config"),
            patch("gui_2.theme.load_from_config"),
            patch("gui_2.ai_client.set_provider"),
            patch("gui_2.ai_client.list_models", return_value=["gpt-4", "claude-3"]),
            patch("gui_2.PerformanceMonitor"),
            patch("gui_2.api_hooks.HookServer"),
            patch("gui_2.session_logger.open_session"),
        ):
            application = App()
            application.available_models = ["gpt-4", "claude-3"]
            tier_name = "Tier 3"
            chosen_model = "claude-3"
            # Mirror the dashboard's 'Tier Model Config' selection logic.
            application.mma_tier_usage[tier_name]["model"] = chosen_model
            tier_models = application.project.setdefault("mma", {}).setdefault("tier_models", {})
            tier_models[tier_name] = chosen_model
            assert application.project["mma"]["tier_models"][tier_name] == chosen_model
@pytest.mark.asyncio
async def test_retry_escalation():
    """A blocked worker run should bump retry_count and requeue the ticket as 'todo'."""
    tkt = Ticket(id="T-001", description="Test", status="todo", assigned_to="worker")
    trk = Track(id="TR-001", description="Track", tickets=[tkt])
    queue = MagicMock()
    queue.put = AsyncMock()

    engine = ConductorEngine(trk, event_queue=queue)
    engine.engine.auto_queue = True

    def block_ticket(t, *_args, **_kwargs):
        # Simulate the worker failing so the retry/escalation path triggers.
        t.status = "blocked"
        return "BLOCKED"

    with patch("multi_agent_conductor.run_worker_lifecycle", side_effect=block_ticket):
        # First tick hands out the ticket; second returns nothing so the loop exits.
        with patch.object(engine.engine, "tick", side_effect=[[tkt], []]):
            await engine.run()

    assert tkt.retry_count == 1
    assert tkt.status == "todo"

View File

@@ -2,6 +2,7 @@ import pytest
import time
import sys
import os
import json
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
@@ -10,50 +11,105 @@ from api_hook_client import ApiHookClient
@pytest.mark.integration
@pytest.mark.timeout(60)
def test_gui_ux_event_routing(live_gui) -> None:
client = ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
client = ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
# ------------------------------------------------------------------
# 1. Verify Streaming Event Routing
# ------------------------------------------------------------------
print("[SIM] Testing Streaming Event Routing...")
stream_id = "Tier 3 (Worker): T-SIM-001"
# We use push_event which POSTs to /api/gui with action=mma_stream_append
# As defined in App._process_pending_gui_tasks
client.push_event('mma_stream_append', {'stream_id': stream_id, 'text': 'Hello '})
time.sleep(0.5)
client.push_event('mma_stream_append', {'stream_id': stream_id, 'text': 'World!'})
time.sleep(1.0)
status = client.get_mma_status()
streams = status.get('mma_streams', {})
assert streams.get(stream_id) == 'Hello World!', f"Streaming failed: {streams.get(stream_id)}"
print("[SIM] Streaming event routing verified.")
# ------------------------------------------------------------------
# 1. Verify Streaming Event Routing
# ------------------------------------------------------------------
print("[SIM] Testing Streaming Event Routing...")
stream_id = "Tier 3 (Worker): T-SIM-001"
# We use push_event which POSTs to /api/gui with action=mma_stream_append
# As defined in App._process_pending_gui_tasks
client.push_event('mma_stream_append', {'stream_id': stream_id, 'text': 'Hello '})
time.sleep(0.5)
client.push_event('mma_stream_append', {'stream_id': stream_id, 'text': 'World!'})
time.sleep(1.0)
status = client.get_mma_status()
streams = status.get('mma_streams', {})
assert streams.get(stream_id) == 'Hello World!', f"Streaming failed: {streams.get(stream_id)}"
print("[SIM] Streaming event routing verified.")
# ------------------------------------------------------------------
# 2. Verify State Update (Usage/Cost) Routing
# ------------------------------------------------------------------
print("[SIM] Testing State Update Routing...")
usage = {
"Tier 1": {"input": 1000, "output": 500, "model": "gemini-3.1-pro-preview"},
"Tier 2": {"input": 2000, "output": 1000, "model": "gemini-3-flash-preview"}
}
client.push_event('mma_state_update', {
'status': 'simulating',
'tier_usage': usage,
'tickets': []
})
time.sleep(1.0)
status = client.get_mma_status()
assert status.get('mma_status') == 'simulating'
# The app merges or replaces usage. Let's check what we got back.
received_usage = status.get('mma_tier_usage', {})
assert received_usage.get('Tier 1', {}).get('input') == 1000
assert received_usage.get('Tier 2', {}).get('model') == 'gemini-3-flash-preview'
print("[SIM] State update routing verified.")
# ------------------------------------------------------------------
# 2. Verify State Update (Usage/Cost) Routing
# ------------------------------------------------------------------
print("[SIM] Testing State Update Routing...")
usage = {
"Tier 1": {"input": 1000, "output": 500, "model": "gemini-3.1-pro-preview"},
"Tier 2": {"input": 2000, "output": 1000, "model": "gemini-3-flash-preview"}
}
client.push_event('mma_state_update', {
'status': 'simulating',
'tier_usage': usage,
'tickets': []
})
time.sleep(1.0)
status = client.get_mma_status()
assert status.get('mma_status') == 'simulating'
# The app merges or replaces usage. Let's check what we got back.
received_usage = status.get('mma_tier_usage', {})
assert received_usage.get('Tier 1', {}).get('input') == 1000
assert received_usage.get('Tier 2', {}).get('model') == 'gemini-3-flash-preview'
print("[SIM] State update routing verified.")
# ------------------------------------------------------------------
# 3. Verify Performance
# ------------------------------------------------------------------
print("[SIM] Testing Performance...")
# Wait for at least one second of frame data to accumulate for FPS calculation
time.sleep(2.0)
perf_data = client.get_performance()
assert perf_data is not None, "Failed to retrieve performance metrics"
perf = perf_data.get('performance', {})
fps = perf.get('fps', 0.0)
total_frames = perf.get('total_frames', 0)
print(f"[SIM] Current FPS: {fps}, Total Frames: {total_frames}")
assert fps >= 30.0, f"Performance degradation: {fps} FPS < 30.0 (Total Frames: {total_frames})"
print("[SIM] Performance verified.")
@pytest.mark.integration
@pytest.mark.timeout(60)
def test_gui_track_creation(live_gui) -> None:
    """Drive the GUI track-creation controls and verify the track directory lands on disk."""
    client = ApiHookClient()
    assert client.wait_for_server(timeout=15), "Hook server did not start"

    print("[SIM] Testing Track Creation via GUI...")
    track_name = 'UX_SIM_TEST'
    track_desc = 'Simulation testing for GUI UX'
    track_type = 'feature'

    # Fill in the new-track form fields, then click create and give the GUI time to act.
    client.set_value('ui_new_track_name', track_name)
    client.set_value('ui_new_track_desc', track_desc)
    client.set_value('ui_new_track_type', track_type)
    client.click('btn_mma_create_track')
    time.sleep(2.0)

    tracks_dir = 'conductor/tracks/'
    found = False
    # The implementation lowercases and replaces spaces with underscores
    search_prefix = track_name.lower().replace(' ', '_')
    for entry in os.listdir(tracks_dir):
        full_path = os.path.join(tracks_dir, entry)
        if not (entry.startswith(search_prefix) and os.path.isdir(full_path)):
            continue
        found = True
        metadata_path = os.path.join(full_path, 'metadata.json')
        assert os.path.exists(metadata_path), f"metadata.json missing in {entry}"
        with open(metadata_path, 'r') as f:
            meta = json.load(f)
        assert meta.get('status') == 'new'
        assert meta.get('title') == track_name
        print(f"[SIM] Verified track directory: {entry}")
        break
    assert found, f"Track directory starting with {search_prefix} not found."
    print("[SIM] Track creation verified.")
if __name__ == "__main__":
pass
pass