manual_slop/docs/guide_multi_agent_conductor.md
src/multi_agent_conductor.py & src/dag_engine.py — MMA Engine

Top | Architecture | Testing | MMA (concepts)


Overview

The MMA (Multi-Model Architecture) engine orchestrates parallel AI worker execution for implementing multi-ticket tracks. Two files:

  • src/multi_agent_conductor.py (~28KB) — the high-level orchestrator, worker pool, ticket dispatch
  • src/dag_engine.py (~10KB) — the DAG resolution, cycle detection, topological sort

Together they implement a non-blocking execution engine with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).


Architecture

┌─────────────────────────────────────────────────┐
│  User: "Implement Track X"                        │
└─────────────────┬───────────────────────────────┘
                  │ controller.dispatch_mma_track(track_id)
                  ▼
┌─────────────────────────────────────────────────┐
│  ConductorEngine                                  │
│  - Loads track from conductor/ directory          │
│  - Builds TrackDAG from ticket dependencies       │
│  - Detects cycles                                │
│  - Starts WorkerPool                              │
└─────────────────┬───────────────────────────────┘
                  │ enqueues ready tickets
                  ▼
┌─────────────────────────────────────────────────┐
│  WorkerPool                                       │
│  - Configurable concurrency (default 4)          │
│  - Threads pull ready tickets, spawn workers     │
│  - Workers call mma_exec.py with sub-prompt       │
└─────────────────┬───────────────────────────────┘
                  │ per ticket
                  ▼
┌─────────────────────────────────────────────────┐
│  mma_exec.py (tier3-worker / tier4-qa)            │
│  - Stateless Tier 3 or Tier 4 sub-agent           │
│  - TDD cycle: red → green → refactor             │
│  - Commits per task                               │
└─────────────────┬───────────────────────────────┘
                  │ completes
                  ▼
┌─────────────────────────────────────────────────┐
│  Conductor:                                       │
│  - Records ticket result                          │
│  - Updates DAG (downstream tickets unblocked)     │
│  - Emits events to controller.event_queue         │
│  - Writes logs to logs/mma_delegation.log         │
└─────────────────────────────────────────────────┘

The TrackDAG (in src/dag_engine.py)

The actual TrackDAG is a flat list of Ticket references with an O(1) id→ticket map, not a graph with separate node/edge dicts. The graph structure is implicit in each Ticket.depends_on: list[str].

Data Structure

class TrackDAG:
    """Directed acyclic graph of tickets in a track."""
    def __init__(self, tickets: list[Ticket]):
        self.tickets    = tickets                              # list[Ticket] (the source of truth)
        self.ticket_map = {t.id: t for t in tickets}          # dict[str, Ticket] (O(1) lookup)

The Ticket dataclass itself is defined in src/models.py (see guide_models.md). It carries the dependencies and status directly — there is no separate TicketNode wrapper, no edges dict, no reverse_edges dict. Adjacency lists are computed on demand inside cascade_blocks() and topological_sort().
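
As a rough illustration of "adjacency computed on demand", the sketch below derives a dependents map from depends_on and BFS-propagates the blocked status. SketchTicket and cascade_blocks_sketch are hypothetical stand-ins written for this guide, not the code in src/dag_engine.py; only the status strings and the depends_on field come from the text above.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SketchTicket:
    """Hypothetical stand-in for the Ticket dataclass in src/models.py."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


def cascade_blocks_sketch(tickets: list[SketchTicket]) -> None:
    """Mark every 'todo' ticket that transitively depends on a blocked ticket."""
    # Reverse adjacency (dependency -> dependents), rebuilt on each call.
    dependents: dict[str, list[SketchTicket]] = {t.id: [] for t in tickets}
    for t in tickets:
        for dep_id in t.depends_on:
            if dep_id in dependents:
                dependents[dep_id].append(t)

    # BFS outward from the tickets that are already blocked.
    queue = deque(t.id for t in tickets if t.status == "blocked")
    while queue:
        blocked_id = queue.popleft()
        for child in dependents[blocked_id]:
            if child.status == "todo":
                child.status = "blocked"
                queue.append(child.id)


tickets = [
    SketchTicket("a", status="blocked"),
    SketchTicket("b", depends_on=["a"]),
    SketchTicket("c", depends_on=["b"]),
    SketchTicket("d"),
]
cascade_blocks_sketch(tickets)
statuses = {t.id: t.status for t in tickets}
```

Here b and c end up blocked because they sit downstream of a, while the independent ticket d stays todo.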

Public Methods (actual signatures from src/dag_engine.py:41-163)

| Method | Returns | Purpose |
|---|---|---|
| cascade_blocks() | None | BFS-propagate blocked status from currently-blocked tickets to transitive todo dependents (mutates tickets in place) |
| is_ticket_ready(t) | bool | True if all t.depends_on exist in ticket_map AND have status == 'completed' |
| get_ready_tasks() | list[Ticket] | All todo tickets whose dependencies are all completed (and whose status hasn't been blocked by cascade_blocks()) |
| has_cycle() | bool | Iterative DFS cycle detection (returns True/False, NOT a list of cycles) |
| topological_sort() | list[str] (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises ValueError("Dependency cycle detected") if len(result) < len(self.tickets) after the BFS drain |
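
The readiness rule in the table can be sketched as follows. This is a minimal illustration under assumptions: SketchTicket, is_ready, and ready_tasks are hypothetical names, and the real methods live on TrackDAG.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTicket:
    """Hypothetical stand-in for the Ticket dataclass in src/models.py."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


def is_ready(ticket: SketchTicket, ticket_map: dict[str, SketchTicket]) -> bool:
    """True only if every dependency exists in the map and is completed."""
    return all(
        dep_id in ticket_map and ticket_map[dep_id].status == "completed"
        for dep_id in ticket.depends_on
    )


def ready_tasks(tickets: list[SketchTicket]) -> list[SketchTicket]:
    """All 'todo' tickets whose dependencies are satisfied."""
    ticket_map = {t.id: t for t in tickets}
    return [t for t in tickets if t.status == "todo" and is_ready(t, ticket_map)]


tickets = [
    SketchTicket("1.1", status="completed"),
    SketchTicket("1.2", depends_on=["1.1"]),      # ready: 1.1 is completed
    SketchTicket("1.3", depends_on=["1.2"]),      # waits for 1.2
    SketchTicket("1.4", depends_on=["missing"]),  # never ready: unknown dependency
]
ready_ids = [t.id for t in ready_tasks(tickets)]
```

Note that a dependency on an unknown ticket ID makes a ticket permanently unready under this rule, since the ID must exist in ticket_map.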

has_cycle() — Iterative DFS

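The method uses an iterative DFS with an explicit stack, so long dependency chains cannot exhaust Python's recursion limit. The path set is the iterative equivalent of the recursion stack: each node is pushed a second time with is_backtracking=True, and popping that marker removes the node from path once all of its dependencies have been explored. Reaching a node that is still in path means a back edge, hence a cycle: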

def has_cycle(self) -> bool:
    with get_monitor().scope("dag_has_cycle"):
        visited = set()
        for start_ticket in self.tickets:
            if start_ticket.id in visited:
                continue
            stack = [(start_ticket.id, False)]  # (id, is_backtracking)
            path  = set()
            while stack:
                node_id, is_backtracking = stack.pop()
                if is_backtracking:
                    path.remove(node_id)
                    continue
                if node_id in path:    return True
                if node_id in visited: continue
                visited.add(node_id)
                path.add(node_id)
                stack.append((node_id, True))
                ticket = self.ticket_map.get(node_id)
                if ticket:
                    for neighbor_id in ticket.depends_on:
                        stack.append((neighbor_id, False))
        return False
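
To experiment with the two-phase stack outside the project, the same logic can be adapted to a plain dict of dependencies. This standalone function is a sketch for illustration; it drops the get_monitor() scope and the Ticket objects, and the dict-based signature is an assumption of this example.

```python
def has_cycle(depends_on: dict[str, list[str]]) -> bool:
    """Iterative DFS cycle check over a {ticket_id: [dependency_ids]} mapping."""
    visited: set[str] = set()
    for start in depends_on:
        if start in visited:
            continue
        stack = [(start, False)]  # (id, is_backtracking)
        path: set[str] = set()    # nodes on the current DFS path
        while stack:
            node, is_backtracking = stack.pop()
            if is_backtracking:
                path.remove(node)  # all dependencies explored: leave the path
                continue
            if node in path:
                return True        # back edge to a node still on the path
            if node in visited:
                continue
            visited.add(node)
            path.add(node)
            stack.append((node, True))  # marker popped after the dependencies
            for dep in depends_on.get(node, []):
                stack.append((dep, False))
    return False


diamond = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
loop = {"a": ["b"], "b": ["c"], "c": ["a"]}
self_loop = {"a": ["a"]}
```

The diamond is the interesting case: d is reached twice, but the second visit happens after d has left path, so it is skipped as already visited rather than reported as a cycle.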

topological_sort() — Kahn's Algorithm

BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists.

def topological_sort(self) -> list[str]:
    with get_monitor().scope("dag_topological_sort"):
        in_degree  = {t.id: len(t.depends_on) for t in self.tickets}
        dependents = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in dependents:
                    dependents[dep_id].append(t.id)

        queue  = [t.id for t in self.tickets if in_degree[t.id] == 0]
        result = []
        idx    = 0
        while idx < len(queue):
            u    = queue[idx]
            idx += 1
            result.append(u)
            for v_id in dependents.get(u, []):
                in_degree[v_id] -= 1
                if in_degree[v_id] == 0:
                    queue.append(v_id)

        if len(result) < len(self.tickets):
            raise ValueError("Dependency cycle detected")
        return result
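
A standalone version over a plain dict makes the behaviour easy to try. The sketch below mirrors the listing but is not the project code; the dict-based signature and the raises_value_error helper are assumptions of this example. One consequence of the listing is worth noting: in_degree counts every entry in depends_on, while dependents only holds known ticket IDs, so a dependency on an unknown ID is never decremented and also surfaces as "Dependency cycle detected".

```python
def topological_sort(depends_on: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over a {ticket_id: [dependency_ids]} mapping."""
    in_degree = {tid: len(deps) for tid, deps in depends_on.items()}
    dependents: dict[str, list[str]] = {tid: [] for tid in depends_on}
    for tid, deps in depends_on.items():
        for dep in deps:
            if dep in dependents:
                dependents[dep].append(tid)

    queue = [tid for tid in depends_on if in_degree[tid] == 0]
    result: list[str] = []
    idx = 0
    while idx < len(queue):
        u = queue[idx]
        idx += 1
        result.append(u)
        for v in dependents[u]:
            in_degree[v] -= 1
            if in_degree[v] == 0:
                queue.append(v)

    if len(result) < len(depends_on):
        raise ValueError("Dependency cycle detected")
    return result


def raises_value_error(graph: dict[str, list[str]]) -> bool:
    """Helper for the demo: report whether sorting the graph raises ValueError."""
    try:
        topological_sort(graph)
    except ValueError:
        return True
    return False


order = topological_sort({"2.2": ["2.1"], "2.1": ["1.3"], "1.3": []})
```

With this input, order lists 1.3 first, then 2.1, then 2.2, regardless of the order in which the tickets were declared.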

get_executable_tickets(track) (free function, src/dag_engine.py:165-173)

def get_executable_tickets(track: "Track") -> list[Ticket]:
    """Convenience: returns the ready-to-execute tickets of a Track.
    Free function (instead of Track.get_executable_tickets) so that
    src/models.py does not need to import TrackDAG at module level,
    breaking the models<->dag_engine circular dependency.
    """
    return TrackDAG(track.tickets).get_ready_tasks()

Thread Safety

TrackDAG is NOT thread-safe. Callers must synchronize access if used from multiple threads (per the module docstring at src/dag_engine.py:20-22). The ConductorEngine is currently the only caller; the WorkerPool reads from self.engine under the ConductorEngine's own lock discipline.


The ExecutionEngine (in src/dag_engine.py:176-228)

ExecutionEngine is a state machine facade around TrackDAG, not an enum-driven auto/step dispatcher. There is no ExecutionMode enum; the engine takes a single auto_queue: bool flag.

class ExecutionEngine:
    """A state machine that governs the progression of tasks within a TrackDAG.
    Handles automatic queueing and manual task approval."""
    def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
        self.dag        = dag
        self.auto_queue = auto_queue

Methods (actual signatures):

| Method | Returns | Purpose |
|---|---|---|
| tick() | list[Ticket] | Calls dag.cascade_blocks() then dag.get_ready_tasks(). Returns the ready list. Does NOT auto-promote tickets to in_progress. |
| approve_task(task_id) | None | Manual transition: ticket.status "todo" → "in_progress" IFF dependencies are met (is_ticket_ready returns True). |
| update_task_status(task_id, status) | None | Force-update a ticket's status (e.g. to "completed" or "blocked"). |

Step Mode / Auto-Queue is implemented at the caller layer (ConductorEngine.run in multi_agent_conductor.py), not inside ExecutionEngine. The auto_queue parameter is consulted there, not in tick(). The ConductorEngine pushes tickets to in_progress based on auto_queue and the per-ticket step_mode flag.
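
The split between tick() and approve_task() can be sketched with small stand-ins. SketchTicket, SketchDAG, and SketchEngine are hypothetical classes for illustration only; the sketch omits cascade_blocks() and assumes approve_task silently ignores tickets that are not ready.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTicket:
    """Hypothetical stand-in for the Ticket dataclass in src/models.py."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


class SketchDAG:
    def __init__(self, tickets: list[SketchTicket]) -> None:
        self.tickets = tickets
        self.ticket_map = {t.id: t for t in tickets}

    def is_ticket_ready(self, t: SketchTicket) -> bool:
        return all(
            d in self.ticket_map and self.ticket_map[d].status == "completed"
            for d in t.depends_on
        )

    def get_ready_tasks(self) -> list[SketchTicket]:
        return [t for t in self.tickets if t.status == "todo" and self.is_ticket_ready(t)]


class SketchEngine:
    def __init__(self, dag: SketchDAG, auto_queue: bool = False) -> None:
        self.dag = dag
        self.auto_queue = auto_queue  # consulted by the caller, not by tick()

    def tick(self) -> list[SketchTicket]:
        # Reports ready tickets only; it does not change any status.
        return self.dag.get_ready_tasks()

    def approve_task(self, task_id: str) -> None:
        t = self.dag.ticket_map.get(task_id)
        if t and t.status == "todo" and self.dag.is_ticket_ready(t):
            t.status = "in_progress"

    def update_task_status(self, task_id: str, status: str) -> None:
        t = self.dag.ticket_map.get(task_id)
        if t:
            t.status = status


a = SketchTicket("a")
b = SketchTicket("b", depends_on=["a"])
engine = SketchEngine(SketchDAG([a, b]))

first = [t.id for t in engine.tick()]
status_a_after_tick = a.status       # tick() left it untouched
engine.approve_task("b")             # ignored: dependency "a" is not completed
status_b_after_early_approve = b.status
engine.approve_task("a")
status_a_after_approve = a.status
engine.update_task_status("a", "completed")
second = [t.id for t in engine.tick()]
```

The caller decides when to call approve_task: immediately after tick() in an auto-queue style flow, or only after user confirmation in a step-mode style flow.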


The ConductorEngine (in src/multi_agent_conductor.py:116+)

The actual class is named ConductorEngine, not MultiAgentConductor. It owns the DAG, the engine, and a WorkerPool; pushes state to the GUI; and runs the main async dispatch loop.

__init__(track, event_queue=None, auto_queue=False, max_workers=4)

class ConductorEngine:
    def __init__(
        self,
        track: Track,
        event_queue: Optional[events.AsyncEventQueue] = None,
        auto_queue: bool = False,
        max_workers: int = 4,
    ) -> None:
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview",  "tool_preset": None, "persona": None},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite",   "tool_preset": None, "persona": None},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite",   "tool_preset": None, "persona": None},
        }
        self.dag    = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        self.pool   = WorkerPool(max_workers=max_workers)
        self._workers_lock   = threading.Lock()
        self._active_workers: dict[str, threading.Thread] = {}
        self._abort_events:   dict[str, threading.Event]    = {}
        self._pause_event:    threading.Event               = threading.Event()
        self._tier_usage_lock = threading.Lock()
        self._dirty:          bool = True

max_workers is not read from config.toml by the engine — it's a constructor parameter. The 3 call sites in AppController (src/app_controller.py:4132-4133, 4145-4146, 4223-4224) all read [mma].max_workers from TOML and pass it in. Default is 4.

Key methods

| Method | Returns | Purpose |
|---|---|---|
| update_usage(tier, input_tokens, output_tokens) | None | Cumulative token accounting per tier (under _tier_usage_lock) |
| pause() / resume() | None | Set/clear _pause_event |
| approve_task(task_id) | None | Delegates to engine.approve_task; sets _dirty = True |
| update_task_status(task_id, status) | None | Delegates to engine.update_task_status; sets _dirty = True |
| kill_worker(ticket_id) | None | Sets the per-ticket abort event; joins the thread with 1.0s timeout |
| _push_state(status, active_tier) | None | Builds a payload dict and await event_queue.put("mma_state_update", payload) |
| parse_json_tickets(json_str) | list[dict] | Parses Tier 2 LLM output into a list of ticket dicts (the ingestion path) |
| run() | (async coroutine) | The main async dispatch loop; see below |

run() — The Main Async Dispatch Loop

The actual run() is an async coroutine that runs the dispatch loop, NOT a _dispatch_loop background thread. It uses asyncio and loop.run_in_executor to bridge to the blocking run_worker_lifecycle call.
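
The bridge from an async loop to a blocking call can be sketched as follows. This is a generic asyncio pattern, not the body of ConductorEngine.run: blocking_worker is a hypothetical stand-in for run_worker_lifecycle, and passing None (the loop's default executor) is an assumption, since the executor used by the real code is not shown here.

```python
import asyncio
import time


def blocking_worker(ticket_id: str) -> str:
    """Hypothetical stand-in for the blocking run_worker_lifecycle call."""
    time.sleep(0.05)  # simulate a slow, synchronous round-trip
    return f"{ticket_id}:completed"


async def dispatch(ticket_ids: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # run_in_executor returns awaitable futures, so the event loop stays
    # responsive while the blocking calls run on executor threads.
    futures = [
        loop.run_in_executor(None, blocking_worker, ticket_id)
        for ticket_id in ticket_ids
    ]
    return await asyncio.gather(*futures)


results = asyncio.run(dispatch(["1.1", "1.2"]))
```

asyncio.gather returns results in the order the awaitables were passed, so results lines up with the input ticket IDs even if the workers finish out of order.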

_push_state() payload shape

async def _push_state(self, status: str = "running", active_tier: str = None) -> None:
    if not self.event_queue:
        return
    payload = {
        "status": status,
        "active_tier": active_tier,
        "tier_usage": self.tier_usage,
        "track": {"id": self.track.id, "title": self.track.description},
        "tickets": [asdict(t) for t in self.track.tickets],
    }
    await self.event_queue.put("mma_state_update", payload)
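
For a feel of what a consumer receives, the sketch below pushes a payload of the same shape through a tiny queue with a two-argument put(). SketchEventQueue and SketchTicket are hypothetical; the real events.AsyncEventQueue may differ in every detail other than the put(name, payload) call shown above.

```python
import asyncio
from dataclasses import asdict, dataclass, field


@dataclass
class SketchTicket:
    """Hypothetical stand-in for the Ticket dataclass in src/models.py."""
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


class SketchEventQueue:
    """Hypothetical queue exposing the put(event_name, payload) call."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, event_name: str, payload: dict) -> None:
        await self._queue.put((event_name, payload))

    async def get(self) -> tuple[str, dict]:
        return await self._queue.get()


async def demo() -> tuple[str, dict]:
    queue = SketchEventQueue()
    payload = {
        "status": "running",
        "active_tier": "Tier 3",
        # asdict() turns each dataclass into a plain dict the GUI can read.
        "tickets": [asdict(t) for t in [SketchTicket("1.1")]],
    }
    await queue.put("mma_state_update", payload)
    return await queue.get()


event_name, received = asyncio.run(demo())
```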

The WorkerPool (in src/multi_agent_conductor.py:50-114)

A dict[str, Thread] + threading.Lock + threading.Semaphore, NOT a ThreadPoolExecutor wrapper.

class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active:    dict[str, threading.Thread] = {}
        self._lock       = threading.Lock()
        self._semaphore  = threading.Semaphore(max_workers)

Methods (actual signatures):

| Method | Returns | Purpose |
|---|---|---|
| spawn(ticket_id, target, args=()) | None | Spawns a daemon thread running target(*args) if the pool has capacity. No-op if full. Tracks the thread in self._active[ticket_id]. |
| join_all(timeout=None) | None | Joins every active thread. |
| get_active_count() | int | Returns len(self._active). |
| is_full() | bool | True if get_active_count() >= self.max_workers. |

Concurrency control is a Semaphore, not a thread-pool executor. The semaphore is acquired in spawn and released in the spawned thread's _run wrapper. self._active is the source of truth for get_active_count/is_full; the semaphore prevents over-spawning.
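
A pool with this shape can be sketched in a few lines. SketchPool is a hypothetical reimplementation for illustration, not the class in src/multi_agent_conductor.py; in particular, the non-blocking acquire is an assumption made to reproduce the "no-op if full" behaviour.

```python
import threading


class SketchPool:
    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id, target, args=()) -> None:
        if not self._semaphore.acquire(blocking=False):
            return  # pool is full: do nothing

        def _run() -> None:
            try:
                target(*args)
            finally:
                # Deregister and free the slot even if target raised.
                with self._lock:
                    self._active.pop(ticket_id, None)
                self._semaphore.release()

        thread = threading.Thread(target=_run, daemon=True)
        with self._lock:
            self._active[ticket_id] = thread
        thread.start()

    def join_all(self, timeout=None) -> None:
        with self._lock:
            threads = list(self._active.values())
        for thread in threads:
            thread.join(timeout)

    def get_active_count(self) -> int:
        with self._lock:
            return len(self._active)

    def is_full(self) -> bool:
        return self.get_active_count() >= self.max_workers


gate = threading.Event()
done: list[str] = []


def work(ticket_id: str) -> None:
    gate.wait()  # hold the slot until the demo releases it
    done.append(ticket_id)


pool = SketchPool(max_workers=2)
for ticket_id in ("a", "b", "c"):
    pool.spawn(ticket_id, work, (ticket_id,))  # "c" is dropped: pool is full
count_while_busy = pool.get_active_count()
full_while_busy = pool.is_full()
gate.set()
pool.join_all()
count_after = pool.get_active_count()
```

Because a rejected spawn is silent, a caller following this design would check is_full() first or retry the ticket on a later tick.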


Sub-Agent Invocation (mma_exec.py)

The ConductorEngine does not spawn mma_exec.py directly. Sub-agent invocation is a synchronous CLI bridge at scripts/mma_exec.py invoked from a Tier 3 worker (see conductor/workflow.md "MMA Bridge" section). Each sub-agent is invoked via:

uv run python scripts/mma_exec.py --role tier3-worker "[PROMPT]"

The --role flag selects between tier1-orchestrator, tier2-tech-lead, tier3-worker, and tier4-qa. Sub-agents receive context via stdin (or as additional CLI args) and exit after one round-trip. The actual prompt construction lives in run_worker_lifecycle at src/multi_agent_conductor.py (the free function referenced by both ConductorEngine.run and the worker spawn flow).

The "Token Firewall" effect — each worker starts with a clean context window — is achieved by the ai_client.reset_session() call at the start of run_worker_lifecycle (see guide_mma.md "Context Amnesia").
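
When scripting the bridge, the command line above can be assembled as an argument list. build_mma_exec_command is a hypothetical helper, not part of the project; the role names and the script path are the ones listed in this section.

```python
ROLES = ("tier1-orchestrator", "tier2-tech-lead", "tier3-worker", "tier4-qa")


def build_mma_exec_command(role: str, prompt: str) -> list[str]:
    """Build the argv list for one sub-agent invocation."""
    if role not in ROLES:
        raise ValueError(f"unknown role: {role}")
    # A list (rather than a shell string) avoids quoting problems in the prompt.
    return ["uv", "run", "python", "scripts/mma_exec.py", "--role", role, prompt]


cmd = build_mma_exec_command("tier3-worker", "Implement Task 2.1")
```

The list can be handed to subprocess.run(cmd, capture_output=True, text=True) to perform the synchronous round-trip.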

Track Loading

Tracks live in the project's conductor/tracks/ directory:

conductor/tracks/
├── 2026-05-20_initial_setup/
│   ├── plan.md             # Ticket list with dependencies
│   ├── decisions.md        # Architectural notes
│   └── tasks/
│       ├── 001_init_repo.md
│       ├── 002_install_deps.md
│       └── ...
└── 2026-06-02_command_palette/
    ├── plan.md
    └── ...

plan.md Format

# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]

The parser extracts:

  • Ticket ID (1.1, 1.2, ...)
  • Title (text after : )
  • Status ([ ] = pending, [~] = in progress, [x] = done, [!] = blocked)
  • Dependencies ([depends: ...])
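
The extraction rules above can be sketched with a single regular expression. This is a hypothetical parser, not the project's: the function name, the output dict keys, and the comma-separated form for multiple dependencies are assumptions of this example.

```python
import re

STATUS_BY_MARK = {" ": "pending", "~": "in progress", "x": "done", "!": "blocked"}

LINE_RE = re.compile(
    r"^- \[(?P<mark>[ x~!])\] Task (?P<id>\d+(?:\.\d+)*): (?P<title>.*?)"
    r"(?:\s*\[depends: (?P<deps>[^\]]*)\])?\s*$"
)


def parse_plan(text: str) -> list[dict]:
    """Extract id, title, status, and dependencies from plan.md task lines."""
    tickets = []
    for line in text.splitlines():
        match = LINE_RE.match(line.strip())
        if not match:
            continue  # phase headings and blank lines are skipped
        deps = [d.strip() for d in (match.group("deps") or "").split(",") if d.strip()]
        tickets.append({
            "id": match.group("id"),
            "title": match.group("title"),
            "status": STATUS_BY_MARK[match.group("mark")],
            "depends_on": deps,
        })
    return tickets


plan = """# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
"""
parsed = parse_plan(plan)
```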

Thread Safety

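The conductor uses:

  • threading.Lock (_workers_lock), declared alongside the _active_workers and _abort_events dicts
  • threading.Lock (_tier_usage_lock) to guard token accounting in update_usage
  • threading.Event per ticket (_abort_events) for kill_worker, plus _pause_event for pause() / resume()
  • WorkerPool (threading.Lock + threading.Semaphore) to cap concurrent workers
  • subprocess.Popen for hard isolation (Tier 3/4 sub-agents)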

threading.local() Context

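The ai_client uses threading.local() to track the source tier of each request, so comms logs can be tagged with the originating tier. The conductor sets the tier name on the thread-local from inside the worker thread, before calling ai_client.send(). Schematically:

# In ai_client.py
import threading

_local = threading.local()  # each thread sees its own attributes

def send(prompt, *args, **kwargs):
    # Falls back to 'main' when the calling thread never set a source
    source = getattr(_local, 'source', 'main')
    comms_log(f"[{source}] {prompt[:50]}...")

# In multi_agent_conductor.py (inside the worker thread)
ai_client._local.source = "tier3-worker"
ai_client.send(prompt)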
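
The isolation that threading.local() provides can be checked with a short standalone script. The names below (current_source, worker, seen) are hypothetical and only mimic the pattern; they are not taken from ai_client.py.

```python
import threading

_local = threading.local()
seen: dict[str, str] = {}


def current_source() -> str:
    """Return this thread's source tag, or 'main' if none was set."""
    return getattr(_local, "source", "main")


def worker(name: str, source: str) -> None:
    _local.source = source  # visible only inside this thread
    seen[name] = current_source()


threads = [
    threading.Thread(target=worker, args=(f"w{i}", tier))
    for i, tier in enumerate(["tier3-worker", "tier4-qa"])
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# The main thread never set a source, so it still sees the fallback.
seen["main"] = current_source()
```

Each worker reads back only the tag it set itself, which is what lets concurrent tiers share one ai_client module without mislabeling each other's log lines.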

Stop & Cleanup

def stop(self) -> None:
    """Stop the conductor. Workers continue to completion or are cancelled."""
    self._stop_event.set()
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        self.worker_pool.stop(wait=False)

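The conductor is designed for graceful shutdown: in-flight workers run to completion and no new ones are spawned. The attributes this listing relies on (_stop_event, _dispatch_thread, worker_pool.stop) do not appear in the ConductorEngine.__init__ or WorkerPool signatures documented above, which expose _pause_event, per-ticket _abort_events, kill_worker(), and join_all() instead.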


Observability

Logging

All conductor activity is logged to logs/mma_delegation.log in JSON Lines format, one JSON object per line. The record below is pretty-printed for readability:

{
    "timestamp": "2026-06-02T12:34:56.789Z",
    "event": "ticket_dispatched",
    "track_id": "2026-06-02_command_palette",
    "ticket_id": "2.1",
    "role": "tier3-worker",
    "persona": "code-implementer"
}

Per-worker logs are written to logs/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log.
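
Producing one such record as a single line could look like this. format_log_line is a hypothetical helper, not the project's logger; the field names are copied from the sample record above, and the now parameter exists only to make the example deterministic.

```python
import json
from datetime import datetime, timezone


def format_log_line(event, track_id, ticket_id, role, persona, now=None) -> str:
    """Serialize one delegation event as a single JSON Lines record."""
    now = now or datetime.now(timezone.utc)
    record = {
        # ISO 8601 in UTC with millisecond precision and a trailing "Z".
        "timestamp": now.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "event": event,
        "track_id": track_id,
        "ticket_id": ticket_id,
        "role": role,
        "persona": persona,
    }
    return json.dumps(record)  # no indent: the record stays on one line


fixed = datetime(2026, 6, 2, 12, 34, 56, 789000, tzinfo=timezone.utc)
line = format_log_line(
    "ticket_dispatched", "2026-06-02_command_palette", "2.1",
    "tier3-worker", "code-implementer", now=fixed,
)
```

Appending line plus a newline to the log file keeps the file parseable one line at a time with json.loads.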

GUI Dashboard

The MMA Observability Dashboard (in gui_2.py) reads from the conductor's state:

  • Track list with progress bars
  • Active ticket's worker output stream
  • DAG visualization (via imgui-node-editor)
  • Per-tier strategy streams

Beads Mode Integration

When the project is in Beads mode (.beads/ directory exists), the conductor delegates to the Beads CLI instead of parsing plan.md:

def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)

The downstream conductor logic is the same — it operates on a Track object regardless of source.

See docs/guide_beads.md for Beads details.


Testing

Unit Tests

  • tests/test_dag_engine.py: TrackDAG cycle detection, ready_tickets, blocking cascade
  • tests/test_execution_engine.py: mode transitions, approval flow
  • tests/test_worker_pool.py: concurrency limit, has_capacity, stop
  • tests/test_mma_conductor.py: track loading, dispatch flow, error handling

Integration Tests (live_gui)

tests/test_mma_observability_dashboard.py — drives the dashboard via Hook API.

Mocking

Tests use unittest.mock.patch to mock subprocess.Popen and ai_client.send for hermetic tests.
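
A hermetic test in this style might look like the sketch below. run_sub_agent is a hypothetical helper defined only for this example; the project's own tests patch whatever functions actually call subprocess.Popen and ai_client.send.

```python
import subprocess
from unittest.mock import patch


def run_sub_agent(cmd: list[str]) -> str:
    """Hypothetical helper: run a sub-agent command and return its stdout."""
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    out, _err = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"sub-agent failed with exit code {proc.returncode}")
    return out


def test_run_sub_agent_is_hermetic() -> None:
    # Patching subprocess.Popen means no real process is ever started.
    with patch("subprocess.Popen") as popen:
        popen.return_value.communicate.return_value = ("ok\n", "")
        popen.return_value.returncode = 0
        assert run_sub_agent(["uv", "run", "python", "scripts/mma_exec.py"]) == "ok\n"
        popen.assert_called_once()
```

The patch target must be the name as looked up by the code under test; here run_sub_agent resolves subprocess.Popen at call time, so patching the attribute on the subprocess module is enough.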


See Also

  • guide_architecture.md: Threading model
  • guide_mma.md: MMA concepts (4-Tier hierarchy, Token Firewall)
  • guide_app_controller.md: How the conductor is owned by the controller
  • guide_models.md: Ticket and Track data structures
  • scripts/mma_exec.py: The sub-agent entry point
  • scripts/mma.ps1: PowerShell wrapper
  • conductor/workflow.md: Track execution protocol