import pytest

from models import Ticket
from dag_engine import TrackDAG, ExecutionEngine


def _todo_ticket(ticket_id, description, **extra):
    """Build a Ticket with status "todo" assigned to "worker".

    Any additional keyword arguments (e.g. ``depends_on``, ``step_mode``)
    are forwarded to ``Ticket`` unchanged.
    """
    return Ticket(
        id=ticket_id,
        description=description,
        status="todo",
        assigned_to="worker",
        **extra,
    )


def _tick_ids(engine):
    """Call ``engine.tick()`` exactly once and return the ticket ids it gave back, in order."""
    return [ticket.id for ticket in engine.tick()]


def test_execution_engine_basic_flow():
    """Step through a diamond graph (T1 -> T2/T3 -> T4), completing one ticket at a time."""
    root = _todo_ticket("T1", "Task 1")
    left = _todo_ticket("T2", "Task 2", depends_on=["T1"])
    right = _todo_ticket("T3", "Task 3", depends_on=["T1"])
    join = _todo_ticket("T4", "Task 4", depends_on=["T2", "T3"])
    engine = ExecutionEngine(TrackDAG([root, left, right, join]))

    # First tick: T1 has no dependencies, so it is the only ticket offered.
    assert _tick_ids(engine) == ["T1"]
    engine.update_task_status("T1", "completed")

    # Second tick: both children of T1 are offered; their order is not asserted.
    second = _tick_ids(engine)
    assert len(second) == 2
    assert set(second) == {"T2", "T3"}
    engine.update_task_status("T2", "completed")

    # Third tick: T4 still waits on T3, so T3 is the only ticket offered.
    assert _tick_ids(engine) == ["T3"]
    engine.update_task_status("T3", "completed")

    # Fourth tick: with T2 and T3 both completed, T4 is offered.
    assert _tick_ids(engine) == ["T4"]
    engine.update_task_status("T4", "completed")

    # Fifth tick: every ticket is completed, so nothing is offered.
    assert _tick_ids(engine) == []


def test_execution_engine_update_nonexistent_task():
    """Updating an id that is not in the graph must not raise."""
    engine = ExecutionEngine(TrackDAG([]))

    # Smoke test only: the call completing without an exception is the check.
    engine.update_task_status("NONEXISTENT", "completed")


def test_execution_engine_status_persistence():
    """A status update is written onto the Ticket object and affects the next tick."""
    ticket = _todo_ticket("T1", "Task 1")
    engine = ExecutionEngine(TrackDAG([ticket]))

    engine.update_task_status("T1", "in_progress")
    assert ticket.status == "in_progress"

    # Only 'todo' tickets are expected back from tick(); T1 is no longer 'todo'.
    assert _tick_ids(engine) == []


def test_execution_engine_auto_queue():
    """With auto_queue=True, tickets returned by tick() are moved to 'in_progress'."""
    first = _todo_ticket("T1", "Task 1")
    second = _todo_ticket("T2", "Task 2", depends_on=["T1"])
    engine = ExecutionEngine(TrackDAG([first, second]), auto_queue=True)

    # First tick: T1 is returned and its status is flipped automatically.
    assert _tick_ids(engine) == ["T1"]
    assert first.status == "in_progress"

    # Second tick: T1 is in progress but not 'completed', so T2 stays blocked.
    assert _tick_ids(engine) == []
    assert second.status == "todo"

    engine.update_task_status("T1", "completed")

    # Third tick: T2 is now returned and flipped to 'in_progress' as well.
    assert _tick_ids(engine) == ["T2"]
    assert second.status == "in_progress"


def test_execution_engine_step_mode():
    """A step_mode ticket is returned by tick() but is not auto-started until approved."""
    ticket = _todo_ticket("T1", "Task 1", step_mode=True)
    engine = ExecutionEngine(TrackDAG([ticket]), auto_queue=True)

    # First tick: T1 is returned, yet step_mode keeps it at 'todo' despite auto_queue.
    assert _tick_ids(engine) == ["T1"]
    assert ticket.status == "todo"

    # Explicit approval is what moves it forward.
    engine.approve_task("T1")
    assert ticket.status == "in_progress"

    # Second tick: T1 is no longer 'todo', so it is not returned again.
    assert _tick_ids(engine) == []


def test_execution_engine_approve_task():
    """approve_task() starts a ticket even when auto_queue is disabled."""
    ticket = _todo_ticket("T1", "Task 1")
    engine = ExecutionEngine(TrackDAG([ticket]), auto_queue=False)

    engine.approve_task("T1")
    assert ticket.status == "in_progress"