feat(mma): Implement dynamic ticket parsing and dispatcher loop in ConductorEngine

This commit is contained in:
2026-02-26 20:40:16 -05:00
parent 0acd1ea442
commit 1dacd3613e
2 changed files with 104 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
import ai_client import ai_client
import json
from typing import List, Optional from typing import List, Optional
from models import Ticket, Track, WorkerContext from models import Ticket, Track, WorkerContext
from file_cache import ASTParser from file_cache import ASTParser
@@ -10,10 +11,39 @@ class ConductorEngine:
def __init__(self, track: Track): def __init__(self, track: Track):
self.track = track self.track = track
def parse_json_tickets(self, json_str: str):
"""
Parses a JSON string of ticket definitions (Godot ECS Flat List format)
and populates the Track's ticket list.
"""
try:
data = json.loads(json_str)
if not isinstance(data, list):
print("Error: JSON input must be a list of ticket definitions.")
return
for ticket_data in data:
# Construct Ticket object, using defaults for optional fields
ticket = Ticket(
id=ticket_data["id"],
description=ticket_data["description"],
status=ticket_data.get("status", "todo"),
assigned_to=ticket_data.get("assigned_to", "unassigned"),
depends_on=ticket_data.get("depends_on", []),
step_mode=ticket_data.get("step_mode", False)
)
self.track.tickets.append(ticket)
except json.JSONDecodeError as e:
print(f"Error parsing JSON tickets: {e}")
except KeyError as e:
print(f"Missing required field in ticket definition: {e}")
def run_linear(self): def run_linear(self):
""" """
Executes tickets sequentially according to their dependencies. Executes tickets sequentially according to their dependencies.
Iterates through the track's executable tickets until no more can be run. Iterates through the track's executable tickets until no more can be run.
Supports dynamic execution as tickets added during runtime will be picked up
in the next iteration of the main loop.
""" """
while True: while True:
executable = self.track.get_executable_tickets() executable = self.track.get_executable_tickets()
@@ -23,10 +53,21 @@ class ConductorEngine:
if all_done: if all_done:
print("Track completed successfully.") print("Track completed successfully.")
else: else:
print("No more executable tickets. Track may be blocked or finished.") # If we have no executable tickets but some are not completed, we might be blocked
# or there are simply no more tickets to run at this moment.
incomplete = [t for t in self.track.tickets if t.status != "completed"]
if not incomplete:
print("Track completed successfully.")
else:
print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.")
break break
for ticket in executable: for ticket in executable:
# We re-check status in case it was modified by a parallel/dynamic process
# (though run_linear is currently single-threaded)
if ticket.status != "todo":
continue
print(f"Executing ticket {ticket.id}: {ticket.description}") print(f"Executing ticket {ticket.id}: {ticket.description}")
# For now, we use a default model name or take it from config # For now, we use a default model name or take it from config
context = WorkerContext( context = WorkerContext(

View File

@@ -194,3 +194,65 @@ def test_run_worker_lifecycle_step_mode_rejection():
# Since we've already tested ai_client's implementation of pre_tool_callback (mentally or via other tests), # Since we've already tested ai_client's implementation of pre_tool_callback (mentally or via other tests),
# here we just verify the wiring. # here we just verify the wiring.
def test_conductor_engine_dynamic_parsing_and_execution():
"""
Test that parse_json_tickets correctly populates the track and run_linear executes them in dependency order.
"""
import json
from multi_agent_conductor import ConductorEngine
track = Track(id="dynamic_track", description="Dynamic Track")
engine = ConductorEngine(track=track)
tickets_json = json.dumps([
{
"id": "T1",
"description": "Initial task",
"status": "todo",
"assigned_to": "worker1",
"depends_on": []
},
{
"id": "T2",
"description": "Dependent task",
"status": "todo",
"assigned_to": "worker2",
"depends_on": ["T1"]
},
{
"id": "T3",
"description": "Another initial task",
"status": "todo",
"assigned_to": "worker3",
"depends_on": []
}
])
engine.parse_json_tickets(tickets_json)
assert len(engine.track.tickets) == 3
assert engine.track.tickets[0].id == "T1"
assert engine.track.tickets[1].id == "T2"
assert engine.track.tickets[2].id == "T3"
# Mock run_worker_lifecycle to mark tickets as complete
with patch("multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
def side_effect(ticket, context):
ticket.mark_complete()
return "Success"
mock_lifecycle.side_effect = side_effect
engine.run_linear()
assert mock_lifecycle.call_count == 3
# Verify dependency order: T1 must be called before T2
calls = [call[0][0].id for call in mock_lifecycle.call_args_list]
t1_idx = calls.index("T1")
t2_idx = calls.index("T2")
assert t1_idx < t2_idx
# T3 can be anywhere relative to T1 and T2, but T1 < T2 is mandatory
assert "T3" in calls