docs(mma-conductor): replace fictional TrackDAG section with actual src/dag_engine.py API
This commit is contained in:
@@ -60,122 +60,106 @@ Together they implement a **non-blocking execution engine** with thread-safe sta
|
||||
|
||||
## The `TrackDAG` (in `src/dag_engine.py`)
|
||||
|
||||
The `TrackDAG` class holds the ticket dependency graph for a single track.
|
||||
The actual `TrackDAG` is a **flat list of `Ticket` references with an `O(1)` id→ticket map**, not a graph with separate node/edge dicts. The graph structure is implicit in each `Ticket.depends_on: list[str]`.
|
||||
|
||||
### Data Structures
|
||||
### Data Structure
|
||||
|
||||
```python
|
||||
class TrackDAG:
|
||||
"""Directed acyclic graph of tickets in a track."""
|
||||
def __init__(self, tickets: list[dict]):
|
||||
# ticket_id -> {status, depends_on[], blocks[]}
|
||||
self.nodes: dict[str, TicketNode] = {}
|
||||
# ticket_id -> list of ticket_ids it depends on
|
||||
self.edges: dict[str, set[str]] = {}
|
||||
# reverse: ticket_id -> list of ticket_ids that depend on it
|
||||
self.reverse_edges: dict[str, set[str]] = {}
|
||||
|
||||
def ready_tickets(self) -> list[str]:
|
||||
"""Tickets with all dependencies DONE and status PENDING."""
|
||||
|
||||
def is_blocked(self, ticket_id: str) -> bool:
|
||||
"""Cascades blocked status from upstream."""
|
||||
|
||||
def mark_done(self, ticket_id: str) -> None:
|
||||
"""Update downstream tickets' blocked status."""
|
||||
|
||||
def detect_cycles(self) -> list[list[str]] | None:
|
||||
"""Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic."""
|
||||
def __init__(self, tickets: list[Ticket]):
|
||||
self.tickets = tickets # list[Ticket] (the source of truth)
|
||||
self.ticket_map = {t.id: t for t in tickets} # dict[str, Ticket] (O(1) lookup)
|
||||
```
|
||||
|
||||
### `TicketNode`
|
||||
The `Ticket` dataclass itself is defined in `src/models.py` (see [guide_models.md](guide_models.md)). It carries the dependencies and status directly — there is no separate `TicketNode` wrapper, no `edges` dict, no `reverse_edges` dict. Adjacency lists are computed on demand inside `cascade_blocks()` and `topological_sort()`.
|
||||
|
||||
### Public Methods (actual signatures from `src/dag_engine.py:41-163`)
|
||||
|
||||
| Method | Returns | Purpose |
|
||||
|---|---|---|
|
||||
| `cascade_blocks()` | `None` | BFS-propagate `blocked` status from currently-blocked tickets to transitive `todo` dependents (mutates tickets in place) |
|
||||
| `is_ticket_ready(t)` | `bool` | True if all `t.depends_on` exist in `ticket_map` AND have `status == 'completed'` |
|
||||
| `get_ready_tasks()` | `list[Ticket]` | All `todo` tickets whose dependencies are all `completed` (and whose status hasn't been blocked by `cascade_blocks()`) |
|
||||
| `has_cycle()` | `bool` | Iterative DFS cycle detection (returns True/False — NOT a list of cycles) |
|
||||
| `topological_sort()` | `list[str]` (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises `ValueError("Dependency cycle detected")` if `len(result) < len(self.tickets)` after the BFS drain |
|
||||
|
||||
### `has_cycle()` — Iterative DFS
|
||||
|
||||
Implements iterative DFS to avoid recursion overhead. The `path` set is the iterative equivalent of the recursion stack:
|
||||
|
||||
```python
|
||||
@dataclass
|
||||
class TicketNode:
|
||||
ticket_id: str
|
||||
status: Literal["pending", "running", "done", "blocked", "skipped"]
|
||||
priority: Literal["high", "medium", "low"] = "medium"
|
||||
depends_on: set[str] = field(default_factory=set)
|
||||
blocks: set[str] = field(default_factory=set)
|
||||
result: dict | None = None # Populated when done
|
||||
error: str | None = None
|
||||
def has_cycle(self) -> bool:
|
||||
with get_monitor().scope("dag_has_cycle"):
|
||||
visited = set()
|
||||
for start_ticket in self.tickets:
|
||||
if start_ticket.id in visited:
|
||||
continue
|
||||
stack = [(start_ticket.id, False)] # (id, is_backtracking)
|
||||
path = set()
|
||||
while stack:
|
||||
node_id, is_backtracking = stack.pop()
|
||||
if is_backtracking:
|
||||
path.remove(node_id)
|
||||
continue
|
||||
if node_id in path: return True
|
||||
if node_id in visited: continue
|
||||
visited.add(node_id)
|
||||
path.add(node_id)
|
||||
stack.append((node_id, True))
|
||||
ticket = self.ticket_map.get(node_id)
|
||||
if ticket:
|
||||
for neighbor_id in ticket.depends_on:
|
||||
stack.append((neighbor_id, False))
|
||||
return False
|
||||
```
|
||||
|
||||
### `detect_cycles`
|
||||
### `topological_sort()` — Kahn's Algorithm
|
||||
|
||||
Implements **iterative DFS** to avoid recursion overhead:
|
||||
BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists.
|
||||
|
||||
```python
|
||||
def detect_cycles(self) -> list[list[str]] | None:
|
||||
"""Iterative DFS cycle detection. O(V+E)."""
|
||||
WHITE, GRAY, BLACK = 0, 1, 2
|
||||
color = {tid: WHITE for tid in self.nodes}
|
||||
parent = {tid: None for tid in self.nodes}
|
||||
stack = []
|
||||
cycles = []
|
||||
def topological_sort(self) -> list[str]:
|
||||
with get_monitor().scope("dag_topological_sort"):
|
||||
in_degree = {t.id: len(t.depends_on) for t in self.tickets}
|
||||
dependents = {t.id: [] for t in self.tickets}
|
||||
for t in self.tickets:
|
||||
for dep_id in t.depends_on:
|
||||
if dep_id in dependents:
|
||||
dependents[dep_id].append(t.id)
|
||||
|
||||
for start in self.nodes:
|
||||
if color[start] != WHITE:
|
||||
continue
|
||||
stack.append((start, iter(self.edges[start])))
|
||||
while stack:
|
||||
node, children = stack[-1]
|
||||
try:
|
||||
child = next(children)
|
||||
if color[child] == GRAY:
|
||||
# Back edge: cycle found
|
||||
cycle = [child]
|
||||
while node != child:
|
||||
cycle.append(node)
|
||||
node = parent[node]
|
||||
cycle.append(child)
|
||||
cycles.append(list(reversed(cycle)))
|
||||
elif color[child] == WHITE:
|
||||
color[child] = GRAY
|
||||
parent[child] = node
|
||||
stack.append((child, iter(self.edges[child])))
|
||||
except StopIteration:
|
||||
color[node] = BLACK
|
||||
stack.pop()
|
||||
return cycles if cycles else None
|
||||
queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
|
||||
result = []
|
||||
idx = 0
|
||||
while idx < len(queue):
|
||||
u = queue[idx]
|
||||
idx += 1
|
||||
result.append(u)
|
||||
for v_id in dependents.get(u, []):
|
||||
in_degree[v_id] -= 1
|
||||
if in_degree[v_id] == 0:
|
||||
queue.append(v_id)
|
||||
|
||||
if len(result) < len(self.tickets):
|
||||
raise ValueError("Dependency cycle detected")
|
||||
return result
|
||||
```
|
||||
|
||||
Returns `None` for an acyclic graph, or a list of cycles for debugging.
|
||||
|
||||
### `ready_tickets` (Kahn's Algorithm Variant)
|
||||
|
||||
Returns the set of tickets that are PENDING and have all dependencies DONE.
|
||||
### `get_executable_tickets(track)` (free function, `src/dag_engine.py:165-173`)
|
||||
|
||||
```python
|
||||
def ready_tickets(self) -> list[str]:
|
||||
ready = []
|
||||
for tid, node in self.nodes.items():
|
||||
if node.status != "pending":
|
||||
continue
|
||||
if all(self.nodes[dep].status == "done" for dep in node.depends_on):
|
||||
ready.append(tid)
|
||||
return ready
|
||||
def get_executable_tickets(track: "Track") -> list[Ticket]:
|
||||
"""Convenience: returns the ready-to-execute tickets of a Track.
|
||||
Free function (instead of Track.get_executable_tickets) so that
|
||||
src/models.py does not need to import TrackDAG at module level,
|
||||
breaking the models<->dag_engine circular dependency.
|
||||
"""
|
||||
return TrackDAG(track.tickets).get_ready_tasks()
|
||||
```
|
||||
|
||||
Sorted by priority (high > medium > low) for deterministic dispatch order.
|
||||
### Thread Safety
|
||||
|
||||
### `is_blocked` (Transitive Blocking Propagation)
|
||||
|
||||
When a ticket is BLOCKED, all downstream tickets are also BLOCKED:
|
||||
|
||||
```python
|
||||
def is_blocked(self, ticket_id: str) -> bool:
|
||||
"""Cascades blocked status from upstream (any failed/skipped/blocked dep)."""
|
||||
for dep in self.nodes[ticket_id].depends_on:
|
||||
if self.nodes[dep].status in ("blocked", "skipped"):
|
||||
return True
|
||||
if self.is_blocked(dep): # Recursive cascade
|
||||
return True
|
||||
return False
|
||||
```
|
||||
|
||||
This prevents execution stalls when an upstream ticket is blocked.
|
||||
`TrackDAG` is **NOT thread-safe**. Callers must synchronize access if used from multiple threads (per the module docstring at `src/dag_engine.py:20-22`). The `ConductorEngine` is currently the only caller; the WorkerPool reads from `self.engine` under the ConductorEngine's own lock discipline.
|
||||
|
||||
---
|
||||
|
||||
|
||||
Reference in New Issue
Block a user