checkpoint: mma_orchestrator track

This commit is contained in:
2026-02-26 22:59:26 -05:00
parent f05fa3d340
commit d087a20f7b
17 changed files with 930 additions and 73 deletions

25
tests/diag_subagent.py Normal file
View File

@@ -0,0 +1,25 @@
import subprocess
import sys
import os
def run_diag(role, prompt):
    """Launch the mma_exec subagent for *role* with *prompt* and dump its output.

    Returns the captured stdout on success, or the exception text if the
    subprocess could not be launched at all.
    """
    print(f"--- Running Diag for {role} ---")
    command = [sys.executable, "scripts/mma_exec.py", "--role", role, prompt]
    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, encoding='utf-8'
        )
    except Exception as exc:
        print(f"FAILED: {exc}")
        return str(exc)
    print("STDOUT:")
    print(completed.stdout)
    print("STDERR:")
    print(completed.stderr)
    return completed.stdout
if __name__ == "__main__":
    # Smoke-test the tier3 worker role: one file read, one shell command.
    _CASES = [
        ("TEST 1: read_file",
         "Read the file 'pyproject.toml' and tell me the version of the project. ONLY the version string."),
        ("\nTEST 2: run_shell_command",
         "Use run_shell_command to execute 'echo HELLO_SUBAGENT' and return the output. ONLY the output."),
    ]
    for label, diag_prompt in _CASES:
        print(label)
        run_diag("tier3-worker", diag_prompt)

View File

@@ -17,5 +17,5 @@ history = [
[discussions."mma_human veriffication"]
git_commit = ""
last_updated = "2026-02-26T22:06:01"
last_updated = "2026-02-26T22:07:06"
history = []

View File

@@ -0,0 +1,116 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import conductor_tech_lead
class TestConductorTechLead(unittest.TestCase):
    """Unit tests for conductor_tech_lead.generate_tickets."""

    @patch('ai_client.send')
    @patch('ai_client.set_provider')
    @patch('ai_client.reset_session')
    def test_generate_tickets_success(self, mock_reset_session, mock_set_provider, mock_send):
        """A well-formed fenced-JSON reply is parsed into the ticket list."""
        expected = [
            {
                "id": "ticket_1",
                "type": "Ticket",
                "goal": "Test goal",
                "target_file": "test.py",
                "depends_on": [],
                "context_requirements": [],
            }
        ]
        mock_send.return_value = "```json\n" + json.dumps(expected) + "\n```"
        brief = "Test track brief"
        skeletons = "Test skeletons"

        tickets = conductor_tech_lead.generate_tickets(brief, skeletons)

        # Provider selection and session hygiene happen before the call.
        mock_set_provider.assert_called_with('gemini', 'gemini-1.5-flash')
        mock_reset_session.assert_called_once()

        # Exactly one AI round-trip, carrying both inputs in the user message.
        mock_send.assert_called_once()
        _, kwargs = mock_send.call_args
        self.assertEqual(kwargs['md_content'], "")
        self.assertIn(brief, kwargs['user_message'])
        self.assertIn(skeletons, kwargs['user_message'])

        self.assertEqual(tickets, expected)

    @patch('ai_client.send')
    @patch('ai_client.set_provider')
    @patch('ai_client.reset_session')
    def test_generate_tickets_parse_error(self, mock_reset_session, mock_set_provider, mock_send):
        """Unparseable model output degrades to an empty ticket list."""
        mock_send.return_value = "Invalid JSON"
        self.assertEqual(conductor_tech_lead.generate_tickets("brief", "skeletons"), [])
class TestTopologicalSort(unittest.TestCase):
    """Dependency-ordering tests for conductor_tech_lead.topological_sort."""

    def test_topological_sort_empty(self):
        self.assertEqual(conductor_tech_lead.topological_sort([]), [])

    def test_topological_sort_linear(self):
        # t1 -> t2 -> t3, deliberately supplied out of order.
        chain = [
            {"id": "t2", "depends_on": ["t1"]},
            {"id": "t1", "depends_on": []},
            {"id": "t3", "depends_on": ["t2"]},
        ]
        ordered = conductor_tech_lead.topological_sort(chain)
        self.assertEqual([t["id"] for t in ordered], ["t1", "t2", "t3"])

    def test_topological_sort_complex(self):
        # Diamond: t1 fans out to t2/t3, which both feed t4.
        diamond = [
            {"id": "t4", "depends_on": ["t2", "t3"]},
            {"id": "t3", "depends_on": ["t1"]},
            {"id": "t2", "depends_on": ["t1"]},
            {"id": "t1", "depends_on": []},
        ]
        ordered = [t["id"] for t in conductor_tech_lead.topological_sort(diamond)]
        # Either [t1, t2, t3, t4] or [t1, t3, t2, t4] is a valid result.
        self.assertEqual(ordered[0], "t1")
        self.assertEqual(ordered[-1], "t4")
        self.assertSetEqual(set(ordered[1:3]), {"t2", "t3"})

    def test_topological_sort_cycle(self):
        # A two-node cycle must be reported, not looped over forever.
        cycle = [
            {"id": "t1", "depends_on": ["t2"]},
            {"id": "t2", "depends_on": ["t1"]},
        ]
        with self.assertRaises(ValueError) as cm:
            conductor_tech_lead.topological_sort(cycle)
        self.assertIn("Circular dependency detected", str(cm.exception))

    def test_topological_sort_missing_dependency(self):
        # A dependency absent from the input set can never be satisfied;
        # the sorter is expected to raise rather than hang or succeed.
        orphan = [{"id": "t1", "depends_on": ["missing"]}]
        with self.assertRaises(ValueError):
            conductor_tech_lead.topological_sort(orphan)
# Allow running this test module directly (python tests/<file>.py).
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,80 @@
import pytest
from unittest.mock import patch, MagicMock
import threading
import time
from gui_2 import App
@pytest.fixture
def app_instance():
    """Yield an App with every heavyweight init path patched out.

    Config I/O, project loading, model fetching, fonts and the immapp run
    loop are all mocked so construction is cheap and side-effect free.
    """
    with (
        patch('gui_2.load_config', return_value={'ai': {}, 'projects': {}}),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
    ):
        app = App()
        # Backfill the MMA state attributes for builds that predate them.
        for attr, default in (
            ('ui_epic_input', ""),
            ('proposed_tracks', []),
            ('_show_track_proposal_modal', False),
        ):
            if not hasattr(app, attr):
                setattr(app, attr, default)
        yield app
def test_mma_ui_state_initialization(app_instance):
    """Verifies that the new MMA UI state variables are initialized correctly."""
    for attr in ('ui_epic_input', 'proposed_tracks', '_show_track_proposal_modal'):
        assert hasattr(app_instance, attr)
    assert app_instance.ui_epic_input == ""
    assert app_instance.proposed_tracks == []
    assert app_instance._show_track_proposal_modal is False
def test_process_pending_gui_tasks_show_track_proposal(app_instance):
    """The 'show_track_proposal' action copies its payload into UI state
    and raises the proposal-modal flag."""
    tracks = [{"id": "track_1", "title": "Test Track"}]
    app_instance._pending_gui_tasks.append(
        {"action": "show_track_proposal", "payload": tracks}
    )

    app_instance._process_pending_gui_tasks()

    assert app_instance.proposed_tracks == tracks
    assert app_instance._show_track_proposal_modal is True
def test_cb_plan_epic_launches_thread(app_instance):
    """Verifies that _cb_plan_epic launches a thread and eventually queues a task.

    Fix: the original stacked multiple `patch(...) as ...,` context managers
    across lines without parentheses or line continuations, which is a
    SyntaxError in Python. The managers are now grouped in one parenthesized
    `with` block (same construct the fixture already uses), and the two
    nested `with` statements are merged.
    """
    app_instance.ui_epic_input = "Develop a new feature"
    app_instance.active_project_path = "test_project.toml"
    mock_tracks = [{"id": "track_1", "title": "Test Track"}]
    with (
        patch('orchestrator_pm.get_track_history_summary',
              return_value="History summary") as mock_get_history,
        patch('orchestrator_pm.generate_tracks',
              return_value=mock_tracks) as mock_gen_tracks,
        patch('aggregate.build_file_items', return_value=[]) as mock_build_files,
        # project_manager is consulted by the background worker as well.
        patch('project_manager.load_project', return_value={}),
        patch('project_manager.flat_config', return_value={}),
    ):
        app_instance._cb_plan_epic()
        # Bounded wait for the background thread to queue its GUI task
        # (fast in practice since every dependency is mocked).
        max_wait = 5
        start_time = time.time()
        while len(app_instance._pending_gui_tasks) == 0 and time.time() - start_time < max_wait:
            time.sleep(0.1)
        assert len(app_instance._pending_gui_tasks) > 0
        task = app_instance._pending_gui_tasks[0]
        assert task['action'] == 'show_track_proposal'
        assert task['payload'] == mock_tracks
        mock_get_history.assert_called_once()
        mock_gen_tracks.assert_called_once()

View File

@@ -0,0 +1,53 @@
import pytest
from unittest.mock import patch, MagicMock
import asyncio
from gui_2 import App
@pytest.fixture
def app_instance():
    """Yield an App with init side effects patched out, an empty ticket
    list, and a mocked asyncio loop handle."""
    with (
        patch('gui_2.load_config', return_value={'ai': {}, 'projects': {}}),
        patch('gui_2.save_config'),
        patch('gui_2.project_manager'),
        patch('gui_2.session_logger'),
        patch('gui_2.immapp.run'),
        patch.object(App, '_load_active_project'),
        patch.object(App, '_fetch_models'),
        patch.object(App, '_load_fonts'),
        patch.object(App, '_post_init'),
    ):
        instance = App()
        instance.active_tickets = []
        instance._loop = MagicMock()
        yield instance
def test_cb_ticket_retry(app_instance):
    """Retry flips a failed ticket back to 'todo' and schedules an event
    coroutine on the app's own loop."""
    ticket_id = "test_ticket_1"
    app_instance.active_tickets = [{"id": ticket_id, "status": "failed"}]

    with patch('asyncio.run_coroutine_threadsafe') as mock_run_safe:
        app_instance._cb_ticket_retry(ticket_id)

    # Status update on the ticket record.
    assert app_instance.active_tickets[0]['status'] == 'todo'
    # Exactly one event scheduled; positional args are (coroutine, loop).
    mock_run_safe.assert_called_once()
    (_, loop), _ = mock_run_safe.call_args
    assert loop == app_instance._loop
def test_cb_ticket_skip(app_instance):
    """Skip marks the ticket 'skipped' and schedules an event coroutine
    on the app's own loop."""
    ticket_id = "test_ticket_1"
    app_instance.active_tickets = [{"id": ticket_id, "status": "todo"}]

    with patch('asyncio.run_coroutine_threadsafe') as mock_run_safe:
        app_instance._cb_ticket_skip(ticket_id)

    assert app_instance.active_tickets[0]['status'] == 'skipped'
    # Positional args are (coroutine, loop).
    mock_run_safe.assert_called_once()
    (_, loop), _ = mock_run_safe.call_args
    assert loop == app_instance._loop

View File

@@ -0,0 +1,81 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import orchestrator_pm
import mma_prompts
class TestOrchestratorPM(unittest.TestCase):
    """Unit tests for orchestrator_pm.generate_tracks (Tier 1 planning)."""

    @patch('summarize.build_summary_markdown')
    @patch('ai_client.send')
    def test_generate_tracks_success(self, mock_send, mock_summarize):
        """A plain-JSON model reply is parsed and returned verbatim."""
        mock_summarize.return_value = "REPO_MAP_CONTENT"
        expected_tracks = [
            {
                "id": "track_1",
                "type": "Track",
                "module": "test_module",
                "persona": "Tech Lead",
                "severity": "Medium",
                "goal": "Test goal",
                "acceptance_criteria": ["criteria 1"],
            }
        ]
        mock_send.return_value = json.dumps(expected_tracks)
        user_request = "Implement unit tests"
        project_config = {"files": {"paths": ["src"]}}
        file_items = [{"path": "src/main.py", "content": "print('hello')"}]

        result = orchestrator_pm.generate_tracks(user_request, project_config, file_items)

        # The repo map is built exactly once from the provided file items.
        mock_summarize.assert_called_once_with(file_items)

        # One AI call, carrying the Tier 1 system prompt and both inputs.
        mock_send.assert_called_once()
        _, kwargs = mock_send.call_args
        self.assertEqual(kwargs['md_content'], "")
        self.assertEqual(kwargs['system_prompt'], mma_prompts.PROMPTS['tier1_epic_init'])
        self.assertIn(user_request, kwargs['user_message'])
        self.assertIn("REPO_MAP_CONTENT", kwargs['user_message'])
        self.assertEqual(kwargs['model_name'], "gemini-1.5-pro")

        self.assertEqual(result, expected_tracks)

    @patch('summarize.build_summary_markdown')
    @patch('ai_client.send')
    def test_generate_tracks_markdown_wrapped(self, mock_send, mock_summarize):
        """Replies fenced as ```json ...``` or bare ``` ...``` are unwrapped."""
        mock_summarize.return_value = "REPO_MAP"
        payload = [{"id": "track_1"}]

        # Fenced with an explicit json language tag plus surrounding chatter.
        mock_send.return_value = (
            f"Here is the plan:\n```json\n{json.dumps(payload)}\n```\nHope this helps."
        )
        self.assertEqual(orchestrator_pm.generate_tracks("req", {}, []), payload)

        # Fenced with a bare code block.
        mock_send.return_value = f"```\n{json.dumps(payload)}\n```"
        self.assertEqual(orchestrator_pm.generate_tracks("req", {}, []), payload)

    @patch('summarize.build_summary_markdown')
    @patch('ai_client.send')
    def test_generate_tracks_malformed_json(self, mock_send, mock_summarize):
        """Non-JSON output yields [] and reports the parse error via print."""
        mock_summarize.return_value = "REPO_MAP"
        mock_send.return_value = "NOT A JSON"
        with patch('builtins.print') as mock_print:
            self.assertEqual(orchestrator_pm.generate_tracks("req", {}, []), [])
        mock_print.assert_any_call(
            "Error parsing Tier 1 response: Expecting value: line 1 column 1 (char 0)"
        )
# Allow running this test module directly (python tests/<file>.py).
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,76 @@
import unittest
from unittest.mock import patch, MagicMock
import os
import shutil
import json
from pathlib import Path
import orchestrator_pm
class TestOrchestratorPMHistory(unittest.TestCase):
    """Tests for track-history summarisation and its use in Tier 1 planning."""

    def setUp(self):
        # Sandbox conductor directory: archive/ holds finished tracks,
        # tracks/ holds active ones.
        self.test_dir = Path("test_conductor")
        self.test_dir.mkdir(exist_ok=True)
        self.archive_dir = self.test_dir / "archive"
        self.tracks_dir = self.test_dir / "tracks"
        self.archive_dir.mkdir(exist_ok=True)
        self.tracks_dir.mkdir(exist_ok=True)

    def tearDown(self):
        if self.test_dir.exists():
            shutil.rmtree(self.test_dir)

    def create_track(self, parent_dir, track_id, title, status, overview):
        """Write a minimal track directory: metadata.json plus spec.md."""
        track_path = parent_dir / track_id
        track_path.mkdir(exist_ok=True)
        with open(track_path / "metadata.json", "w") as f:
            json.dump({"title": title, "status": status}, f)
        with open(track_path / "spec.md", "w") as f:
            f.write(f"# Specification\n\n## Overview\n{overview}")

    @patch('orchestrator_pm.CONDUCTOR_PATH', Path("test_conductor"))
    def test_get_track_history_summary(self):
        """Archived and active tracks both appear with title, status, overview."""
        self.create_track(self.archive_dir, "track_001", "Initial Setup",
                          "completed", "Setting up the project structure.")
        self.create_track(self.tracks_dir, "track_002", "Feature A",
                          "in_progress", "Implementing Feature A.")

        summary = orchestrator_pm.get_track_history_summary()

        for fragment in (
            "Initial Setup", "completed", "Setting up the project structure.",
            "Feature A", "in_progress", "Implementing Feature A.",
        ):
            self.assertIn(fragment, summary)

    @patch('orchestrator_pm.CONDUCTOR_PATH', Path("test_conductor"))
    def test_get_track_history_summary_missing_files(self):
        """A track without spec.md is still listed, with a fallback overview."""
        track_path = self.tracks_dir / "track_003"
        track_path.mkdir(exist_ok=True)
        with open(track_path / "metadata.json", "w") as f:
            json.dump({"title": "Missing Spec", "status": "pending"}, f)

        summary = orchestrator_pm.get_track_history_summary()

        for fragment in ("Missing Spec", "pending", "No overview available"):
            self.assertIn(fragment, summary)

    @patch('orchestrator_pm.summarize.build_summary_markdown')
    @patch('ai_client.send')
    def test_generate_tracks_with_history(self, mock_send, mock_summarize):
        """A supplied history_summary is embedded under a TRACK HISTORY header."""
        mock_summarize.return_value = "REPO_MAP"
        mock_send.return_value = "[]"
        history_summary = "PAST_HISTORY_SUMMARY"

        orchestrator_pm.generate_tracks("req", {}, [], history_summary=history_summary)

        _, kwargs = mock_send.call_args
        self.assertIn(history_summary, kwargs['user_message'])
        self.assertIn("### TRACK HISTORY:", kwargs['user_message'])
# Allow running this test module directly (python tests/<file>.py).
if __name__ == '__main__':
    unittest.main()