diff --git a/docs/guide_multi_agent_conductor.md b/docs/guide_multi_agent_conductor.md new file mode 100644 index 00000000..9a03d0e8 --- /dev/null +++ b/docs/guide_multi_agent_conductor.md @@ -0,0 +1,569 @@ +# `src/multi_agent_conductor.py` & `src/dag_engine.py` — MMA Engine + +[Top](../README.md) | [Architecture](guide_architecture.md) | [Testing](guide_testing.md) | [MMA (concepts)](guide_mma.md) + +--- + +## Overview + +The MMA (Multi-Model Architecture) engine orchestrates **parallel AI worker execution** for implementing multi-ticket tracks. Two files: +- **`src/multi_agent_conductor.py`** (~28KB) — the high-level orchestrator, worker pool, ticket dispatch +- **`src/dag_engine.py`** (~10KB) — the DAG resolution, cycle detection, topological sort + +Together they implement a **non-blocking execution engine** with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode). + +--- + +## Architecture + +``` +┌─────────────────────────────────────────────────┐ +│ User: "Implement Track X" │ +└─────────────────┬───────────────────────────────┘ + │ controller.dispatch_mma_track(track_id) + ▼ +┌─────────────────────────────────────────────────┐ +│ MultiAgentConductor │ +│ - Loads track from conductor/ directory │ +│ - Builds TrackDAG from ticket dependencies │ +│ - Detects cycles │ +│ - Starts WorkerPool │ +└─────────────────┬───────────────────────────────┘ + │ enqueues ready tickets + ▼ +┌─────────────────────────────────────────────────┐ +│ WorkerPool │ +│ - Configurable concurrency (default 4) │ +│ - Threads pull ready tickets, spawn workers │ +│ - Workers call mma_exec.py with sub-prompt │ +└─────────────────┬───────────────────────────────┘ + │ per ticket + ▼ +┌─────────────────────────────────────────────────┐ +│ mma_exec.py (tier3-worker / tier4-qa) │ +│ - Stateless Tier 3 or Tier 4 sub-agent │ +│ - TDD cycle: red → green → refactor │ +│ - Commits per task │ 
+└─────────────────┬───────────────────────────────┘ + │ completes + ▼ +┌─────────────────────────────────────────────────┐ +│ Conductor: │ +│ - Records ticket result │ +│ - Updates DAG (downstream tickets unblocked) │ +│ - Emits events to controller.event_queue │ +│ - Writes logs to logs/mma_delegation.log │ +└─────────────────────────────────────────────────┘ +``` + +--- + +## The `TrackDAG` (in `src/dag_engine.py`) + +The `TrackDAG` class holds the ticket dependency graph for a single track. + +### Data Structures + +```python +class TrackDAG: + """Directed acyclic graph of tickets in a track.""" + def __init__(self, tickets: list[dict]): + # ticket_id -> {status, depends_on[], blocks[]} + self.nodes: dict[str, TicketNode] = {} + # ticket_id -> list of ticket_ids it depends on + self.edges: dict[str, set[str]] = {} + # reverse: ticket_id -> list of ticket_ids that depend on it + self.reverse_edges: dict[str, set[str]] = {} + + def ready_tickets(self) -> list[str]: + """Tickets with all dependencies DONE and status PENDING.""" + + def is_blocked(self, ticket_id: str) -> bool: + """Cascades blocked status from upstream.""" + + def mark_done(self, ticket_id: str) -> None: + """Update downstream tickets' blocked status.""" + + def detect_cycles(self) -> list[list[str]] | None: + """Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic.""" +``` + +### `TicketNode` + +```python +@dataclass +class TicketNode: + ticket_id: str + status: Literal["pending", "running", "done", "blocked", "skipped"] + priority: Literal["high", "medium", "low"] = "medium" + depends_on: set[str] = field(default_factory=set) + blocks: set[str] = field(default_factory=set) + result: dict | None = None # Populated when done + error: str | None = None +``` + +### `detect_cycles` + +Implements **iterative DFS** to avoid recursion overhead: + +```python +def detect_cycles(self) -> list[list[str]] | None: + """Iterative DFS cycle detection. 
O(V+E).""" + WHITE, GRAY, BLACK = 0, 1, 2 + color = {tid: WHITE for tid in self.nodes} + parent = {tid: None for tid in self.nodes} + stack = [] + cycles = [] + + for start in self.nodes: + if color[start] != WHITE: + continue + color[start] = GRAY # Mark the root as in progress so back edges to it are detected directly + stack.append((start, iter(self.edges[start]))) + while stack: + node, children = stack[-1] + try: + child = next(children) + if color[child] == GRAY: + # Back edge: cycle found + cycle = [child] + while node != child: + cycle.append(node) + node = parent[node] + cycle.append(child) + cycles.append(list(reversed(cycle))) + elif color[child] == WHITE: + color[child] = GRAY + parent[child] = node + stack.append((child, iter(self.edges[child]))) + except StopIteration: + color[node] = BLACK + stack.pop() + return cycles if cycles else None +``` + +Returns `None` for an acyclic graph, or a list of cycles for debugging. + +### `ready_tickets` (Kahn's Algorithm Variant) + +Returns the list of tickets that are PENDING and have all dependencies DONE. + +```python +def ready_tickets(self) -> list[str]: + rank = {"high": 0, "medium": 1, "low": 2} + ready = [] + for tid, node in self.nodes.items(): + if node.status != "pending": + continue + if all(self.nodes[dep].status == "done" for dep in node.depends_on): + ready.append(tid) + # Stable sort: tickets of equal priority keep their insertion order + ready.sort(key=lambda tid: rank[self.nodes[tid].priority]) + return ready +``` + +The result is sorted by priority (high > medium > low) for deterministic dispatch order. + +### `is_blocked` (Transitive Blocking Propagation) + +When a ticket is BLOCKED or SKIPPED, every ticket downstream of it is treated as blocked as well: + +```python +def is_blocked(self, ticket_id: str) -> bool: + """Cascades blocked status from upstream (any blocked or skipped dep; failed tickets are recorded as blocked).""" + for dep in self.nodes[ticket_id].depends_on: + if self.nodes[dep].status in ("blocked", "skipped"): + return True + if self.is_blocked(dep): # Recursive cascade + return True + return False +``` + +Without this cascade, a ticket whose dependency can never reach DONE would remain PENDING indefinitely and stall the track. + +--- + +## The `ExecutionEngine` + +The execution engine handles **state machine transitions** between Auto-Queue and Step Mode. 
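Both modes rest on the two `TrackDAG` queries described above: which tickets are ready, and which are blocked by an upstream ticket. The standalone sketch below exercises them on a toy graph. It is not the project's code: `ToyNode`, `PRIORITY_RANK`, and the free functions are hypothetical stand-ins, and the blocking check is written iteratively with a visited set, which is one possible alternative to the recursive version.

```python
from dataclasses import dataclass, field

# Hypothetical stand-in for TicketNode; only the fields this sketch needs.
@dataclass
class ToyNode:
    status: str = "pending"
    priority: str = "medium"
    depends_on: set[str] = field(default_factory=set)

PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}

def ready_tickets(nodes: dict[str, ToyNode]) -> list[str]:
    """Pending tickets whose dependencies are all done, highest priority first."""
    ready = [
        tid for tid, node in nodes.items()
        if node.status == "pending"
        and all(nodes[dep].status == "done" for dep in node.depends_on)
    ]
    # list.sort is stable, so equal priorities keep insertion order.
    ready.sort(key=lambda tid: PRIORITY_RANK[nodes[tid].priority])
    return ready

def is_blocked(nodes: dict[str, ToyNode], ticket_id: str) -> bool:
    """True if any transitive dependency is blocked or skipped."""
    seen: set[str] = set()
    stack = list(nodes[ticket_id].depends_on)
    while stack:
        dep = stack.pop()
        if dep in seen:
            continue  # shared dependencies are visited only once
        seen.add(dep)
        if nodes[dep].status in ("blocked", "skipped"):
            return True
        stack.extend(nodes[dep].depends_on)
    return False

nodes = {
    "1.1": ToyNode(status="done"),
    "1.2": ToyNode(status="blocked"),
    "2.1": ToyNode(priority="low", depends_on={"1.1"}),
    "2.2": ToyNode(priority="high", depends_on={"1.1"}),
    "2.3": ToyNode(depends_on={"1.2"}),
    "3.1": ToyNode(depends_on={"2.3"}),
}

print(ready_tickets(nodes))      # tickets 2.2 and 2.1, high priority first
print(is_blocked(nodes, "3.1"))  # blocked transitively through 2.3 -> 1.2
```

The visited set keeps the walk linear in the size of the graph even when many tickets share the same upstream dependencies.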
+ +### Execution Modes + +```python +class ExecutionMode(Enum): + AUTO_QUEUE = "auto_queue" # Autonomous worker spawning + STEP_MODE = "step_mode" # Explicit manual approval per transition +``` + +### `ExecutionEngine.tick()` + +The main loop, called by the conductor at a configurable interval (default 100ms): + +```python +class ExecutionEngine: + def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE): + self.dag = dag + self.mode = mode + self.pending_approval: list[str] = [] # Ticket IDs awaiting approval in Step Mode + + def tick(self) -> list[str]: + """Returns list of ticket IDs to dispatch in this tick.""" + if self.mode == ExecutionMode.AUTO_QUEUE: + return self.dag.ready_tickets() + elif self.mode == ExecutionMode.STEP_MODE: + # Only return one ticket at a time, requiring explicit approval + if self.pending_approval: + return self.pending_approval[:1] + ready = self.dag.ready_tickets() + if ready: + self.pending_approval = ready[:1] + return self.pending_approval +``` + +### Programmable Transitions + +```python +def set_mode(self, mode: ExecutionMode) -> None: + self.mode = mode + if mode == ExecutionMode.AUTO_QUEUE: + self.pending_approval.clear() + +def approve_next(self) -> str | None: + """Returns the approved ticket ID, or None if nothing pending.""" + if not self.pending_approval: + return None + return self.pending_approval.pop(0) + +def reject_next(self) -> str | None: + """Returns the rejected ticket ID, marking it BLOCKED.""" + if not self.pending_approval: + return None + tid = self.pending_approval.pop(0) + self.dag.nodes[tid].status = "blocked" + return tid +``` + +These are exposed to the GUI as "Step Mode" controls and to the Hook API as `set_execution_mode` / `approve_ticket` / `reject_ticket`. 
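To make the approval flow concrete, here is a minimal, self-contained sketch of a Step Mode gate. It is an illustration under stated assumptions, not the project's implementation: `ToyEngine` is hypothetical, its `ready` list stands in for `TrackDAG.ready_tickets()`, and it assumes a ticket is released for dispatch only after `approve_next()` has been called. Whether the real conductor waits for approval before spawning is not shown in this guide.

```python
from __future__ import annotations

from enum import Enum

class ExecutionMode(Enum):
    AUTO_QUEUE = "auto_queue"
    STEP_MODE = "step_mode"

class ToyEngine:
    """Hypothetical engine; `ready` stands in for TrackDAG.ready_tickets()."""

    def __init__(self, ready: list[str], mode: ExecutionMode):
        self.ready = list(ready)
        self.mode = mode
        self.pending_approval: list[str] = []  # offered to the operator
        self.approved: list[str] = []          # approved, not yet dispatched

    def tick(self) -> list[str]:
        """Tickets to dispatch now; in Step Mode only approved tickets qualify."""
        if self.mode is ExecutionMode.AUTO_QUEUE:
            out, self.ready = self.ready, []
            return out
        if not self.pending_approval and self.ready:
            # Offer exactly one ticket for approval at a time.
            self.pending_approval.append(self.ready.pop(0))
        out, self.approved = self.approved, []
        return out

    def approve_next(self) -> str | None:
        """Move the offered ticket to the approved list and return its ID."""
        if not self.pending_approval:
            return None
        tid = self.pending_approval.pop(0)
        self.approved.append(tid)
        return tid

engine = ToyEngine(["1.1", "1.2"], ExecutionMode.STEP_MODE)
first = engine.tick()    # nothing approved yet, so nothing is dispatched
engine.approve_next()    # the operator approves ticket 1.1
second = engine.tick()   # 1.1 is released; 1.2 is now offered for approval
```

Keeping "offered" and "approved" in separate lists means a polling dispatch loop can call `tick()` as often as it likes without spawning a worker the operator has not yet approved.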
+ +--- + +## The `MultiAgentConductor` (in `src/multi_agent_conductor.py`) + +### `__init__(self, controller: AppController)` + +```python +class MultiAgentConductor: + def __init__(self, controller: AppController): + self.controller = controller + self.dag_engine: ExecutionEngine | None = None + self.worker_pool: WorkerPool | None = None + self.current_track: Track | None = None + self.tier_assignments: dict[str, str] = {} # ticket_id -> "tier3-worker" + self.persona_overrides: dict[str, str] = {} # ticket_id -> persona_name + self._stop_event = threading.Event() + self._dispatch_thread: threading.Thread | None = None +``` + +### `load_track(track_id: str) -> Track` + +Reads `conductor/tracks//plan.md` and parses the ticket list: + +```python +def load_track(self, track_id: str) -> Track: + track_dir = self.controller.paths.tracks_dir / track_id + plan_path = track_dir / "plan.md" + tickets = parse_plan_md(plan_path) # Returns list[dict] + track = Track(id=track_id, tickets=tickets, plan_path=plan_path) + return track +``` + +The track is then passed to the DAG engine: + +```python +def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None: + self.current_track = track + self.dag_engine = ExecutionEngine(TrackDAG(track.tickets), mode) + self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency) + self.worker_pool.start() + self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True) + self._dispatch_thread.start() +``` + +### `_dispatch_loop` (Background Thread) + +```python +def _dispatch_loop(self) -> None: + while not self._stop_event.is_set(): + ready = self.dag_engine.tick() + for ticket_id in ready: + if self.worker_pool.has_capacity(): + self._spawn_worker(ticket_id) + time.sleep(0.1) # 100ms tick +``` + +The loop runs in a daemon thread. The main thread can call `self.stop()` to break out. 
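A polling loop like this has two details worth sketching: waking promptly when `stop()` is called, and not handing the same ticket to a second worker on a later tick. The example below is a hedged illustration rather than the conductor's code. `next_ready`, `has_capacity`, and `spawn` are hypothetical callables standing in for `ExecutionEngine.tick`, `WorkerPool.has_capacity`, and `_spawn_worker`, and the `dispatched` set is an assumption about how re-dispatch could be avoided.

```python
import threading

def dispatch_loop(stop_event, next_ready, has_capacity, spawn, interval=0.1):
    """Poll for ready tickets until stop_event is set."""
    dispatched = set()
    # Event.wait returns True as soon as the event is set, so a stop request
    # does not have to wait out a full sleep interval.
    while not stop_event.wait(timeout=interval):
        for ticket_id in next_ready():
            if ticket_id in dispatched or not has_capacity():
                continue
            dispatched.add(ticket_id)  # guard against re-dispatch on later ticks
            spawn(ticket_id)

stop_event = threading.Event()
spawned = []

def spawn(ticket_id):
    spawned.append(ticket_id)
    if len(spawned) == 2:
        stop_event.set()  # stop once both demo tickets have been handed out

thread = threading.Thread(
    target=dispatch_loop,
    args=(stop_event, lambda: ["1.1", "1.2"], lambda: True, spawn, 0.01),
    daemon=True,
)
thread.start()
thread.join(timeout=2)
```

An equivalent guard would be to set the ticket's status to `"running"` at spawn time, so that it no longer appears in the ready list at all.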
+ +### `_spawn_worker(ticket_id: str)` + +Spawns a worker via `mma_exec.py`: + +```python +def _spawn_worker(self, ticket_id: str) -> None: + ticket = self.current_track.get_ticket(ticket_id) + role = self.tier_assignments.get(ticket_id, "tier3-worker") + persona = self.persona_overrides.get(ticket_id) + + prompt = self._build_worker_prompt(ticket, persona) + context = self._gather_context(ticket) + + future = self.worker_pool.submit(role, prompt, context) + future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f)) +``` + +The `WorkerPool` is a `ThreadPoolExecutor`-backed pool with a semaphore for concurrency control. + +### `_on_worker_done(ticket_id, future)` + +Called when a worker finishes (success or failure): + +```python +def _on_worker_done(self, ticket_id: str, future: Future) -> None: + try: + result = future.result() + self.dag_engine.dag.mark_done(ticket_id) + self.dag_engine.dag.nodes[ticket_id].result = result + self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result) + except Exception as e: + self.dag_engine.dag.nodes[ticket_id].status = "blocked" + self.dag_engine.dag.nodes[ticket_id].error = str(e) + self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e)) + # Re-evaluate downstream blocking: dependents of a failed ticket must not be marked done + for downstream in self.dag_engine.dag.reverse_edges[ticket_id]: + if self.dag_engine.dag.is_blocked(downstream): + self.dag_engine.dag.nodes[downstream].status = "blocked" +``` + +### `_emit_event(type, *args)` + +Pushes an event to the controller's `event_queue` for the GUI to consume: + +```python +def _emit_event(self, event_type: str, *args) -> None: + self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time())) +``` + +The GUI polls `controller.event_queue.get_all()` once per frame and dispatches to render functions.
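The standard library's `queue.Queue` has no `get_all()` method, so the controller's queue presumably wraps or extends it. As a rough sketch of what a non-blocking, once-per-frame drain could look like, assuming a plain `queue.Queue` underneath (the `drain` helper and this simplified `Event` dataclass are hypothetical, not the project's types):

```python
import queue
import time
from dataclasses import dataclass, field

# Simplified stand-in for the conductor's Event type.
@dataclass
class Event:
    type: str
    args: tuple = ()
    timestamp: float = field(default_factory=time.time)

def drain(q: queue.Queue) -> list:
    """Return every item currently queued without blocking the caller."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

events = queue.Queue()
events.put(Event("ticket_completed", ("1.1",)))
events.put(Event("ticket_failed", ("1.2", "timeout")))

# Called once per frame by the render loop; never stalls the UI thread.
frame_events = drain(events)
for event in frame_events:
    print(event.type, event.args)
```

Using `get_nowait()` in a loop, rather than checking `empty()` first, avoids a race with worker threads that are still producing events.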
+ +--- + +## The `WorkerPool` + +A thin wrapper around `ThreadPoolExecutor` with concurrency limiting: + +```python +class WorkerPool: + def __init__(self, max_concurrency: int = 4): + self.max_concurrency = max_concurrency + self.semaphore = threading.Semaphore(max_concurrency) + self.executor = ThreadPoolExecutor(max_workers=max_concurrency) + self.active_workers: set[str] = set() # ticket_ids currently running + + def submit(self, role: str, prompt: str, context: dict) -> Future: + """Submit a worker task. Returns a Future for the result.""" + def _wrapped(): + with self.semaphore: + self.active_workers.add(context["ticket_id"]) + try: + return run_mma_worker(role, prompt, context) + finally: + self.active_workers.discard(context["ticket_id"]) + return self.executor.submit(_wrapped) + + def has_capacity(self) -> bool: + return len(self.active_workers) < self.max_concurrency + + def stop(self, wait: bool = True) -> None: + self.executor.shutdown(wait=wait) +``` + +### `run_mma_worker` + +Invokes `mma_exec.py` as a subprocess (not in-process) to enforce **Context Amnesia** — the sub-agent has zero state from the parent: + +```python +def run_mma_worker(role: str, prompt: str, context: dict) -> dict: + """Spawn a fresh tier3-worker sub-agent for the ticket.""" + cmd = [ + sys.executable, "scripts/mma_exec.py", + "--role", role, + "--ticket-id", context["ticket_id"], + ] + # Pipe prompt via stdin + proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8") + stdout, stderr = proc.communicate(payload, timeout=600) + if proc.returncode != 0: + raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}") + return json.loads(stdout.decode("utf-8")) +``` + +This is the **Token Firewall** in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice. 
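One detail of `Popen.communicate(timeout=...)` is easy to miss: when the timeout expires it raises `subprocess.TimeoutExpired` but leaves the child process running. The sketch below shows the cleanup pattern recommended in the Python documentation, applied to a JSON-over-stdin exchange similar to the one above. `run_json_subprocess` and the inline echo child are hypothetical stand-ins; the real worker is `scripts/mma_exec.py`.

```python
import json
import subprocess
import sys

def run_json_subprocess(cmd: list[str], payload: dict, timeout: float) -> dict:
    """Send payload as JSON on stdin and parse a JSON reply from stdout."""
    proc = subprocess.Popen(
        cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    try:
        stdout, stderr = proc.communicate(
            json.dumps(payload).encode("utf-8"), timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; do it explicitly,
        # then reap it so no orphaned process or open pipe is left behind.
        proc.kill()
        proc.communicate()
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8', errors='replace')}")
    return json.loads(stdout.decode("utf-8"))

# Stand-in child that echoes the prompt back as JSON.
ECHO_CHILD = [
    sys.executable, "-c",
    "import json, sys; d = json.load(sys.stdin); print(json.dumps({'echo': d['prompt']}))",
]

result = run_json_subprocess(ECHO_CHILD, {"prompt": "hello", "context": {}}, timeout=30)
```

Because the exception is re-raised after cleanup, a caller such as a done-callback still sees the failure and can mark the ticket accordingly.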
+ +--- + +## Track Loading + +Tracks live in the project's `conductor/tracks/` directory: + +``` +conductor/tracks/ +├── 2026-05-20_initial_setup/ +│ ├── plan.md # Ticket list with dependencies +│ ├── decisions.md # Architectural notes +│ └── tasks/ +│ ├── 001_init_repo.md +│ ├── 002_install_deps.md +│ └── ... +└── 2026-06-02_command_palette/ + ├── plan.md + └── ... +``` + +### `plan.md` Format + +```markdown +# Phase 1: Foundation +- [ ] Task 1.1: Initialize the project +- [x] Task 1.2: Install dependencies (done) +- [~] Task 1.3: Configure paths (in progress) + +# Phase 2: Implementation +- [ ] Task 2.1: Add command palette [depends: 1.3] +- [ ] Task 2.2: Hook API integration [depends: 2.1] +- [ ] Task 2.3: Documentation [depends: 2.2] +``` + +The parser extracts: +- Ticket ID (`1.1`, `1.2`, ...) +- Title (text after `: `) +- Status (`[ ]` = pending, `[~]` = in progress, `[x]` = done, `[!]` = blocked) +- Dependencies (`[depends: ...]`) + +--- + +## Thread Safety + +The conductor uses: +- `threading.Lock` for `_emit_event` (queue.put is atomic but logging needs guarding) +- `threading.Event` for `_stop_event` (clean shutdown signaling) +- `ThreadPoolExecutor` for worker isolation +- `subprocess.Popen` for hard isolation (Tier 3/4 sub-agents) + +### `threading.local()` Context + +The `ai_client` uses `threading.local()` to track the **source tier** of each request (so comms logs can be tagged with the originating tier). The conductor passes the tier name when calling `ai_client.send()`. + +```python +# In ai_client.py +_local = threading.local() + +def send(prompt, ...): + source = getattr(_local, 'source', 'main') + comms_log(f"[{source}] {prompt[:50]}...") + +# In multi_agent_conductor.py +_local.source = "tier3-worker" +ai_client.send(prompt) +``` + +--- + +## Stop & Cleanup + +```python +def stop(self) -> None: + """Stop the conductor. 
Workers continue to completion or are cancelled.""" + self._stop_event.set() + if self._dispatch_thread: + self._dispatch_thread.join(timeout=5) + if self.worker_pool: + self.worker_pool.stop(wait=False) +``` + +The conductor is designed for **graceful shutdown**: in-flight workers complete, no new ones spawn. + +--- + +## Observability + +### Logging + +All conductor activity is logged to `logs/mma_delegation.log` (JSON-L format): + +```json +{ + "timestamp": "2026-06-02T12:34:56.789Z", + "event": "ticket_dispatched", + "track_id": "2026-06-02_command_palette", + "ticket_id": "2.1", + "role": "tier3-worker", + "persona": "code-implementer" +} +``` + +Per-worker logs are written to `logs/agents/___.log`. + +### GUI Dashboard + +The `MMA Observability Dashboard` (in `gui_2.py`) reads from the conductor's state: +- Track list with progress bars +- Active ticket's worker output stream +- DAG visualization (via `imgui-node-editor`) +- Per-tier strategy streams + +--- + +## Beads Mode Integration + +When the project is in Beads mode (`.beads/` directory exists), the conductor **delegates to the Beads CLI** instead of parsing `plan.md`: + +```python +def load_track(self, track_id: str) -> Track: + if self.controller.app_state.use_beads: + from src.beads_client import BeadsClient + client = BeadsClient() + tickets = client.list_tickets(track_id=track_id) + return Track(id=track_id, tickets=tickets) + else: + return self._load_markdown_track(track_id) +``` + +The downstream conductor logic is the same — it operates on a `Track` object regardless of source. + +See **[docs/guide_beads.md](guide_beads.md)** for Beads details. 
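On the markdown side of `load_track`, the `plan.md` format shown under Track Loading could be parsed along the following lines. This is a sketch under assumptions, not the project's `parse_plan_md`: the function name `parse_plan_lines`, the dict keys, the mapping of `[~]` to the `"running"` status, and support for comma-separated dependency lists are all hypothetical choices made for illustration.

```python
import re

# Checkbox marker -> ticket status. Mapping "[~]" (in progress) to "running"
# is an assumption based on the TicketNode status values.
STATUS_BY_MARK = {" ": "pending", "~": "running", "x": "done", "!": "blocked"}

TASK_RE = re.compile(
    r"^- \[(?P<mark>[ x~!])\] Task (?P<id>\d+(?:\.\d+)*): (?P<title>.*?)"
    r"(?:\s*\[depends: (?P<deps>[^\]]*)\])?\s*$"
)

def parse_plan_lines(text: str) -> list[dict]:
    """Extract ticket dicts from plan.md text; non-task lines are ignored."""
    tickets = []
    for line in text.splitlines():
        match = TASK_RE.match(line.strip())
        if not match:
            continue  # phase headings, blank lines, free-form notes
        deps = match.group("deps")
        tickets.append({
            "id": match.group("id"),
            "title": match.group("title"),
            "status": STATUS_BY_MARK[match.group("mark")],
            # Comma-separated dependency lists are an assumption.
            "depends_on": [d.strip() for d in deps.split(",") if d.strip()] if deps else [],
        })
    return tickets

sample = """\
# Phase 1: Foundation
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
"""

tickets = parse_plan_lines(sample)
for ticket in tickets:
    print(ticket["id"], ticket["status"], ticket["depends_on"])
```

Whichever source produces the tickets, returning the same list-of-dicts shape is what lets the downstream DAG construction stay unchanged.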
+ +--- + +## Testing + +### Unit Tests + +- `tests/test_dag_engine.py` — `TrackDAG` cycle detection, ready_tickets, blocking cascade +- `tests/test_execution_engine.py` — mode transitions, approval flow +- `tests/test_worker_pool.py` — concurrency limit, has_capacity, stop +- `tests/test_mma_conductor.py` — track loading, dispatch flow, error handling + +### Integration Tests (live_gui) + +`tests/test_mma_observability_dashboard.py` — drives the dashboard via Hook API. + +### Mocking + +Tests patch `subprocess.Popen` and `ai_client.send` with `unittest.mock.patch` so that they stay hermetic. + +--- + +## See Also + +- **[guide_architecture.md](guide_architecture.md)** — Threading model +- **[guide_mma.md](guide_mma.md)** — MMA concepts (4-Tier hierarchy, Token Firewall) +- **[guide_app_controller.md](guide_app_controller.md)** — How the conductor is owned by the controller +- **[guide_models.md](guide_models.md)** — `Ticket` and `Track` data structures +- **`scripts/mma_exec.py`** — The sub-agent entry point +- **`scripts/mma.ps1`** — PowerShell wrapper +- **[`conductor/workflow.md`](../../conductor/workflow.md)** — Track execution protocol