From 7a301685c393ea89cfc84618f8e99d2e3fe937f2 Mon Sep 17 00:00:00 2001
From: Ed_
Date: Thu, 26 Feb 2026 20:07:51 -0500
Subject: [PATCH] feat(mma): Implement ConductorEngine and run_worker_lifecycle

---
 multi_agent_conductor.py       | 63 ++++++++++++++++++++++++
 tests/test_conductor_engine.py | 88 ++++++++++++++++++++++++++++++++++
 2 files changed, 151 insertions(+)
 create mode 100644 multi_agent_conductor.py
 create mode 100644 tests/test_conductor_engine.py

diff --git a/multi_agent_conductor.py b/multi_agent_conductor.py
new file mode 100644
index 0000000..79f6c12
--- /dev/null
+++ b/multi_agent_conductor.py
@@ -0,0 +1,63 @@
+import ai_client
+from models import Ticket, Track, WorkerContext
+
+class ConductorEngine:
+    """
+    Orchestrates the execution of tickets within a track.
+    """
+    def __init__(self, track: Track):
+        self.track = track
+
+    def run_linear(self):
+        """
+        Executes tickets sequentially according to their dependencies.
+        Iterates through the track's executable tickets until no more can be run.
+        """
+        while True:
+            executable = self.track.get_executable_tickets()
+            if not executable:
+                # Check if we are finished or blocked
+                all_done = all(t.status == "completed" for t in self.track.tickets)
+                if all_done:
+                    print("Track completed successfully.")
+                else:
+                    print("No more executable tickets. Track may be blocked or finished.")
+                break
+
+            for ticket in executable:
+                print(f"Executing ticket {ticket.id}: {ticket.description}")
+                # For now, we use a default model name or take it from config
+                context = WorkerContext(
+                    ticket_id=ticket.id,
+                    model_name="gemini-2.5-flash-lite",
+                    messages=[]
+                )
+                run_worker_lifecycle(ticket, context)
+
+def run_worker_lifecycle(ticket: Ticket, context: WorkerContext):
+    """
+    Simulates the lifecycle of a single agent working on a ticket.
+    Calls the AI client and updates the ticket status based on the response.
+    """
+    # Build a prompt for the worker
+    user_message = (
+        f"You are assigned to Ticket {ticket.id}.\n"
+        f"Task Description: {ticket.description}\n"
+        "Please complete this task. If you are blocked and cannot proceed, "
+        "start your response with 'BLOCKED' and explain why."
+    )
+
+    # In a real scenario, we would pass md_content from the aggregator
+    # and manage the conversation history in the context.
+    response = ai_client.send(
+        md_content="",
+        user_message=user_message,
+        base_dir="."
+    )
+
+    if "BLOCKED" in response.upper():
+        ticket.mark_blocked(response)
+    else:
+        ticket.mark_complete()
+
+    return response
diff --git a/tests/test_conductor_engine.py b/tests/test_conductor_engine.py
new file mode 100644
index 0000000..c48d398
--- /dev/null
+++ b/tests/test_conductor_engine.py
@@ -0,0 +1,88 @@
+import pytest
+from unittest.mock import MagicMock, patch
+from models import Ticket, Track, WorkerContext
+
+# These tests define the expected interface for multi_agent_conductor.py
+# which will be implemented in the next phase of TDD.
+
+def test_conductor_engine_initialization():
+    """
+    Test that ConductorEngine can be initialized with a Track.
+    """
+    track = Track(id="test_track", description="Test Track")
+    from multi_agent_conductor import ConductorEngine
+    engine = ConductorEngine(track=track)
+    assert engine.track == track
+
+def test_conductor_engine_run_linear_executes_tickets_in_order():
+    """
+    Test that run_linear iterates through executable tickets and calls the worker lifecycle.
+    """
+    ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
+    ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker2", depends_on=["T1"])
+    track = Track(id="track1", description="Track 1", tickets=[ticket1, ticket2])
+
+    from multi_agent_conductor import ConductorEngine
+    engine = ConductorEngine(track=track)
+
+    # We mock run_worker_lifecycle as it is expected to be in the same module
+    with patch("multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
+        # Mocking lifecycle to mark ticket as complete so dependencies can be resolved
+        def side_effect(ticket, context):
+            ticket.mark_complete()
+            return "Success"
+        mock_lifecycle.side_effect = side_effect
+
+        engine.run_linear()
+
+        # Track.get_executable_tickets() should be called repeatedly until all are done
+        # T1 should run first, then T2.
+        assert mock_lifecycle.call_count == 2
+        assert ticket1.status == "completed"
+        assert ticket2.status == "completed"
+
+        # Verify sequence: T1 before T2
+        calls = mock_lifecycle.call_args_list
+        assert calls[0][0][0].id == "T1"
+        assert calls[1][0][0].id == "T2"
+
+def test_run_worker_lifecycle_calls_ai_client_send():
+    """
+    Test that run_worker_lifecycle triggers the AI client and updates ticket status on success.
+    """
+    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
+    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
+
+    from multi_agent_conductor import run_worker_lifecycle
+
+    with patch("ai_client.send") as mock_send:
+        mock_send.return_value = "Task complete. I have updated the file."
+
+        result = run_worker_lifecycle(ticket, context)
+
+        assert result == "Task complete. I have updated the file."
+        assert ticket.status == "completed"
+        mock_send.assert_called_once()
+
+        # Check if description was passed to send()
+        args, kwargs = mock_send.call_args
+        # user_message is passed as a keyword argument
+        assert ticket.description in kwargs["user_message"]
+
+def test_run_worker_lifecycle_handles_blocked_response():
+    """
+    Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed.
+    """
+    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
+    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])

+    from multi_agent_conductor import run_worker_lifecycle
+
+    with patch("ai_client.send") as mock_send:
+        # Simulate a response indicating a block
+        mock_send.return_value = "I am BLOCKED because I don't have enough information."
+
+        run_worker_lifecycle(ticket, context)
+
+        assert ticket.status == "blocked"
+        assert "BLOCKED" in ticket.blocked_reason