Private
Public Access
0
0

docs(mma): fix 5 drift points (has_cycle iterative/DFS->iterative, topological_sort DFS->Kahn, tick auto-promotion, ConductorEngine.__init__ signature+max_workers)

This commit is contained in:
2026-06-10 23:27:46 -04:00
parent 81e8824170
commit 57143b7ab2
+39 -20
View File
@@ -99,28 +99,35 @@ class TrackDAG:
**`get_ready_tasks()`**: Returns tickets where `status == 'todo'` AND all `depends_on` have `status == 'completed'`. Missing dependencies are treated as NOT completed (fail-safe). **`get_ready_tasks()`**: Returns tickets where `status == 'todo'` AND all `depends_on` have `status == 'completed'`. Missing dependencies are treated as NOT completed (fail-safe).
**`has_cycle()`**: Classic DFS cycle detection using visited set + recursion stack: **`has_cycle()`**: Iterative DFS cycle detection using an explicit stack of `(node_id, is_backtracking)` tuples plus a `path` set (no recursion):
```python ```python
def has_cycle(self) -> bool: def has_cycle(self) -> bool:
with get_monitor().scope("dag_has_cycle"):
visited = set() visited = set()
rec_stack = set() for start_ticket in self.tickets:
def is_cyclic(ticket_id): if start_ticket.id in visited:
if ticket_id in rec_stack: return True # Back edge = cycle continue
if ticket_id in visited: return False # Already explored stack = [(start_ticket.id, False)] # (id, is_backtracking)
visited.add(ticket_id) path = set()
rec_stack.add(ticket_id) while stack:
for neighbor in ticket.depends_on: node_id, is_backtracking = stack.pop()
if is_cyclic(neighbor): return True if is_backtracking:
rec_stack.remove(ticket_id) path.remove(node_id)
return False continue
for ticket in self.tickets: if node_id in path: return True # back-edge -> cycle
if ticket.id not in visited: if node_id in visited: continue
if is_cyclic(ticket.id): return True visited.add(node_id)
path.add(node_id)
stack.append((node_id, True)) # post-visit marker
ticket = self.ticket_map.get(node_id)
if ticket:
for neighbor_id in ticket.depends_on:
stack.append((neighbor_id, False))
return False return False
``` ```
**`topological_sort()`**: Calls `has_cycle()` first — raises `ValueError` if cycle found. Standard DFS post-order topological sort. Returns list of ticket ID strings in dependency order. **`topological_sort()`**: **Kahn's algorithm** (BFS-based, in-degree counter), not DFS post-order. Cycle detection is implicit — if `len(result) < len(self.tickets)` after the BFS drain, a `ValueError("Dependency cycle detected")` is raised. Returns a list of ticket ID strings in dependency order.
### ExecutionEngine ### ExecutionEngine
@@ -132,10 +139,10 @@ class ExecutionEngine:
``` ```
**`tick()`** — the heartbeat. On each call: **`tick()`** — the heartbeat. On each call:
1. Queries `dag.get_ready_tasks()` for eligible tickets. 1. Calls `dag.cascade_blocks()` to propagate `blocked` status from any blocked ticket to its transitive `todo` dependents.
2. If `auto_queue` is enabled: non-`step_mode` tasks are automatically promoted to `in_progress`. 2. Returns `dag.get_ready_tasks()` — the list of tickets that are `todo` with all dependencies `completed`.
3. `step_mode` tasks remain in `todo` until `approve_task()` is called.
4. Returns the list of ready tasks. **`tick()` does NOT promote tickets to `in_progress`**. The auto-promotion (`status = "in_progress"`) happens in the **caller**`ConductorEngine.run()` at `src/multi_agent_conductor.py` — not in `tick()`. `auto_queue` is therefore a parameter that the `ConductorEngine` consults in its own loop; `ExecutionEngine.tick()` itself only returns the ready list. Step-mode approval also happens in `ConductorEngine.run()` via `approve_task()`; the engine never moves a `todo` ticket on its own.
**`approve_task(task_id)`**: Manually transitions `todo``in_progress` if all dependencies are met. **`approve_task(task_id)`**: Manually transitions `todo``in_progress` if all dependencies are met.
@@ -174,7 +181,13 @@ The Tier 2 orchestrator. Owns the execution loop that drives tickets through the
```python ```python
class ConductorEngine: class ConductorEngine:
def __init__(self, track: Track, event_queue=None, auto_queue=False): def __init__(
self,
track: Track,
event_queue: Optional[events.AsyncEventQueue] = None,
auto_queue: bool = False,
max_workers: int = 4,
):
self.track = track self.track = track
self.event_queue = event_queue self.event_queue = event_queue
self.tier_usage = { self.tier_usage = {
@@ -186,10 +199,16 @@ class ConductorEngine:
self.dag = TrackDAG(self.track.tickets) self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue) self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
self.pool = WorkerPool(max_workers=max_workers) self.pool = WorkerPool(max_workers=max_workers)
self._workers_lock = threading.Lock()
self._active_workers: dict[str, threading.Thread] = {}
self._abort_events: dict[str, threading.Event] = {} self._abort_events: dict[str, threading.Event] = {}
self._pause_event: threading.Event = threading.Event() self._pause_event: threading.Event = threading.Event()
self._tier_usage_lock = threading.Lock()
self._dirty: bool = True
``` ```
`max_workers` is **NOT** read from `config.toml` by `ConductorEngine` itself — it is supplied by the caller. The 3 call sites in `AppController` (at `src/app_controller.py:4132-4133`, `4145-4146`, `4223-4224`) all read `config.toml``[mma].max_workers` and pass it in. The default in the constructor signature is 4.
**Per-tier `tier_usage` schema** (each tier entry): **Per-tier `tier_usage` schema** (each tier entry):
| Key | Type | Purpose | | Key | Type | Purpose |