126 lines
4.0 KiB
Python
126 lines
4.0 KiB
Python
from typing import List
|
|
from models import Ticket
|
|
|
|
class TrackDAG:
|
|
def __init__(self, tickets: List[Ticket]):
|
|
self.tickets = tickets
|
|
self.ticket_map = {t.id: t for t in tickets}
|
|
|
|
def get_ready_tasks(self) -> List[Ticket]:
|
|
"""Returns tickets that are 'todo' and whose dependencies are all 'completed'."""
|
|
ready = []
|
|
for ticket in self.tickets:
|
|
if ticket.status == 'todo':
|
|
# Check if all dependencies exist and are completed
|
|
all_done = True
|
|
for dep_id in ticket.depends_on:
|
|
dep = self.ticket_map.get(dep_id)
|
|
if not dep or dep.status != 'completed':
|
|
all_done = False
|
|
break
|
|
if all_done:
|
|
ready.append(ticket)
|
|
return ready
|
|
|
|
def has_cycle(self) -> bool:
|
|
"""Returns True if there's a dependency cycle."""
|
|
visited = set()
|
|
rec_stack = set()
|
|
|
|
def is_cyclic(ticket_id):
|
|
if ticket_id in rec_stack:
|
|
return True
|
|
if ticket_id in visited:
|
|
return False
|
|
|
|
visited.add(ticket_id)
|
|
rec_stack.add(ticket_id)
|
|
|
|
ticket = self.ticket_map.get(ticket_id)
|
|
if ticket:
|
|
for neighbor in ticket.depends_on:
|
|
if is_cyclic(neighbor):
|
|
return True
|
|
|
|
rec_stack.remove(ticket_id)
|
|
return False
|
|
|
|
for ticket in self.tickets:
|
|
if ticket.id not in visited:
|
|
if is_cyclic(ticket.id):
|
|
return True
|
|
return False
|
|
|
|
def topological_sort(self) -> List[str]:
|
|
"""
|
|
Returns a list of ticket IDs in topological order.
|
|
Raises ValueError if a cycle is detected.
|
|
"""
|
|
if self.has_cycle():
|
|
raise ValueError("Dependency cycle detected")
|
|
|
|
visited = set()
|
|
stack = []
|
|
|
|
def visit(ticket_id):
|
|
if ticket_id in visited:
|
|
return
|
|
visited.add(ticket_id)
|
|
ticket = self.ticket_map.get(ticket_id)
|
|
if ticket:
|
|
for dep_id in ticket.depends_on:
|
|
visit(dep_id)
|
|
stack.append(ticket_id)
|
|
|
|
for ticket in self.tickets:
|
|
visit(ticket.id)
|
|
|
|
return stack
|
|
|
|
class ExecutionEngine:
|
|
def __init__(self, dag: TrackDAG, auto_queue: bool = False):
|
|
self.dag = dag
|
|
self.auto_queue = auto_queue
|
|
|
|
def tick(self) -> List[Ticket]:
|
|
"""
|
|
Returns a list of tasks that are currently 'ready' to be executed.
|
|
A task is ready if its status is 'todo' and all its dependencies are 'completed'.
|
|
If auto_queue=True, it will automatically mark 'ready' tasks as 'in-progress',
|
|
unless step_mode=True is set on the task.
|
|
"""
|
|
ready = self.dag.get_ready_tasks()
|
|
|
|
if self.auto_queue:
|
|
for ticket in ready:
|
|
if not ticket.step_mode:
|
|
ticket.status = "in_progress"
|
|
|
|
return ready
|
|
|
|
def approve_task(self, task_id: str):
|
|
"""
|
|
Manually approves a task to move it to 'in_progress'.
|
|
Typically used for tasks with step_mode=True or when auto_queue is False.
|
|
"""
|
|
ticket = self.dag.ticket_map.get(task_id)
|
|
if ticket and ticket.status == "todo":
|
|
# Check if dependencies are met first
|
|
all_done = True
|
|
for dep_id in ticket.depends_on:
|
|
dep = self.dag.ticket_map.get(dep_id)
|
|
if not dep or dep.status != "completed":
|
|
all_done = False
|
|
break
|
|
|
|
if all_done:
|
|
ticket.status = "in_progress"
|
|
|
|
def update_task_status(self, task_id: str, status: str):
|
|
"""
|
|
Updates the status of a specific task within the DAG.
|
|
"""
|
|
ticket = self.dag.ticket_map.get(task_id)
|
|
if ticket:
|
|
ticket.status = status
|