Files
manual_slop/multi_agent_conductor.py
Ed_ 3eefdfd29d fix(mma): Replace token stats stub with real comms log extraction in run_worker_lifecycle
Task 2.1 of mma_pipeline_fix_20260301: capture comms baseline before send(), then sum input_tokens/output_tokens from IN/response entries to populate engine.tier_usage['Tier 3'].
2026-03-01 13:22:15 -05:00

310 lines
12 KiB
Python

import ai_client
import json
import asyncio
import threading
import time
import traceback
from typing import List, Optional, Tuple
from dataclasses import asdict
import events
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
from pathlib import Path
from dag_engine import TrackDAG, ExecutionEngine
class ConductorEngine:
"""
Orchestrates the execution of tickets within a track.
"""
def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
self.track = track
self.event_queue = event_queue
self.tier_usage = {
"Tier 1": {"input": 0, "output": 0},
"Tier 2": {"input": 0, "output": 0},
"Tier 3": {"input": 0, "output": 0},
"Tier 4": {"input": 0, "output": 0},
}
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
async def _push_state(self, status: str = "running", active_tier: str = None) -> None:
if not self.event_queue:
return
payload = {
"status": status,
"active_tier": active_tier,
"tier_usage": self.tier_usage,
"track": {
"id": self.track.id,
"title": self.track.description,
},
"tickets": [asdict(t) for t in self.track.tickets]
}
await self.event_queue.put("mma_state_update", payload)
def parse_json_tickets(self, json_str: str) -> None:
"""
Parses a JSON string of ticket definitions (Godot ECS Flat List format)
and populates the Track's ticket list.
"""
try:
data = json.loads(json_str)
if not isinstance(data, list):
print("Error: JSON input must be a list of ticket definitions.")
return
for ticket_data in data:
# Construct Ticket object, using defaults for optional fields
ticket = Ticket(
id=ticket_data["id"],
description=ticket_data["description"],
status=ticket_data.get("status", "todo"),
assigned_to=ticket_data.get("assigned_to", "unassigned"),
depends_on=ticket_data.get("depends_on", []),
step_mode=ticket_data.get("step_mode", False)
)
self.track.tickets.append(ticket)
# Rebuild DAG and Engine after parsing new tickets
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue)
except json.JSONDecodeError as e:
print(f"Error parsing JSON tickets: {e}")
except KeyError as e:
print(f"Missing required field in ticket definition: {e}")
async def run(self, md_content: str = "") -> None:
"""
Main execution loop using the DAG engine.
Args:
md_content: The full markdown context (history + files) for AI workers.
"""
await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
loop = asyncio.get_event_loop()
while True:
# 1. Identify ready tasks
ready_tasks = self.engine.tick()
# 2. Check for completion or blockage
if not ready_tasks:
all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done:
print("Track completed successfully.")
await self._push_state(status="done", active_tier=None)
else:
# Check if any tasks are in-progress or could be ready
if any(t.status == "in_progress" for t in self.track.tickets):
# Wait for async tasks to complete
await asyncio.sleep(1)
continue
print("No more executable tickets. Track is blocked or finished.")
await self._push_state(status="blocked", active_tier=None)
break
# 3. Process ready tasks
for ticket in ready_tasks:
# If auto_queue is on and step_mode is off, engine.tick() already marked it 'in_progress'
# but we need to verify and handle the lifecycle.
if ticket.status == "in_progress" or (not ticket.step_mode and self.engine.auto_queue):
ticket.status = "in_progress"
print(f"Executing ticket {ticket.id}: {ticket.description}")
await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
context = WorkerContext(
ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite",
messages=[]
)
# Offload the blocking lifecycle call to a thread to avoid blocking the async event loop.
# We pass the md_content so the worker has full context.
context_files = ticket.context_requirements if ticket.context_requirements else None
await loop.run_in_executor(
None,
run_worker_lifecycle,
ticket,
context,
context_files,
self.event_queue,
self,
md_content,
loop
)
await self._push_state(active_tier="Tier 2 (Tech Lead)")
elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
# Task is ready but needs approval
print(f"Ticket {ticket.id} is ready and awaiting approval.")
await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
# In a real UI, this would wait for a user event.
# For now, we'll treat it as a pause point if not auto-queued.
await asyncio.sleep(1)
def _queue_put(event_queue: events.AsyncEventQueue, loop: asyncio.AbstractEventLoop, event_name: str, payload) -> None:
"""Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
asyncio.run_coroutine_threadsafe(event_queue.put(event_name, payload), loop)
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> bool:
"""
Pushes an approval request to the GUI and waits for response.
"""
dialog_container = [None]
task = {
"action": "mma_step_approval",
"ticket_id": ticket_id,
"payload": payload,
"dialog_container": dialog_container
}
if loop:
_queue_put(event_queue, loop, "mma_step_approval", task)
else:
raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
approved, final_payload = dialog_container[0].wait()
return approved
return False
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> Tuple[bool, str, str]:
"""
Pushes a spawn approval request to the GUI and waits for response.
Returns (approved, modified_prompt, modified_context)
"""
dialog_container = [None]
task = {
"action": "mma_spawn_approval",
"ticket_id": ticket_id,
"role": role,
"prompt": prompt,
"context_md": context_md,
"dialog_container": dialog_container
}
if loop:
_queue_put(event_queue, loop, "mma_spawn_approval", task)
else:
raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
res = dialog_container[0].wait()
if isinstance(res, dict):
approved = res.get("approved", False)
abort = res.get("abort", False)
modified_prompt = res.get("prompt", prompt)
modified_context = res.get("context_md", context_md)
return approved and not abort, modified_prompt, modified_context
else:
# Fallback for old tuple style if any
approved, final_payload = res
modified_prompt = prompt
modified_context = context_md
if isinstance(final_payload, dict):
modified_prompt = final_payload.get("prompt", prompt)
modified_context = final_payload.get("context_md", context_md)
return approved, modified_prompt, modified_context
return False, prompt, context_md
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "", loop: asyncio.AbstractEventLoop = None) -> None:
"""
Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response.
Args:
ticket: The ticket to process.
context: The worker context.
context_files: List of files to include in the context.
event_queue: Queue for pushing state updates and receiving approvals.
engine: The conductor engine.
md_content: The markdown context (history + files) for AI workers.
loop: The main asyncio event loop (required for thread-safe queue access).
"""
# Enforce Context Amnesia: each ticket starts with a clean slate.
ai_client.reset_session()
context_injection = ""
if context_files:
parser = ASTParser(language="python")
for i, file_path in enumerate(context_files):
try:
abs_path = Path(file_path)
# (This is a bit simplified, but helps)
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if i == 0:
view = parser.get_curated_view(content)
else:
view = parser.get_skeleton(content)
context_injection += f"\nFile: {file_path}\n{view}\n"
except Exception as e:
context_injection += f"\nError reading {file_path}: {e}\n"
# Build a prompt for the worker
user_message = (
f"You are assigned to Ticket {ticket.id}.\n"
f"Task Description: {ticket.description}\n"
)
if context_injection:
user_message += f"\nContext Files:\n{context_injection}\n"
user_message += (
"Please complete this task. If you are blocked and cannot proceed, "
"start your response with 'BLOCKED' and explain why."
)
# HITL Clutch: call confirm_spawn if event_queue is provided
if event_queue:
approved, modified_prompt, modified_context = confirm_spawn(
role="Tier 3 Worker",
prompt=user_message,
context_md=md_content,
event_queue=event_queue,
ticket_id=ticket.id,
loop=loop
)
if not approved:
ticket.mark_blocked("Spawn rejected by user.")
return "BLOCKED: Spawn rejected by user."
user_message = modified_prompt
md_content = modified_context
# HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool:
if not event_queue:
return True
return confirm_execution(payload, event_queue, ticket.id, loop=loop)
comms_baseline = len(ai_client.get_comms_log())
response = ai_client.send(
md_content=md_content,
user_message=user_message,
base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis
)
if event_queue:
# Push via "response" event type — _process_event_queue wraps this
# as {"action": "handle_ai_response", "payload": ...} for the GUI.
try:
response_payload = {
"text": response,
"stream_id": f"Tier 3 (Worker): {ticket.id}",
"status": "done"
}
print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")
if loop:
_queue_put(event_queue, loop, "response", response_payload)
else:
raise RuntimeError("loop is required for thread-safe event queue access")
except Exception as e:
print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")
# Update usage in engine if provided
if engine:
_new_comms = ai_client.get_comms_log()[comms_baseline:]
_resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"]
_in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
_out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
engine.tier_usage["Tier 3"]["input"] += _in_tokens
engine.tier_usage["Tier 3"]["output"] += _out_tokens
if "BLOCKED" in response.upper():
ticket.mark_blocked(response)
else:
ticket.mark_complete()
return response