import asyncio
import json
import threading
import time
from dataclasses import asdict
from pathlib import Path
from typing import List, Optional, Tuple

import ai_client
import events
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
from dag_engine import TrackDAG, ExecutionEngine

# How long a worker thread waits for the GUI to attach a dialog object to an
# approval request, and how often it polls for it.
_DIALOG_TIMEOUT_SECONDS = 60.0
_DIALOG_POLL_SECONDS = 0.1

# Characters skipped before looking for the BLOCKED marker at the start of a
# worker response (whitespace plus common markdown decoration).
_LEADING_MARKUP = " \t\r\n*_#>`"


class ConductorEngine:
    """
    Orchestrates the execution of tickets within a track.

    Holds the track, an optional event queue used to publish state updates,
    per-tier token usage counters, and the TrackDAG / ExecutionEngine pair
    built from the track's tickets.
    """

    def __init__(
        self,
        track: Track,
        event_queue: Optional[events.AsyncEventQueue] = None,
        auto_queue: bool = False,
    ) -> None:
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0},
            "Tier 2": {"input": 0, "output": 0},
            "Tier 3": {"input": 0, "output": 0},
            "Tier 4": {"input": 0, "output": 0},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)

    async def _push_state(self, status: str = "running", active_tier: Optional[str] = None) -> None:
        """
        Publishes an "mma_state_update" event describing the current track state.

        Does nothing when the engine was created without an event queue.
        """
        if not self.event_queue:
            return
        payload = {
            "status": status,
            "active_tier": active_tier,
            "tier_usage": self.tier_usage,
            "track": {
                "id": self.track.id,
                "title": self.track.description,
            },
            "tickets": [asdict(t) for t in self.track.tickets],
        }
        await self.event_queue.put("mma_state_update", payload)

    def parse_json_tickets(self, json_str: str) -> None:
        """
        Parses a JSON string of ticket definitions (Godot ECS Flat List format)
        and populates the Track's ticket list.

        The input must be a JSON list of objects. Each object requires "id" and
        "description"; "status", "assigned_to", "depends_on" and "step_mode"
        fall back to defaults. Errors are printed rather than raised, and on any
        error the track, DAG and engine are left untouched.
        """
        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON tickets: {e}")
            return

        if not isinstance(data, list):
            print("Error: JSON input must be a list of ticket definitions.")
            return

        # Build every ticket before touching the track so that a bad entry
        # cannot leave the track partially extended with a stale DAG.
        new_tickets = []
        try:
            for ticket_data in data:
                if not isinstance(ticket_data, dict):
                    print("Error: each ticket definition must be a JSON object.")
                    return
                new_tickets.append(
                    Ticket(
                        id=ticket_data["id"],
                        description=ticket_data["description"],
                        status=ticket_data.get("status", "todo"),
                        assigned_to=ticket_data.get("assigned_to", "unassigned"),
                        depends_on=ticket_data.get("depends_on", []),
                        step_mode=ticket_data.get("step_mode", False),
                    )
                )
        except KeyError as e:
            print(f"Missing required field in ticket definition: {e}")
            return

        self.track.tickets.extend(new_tickets)
        # Rebuild DAG and Engine after parsing new tickets
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue)

    async def run(self, md_content: str = "") -> None:
        """
        Main execution loop using the DAG engine.

        Repeatedly asks the engine for ready tickets. Tickets that are already
        in progress, or that may be auto-queued, are executed one at a time via
        run_worker_lifecycle in the default executor. Other ready "todo" tickets
        are announced as awaiting approval. The loop ends when every ticket is
        completed ("done") or nothing is ready or in progress ("blocked").

        Args:
            md_content: The full markdown context (history + files) for AI workers.
        """
        await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
        # We are inside a coroutine, so the running loop is the one the worker
        # thread must use to post events back.
        loop = asyncio.get_running_loop()

        while True:
            # 1. Identify ready tasks
            ready_tasks = self.engine.tick()

            # 2. Check for completion or blockage
            if not ready_tasks:
                if all(t.status == "completed" for t in self.track.tickets):
                    print("Track completed successfully.")
                    await self._push_state(status="done", active_tier=None)
                    break
                if any(t.status == "in_progress" for t in self.track.tickets):
                    # Wait for async tasks to complete
                    await asyncio.sleep(1)
                    continue
                print("No more executable tickets. Track is blocked or finished.")
                await self._push_state(status="blocked", active_tier=None)
                break

            # 3. Process ready tasks
            for ticket in ready_tasks:
                # If auto_queue is on and step_mode is off, engine.tick() already marked it
                # 'in_progress', but we need to verify and handle the lifecycle.
                if ticket.status == "in_progress" or (not ticket.step_mode and self.engine.auto_queue):
                    ticket.status = "in_progress"
                    print(f"Executing ticket {ticket.id}: {ticket.description}")
                    await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")

                    context = WorkerContext(
                        ticket_id=ticket.id,
                        model_name="gemini-2.5-flash-lite",
                        messages=[],
                    )

                    # Offload the blocking lifecycle call to a thread to avoid blocking
                    # the async event loop. md_content is passed so the worker has full context.
                    context_files = ticket.context_requirements or None
                    await loop.run_in_executor(
                        None,
                        run_worker_lifecycle,
                        ticket,
                        context,
                        context_files,
                        self.event_queue,
                        self,
                        md_content,
                        loop,
                    )
                    await self._push_state(active_tier="Tier 2 (Tech Lead)")

                elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
                    # Task is ready but needs approval
                    print(f"Ticket {ticket.id} is ready and awaiting approval.")
                    await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
                    # In a real UI, this would wait for a user event.
                    # For now, we'll treat it as a pause point if not auto-queued.
                    await asyncio.sleep(1)


def _queue_put(event_queue: events.AsyncEventQueue, loop: asyncio.AbstractEventLoop, event_name: str, payload) -> None:
    """Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
    asyncio.run_coroutine_threadsafe(event_queue.put(event_name, payload), loop)


def _dispatch_event(
    event_queue: events.AsyncEventQueue,
    loop: Optional[asyncio.AbstractEventLoop],
    event_name: str,
    payload,
) -> None:
    """Posts an event through `loop` when one is given, otherwise directly onto the underlying queue."""
    if loop:
        _queue_put(event_queue, loop, event_name, payload)
    else:
        event_queue._queue.put_nowait((event_name, payload))


def _wait_for_dialog(dialog_container: list, timeout: float = _DIALOG_TIMEOUT_SECONDS):
    """
    Polls until slot 0 of `dialog_container` is filled in or `timeout` seconds elapse.

    Returns whatever is in slot 0 afterwards (None on timeout). A monotonic
    clock is used so wall-clock adjustments cannot shorten or extend the wait.
    """
    deadline = time.monotonic() + timeout
    while dialog_container[0] is None and time.monotonic() < deadline:
        time.sleep(_DIALOG_POLL_SECONDS)
    return dialog_container[0]


def _is_blocked_response(response: str) -> bool:
    """
    Returns True when the worker response starts with the BLOCKED marker.

    The worker prompt asks for 'BLOCKED' at the start of the response, so only
    the beginning is examined (after whitespace/markdown decoration). A plain
    substring test would also fire on words such as "unblocked" or
    "non-blocking" appearing anywhere in a successful answer.
    """
    return response.lstrip(_LEADING_MARKUP).upper().startswith("BLOCKED")


def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
    """
    Pushes an approval request to the GUI and waits for response.

    Emits an "mma_step_approval" event carrying a one-slot `dialog_container`
    list. The receiver is expected to place a dialog object in that slot; this
    function then blocks on the dialog's wait() and returns its approval flag.
    Returns False if no dialog appears within the timeout.
    """
    dialog_container = [None]
    task = {
        "action": "mma_step_approval",
        "ticket_id": ticket_id,
        "payload": payload,
        "dialog_container": dialog_container,
    }
    _dispatch_event(event_queue, loop, "mma_step_approval", task)

    # Wait for the GUI to create the dialog and for the user to respond
    dialog = _wait_for_dialog(dialog_container)
    if dialog:
        approved, _final_payload = dialog.wait()
        return approved
    return False


def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: Optional[asyncio.AbstractEventLoop] = None) -> Tuple[bool, str, str]:
    """
    Pushes a spawn approval request to the GUI and waits for response.

    Emits an "mma_spawn_approval" event carrying a one-slot `dialog_container`
    list and blocks on the dialog placed there. The dialog result may be a dict
    (keys "approved", "abort", "prompt", "context_md") or an
    (approved, payload) tuple.

    Returns (approved, modified_prompt, modified_context). On timeout the
    result is (False, prompt, context_md).
    """
    dialog_container = [None]
    task = {
        "action": "mma_spawn_approval",
        "ticket_id": ticket_id,
        "role": role,
        "prompt": prompt,
        "context_md": context_md,
        "dialog_container": dialog_container,
    }
    _dispatch_event(event_queue, loop, "mma_spawn_approval", task)

    # Wait for the GUI to create the dialog and for the user to respond
    dialog = _wait_for_dialog(dialog_container)
    if not dialog:
        return False, prompt, context_md

    res = dialog.wait()
    if isinstance(res, dict):
        approved = res.get("approved", False)
        abort = res.get("abort", False)
        # An explicit abort overrides approval.
        return (
            bool(approved and not abort),
            res.get("prompt", prompt),
            res.get("context_md", context_md),
        )

    # Fallback for old tuple style if any
    approved, final_payload = res
    modified_prompt = prompt
    modified_context = context_md
    if isinstance(final_payload, dict):
        modified_prompt = final_payload.get("prompt", prompt)
        modified_context = final_payload.get("context_md", context_md)
    return approved, modified_prompt, modified_context


def _build_context_injection(context_files: Optional[List[str]]) -> str:
    """
    Renders the context files into a single text block for the worker prompt.

    The first file is rendered with the parser's curated view, the rest with
    its skeleton view. Failures are reported inline in the returned text so one
    unreadable file does not abort the ticket.
    """
    if not context_files:
        return ""
    parser = ASTParser(language="python")
    sections = []
    for i, file_path in enumerate(context_files):
        try:
            content = Path(file_path).read_text(encoding="utf-8")
            view = parser.get_curated_view(content) if i == 0 else parser.get_skeleton(content)
            sections.append(f"\nFile: {file_path}\n{view}\n")
        except Exception as e:  # deliberate best-effort: surface the error to the worker
            sections.append(f"\nError reading {file_path}: {e}\n")
    return "".join(sections)


def _build_worker_prompt(ticket: Ticket, context_injection: str) -> str:
    """Assembles the user message sent to the worker for `ticket`."""
    user_message = (
        f"You are assigned to Ticket {ticket.id}.\n"
        f"Task Description: {ticket.description}\n"
    )
    if context_injection:
        user_message += f"\nContext Files:\n{context_injection}\n"
    user_message += (
        "Please complete this task. If you are blocked and cannot proceed, "
        "start your response with 'BLOCKED' and explain why."
    )
    return user_message


def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "", loop: Optional[asyncio.AbstractEventLoop] = None) -> str:
    """
    Simulates the lifecycle of a single agent working on a ticket.
    Calls the AI client and updates the ticket status based on the response.

    Args:
        ticket: The ticket to process.
        context: The worker context (currently not read by this function).
        context_files: List of files to include in the context.
        event_queue: Queue for pushing state updates and receiving approvals.
        engine: The conductor engine whose "Tier 3" usage counters are updated.
        md_content: The markdown context (history + files) for AI workers.
        loop: The main asyncio event loop (required for thread-safe queue access).

    Returns:
        The worker's response text, or "BLOCKED: Spawn rejected by user." when
        the spawn approval was declined.
    """
    # Enforce Context Amnesia: each ticket starts with a clean slate.
    ai_client.reset_session()

    user_message = _build_worker_prompt(ticket, _build_context_injection(context_files))

    # HITL Clutch: call confirm_spawn if event_queue is provided
    if event_queue:
        approved, modified_prompt, modified_context = confirm_spawn(
            role="Tier 3 Worker",
            prompt=user_message,
            context_md=md_content,
            event_queue=event_queue,
            ticket_id=ticket.id,
            loop=loop,
        )
        if not approved:
            ticket.mark_blocked("Spawn rejected by user.")
            return "BLOCKED: Spawn rejected by user."
        user_message = modified_prompt
        md_content = modified_context

    # HITL Clutch: pass the queue and ticket_id to confirm_execution
    def clutch_callback(payload: str) -> bool:
        if not event_queue:
            return True
        return confirm_execution(payload, event_queue, ticket.id, loop=loop)

    response = ai_client.send(
        md_content=md_content,
        user_message=user_message,
        base_dir=".",
        pre_tool_callback=clutch_callback if ticket.step_mode else None,
        qa_callback=ai_client.run_tier4_analysis,
    )

    if event_queue:
        # Push via "response" event type — _process_event_queue wraps this
        # as {"action": "handle_ai_response", "payload": ...} for the GUI.
        try:
            response_payload = {
                "text": response,
                "stream_id": f"Tier 3 (Worker): {ticket.id}",
                "status": "done",
            }
            _dispatch_event(event_queue, loop, "response", response_payload)
        except Exception as e:
            print(f"Error pushing response to UI: {e}")

    # Update usage in engine if provided
    if engine:
        stats = {}  # ai_client.get_token_stats() is not available
        engine.tier_usage["Tier 3"]["input"] += stats.get("prompt_tokens", 0)
        engine.tier_usage["Tier 3"]["output"] += stats.get("candidates_tokens", 0)

    if _is_blocked_response(response):
        ticket.mark_blocked(response)
    else:
        ticket.mark_complete()
    return response