import ai_client import json import asyncio import time import traceback from typing import List, Optional, Tuple from dataclasses import asdict import events from models import Ticket, Track, WorkerContext from file_cache import ASTParser from pathlib import Path from dag_engine import TrackDAG, ExecutionEngine class ConductorEngine: """ Orchestrates the execution of tickets within a track. """ def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None: self.track = track self.event_queue = event_queue self.tier_usage = { "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"}, "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"}, "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, } self.dag = TrackDAG(self.track.tickets) self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue) async def _push_state(self, status: str = "running", active_tier: str = None) -> None: if not self.event_queue: return payload = { "status": status, "active_tier": active_tier, "tier_usage": self.tier_usage, "track": { "id": self.track.id, "title": self.track.description, }, "tickets": [asdict(t) for t in self.track.tickets] } await self.event_queue.put("mma_state_update", payload) def parse_json_tickets(self, json_str: str) -> None: """ Parses a JSON string of ticket definitions (Godot ECS Flat List format) and populates the Track's ticket list. """ try: data = json.loads(json_str) if not isinstance(data, list): print("Error: JSON input must be a list of ticket definitions.") return for ticket_data in data: # Construct Ticket object, using defaults for optional fields ticket = Ticket( id=ticket_data["id"], description=ticket_data["description"], status=ticket_data.get("status", "todo"), assigned_to=ticket_data.get("assigned_to", "unassigned"), depends_on=ticket_data.get("depends_on", []), step_mode=ticket_data.get("step_mode", False) ) self.track.tickets.append(ticket) # Rebuild DAG and Engine after parsing new tickets self.dag = TrackDAG(self.track.tickets) self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue) except json.JSONDecodeError as e: print(f"Error parsing JSON tickets: {e}") except KeyError as e: print(f"Missing required field in ticket definition: {e}") async def run(self, md_content: str = "") -> None: """ Main execution loop using the DAG engine. Args: md_content: The full markdown context (history + files) for AI workers. """ await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)") loop = asyncio.get_event_loop() while True: # 1. Identify ready tasks ready_tasks = self.engine.tick() # 2. Check for completion or blockage if not ready_tasks: all_done = all(t.status == "completed" for t in self.track.tickets) if all_done: print("Track completed successfully.") await self._push_state(status="done", active_tier=None) else: # Check if any tasks are in-progress or could be ready if any(t.status == "in_progress" for t in self.track.tickets): # Wait for async tasks to complete await asyncio.sleep(1) continue print("No more executable tickets. Track is blocked or finished.") await self._push_state(status="blocked", active_tier=None) break # 3. Process ready tasks to_run = [t for t in ready_tasks if t.status == "in_progress" or (not t.step_mode and self.engine.auto_queue)] # Handle those awaiting approval for ticket in ready_tasks: if ticket not in to_run and ticket.status == "todo": print(f"Ticket {ticket.id} is ready and awaiting approval.") await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}") await asyncio.sleep(1) if to_run: tasks = [] for ticket in to_run: ticket.status = "in_progress" print(f"Executing ticket {ticket.id}: {ticket.description}") await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}") # Escalation logic based on retry_count models = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"] model_idx = min(ticket.retry_count, len(models) - 1) model_name = models[model_idx] context = WorkerContext( ticket_id=ticket.id, model_name=model_name, messages=[] ) context_files = ticket.context_requirements if ticket.context_requirements else None tasks.append(loop.run_in_executor( None, run_worker_lifecycle, ticket, context, context_files, self.event_queue, self, md_content, loop )) await asyncio.gather(*tasks) # 4. Retry and escalation logic for ticket in to_run: if ticket.status == 'blocked': if ticket.get('retry_count', 0) < 2: ticket.retry_count += 1 ticket.status = 'todo' print(f"Ticket {ticket.id} BLOCKED. Escalating to {models[min(ticket.retry_count, len(models)-1)]} and retrying...") await self._push_state(active_tier="Tier 2 (Tech Lead)") def _queue_put(event_queue: events.AsyncEventQueue, loop: asyncio.AbstractEventLoop, event_name: str, payload) -> None: """Thread-safe helper to push an event to the AsyncEventQueue from a worker thread.""" asyncio.run_coroutine_threadsafe(event_queue.put(event_name, payload), loop) def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> bool: """ Pushes an approval request to the GUI and waits for response. """ dialog_container = [None] task = { "action": "mma_step_approval", "ticket_id": ticket_id, "payload": payload, "dialog_container": dialog_container } if loop: _queue_put(event_queue, loop, "mma_step_approval", task) else: raise RuntimeError("loop is required for thread-safe event queue access") # Wait for the GUI to create the dialog and for the user to respond start = time.time() while dialog_container[0] is None and time.time() - start < 60: time.sleep(0.1) if dialog_container[0]: approved, final_payload = dialog_container[0].wait() return approved return False def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> Tuple[bool, str, str]: """ Pushes a spawn approval request to the GUI and waits for response. Returns (approved, modified_prompt, modified_context) """ dialog_container = [None] task = { "action": "mma_spawn_approval", "ticket_id": ticket_id, "role": role, "prompt": prompt, "context_md": context_md, "dialog_container": dialog_container } if loop: _queue_put(event_queue, loop, "mma_spawn_approval", task) else: raise RuntimeError("loop is required for thread-safe event queue access") # Wait for the GUI to create the dialog and for the user to respond start = time.time() while dialog_container[0] is None and time.time() - start < 60: time.sleep(0.1) if dialog_container[0]: res = dialog_container[0].wait() if isinstance(res, dict): approved = res.get("approved", False) abort = res.get("abort", False) modified_prompt = res.get("prompt", prompt) modified_context = res.get("context_md", context_md) return approved and not abort, modified_prompt, modified_context else: # Fallback for old tuple style if any approved, final_payload = res modified_prompt = prompt modified_context = context_md if isinstance(final_payload, dict): modified_prompt = final_payload.get("prompt", prompt) modified_context = final_payload.get("context_md", context_md) return approved, modified_prompt, modified_context return False, prompt, context_md def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "", loop: asyncio.AbstractEventLoop = None) -> None: """ Simulates the lifecycle of a single agent working on a ticket. Calls the AI client and updates the ticket status based on the response. Args: ticket: The ticket to process. context: The worker context. context_files: List of files to include in the context. event_queue: Queue for pushing state updates and receiving approvals. engine: The conductor engine. md_content: The markdown context (history + files) for AI workers. loop: The main asyncio event loop (required for thread-safe queue access). """ # Enforce Context Amnesia: each ticket starts with a clean slate. ai_client.reset_session() ai_client.set_provider(ai_client.get_provider(), context.model_name) context_injection = "" if context_files: parser = ASTParser(language="python") for i, file_path in enumerate(context_files): try: Path(file_path) # (This is a bit simplified, but helps) with open(file_path, 'r', encoding='utf-8') as f: content = f.read() if i == 0: view = parser.get_curated_view(content) else: view = parser.get_skeleton(content) context_injection += f"\nFile: {file_path}\n{view}\n" except Exception as e: context_injection += f"\nError reading {file_path}: {e}\n" # Build a prompt for the worker user_message = ( f"You are assigned to Ticket {ticket.id}.\n" f"Task Description: {ticket.description}\n" ) if context_injection: user_message += f"\nContext Files:\n{context_injection}\n" user_message += ( "Please complete this task. If you are blocked and cannot proceed, " "start your response with 'BLOCKED' and explain why." ) # HITL Clutch: call confirm_spawn if event_queue is provided if event_queue: approved, modified_prompt, modified_context = confirm_spawn( role="Tier 3 Worker", prompt=user_message, context_md=md_content, event_queue=event_queue, ticket_id=ticket.id, loop=loop ) if not approved: ticket.mark_blocked("Spawn rejected by user.") return "BLOCKED: Spawn rejected by user." user_message = modified_prompt md_content = modified_context # HITL Clutch: pass the queue and ticket_id to confirm_execution def clutch_callback(payload: str) -> bool: if not event_queue: return True return confirm_execution(payload, event_queue, ticket.id, loop=loop) def stream_callback(chunk: str) -> None: if event_queue and loop: _queue_put(event_queue, loop, 'mma_stream', {'stream_id': f'Tier 3 (Worker): {ticket.id}', 'text': chunk}) old_comms_cb = ai_client.comms_log_callback def worker_comms_callback(entry: dict) -> None: if event_queue and loop: kind = entry.get("kind") payload = entry.get("payload", {}) chunk = "" if kind == "tool_call": chunk = f"\n\n[TOOL CALL] {payload.get('name')}\n{json.dumps(payload.get('script') or payload.get('args'))}\n" elif kind == "tool_result": res = str(payload.get("output", "")) if len(res) > 500: res = res[:500] + "... (truncated)" chunk = f"\n[TOOL RESULT]\n{res}\n" if chunk: _queue_put(event_queue, loop, "response", {"text": chunk, "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "streaming..."}) if old_comms_cb: old_comms_cb(entry) ai_client.comms_log_callback = worker_comms_callback ai_client.current_tier = "Tier 3" try: comms_baseline = len(ai_client.get_comms_log()) response = ai_client.send( md_content=md_content, user_message=user_message, base_dir=".", pre_tool_callback=clutch_callback if ticket.step_mode else None, qa_callback=ai_client.run_tier4_analysis, stream_callback=stream_callback ) finally: ai_client.comms_log_callback = old_comms_cb ai_client.current_tier = None if event_queue: # Push via "response" event type — _process_event_queue wraps this # as {"action": "handle_ai_response", "payload": ...} for the GUI. try: response_payload = { "text": response, "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "done" } print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}") if loop: _queue_put(event_queue, loop, "response", response_payload) else: raise RuntimeError("loop is required for thread-safe event queue access") except Exception as e: print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}") # Update usage in engine if provided if engine: _new_comms = ai_client.get_comms_log()[comms_baseline:] _resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"] _in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries) _out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries) engine.tier_usage["Tier 3"]["input"] += _in_tokens engine.tier_usage["Tier 3"]["output"] += _out_tokens if "BLOCKED" in response.upper(): ticket.mark_blocked(response) else: ticket.mark_complete() return response