diff --git a/dag_engine.py b/dag_engine.py index 29028c5..11660d2 100644 --- a/dag_engine.py +++ b/dag_engine.py @@ -76,3 +76,50 @@ class TrackDAG: visit(ticket.id) return stack + +class ExecutionEngine: + def __init__(self, dag: TrackDAG, auto_queue: bool = False): + self.dag = dag + self.auto_queue = auto_queue + + def tick(self) -> List[Ticket]: + """ + Returns a list of tasks that are currently 'ready' to be executed. + A task is ready if its status is 'todo' and all its dependencies are 'completed'. + If auto_queue=True, it will automatically mark 'ready' tasks as 'in-progress', + unless step_mode=True is set on the task. + """ + ready = self.dag.get_ready_tasks() + + if self.auto_queue: + for ticket in ready: + if not ticket.step_mode: + ticket.status = "in_progress" + + return ready + + def approve_task(self, task_id: str): + """ + Manually approves a task to move it to 'in_progress'. + Typically used for tasks with step_mode=True or when auto_queue is False. + """ + ticket = self.dag.ticket_map.get(task_id) + if ticket and ticket.status == "todo": + # Check if dependencies are met first + all_done = True + for dep_id in ticket.depends_on: + dep = self.dag.ticket_map.get(dep_id) + if not dep or dep.status != "completed": + all_done = False + break + + if all_done: + ticket.status = "in_progress" + + def update_task_status(self, task_id: str, status: str): + """ + Updates the status of a specific task within the DAG. + """ + ticket = self.dag.ticket_map.get(task_id) + if ticket: + ticket.status = status diff --git a/tests/test_execution_engine.py b/tests/test_execution_engine.py new file mode 100644 index 0000000..69132e5 --- /dev/null +++ b/tests/test_execution_engine.py @@ -0,0 +1,123 @@ +import pytest +from models import Ticket +from dag_engine import TrackDAG, ExecutionEngine + +def test_execution_engine_basic_flow(): + # Setup tickets with dependencies + t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker") + t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker", depends_on=["T1"]) + t3 = Ticket(id="T3", description="Task 3", status="todo", assigned_to="worker", depends_on=["T1"]) + t4 = Ticket(id="T4", description="Task 4", status="todo", assigned_to="worker", depends_on=["T2", "T3"]) + + dag = TrackDAG([t1, t2, t3, t4]) + engine = ExecutionEngine(dag) + + # Tick 1: Only T1 should be ready + ready = engine.tick() + assert len(ready) == 1 + assert ready[0].id == "T1" + + # Complete T1 + engine.update_task_status("T1", "completed") + + # Tick 2: T2 and T3 should be ready + ready = engine.tick() + assert len(ready) == 2 + ids = {t.id for t in ready} + assert ids == {"T2", "T3"} + + # Complete T2 + engine.update_task_status("T2", "completed") + + # Tick 3: Only T3 should be ready (T4 depends on T2 AND T3) + ready = engine.tick() + assert len(ready) == 1 + assert ready[0].id == "T3" + + # Complete T3 + engine.update_task_status("T3", "completed") + + # Tick 4: T4 should be ready + ready = engine.tick() + assert len(ready) == 1 + assert ready[0].id == "T4" + + # Complete T4 + engine.update_task_status("T4", "completed") + + # Tick 5: Nothing ready + ready = engine.tick() + assert len(ready) == 0 + +def test_execution_engine_update_nonexistent_task(): + dag = TrackDAG([]) + engine = ExecutionEngine(dag) + # Should not raise error, or handle gracefully + engine.update_task_status("NONEXISTENT", "completed") + +def test_execution_engine_status_persistence(): + t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker") + dag = TrackDAG([t1]) + engine = ExecutionEngine(dag) + + engine.update_task_status("T1", "in_progress") + assert t1.status == "in_progress" + + ready = engine.tick() + assert len(ready) == 0 # Only 'todo' tasks should be returned by tick() if they are ready + +def test_execution_engine_auto_queue(): + t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker") + t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker", depends_on=["T1"]) + + dag = TrackDAG([t1, t2]) + engine = ExecutionEngine(dag, auto_queue=True) + + # Tick 1: T1 is ready and should be automatically marked as 'in_progress' + ready = engine.tick() + assert len(ready) == 1 + assert ready[0].id == "T1" + assert t1.status == "in_progress" + + # Tick 2: T1 is in_progress, so T2 is NOT ready yet (T1 must be 'completed') + ready = engine.tick() + assert len(ready) == 0 + assert t2.status == "todo" + + # Complete T1 + engine.update_task_status("T1", "completed") + + # Tick 3: T2 is now ready and should be automatically marked as 'in_progress' + ready = engine.tick() + assert len(ready) == 1 + assert ready[0].id == "T2" + assert t2.status == "in_progress" + +def test_execution_engine_step_mode(): + t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker", step_mode=True) + + dag = TrackDAG([t1]) + engine = ExecutionEngine(dag, auto_queue=True) + + # Tick 1: T1 is ready, but step_mode=True, so it should NOT be automatically marked as 'in_progress' + ready = engine.tick() + assert len(ready) == 1 + assert ready[0].id == "T1" + assert t1.status == "todo" + + # Manual approval + engine.approve_task("T1") + assert t1.status == "in_progress" + + # Tick 2: T1 is already in_progress, should not be returned by tick() (it's not 'ready'/todo) + ready = engine.tick() + assert len(ready) == 0 + +def test_execution_engine_approve_task(): + t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker") + dag = TrackDAG([t1]) + engine = ExecutionEngine(dag, auto_queue=False) + + # Should be able to approve even if auto_queue is False + engine.approve_task("T1") + assert t1.status == "in_progress"