Private
Public Access
0
0

docs(mma): fix 5 drift points (has_cycle iterative/DFS->iterative, topological_sort DFS->Kahn, tick auto-promotion, ConductorEngine.__init__ signature+max_workers)

This commit is contained in:
2026-06-10 23:27:46 -04:00
parent 81e8824170
commit 57143b7ab2
+47 -28
View File
@@ -99,28 +99,35 @@ class TrackDAG:
**`get_ready_tasks()`**: Returns tickets where `status == 'todo'` AND all `depends_on` have `status == 'completed'`. Missing dependencies are treated as NOT completed (fail-safe).
**`has_cycle()`**: Classic DFS cycle detection using visited set + recursion stack:
**`has_cycle()`**: Iterative DFS cycle detection using an explicit stack of `(node_id, is_backtracking)` tuples plus a `path` set (no recursion):
```python
def has_cycle(self) -> bool:
visited = set()
rec_stack = set()
def is_cyclic(ticket_id):
if ticket_id in rec_stack: return True # Back edge = cycle
if ticket_id in visited: return False # Already explored
visited.add(ticket_id)
rec_stack.add(ticket_id)
for neighbor in ticket.depends_on:
if is_cyclic(neighbor): return True
rec_stack.remove(ticket_id)
with get_monitor().scope("dag_has_cycle"):
visited = set()
for start_ticket in self.tickets:
if start_ticket.id in visited:
continue
stack = [(start_ticket.id, False)] # (id, is_backtracking)
path = set()
while stack:
node_id, is_backtracking = stack.pop()
if is_backtracking:
path.remove(node_id)
continue
if node_id in path: return True # back-edge -> cycle
if node_id in visited: continue
visited.add(node_id)
path.add(node_id)
stack.append((node_id, True)) # post-visit marker
ticket = self.ticket_map.get(node_id)
if ticket:
for neighbor_id in ticket.depends_on:
stack.append((neighbor_id, False))
return False
for ticket in self.tickets:
if ticket.id not in visited:
if is_cyclic(ticket.id): return True
return False
```
**`topological_sort()`**: Calls `has_cycle()` first — raises `ValueError` if cycle found. Standard DFS post-order topological sort. Returns list of ticket ID strings in dependency order.
**`topological_sort()`**: **Kahn's algorithm** (BFS-based, in-degree counter), not DFS post-order. Cycle detection is implicit — if `len(result) < len(self.tickets)` after the BFS drain, a `ValueError("Dependency cycle detected")` is raised. Returns a list of ticket ID strings in dependency order.
### ExecutionEngine
@@ -132,10 +139,10 @@ class ExecutionEngine:
```
**`tick()`** — the heartbeat. On each call:
1. Queries `dag.get_ready_tasks()` for eligible tickets.
2. If `auto_queue` is enabled: non-`step_mode` tasks are automatically promoted to `in_progress`.
3. `step_mode` tasks remain in `todo` until `approve_task()` is called.
4. Returns the list of ready tasks.
1. Calls `dag.cascade_blocks()` to propagate `blocked` status from any blocked ticket to its transitive `todo` dependents.
2. Returns `dag.get_ready_tasks()` — the list of tickets that are `todo` with all dependencies `completed`.
**`tick()` does NOT promote tickets to `in_progress`**. The auto-promotion (`status = "in_progress"`) happens in the **caller**`ConductorEngine.run()` at `src/multi_agent_conductor.py` — not in `tick()`. `auto_queue` is therefore a parameter that the `ConductorEngine` consults in its own loop; `ExecutionEngine.tick()` itself only returns the ready list. Step-mode approval also happens in `ConductorEngine.run()` via `approve_task()`; the engine never moves a `todo` ticket on its own.
**`approve_task(task_id)`**: Manually transitions `todo``in_progress` if all dependencies are met.
@@ -174,22 +181,34 @@ The Tier 2 orchestrator. Owns the execution loop that drives tickets through the
```python
class ConductorEngine:
def __init__(self, track: Track, event_queue=None, auto_queue=False):
def __init__(
self,
track: Track,
event_queue: Optional[events.AsyncEventQueue] = None,
auto_queue: bool = False,
max_workers: int = 4,
):
self.track = track
self.event_queue = event_queue
self.tier_usage = {
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
}
self.dag = TrackDAG(self.track.tickets)
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
self.pool = WorkerPool(max_workers=max_workers)
self._abort_events: dict[str, threading.Event] = {}
self._pause_event: threading.Event = threading.Event()
self.pool = WorkerPool(max_workers=max_workers)
self._workers_lock = threading.Lock()
self._active_workers: dict[str, threading.Thread] = {}
self._abort_events: dict[str, threading.Event] = {}
self._pause_event: threading.Event = threading.Event()
self._tier_usage_lock = threading.Lock()
self._dirty: bool = True
```
`max_workers` is **NOT** read from `config.toml` by `ConductorEngine` itself — it is supplied by the caller. The 3 call sites in `AppController` (at `src/app_controller.py:4132-4133`, `4145-4146`, `4223-4224`) all read `config.toml``[mma].max_workers` and pass it in. The default in the constructor signature is 4.
**Per-tier `tier_usage` schema** (each tier entry):
| Key | Type | Purpose |