# `src/multi_agent_conductor.py` & `src/dag_engine.py` — MMA Engine

[Top](../README.md) | [Architecture](guide_architecture.md) | [Testing](guide_testing.md) | [MMA (concepts)](guide_mma.md)

---

## Overview

The MMA (Multi-Model Architecture) engine orchestrates **parallel AI worker execution** for implementing multi-ticket tracks. It spans two files:

- **`src/multi_agent_conductor.py`** (~28KB) — the high-level orchestrator, worker pool, ticket dispatch
- **`src/dag_engine.py`** (~10KB) — the DAG resolution, cycle detection, topological sort

Together they implement a **non-blocking execution engine** with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).

---

## Architecture

```
┌─────────────────────────────────────────────────┐
│ User: "Implement Track X"                       │
└─────────────────┬───────────────────────────────┘
                  │ controller.dispatch_mma_track(track_id)
                  ▼
┌─────────────────────────────────────────────────┐
│ MultiAgentConductor                             │
│ - Loads track from conductor/ directory         │
│ - Builds TrackDAG from ticket dependencies      │
│ - Detects cycles                                │
│ - Starts WorkerPool                             │
└─────────────────┬───────────────────────────────┘
                  │ enqueues ready tickets
                  ▼
┌─────────────────────────────────────────────────┐
│ WorkerPool                                      │
│ - Configurable concurrency (default 4)          │
│ - Threads pull ready tickets, spawn workers     │
│ - Workers call mma_exec.py with sub-prompt      │
└─────────────────┬───────────────────────────────┘
                  │ per ticket
                  ▼
┌─────────────────────────────────────────────────┐
│ mma_exec.py (tier3-worker / tier4-qa)           │
│ - Stateless Tier 3 or Tier 4 sub-agent          │
│ - TDD cycle: red → green → refactor             │
│ - Commits per task                              │
└─────────────────┬───────────────────────────────┘
                  │ completes
                  ▼
┌─────────────────────────────────────────────────┐
│ Conductor:                                      │
│ - Records ticket result                         │
│ - Updates DAG (downstream tickets unblocked)    │
│ - Emits events to controller.event_queue        │
│ - Writes logs to logs/mma_delegation.log        │
└─────────────────────────────────────────────────┘
```

---

## The `TrackDAG` (in `src/dag_engine.py`)

The `TrackDAG` class holds the ticket dependency graph for a single track.

### Data Structures

```python
class TrackDAG:
    """Directed acyclic graph of tickets in a track."""

    def __init__(self, tickets: list[dict]):
        # ticket_id -> TicketNode {status, depends_on, blocks, ...}
        self.nodes: dict[str, TicketNode] = {}
        # ticket_id -> set of ticket_ids it depends on
        self.edges: dict[str, set[str]] = {}
        # reverse: ticket_id -> set of ticket_ids that depend on it
        self.reverse_edges: dict[str, set[str]] = {}

    def ready_tickets(self) -> list[str]:
        """Tickets with all dependencies DONE and status PENDING."""

    def is_blocked(self, ticket_id: str) -> bool:
        """Cascades blocked status from upstream."""

    def mark_done(self, ticket_id: str) -> None:
        """Update downstream tickets' blocked status."""

    def detect_cycles(self) -> list[list[str]] | None:
        """Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic."""
```

### `TicketNode`

```python
from dataclasses import dataclass, field
from typing import Literal


@dataclass
class TicketNode:
    ticket_id: str
    status: Literal["pending", "running", "done", "blocked", "skipped"]
    priority: Literal["high", "medium", "low"] = "medium"
    depends_on: set[str] = field(default_factory=set)
    blocks: set[str] = field(default_factory=set)
    result: dict | None = None  # Populated when done
    error: str | None = None    # Populated when the worker fails
```

### `detect_cycles`

Implements **iterative DFS** so that long dependency chains cannot hit Python's recursion limit:

```python
def detect_cycles(self) -> list[list[str]] | None:
    """Iterative DFS cycle detection. O(V+E)."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / finished
    color = {tid: WHITE for tid in self.nodes}
    parent = {tid: None for tid in self.nodes}
    stack = []
    cycles = []
    for start in self.nodes:
        if color[start] != WHITE:
            continue
        color[start] = GRAY  # the root is on the current path too
        stack.append((start, iter(self.edges.get(start, ()))))
        while stack:
            node, children = stack[-1]
            try:
                child = next(children)
                if color[child] == GRAY:
                    # Back edge: cycle found; walk parent links from node up to child
                    cycle = [child]
                    while node != child:
                        cycle.append(node)
                        node = parent[node]
                    cycle.append(child)
                    cycles.append(list(reversed(cycle)))
                elif color[child] == WHITE:
                    color[child] = GRAY
                    parent[child] = node
                    stack.append((child, iter(self.edges.get(child, ()))))
            except StopIteration:
                # All dependencies of node explored
                color[node] = BLACK
                stack.pop()
    return cycles if cycles else None
```

Returns `None` for an acyclic graph, or a list of cycles for debugging. Each cycle starts and ends with the same ticket ID: if A depends on B, B on C, and C on A, the search from A reports `["A", "B", "C", "A"]`.

### `ready_tickets` (Kahn's Algorithm Variant)

Returns the tickets that are PENDING and have all dependencies DONE, i.e. the zero-in-degree frontier that Kahn's algorithm works from:

```python
PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}

def ready_tickets(self) -> list[str]:
    ready = []
    for tid, node in self.nodes.items():
        if node.status != "pending":
            continue
        if all(self.nodes[dep].status == "done" for dep in node.depends_on):
            ready.append(tid)
    # Stable sort: high > medium > low, insertion order within a priority
    return sorted(ready, key=lambda tid: PRIORITY_RANK[self.nodes[tid].priority])
```

The result is sorted by priority (high > medium > low). Python's sort is stable, so tickets of equal priority keep the order in which they were added to `nodes`, which makes dispatch order deterministic.

### `is_blocked` (Transitive Blocking Propagation)

When a ticket is BLOCKED or SKIPPED, every ticket downstream of it is treated as blocked too:

```python
def is_blocked(self, ticket_id: str) -> bool:
    """Cascades blocked status from upstream (a failed ticket is marked "blocked")."""
    for dep in self.nodes[ticket_id].depends_on:
        if self.nodes[dep].status in ("blocked", "skipped"):
            return True
        if self.is_blocked(dep):  # Recursive cascade
            return True
    return False
```

This lets the engine recognize tickets that can never become ready, instead of leaving them PENDING forever and stalling the track.

---

## The `ExecutionEngine`

The execution engine handles **state machine transitions** between Auto-Queue and Step Mode.
### Execution Modes

```python
from enum import Enum


class ExecutionMode(Enum):
    AUTO_QUEUE = "auto_queue"  # Autonomous worker spawning
    STEP_MODE = "step_mode"    # Explicit manual approval per transition
```

### `ExecutionEngine.tick()`

Called by the conductor's dispatch loop at a configurable interval (default 100ms). In Auto-Queue mode it returns every ready ticket; in Step Mode it stages one ready ticket for approval and returns only tickets that have already been approved:

```python
class ExecutionEngine:
    def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE):
        self.dag = dag
        self.mode = mode
        self.pending_approval: list[str] = []  # Ticket IDs awaiting approval in Step Mode
        self.approved: list[str] = []          # Approved in Step Mode, dispatched on the next tick

    def tick(self) -> list[str]:
        """Returns list of ticket IDs to dispatch in this tick."""
        if self.mode == ExecutionMode.AUTO_QUEUE:
            return self.dag.ready_tickets()
        # STEP_MODE: never dispatch a ticket before it has been approved
        to_dispatch, self.approved = self.approved, []
        if not self.pending_approval and not to_dispatch:
            # Stage one ready ticket for the user to approve or reject
            self.pending_approval = self.dag.ready_tickets()[:1]
        return to_dispatch
```

### Programmable Transitions

```python
def set_mode(self, mode: ExecutionMode) -> None:
    self.mode = mode
    if mode == ExecutionMode.AUTO_QUEUE:
        # Staged tickets are still PENDING, so Auto-Queue picks them up anyway
        self.pending_approval.clear()
        self.approved.clear()

def approve_next(self) -> str | None:
    """Approves the next pending ticket (dispatched on the next tick); None if nothing pending."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    self.approved.append(tid)
    return tid

def reject_next(self) -> str | None:
    """Returns the rejected ticket ID, marking it BLOCKED."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    self.dag.nodes[tid].status = "blocked"
    return tid
```

These are exposed to the GUI as "Step Mode" controls and to the Hook API as `set_execution_mode` / `approve_ticket` / `reject_ticket`.
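To make the approval flow concrete, here is a condensed, self-contained sketch of Step Mode, assuming that `tick()` releases a ticket only after `approve_next()` has been called. `StubDAG` and `StepEngine` are hypothetical stand-ins (no dependencies, no priorities), not the classes in `src/dag_engine.py`; the "dispatcher" is simulated by setting the status by hand.

```python
from __future__ import annotations


class StubDAG:
    """Hypothetical stand-in for TrackDAG: ticket statuses only, no dependencies."""

    def __init__(self, ticket_ids: list[str]):
        self.status = {tid: "pending" for tid in ticket_ids}

    def ready_tickets(self) -> list[str]:
        return [tid for tid, s in self.status.items() if s == "pending"]


class StepEngine:
    """Condensed Step Mode logic: tick() only releases approved tickets."""

    def __init__(self, dag: StubDAG):
        self.dag = dag
        self.pending_approval: list[str] = []
        self.approved: list[str] = []

    def tick(self) -> list[str]:
        to_dispatch, self.approved = self.approved, []
        if not self.pending_approval and not to_dispatch:
            # Stage at most one ready ticket for the user to decide on
            self.pending_approval = self.dag.ready_tickets()[:1]
        return to_dispatch

    def approve_next(self) -> str | None:
        if not self.pending_approval:
            return None
        tid = self.pending_approval.pop(0)
        self.approved.append(tid)
        return tid

    def reject_next(self) -> str | None:
        if not self.pending_approval:
            return None
        tid = self.pending_approval.pop(0)
        self.dag.status[tid] = "blocked"
        return tid


dag = StubDAG(["1.1", "1.2"])
engine = StepEngine(dag)
first = engine.tick()          # stages "1.1" for approval, dispatches nothing
engine.approve_next()          # user approves "1.1"
second = engine.tick()         # releases ["1.1"]
dag.status["1.1"] = "running"  # the dispatcher marks it running
engine.tick()                  # stages "1.2"
engine.reject_next()           # user rejects "1.2", which becomes blocked
```

Keeping approved tickets in a separate list means a ticket that is merely staged is never visible to the dispatcher.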
---

## The `MultiAgentConductor` (in `src/multi_agent_conductor.py`)

### `__init__(self, controller: AppController)`

```python
import threading


class MultiAgentConductor:
    def __init__(self, controller: AppController):
        self.controller = controller
        self.dag_engine: ExecutionEngine | None = None
        self.worker_pool: WorkerPool | None = None
        self.current_track: Track | None = None
        self.tier_assignments: dict[str, str] = {}  # ticket_id -> "tier3-worker"
        self.persona_overrides: dict[str, str] = {}  # ticket_id -> persona_name
        self._stop_event = threading.Event()
        self._dispatch_thread: threading.Thread | None = None
```

### `load_track(track_id: str) -> Track`

Reads `conductor/tracks//plan.md` and parses the ticket list:

```python
def load_track(self, track_id: str) -> Track:
    track_dir = self.controller.paths.tracks_dir / track_id
    plan_path = track_dir / "plan.md"
    tickets = parse_plan_md(plan_path)  # Returns list[dict]
    track = Track(id=track_id, tickets=tickets, plan_path=plan_path)
    return track
```

`start()` then wraps the track's tickets in a `TrackDAG` and an `ExecutionEngine`, and starts the worker pool and the dispatch thread:

```python
def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None:
    self.current_track = track
    self.dag_engine = ExecutionEngine(TrackDAG(track.tickets), mode)
    self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency)
    self.worker_pool.start()
    self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True)
    self._dispatch_thread.start()
```

### `_dispatch_loop` (Background Thread)

```python
def _dispatch_loop(self) -> None:
    while not self._stop_event.is_set():
        ready = self.dag_engine.tick()
        for ticket_id in ready:
            if not self.worker_pool.has_capacity():
                break  # the rest stay PENDING and are picked up on a later tick
            # Mark RUNNING first so the next tick does not dispatch it again
            self.dag_engine.dag.nodes[ticket_id].status = "running"
            self._spawn_worker(ticket_id)
        time.sleep(0.1)  # 100ms tick
```

The loop runs in a daemon thread. Calling `stop()` from the main thread sets `_stop_event`, and the loop exits after its current tick.
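With `time.sleep(0.1)`, a `stop()` call can wait up to one full tick before the loop notices. A minimal standard-library sketch of the alternative: `threading.Event.wait(timeout)` doubles as the sleep and returns as soon as the event is set. `dispatch_loop` here is a hypothetical free function that takes a `tick` callable, not the conductor's method.

```python
import threading
import time


def dispatch_loop(stop: threading.Event, tick, interval: float = 0.1) -> None:
    """Hypothetical free-function version of _dispatch_loop.

    stop.wait(interval) pauses like time.sleep(interval) but returns
    immediately once the event is set, so shutdown is not delayed by a tick.
    """
    while not stop.is_set():
        tick()
        stop.wait(interval)


ticks = []
stop = threading.Event()
loop = threading.Thread(target=dispatch_loop, args=(stop, lambda: ticks.append(1)), daemon=True)
loop.start()
time.sleep(0.35)      # let a few ticks run
stop.set()            # the equivalent of setting _stop_event in stop()
loop.join(timeout=5)
```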
### `_spawn_worker(ticket_id: str)`

Builds the prompt and context for a ticket and submits it to the worker pool, which runs `mma_exec.py`:

```python
def _spawn_worker(self, ticket_id: str) -> None:
    ticket = self.current_track.get_ticket(ticket_id)
    role = self.tier_assignments.get(ticket_id, "tier3-worker")
    persona = self.persona_overrides.get(ticket_id)
    prompt = self._build_worker_prompt(ticket, persona)
    context = self._gather_context(ticket)
    future = self.worker_pool.submit(role, prompt, context)
    future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f))
```

The `WorkerPool` is a `ThreadPoolExecutor`-backed pool with a semaphore for concurrency control.

### `_on_worker_done(ticket_id, future)`

Called when a worker finishes (success or failure):

```python
from concurrent.futures import Future


def _on_worker_done(self, ticket_id: str, future: Future) -> None:
    dag = self.dag_engine.dag
    try:
        result = future.result()
        dag.nodes[ticket_id].result = result
        dag.mark_done(ticket_id)  # unblocks downstream tickets
        self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result)
    except Exception as e:
        dag.nodes[ticket_id].status = "blocked"
        dag.nodes[ticket_id].error = str(e)
        self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e))
        # Propagate the failure: pending tickets downstream can no longer run
        for tid, node in dag.nodes.items():
            if node.status == "pending" and dag.is_blocked(tid):
                node.status = "blocked"
```

On failure the ticket is marked BLOCKED, and every pending ticket that depends on it, directly or transitively, is marked BLOCKED through `is_blocked()`.

### `_emit_event(type, *args)`

Pushes an event to the controller's `event_queue` for the GUI to consume:

```python
def _emit_event(self, event_type: str, *args) -> None:
    self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time()))
```

The GUI polls `controller.event_queue.get_all()` once per frame and dispatches to render functions.
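The standard `queue.Queue` has no `get_all()` method, so `event_queue` is presumably a project wrapper. A minimal sketch of such a non-blocking per-frame drain, assuming the underlying store is a plain `queue.Queue`; the `Event` dataclass, `get_all` helper, and event-type strings below are hypothetical.

```python
from __future__ import annotations

import queue
import time
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical event record mirroring the fields _emit_event passes."""
    type: str
    args: tuple = ()
    timestamp: float = field(default_factory=time.time)


def get_all(q: queue.Queue) -> list[Event]:
    """Drain every event currently in the queue without blocking the frame."""
    events = []
    while True:
        try:
            events.append(q.get_nowait())
        except queue.Empty:
            return events


q = queue.Queue()
q.put(Event("mma_ticket_completed", ("2.1", {"ok": True})))
q.put(Event("mma_ticket_failed", ("2.2", "timeout")))
frame_events = get_all(q)  # both events, in FIFO order
```

Using `get_nowait()` instead of `get()` keeps the render thread from stalling on a frame where no worker has reported anything.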
---

## The `WorkerPool`

A thin wrapper around `ThreadPoolExecutor` with concurrency limiting:

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class WorkerPool:
    def __init__(self, max_concurrency: int = 4):
        self.max_concurrency = max_concurrency
        self.semaphore = threading.Semaphore(max_concurrency)
        self.executor = ThreadPoolExecutor(max_workers=max_concurrency)
        self.active_workers: set[str] = set()  # ticket_ids submitted and not yet finished
        self._lock = threading.Lock()          # guards active_workers across threads

    def submit(self, role: str, prompt: str, context: dict) -> Future:
        """Submit a worker task. Returns a Future for the result."""
        ticket_id = context["ticket_id"]
        with self._lock:
            # Reserve the slot at submit time so has_capacity() sees it immediately
            self.active_workers.add(ticket_id)

        def _wrapped():
            with self.semaphore:
                try:
                    return run_mma_worker(role, prompt, context)
                finally:
                    with self._lock:
                        self.active_workers.discard(ticket_id)

        return self.executor.submit(_wrapped)

    def has_capacity(self) -> bool:
        with self._lock:
            return len(self.active_workers) < self.max_concurrency

    def stop(self, wait: bool = True) -> None:
        self.executor.shutdown(wait=wait)
```

### `run_mma_worker`

Invokes `mma_exec.py` as a subprocess (not in-process) to enforce **Context Amnesia**: the sub-agent inherits no state from the parent.

```python
import json
import subprocess
import sys


def run_mma_worker(role: str, prompt: str, context: dict) -> dict:
    """Spawn a fresh sub-agent (tier3-worker or tier4-qa) for the ticket."""
    cmd = [
        sys.executable, "scripts/mma_exec.py",
        "--role", role,
        "--ticket-id", context["ticket_id"],
    ]
    # Pipe the prompt and context to the worker as JSON on stdin
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    try:
        stdout, stderr = proc.communicate(payload, timeout=600)
    except subprocess.TimeoutExpired:
        proc.kill()         # communicate() does not kill the child on timeout
        proc.communicate()  # reap it and close its pipes
        raise RuntimeError(f"Worker timed out: ticket {context['ticket_id']}")
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}")
    return json.loads(stdout.decode("utf-8"))
```

This is the **Token Firewall** in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice.
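The stdin/stdout contract can be exercised without `mma_exec.py`. In this sketch the child is a hypothetical inline script passed via `python -c` that simply echoes the ticket ID back; it shows the parent's side of the protocol (JSON in on stdin, JSON out on stdout, non-zero exit treated as failure) and the kill-and-reap cleanup that the `subprocess` documentation recommends when `communicate()` times out. `CHILD` and `run_child` are illustrative names only.

```python
import json
import subprocess
import sys

# Hypothetical child: reads one JSON payload from stdin, replies with JSON on stdout.
CHILD = (
    "import json, sys\n"
    "req = json.load(sys.stdin)\n"
    "json.dump({'ticket_id': req['context']['ticket_id'], 'status': 'done'}, sys.stdout)\n"
)


def run_child(prompt: str, context: dict, timeout: float = 30) -> dict:
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    try:
        # communicate() writes the payload, closes stdin (EOF), and collects output
        stdout, stderr = proc.communicate(payload, timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()         # don't leave an orphaned worker behind
        proc.communicate()  # reap the process and close its pipes
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}")
    return json.loads(stdout.decode("utf-8"))


result = run_child("Implement ticket 2.1", {"ticket_id": "2.1"})
```

Because the child only ever sees what arrives on stdin, nothing from the parent's conversation can leak into its context.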
---

## Track Loading

Tracks live in the project's `conductor/tracks/` directory:

```
conductor/tracks/
├── 2026-05-20_initial_setup/
│   ├── plan.md          # Ticket list with dependencies
│   ├── decisions.md     # Architectural notes
│   └── tasks/
│       ├── 001_init_repo.md
│       ├── 002_install_deps.md
│       └── ...
└── 2026-06-02_command_palette/
    ├── plan.md
    └── ...
```

### `plan.md` Format

```markdown
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]
```

The parser extracts:

- Ticket ID (`1.1`, `1.2`, ...)
- Title (text after `: `)
- Status (`[ ]` = pending, `[~]` = in progress, `[x]` = done, `[!]` = blocked)
- Dependencies (`[depends: ...]`)

---

## Thread Safety

The conductor uses:

- `threading.Lock` for `_emit_event` (`queue.put` is already thread-safe, but the logging around it needs guarding)
- `threading.Event` for `_stop_event` (clean shutdown signaling)
- `ThreadPoolExecutor` to run worker calls concurrently, off the dispatch thread
- `subprocess.Popen` for hard process isolation (Tier 3/4 sub-agents)

### `threading.local()` Context

The `ai_client` uses `threading.local()` to track the **source tier** of each request, so comms logs can be tagged with the originating tier. Because the value is stored per thread, the conductor sets the tier name in the same thread that then calls `ai_client.send()`.

```python
# In ai_client.py
import threading

_local = threading.local()

def send(prompt: str, **kwargs):
    source = getattr(_local, "source", "main")  # "main" if this thread never set a tier
    comms_log(f"[{source}] {prompt[:50]}...")
    ...

# In multi_agent_conductor.py (same thread as the send() call)
ai_client._local.source = "tier3-worker"
ai_client.send(prompt)
```

---

## Stop & Cleanup

```python
def stop(self) -> None:
    """Stop the conductor. In-flight workers run to completion; nothing new is dispatched."""
    self._stop_event.set()
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        self.worker_pool.stop(wait=False)
```

The conductor is designed for **graceful shutdown**: the dispatch loop stops spawning workers, and `stop()` returns without waiting (`wait=False`) while in-flight workers finish in the background.

---

## Observability

### Logging

All conductor activity is logged to `logs/mma_delegation.log` in JSON Lines format (one JSON object per line; the record below is pretty-printed for readability):

```json
{
  "timestamp": "2026-06-02T12:34:56.789Z",
  "event": "ticket_dispatched",
  "track_id": "2026-06-02_command_palette",
  "ticket_id": "2.1",
  "role": "tier3-worker",
  "persona": "code-implementer"
}
```

Per-worker logs are written to `logs/agents/___.log`.

### GUI Dashboard

The `MMA Observability Dashboard` (in `gui_2.py`) reads from the conductor's state:

- Track list with progress bars
- Active ticket's worker output stream
- DAG visualization (via `imgui-node-editor`)
- Per-tier strategy streams

---

## Beads Mode Integration

When the project is in Beads mode (a `.beads/` directory exists), the conductor **delegates to the Beads CLI** instead of parsing `plan.md`:

```python
def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)
```

The downstream conductor logic is the same: it operates on a `Track` object regardless of source. See **[docs/guide_beads.md](guide_beads.md)** for Beads details.
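For the markdown path (`_load_markdown_track`), the `plan.md` line format described under Track Loading is regular enough for a single regular expression. A minimal sketch, assuming one task per line and comma-separated IDs inside `[depends: ...]`; `parse_plan_text`, the dict keys, and mapping `[~]` to TicketNode's `"running"` status are hypothetical, not the real `parse_plan_md`.

```python
import re

# Status markers from the plan.md format; mapping "[~]" to "running" is an assumption.
STATUS = {" ": "pending", "~": "running", "x": "done", "!": "blocked"}

TASK_RE = re.compile(
    r"^- \[(?P<mark>[ ~x!])\] Task (?P<id>\S+): (?P<title>.*?)"
    r"(?:\s*\[depends: (?P<deps>[^\]]*)\])?\s*$"
)


def parse_plan_text(text: str) -> list[dict]:
    """Hypothetical parser: one dict per task line; headings and prose are skipped."""
    tickets = []
    for line in text.splitlines():
        m = TASK_RE.match(line.strip())
        if not m:
            continue  # phase headings, blank lines, notes
        deps = [d.strip() for d in (m["deps"] or "").split(",") if d.strip()]
        tickets.append({
            "ticket_id": m["id"],
            "title": m["title"],
            "status": STATUS[m["mark"]],
            "depends_on": deps,
        })
    return tickets


PLAN = """\
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
"""
tickets = parse_plan_text(PLAN)
```

The lazy title group stops right before an optional `[depends: ...]` suffix, so the tag never ends up in the title.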
---

## Testing

### Unit Tests

- `tests/test_dag_engine.py` — `TrackDAG` cycle detection, `ready_tickets`, blocking cascade
- `tests/test_execution_engine.py` — mode transitions, approval flow
- `tests/test_worker_pool.py` — concurrency limit, `has_capacity`, `stop`
- `tests/test_mma_conductor.py` — track loading, dispatch flow, error handling

### Integration Tests (live_gui)

`tests/test_mma_observability_dashboard.py` — drives the dashboard via the Hook API.

### Mocking

Tests use `unittest.mock.patch` to mock `subprocess.Popen` and `ai_client.send`, which keeps them hermetic.

---

## See Also

- **[guide_architecture.md](guide_architecture.md)** — Threading model
- **[guide_mma.md](guide_mma.md)** — MMA concepts (4-Tier hierarchy, Token Firewall)
- **[guide_app_controller.md](guide_app_controller.md)** — How the conductor is owned by the controller
- **[guide_models.md](guide_models.md)** — `Ticket` and `Track` data structures
- **`scripts/mma_exec.py`** — The sub-agent entry point
- **`scripts/mma.ps1`** — PowerShell wrapper
- **[`conductor/workflow.md`](../../conductor/workflow.md)** — Track execution protocol