diff --git a/docs/guide_multi_agent_conductor.md b/docs/guide_multi_agent_conductor.md index 2bb4074e..7547062f 100644 --- a/docs/guide_multi_agent_conductor.md +++ b/docs/guide_multi_agent_conductor.md @@ -60,122 +60,106 @@ Together they implement a **non-blocking execution engine** with thread-safe sta ## The `TrackDAG` (in `src/dag_engine.py`) -The `TrackDAG` class holds the ticket dependency graph for a single track. +The actual `TrackDAG` is a **flat list of `Ticket` references with an `O(1)` id→ticket map**, not a graph with separate node/edge dicts. The graph structure is implicit in each `Ticket.depends_on: list[str]`. -### Data Structures +### Data Structure ```python class TrackDAG: """Directed acyclic graph of tickets in a track.""" - def __init__(self, tickets: list[dict]): - # ticket_id -> {status, depends_on[], blocks[]} - self.nodes: dict[str, TicketNode] = {} - # ticket_id -> list of ticket_ids it depends on - self.edges: dict[str, set[str]] = {} - # reverse: ticket_id -> list of ticket_ids that depend on it - self.reverse_edges: dict[str, set[str]] = {} - - def ready_tickets(self) -> list[str]: - """Tickets with all dependencies DONE and status PENDING.""" - - def is_blocked(self, ticket_id: str) -> bool: - """Cascades blocked status from upstream.""" - - def mark_done(self, ticket_id: str) -> None: - """Update downstream tickets' blocked status.""" - - def detect_cycles(self) -> list[list[str]] | None: - """Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic.""" + def __init__(self, tickets: list[Ticket]): + self.tickets = tickets # list[Ticket] (the source of truth) + self.ticket_map = {t.id: t for t in tickets} # dict[str, Ticket] (O(1) lookup) ``` -### `TicketNode` +The `Ticket` dataclass itself is defined in `src/models.py` (see [guide_models.md](guide_models.md)). It carries the dependencies and status directly — there is no separate `TicketNode` wrapper, no `edges` dict, no `reverse_edges` dict. 
Adjacency lists are computed on demand inside `cascade_blocks()` and `topological_sort()`. + +### Public Methods (actual signatures from `src/dag_engine.py:41-163`) + +| Method | Returns | Purpose | +|---|---|---| +| `cascade_blocks()` | `None` | BFS-propagate `blocked` status from currently-blocked tickets to transitive `todo` dependents (mutates tickets in place) | +| `is_ticket_ready(t)` | `bool` | True if all `t.depends_on` exist in `ticket_map` AND have `status == 'completed'` | +| `get_ready_tasks()` | `list[Ticket]` | All `todo` tickets whose dependencies are all `completed` (tickets that `cascade_blocks()` has marked `blocked` are no longer `todo`, so they are excluded) | +| `has_cycle()` | `bool` | Iterative DFS cycle detection (returns True/False — NOT a list of cycles) | +| `topological_sort()` | `list[str]` (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises `ValueError("Dependency cycle detected")` if `len(result) < len(self.tickets)` after the BFS drain | + +### `has_cycle()` — Iterative DFS + +Implements iterative DFS to avoid recursion overhead.
The `path` set is the iterative equivalent of the recursion stack: ```python -@dataclass -class TicketNode: - ticket_id: str - status: Literal["pending", "running", "done", "blocked", "skipped"] - priority: Literal["high", "medium", "low"] = "medium" - depends_on: set[str] = field(default_factory=set) - blocks: set[str] = field(default_factory=set) - result: dict | None = None # Populated when done - error: str | None = None +def has_cycle(self) -> bool: + with get_monitor().scope("dag_has_cycle"): + visited = set() + for start_ticket in self.tickets: + if start_ticket.id in visited: + continue + stack = [(start_ticket.id, False)] # (id, is_backtracking) + path = set() + while stack: + node_id, is_backtracking = stack.pop() + if is_backtracking: + path.remove(node_id) + continue + if node_id in path: return True + if node_id in visited: continue + visited.add(node_id) + path.add(node_id) + stack.append((node_id, True)) + ticket = self.ticket_map.get(node_id) + if ticket: + for neighbor_id in ticket.depends_on: + stack.append((neighbor_id, False)) + return False ``` -### `detect_cycles` +### `topological_sort()` — Kahn's Algorithm -Implements **iterative DFS** to avoid recursion overhead: +BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists. ```python -def detect_cycles(self) -> list[list[str]] | None: - """Iterative DFS cycle detection. 
O(V+E).""" - WHITE, GRAY, BLACK = 0, 1, 2 - color = {tid: WHITE for tid in self.nodes} - parent = {tid: None for tid in self.nodes} - stack = [] - cycles = [] +def topological_sort(self) -> list[str]: + with get_monitor().scope("dag_topological_sort"): + in_degree = {t.id: len(t.depends_on) for t in self.tickets} + dependents = {t.id: [] for t in self.tickets} + for t in self.tickets: + for dep_id in t.depends_on: + if dep_id in dependents: + dependents[dep_id].append(t.id) - for start in self.nodes: - if color[start] != WHITE: - continue - stack.append((start, iter(self.edges[start]))) - while stack: - node, children = stack[-1] - try: - child = next(children) - if color[child] == GRAY: - # Back edge: cycle found - cycle = [child] - while node != child: - cycle.append(node) - node = parent[node] - cycle.append(child) - cycles.append(list(reversed(cycle))) - elif color[child] == WHITE: - color[child] = GRAY - parent[child] = node - stack.append((child, iter(self.edges[child]))) - except StopIteration: - color[node] = BLACK - stack.pop() - return cycles if cycles else None + queue = [t.id for t in self.tickets if in_degree[t.id] == 0] + result = [] + idx = 0 + while idx < len(queue): + u = queue[idx] + idx += 1 + result.append(u) + for v_id in dependents.get(u, []): + in_degree[v_id] -= 1 + if in_degree[v_id] == 0: + queue.append(v_id) + + if len(result) < len(self.tickets): + raise ValueError("Dependency cycle detected") + return result ``` -Returns `None` for an acyclic graph, or a list of cycles for debugging. - -### `ready_tickets` (Kahn's Algorithm Variant) - -Returns the set of tickets that are PENDING and have all dependencies DONE. 
+### `get_executable_tickets(track)` (free function, `src/dag_engine.py:165-173`) ```python -def ready_tickets(self) -> list[str]: - ready = [] - for tid, node in self.nodes.items(): - if node.status != "pending": - continue - if all(self.nodes[dep].status == "done" for dep in node.depends_on): - ready.append(tid) - return ready +def get_executable_tickets(track: "Track") -> list[Ticket]: + """Convenience: returns the ready-to-execute tickets of a Track. + Free function (instead of Track.get_executable_tickets) so that + src/models.py does not need to import TrackDAG at module level, + breaking the models<->dag_engine circular dependency. + """ + return TrackDAG(track.tickets).get_ready_tasks() ``` -Sorted by priority (high > medium > low) for deterministic dispatch order. +### Thread Safety -### `is_blocked` (Transitive Blocking Propagation) - -When a ticket is BLOCKED, all downstream tickets are also BLOCKED: - -```python -def is_blocked(self, ticket_id: str) -> bool: - """Cascades blocked status from upstream (any failed/skipped/blocked dep).""" - for dep in self.nodes[ticket_id].depends_on: - if self.nodes[dep].status in ("blocked", "skipped"): - return True - if self.is_blocked(dep): # Recursive cascade - return True - return False -``` - -This prevents execution stalls when an upstream ticket is blocked. +`TrackDAG` is **NOT thread-safe**. Callers must synchronize access if it is used from multiple threads (per the module docstring at `src/dag_engine.py:20-22`). The `ConductorEngine` is currently the only caller; the `WorkerPool` reads from `self.engine` under the `ConductorEngine`'s own lock discipline. ---