docs(mma-conductor): replace fictional TrackDAG section with actual src/dag_engine.py API
This commit is contained in:
@@ -60,122 +60,106 @@ Together they implement a **non-blocking execution engine** with thread-safe sta
|
|||||||
|
|
||||||
## The `TrackDAG` (in `src/dag_engine.py`)
|
## The `TrackDAG` (in `src/dag_engine.py`)
|
||||||
|
|
||||||
The `TrackDAG` class holds the ticket dependency graph for a single track.
|
The actual `TrackDAG` is a **flat list of `Ticket` references with an `O(1)` id→ticket map**, not a graph with separate node/edge dicts. The graph structure is implicit in each `Ticket.depends_on: list[str]`.
|
||||||
|
|
||||||
### Data Structures
|
### Data Structure
|
||||||
|
|
||||||
```python
|
```python
|
||||||
class TrackDAG:
|
class TrackDAG:
|
||||||
"""Directed acyclic graph of tickets in a track."""
|
"""Directed acyclic graph of tickets in a track."""
|
||||||
def __init__(self, tickets: list[dict]):
|
def __init__(self, tickets: list[Ticket]):
|
||||||
# ticket_id -> {status, depends_on[], blocks[]}
|
self.tickets = tickets # list[Ticket] (the source of truth)
|
||||||
self.nodes: dict[str, TicketNode] = {}
|
self.ticket_map = {t.id: t for t in tickets} # dict[str, Ticket] (O(1) lookup)
|
||||||
# ticket_id -> list of ticket_ids it depends on
|
|
||||||
self.edges: dict[str, set[str]] = {}
|
|
||||||
# reverse: ticket_id -> list of ticket_ids that depend on it
|
|
||||||
self.reverse_edges: dict[str, set[str]] = {}
|
|
||||||
|
|
||||||
def ready_tickets(self) -> list[str]:
|
|
||||||
"""Tickets with all dependencies DONE and status PENDING."""
|
|
||||||
|
|
||||||
def is_blocked(self, ticket_id: str) -> bool:
|
|
||||||
"""Cascades blocked status from upstream."""
|
|
||||||
|
|
||||||
def mark_done(self, ticket_id: str) -> None:
|
|
||||||
"""Update downstream tickets' blocked status."""
|
|
||||||
|
|
||||||
def detect_cycles(self) -> list[list[str]] | None:
|
|
||||||
"""Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic."""
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### `TicketNode`
|
The `Ticket` dataclass itself is defined in `src/models.py` (see [guide_models.md](guide_models.md)). It carries the dependencies and status directly — there is no separate `TicketNode` wrapper, no `edges` dict, no `reverse_edges` dict. Adjacency lists are computed on demand inside `cascade_blocks()` and `topological_sort()`.
|
||||||
|
|
||||||
|
### Public Methods (actual signatures from `src/dag_engine.py:41-163`)
|
||||||
|
|
||||||
|
| Method | Returns | Purpose |
|
||||||
|
|---|---|---|
|
||||||
|
| `cascade_blocks()` | `None` | BFS-propagate `blocked` status from currently-blocked tickets to transitive `todo` dependents (mutates tickets in place) |
|
||||||
|
| `is_ticket_ready(t)` | `bool` | True if all `t.depends_on` exist in `ticket_map` AND have `status == 'completed'` |
|
||||||
|
| `get_ready_tasks()` | `list[Ticket]` | All `todo` tickets whose dependencies are all `completed` (and whose status hasn't been blocked by `cascade_blocks()`) |
|
||||||
|
| `has_cycle()` | `bool` | Iterative DFS cycle detection (returns True/False — NOT a list of cycles) |
|
||||||
|
| `topological_sort()` | `list[str]` (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises `ValueError("Dependency cycle detected")` if `len(result) < len(self.tickets)` after the BFS drain |
|
||||||
|
|
||||||
|
### `has_cycle()` — Iterative DFS
|
||||||
|
|
||||||
|
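Implements iterative DFS with an explicit stack to avoid recursion overhead. The `path` set is the iterative equivalent of the recursion stack: it holds the ids on the current DFS path. Each node is pushed a second time as a `(node_id, True)` backtracking marker, so it is removed from `path` only after all of its dependencies have been explored; reaching an id that is still in `path` means a cycle: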
|
||||||
|
|
||||||
```python
|
```python
|
||||||
@dataclass
|
def has_cycle(self) -> bool:
|
||||||
class TicketNode:
|
with get_monitor().scope("dag_has_cycle"):
|
||||||
ticket_id: str
|
visited = set()
|
||||||
status: Literal["pending", "running", "done", "blocked", "skipped"]
|
for start_ticket in self.tickets:
|
||||||
priority: Literal["high", "medium", "low"] = "medium"
|
if start_ticket.id in visited:
|
||||||
depends_on: set[str] = field(default_factory=set)
|
continue
|
||||||
blocks: set[str] = field(default_factory=set)
|
stack = [(start_ticket.id, False)] # (id, is_backtracking)
|
||||||
result: dict | None = None # Populated when done
|
path = set()
|
||||||
error: str | None = None
|
while stack:
|
||||||
|
node_id, is_backtracking = stack.pop()
|
||||||
|
if is_backtracking:
|
||||||
|
path.remove(node_id)
|
||||||
|
continue
|
||||||
|
if node_id in path: return True
|
||||||
|
if node_id in visited: continue
|
||||||
|
visited.add(node_id)
|
||||||
|
path.add(node_id)
|
||||||
|
stack.append((node_id, True))
|
||||||
|
ticket = self.ticket_map.get(node_id)
|
||||||
|
if ticket:
|
||||||
|
for neighbor_id in ticket.depends_on:
|
||||||
|
stack.append((neighbor_id, False))
|
||||||
|
return False
|
||||||
```
|
```
|
||||||
|
|
||||||
### `detect_cycles`
|
### `topological_sort()` — Kahn's Algorithm
|
||||||
|
|
||||||
Implements **iterative DFS** to avoid recursion overhead:
|
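BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, the remaining tickets could never reach in-degree zero. Normally that means a cycle exists, but note that `in_degree` counts every entry in `depends_on`, while only dependencies that are tickets in this track are ever decremented — so a ticket that depends on an unknown id is also never emitted and triggers the same `ValueError("Dependency cycle detected")`.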
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def detect_cycles(self) -> list[list[str]] | None:
|
def topological_sort(self) -> list[str]:
|
||||||
"""Iterative DFS cycle detection. O(V+E)."""
|
with get_monitor().scope("dag_topological_sort"):
|
||||||
WHITE, GRAY, BLACK = 0, 1, 2
|
in_degree = {t.id: len(t.depends_on) for t in self.tickets}
|
||||||
color = {tid: WHITE for tid in self.nodes}
|
dependents = {t.id: [] for t in self.tickets}
|
||||||
parent = {tid: None for tid in self.nodes}
|
for t in self.tickets:
|
||||||
stack = []
|
for dep_id in t.depends_on:
|
||||||
cycles = []
|
if dep_id in dependents:
|
||||||
|
dependents[dep_id].append(t.id)
|
||||||
|
|
||||||
for start in self.nodes:
|
queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
|
||||||
if color[start] != WHITE:
|
result = []
|
||||||
continue
|
idx = 0
|
||||||
stack.append((start, iter(self.edges[start])))
|
while idx < len(queue):
|
||||||
while stack:
|
u = queue[idx]
|
||||||
node, children = stack[-1]
|
idx += 1
|
||||||
try:
|
result.append(u)
|
||||||
child = next(children)
|
for v_id in dependents.get(u, []):
|
||||||
if color[child] == GRAY:
|
in_degree[v_id] -= 1
|
||||||
# Back edge: cycle found
|
if in_degree[v_id] == 0:
|
||||||
cycle = [child]
|
queue.append(v_id)
|
||||||
while node != child:
|
|
||||||
cycle.append(node)
|
if len(result) < len(self.tickets):
|
||||||
node = parent[node]
|
raise ValueError("Dependency cycle detected")
|
||||||
cycle.append(child)
|
return result
|
||||||
cycles.append(list(reversed(cycle)))
|
|
||||||
elif color[child] == WHITE:
|
|
||||||
color[child] = GRAY
|
|
||||||
parent[child] = node
|
|
||||||
stack.append((child, iter(self.edges[child])))
|
|
||||||
except StopIteration:
|
|
||||||
color[node] = BLACK
|
|
||||||
stack.pop()
|
|
||||||
return cycles if cycles else None
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Returns `None` for an acyclic graph, or a list of cycles for debugging.
|
### `get_executable_tickets(track)` (free function, `src/dag_engine.py:165-173`)
|
||||||
|
|
||||||
### `ready_tickets` (Kahn's Algorithm Variant)
|
|
||||||
|
|
||||||
Returns the set of tickets that are PENDING and have all dependencies DONE.
|
|
||||||
|
|
||||||
```python
|
```python
|
||||||
def ready_tickets(self) -> list[str]:
|
def get_executable_tickets(track: "Track") -> list[Ticket]:
|
||||||
ready = []
|
"""Convenience: returns the ready-to-execute tickets of a Track.
|
||||||
for tid, node in self.nodes.items():
|
Free function (instead of Track.get_executable_tickets) so that
|
||||||
if node.status != "pending":
|
src/models.py does not need to import TrackDAG at module level,
|
||||||
continue
|
breaking the models<->dag_engine circular dependency.
|
||||||
if all(self.nodes[dep].status == "done" for dep in node.depends_on):
|
"""
|
||||||
ready.append(tid)
|
return TrackDAG(track.tickets).get_ready_tasks()
|
||||||
return ready
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Sorted by priority (high > medium > low) for deterministic dispatch order.
|
### Thread Safety
|
||||||
|
|
||||||
### `is_blocked` (Transitive Blocking Propagation)
|
`TrackDAG` is **NOT thread-safe**. Callers must synchronize access if used from multiple threads (per the module docstring at `src/dag_engine.py:20-22`). The `ConductorEngine` is currently the only caller; the WorkerPool reads from `self.engine` under the ConductorEngine's own lock discipline.
|
||||||
|
|
||||||
When a ticket is BLOCKED, all downstream tickets are also BLOCKED:
|
|
||||||
|
|
||||||
```python
|
|
||||||
def is_blocked(self, ticket_id: str) -> bool:
|
|
||||||
"""Cascades blocked status from upstream (any failed/skipped/blocked dep)."""
|
|
||||||
for dep in self.nodes[ticket_id].depends_on:
|
|
||||||
if self.nodes[dep].status in ("blocked", "skipped"):
|
|
||||||
return True
|
|
||||||
if self.is_blocked(dep): # Recursive cascade
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
```
|
|
||||||
|
|
||||||
This prevents execution stalls when an upstream ticket is blocked.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user