Files
manual_slop/multi_agent_conductor.py

139 lines
5.4 KiB
Python

import ai_client
import json
from typing import List, Optional
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
class ConductorEngine:
"""
Orchestrates the execution of tickets within a track.
"""
def __init__(self, track: Track):
self.track = track
def parse_json_tickets(self, json_str: str):
"""
Parses a JSON string of ticket definitions (Godot ECS Flat List format)
and populates the Track's ticket list.
"""
try:
data = json.loads(json_str)
if not isinstance(data, list):
print("Error: JSON input must be a list of ticket definitions.")
return
for ticket_data in data:
# Construct Ticket object, using defaults for optional fields
ticket = Ticket(
id=ticket_data["id"],
description=ticket_data["description"],
status=ticket_data.get("status", "todo"),
assigned_to=ticket_data.get("assigned_to", "unassigned"),
depends_on=ticket_data.get("depends_on", []),
step_mode=ticket_data.get("step_mode", False)
)
self.track.tickets.append(ticket)
except json.JSONDecodeError as e:
print(f"Error parsing JSON tickets: {e}")
except KeyError as e:
print(f"Missing required field in ticket definition: {e}")
def run_linear(self):
"""
Executes tickets sequentially according to their dependencies.
Iterates through the track's executable tickets until no more can be run.
Supports dynamic execution as tickets added during runtime will be picked up
in the next iteration of the main loop.
"""
while True:
executable = self.track.get_executable_tickets()
if not executable:
# Check if we are finished or blocked
all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done:
print("Track completed successfully.")
else:
# If we have no executable tickets but some are not completed, we might be blocked
# or there are simply no more tickets to run at this moment.
incomplete = [t for t in self.track.tickets if t.status != "completed"]
if not incomplete:
print("Track completed successfully.")
else:
print(f"No more executable tickets. {len(incomplete)} tickets remain incomplete.")
break
for ticket in executable:
# We re-check status in case it was modified by a parallel/dynamic process
# (though run_linear is currently single-threaded)
if ticket.status != "todo":
continue
print(f"Executing ticket {ticket.id}: {ticket.description}")
# For now, we use a default model name or take it from config
context = WorkerContext(
ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite",
messages=[]
)
run_worker_lifecycle(ticket, context)
def confirm_execution(payload: str) -> bool:
"""
Placeholder for external confirmation function.
In a real scenario, this might trigger a UI prompt.
"""
return True
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None):
"""
Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response.
"""
# Enforce Context Amnesia: each ticket starts with a clean slate.
ai_client.reset_session()
context_injection = ""
if context_files:
parser = ASTParser(language="python")
for i, file_path in enumerate(context_files):
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if i == 0:
view = parser.get_curated_view(content)
else:
view = parser.get_skeleton(content)
context_injection += f"\nFile: {file_path}\n{view}\n"
except Exception as e:
context_injection += f"\nError reading {file_path}: {e}\n"
# Build a prompt for the worker
user_message = (
f"You are assigned to Ticket {ticket.id}.\n"
f"Task Description: {ticket.description}\n"
)
if context_injection:
user_message += f"\nContext Files:\n{context_injection}\n"
user_message += (
"Please complete this task. If you are blocked and cannot proceed, "
"start your response with 'BLOCKED' and explain why."
)
# In a real scenario, we would pass md_content from the aggregator
# and manage the conversation history in the context.
response = ai_client.send(
md_content="",
user_message=user_message,
base_dir=".",
pre_tool_callback=confirm_execution if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis
)
if "BLOCKED" in response.upper():
ticket.mark_blocked(response)
else:
ticket.mark_complete()
return response