# `src/multi_agent_conductor.py` & `src/dag_engine.py` — MMA Engine

[Top](../README.md) | [Architecture](guide_architecture.md) | [Testing](guide_testing.md) | [MMA (concepts)](guide_mma.md)

---

## Overview

The MMA (Multi-Model Architecture) engine orchestrates **parallel AI worker execution** for implementing multi-ticket tracks. Two files:

- **`src/multi_agent_conductor.py`** (~28KB) — the high-level orchestrator, worker pool, ticket dispatch
- **`src/dag_engine.py`** (~10KB) — DAG resolution, cycle detection, topological sort

Together they implement a **non-blocking execution engine** with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).

---

## Architecture

```
┌─────────────────────────────────────────────────┐
│ User: "Implement Track X"                       │
└─────────────────┬───────────────────────────────┘
                  │ controller.dispatch_mma_track(track_id)
                  ▼
┌─────────────────────────────────────────────────┐
│ MultiAgentConductor                             │
│ - Loads track from conductor/ directory         │
│ - Builds TrackDAG from ticket dependencies      │
│ - Detects cycles                                │
│ - Starts WorkerPool                             │
└─────────────────┬───────────────────────────────┘
                  │ enqueues ready tickets
                  ▼
┌─────────────────────────────────────────────────┐
│ WorkerPool                                      │
│ - Configurable concurrency (default 4)          │
│ - Threads pull ready tickets, spawn workers     │
│ - Workers call mma_exec.py with sub-prompt      │
└─────────────────┬───────────────────────────────┘
                  │ per ticket
                  ▼
┌─────────────────────────────────────────────────┐
│ mma_exec.py (tier3-worker / tier4-qa)           │
│ - Stateless Tier 3 or Tier 4 sub-agent          │
│ - TDD cycle: red → green → refactor             │
│ - Commits per task                              │
└─────────────────┬───────────────────────────────┘
                  │ completes
                  ▼
┌─────────────────────────────────────────────────┐
│ Conductor:                                      │
│ - Records ticket result                         │
│ - Updates DAG (downstream tickets unblocked)    │
│ - Emits events to controller.event_queue        │
│ - Writes logs to logs/mma_delegation.log        │
└─────────────────────────────────────────────────┘
```

---

## The `TrackDAG` (in `src/dag_engine.py`)

The `TrackDAG` class holds the ticket dependency graph for a single track.

### Data Structures

```python
class TrackDAG:
    """Directed acyclic graph of tickets in a track."""

    def __init__(self, tickets: list[dict]):
        # ticket_id -> TicketNode (status, depends_on, blocks, ...)
        self.nodes: dict[str, TicketNode] = {}
        # ticket_id -> set of ticket_ids it depends on
        self.edges: dict[str, set[str]] = {}
        # reverse: ticket_id -> set of ticket_ids that depend on it
        self.reverse_edges: dict[str, set[str]] = {}
        # ... filled from `tickets` (omitted in this outline)

    def ready_tickets(self) -> list[str]:
        """Tickets with status pending whose dependencies are all done."""

    def is_blocked(self, ticket_id: str) -> bool:
        """Cascades blocked status from upstream."""

    def mark_done(self, ticket_id: str) -> None:
        """Mark the ticket done and update downstream tickets' blocked status."""

    def detect_cycles(self) -> list[list[str]] | None:
        """Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic."""
```

### `TicketNode`

```python
from dataclasses import dataclass, field
from typing import Literal


@dataclass
class TicketNode:
    ticket_id: str
    status: Literal["pending", "running", "done", "blocked", "skipped"]
    priority: Literal["high", "medium", "low"] = "medium"
    depends_on: set[str] = field(default_factory=set)  # upstream ticket_ids
    blocks: set[str] = field(default_factory=set)      # downstream ticket_ids
    result: dict | None = None  # Populated when done
    error: str | None = None    # Populated when the worker fails
```
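
The constructor body is not shown above. As a sketch of how `nodes`, `edges` and `reverse_edges` could be populated, assume each ticket dict carries an `id`, an optional `status` and `priority`, and a `depends_on` list (these key names are assumptions, not the documented output of `parse_plan_md`). The hypothetical `build_graph` helper below also fills each node's `blocks` set and rejects dependencies on unknown tickets, which would otherwise surface later as a `KeyError` inside `ready_tickets()` or `detect_cycles()`:

```python
from dataclasses import dataclass, field
from typing import Literal, Optional


@dataclass
class TicketNode:  # same fields as the TicketNode above
    ticket_id: str
    status: Literal["pending", "running", "done", "blocked", "skipped"]
    priority: Literal["high", "medium", "low"] = "medium"
    depends_on: set[str] = field(default_factory=set)
    blocks: set[str] = field(default_factory=set)
    result: Optional[dict] = None
    error: Optional[str] = None


def build_graph(tickets: list[dict]) -> tuple[dict, dict, dict]:
    """Hypothetical helper: return (nodes, edges, reverse_edges) for a ticket list."""
    nodes = {
        t["id"]: TicketNode(
            ticket_id=t["id"],
            status=t.get("status", "pending"),
            priority=t.get("priority", "medium"),
            depends_on=set(t.get("depends_on", ())),
        )
        for t in tickets
    }
    unknown = {d for n in nodes.values() for d in n.depends_on if d not in nodes}
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")

    edges = {tid: set(n.depends_on) for tid, n in nodes.items()}  # tid -> its dependencies
    reverse_edges: dict[str, set[str]] = {tid: set() for tid in nodes}  # tid -> its dependents
    for tid, deps in edges.items():
        for dep in deps:
            reverse_edges[dep].add(tid)
            nodes[dep].blocks.add(tid)
    return nodes, edges, reverse_edges
```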

### `detect_cycles`

Implements **iterative DFS** (white/gray/black coloring) rather than recursion, so long dependency chains cannot hit Python's recursion limit:

```python
def detect_cycles(self) -> list[list[str]] | None:
    """Iterative DFS cycle detection. O(V+E)."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / finished
    color = {tid: WHITE for tid in self.nodes}
    parent: dict[str, str | None] = {tid: None for tid in self.nodes}
    cycles: list[list[str]] = []

    for start in self.nodes:
        if color[start] != WHITE:
            continue
        color[start] = GRAY  # the root must be GRAY too, or a cycle back to it is missed
        stack = [(start, iter(self.edges.get(start, ())))]
        while stack:
            node, children = stack[-1]
            try:
                child = next(children)
            except StopIteration:
                color[node] = BLACK  # every descendant explored
                stack.pop()
                continue
            if color[child] == GRAY:
                # Back edge node -> child: follow parent links from node up to child
                cycle = [node]
                cur = node
                while cur != child:
                    cur = parent[cur]
                    cycle.append(cur)
                cycle.reverse()       # child -> ... -> node (edge direction)
                cycle.append(child)   # close the loop: first == last
                cycles.append(cycle)
            elif color[child] == WHITE:
                color[child] = GRAY
                parent[child] = node
                stack.append((child, iter(self.edges.get(child, ()))))
    return cycles or None
```

Returns `None` for an acyclic graph; otherwise a list of cycles for debugging, each written as a closed path in dependency order (for example `["a", "b", "a"]` when `a` depends on `b` and `b` depends on `a`). DFS reports one cycle per back edge it finds, so the result is a set of witnesses, not an exhaustive enumeration of every cycle.
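
For tests, the standard library's `graphlib` module (Python 3.9+) makes a convenient independent cross-check. `TopologicalSorter.prepare()` raises `graphlib.CycleError` on a cyclic graph and puts one offending cycle in `args[1]`. graphlib writes that cycle from predecessor to successor, the opposite orientation to `detect_cycles`, so compare cycles as sets of tickets rather than as lists. `find_cycle` is an illustrative helper, not part of `dag_engine.py`:

```python
import graphlib
from typing import Optional


def find_cycle(edges: dict[str, set[str]]) -> Optional[list[str]]:
    """Return one dependency cycle as a closed path, or None if the graph is acyclic.

    `edges` maps ticket_id -> set of ticket_ids it depends on, which is exactly
    the node -> predecessors mapping graphlib.TopologicalSorter expects.
    """
    try:
        graphlib.TopologicalSorter(edges).prepare()
    except graphlib.CycleError as exc:
        return exc.args[1]  # first and last node are the same
    return None
```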
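
### `ready_tickets` (Kahn's Algorithm Variant)

Returns the tickets that are `pending` and whose dependencies are all `done`: the "in-degree zero" step of Kahn's algorithm, re-evaluated against live ticket status on every call.

```python
# Module-level: lower rank dispatches first
_PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def ready_tickets(self) -> list[str]:
    ready = []
    for tid, node in self.nodes.items():
        if node.status != "pending":
            continue
        # all() over an empty set is True, so tickets without dependencies are ready
        if all(self.nodes[dep].status == "done" for dep in node.depends_on):
            ready.append(tid)
    ready.sort(key=lambda tid: _PRIORITY_RANK[self.nodes[tid].priority])
    return ready
```

The result is sorted by priority (high > medium > low) for a deterministic dispatch order; because Python's sort is stable, tickets of equal priority keep their insertion order.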
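
The wave structure behind `ready_tickets()` is easiest to see with `graphlib.TopologicalSorter`, whose `get_ready()` / `done()` pair is Kahn's algorithm exposed one step at a time. The sketch below (the helper name and the tie-break by ticket ID are illustrative choices) computes which tickets could run side by side if every ticket succeeded. `ready_tickets()` instead recomputes readiness from live statuses, which is what lets failures and Step Mode decisions take effect between ticks:

```python
import graphlib

PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def dispatch_waves(edges: dict[str, set[str]], priority: dict[str, str]) -> list[list[str]]:
    """Kahn's algorithm in waves: each wave holds every ticket whose dependencies
    finished in earlier waves, ordered by priority, then by ticket ID."""
    sorter = graphlib.TopologicalSorter(edges)
    sorter.prepare()  # raises graphlib.CycleError on a cyclic graph
    waves = []
    while sorter.is_active():
        wave = sorted(
            sorter.get_ready(),
            key=lambda t: (PRIORITY_RANK[priority.get(t, "medium")], t),
        )
        waves.append(wave)
        sorter.done(*wave)  # pretend the whole wave succeeded
    return waves
```

For a plan where `2.1` depends on `1.3` and `2.2`, `2.3` form a chain after it, the first wave is `["1.1", "1.2", "1.3"]`; marking `1.3` as high priority moves it to the front of that wave.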
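
### `is_blocked` (Transitive Blocking Propagation)

A ticket counts as blocked if any of its dependencies, direct or transitive, is `blocked` or `skipped`. The check only reports this; it does not change any ticket's status:

```python
def is_blocked(self, ticket_id: str) -> bool:
    """True if any upstream ticket (direct or transitive) is blocked or skipped.

    Failed tickets are marked "blocked", so failures cascade as well.
    """
    for dep in self.nodes[ticket_id].depends_on:
        if self.nodes[dep].status in ("blocked", "skipped"):
            return True
        if self.is_blocked(dep):  # Recursive cascade; terminates because the graph is acyclic
            return True
    return False
```

Without this check, a ticket downstream of a blocked one would stay `pending` forever (because `ready_tickets()` never returns it) and the track would appear to stall. With it, such tickets can be identified as unreachable.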
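
`is_blocked` answers the question for one ticket and may revisit shared ancestors in diamond-shaped graphs. When every ticket affected by a failure is needed at once, a single breadth-first walk over `reverse_edges` does it in O(V + E). `blocked_closure` is a hypothetical helper, not part of the documented API; it takes `reverse_edges` in the shape described for `TrackDAG`:

```python
from collections import deque


def blocked_closure(reverse_edges: dict[str, set[str]], seeds: set[str]) -> set[str]:
    """Return every ticket downstream of `seeds` (the seeds themselves excluded).

    `reverse_edges` maps ticket_id -> set of tickets that depend on it.
    Each ticket is enqueued at most once, so the walk is O(V + E).
    """
    blocked: set[str] = set()
    queue = deque(seeds)
    while queue:
        tid = queue.popleft()
        for dependent in reverse_edges.get(tid, ()):
            if dependent not in blocked and dependent not in seeds:
                blocked.add(dependent)
                queue.append(dependent)
    return blocked
```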

---

## The `ExecutionEngine`

The execution engine decides which tickets to dispatch on each tick and handles the **state machine transitions** between Auto-Queue and Step Mode.

### Execution Modes

```python
from enum import Enum


class ExecutionMode(Enum):
    AUTO_QUEUE = "auto_queue"  # Autonomous worker spawning
    STEP_MODE = "step_mode"    # Explicit manual approval per transition
```

### `ExecutionEngine.tick()`

The main step function, called by the conductor's dispatch loop on every tick (every 100ms by default). It always returns a list, possibly empty:

```python
class ExecutionEngine:
    def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE):
        self.dag = dag
        self.mode = mode
        self.pending_approval: list[str] = []  # Step Mode: ticket awaiting approval (at most one)
        self._approved: list[str] = []         # Step Mode: approved, not yet dispatched

    def tick(self) -> list[str]:
        """Returns list of ticket IDs to dispatch in this tick."""
        if self.mode == ExecutionMode.AUTO_QUEUE:
            return self.dag.ready_tickets()

        # STEP_MODE: only approved tickets are dispatched. Keep each one until the
        # conductor moves it out of "pending" (i.e. actually spawns its worker).
        self._approved = [t for t in self._approved if self.dag.nodes[t].status == "pending"]
        if not self.pending_approval:
            # Offer the next ready ticket for approval, one at a time
            for tid in self.dag.ready_tickets():
                if tid not in self._approved:
                    self.pending_approval = [tid]
                    break
        return list(self._approved)
```

In Step Mode, `tick()` surfaces at most one ticket at a time in `pending_approval` and only ever returns tickets that have been approved. The conductor is expected to mark a ticket `running` when it spawns the worker; until then an approved ticket stays in the dispatch list, so a tick with no free worker capacity does not lose the approval.

### Programmable Transitions

```python
def set_mode(self, mode: ExecutionMode) -> None:
    self.mode = mode
    if mode == ExecutionMode.AUTO_QUEUE:
        # Auto-Queue dispatches every ready ticket, so Step Mode bookkeeping is dropped
        self.pending_approval.clear()
        self._approved.clear()

def approve_next(self) -> str | None:
    """Approve the pending ticket (dispatched on the next tick). Returns its ID, or None."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    self._approved.append(tid)
    return tid

def reject_next(self) -> str | None:
    """Returns the rejected ticket ID, marking it BLOCKED (None if nothing pending)."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    self.dag.nodes[tid].status = "blocked"
    return tid
```

These are exposed to the GUI as "Step Mode" controls and to the Hook API as `set_execution_mode` / `approve_ticket` / `reject_ticket`.

---

## The `MultiAgentConductor` (in `src/multi_agent_conductor.py`)

### `__init__(self, controller: AppController)`

```python
import threading


class MultiAgentConductor:
    def __init__(self, controller: AppController):
        self.controller = controller
        self.dag_engine: ExecutionEngine | None = None
        self.worker_pool: WorkerPool | None = None
        self.current_track: Track | None = None
        self.tier_assignments: dict[str, str] = {}   # ticket_id -> role, e.g. "tier3-worker"
        self.persona_overrides: dict[str, str] = {}  # ticket_id -> persona_name
        self._stop_event = threading.Event()         # set by stop() to end the dispatch loop
        self._dispatch_thread: threading.Thread | None = None
```

### `load_track(track_id: str) -> Track`

Reads `conductor/tracks/<track_id>/plan.md` and parses the ticket list:

```python
def load_track(self, track_id: str) -> Track:
    track_dir = self.controller.paths.tracks_dir / track_id
    plan_path = track_dir / "plan.md"
    tickets = parse_plan_md(plan_path)  # Returns list[dict]
    return Track(id=track_id, tickets=tickets, plan_path=plan_path)
```

The track is then passed to the DAG engine. `start()` checks the graph for cycles before launching the dispatch thread:

```python
def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None:
    dag = TrackDAG(track.tickets)
    cycles = dag.detect_cycles()
    if cycles:
        # Tickets on a cycle can never become ready, so the track could never finish
        raise ValueError(f"Track {track.id} has dependency cycles: {cycles}")

    self.current_track = track
    self.dag_engine = ExecutionEngine(dag, mode)
    # ThreadPoolExecutor creates its threads lazily on submit(); no separate start step
    self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency)
    self._stop_event.clear()  # allow start() again after an earlier stop()
    self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True)
    self._dispatch_thread.start()
```
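
### `_dispatch_loop` (Background Thread)

```python
def _dispatch_loop(self) -> None:
    while not self._stop_event.is_set():
        for ticket_id in self.dag_engine.tick():
            if not self.worker_pool.has_capacity():
                break  # leftover ready tickets are picked up on a later tick
            self._spawn_worker(ticket_id)
        # Like time.sleep(0.1), but returns early as soon as stop() sets the event
        self._stop_event.wait(0.1)  # 100ms tick
```

The loop runs in a daemon thread; any thread can call `self.stop()` to end it. Because it waits on `_stop_event` instead of sleeping, it exits as soon as the event is set rather than after the current tick's sleep.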
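
The difference matters more with longer intervals. In this self-contained sketch, `run_ticker` is a hypothetical stand-in for `_dispatch_loop`, deliberately given a 5-second interval; `stop.set()` still ends the thread almost immediately. With `time.sleep(5.0)` in place of `stop.wait(...)`, the `join(timeout=1.0)` would return with the thread still alive:

```python
import threading
import time


def run_ticker(stop: threading.Event, interval: float, ticks: list) -> None:
    """Hypothetical stand-in for _dispatch_loop: do one unit of work, then wait."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        stop.wait(interval)  # returns early as soon as stop.set() is called


stop = threading.Event()
ticks: list = []
worker = threading.Thread(target=run_ticker, args=(stop, 5.0, ticks), daemon=True)
worker.start()
time.sleep(0.05)  # give the loop time to tick once

t0 = time.monotonic()
stop.set()
worker.join(timeout=1.0)
shutdown_seconds = time.monotonic() - t0
```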

### `_spawn_worker(ticket_id: str)`

Hands the ticket to the worker pool, which runs it through `mma_exec.py`:

```python
def _spawn_worker(self, ticket_id: str) -> None:
    ticket = self.current_track.get_ticket(ticket_id)
    role = self.tier_assignments.get(ticket_id, "tier3-worker")
    persona = self.persona_overrides.get(ticket_id)

    prompt = self._build_worker_prompt(ticket, persona)
    context = self._gather_context(ticket)  # must include "ticket_id" (WorkerPool.submit reads it)

    # Leave "pending" before submitting, or the next tick would dispatch this ticket again
    self.dag_engine.dag.nodes[ticket_id].status = "running"
    future = self.worker_pool.submit(role, prompt, context)
    # ticket_id is a parameter here, so each callback captures its own value
    future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f))
```

The `WorkerPool` is a `ThreadPoolExecutor`-backed pool with a semaphore for concurrency control.

### `_on_worker_done(ticket_id, future)`

Called when a worker finishes (success or failure), typically on the pool thread that completed the future:

```python
from concurrent.futures import Future


def _on_worker_done(self, ticket_id: str, future: Future) -> None:
    dag = self.dag_engine.dag
    node = dag.nodes[ticket_id]
    try:
        result = future.result()  # re-raises any exception from run_mma_worker
    except Exception as e:
        node.status = "blocked"
        node.error = str(e)
        self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e))
        # Re-evaluate downstream blocking: pending tickets behind a failure can never run
        for tid, other in dag.nodes.items():
            if other.status == "pending" and dag.is_blocked(tid):
                other.status = "blocked"
        return
    node.result = result  # store before mark_done() lets dependents be dispatched
    dag.mark_done(ticket_id)
    self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result)
```

### `_emit_event(type, *args)`

Pushes an event to the controller's `event_queue` for the GUI to consume:

```python
def _emit_event(self, event_type: str, *args) -> None:
    # Called from both the dispatch thread and worker threads
    self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time()))
```

The GUI polls `controller.event_queue.get_all()` once per frame and dispatches to render functions.
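
`get_all()` is not part of the standard `queue.Queue` API, so the controller's queue presumably wraps one. A minimal non-blocking drain over a plain `queue.Queue` could look like the sketch below; the function name and the per-frame `limit` are assumptions for illustration. The limit keeps a burst of events from stalling a single frame, and anything left over is picked up on the next one:

```python
import queue


def drain(q: queue.Queue, limit: int = 1000) -> list:
    """Hypothetical get_all(): pop what is queued right now, never blocking."""
    items = []
    while len(items) < limit:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items
```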

---

## The `WorkerPool`

A thin wrapper around `ThreadPoolExecutor` with concurrency limiting:

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class WorkerPool:
    def __init__(self, max_concurrency: int = 4):
        self.max_concurrency = max_concurrency
        self.semaphore = threading.Semaphore(max_concurrency)
        self.executor = ThreadPoolExecutor(max_workers=max_concurrency)
        self.active_workers: set[str] = set()  # ticket_ids submitted and not yet finished
        self._lock = threading.Lock()

    def submit(self, role: str, prompt: str, context: dict) -> Future:
        """Submit a worker task. Returns a Future for the result."""
        ticket_id = context["ticket_id"]
        # Count the ticket as soon as it is queued, not when a thread picks it up;
        # otherwise has_capacity() over-reports between submit() and thread start.
        with self._lock:
            self.active_workers.add(ticket_id)

        def _wrapped():
            try:
                with self.semaphore:
                    return run_mma_worker(role, prompt, context)
            finally:
                with self._lock:
                    self.active_workers.discard(ticket_id)

        return self.executor.submit(_wrapped)

    def has_capacity(self) -> bool:
        with self._lock:
            return len(self.active_workers) < self.max_concurrency

    def stop(self, wait: bool = True) -> None:
        # Running workers are not interrupted, even with wait=False
        self.executor.shutdown(wait=wait)
```
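
In the `WorkerPool` above, the executor's `max_workers` already equals `max_concurrency`, so the semaphore is a second guard; it only becomes the binding limit if the executor has more threads than permits. The self-contained harness below (`peak_concurrency` is an illustrative name, and `time.sleep` stands in for `run_mma_worker`) deliberately gives the executor twice as many threads, to show the semaphore alone capping the overlap:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(max_concurrency: int, jobs: int) -> int:
    """Run `jobs` short tasks behind a semaphore and report the peak overlap."""
    sem = threading.Semaphore(max_concurrency)
    lock = threading.Lock()
    running = 0
    peak = 0

    def task() -> None:
        nonlocal running, peak
        with sem:
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.02)  # stand-in for run_mma_worker
            with lock:
                running -= 1

    # More executor threads than permits: the semaphore, not max_workers, is the limit
    with ThreadPoolExecutor(max_workers=max_concurrency * 2) as ex:
        for f in [ex.submit(task) for _ in range(jobs)]:
            f.result()
    return peak
```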

### `run_mma_worker`

Invokes `mma_exec.py` as a subprocess (not in-process) to enforce **Context Amnesia**: the sub-agent inherits no state from the parent.

```python
import json
import subprocess
import sys


def run_mma_worker(role: str, prompt: str, context: dict) -> dict:
    """Spawn a fresh sub-agent (tier3-worker or tier4-qa) for the ticket."""
    cmd = [
        sys.executable, "scripts/mma_exec.py",  # relative: resolved against the current directory
        "--role", role,
        "--ticket-id", context["ticket_id"],
    ]
    # Pipe prompt and context via stdin as one JSON document
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    try:
        stdout, stderr = proc.communicate(payload, timeout=600)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; do it explicitly, then reap it
        proc.kill()
        proc.communicate()
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8', errors='replace')}")
    return json.loads(stdout.decode("utf-8"))
```

This is the **Token Firewall** in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice.
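
The parent side above assumes a simple contract: the child reads one JSON object with `prompt` and `context` from stdin, prints one JSON object to stdout, and exits non-zero on failure. The real `scripts/mma_exec.py` is not shown in this guide; the child script below is a hypothetical stand-in that honours that contract, useful for exercising the parent side without calling a model. It also uses `subprocess.run`, which, unlike a bare `communicate()`, kills and reaps the child itself when the timeout expires:

```python
import json
import subprocess
import sys

# Hypothetical stand-in for scripts/mma_exec.py: echo the ticket back as "done"
STUB_CHILD = r'''
import json, sys
payload = json.load(sys.stdin)
ticket_id = payload["context"]["ticket_id"]
if payload["prompt"] == "fail":
    print("simulated failure", file=sys.stderr)
    sys.exit(1)
json.dump({"ticket_id": ticket_id, "status": "done"}, sys.stdout)
'''


def call_stub(prompt: str, context: dict, timeout: float = 30.0) -> dict:
    payload = json.dumps({"prompt": prompt, "context": context})
    # On timeout, subprocess.run kills the child and waits for it before re-raising
    proc = subprocess.run(
        [sys.executable, "-c", STUB_CHILD],
        input=payload, capture_output=True, text=True, timeout=timeout,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {proc.stderr}")
    return json.loads(proc.stdout)
```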

---

## Track Loading

Tracks live in the project's `conductor/tracks/` directory:

```
conductor/tracks/
├── 2026-05-20_initial_setup/
│   ├── plan.md          # Ticket list with dependencies
│   ├── decisions.md     # Architectural notes
│   └── tasks/
│       ├── 001_init_repo.md
│       ├── 002_install_deps.md
│       └── ...
└── 2026-06-02_command_palette/
    ├── plan.md
    └── ...
```

### `plan.md` Format

```markdown
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]
```

The parser extracts:

- Ticket ID (`1.1`, `1.2`, ...)
- Title (text after `: `)
- Status (`[ ]` = pending, `[~]` = in progress, `[x]` = done, `[!]` = blocked)
- Dependencies (`[depends: ...]`)
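
A minimal parser for the line format above fits in one regular expression. This is a sketch under assumptions, not the project's `parse_plan_md`: the line grammar is inferred from the example, `[depends: ...]` is assumed to hold comma-separated IDs, and `[~]` is mapped to the `running` status of `TicketNode`:

```python
import re

# Assumed line grammar, inferred from the example above:
#   - [<mark>] Task <id>: <title> [depends: <id>, <id>, ...]
TASK_RE = re.compile(
    r"^\s*-\s*\[(?P<mark>[ x~!])\]\s*Task\s+(?P<id>[\w.]+):\s*(?P<title>.*?)"
    r"(?:\s*\[depends:\s*(?P<deps>[^\]]*)\])?\s*$"
)
# Checkbox mark -> TicketNode status ("~" -> "running" is an assumption)
STATUS_BY_MARK = {" ": "pending", "~": "running", "x": "done", "!": "blocked"}


def parse_plan_lines(text: str) -> list[dict]:
    """Hypothetical plan.md parser; lines that are not tasks are ignored."""
    tickets = []
    for line in text.splitlines():
        m = TASK_RE.match(line)
        if m is None:
            continue  # headings, blank lines, prose
        deps = [d.strip() for d in (m["deps"] or "").split(",") if d.strip()]
        tickets.append({
            "id": m["id"],
            "title": m["title"],
            "status": STATUS_BY_MARK[m["mark"]],
            "depends_on": deps,
        })
    return tickets
```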

---

## Thread Safety

The conductor uses:

- `threading.Lock` for `_emit_event` (`queue.put` is already thread-safe, but the accompanying log writes need guarding)
- `threading.Event` for `_stop_event` (clean shutdown signaling)
- `ThreadPoolExecutor` for worker isolation
- `subprocess.Popen` for hard isolation (Tier 3/4 sub-agents)

### `threading.local()` Context

The `ai_client` uses `threading.local()` to track the **source tier** of each request, so comms logs can be tagged with the originating tier. The conductor sets the tier name on that thread-local before calling `ai_client.send()`. Because the storage is per-thread, it has to be set on the same thread that makes the call.

```python
# In ai_client.py
import threading

_local = threading.local()

def send(prompt, *args, **kwargs):  # other parameters elided in this excerpt
    # Threads that never set a source are reported as "main"
    source = getattr(_local, "source", "main")
    comms_log(f"[{source}] {prompt[:50]}...")
    ...

# In multi_agent_conductor.py, on the thread that will call send()
ai_client._local.source = "tier3-worker"
ai_client.send(prompt)
```
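
A quick way to see the per-thread isolation: each thread below sets its own `source`, and the main thread, which never set one, still falls back to `"main"`. The helper names are illustrative, not part of `ai_client`:

```python
import threading

_local = threading.local()


def current_source() -> str:
    # Same getattr(..., "main") fallback as in the send() excerpt
    return getattr(_local, "source", "main")


def tagged_call(source: str, seen: dict) -> None:
    _local.source = source  # visible only to the thread running this function
    seen[source] = current_source()


seen: dict = {}
threads = [threading.Thread(target=tagged_call, args=(s, seen)) for s in ("tier3-worker", "tier4-qa")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```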

---

## Stop & Cleanup

```python
def stop(self) -> None:
    """Stop dispatching. In-flight workers run to completion; nothing new is spawned."""
    self._stop_event.set()  # the dispatch loop exits at its next check
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        # wait=False: return immediately; running worker threads are not interrupted
        self.worker_pool.stop(wait=False)
```

The conductor is designed for **graceful shutdown**: in-flight workers (and their `mma_exec.py` subprocesses) run to completion, and no new tickets are dispatched.

---

## Observability

### Logging

All conductor activity is logged to `logs/mma_delegation.log` in JSON Lines format, one JSON object per line (the record below is pretty-printed for readability):

```json
{
  "timestamp": "2026-06-02T12:34:56.789Z",
  "event": "ticket_dispatched",
  "track_id": "2026-06-02_command_palette",
  "ticket_id": "2.1",
  "role": "tier3-worker",
  "persona": "code-implementer"
}
```
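
Writing such records from several threads is where the logging lock listed under Thread Safety comes in. The hypothetical `log_event` helper below appends one compact record per line, with a UTC timestamp in the same millisecond `Z` format as the example, and serializes writers with a lock so records never interleave:

```python
import json
import threading
from datetime import datetime, timezone

_log_lock = threading.Lock()


def log_event(path: str, event: str, **fields) -> None:
    """Append one JSON Lines record to `path` (hypothetical helper)."""
    stamp = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
    line = json.dumps({"timestamp": stamp, "event": event, **fields})
    with _log_lock:  # one writer at a time
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
```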

Per-worker logs are written to `logs/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log`.

### GUI Dashboard

The `MMA Observability Dashboard` (in `gui_2.py`) reads from the conductor's state:

- Track list with progress bars
- Active ticket's worker output stream
- DAG visualization (via `imgui-node-editor`)
- Per-tier strategy streams

---

## Beads Mode Integration

When the project is in Beads mode (`.beads/` directory exists), the conductor **delegates to the Beads CLI** instead of parsing `plan.md`:

```python
def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)
```

The downstream conductor logic is the same: it operates on a `Track` object regardless of source.

See **[docs/guide_beads.md](guide_beads.md)** for Beads details.

---

## Testing

### Unit Tests

- `tests/test_dag_engine.py` — `TrackDAG` cycle detection, `ready_tickets`, blocking cascade
- `tests/test_execution_engine.py` — mode transitions, approval flow
- `tests/test_worker_pool.py` — concurrency limit, `has_capacity`, `stop`
- `tests/test_mma_conductor.py` — track loading, dispatch flow, error handling

### Integration Tests (live_gui)

`tests/test_mma_observability_dashboard.py` — drives the dashboard via Hook API.

### Mocking

Tests use `unittest.mock.patch` to mock `subprocess.Popen` and `ai_client.send` for hermetic tests.
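
A hermetic test patches `subprocess.Popen` so no child process starts, then scripts the `communicate()` result and exit code. In this self-contained sketch, `run_worker` is a trimmed, hypothetical stand-in for `run_mma_worker`. The patch target must name the place where `Popen` is looked up; `"subprocess.Popen"` works here because the code under test calls it through the `subprocess` module:

```python
import json
import subprocess
import sys
import unittest
from unittest import mock


def run_worker(prompt: str, ticket_id: str) -> dict:
    """Trimmed stand-in for run_mma_worker (same Popen/communicate shape)."""
    proc = subprocess.Popen(
        [sys.executable, "scripts/mma_exec.py", "--ticket-id", ticket_id],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    stdout, stderr = proc.communicate(json.dumps({"prompt": prompt}).encode("utf-8"), timeout=600)
    if proc.returncode != 0:
        raise RuntimeError(stderr.decode("utf-8"))
    return json.loads(stdout)


class RunWorkerTest(unittest.TestCase):
    @mock.patch("subprocess.Popen")
    def test_success_parses_stdout(self, popen):
        popen.return_value.communicate.return_value = (b'{"status": "done"}', b"")
        popen.return_value.returncode = 0
        self.assertEqual(run_worker("hi", "2.1"), {"status": "done"})
        popen.assert_called_once()

    @mock.patch("subprocess.Popen")
    def test_nonzero_exit_raises(self, popen):
        popen.return_value.communicate.return_value = (b"", b"boom")
        popen.return_value.returncode = 1
        with self.assertRaisesRegex(RuntimeError, "boom"):
            run_worker("hi", "2.1")
```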

---

## See Also

- **[guide_architecture.md](guide_architecture.md)** — Threading model
- **[guide_mma.md](guide_mma.md)** — MMA concepts (4-Tier hierarchy, Token Firewall)
- **[guide_app_controller.md](guide_app_controller.md)** — How the conductor is owned by the controller
- **[guide_models.md](guide_models.md)** — `Ticket` and `Track` data structures
- **`scripts/mma_exec.py`** — The sub-agent entry point
- **`scripts/mma.ps1`** — PowerShell wrapper
- **[`conductor/workflow.md`](../../conductor/workflow.md)** — Track execution protocol
|