feat(mma): Implement ExecutionEngine with auto-queue and step-mode support
This commit is contained in:
@@ -76,3 +76,50 @@ class TrackDAG:
|
||||
visit(ticket.id)
|
||||
|
||||
return stack
|
||||
|
||||
class ExecutionEngine:
|
||||
def __init__(self, dag: TrackDAG, auto_queue: bool = False):
|
||||
self.dag = dag
|
||||
self.auto_queue = auto_queue
|
||||
|
||||
def tick(self) -> List[Ticket]:
|
||||
"""
|
||||
Returns a list of tasks that are currently 'ready' to be executed.
|
||||
A task is ready if its status is 'todo' and all its dependencies are 'completed'.
|
||||
If auto_queue=True, it will automatically mark 'ready' tasks as 'in-progress',
|
||||
unless step_mode=True is set on the task.
|
||||
"""
|
||||
ready = self.dag.get_ready_tasks()
|
||||
|
||||
if self.auto_queue:
|
||||
for ticket in ready:
|
||||
if not ticket.step_mode:
|
||||
ticket.status = "in_progress"
|
||||
|
||||
return ready
|
||||
|
||||
def approve_task(self, task_id: str):
|
||||
"""
|
||||
Manually approves a task to move it to 'in_progress'.
|
||||
Typically used for tasks with step_mode=True or when auto_queue is False.
|
||||
"""
|
||||
ticket = self.dag.ticket_map.get(task_id)
|
||||
if ticket and ticket.status == "todo":
|
||||
# Check if dependencies are met first
|
||||
all_done = True
|
||||
for dep_id in ticket.depends_on:
|
||||
dep = self.dag.ticket_map.get(dep_id)
|
||||
if not dep or dep.status != "completed":
|
||||
all_done = False
|
||||
break
|
||||
|
||||
if all_done:
|
||||
ticket.status = "in_progress"
|
||||
|
||||
def update_task_status(self, task_id: str, status: str):
|
||||
"""
|
||||
Updates the status of a specific task within the DAG.
|
||||
"""
|
||||
ticket = self.dag.ticket_map.get(task_id)
|
||||
if ticket:
|
||||
ticket.status = status
|
||||
|
||||
123
tests/test_execution_engine.py
Normal file
123
tests/test_execution_engine.py
Normal file
@@ -0,0 +1,123 @@
|
||||
import pytest
|
||||
from models import Ticket
|
||||
from dag_engine import TrackDAG, ExecutionEngine
|
||||
|
||||
def test_execution_engine_basic_flow():
|
||||
# Setup tickets with dependencies
|
||||
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker")
|
||||
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker", depends_on=["T1"])
|
||||
t3 = Ticket(id="T3", description="Task 3", status="todo", assigned_to="worker", depends_on=["T1"])
|
||||
t4 = Ticket(id="T4", description="Task 4", status="todo", assigned_to="worker", depends_on=["T2", "T3"])
|
||||
|
||||
dag = TrackDAG([t1, t2, t3, t4])
|
||||
engine = ExecutionEngine(dag)
|
||||
|
||||
# Tick 1: Only T1 should be ready
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 1
|
||||
assert ready[0].id == "T1"
|
||||
|
||||
# Complete T1
|
||||
engine.update_task_status("T1", "completed")
|
||||
|
||||
# Tick 2: T2 and T3 should be ready
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 2
|
||||
ids = {t.id for t in ready}
|
||||
assert ids == {"T2", "T3"}
|
||||
|
||||
# Complete T2
|
||||
engine.update_task_status("T2", "completed")
|
||||
|
||||
# Tick 3: Only T3 should be ready (T4 depends on T2 AND T3)
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 1
|
||||
assert ready[0].id == "T3"
|
||||
|
||||
# Complete T3
|
||||
engine.update_task_status("T3", "completed")
|
||||
|
||||
# Tick 4: T4 should be ready
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 1
|
||||
assert ready[0].id == "T4"
|
||||
|
||||
# Complete T4
|
||||
engine.update_task_status("T4", "completed")
|
||||
|
||||
# Tick 5: Nothing ready
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 0
|
||||
|
||||
def test_execution_engine_update_nonexistent_task():
|
||||
dag = TrackDAG([])
|
||||
engine = ExecutionEngine(dag)
|
||||
# Should not raise error, or handle gracefully
|
||||
engine.update_task_status("NONEXISTENT", "completed")
|
||||
|
||||
def test_execution_engine_status_persistence():
|
||||
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker")
|
||||
dag = TrackDAG([t1])
|
||||
engine = ExecutionEngine(dag)
|
||||
|
||||
engine.update_task_status("T1", "in_progress")
|
||||
assert t1.status == "in_progress"
|
||||
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 0 # Only 'todo' tasks should be returned by tick() if they are ready
|
||||
|
||||
def test_execution_engine_auto_queue():
|
||||
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker")
|
||||
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker", depends_on=["T1"])
|
||||
|
||||
dag = TrackDAG([t1, t2])
|
||||
engine = ExecutionEngine(dag, auto_queue=True)
|
||||
|
||||
# Tick 1: T1 is ready and should be automatically marked as 'in_progress'
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 1
|
||||
assert ready[0].id == "T1"
|
||||
assert t1.status == "in_progress"
|
||||
|
||||
# Tick 2: T1 is in_progress, so T2 is NOT ready yet (T1 must be 'completed')
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 0
|
||||
assert t2.status == "todo"
|
||||
|
||||
# Complete T1
|
||||
engine.update_task_status("T1", "completed")
|
||||
|
||||
# Tick 3: T2 is now ready and should be automatically marked as 'in_progress'
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 1
|
||||
assert ready[0].id == "T2"
|
||||
assert t2.status == "in_progress"
|
||||
|
||||
def test_execution_engine_step_mode():
|
||||
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker", step_mode=True)
|
||||
|
||||
dag = TrackDAG([t1])
|
||||
engine = ExecutionEngine(dag, auto_queue=True)
|
||||
|
||||
# Tick 1: T1 is ready, but step_mode=True, so it should NOT be automatically marked as 'in_progress'
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 1
|
||||
assert ready[0].id == "T1"
|
||||
assert t1.status == "todo"
|
||||
|
||||
# Manual approval
|
||||
engine.approve_task("T1")
|
||||
assert t1.status == "in_progress"
|
||||
|
||||
# Tick 2: T1 is already in_progress, should not be returned by tick() (it's not 'ready'/todo)
|
||||
ready = engine.tick()
|
||||
assert len(ready) == 0
|
||||
|
||||
def test_execution_engine_approve_task():
|
||||
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker")
|
||||
dag = TrackDAG([t1])
|
||||
engine = ExecutionEngine(dag, auto_queue=False)
|
||||
|
||||
# Should be able to approve even if auto_queue is False
|
||||
engine.approve_task("T1")
|
||||
assert t1.status == "in_progress"
|
||||
Reference in New Issue
Block a user