# `src/multi_agent_conductor.py` & `src/dag_engine.py` — MMA Engine

[Top](../Readme.md) | [Architecture](guide_architecture.md) | [Testing](guide_testing.md) | [MMA (concepts)](guide_mma.md)

---

## Overview

The MMA (Multi-Model Architecture) engine orchestrates **parallel AI worker execution** for implementing multi-ticket tracks. Two files:

- **`src/multi_agent_conductor.py`** (~28KB) — the high-level orchestrator, worker pool, ticket dispatch
- **`src/dag_engine.py`** (~10KB) — the DAG resolution, cycle detection, topological sort

Together they implement a **non-blocking execution engine** with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).

---

## Architecture

```
┌─────────────────────────────────────────────────┐
│  User: "Implement Track X"                      │
└─────────────────┬───────────────────────────────┘
                  │ controller.dispatch_mma_track(track_id)
                  ▼
┌─────────────────────────────────────────────────┐
│  ConductorEngine                                │
│  - Loads track from conductor/ directory        │
│  - Builds TrackDAG from ticket dependencies     │
│  - Detects cycles                               │
│  - Starts WorkerPool                            │
└─────────────────┬───────────────────────────────┘
                  │ enqueues ready tickets
                  ▼
┌─────────────────────────────────────────────────┐
│  WorkerPool                                     │
│  - Configurable concurrency (default 4)         │
│  - Threads pull ready tickets, spawn workers    │
│  - Workers call mma_exec.py with sub-prompt     │
└─────────────────┬───────────────────────────────┘
                  │ per ticket
                  ▼
┌─────────────────────────────────────────────────┐
│  mma_exec.py (tier3-worker / tier4-qa)          │
│  - Stateless Tier 3 or Tier 4 sub-agent         │
│  - TDD cycle: red → green → refactor            │
│  - Commits per task                             │
└─────────────────┬───────────────────────────────┘
                  │ completes
                  ▼
┌─────────────────────────────────────────────────┐
│  Conductor:                                     │
│  - Records ticket result                        │
│  - Updates DAG (downstream tickets unblocked)   │
│  - Emits events to controller.event_queue       │
│  - Writes logs to logs/mma_delegation.log       │
└─────────────────────────────────────────────────┘
```

---

## The `TrackDAG` (in `src/dag_engine.py`)

The actual `TrackDAG` is a **flat list of `Ticket` references with an `O(1)` id→ticket map**, not a graph with separate node/edge dicts. The graph structure is implicit in each `Ticket.depends_on: list[str]`.

### Data Structure

```python
class TrackDAG:
    """Directed acyclic graph of tickets in a track."""
    def __init__(self, tickets: list[Ticket]):
        self.tickets = tickets                        # list[Ticket] (the source of truth)
        self.ticket_map = {t.id: t for t in tickets}  # dict[str, Ticket] (O(1) lookup)
```

The `Ticket` dataclass itself is defined in `src/models.py` (see [guide_models.md](guide_models.md)). It carries the dependencies and status directly — there is no separate `TicketNode` wrapper, no `edges` dict, no `reverse_edges` dict. Adjacency lists are computed on demand inside `cascade_blocks()` and `topological_sort()`.

### Public Methods (actual signatures from `src/dag_engine.py:41-163`)

| Method | Returns | Purpose |
|---|---|---|
| `cascade_blocks()` | `None` | BFS-propagate `blocked` status from currently-blocked tickets to transitive `todo` dependents (mutates tickets in place) |
| `is_ticket_ready(t)` | `bool` | True if all `t.depends_on` exist in `ticket_map` AND have `status == 'completed'` |
| `get_ready_tasks()` | `list[Ticket]` | All `todo` tickets whose dependencies are all `completed` (and whose status hasn't been blocked by `cascade_blocks()`) |
| `has_cycle()` | `bool` | Iterative DFS cycle detection (returns True/False — NOT a list of cycles) |
| `topological_sort()` | `list[str]` (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises `ValueError("Dependency cycle detected")` if `len(result) < len(self.tickets)` after the BFS drain |

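
The table's descriptions of `cascade_blocks()` and `get_ready_tasks()` can be sketched on a small stand-in. This is an illustration of the described behavior, not the code in `src/dag_engine.py`: `MiniTicket` is a hypothetical replacement for `Ticket` that carries only the three fields used here, and both functions are written as free functions rather than methods.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class MiniTicket:
    """Hypothetical stand-in for Ticket (only the fields this sketch needs)."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


def cascade_blocks(tickets: list[MiniTicket]) -> None:
    """Propagate 'blocked' to every transitive 'todo' dependent, in place."""
    # Adjacency is computed on demand: dependency id -> tickets that depend on it.
    dependents: dict[str, list[MiniTicket]] = {t.id: [] for t in tickets}
    for t in tickets:
        for dep_id in t.depends_on:
            if dep_id in dependents:
                dependents[dep_id].append(t)
    queue = deque(t.id for t in tickets if t.status == "blocked")
    while queue:
        for child in dependents[queue.popleft()]:
            if child.status == "todo":
                child.status = "blocked"
                queue.append(child.id)


def get_ready_tasks(tickets: list[MiniTicket]) -> list[MiniTicket]:
    """Return 'todo' tickets whose dependencies all exist and are 'completed'."""
    by_id = {t.id: t for t in tickets}
    return [
        t for t in tickets
        if t.status == "todo"
        and all(d in by_id and by_id[d].status == "completed" for d in t.depends_on)
    ]


tickets = [
    MiniTicket("a", status="completed"),
    MiniTicket("b", status="blocked", depends_on=["a"]),
    MiniTicket("c", depends_on=["b"]),  # becomes blocked through b
    MiniTicket("d", depends_on=["a"]),  # ready: its only dependency is completed
]
cascade_blocks(tickets)
ready_ids = [t.id for t in get_ready_tasks(tickets)]
```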
### `has_cycle()` — Iterative DFS

Implements iterative DFS to avoid recursion overhead. The `path` set is the iterative equivalent of the recursion stack:

```python
def has_cycle(self) -> bool:
    with get_monitor().scope("dag_has_cycle"):
        visited = set()
        for start_ticket in self.tickets:
            if start_ticket.id in visited:
                continue
            stack = [(start_ticket.id, False)]  # (id, is_backtracking)
            path = set()
            while stack:
                node_id, is_backtracking = stack.pop()
                if is_backtracking:
                    path.remove(node_id)
                    continue
                if node_id in path: return True
                if node_id in visited: continue
                visited.add(node_id)
                path.add(node_id)
                stack.append((node_id, True))
                ticket = self.ticket_map.get(node_id)
                if ticket:
                    for neighbor_id in ticket.depends_on:
                        stack.append((neighbor_id, False))
        return False
```

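
To see why `path` is tracked separately from `visited`, it helps to run the same traversal on a diamond (a shared dependency, which is legal) and on a true loop. The sketch below restates the algorithm as a free function over a plain `{ticket_id: depends_on}` dict; that dict is a hypothetical stand-in for `ticket_map`, and the performance-monitor scope is left out.

```python
def has_cycle(deps: dict[str, list[str]]) -> bool:
    """Iterative DFS over a {ticket_id: depends_on} mapping (stand-in for ticket_map)."""
    visited: set[str] = set()
    for start in deps:
        if start in visited:
            continue
        stack = [(start, False)]  # (id, is_backtracking)
        path: set[str] = set()    # ids on the current DFS path
        while stack:
            node, is_backtracking = stack.pop()
            if is_backtracking:
                path.remove(node)
                continue
            if node in path:
                return True   # reached a node that is still being explored
            if node in visited:
                continue      # explored earlier: a shared dependency, not a cycle
            visited.add(node)
            path.add(node)
            stack.append((node, True))
            for neighbor in deps.get(node, []):
                stack.append((neighbor, False))
    return False


# Diamond: d depends on b and c, which both depend on a. Shared, but acyclic.
diamond = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
# Loop: x depends on y, y on z, z on x.
loop = {"x": ["y"], "y": ["z"], "z": ["x"]}

diamond_result = has_cycle(diamond)
loop_result = has_cycle(loop)
```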
### `topological_sort()` — Kahn's Algorithm

BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists.

```python
def topological_sort(self) -> list[str]:
    with get_monitor().scope("dag_topological_sort"):
        in_degree = {t.id: len(t.depends_on) for t in self.tickets}
        dependents = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in dependents:
                    dependents[dep_id].append(t.id)

        queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
        result = []
        idx = 0
        while idx < len(queue):
            u = queue[idx]
            idx += 1
            result.append(u)
            for v_id in dependents.get(u, []):
                in_degree[v_id] -= 1
                if in_degree[v_id] == 0:
                    queue.append(v_id)

        if len(result) < len(self.tickets):
            raise ValueError("Dependency cycle detected")
        return result
```

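
A small worked example may make the ordering concrete. The function below is a condensed restatement over a plain `{ticket_id: depends_on}` dict (a hypothetical stand-in for the ticket list), not the method itself. It also shows one consequence of the code as quoted: `in_degree` counts every entry in `depends_on`, while `dependents` only tracks known ids, so a dependency on an id that is not in the track can never reach zero and appears to surface as the same `ValueError`.

```python
def topological_sort(deps: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over a {ticket_id: depends_on} mapping."""
    in_degree = {tid: len(d) for tid, d in deps.items()}
    dependents: dict[str, list[str]] = {tid: [] for tid in deps}
    for tid, d in deps.items():
        for dep_id in d:
            if dep_id in dependents:
                dependents[dep_id].append(tid)

    # The queue doubles as the result: ids are appended once their in-degree hits 0.
    queue = [tid for tid in deps if in_degree[tid] == 0]
    idx = 0
    while idx < len(queue):
        for v in dependents[queue[idx]]:
            in_degree[v] -= 1
            if in_degree[v] == 0:
                queue.append(v)
        idx += 1

    if len(queue) < len(deps):
        raise ValueError("Dependency cycle detected")
    return queue


# Input order is deliberately shuffled; the output is dependency-first.
order = topological_sort({"2.1": ["1.3"], "1.3": [], "2.2": ["2.1"], "2.3": ["2.2"]})

# A dependency on an unknown id is never decremented, so it is reported as a cycle.
try:
    topological_sort({"a": ["missing"]})
    dangling_error = None
except ValueError as exc:
    dangling_error = str(exc)
```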
### `get_executable_tickets(track)` (free function, `src/dag_engine.py:165-173`)

```python
def get_executable_tickets(track: "Track") -> list[Ticket]:
    """Convenience: returns the ready-to-execute tickets of a Track.
    Free function (instead of Track.get_executable_tickets) so that
    src/models.py does not need to import TrackDAG at module level,
    breaking the models<->dag_engine circular dependency.
    """
    return TrackDAG(track.tickets).get_ready_tasks()
```

### Thread Safety

`TrackDAG` is **NOT thread-safe**. Callers must synchronize access if used from multiple threads (per the module docstring at `src/dag_engine.py:20-22`). The `ConductorEngine` is currently the only caller; the WorkerPool reads from `self.engine` under the ConductorEngine's own lock discipline.

---

## The `ExecutionEngine` (in `src/dag_engine.py:176-228`)

`ExecutionEngine` is a **state machine facade around `TrackDAG`**, not an enum-driven auto/step dispatcher. There is no `ExecutionMode` enum; the engine takes a single `auto_queue: bool` flag.

```python
class ExecutionEngine:
    """A state machine that governs the progression of tasks within a TrackDAG.
    Handles automatic queueing and manual task approval."""
    def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
        self.dag = dag
        self.auto_queue = auto_queue
```

**Methods** (actual signatures):

| Method | Returns | Purpose |
|---|---|---|
| `tick()` | `list[Ticket]` | Calls `dag.cascade_blocks()` then `dag.get_ready_tasks()`. Returns the ready list. **Does NOT auto-promote tickets to `in_progress`.** |
| `approve_task(task_id)` | `None` | Manual transition: `ticket.status` "todo" → "in_progress" IFF dependencies are met (`is_ticket_ready` returns True). |
| `update_task_status(task_id, status)` | `None` | Force-update a ticket's status (e.g. to "completed" or "blocked"). |

**Step Mode / Auto-Queue** is implemented at the **caller** layer (`ConductorEngine.run` in `multi_agent_conductor.py`), not inside `ExecutionEngine`. The `auto_queue` parameter is consulted there, not in `tick()`. The ConductorEngine pushes tickets to `in_progress` based on `auto_queue` and the per-ticket `step_mode` flag.

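
The split between `tick()` (report only) and the caller (decide) can be sketched as follows. Everything here is a simplified stand-in: `MiniTicket`, `MiniEngine`, and `dispatch_once` are hypothetical names, `cascade_blocks()` is omitted, and the promotion rule in `dispatch_once` (auto-start only when `auto_queue` is on and the ticket does not request step mode) is an assumption about how the two flags combine, not a quote from `ConductorEngine.run`.

```python
from dataclasses import dataclass, field


@dataclass
class MiniTicket:
    """Hypothetical stand-in for Ticket (only the fields this sketch needs)."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)
    step_mode: bool = False


class MiniEngine:
    """Simplified stand-in for ExecutionEngine; cascade_blocks() is omitted."""

    def __init__(self, tickets: list[MiniTicket], auto_queue: bool = False) -> None:
        self.tickets = {t.id: t for t in tickets}
        self.auto_queue = auto_queue

    def _is_ready(self, t: MiniTicket) -> bool:
        return all(
            d in self.tickets and self.tickets[d].status == "completed"
            for d in t.depends_on
        )

    def tick(self) -> list[MiniTicket]:
        # Reports ready tickets only; it never changes a status itself.
        return [t for t in self.tickets.values() if t.status == "todo" and self._is_ready(t)]

    def approve_task(self, task_id: str) -> None:
        t = self.tickets.get(task_id)
        if t and t.status == "todo" and self._is_ready(t):
            t.status = "in_progress"


def dispatch_once(engine: MiniEngine) -> list[str]:
    """Caller-side policy (assumed): auto-start unless the ticket asks for step mode."""
    started = []
    for t in engine.tick():
        if engine.auto_queue and not t.step_mode:
            engine.approve_task(t.id)
            started.append(t.id)
    return started


engine = MiniEngine(
    [MiniTicket("a"), MiniTicket("b", step_mode=True), MiniTicket("c", depends_on=["a"])],
    auto_queue=True,
)
started = dispatch_once(engine)  # only "a": "b" waits for approval, "c" waits on "a"
engine.approve_task("b")         # manual approval, as a GUI action might trigger
engine.approve_task("c")         # ignored: dependency "a" is not completed yet
```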

---

## The `ConductorEngine` (in `src/multi_agent_conductor.py:116+`)

The actual class is named `ConductorEngine`, not `MultiAgentConductor`. It owns the DAG, the engine, and a `WorkerPool`; pushes state to the GUI; and runs the main async dispatch loop.

### `__init__(track, event_queue=None, auto_queue=False, max_workers=4)`

```python
class ConductorEngine:
    def __init__(
        self,
        track: Track,
        event_queue: Optional[events.AsyncEventQueue] = None,
        auto_queue: bool = False,
        max_workers: int = 4,
    ) -> None:
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        self.pool = WorkerPool(max_workers=max_workers)
        self._workers_lock = threading.Lock()
        self._active_workers: dict[str, threading.Thread] = {}
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event: threading.Event = threading.Event()
        self._tier_usage_lock = threading.Lock()
        self._dirty: bool = True
```

`max_workers` is **not** read from `config.toml` by the engine — it's a constructor parameter. The 3 call sites in `AppController` (`src/app_controller.py:4132-4133`, `4145-4146`, `4223-4224`) all read `[mma].max_workers` from TOML and pass it in. Default is 4.

### Key methods

| Method | Returns | Purpose |
|---|---|---|
| `update_usage(tier, input_tokens, output_tokens)` | `None` | Cumulative token accounting per tier (under `_tier_usage_lock`) |
| `pause()` / `resume()` | `None` | Set/clear `_pause_event` |
| `approve_task(task_id)` | `None` | Delegates to `engine.approve_task`; sets `_dirty = True` |
| `update_task_status(task_id, status)` | `None` | Delegates to `engine.update_task_status`; sets `_dirty = True` |
| `kill_worker(ticket_id)` | `None` | Sets the per-ticket abort event; joins the thread with 1.0s timeout |
| `_push_state(status, active_tier)` | `None` | Builds a payload dict and `await event_queue.put("mma_state_update", payload)` |
| `parse_json_tickets(json_str)` | `list[dict]` | Parses Tier 2 LLM output into a list of ticket dicts (the ingestion path) |
| `run()` | (async coroutine) | The main async dispatch loop; see below |

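
The `kill_worker` row describes a cooperative pattern: Python threads cannot be killed from outside, so the worker has to poll its abort event and exit on its own. A minimal sketch of that pattern, using module-level dicts in place of `_abort_events` and `_active_workers` and a hypothetical `worker` body (the real worker's polling points are not documented here):

```python
import threading
import time

abort_events: dict[str, threading.Event] = {}
workers: dict[str, threading.Thread] = {}


def worker(ticket_id: str, abort: threading.Event, log: list[str]) -> None:
    """Hypothetical worker body: checks the abort flag between small units of work."""
    while not abort.is_set():
        time.sleep(0.01)  # stand-in for one step of real work
    log.append(f"{ticket_id}: aborted")


def kill_worker(ticket_id: str) -> None:
    """Signal the worker to stop, then wait up to 1.0s for it to exit."""
    event = abort_events.get(ticket_id)
    if event:
        event.set()
    thread = workers.get(ticket_id)
    if thread:
        thread.join(timeout=1.0)


log: list[str] = []
abort_events["T1"] = threading.Event()
workers["T1"] = threading.Thread(
    target=worker, args=("T1", abort_events["T1"], log), daemon=True
)
workers["T1"].start()
kill_worker("T1")
still_running = workers["T1"].is_alive()
```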
### `run()` — The Main Async Dispatch Loop

The actual `run()` is an `async` coroutine that runs the dispatch loop, NOT a `_dispatch_loop` background thread. It uses `asyncio` and `loop.run_in_executor` to bridge to the blocking `run_worker_lifecycle` call.

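
The bridge from the coroutine to blocking work can be sketched with the standard library alone. The `run_worker_lifecycle` below is a stand-in with a made-up one-argument signature, and passing `None` to `run_in_executor` (the event loop's default thread pool) is an assumption; the real `run()` may supply its own executor and certainly does more per iteration.

```python
import asyncio
import time


def run_worker_lifecycle(ticket_id: str) -> str:
    """Stand-in for the blocking worker call; the real one takes more arguments."""
    time.sleep(0.05)  # simulates a slow, blocking LLM round-trip
    return f"{ticket_id}: done"


async def run(ready_ids: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # Each blocking call runs in the default thread pool, so the event loop
    # stays free to push state updates while workers are busy.
    futures = [
        loop.run_in_executor(None, run_worker_lifecycle, tid) for tid in ready_ids
    ]
    return await asyncio.gather(*futures)


results = asyncio.run(run(["T1", "T2"]))
```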
### `_push_state()` payload shape

```python
async def _push_state(self, status: str = "running", active_tier: str = None) -> None:
    if not self.event_queue:
        return
    payload = {
        "status": status,
        "active_tier": active_tier,
        "tier_usage": self.tier_usage,
        "track": {"id": self.track.id, "title": self.track.description},
        "tickets": [asdict(t) for t in self.track.tickets],
    }
    await self.event_queue.put("mma_state_update", payload)
```

---

## The `WorkerPool` (in `src/multi_agent_conductor.py:50-114`)

A `dict[str, Thread]` + `threading.Lock` + `threading.Semaphore`, NOT a `ThreadPoolExecutor` wrapper.

```python
class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)
```

**Methods** (actual signatures):

| Method | Returns | Purpose |
|---|---|---|
| `spawn(ticket_id, target, args=())` | `None` | Spawns a daemon thread running `target(*args)` if the pool has capacity. No-op if full. Tracks the thread in `self._active[ticket_id]`. |
| `join_all(timeout=None)` | `None` | Joins every active thread. |
| `get_active_count()` | `int` | Returns `len(self._active)`. |
| `is_full()` | `bool` | True if `get_active_count() >= self.max_workers`. |

**Concurrency control is a `Semaphore`**, not a thread-pool executor. The semaphore is acquired in `spawn` and released in the spawned thread's `_run` wrapper. `self._active` is the source of truth for `get_active_count`/`is_full`; the semaphore prevents over-spawning.

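
The acquire-in-`spawn`, release-in-`_run` arrangement described above can be sketched as below. `MiniPool` is a hypothetical, simplified stand-in, and two details are assumptions rather than facts about `WorkerPool`: that "no-op if full" is implemented with a non-blocking `acquire`, and that the wrapper removes the thread from `_active` when it finishes.

```python
import threading


class MiniPool:
    """Hypothetical, simplified stand-in for WorkerPool."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id: str, target, args: tuple = ()) -> None:
        # Non-blocking acquire: when every slot is taken, spawn is a no-op.
        if not self._semaphore.acquire(blocking=False):
            return

        def _run() -> None:
            try:
                target(*args)
            finally:
                # Always free the slot, even if the target raised.
                with self._lock:
                    self._active.pop(ticket_id, None)
                self._semaphore.release()

        thread = threading.Thread(target=_run, daemon=True)
        with self._lock:
            self._active[ticket_id] = thread
        thread.start()

    def get_active_count(self) -> int:
        with self._lock:
            return len(self._active)

    def join_all(self, timeout=None) -> None:
        with self._lock:
            threads = list(self._active.values())
        for t in threads:
            t.join(timeout)


gate = threading.Event()        # keeps the demo workers alive until released
pool = MiniPool(max_workers=2)
for tid in ("T1", "T2", "T3"):
    pool.spawn(tid, gate.wait)  # the third spawn is dropped: the pool is full
count_while_full = pool.get_active_count()
gate.set()
pool.join_all()
count_after = pool.get_active_count()
```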

---

## Sub-Agent Invocation (`mma_exec.py`)

The ConductorEngine does **not** spawn `mma_exec.py` directly. Sub-agent invocation is a **synchronous CLI bridge** at `scripts/mma_exec.py` invoked from a Tier 3 worker (see [conductor/workflow.md](../conductor/workflow.md) "MMA Bridge" section). Each sub-agent is invoked via:

```bash
uv run python scripts/mma_exec.py --role tier3-worker "[PROMPT]"
```

The `--role` flag selects between `tier1-orchestrator`, `tier2-tech-lead`, `tier3-worker`, and `tier4-qa`. Sub-agents receive context via stdin (or as additional CLI args) and exit after one round-trip. The actual prompt construction lives in `run_worker_lifecycle` at `src/multi_agent_conductor.py` (the free function referenced by both `ConductorEngine.run` and the worker spawn flow).

The "Token Firewall" effect — each worker starts with a clean context window — is achieved by the `ai_client.reset_session()` call at the start of `run_worker_lifecycle` (see [guide_mma.md](guide_mma.md) "Context Amnesia").

---

## Track Loading

Tracks live in the project's `conductor/tracks/` directory:

```
conductor/tracks/
├── 2026-05-20_initial_setup/
│   ├── plan.md              # Ticket list with dependencies
│   ├── decisions.md         # Architectural notes
│   └── tasks/
│       ├── 001_init_repo.md
│       ├── 002_install_deps.md
│       └── ...
└── 2026-06-02_command_palette/
    ├── plan.md
    └── ...
```

### `plan.md` Format

```markdown
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]
```

The parser extracts:

- Ticket ID (`1.1`, `1.2`, ...)
- Title (text after `: `)
- Status (`[ ]` = pending, `[~]` = in progress, `[x]` = done, `[!]` = blocked)
- Dependencies (`[depends: ...]`)

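
The four extracted fields map naturally onto one regular expression per task line. The sketch below is an illustrative parser for the format shown above, not the project's own: `parse_plan`, `TASK_RE`, and `STATUS_BY_MARK` are hypothetical names, the status labels are taken from the list above, and splitting multiple dependencies on commas is an assumption (the sample only shows single dependencies).

```python
import re

STATUS_BY_MARK = {" ": "pending", "~": "in_progress", "x": "done", "!": "blocked"}

TASK_RE = re.compile(
    r"^- \[(?P<mark>[ x~!])\] Task (?P<id>[\d.]+): "
    r"(?P<title>.*?)(?: \[depends: (?P<deps>[^\]]*)\])?\s*$"
)


def parse_plan(text: str) -> list[dict]:
    """Parse task lines; headings and anything else are skipped."""
    tickets = []
    for line in text.splitlines():
        m = TASK_RE.match(line.strip())
        if not m:
            continue
        deps = m.group("deps") or ""
        tickets.append({
            "id": m.group("id"),
            "title": m.group("title"),
            "status": STATUS_BY_MARK[m.group("mark")],
            # Comma-separated dependencies are an assumption of this sketch.
            "depends_on": [d.strip() for d in deps.split(",") if d.strip()],
        })
    return tickets


plan = """\
# Phase 1: Foundation
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
"""
parsed = parse_plan(plan)
```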

---

## Thread Safety

The conductor uses:

- `threading.Lock` for `_emit_event` (queue.put is atomic but logging needs guarding)
- `threading.Event` for `_stop_event` (clean shutdown signaling)
- `ThreadPoolExecutor` for worker isolation
- `subprocess.Popen` for hard isolation (Tier 3/4 sub-agents)

### `threading.local()` Context

The `ai_client` uses `threading.local()` to track the **source tier** of each request (so comms logs can be tagged with the originating tier). The conductor passes the tier name when calling `ai_client.send()`.

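```python
# In ai_client.py
import threading

_local = threading.local()

def send(prompt, *args, **kwargs):  # remaining parameters elided
    # Falls back to "main" when the calling thread never set a source.
    source = getattr(_local, 'source', 'main')
    comms_log(f"[{source}] {prompt[:50]}...")

# In multi_agent_conductor.py (illustrative: set on the worker thread,
# on ai_client's thread-local object, before sending)
_local.source = "tier3-worker"
ai_client.send(prompt)
```
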
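
The property this relies on is that attributes set on a `threading.local()` object are visible only to the thread that set them. A small self-contained demonstration (the names `tagged_worker`, `current_source`, and `seen` are made up for this example and do not appear in `ai_client.py`):

```python
import threading

_local = threading.local()
seen: dict[str, str] = {}


def current_source() -> str:
    # Threads that never set a source fall back to "main".
    return getattr(_local, "source", "main")


def tagged_worker(name: str, source: str) -> None:
    _local.source = source  # visible only inside this thread
    seen[name] = current_source()


threads = [
    threading.Thread(target=tagged_worker, args=("w3", "tier3-worker")),
    threading.Thread(target=tagged_worker, args=("w4", "tier4-qa")),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
seen["caller"] = current_source()  # the calling thread never set a source
```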

---

## Stop & Cleanup

```python
def stop(self) -> None:
    """Stop the conductor. Workers continue to completion or are cancelled."""
    self._stop_event.set()
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        self.worker_pool.stop(wait=False)
```

The conductor is designed for **graceful shutdown**: in-flight workers complete, no new ones spawn.

---

## Observability

### Logging

All conductor activity is logged to `logs/mma_delegation.log` in JSON Lines format, one record per line (the record below is pretty-printed for readability):

```json
{
  "timestamp": "2026-06-02T12:34:56.789Z",
  "event": "ticket_dispatched",
  "track_id": "2026-06-02_command_palette",
  "ticket_id": "2.1",
  "role": "tier3-worker",
  "persona": "code-implementer"
}
```

Per-worker logs are written to `logs/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log`.

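
Appending records in this shape takes only the standard library. The sketch below is not the conductor's logger: `log_event` and `_log_lock` are hypothetical names, the log path is a temporary file, and the timestamp uses Python's default ISO format (a `+00:00` suffix rather than the `Z` shown above).

```python
import json
import os
import tempfile
import threading
from datetime import datetime, timezone

_log_lock = threading.Lock()


def log_event(path: str, event: str, **fields) -> None:
    """Append one JSON object per line; the lock keeps concurrent writers from interleaving."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        "event": event,
        **fields,
    }
    line = json.dumps(record)
    with _log_lock:
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")


log_path = os.path.join(tempfile.mkdtemp(), "mma_delegation.log")
for ticket_id in ("2.1", "2.2"):
    log_event(
        log_path,
        "ticket_dispatched",
        track_id="2026-06-02_command_palette",
        ticket_id=ticket_id,
        role="tier3-worker",
        persona="code-implementer",
    )

# Reading back: every line parses on its own, which is the point of the format.
with open(log_path, encoding="utf-8") as fh:
    records = [json.loads(line) for line in fh]
```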
### GUI Dashboard

The `MMA Observability Dashboard` (in `gui_2.py`) reads from the conductor's state:

- Track list with progress bars
- Active ticket's worker output stream
- DAG visualization (via `imgui-node-editor`)
- Per-tier strategy streams

---

## Beads Mode Integration

When the project is in Beads mode (`.beads/` directory exists), the conductor **delegates to the Beads CLI** instead of parsing `plan.md`:

```python
def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)
```

The downstream conductor logic is the same — it operates on a `Track` object regardless of source.

See **[docs/guide_beads.md](guide_beads.md)** for Beads details.

---

## Testing

### Unit Tests

- `tests/test_dag_engine.py` — `TrackDAG` cycle detection, ready_tickets, blocking cascade
- `tests/test_execution_engine.py` — mode transitions, approval flow
- `tests/test_worker_pool.py` — concurrency limit, has_capacity, stop
- `tests/test_mma_conductor.py` — track loading, dispatch flow, error handling

### Integration Tests (live_gui)

`tests/test_mma_observability_dashboard.py` — drives the dashboard via Hook API.

### Mocking

Tests use `unittest.mock.patch` to mock `subprocess.Popen` and `ai_client.send` for hermetic tests.

---

## See Also

- **[guide_architecture.md](guide_architecture.md)** — Threading model
- **[guide_mma.md](guide_mma.md)** — MMA concepts (4-Tier hierarchy, Token Firewall)
- **[guide_app_controller.md](guide_app_controller.md)** — How the conductor is owned by the controller
- **[guide_models.md](guide_models.md)** — `Ticket` and `Track` data structures
- **`scripts/mma_exec.py`** — The sub-agent entry point
- **`scripts/mma.ps1`** — PowerShell wrapper
- **[conductor/workflow.md](../conductor/workflow.md)** — Track execution protocol