import asyncio
import json
import time
from dataclasses import asdict
from typing import List, Optional

import ai_client
import events
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
from dag_engine import TrackDAG, ExecutionEngine


class ConductorEngine:
    """
    Orchestrates the execution of tickets within a track.

    Holds the track, an optional event queue used to publish state
    snapshots, per-tier token usage counters, and the DAG / execution
    engine built from the track's tickets.
    """

    def __init__(
        self,
        track: Track,
        event_queue: Optional[events.AsyncEventQueue] = None,
        auto_queue: bool = False,
    ):
        self.track = track
        self.event_queue = event_queue
        # Token counters keyed by tier name; only "Tier 3" is updated in this file.
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0},
            "Tier 2": {"input": 0, "output": 0},
            "Tier 3": {"input": 0, "output": 0},
            "Tier 4": {"input": 0, "output": 0},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)

    async def _push_state(self, status: str = "running", active_tier: Optional[str] = None):
        """
        Publishes a snapshot of the track state as an "mma_state_update" event.

        Does nothing when the engine was created without an event queue.
        """
        if not self.event_queue:
            return

        payload = {
            "status": status,
            "active_tier": active_tier,
            "tier_usage": self.tier_usage,
            "track": {
                "id": self.track.id,
                "title": self.track.description,
            },
            "tickets": [asdict(t) for t in self.track.tickets],
        }
        await self.event_queue.put("mma_state_update", payload)

    def parse_json_tickets(self, json_str: str):
        """
        Parses a JSON string of ticket definitions (Godot ECS Flat List format)
        and populates the Track's ticket list.

        Each entry must be an object with "id" and "description"; "status",
        "assigned_to", "depends_on" and "step_mode" are optional. Malformed
        input is reported with print() and leaves the track untouched: the
        whole batch is built first and only then appended, so a bad entry in
        the middle of the list cannot leave a partial set of tickets behind.
        """
        try:
            data = json.loads(json_str)
            if not isinstance(data, list):
                print("Error: JSON input must be a list of ticket definitions.")
                return

            new_tickets = []
            for ticket_data in data:
                if not isinstance(ticket_data, dict):
                    print("Error: each ticket definition must be a JSON object.")
                    return
                # Construct Ticket object, using defaults for optional fields
                new_tickets.append(
                    Ticket(
                        id=ticket_data["id"],
                        description=ticket_data["description"],
                        status=ticket_data.get("status", "todo"),
                        assigned_to=ticket_data.get("assigned_to", "unassigned"),
                        depends_on=ticket_data.get("depends_on", []),
                        step_mode=ticket_data.get("step_mode", False),
                    )
                )

            self.track.tickets.extend(new_tickets)

            # Rebuild DAG and Engine after parsing new tickets
            self.dag = TrackDAG(self.track.tickets)
            self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON tickets: {e}")
        except KeyError as e:
            print(f"Missing required field in ticket definition: {e}")

    async def run(self):
        """
        Main execution loop using the DAG engine.

        Repeatedly asks the engine for ready tickets, runs those that are in
        the "running" state, and announces those that still need approval.
        The loop ends when no ticket is ready and none is running: the final
        state is "done" if every ticket is completed, otherwise "blocked".
        """
        await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")

        while True:
            # 1. Identify ready tasks
            ready_tasks = self.engine.tick()

            # 2. Check for completion or blockage
            if not ready_tasks:
                all_done = all(t.status == "completed" for t in self.track.tickets)
                if all_done:
                    print("Track completed successfully.")
                    await self._push_state(status="done", active_tier=None)
                else:
                    # Check if any tasks are in-progress or could be ready
                    if any(t.status == "running" for t in self.track.tickets):
                        # Wait for async tasks to complete
                        await asyncio.sleep(1)
                        continue

                    print("No more executable tickets. Track is blocked or finished.")
                    await self._push_state(status="blocked", active_tier=None)
                break

            # 3. Process ready tasks
            executed_any = False
            for ticket in ready_tasks:
                # If auto_queue is on and step_mode is off, engine.tick() already marked it 'running'
                # but we need to verify and handle the lifecycle.
                if ticket.status != "running" and not ticket.step_mode and self.engine.auto_queue:
                    # This shouldn't happen with current ExecutionEngine.tick() but for safety:
                    ticket.status = "running"

                if ticket.status == "running":
                    executed_any = True
                    print(f"Executing ticket {ticket.id}: {ticket.description}")
                    await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")

                    context = WorkerContext(
                        ticket_id=ticket.id,
                        model_name="gemini-2.5-flash-lite",
                        messages=[],
                    )
                    # Note: In a fully async version, we would wrap this in a task.
                    # NOTE(review): this call is synchronous, so the event loop is
                    # blocked for its whole duration; a step-mode approval scheduled
                    # via run_coroutine_threadsafe in confirm_execution cannot run
                    # until it returns.
                    run_worker_lifecycle(ticket, context, event_queue=self.event_queue, engine=self)
                    await self._push_state(active_tier="Tier 2 (Tech Lead)")

                elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
                    # Task is ready but needs approval
                    print(f"Ticket {ticket.id} is ready and awaiting approval.")
                    await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
                    # In a real UI, this would wait for a user event.
                    # For headless/linear run, we might need a signal.
                    # For now, we'll treat it as a pause point if not auto-queued.

            if not executed_any:
                # Nothing could be run this pass (tickets are waiting for approval).
                # Yield to the event loop instead of re-ticking immediately, which
                # would spin forever and flood the event queue with state updates.
                await asyncio.sleep(1)


def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str) -> bool:
    """
    Pushes an approval request to the GUI and waits for response.

    Returns the approval flag from the dialog's wait() result, or False when
    no dialog was injected into the request within 60 seconds.
    """
    # We use a list container so the GUI can inject the actual Dialog object back to us
    # since the dialog is created in the GUI thread.
    dialog_container = [None]

    task = {
        "action": "mma_step_approval",
        "ticket_id": ticket_id,
        "payload": payload,
        "dialog_container": dialog_container,
    }

    # Push to queue
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            asyncio.run_coroutine_threadsafe(event_queue.put("mma_step_approval", task), loop)
        else:
            event_queue._queue.put_nowait(("mma_step_approval", task))
    except Exception:
        # Fallback if no loop (deliberately best-effort: enqueue directly)
        event_queue._queue.put_nowait(("mma_step_approval", task))

    # Wait for the GUI to create the dialog and for the user to respond.
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + 60
    while dialog_container[0] is None and time.monotonic() < deadline:
        time.sleep(0.1)

    if dialog_container[0]:
        approved, _ = dialog_container[0].wait()
        return approved

    return False


def run_worker_lifecycle(
    ticket: Ticket,
    context: WorkerContext,
    context_files: Optional[List[str]] = None,
    event_queue: Optional[events.AsyncEventQueue] = None,
    engine: Optional['ConductorEngine'] = None,
):
    """
    Simulates the lifecycle of a single agent working on a ticket.
    Calls the AI client and updates the ticket status based on the response.

    The first context file is injected as a curated view and the remaining
    ones as skeletons; a file that cannot be read is reported inline in the
    prompt instead of aborting. When the ticket is in step mode, tool calls
    are gated through confirm_execution (auto-approved if there is no event
    queue). Returns the AI client's response.
    """
    # Enforce Context Amnesia: each ticket starts with a clean slate.
    ai_client.reset_session()

    context_parts = []
    if context_files:
        parser = ASTParser(language="python")
        for i, file_path in enumerate(context_files):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                if i == 0:
                    view = parser.get_curated_view(content)
                else:
                    view = parser.get_skeleton(content)
                context_parts.append(f"\nFile: {file_path}\n{view}\n")
            except Exception as e:
                # Best-effort: surface the failure to the worker rather than crash.
                context_parts.append(f"\nError reading {file_path}: {e}\n")
    context_injection = "".join(context_parts)

    # Build a prompt for the worker
    user_message = (
        f"You are assigned to Ticket {ticket.id}.\n"
        f"Task Description: {ticket.description}\n"
    )
    if context_injection:
        user_message += f"\nContext Files:\n{context_injection}\n"

    user_message += (
        "Please complete this task. If you are blocked and cannot proceed, "
        "start your response with 'BLOCKED' and explain why."
    )

    # In a real scenario, we would pass md_content from the aggregator
    # and manage the conversation history in the context.
    # HITL Clutch: pass the queue and ticket_id to confirm_execution
    def clutch_callback(payload: str) -> bool:
        if not event_queue:
            return True
        return confirm_execution(payload, event_queue, ticket.id)

    response = ai_client.send(
        md_content="",
        user_message=user_message,
        base_dir=".",
        pre_tool_callback=clutch_callback if ticket.step_mode else None,
        qa_callback=ai_client.run_tier4_analysis,
    )

    # Update usage in engine if provided
    if engine:
        # ai_client.get_token_stats() is not available, so this is an empty
        # placeholder and the counters below currently always add 0.
        # ai_client provides aggregate stats, for granular tier tracking
        # we'd need to diff before/after or have ai_client return usage per call.
        stats = {}
        engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0)
        engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0)

    # NOTE(review): this matches "BLOCKED" anywhere in the response (case-insensitive),
    # although the prompt asks the worker to *start* its reply with it.
    if "BLOCKED" in response.upper():
        ticket.mark_blocked(response)
    else:
        ticket.mark_complete()

    return response