fix(conductor): Apply review suggestions for track 'mma_data_architecture_dag_engine'

This commit is contained in:
2026-02-27 20:20:01 -05:00
parent c15e8b8d1f
commit 6548ce6496
3 changed files with 102 additions and 32 deletions

View File

@@ -1,13 +1,26 @@
from typing import List from typing import List, Optional
from models import Ticket from models import Ticket
class TrackDAG: class TrackDAG:
"""
Manages a Directed Acyclic Graph of implementation tickets.
Provides methods for dependency resolution, cycle detection, and topological sorting.
"""
def __init__(self, tickets: List[Ticket]): def __init__(self, tickets: List[Ticket]):
"""
Initializes the TrackDAG with a list of Ticket objects.
Args:
tickets: A list of Ticket instances defining the graph nodes and edges.
"""
self.tickets = tickets self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets} self.ticket_map = {t.id: t for t in tickets}
def get_ready_tasks(self) -> List[Ticket]: def get_ready_tasks(self) -> List[Ticket]:
"""Returns tickets that are 'todo' and whose dependencies are all 'completed'.""" """
Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'.
Returns:
A list of Ticket objects ready for execution.
"""
ready = [] ready = []
for ticket in self.tickets: for ticket in self.tickets:
if ticket.status == 'todo': if ticket.status == 'todo':
@@ -23,11 +36,16 @@ class TrackDAG:
return ready return ready
def has_cycle(self) -> bool: def has_cycle(self) -> bool:
"""Returns True if there's a dependency cycle.""" """
Performs a Depth-First Search to detect cycles in the dependency graph.
Returns:
True if a cycle is detected, False otherwise.
"""
visited = set() visited = set()
rec_stack = set() rec_stack = set()
def is_cyclic(ticket_id): def is_cyclic(ticket_id: str) -> bool:
"""Internal recursive helper for cycle detection."""
if ticket_id in rec_stack: if ticket_id in rec_stack:
return True return True
if ticket_id in visited: if ticket_id in visited:
@@ -53,8 +71,11 @@ class TrackDAG:
def topological_sort(self) -> List[str]: def topological_sort(self) -> List[str]:
""" """
Returns a list of ticket IDs in topological order. Returns a list of ticket IDs in topological order (dependencies before dependents).
Raises ValueError if a cycle is detected. Returns:
A list of ticket ID strings.
Raises:
ValueError: If a dependency cycle is detected.
""" """
if self.has_cycle(): if self.has_cycle():
raise ValueError("Dependency cycle detected") raise ValueError("Dependency cycle detected")
@@ -62,7 +83,8 @@ class TrackDAG:
visited = set() visited = set()
stack = [] stack = []
def visit(ticket_id): def visit(ticket_id: str):
"""Internal recursive helper for topological sorting."""
if ticket_id in visited: if ticket_id in visited:
return return
visited.add(ticket_id) visited.add(ticket_id)
@@ -78,16 +100,26 @@ class TrackDAG:
return stack return stack
class ExecutionEngine: class ExecutionEngine:
"""
A state machine that governs the progression of tasks within a TrackDAG.
Handles automatic queueing and manual task approval.
"""
def __init__(self, dag: TrackDAG, auto_queue: bool = False): def __init__(self, dag: TrackDAG, auto_queue: bool = False):
"""
Initializes the ExecutionEngine.
Args:
dag: The TrackDAG instance to manage.
auto_queue: If True, ready tasks will automatically move to 'in_progress'.
"""
self.dag = dag self.dag = dag
self.auto_queue = auto_queue self.auto_queue = auto_queue
def tick(self) -> List[Ticket]: def tick(self) -> List[Ticket]:
""" """
Returns a list of tasks that are currently 'ready' to be executed. Evaluates the DAG and returns a list of tasks that are currently 'ready' for execution.
A task is ready if its status is 'todo' and all its dependencies are 'completed'. If auto_queue is enabled, tasks without 'step_mode' will be marked as 'in_progress'.
If auto_queue=True, it will automatically mark 'ready' tasks as 'in-progress', Returns:
unless step_mode=True is set on the task. A list of ready Ticket objects.
""" """
ready = self.dag.get_ready_tasks() ready = self.dag.get_ready_tasks()
@@ -100,8 +132,9 @@ class ExecutionEngine:
def approve_task(self, task_id: str): def approve_task(self, task_id: str):
""" """
Manually approves a task to move it to 'in_progress'. Manually transitions a task from 'todo' to 'in_progress' if its dependencies are met.
Typically used for tasks with step_mode=True or when auto_queue is False. Args:
task_id: The ID of the task to approve.
""" """
ticket = self.dag.ticket_map.get(task_id) ticket = self.dag.ticket_map.get(task_id)
if ticket and ticket.status == "todo": if ticket and ticket.status == "todo":
@@ -112,13 +145,16 @@ class ExecutionEngine:
if not dep or dep.status != "completed": if not dep or dep.status != "completed":
all_done = False all_done = False
break break
if all_done: if all_done:
ticket.status = "in_progress" ticket.status = "in_progress"
def update_task_status(self, task_id: str, status: str): def update_task_status(self, task_id: str, status: str):
""" """
Updates the status of a specific task within the DAG. Force-updates the status of a specific task.
Args:
task_id: The ID of the task.
status: The new status string (e.g., 'todo', 'in_progress', 'completed', 'blocked').
""" """
ticket = self.dag.ticket_map.get(task_id) ticket = self.dag.ticket_map.get(task_id)
if ticket: if ticket:

View File

@@ -2088,11 +2088,24 @@ class App:
track_id = f"track_{uuid.uuid4().hex[:8]}" track_id = f"track_{uuid.uuid4().hex[:8]}"
track = Track(id=track_id, description=title, tickets=tickets) track = Track(id=track_id, description=title, tickets=tickets)
# 4. Initialize ConductorEngine and run_linear loop # Initialize track state in the filesystem
from models import TrackState, Metadata
from datetime import datetime
now = datetime.now()
meta = Metadata(id=track_id, name=title, status="todo", created_at=now, updated_at=now)
state = TrackState(metadata=meta, discussion=[], tasks=tickets)
project_manager.save_track_state(track_id, state, self.ui_files_base_dir)
# 4. Initialize ConductorEngine and run loop
engine = multi_agent_conductor.ConductorEngine(track, self.event_queue) engine = multi_agent_conductor.ConductorEngine(track, self.event_queue)
# Use current full markdown context for the track execution
track_id_param = track.id
flat = project_manager.flat_config(self.project, self.active_discussion, track_id=track_id_param)
full_md, _, _ = aggregate.run(flat)
# Schedule the coroutine on the internal event loop # Schedule the coroutine on the internal event loop
asyncio.run_coroutine_threadsafe(engine.run_linear(), self._loop) asyncio.run_coroutine_threadsafe(engine.run(md_content=full_md), self._loop)
except Exception as e: except Exception as e:
self.ai_status = f"Track start error: {e}" self.ai_status = f"Track start error: {e}"
print(f"ERROR in _start_track_logic: {e}") print(f"ERROR in _start_track_logic: {e}")

View File

@@ -73,9 +73,11 @@ class ConductorEngine:
except KeyError as e: except KeyError as e:
print(f"Missing required field in ticket definition: {e}") print(f"Missing required field in ticket definition: {e}")
async def run(self): async def run(self, md_content: str = ""):
""" """
Main execution loop using the DAG engine. Main execution loop using the DAG engine.
Args:
md_content: The full markdown context (history + files) for AI workers.
""" """
await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)") await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
@@ -91,7 +93,7 @@ class ConductorEngine:
await self._push_state(status="done", active_tier=None) await self._push_state(status="done", active_tier=None)
else: else:
# Check if any tasks are in-progress or could be ready # Check if any tasks are in-progress or could be ready
if any(t.status == "running" for t in self.track.tickets): if any(t.status == "in_progress" for t in self.track.tickets):
# Wait for async tasks to complete # Wait for async tasks to complete
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
@@ -101,14 +103,12 @@ class ConductorEngine:
break break
# 3. Process ready tasks # 3. Process ready tasks
loop = asyncio.get_event_loop()
for ticket in ready_tasks: for ticket in ready_tasks:
# If auto_queue is on and step_mode is off, engine.tick() already marked it 'running' # If auto_queue is on and step_mode is off, engine.tick() already marked it 'in_progress'
# but we need to verify and handle the lifecycle. # but we need to verify and handle the lifecycle.
if ticket.status != "running" and not ticket.step_mode and self.engine.auto_queue: if ticket.status == "in_progress" or (not ticket.step_mode and self.engine.auto_queue):
# This shouldn't happen with current ExecutionEngine.tick() but for safety: ticket.status = "in_progress"
ticket.status = "running"
if ticket.status == "running":
print(f"Executing ticket {ticket.id}: {ticket.description}") print(f"Executing ticket {ticket.id}: {ticket.description}")
await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}") await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
@@ -117,8 +117,20 @@ class ConductorEngine:
model_name="gemini-2.5-flash-lite", model_name="gemini-2.5-flash-lite",
messages=[] messages=[]
) )
# Note: In a fully async version, we would wrap this in a task
run_worker_lifecycle(ticket, context, event_queue=self.event_queue, engine=self) # Offload the blocking lifecycle call to a thread to avoid blocking the async event loop.
# We pass the md_content so the worker has full context.
context_files = ticket.context_requirements if ticket.context_requirements else None
await loop.run_in_executor(
None,
run_worker_lifecycle,
ticket,
context,
context_files,
self.event_queue,
self,
md_content
)
await self._push_state(active_tier="Tier 2 (Tech Lead)") await self._push_state(active_tier="Tier 2 (Tech Lead)")
elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue): elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
@@ -126,7 +138,6 @@ class ConductorEngine:
print(f"Ticket {ticket.id} is ready and awaiting approval.") print(f"Ticket {ticket.id} is ready and awaiting approval.")
await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}") await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
# In a real UI, this would wait for a user event. # In a real UI, this would wait for a user event.
# For headless/linear run, we might need a signal.
# For now, we'll treat it as a pause point if not auto-queued. # For now, we'll treat it as a pause point if not auto-queued.
pass pass
@@ -170,10 +181,17 @@ def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_
return False return False
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None): def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] = None, event_queue: events.AsyncEventQueue = None, engine: Optional['ConductorEngine'] = None, md_content: str = ""):
""" """
Simulates the lifecycle of a single agent working on a ticket. Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response. Calls the AI client and updates the ticket status based on the response.
Args:
ticket: The ticket to process.
context: The worker context.
context_files: List of files to include in the context.
event_queue: Queue for pushing state updates and receiving approvals.
engine: The conductor engine.
md_content: The markdown context (history + files) for AI workers.
""" """
# Enforce Context Amnesia: each ticket starts with a clean slate. # Enforce Context Amnesia: each ticket starts with a clean slate.
ai_client.reset_session() ai_client.reset_session()
@@ -183,6 +201,11 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
parser = ASTParser(language="python") parser = ASTParser(language="python")
for i, file_path in enumerate(context_files): for i, file_path in enumerate(context_files):
try: try:
abs_path = Path(file_path)
if not abs_path.is_absolute() and engine:
# Resolve relative to project base if possible
# (This is a bit simplified, but helps)
pass
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, 'r', encoding='utf-8') as f:
content = f.read() content = f.read()
if i == 0: if i == 0:
@@ -206,8 +229,6 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
"start your response with 'BLOCKED' and explain why." "start your response with 'BLOCKED' and explain why."
) )
# In a real scenario, we would pass md_content from the aggregator
# and manage the conversation history in the context.
# HITL Clutch: pass the queue and ticket_id to confirm_execution # HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool: def clutch_callback(payload: str) -> bool:
if not event_queue: if not event_queue:
@@ -215,7 +236,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
return confirm_execution(payload, event_queue, ticket.id) return confirm_execution(payload, event_queue, ticket.id)
response = ai_client.send( response = ai_client.send(
md_content="", md_content=md_content,
user_message=user_message, user_message=user_message,
base_dir=".", base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None, pre_tool_callback=clutch_callback if ticket.step_mode else None,