# manual_slop/src/dag_engine.py

"""
DAG Engine - Directed Acyclic Graph execution for MMA ticket orchestration.

This module provides the core graph data structures and state machine logic
for executing implementation tickets in dependency order within the MMA
(Multi-Model Agent) system.

Key Classes:
    - TrackDAG: Graph representation with cycle detection, topological sorting,
      and transitive blocking propagation.
    - ExecutionEngine: Tick-based state machine that evaluates the DAG and
      manages task status transitions.

Architecture Integration:
    - TrackDAG is constructed from a list of Ticket objects (from models.py)
    - ExecutionEngine is consumed by ConductorEngine (multi_agent_conductor.py)
    - The tick() method is called in the main orchestration loop to determine
      which tasks are ready for execution

Thread Safety:
    - This module is NOT thread-safe. Callers must synchronize access if used
      from multiple threads (e.g., the ConductorEngine's async loop).

See Also:
    - docs/guide_mma.md for the full MMA orchestration documentation
    - src/models.py for Ticket and Track data structures
    - src/multi_agent_conductor.py for ConductorEngine integration
"""
from typing import List
from src.models import Ticket
from src.performance_monitor import get_monitor
class TrackDAG:
    """
    Manages a Directed Acyclic Graph of implementation tickets.

    Every ticket is a node; each ID listed in a ticket's ``depends_on`` is an
    edge to the ticket it has to wait for. Provides methods for dependency
    resolution, transitive blocking, cycle detection, and topological sorting.
    """

    def __init__(self, tickets: List[Ticket]) -> None:
        """
        Initializes the TrackDAG with a list of Ticket objects.

        Args:
            tickets: A list of Ticket instances defining the graph nodes and edges.
        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.tickets = tickets
        # ID -> ticket index for O(1) dependency lookups. It is built once
        # here, so tickets appended to ``self.tickets`` later are not indexed.
        self.ticket_map = {t.id: t for t in tickets}

    def cascade_blocks(self) -> None:
        """
        Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`.

        Propagates 'blocked' status from initially blocked nodes to their
        dependents. Only tickets currently in 'todo' are changed; tickets in
        any other status are left alone and do not propagate the block further.
        [C: tests/test_perf_dag.py:test_dag_performance]
        """
        with get_monitor().scope("dag_cascade_blocks"):
            # Reverse adjacency (dependency ID -> tickets waiting on it), holding
            # object references so the propagation loop needs no extra lookups.
            dependents = {t.id: [] for t in self.tickets}
            for t in self.tickets:
                for dep_id in t.depends_on:
                    # Edges pointing at IDs outside this graph are ignored here.
                    if dep_id in dependents:
                        dependents[dep_id].append(t)

            # Breadth-first propagation. The list doubles as the queue and is
            # walked with a cursor so nothing is ever popped from the front.
            queue = [t for t in self.tickets if t.status == 'blocked']
            cursor = 0
            while cursor < len(queue):
                blocker = queue[cursor]
                cursor += 1
                for waiting in dependents.get(blocker.id, []):
                    if waiting.status != 'todo':
                        # Already blocked (or in another state): skipping it is
                        # also what terminates the walk on cyclic graphs.
                        continue
                    waiting.status = 'blocked'
                    # Keep an existing reason; only fill in a missing one.
                    if not waiting.blocked_reason:
                        waiting.blocked_reason = f"Dependency {blocker.id} is blocked."
                    queue.append(waiting)

    def is_ticket_ready(self, ticket: Ticket) -> bool:
        """
        Returns True if all dependencies of the ticket are completed.

        A dependency ID that is not present in ``ticket_map`` counts as unmet.
        A ticket with no dependencies is always ready.
        """
        for dep_id in ticket.depends_on:
            dep = self.ticket_map.get(dep_id)
            # Compare against None explicitly: "missing" must not depend on
            # whether the ticket object happens to be truthy.
            if dep is None or dep.status != 'completed':
                return False
        return True

    def get_ready_tasks(self) -> List[Ticket]:
        """
        Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'.

        Returns:
            A list of Ticket objects ready for execution, in the order they
            appear in ``self.tickets``.
        [C: src/models.py:Track.get_executable_tickets, tests/test_dag_engine.py:test_get_ready_tasks_branching, tests/test_dag_engine.py:test_get_ready_tasks_linear, tests/test_dag_engine.py:test_get_ready_tasks_multiple_deps, tests/test_orchestration_logic.py:test_track_executable_tickets]
        """
        return [
            ticket
            for ticket in self.tickets
            if ticket.status == 'todo' and self.is_ticket_ready(ticket)
        ]

    def has_cycle(self) -> bool:
        """
        Performs an iterative Depth-First Search to detect cycles in the dependency graph.

        Dependency IDs that are not in ``ticket_map`` are treated as leaf nodes;
        they never produce a cycle on their own.

        Returns:
            True if a cycle is detected, False otherwise.
        [C: src/gui_2.py:App._render_task_dag_panel, tests/test_dag_engine.py:test_has_cycle_complex_no_cycle, tests/test_dag_engine.py:test_has_cycle_direct_cycle, tests/test_dag_engine.py:test_has_cycle_indirect_cycle, tests/test_dag_engine.py:test_has_cycle_no_cycle, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
        """
        with get_monitor().scope("dag_has_cycle"):
            visited = set()
            for start_ticket in self.tickets:
                if start_ticket.id in visited:
                    continue
                # Each stack entry is (id, is_backtracking). The backtracking
                # marker is pushed *before* a node's neighbours, so it is popped
                # only after the whole subtree below the node has been explored.
                stack = [(start_ticket.id, False)]
                path = set()  # IDs on the current DFS path
                while stack:
                    node_id, is_backtracking = stack.pop()
                    if is_backtracking:
                        path.remove(node_id)
                        continue
                    if node_id in path:
                        # Reached a node that is still being explored: back edge.
                        return True
                    if node_id in visited:
                        continue
                    visited.add(node_id)
                    path.add(node_id)
                    stack.append((node_id, True))
                    ticket = self.ticket_map.get(node_id)
                    if ticket is not None:
                        stack.extend(
                            (neighbor_id, False) for neighbor_id in ticket.depends_on
                        )
            return False

    def topological_sort(self) -> List[str]:
        """
        Returns a list of ticket IDs in topological order (dependencies before dependents).

        Uses Kahn's algorithm for efficient O(V+E) sorting and cycle detection.

        Returns:
            A list of ticket ID strings.
        Raises:
            ValueError: If a dependency cycle is detected, or if the tickets
                cannot be ordered because one of them depends on an ID that is
                not part of this graph (the message lists the dangling edges).
        [C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
        """
        with get_monitor().scope("dag_topological_sort"):
            # in_degree counts every declared dependency, including unknown IDs.
            # An unknown ID is never decremented, so such a ticket can never be
            # emitted - which is why it must be reported separately below.
            in_degree = {t.id: len(t.depends_on) for t in self.tickets}
            dependents = {t.id: [] for t in self.tickets}
            unresolved = []  # "ticket -> unknown dependency" descriptions
            for t in self.tickets:
                for dep_id in t.depends_on:
                    if dep_id in dependents:
                        dependents[dep_id].append(t.id)
                    else:
                        unresolved.append(f"{t.id} -> {dep_id}")

            # Queue starts with nodes having no dependencies
            queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
            result = []
            cursor = 0
            while cursor < len(queue):
                current = queue[cursor]
                cursor += 1
                result.append(current)
                for waiting_id in dependents.get(current, []):
                    in_degree[waiting_id] -= 1
                    if in_degree[waiting_id] == 0:
                        queue.append(waiting_id)

            if len(result) < len(self.tickets):
                if unresolved:
                    # Same failure condition as before, but the message no
                    # longer blames a cycle for what is a dangling reference.
                    raise ValueError(
                        "Unresolved dependencies (ticket -> unknown ID): "
                        + ", ".join(unresolved)
                    )
                raise ValueError("Dependency cycle detected")
            return result
class ExecutionEngine:
    """
    Status state machine layered on top of a TrackDAG.

    Exposes a polling entry point (``tick``) plus two explicit transitions:
    manual approval of a single task and an unconditional status override.
    """

    def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
        """
        Bind the engine to a graph.

        Args:
            dag: The TrackDAG instance whose tickets this engine manages.
            auto_queue: Stored on the instance as ``self.auto_queue``.
                NOTE(review): no method of this class reads the flag; confirm
                whether the caller is expected to act on it.
        [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.dag = dag
        self.auto_queue = auto_queue

    def tick(self) -> List[Ticket]:
        """
        Run one evaluation pass over the graph.

        First cascades 'blocked' status through the DAG, then collects the
        tickets the DAG reports as ready. The returned tickets are handed back
        as-is; this method does not change their status.

        NOTE(review): earlier documentation said ready tasks without
        'step_mode' are moved to 'in_progress' when auto_queue is enabled.
        The code here does not do that - verify where that transition lives.

        Returns:
            A list of ready Ticket objects.
        [C: src/multi_agent_conductor.py:ConductorEngine.run, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_cascade_blocks_multi_hop, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_cascade_blocks_simple, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_execution_engine_tick_cascades_blocks, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_in_progress_not_blocked, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_step_mode]
        """
        with get_monitor().scope("dag_tick"):
            # Blocks must be propagated first so that newly blocked tickets
            # are no longer in 'todo' when readiness is evaluated.
            self.dag.cascade_blocks()
            return self.dag.get_ready_tasks()

    def approve_task(self, task_id: str) -> None:
        """
        Move a single task from 'todo' to 'in_progress' on explicit approval.

        Nothing happens when the ID is unknown, the task is not in 'todo',
        or the DAG reports that its dependencies are not all met.

        Args:
            task_id: The ID of the task to approve.
        [C: src/multi_agent_conductor.py:ConductorEngine.approve_task, tests/test_execution_engine.py:test_execution_engine_approve_task, tests/test_execution_engine.py:test_execution_engine_step_mode]
        """
        candidate = self.dag.ticket_map.get(task_id)
        if not candidate:
            return
        if candidate.status != "todo":
            return
        if self.dag.is_ticket_ready(candidate):
            candidate.status = "in_progress"

    def update_task_status(self, task_id: str, status: str) -> None:
        """
        Overwrite the status of one task without any validation.

        Unknown task IDs are ignored silently.

        Args:
            task_id: The ID of the task.
            status: The new status string (e.g., 'todo', 'in_progress', 'completed', 'blocked').
        [C: src/multi_agent_conductor.py:ConductorEngine.update_task_status, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_status_persistence, tests/test_execution_engine.py:test_execution_engine_update_nonexistent_task]
        """
        target = self.dag.ticket_map.get(task_id)
        if not target:
            return
        target.status = status