From 2429b7c1b4c191325ccfc7e9f350a6c49686b454 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Fri, 27 Feb 2026 20:12:23 -0500 Subject: [PATCH] feat(mma): Connect ExecutionEngine to ConductorEngine and Tech Lead --- conductor_tech_lead.py | 48 ++++++++-------------- multi_agent_conductor.py | 88 ++++++++++++++++++++++++---------------- 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/conductor_tech_lead.py b/conductor_tech_lead.py index 371b583..9d6c836 100644 --- a/conductor_tech_lead.py +++ b/conductor_tech_lead.py @@ -56,43 +56,29 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]: # Restore old system prompt ai_client.set_custom_system_prompt(old_system_prompt) +from dag_engine import TrackDAG +from models import Ticket + def topological_sort(tickets: list[dict]) -> list[dict]: """ Sorts a list of tickets based on their 'depends_on' field. Raises ValueError if a circular dependency or missing internal dependency is detected. """ - # 1. Map ID to ticket and build graph + # 1. Convert to Ticket objects for TrackDAG + ticket_objs = [] + for t_data in tickets: + ticket_objs.append(Ticket.from_dict(t_data)) + + # 2. Use TrackDAG for validation and sorting + dag = TrackDAG(ticket_objs) + try: + sorted_ids = dag.topological_sort() + except ValueError as e: + raise ValueError(f"DAG Validation Error: {e}") + + # 3. Return sorted dictionaries ticket_map = {t['id']: t for t in tickets} - adj = {t['id']: [] for t in tickets} - in_degree = {t['id']: 0 for t in tickets} - - for t in tickets: - for dep_id in t.get('depends_on', []): - if dep_id not in ticket_map: - raise ValueError(f"Missing dependency: Ticket '{t['id']}' depends on '{dep_id}', but '{dep_id}' is not in the ticket list.") - adj[dep_id].append(t['id']) - in_degree[t['id']] += 1 - - # 2. Find nodes with in-degree 0 - queue = [t['id'] for t in tickets if in_degree[t['id']] == 0] - sorted_ids = [] - - # 3. Process queue - while queue: - u_id = queue.pop(0) - sorted_ids.append(u_id) - for v_id in adj[u_id]: - in_degree[v_id] -= 1 - if in_degree[v_id] == 0: - queue.append(v_id) - - # 4. Check for cycles - if len(sorted_ids) != len(tickets): - # Find which tickets are part of a cycle (or blocked by one) - remaining = [t_id for t_id in ticket_map if t_id not in sorted_ids] - raise ValueError(f"Circular dependency detected among tickets: {remaining}") - - return [ticket_map[t_id] for t_id in sorted_ids] + return [ticket_map[tid] for tid in sorted_ids] if __name__ == "__main__": # Quick test if run directly diff --git a/multi_agent_conductor.py b/multi_agent_conductor.py index 340fe53..7606fd9 100644 --- a/multi_agent_conductor.py +++ b/multi_agent_conductor.py @@ -7,11 +7,13 @@ import events from models import Ticket, Track, WorkerContext from file_cache import ASTParser +from dag_engine import TrackDAG, ExecutionEngine + class ConductorEngine: """ Orchestrates the execution of tickets within a track. """ - def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None): + def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False): self.track = track self.event_queue = event_queue self.tier_usage = { @@ -20,6 +22,8 @@ class ConductorEngine: "Tier 3": {"input": 0, "output": 0}, "Tier 4": {"input": 0, "output": 0}, } + self.dag = TrackDAG(self.track.tickets) + self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue) async def _push_state(self, status: str = "running", active_tier: str = None): if not self.event_queue: @@ -59,58 +63,72 @@ class ConductorEngine: step_mode=ticket_data.get("step_mode", False) ) self.track.tickets.append(ticket) + + # Rebuild DAG and Engine after parsing new tickets + self.dag = TrackDAG(self.track.tickets) + self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue) + except json.JSONDecodeError as e: print(f"Error parsing JSON tickets: {e}") except KeyError as e: print(f"Missing required field in ticket definition: {e}") - async def run_linear(self): + async def run(self): """ - Executes tickets sequentially according to their dependencies. - Iterates through the track's executable tickets until no more can be run. - Supports dynamic execution as tickets added during runtime will be picked up - in the next iteration of the main loop. + Main execution loop using the DAG engine. """ await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)") while True: - executable = self.track.get_executable_tickets() - if not executable: - # Check if we are finished or blocked + # 1. Identify ready tasks + ready_tasks = self.engine.tick() + + # 2. Check for completion or blockage + if not ready_tasks: all_done = all(t.status == "completed" for t in self.track.tickets) if all_done: print("Track completed successfully.") await self._push_state(status="done", active_tier=None) else: - # If we have no executable tickets but some are not completed, we might be blocked - # or there are simply no more tickets to run at this moment. - incomplete = [t for t in self.track.tickets if t.status != "completed"] - if not incomplete: - print("Track completed successfully.") - await self._push_state(status="done", active_tier=None) - else: - print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.") - await self._push_state(status="blocked", active_tier=None) + # Check if any tasks are in-progress or could be ready + if any(t.status == "running" for t in self.track.tickets): + # Wait for async tasks to complete + await asyncio.sleep(1) + continue + + print("No more executable tickets. Track is blocked or finished.") + await self._push_state(status="blocked", active_tier=None) break - - for ticket in executable: - # We re-check status in case it was modified by a parallel/dynamic process - # (though run_linear is currently single-threaded) - if ticket.status != "todo": - continue - print(f"Executing ticket {ticket.id}: {ticket.description}") - ticket.status = "running" - await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}") + # 3. Process ready tasks + for ticket in ready_tasks: + # If auto_queue is on and step_mode is off, engine.tick() already marked it 'running' + # but we need to verify and handle the lifecycle. + if ticket.status != "running" and not ticket.step_mode and self.engine.auto_queue: + # This shouldn't happen with current ExecutionEngine.tick() but for safety: + ticket.status = "running" + + if ticket.status == "running": + print(f"Executing ticket {ticket.id}: {ticket.description}") + await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}") + + context = WorkerContext( + ticket_id=ticket.id, + model_name="gemini-2.5-flash-lite", + messages=[] + ) + # Note: In a fully async version, we would wrap this in a task + run_worker_lifecycle(ticket, context, event_queue=self.event_queue, engine=self) + await self._push_state(active_tier="Tier 2 (Tech Lead)") - # For now, we use a default model name or take it from config - context = WorkerContext( - ticket_id=ticket.id, - model_name="gemini-2.5-flash-lite", - messages=[] - ) - run_worker_lifecycle(ticket, context, event_queue=self.event_queue, engine=self) - await self._push_state(active_tier="Tier 2 (Tech Lead)") + elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue): + # Task is ready but needs approval + print(f"Ticket {ticket.id} is ready and awaiting approval.") + await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}") + # In a real UI, this would wait for a user event. + # For headless/linear run, we might need a signal. + # For now, we'll treat it as a pause point if not auto-queued. + pass def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool: """