feat(dag_engine): implement cascade_blocks and call in ExecutionEngine.tick

This commit is contained in:
2026-03-02 18:47:47 -05:00
parent dd882b928d
commit 5b8a0739f7
2 changed files with 78 additions and 0 deletions

View File

@@ -16,6 +16,23 @@ class TrackDAG:
self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets}
def cascade_blocks(self) -> None:
"""
Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`.
Runs until stable (handles multi-hop chains: A→B→C where A blocked cascades to B then C).
"""
changed = True
while changed:
changed = False
for ticket in self.tickets:
if ticket.status == 'todo':
for dep_id in ticket.depends_on:
dep = self.ticket_map.get(dep_id)
if dep and dep.status == 'blocked':
ticket.status = 'blocked'
changed = True
break
def get_ready_tasks(self) -> List[Ticket]:
"""
Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'.
@@ -116,6 +133,7 @@ class ExecutionEngine:
Returns:
A list of ready Ticket objects.
"""
self.dag.cascade_blocks()
ready = self.dag.get_ready_tasks()
if self.auto_queue:
for ticket in ready:

View File

@@ -0,0 +1,60 @@
import pytest
from models import Ticket
from dag_engine import TrackDAG, ExecutionEngine
def test_cascade_blocks_simple() -> None:
"""Test that a blocked dependency blocks its immediate dependent."""
t1 = Ticket(id="T1", description="T1", status="blocked", assigned_to="worker")
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="worker", depends_on=["T1"])
dag = TrackDAG([t1, t2])
dag.cascade_blocks()
assert t2.status == "blocked"
def test_cascade_blocks_multi_hop() -> None:
"""Test that blocking cascades through multiple levels: A(blocked) -> B -> C."""
t1 = Ticket(id="T1", description="T1", status="blocked", assigned_to="worker")
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="worker", depends_on=["T1"])
t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="worker", depends_on=["T2"])
dag = TrackDAG([t1, t2, t3])
dag.cascade_blocks()
assert t2.status == "blocked"
assert t3.status == "blocked"
def test_cascade_blocks_no_cascade_to_completed() -> None:
"""Test that completed tasks are not changed even if a dependency is blocked (though this shouldn't normally happen)."""
t1 = Ticket(id="T1", description="T1", status="blocked", assigned_to="worker")
t2 = Ticket(id="T2", description="T2", status="completed", assigned_to="worker", depends_on=["T1"])
dag = TrackDAG([t1, t2])
dag.cascade_blocks()
assert t2.status == "completed"
def test_cascade_blocks_partial_dependencies() -> None:
"""Test that if one dependency is blocked, the dependent is blocked even if others are completed."""
t1 = Ticket(id="T1", description="T1", status="blocked", assigned_to="worker")
t2 = Ticket(id="T2", description="T2", status="completed", assigned_to="worker")
t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="worker", depends_on=["T1", "T2"])
dag = TrackDAG([t1, t2, t3])
dag.cascade_blocks()
assert t3.status == "blocked"
def test_cascade_blocks_already_in_progress() -> None:
"""Test that in_progress tasks are not blocked automatically (only todo)."""
t1 = Ticket(id="T1", description="T1", status="blocked", assigned_to="worker")
t2 = Ticket(id="T2", description="T2", status="in_progress", assigned_to="worker", depends_on=["T1"])
dag = TrackDAG([t1, t2])
dag.cascade_blocks()
assert t2.status == "in_progress"
def test_execution_engine_tick_cascades_blocks() -> None:
"""Test that ExecutionEngine.tick() triggers the cascading blocks."""
t1 = Ticket(id="T1", description="T1", status="blocked", assigned_to="worker")
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="worker", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
# Before tick, T2 is todo
assert t2.status == "todo"
# After tick, T2 should be blocked
engine.tick()
assert t2.status == "blocked"