Files
manual_slop/multi_agent_conductor.py

94 lines
3.3 KiB
Python

import ai_client
from typing import List, Optional
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
class ConductorEngine:
    """
    Orchestrates the execution of tickets within a track.

    Args:
        track: The track whose tickets are executed.
        model_name: Model name stored in the WorkerContext created for each
            ticket. Defaults to the value that used to be hard-coded, so
            existing callers that pass only `track` behave as before.
    """

    # Former hard-coded model name, kept as the default for compatibility.
    DEFAULT_MODEL_NAME = "gemini-2.5-flash-lite"

    def __init__(self, track: Track, model_name: str = DEFAULT_MODEL_NAME):
        self.track = track
        self.model_name = model_name

    def run_linear(self) -> None:
        """
        Executes tickets sequentially, one batch at a time.

        Repeatedly asks the track for its executable tickets and runs a worker
        for each of them in order. Stops as soon as the track reports nothing
        executable, printing whether every ticket is "completed" or not.

        NOTE(review): dependency ordering is presumably decided inside
        `Track.get_executable_tickets()`; this loop only terminates if running
        a worker eventually removes the ticket from that list - confirm.
        """
        while True:
            executable = self.track.get_executable_tickets()
            if not executable:
                # Nothing left to run: distinguish "all done" from "stuck".
                all_done = all(t.status == "completed" for t in self.track.tickets)
                if all_done:
                    print("Track completed successfully.")
                else:
                    print("No more executable tickets. Track may be blocked or finished.")
                break
            for ticket in executable:
                print(f"Executing ticket {ticket.id}: {ticket.description}")
                # Each ticket gets a fresh context with an empty message history.
                context = WorkerContext(
                    ticket_id=ticket.id,
                    model_name=self.model_name,
                    messages=[]
                )
                run_worker_lifecycle(ticket, context)
def confirm_execution(payload: str) -> bool:
    """
    Stand-in for an external confirmation hook.

    A real implementation might surface a UI prompt; this placeholder ignores
    `payload` and approves unconditionally.
    """
    approved = True
    return approved
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: Optional[List[str]] = None):
    """
    Simulates the lifecycle of a single agent working on a ticket.

    Builds a prompt from the ticket (plus optional file context), sends it
    through `ai_client.send`, and updates the ticket from the reply: the
    ticket is marked blocked when the reply opens with 'BLOCKED', otherwise
    it is marked complete.

    Args:
        ticket: Ticket to work on; its `id`, `description` and `step_mode`
            are read, and `mark_blocked` / `mark_complete` is called on it.
        context: Worker context for this run. Currently not read by this
            function; kept for interface compatibility.
        context_files: Optional paths of files to inject into the prompt.
            The first file is rendered with the parser's curated view, the
            remaining ones with its skeleton view.

    Returns:
        The response returned by `ai_client.send`.
    """
    context_injection = _build_context_injection(context_files)

    # Build a prompt for the worker
    user_message = (
        f"You are assigned to Ticket {ticket.id}.\n"
        f"Task Description: {ticket.description}\n"
    )
    if context_injection:
        user_message += f"\nContext Files:\n{context_injection}\n"
    user_message += (
        "Please complete this task. If you are blocked and cannot proceed, "
        "start your response with 'BLOCKED' and explain why."
    )

    # In a real scenario, we would pass md_content from the aggregator
    # and manage the conversation history in the context.
    response = ai_client.send(
        md_content="",
        user_message=user_message,
        base_dir=".",
        # Only ask for confirmation before tool calls when the ticket is in step mode.
        pre_tool_callback=confirm_execution if ticket.step_mode else None
    )

    if _is_blocked_response(response):
        ticket.mark_blocked(response)
    else:
        ticket.mark_complete()
    return response


def _build_context_injection(context_files: Optional[List[str]]) -> str:
    """Render the context files into one prompt fragment ('' when there are none)."""
    if not context_files:
        return ""
    parser = ASTParser(language="python")
    sections = []
    for i, file_path in enumerate(context_files):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            if i == 0:
                view = parser.get_curated_view(content)
            else:
                view = parser.get_skeleton(content)
            sections.append(f"\nFile: {file_path}\n{view}\n")
        except Exception as e:
            # Best effort: a file that cannot be read or parsed is reported
            # inside the prompt instead of aborting the whole worker run.
            sections.append(f"\nError reading {file_path}: {e}\n")
    return "".join(sections)


def _is_blocked_response(response: str) -> bool:
    """Return True when the reply opens with the 'BLOCKED' marker (case-insensitive)."""
    # The prompt asks the worker to *start* its reply with 'BLOCKED'. A plain
    # substring test would also fire on replies that merely mention words such
    # as "blocked" or "unblocked", so only the beginning of the reply is
    # inspected. Leading whitespace, quotes and markdown decoration are skipped.
    leading_noise = " \t\r\n*_#`>'\""
    return response.lstrip(leading_noise).upper().startswith("BLOCKED")