
src/multi_agent_conductor.py & src/dag_engine.py — MMA Engine



Overview

The MMA (Multi-Model Architecture) engine orchestrates parallel AI worker execution for implementing multi-ticket tracks. It spans two files:

  • src/multi_agent_conductor.py (~28KB) — the high-level orchestrator, worker pool, ticket dispatch
  • src/dag_engine.py (~10KB) — the DAG resolution, cycle detection, topological sort

Together they implement a non-blocking execution engine with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).


Architecture

┌─────────────────────────────────────────────────┐
│  User: "Implement Track X"                      │
└─────────────────┬───────────────────────────────┘
                  │ controller.dispatch_mma_track(track_id)
                  ▼
┌─────────────────────────────────────────────────┐
│  MultiAgentConductor                            │
│  - Loads track from conductor/ directory        │
│  - Builds TrackDAG from ticket dependencies     │
│  - Detects cycles                               │
│  - Starts WorkerPool                            │
└─────────────────┬───────────────────────────────┘
                  │ enqueues ready tickets
                  ▼
┌─────────────────────────────────────────────────┐
│  WorkerPool                                     │
│  - Configurable concurrency (default 4)         │
│  - Threads pull ready tickets, spawn workers    │
│  - Workers call mma_exec.py with sub-prompt     │
└─────────────────┬───────────────────────────────┘
                  │ per ticket
                  ▼
┌─────────────────────────────────────────────────┐
│  mma_exec.py (tier3-worker / tier4-qa)          │
│  - Stateless Tier 3 or Tier 4 sub-agent         │
│  - TDD cycle: red → green → refactor            │
│  - Commits per task                             │
└─────────────────┬───────────────────────────────┘
                  │ completes
                  ▼
┌─────────────────────────────────────────────────┐
│  Conductor:                                     │
│  - Records ticket result                        │
│  - Updates DAG (downstream tickets unblocked)   │
│  - Emits events to controller.event_queue       │
│  - Writes logs to logs/mma_delegation.log       │
└─────────────────────────────────────────────────┘

The TrackDAG (in src/dag_engine.py)

The TrackDAG class holds the ticket dependency graph for a single track.

Data Structures

class TrackDAG:
    """Directed acyclic graph of tickets in a track."""
    def __init__(self, tickets: list[dict]):
        # ticket_id -> TicketNode (status, depends_on, blocks, result, error)
        self.nodes: dict[str, TicketNode] = {}
        # ticket_id -> set of ticket_ids it depends on
        self.edges: dict[str, set[str]] = {}
        # reverse: ticket_id -> set of ticket_ids that depend on it
        self.reverse_edges: dict[str, set[str]] = {}
        # (Populating these maps from `tickets` is omitted in this excerpt.)

    def ready_tickets(self) -> list[str]:
        """Tickets with status PENDING whose dependencies are all DONE."""

    def is_blocked(self, ticket_id: str) -> bool:
        """True if any upstream dependency, direct or transitive, is blocked or skipped."""

    def mark_done(self, ticket_id: str) -> None:
        """Mark the ticket DONE and update downstream tickets' blocked status."""

    def detect_cycles(self) -> list[list[str]] | None:
        """Returns list of cycles (each cycle = list of ticket_ids), or None if acyclic."""

TicketNode

from dataclasses import dataclass, field
from typing import Literal

@dataclass
class TicketNode:
    ticket_id: str
    status: Literal["pending", "running", "done", "blocked", "skipped"]
    priority: Literal["high", "medium", "low"] = "medium"
    depends_on: set[str] = field(default_factory=set)
    blocks: set[str] = field(default_factory=set)
    result: dict | None = None  # Populated when done
    error: str | None = None
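The TrackDAG excerpt omits how `__init__` fills `nodes`, `edges`, and `reverse_edges`. The sketch below shows one way to do it, assuming each ticket dict carries `id`, `depends_on`, `status`, and `priority` keys. Those key names, the `build_graph` helper, and the trimmed TicketNode are hypothetical; the real constructor may differ.

```python
from dataclasses import dataclass, field


@dataclass
class TicketNode:  # trimmed copy of the dataclass above (hypothetical)
    ticket_id: str
    status: str = "pending"
    priority: str = "medium"
    depends_on: set[str] = field(default_factory=set)
    blocks: set[str] = field(default_factory=set)


def build_graph(tickets: list[dict]):
    """Return (nodes, edges, reverse_edges) for a list of ticket dicts."""
    nodes: dict[str, TicketNode] = {}
    edges: dict[str, set[str]] = {}          # ticket -> its dependencies
    reverse_edges: dict[str, set[str]] = {}  # ticket -> tickets that depend on it
    for t in tickets:
        tid = t["id"]
        deps = set(t.get("depends_on", []))
        nodes[tid] = TicketNode(tid, t.get("status", "pending"),
                                t.get("priority", "medium"), set(deps))
        edges[tid] = deps
        reverse_edges.setdefault(tid, set())
    # Second pass: every dependency points back at its dependents
    for tid, deps in edges.items():
        for dep in deps:
            reverse_edges.setdefault(dep, set()).add(tid)
            if dep in nodes:
                nodes[dep].blocks.add(tid)
    return nodes, edges, reverse_edges


nodes, edges, reverse_edges = build_graph([
    {"id": "1.3"},
    {"id": "2.1", "depends_on": ["1.3"]},
    {"id": "2.2", "depends_on": ["2.1"], "priority": "high"},
])
```

Keeping both edge directions costs a little memory, but it lets `ready_tickets` read dependencies and lets failure handling find dependents without scanning the whole graph.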

detect_cycles

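Implements DFS with white/gray/black coloring iteratively rather than recursively, which avoids recursion overhead and Python's recursion-depth limit on long dependency chains: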

def detect_cycles(self) -> list[list[str]] | None:
    """Iterative DFS cycle detection. O(V+E).

    Reports one cycle per back edge found, each closed on itself,
    e.g. ["A", "B", "C", "A"] for A -> B -> C -> A.
    """
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / finished
    color = {tid: WHITE for tid in self.nodes}
    parent: dict[str, str | None] = {tid: None for tid in self.nodes}
    cycles: list[list[str]] = []

    for start in self.nodes:
        if color[start] != WHITE:
            continue
        color[start] = GRAY  # the root is on the DFS path as well
        stack = [(start, iter(self.edges.get(start, ())))]
        while stack:
            node, children = stack[-1]
            try:
                child = next(children)
            except StopIteration:
                color[node] = BLACK  # all children explored
                stack.pop()
                continue
            if color[child] == GRAY:
                # Back edge node -> child: walk parent links from node up to child
                cycle = [child]
                cur = node
                while cur != child:
                    cycle.append(cur)
                    cur = parent[cur]
                cycle.append(child)
                cycles.append(list(reversed(cycle)))
            elif color[child] == WHITE:
                color[child] = GRAY
                parent[child] = node
                stack.append((child, iter(self.edges.get(child, ()))))
    return cycles if cycles else None

Returns None for an acyclic graph, or a list of cycles for debugging.

ready_tickets (Kahn's Algorithm Variant)

Returns the set of tickets that are PENDING and have all dependencies DONE.

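PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}

def ready_tickets(self) -> list[str]:
    ready = []
    for tid, node in self.nodes.items():
        if node.status != "pending":
            continue
        if all(self.nodes[dep].status == "done" for dep in node.depends_on):
            ready.append(tid)
    # Stable sort: high before medium before low; ties keep insertion order
    ready.sort(key=lambda tid: PRIORITY_RANK[self.nodes[tid].priority])
    return ready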

Sorted by priority (high > medium > low) for deterministic dispatch order.
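The heading borrows the name of Kahn's algorithm because `ready_tickets()` returns the frontier Kahn's algorithm works from: tickets with no unfinished dependencies. For reference, a full Kahn topological sort built on the same idea, taking simultaneously ready tickets in priority order, can be sketched as follows. `topo_order` and its plain-dict inputs are hypothetical; this is not the dag_engine.py implementation.

```python
import heapq

PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def topo_order(deps: dict[str, set[str]], priority: dict[str, str]) -> list[str]:
    """Kahn's algorithm: emit a ticket once all of its dependencies are emitted.

    Among tickets that become ready together, higher priority goes first and
    ties break by ticket ID. Raises ValueError if the graph has a cycle.
    """
    def rank(tid: str) -> tuple[int, str]:
        return (PRIORITY_RANK[priority.get(tid, "medium")], tid)

    indegree = {tid: len(d) for tid, d in deps.items()}
    dependents: dict[str, list[str]] = {tid: [] for tid in deps}
    for tid, d in deps.items():
        for dep in d:
            dependents[dep].append(tid)

    heap = [rank(tid) for tid, n in indegree.items() if n == 0]
    heapq.heapify(heap)
    order: list[str] = []
    while heap:
        _, tid = heapq.heappop(heap)
        order.append(tid)
        for child in dependents[tid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                heapq.heappush(heap, rank(child))

    if len(order) != len(deps):
        raise ValueError("dependency cycle: not every ticket could be ordered")
    return order


order = topo_order(
    {"1.1": set(), "1.2": set(), "1.3": {"1.1"}, "2.1": {"1.3"}},
    {"1.2": "low", "1.3": "high"},
)
```

Here `order` is `["1.1", "1.3", "2.1", "1.2"]`: the high-priority 1.3 jumps ahead of the low-priority 1.2 as soon as its dependency is emitted. Leftover tickets after the heap empties are exactly the ones stuck on a cycle, which is why Kahn's algorithm doubles as a cycle check.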

is_blocked (Transitive Blocking Propagation)

A ticket counts as blocked if any of its upstream dependencies, direct or transitive, is BLOCKED or SKIPPED:

def is_blocked(self, ticket_id: str) -> bool:
    """True if any upstream dependency, direct or transitive, is blocked or skipped.

    Failed tickets are recorded with status "blocked", so they count too.
    Assumes the graph is acyclic (run detect_cycles first).
    """
    for dep in self.nodes[ticket_id].depends_on:
        if self.nodes[dep].status in ("blocked", "skipped"):
            return True
        if self.is_blocked(dep):  # Recursive cascade
            return True
    return False

This prevents execution stalls when an upstream ticket is blocked.
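Without this check, a ticket whose dependency can never reach DONE would simply sit in PENDING. The recursive version re-walks shared ancestors, so on diamond-shaped graphs the same dependency can be checked many times. A memoized variant that computes the whole blocked set in one pass might look like this; `blocked_set` is a hypothetical helper over plain dicts and, like the original, assumes an acyclic graph.

```python
def blocked_set(deps: dict[str, set[str]], status: dict[str, str]) -> set[str]:
    """Return every ticket with a blocked/skipped dependency, direct or transitive."""
    memo: dict[str, bool] = {}

    def is_blocked(tid: str) -> bool:
        if tid not in memo:
            # Each ticket is evaluated once, however many paths lead to it
            memo[tid] = any(
                status[dep] in ("blocked", "skipped") or is_blocked(dep)
                for dep in deps[tid]
            )
        return memo[tid]

    return {tid for tid in deps if is_blocked(tid)}


deps = {"A": set(), "B": {"A"}, "C": {"B"}, "D": set()}
status = {"A": "blocked", "B": "pending", "C": "pending", "D": "pending"}
blocked = blocked_set(deps, status)
```

`blocked` is `{"B", "C"}`: C is caught through B even though its own direct dependency is still pending, while the independent D is unaffected.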


The ExecutionEngine

The execution engine handles state machine transitions between Auto-Queue and Step Mode.

Execution Modes

from enum import Enum

class ExecutionMode(Enum):
    AUTO_QUEUE = "auto_queue"      # Autonomous worker spawning
    STEP_MODE = "step_mode"        # Explicit manual approval per transition

ExecutionEngine.tick()

tick() is not a loop itself: the conductor's dispatch loop calls it on every pass (default interval 100 ms) and dispatches whatever it returns:

class ExecutionEngine:
    def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE):
        self.dag = dag
        self.mode = mode
        self.pending_approval: list[str] = []  # Step Mode: awaiting approve/reject
        self.approved: list[str] = []          # Step Mode: approved, not yet dispatched

    def tick(self) -> list[str]:
        """Returns list of ticket IDs to dispatch in this tick."""
        if self.mode == ExecutionMode.AUTO_QUEUE:
            return self.dag.ready_tickets()
        # STEP_MODE: dispatch only approved tickets that have not started yet
        self.approved = [t for t in self.approved if self.dag.nodes[t].status == "pending"]
        if self.approved:
            return list(self.approved)
        # Nothing approved: queue the next ready ticket (one at a time) for approval
        if not self.pending_approval:
            self.pending_approval = self.dag.ready_tickets()[:1]
        return []

Programmable Transitions

def set_mode(self, mode: ExecutionMode) -> None:
    self.mode = mode
    if mode == ExecutionMode.AUTO_QUEUE:
        # Queued tickets are still PENDING in the DAG, so ready_tickets() picks them up
        self.pending_approval.clear()
        self.approved.clear()

def approve_next(self) -> str | None:
    """Approves the next pending ticket for dispatch on the following tick.
    Returns the approved ticket ID, or None if nothing is pending."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    self.approved.append(tid)
    return tid

def reject_next(self) -> str | None:
    """Returns the rejected ticket ID, marking it BLOCKED."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    self.dag.nodes[tid].status = "blocked"
    return tid

These are exposed to the GUI as "Step Mode" controls and to the Hook API as set_execution_mode / approve_ticket / reject_ticket.


The MultiAgentConductor (in src/multi_agent_conductor.py)

__init__(self, controller: AppController)

class MultiAgentConductor:
    def __init__(self, controller: AppController):
        self.controller = controller
        self.dag_engine: ExecutionEngine | None = None
        self.worker_pool: WorkerPool | None = None
        self.current_track: Track | None = None
        self.tier_assignments: dict[str, str] = {}  # ticket_id -> "tier3-worker"
        self.persona_overrides: dict[str, str] = {}  # ticket_id -> persona_name
        self._stop_event = threading.Event()
        self._dispatch_thread: threading.Thread | None = None

load_track(track_id: str) -> Track

Reads conductor/tracks/<track_id>/plan.md and parses the ticket list:

def load_track(self, track_id: str) -> Track:
    track_dir = self.controller.paths.tracks_dir / track_id
    plan_path = track_dir / "plan.md"
    tickets = parse_plan_md(plan_path)  # Returns list[dict]
    track = Track(id=track_id, tickets=tickets, plan_path=plan_path)
    return track

The track is then passed to the DAG engine:

def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None:
    self.current_track = track
    self.dag_engine = ExecutionEngine(TrackDAG(track.tickets), mode)
    # ThreadPoolExecutor creates its threads lazily on submit(), so no start() call is needed
    self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency)
    self._stop_event.clear()  # allow a restart after a previous stop()
    self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True)
    self._dispatch_thread.start()

_dispatch_loop (Background Thread)

def _dispatch_loop(self) -> None:
    while not self._stop_event.is_set():
        for ticket_id in self.dag_engine.tick():
            if not self.worker_pool.has_capacity():
                break  # pool full; remaining ready tickets are retried next tick
            self._spawn_worker(ticket_id)
        self._stop_event.wait(0.1)  # 100ms tick; returns early when stop() is called

The loop runs in a daemon thread. The main thread can call self.stop() to break out.
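Because the loop's only exit condition is the stop event, waiting on that event between ticks (instead of calling `time.sleep()`) lets `stop()` take effect immediately rather than after the current sleep finishes. A self-contained sketch of the pattern, with a hypothetical `dispatch_loop` function standing in for the method:

```python
import threading
import time


def dispatch_loop(stop: threading.Event, tick, interval: float = 0.1) -> None:
    """Call tick() roughly every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        tick()
        # Event.wait() returns as soon as stop.set() is called,
        # whereas time.sleep() would always run the full interval
        stop.wait(interval)


ticks: list = []
stop = threading.Event()
worker = threading.Thread(
    target=dispatch_loop,
    args=(stop, lambda: ticks.append(time.monotonic())),
    daemon=True,
)
worker.start()
time.sleep(0.35)   # let a few ticks happen
stop.set()         # wakes the pending wait(), so the loop exits right away
worker.join(timeout=1)
```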

_spawn_worker(ticket_id: str)

Spawns a worker via mma_exec.py:

def _spawn_worker(self, ticket_id: str) -> None:
    ticket = self.current_track.get_ticket(ticket_id)
    role = self.tier_assignments.get(ticket_id, "tier3-worker")
    persona = self.persona_overrides.get(ticket_id)

    prompt = self._build_worker_prompt(ticket, persona)
    context = self._gather_context(ticket)

    # Mark RUNNING before submitting so the next tick's ready_tickets()
    # does not hand out the same ticket again
    self.dag_engine.dag.nodes[ticket_id].status = "running"
    future = self.worker_pool.submit(role, prompt, context)
    future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f))

The WorkerPool is a ThreadPoolExecutor-backed pool with a semaphore for concurrency control.

_on_worker_done(ticket_id, future)

Called when a worker finishes (success or failure):

def _on_worker_done(self, ticket_id: str, future: Future) -> None:
    # Runs on the executor thread that completed the future
    dag = self.dag_engine.dag
    try:
        result = future.result()
    except Exception as e:
        dag.nodes[ticket_id].status = "blocked"
        dag.nodes[ticket_id].error = str(e)
        self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e))
        # Re-evaluate downstream blocking: dependents of a failed ticket can
        # never become ready, so flag them BLOCKED (never mark them done)
        for downstream in dag.reverse_edges.get(ticket_id, ()):
            if dag.nodes[downstream].status == "pending" and dag.is_blocked(downstream):
                dag.nodes[downstream].status = "blocked"
        return
    dag.nodes[ticket_id].result = result
    dag.mark_done(ticket_id)
    self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result)

_emit_event(type, *args)

Pushes an event to the controller's event_queue for the GUI to consume:

def _emit_event(self, event_type: str, *args) -> None:
    self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time()))

The GUI polls controller.event_queue.get_all() once per frame and dispatches to render functions.
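`get_all()` is not part of the standard `queue.Queue` API, so `controller.event_queue` is presumably a thin wrapper. A minimal sketch of such a wrapper, assuming a FIFO backed by `queue.Queue` and drained without blocking so a frame never stalls; `EventQueue` and the event type string are hypothetical.

```python
import queue
import time
from dataclasses import dataclass, field


@dataclass
class Event:
    type: str
    args: tuple = ()
    timestamp: float = field(default_factory=time.time)


class EventQueue:
    """FIFO shared by producer threads and the GUI thread."""

    def __init__(self) -> None:
        self._q = queue.Queue()  # queue.Queue does its own locking

    def put(self, event: Event) -> None:
        self._q.put(event)

    def get_all(self) -> list:
        """Drain every queued event without blocking; returns [] when empty."""
        events = []
        while True:
            try:
                events.append(self._q.get_nowait())
            except queue.Empty:
                return events


eq = EventQueue()
eq.put(Event(type="mma_ticket_completed", args=("2.1",)))  # hypothetical event type
events = eq.get_all()  # what the GUI would call once per frame
```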


The WorkerPool

A thin wrapper around ThreadPoolExecutor with concurrency limiting:

import threading
from concurrent.futures import Future, ThreadPoolExecutor

class WorkerPool:
    def __init__(self, max_concurrency: int = 4):
        self.max_concurrency = max_concurrency
        self.semaphore = threading.Semaphore(max_concurrency)
        self.executor = ThreadPoolExecutor(max_workers=max_concurrency)
        self.active_workers: set[str] = set()  # ticket_ids submitted and not yet finished
        self._lock = threading.Lock()          # guards active_workers across threads

    def submit(self, role: str, prompt: str, context: dict) -> Future:
        """Submit a worker task. Returns a Future for the result."""
        ticket_id = context["ticket_id"]
        with self._lock:
            # Count the slot at submit time, so has_capacity() is accurate even
            # before an executor thread actually starts the job
            self.active_workers.add(ticket_id)

        def _wrapped():
            with self.semaphore:
                try:
                    return run_mma_worker(role, prompt, context)
                finally:
                    with self._lock:
                        self.active_workers.discard(ticket_id)
        return self.executor.submit(_wrapped)

    def has_capacity(self) -> bool:
        with self._lock:
            return len(self.active_workers) < self.max_concurrency

    def stop(self, wait: bool = True) -> None:
        self.executor.shutdown(wait=wait)
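The concurrency limit (one of the properties listed for tests/test_worker_pool.py) can be checked in isolation by counting how many jobs are inside the semaphore at once. In this sketch the executor is deliberately oversized so the semaphore alone enforces the cap; `max_observed_concurrency` is a hypothetical helper, and the 20 ms sleep stands in for a worker subprocess.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def max_observed_concurrency(max_concurrency: int, n_tasks: int) -> int:
    """Run n_tasks short jobs through a semaphore-gated pool; return peak parallelism."""
    sem = threading.Semaphore(max_concurrency)
    lock = threading.Lock()
    running = peak = 0

    def job() -> None:
        nonlocal running, peak
        with sem:
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.02)  # stand-in for a worker subprocess
            with lock:
                running -= 1

    # Oversized on purpose: only the semaphore limits parallelism here
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        for _ in range(n_tasks):
            pool.submit(job)
    return peak  # the with-block has waited for every job


peak = max_observed_concurrency(2, 8)
```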

run_mma_worker

Invokes mma_exec.py as a subprocess (not in-process) to enforce Context Amnesia: the sub-agent inherits no in-memory state from the parent and receives only the JSON payload piped to its stdin:

import json
import subprocess
import sys

def run_mma_worker(role: str, prompt: str, context: dict) -> dict:
    """Spawn a fresh sub-agent (tier3-worker or tier4-qa) for the ticket."""
    cmd = [
        sys.executable, "scripts/mma_exec.py",
        "--role", role,
        "--ticket-id", context["ticket_id"],
    ]
    # Pipe the prompt and context slice to the child as JSON on stdin
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    try:
        stdout, stderr = proc.communicate(payload, timeout=600)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout: kill it and reap it
        proc.kill()
        proc.communicate()
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8', errors='replace')}")
    return json.loads(stdout.decode("utf-8"))

This is the Token Firewall in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice.
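The stdin/stdout JSON hand-off can be exercised without the real script. The sketch below swaps scripts/mma_exec.py for a tiny inline child program (hypothetical; the real script's output schema is not documented here) and uses `subprocess.run`, which kills the child itself before re-raising `TimeoutExpired`.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for scripts/mma_exec.py: reads the JSON payload from
# stdin and writes a JSON result to stdout.
CHILD = (
    "import json, sys\n"
    "payload = json.load(sys.stdin)\n"
    "json.dump({'ticket_id': payload['context']['ticket_id'], 'ok': True}, sys.stdout)\n"
)


def run_child(prompt: str, context: dict, timeout: float = 30) -> dict:
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    # A fresh interpreter: the child sees only the payload, none of our memory
    proc = subprocess.run(
        [sys.executable, "-c", CHILD],
        input=payload, capture_output=True, timeout=timeout,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.decode("utf-8", errors="replace"))
    return json.loads(proc.stdout)


result = run_child("Implement task 2.1", {"ticket_id": "2.1"})
```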


Track Loading

Tracks live in the project's conductor/tracks/ directory:

conductor/tracks/
├── 2026-05-20_initial_setup/
│   ├── plan.md             # Ticket list with dependencies
│   ├── decisions.md        # Architectural notes
│   └── tasks/
│       ├── 001_init_repo.md
│       ├── 002_install_deps.md
│       └── ...
└── 2026-06-02_command_palette/
    ├── plan.md
    └── ...

plan.md Format

# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]

The parser extracts:

  • Ticket ID (1.1, 1.2, ...)
  • Title (the text after the colon)
  • Status ([ ] = pending, [~] = in progress, [x] = done, [!] = blocked)
  • Dependencies ([depends: ...])
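A minimal regex-based sketch of such a parser, assuming task lines look exactly like the plan.md example above. `parse_plan_text` is a hypothetical name (the document's parser is `parse_plan_md`), and mapping `[~]` to the `running` status is an assumption.

```python
import re

TASK_RE = re.compile(
    r"^- \[(?P<mark>[ x~!])\] Task (?P<id>\d+(?:\.\d+)*): "
    r"(?P<title>.*?)(?:\s*\[depends: (?P<deps>[^\]]*)\])?\s*$"
)
STATUS = {" ": "pending", "~": "running", "x": "done", "!": "blocked"}


def parse_plan_text(text: str) -> list[dict]:
    tickets = []
    for line in text.splitlines():
        m = TASK_RE.match(line.strip())
        if not m:
            continue  # phase headings, blank lines, prose
        deps = [d.strip() for d in (m["deps"] or "").split(",") if d.strip()]
        tickets.append({
            "id": m["id"],
            "title": m["title"],
            "status": STATUS[m["mark"]],
            "depends_on": deps,
        })
    return tickets


PLAN = """\
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
"""
tickets = parse_plan_text(PLAN)
```

The lazy title group stops just before an optional `[depends: ...]` suffix, so titles come out without the dependency annotation.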

Thread Safety

The conductor uses:

  • threading.Lock for _emit_event (queue.put is atomic but logging needs guarding)
  • threading.Event for _stop_event (clean shutdown signaling)
  • ThreadPoolExecutor for worker isolation
  • subprocess.Popen for hard isolation (Tier 3/4 sub-agents)

threading.local() Context

The ai_client module uses threading.local() to track the source tier of each request, so comms logs can be tagged with the originating tier. Because the value is stored per thread, the conductor sets the tier name on the thread that calls ai_client.send(), just before the call.

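# In ai_client.py
import threading

_local = threading.local()

def send(prompt: str, **kwargs):  # remaining parameters elided
    source = getattr(_local, 'source', 'main')  # "main" if this thread never set a tier
    comms_log(f"[{source}] {prompt[:50]}...")

# In multi_agent_conductor.py (on the thread that will call send())
from src import ai_client
ai_client._local.source = "tier3-worker"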
ai_client.send(prompt)
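Because threading.local() values are per thread, a tier set on one worker thread is invisible to other threads and to the main thread. A self-contained demonstration (the function names are illustrative only):

```python
import threading

_local = threading.local()


def current_source() -> str:
    # Same lookup as send(): fall back to "main" when unset on this thread
    return getattr(_local, "source", "main")


def worker(results: dict, name: str) -> None:
    _local.source = name  # visible only to this thread
    results[name] = current_source()


results: dict = {}
threads = [threading.Thread(target=worker, args=(results, n)) for n in ("tier3-worker", "tier4-qa")]
for t in threads:
    t.start()
for t in threads:
    t.join()
main_source = current_source()  # the main thread never set a value
```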

Stop & Cleanup

def stop(self) -> None:
    """Stop the conductor. Workers continue to completion or are cancelled."""
    self._stop_event.set()
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        self.worker_pool.stop(wait=False)

The conductor is designed for graceful shutdown: in-flight workers complete, no new ones spawn.


Observability

Logging

All conductor activity is logged to logs/mma_delegation.log in JSON Lines format (one JSON object per line; the record below is pretty-printed for readability):

{
    "timestamp": "2026-06-02T12:34:56.789Z",
    "event": "ticket_dispatched",
    "track_id": "2026-06-02_command_palette",
    "ticket_id": "2.1",
    "role": "tier3-worker",
    "persona": "code-implementer"
}

Per-worker logs are written to logs/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log.
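A JSON Lines writer needs one `json.dumps` call per record plus a newline. The sketch below writes to any text stream (an open log file in practice, a `StringIO` here) and takes a lock so concurrent threads cannot interleave partial lines; `log_event` and the `ticket_completed` event name are hypothetical.

```python
import io
import json
import threading
from datetime import datetime, timezone

_log_lock = threading.Lock()


def log_event(stream, event: str, **fields) -> None:
    """Append one record as a single JSON line with a UTC ISO-8601 timestamp."""
    ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    record = {"timestamp": ts.replace("+00:00", "Z"), "event": event, **fields}
    line = json.dumps(record) + "\n"
    with _log_lock:  # one write per record, never interleaved across threads
        stream.write(line)


buf = io.StringIO()
log_event(buf, "ticket_dispatched", track_id="2026-06-02_command_palette",
          ticket_id="2.1", role="tier3-worker", persona="code-implementer")
log_event(buf, "ticket_completed", ticket_id="2.1")
records = [json.loads(line) for line in buf.getvalue().splitlines()]
```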

GUI Dashboard

The MMA Observability Dashboard (in gui_2.py) reads from the conductor's state:

  • Track list with progress bars
  • Active ticket's worker output stream
  • DAG visualization (via imgui-node-editor)
  • Per-tier strategy streams

Beads Mode Integration

When the project is in Beads mode (.beads/ directory exists), the conductor delegates to the Beads CLI instead of parsing plan.md:

def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)

The downstream conductor logic is the same — it operates on a Track object regardless of source.

See docs/guide_beads.md for Beads details.


Testing

Unit Tests

  • tests/test_dag_engine.py — TrackDAG cycle detection, ready_tickets, blocking cascade
  • tests/test_execution_engine.py — mode transitions, approval flow
  • tests/test_worker_pool.py — concurrency limit, has_capacity, stop
  • tests/test_mma_conductor.py — track loading, dispatch flow, error handling

Integration Tests (live_gui)

tests/test_mma_observability_dashboard.py — drives the dashboard via Hook API.

Mocking

Tests use unittest.mock.patch to mock subprocess.Popen and ai_client.send for hermetic tests.
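A sketch of what such a hermetic test can look like. `run_worker` is a trimmed, hypothetical stand-in for run_mma_worker; patching `subprocess.Popen` means no child process is ever started.

```python
import json
import subprocess
import sys
from unittest import mock


def run_worker(prompt: str) -> dict:
    """Trimmed stand-in for run_mma_worker (hypothetical, for the demo)."""
    proc = subprocess.Popen(
        [sys.executable, "scripts/mma_exec.py"],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    stdout, stderr = proc.communicate(json.dumps({"prompt": prompt}).encode(), timeout=600)
    if proc.returncode != 0:
        raise RuntimeError(stderr.decode())
    return json.loads(stdout)


def test_run_worker_parses_stdout() -> None:
    with mock.patch("subprocess.Popen") as popen:
        proc = popen.return_value
        proc.communicate.return_value = (b'{"status": "ok"}', b"")
        proc.returncode = 0
        assert run_worker("hello") == {"status": "ok"}
        # The prompt reached the (fake) child via stdin as JSON
        sent = proc.communicate.call_args.args[0]
        assert json.loads(sent) == {"prompt": "hello"}


test_run_worker_parses_stdout()
```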


See Also

  • guide_architecture.md — Threading model
  • guide_mma.md — MMA concepts (4-Tier hierarchy, Token Firewall)
  • guide_app_controller.md — How the conductor is owned by the controller
  • guide_models.md — Ticket and Track data structures
  • scripts/mma_exec.py — The sub-agent entry point
  • scripts/mma.ps1 — PowerShell wrapper
  • conductor/workflow.md — Track execution protocol