
docs(mma-conductor): replace fictional TrackDAG section with actual src/dag_engine.py API

2026-06-10 23:30:04 -04:00
parent 394987f8b3
commit a49e5ffb16
+78 -94
@@ -60,122 +60,106 @@ Together they implement a **non-blocking execution engine** with thread-safe sta
## The `TrackDAG` (in `src/dag_engine.py`)
The `TrackDAG` class holds the ticket dependency graph for a single track.
The actual `TrackDAG` is a **flat list of `Ticket` references with an `O(1)` id→ticket map**, not a graph with separate node/edge dicts. The graph structure is implicit in each `Ticket.depends_on: list[str]`.
### Data Structures
### Data Structure
```python
class TrackDAG:
    """Directed acyclic graph of tickets in a track."""
    def __init__(self, tickets: list[dict]):
        # ticket_id -> {status, depends_on[], blocks[]}
        self.nodes: dict[str, TicketNode] = {}
        # ticket_id -> list of ticket_ids it depends on
        self.edges: dict[str, set[str]] = {}
        # reverse: ticket_id -> list of ticket_ids that depend on it
        self.reverse_edges: dict[str, set[str]] = {}
    def ready_tickets(self) -> list[str]:
        """Tickets with all dependencies DONE and status PENDING."""
    def is_blocked(self, ticket_id: str) -> bool:
        """Cascades blocked status from upstream."""
    def mark_done(self, ticket_id: str) -> None:
        """Update downstream tickets' blocked status."""
    def detect_cycles(self) -> list[list[str]] | None:
        """Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic."""
    def __init__(self, tickets: list[Ticket]):
        self.tickets = tickets  # list[Ticket] (the source of truth)
        self.ticket_map = {t.id: t for t in tickets}  # dict[str, Ticket] (O(1) lookup)
```
### `TicketNode`
The `Ticket` dataclass itself is defined in `src/models.py` (see [guide_models.md](guide_models.md)). It carries the dependencies and status directly — there is no separate `TicketNode` wrapper, no `edges` dict, no `reverse_edges` dict. Adjacency lists are computed on demand inside `cascade_blocks()` and `topological_sort()`.
### Public Methods (actual signatures from `src/dag_engine.py:41-163`)
| Method | Returns | Purpose |
|---|---|---|
| `cascade_blocks()` | `None` | BFS-propagate `blocked` status from currently-blocked tickets to transitive `todo` dependents (mutates tickets in place) |
| `is_ticket_ready(t)` | `bool` | True if all `t.depends_on` exist in `ticket_map` AND have `status == 'completed'` |
| `get_ready_tasks()` | `list[Ticket]` | All `todo` tickets whose dependencies are all `completed` (and whose status hasn't been blocked by `cascade_blocks()`) |
| `has_cycle()` | `bool` | Iterative DFS cycle detection (returns True/False — NOT a list of cycles) |
| `topological_sort()` | `list[str]` (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises `ValueError("Dependency cycle detected")` if `len(result) < len(self.tickets)` after the BFS drain |
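The readiness and blocking rules in the table can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code in `src/dag_engine.py`: the `Ticket` dataclass is a hypothetical stand-in for the one in `src/models.py`, and the three free functions only mirror the behaviour the table describes.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Ticket:
    """Hypothetical stand-in for the Ticket dataclass in src/models.py."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


def is_ticket_ready(ticket: Ticket, ticket_map: dict[str, Ticket]) -> bool:
    # Ready only if every dependency exists and is completed.
    return all(
        dep in ticket_map and ticket_map[dep].status == "completed"
        for dep in ticket.depends_on
    )


def get_ready_tasks(tickets: list[Ticket]) -> list[Ticket]:
    ticket_map = {t.id: t for t in tickets}
    return [t for t in tickets if t.status == "todo" and is_ticket_ready(t, ticket_map)]


def cascade_blocks(tickets: list[Ticket]) -> None:
    """BFS from every blocked ticket; mark transitive 'todo' dependents blocked."""
    ticket_map = {t.id: t for t in tickets}
    # Reverse adjacency (dependency -> dependents), computed on demand.
    dependents: dict[str, list[str]] = {t.id: [] for t in tickets}
    for t in tickets:
        for dep in t.depends_on:
            if dep in dependents:
                dependents[dep].append(t.id)
    queue = deque(t.id for t in tickets if t.status == "blocked")
    while queue:
        current = queue.popleft()
        for child_id in dependents[current]:
            child = ticket_map[child_id]
            if child.status == "todo":
                child.status = "blocked"  # mutates the ticket in place
                queue.append(child_id)


tickets = [
    Ticket("a", status="completed"),
    Ticket("b", depends_on=["a"]),
    Ticket("c", status="blocked"),
    Ticket("d", depends_on=["c"]),
    Ticket("e", depends_on=["d"]),
]
cascade_blocks(tickets)
ready_ids = [t.id for t in get_ready_tasks(tickets)]  # ["b"]
```

After the cascade, `d` and `e` are both `blocked` (the block on `c` propagates transitively), and `b` is the only ticket that is still `todo` with every dependency `completed`.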
### `has_cycle()` — Iterative DFS
Implements iterative DFS to avoid recursion overhead. The `path` set is the iterative equivalent of the recursion stack:
```python
@dataclass
class TicketNode:
    ticket_id: str
    status: Literal["pending", "running", "done", "blocked", "skipped"]
    priority: Literal["high", "medium", "low"] = "medium"
    depends_on: set[str] = field(default_factory=set)
    blocks: set[str] = field(default_factory=set)
    result: dict | None = None  # Populated when done
    error: str | None = None
def has_cycle(self) -> bool:
    with get_monitor().scope("dag_has_cycle"):
        visited = set()
        for start_ticket in self.tickets:
            if start_ticket.id in visited:
                continue
            stack = [(start_ticket.id, False)]  # (id, is_backtracking)
            path = set()
            while stack:
                node_id, is_backtracking = stack.pop()
                if is_backtracking:
                    path.remove(node_id)
                    continue
                if node_id in path: return True
                if node_id in visited: continue
                visited.add(node_id)
                path.add(node_id)
                stack.append((node_id, True))
                ticket = self.ticket_map.get(node_id)
                if ticket:
                    for neighbor_id in ticket.depends_on:
                        stack.append((neighbor_id, False))
        return False
```
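To see the `path` bookkeeping at work, the same traversal can be run on two small graphs. The sketch below is a self-contained copy of the logic above with the `get_monitor()` scope dropped. The `Ticket` dataclass is a hypothetical stand-in, and the free-function form is an assumption made only so the example runs on its own.

```python
from dataclasses import dataclass, field


@dataclass
class Ticket:
    """Hypothetical stand-in for the Ticket dataclass in src/models.py."""
    id: str
    depends_on: list[str] = field(default_factory=list)


def has_cycle(tickets: list[Ticket]) -> bool:
    ticket_map = {t.id: t for t in tickets}
    visited: set[str] = set()
    for start in tickets:
        if start.id in visited:
            continue
        stack = [(start.id, False)]  # (id, is_backtracking)
        path: set[str] = set()       # ids on the current DFS path
        while stack:
            node_id, is_backtracking = stack.pop()
            if is_backtracking:
                path.remove(node_id)  # leaving the node: off the path
                continue
            if node_id in path:
                return True           # reached an ancestor again: cycle
            if node_id in visited:
                continue              # fully explored earlier, not a cycle
            visited.add(node_id)
            path.add(node_id)
            stack.append((node_id, True))  # marker popped after all descendants
            ticket = ticket_map.get(node_id)
            if ticket:
                for neighbor_id in ticket.depends_on:
                    stack.append((neighbor_id, False))
    return False


# A diamond shares a dependency but has no cycle.
diamond = [Ticket("a"), Ticket("b", ["a"]), Ticket("c", ["a"]), Ticket("d", ["b", "c"])]
# x -> z -> y -> x is a genuine cycle.
loop = [Ticket("x", ["z"]), Ticket("y", ["x"]), Ticket("z", ["y"])]

diamond_has_cycle = has_cycle(diamond)  # False
loop_has_cycle = has_cycle(loop)        # True
```

The diamond case is the reason for keeping `path` separate from `visited`: `a` is reached twice, but it is no longer on the current path the second time, so it is skipped rather than reported as a cycle.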
### `detect_cycles`
### `topological_sort()` — Kahn's Algorithm
Implements **iterative DFS** to avoid recursion overhead:
BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists.
```python
def detect_cycles(self) -> list[list[str]] | None:
    """Iterative DFS cycle detection. O(V+E)."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {tid: WHITE for tid in self.nodes}
    parent = {tid: None for tid in self.nodes}
    stack = []
    cycles = []
def topological_sort(self) -> list[str]:
    with get_monitor().scope("dag_topological_sort"):
        in_degree = {t.id: len(t.depends_on) for t in self.tickets}
        dependents = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in dependents:
                    dependents[dep_id].append(t.id)
    for start in self.nodes:
        if color[start] != WHITE:
            continue
        stack.append((start, iter(self.edges[start])))
        while stack:
            node, children = stack[-1]
            try:
                child = next(children)
                if color[child] == GRAY:
                    # Back edge: cycle found
                    cycle = [child]
                    while node != child:
                        cycle.append(node)
                        node = parent[node]
                    cycle.append(child)
                    cycles.append(list(reversed(cycle)))
                elif color[child] == WHITE:
                    color[child] = GRAY
                    parent[child] = node
                    stack.append((child, iter(self.edges[child])))
            except StopIteration:
                color[node] = BLACK
                stack.pop()
    return cycles if cycles else None
        queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
        result = []
        idx = 0
        while idx < len(queue):
            u = queue[idx]
            idx += 1
            result.append(u)
            for v_id in dependents.get(u, []):
                in_degree[v_id] -= 1
                if in_degree[v_id] == 0:
                    queue.append(v_id)
        if len(result) < len(self.tickets):
            raise ValueError("Dependency cycle detected")
        return result
```
Returns `None` for an acyclic graph, or a list of cycles for debugging.
### `ready_tickets` (Kahn's Algorithm Variant)
Returns the set of tickets that are PENDING and have all dependencies DONE.
### `get_executable_tickets(track)` (free function, `src/dag_engine.py:165-173`)
```python
def ready_tickets(self) -> list[str]:
    ready = []
    for tid, node in self.nodes.items():
        if node.status != "pending":
            continue
        if all(self.nodes[dep].status == "done" for dep in node.depends_on):
            ready.append(tid)
    return ready
def get_executable_tickets(track: "Track") -> list[Ticket]:
    """Convenience: returns the ready-to-execute tickets of a Track.
    Free function (instead of Track.get_executable_tickets) so that
    src/models.py does not need to import TrackDAG at module level,
    breaking the models<->dag_engine circular dependency.
    """
    return TrackDAG(track.tickets).get_ready_tasks()
```
Sorted by priority (high > medium > low) for deterministic dispatch order.
### Thread Safety
### `is_blocked` (Transitive Blocking Propagation)
When a ticket is BLOCKED, all downstream tickets are also BLOCKED:
```python
def is_blocked(self, ticket_id: str) -> bool:
    """Cascades blocked status from upstream (any failed/skipped/blocked dep)."""
    for dep in self.nodes[ticket_id].depends_on:
        if self.nodes[dep].status in ("blocked", "skipped"):
            return True
        if self.is_blocked(dep):  # Recursive cascade
            return True
    return False
```
This prevents execution stalls when an upstream ticket is blocked.
`TrackDAG` is **NOT thread-safe**. Callers must synchronize access if used from multiple threads (per the module docstring at `src/dag_engine.py:20-22`). The `ConductorEngine` is currently the only caller; the WorkerPool reads from `self.engine` under the ConductorEngine's own lock discipline.
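For a caller other than `ConductorEngine`, one way to meet the "callers must synchronize" requirement is to funnel every call through a single lock. The sketch below is only an illustration of that pattern, not how `ConductorEngine` does it: `StubDAG`, `LockedDAG`, `record_update`, and `run_workers` are all hypothetical names, and the stub stands in for any object whose methods are not safe to call concurrently.

```python
import threading


class StubDAG:
    """Hypothetical stand-in for a non-thread-safe object (not the real TrackDAG)."""

    def __init__(self) -> None:
        self.updates = 0

    def record_update(self) -> None:
        current = self.updates       # read
        self.updates = current + 1   # write; the pair is not atomic


class LockedDAG:
    """Hypothetical wrapper: serializes every call into the wrapped object."""

    def __init__(self, dag: StubDAG) -> None:
        self._dag = dag
        self._lock = threading.Lock()

    def record_update(self) -> None:
        with self._lock:
            self._dag.record_update()

    def updates(self) -> int:
        with self._lock:
            return self._dag.updates


def run_workers(dag: LockedDAG, workers: int = 8, per_worker: int = 1000) -> int:
    def work() -> None:
        for _ in range(per_worker):
            dag.record_update()

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return dag.updates()


total = run_workers(LockedDAG(StubDAG()))  # 8 workers x 1000 updates each
```

Because each read-modify-write happens while the lock is held, no update is lost and `total` equals `workers * per_worker`. A single coarse lock is the simplest choice; it trades concurrency for correctness, which is usually acceptable when DAG operations are short.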
---