src/multi_agent_conductor.py & src/dag_engine.py — MMA Engine
Overview
The MMA (Multi-Model Architecture) engine orchestrates parallel AI worker execution for implementing multi-ticket tracks. Two files:
- `src/multi_agent_conductor.py` (~28KB): the high-level orchestrator, worker pool, and ticket dispatch
- `src/dag_engine.py` (~10KB): DAG resolution, cycle detection, and topological sort
Together they implement a non-blocking execution engine with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).
Architecture
┌─────────────────────────────────────────────────┐
│ User: "Implement Track X" │
└─────────────────┬───────────────────────────────┘
│ controller.dispatch_mma_track(track_id)
▼
┌─────────────────────────────────────────────────┐
│ MultiAgentConductor │
│ - Loads track from conductor/ directory │
│ - Builds TrackDAG from ticket dependencies │
│ - Detects cycles │
│ - Starts WorkerPool │
└─────────────────┬───────────────────────────────┘
│ enqueues ready tickets
▼
┌─────────────────────────────────────────────────┐
│ WorkerPool │
│ - Configurable concurrency (default 4) │
│ - Threads pull ready tickets, spawn workers │
│ - Workers call mma_exec.py with sub-prompt │
└─────────────────┬───────────────────────────────┘
│ per ticket
▼
┌─────────────────────────────────────────────────┐
│ mma_exec.py (tier3-worker / tier4-qa) │
│ - Stateless Tier 3 or Tier 4 sub-agent │
│ - TDD cycle: red → green → refactor │
│ - Commits per task │
└─────────────────┬───────────────────────────────┘
│ completes
▼
┌─────────────────────────────────────────────────┐
│ Conductor: │
│ - Records ticket result │
│ - Updates DAG (downstream tickets unblocked) │
│ - Emits events to controller.event_queue │
│ - Writes logs to logs/mma_delegation.log │
└─────────────────────────────────────────────────┘
The TrackDAG (in src/dag_engine.py)
The actual TrackDAG is a flat list of Ticket references with an O(1) id→ticket map, not a graph with separate node/edge dicts. The graph structure is implicit in each Ticket.depends_on: list[str].
Data Structure
```python
class TrackDAG:
    """Directed acyclic graph of tickets in a track."""

    def __init__(self, tickets: list[Ticket]):
        self.tickets = tickets  # list[Ticket] (the source of truth)
        self.ticket_map = {t.id: t for t in tickets}  # dict[str, Ticket] (O(1) lookup)
```
The Ticket dataclass itself is defined in src/models.py (see guide_models.md). It carries the dependencies and status directly — there is no separate TicketNode wrapper, no edges dict, no reverse_edges dict. Adjacency lists are computed on demand inside cascade_blocks() and topological_sort().
Public Methods (actual signatures from src/dag_engine.py:41-163)
| Method | Returns | Purpose |
|---|---|---|
| `cascade_blocks()` | `None` | BFS-propagates `blocked` status from currently blocked tickets to their transitive `todo` dependents (mutates tickets in place) |
| `is_ticket_ready(t)` | `bool` | `True` if every ID in `t.depends_on` exists in `ticket_map` and has `status == 'completed'` |
| `get_ready_tasks()` | `list[Ticket]` | All `todo` tickets whose dependencies are all completed (tickets already marked `blocked` by `cascade_blocks()` are excluded) |
| `has_cycle()` | `bool` | Iterative DFS cycle detection (returns `True`/`False`, not a list of cycles) |
| `topological_sort()` | `list[str]` (ticket IDs in dependency-first order) | Kahn's algorithm (BFS + in-degree counter); raises `ValueError("Dependency cycle detected")` if `len(result) < len(self.tickets)` after the BFS drain |
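To make the readiness and blocking rows of the table concrete, here is a minimal, self-contained sketch of their semantics. The `Ticket` dataclass is a hypothetical stand-in that carries only `id`, `status` and `depends_on` (the real one lives in src/models.py and may have more fields). The functions are free-standing approximations of the `TrackDAG` methods, not the actual implementation.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Ticket:
    # Hypothetical stand-in for src/models.py's Ticket (only the fields used here).
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


def is_ticket_ready(ticket_map: dict[str, Ticket], t: Ticket) -> bool:
    # Ready only if every dependency exists and is completed.
    return all(
        dep in ticket_map and ticket_map[dep].status == "completed"
        for dep in t.depends_on
    )


def get_ready_tasks(tickets: list[Ticket]) -> list[Ticket]:
    ticket_map = {t.id: t for t in tickets}
    return [t for t in tickets if t.status == "todo" and is_ticket_ready(ticket_map, t)]


def cascade_blocks(tickets: list[Ticket]) -> None:
    # Build the dependency -> dependents adjacency on demand.
    dependents: dict[str, list[Ticket]] = {t.id: [] for t in tickets}
    for t in tickets:
        for dep in t.depends_on:
            if dep in dependents:
                dependents[dep].append(t)
    # BFS outward from every blocked ticket, blocking transitive "todo" dependents.
    queue = deque(t.id for t in tickets if t.status == "blocked")
    while queue:
        for child in dependents[queue.popleft()]:
            if child.status == "todo":
                child.status = "blocked"
                queue.append(child.id)


tickets = [
    Ticket("1.1", "completed"),
    Ticket("1.2", "blocked"),
    Ticket("2.1", depends_on=["1.1"]),
    Ticket("2.2", depends_on=["1.2"]),
    Ticket("2.3", depends_on=["2.2"]),
]
cascade_blocks(tickets)
print([t.id for t in get_ready_tasks(tickets)])  # ['2.1']
```

The adjacency is rebuilt on every call. This matches the description above of adjacency lists being computed on demand rather than stored on the DAG.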
has_cycle() — Iterative DFS
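Uses an explicit stack instead of recursion. This avoids function-call overhead and Python's recursion limit on long dependency chains. The path set plays the role of the recursion stack: it holds the tickets on the current DFS path, and reaching one of them again means a cycle.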
```python
def has_cycle(self) -> bool:
    with get_monitor().scope("dag_has_cycle"):
        visited = set()
        for start_ticket in self.tickets:
            if start_ticket.id in visited:
                continue
            stack = [(start_ticket.id, False)]  # (id, is_backtracking)
            path = set()
            while stack:
                node_id, is_backtracking = stack.pop()
                if is_backtracking:
                    path.remove(node_id)
                    continue
                if node_id in path:
                    return True
                if node_id in visited:
                    continue
                visited.add(node_id)
                path.add(node_id)
                stack.append((node_id, True))
                ticket = self.ticket_map.get(node_id)
                if ticket:
                    for neighbor_id in ticket.depends_on:
                        stack.append((neighbor_id, False))
        return False
```
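The same iterative DFS can be exercised outside the class. This sketch runs it over a plain `{ticket_id: depends_on}` dict, an assumption that lets the example run without `Ticket` or `get_monitor()`. The diamond case shows why `visited` and `path` are kept separate: a ticket reached twice through different branches is not a cycle.

```python
def has_cycle(deps: dict[str, list[str]]) -> bool:
    """Iterative DFS over a {ticket_id: depends_on} mapping."""
    visited: set[str] = set()
    for start in deps:
        if start in visited:
            continue
        stack = [(start, False)]  # (node_id, is_backtracking)
        path: set[str] = set()    # nodes on the current DFS path
        while stack:
            node, backtracking = stack.pop()
            if backtracking:
                path.remove(node)  # leaving node: pop it off the path
                continue
            if node in path:
                return True        # back-edge: node depends on itself transitively
            if node in visited:
                continue           # already fully explored via another branch
            visited.add(node)
            path.add(node)
            stack.append((node, True))
            for dep in deps.get(node, []):
                stack.append((dep, False))
    return False


diamond = {"d": ["b", "c"], "b": ["a"], "c": ["a"], "a": []}
print(has_cycle(diamond))                               # False
print(has_cycle({"a": ["b"], "b": ["c"], "c": ["a"]}))  # True
```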
topological_sort() — Kahn's Algorithm
BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists.
```python
def topological_sort(self) -> list[str]:
    with get_monitor().scope("dag_topological_sort"):
        in_degree = {t.id: len(t.depends_on) for t in self.tickets}
        dependents = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in dependents:
                    dependents[dep_id].append(t.id)
        queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
        result = []
        idx = 0
        while idx < len(queue):
            u = queue[idx]
            idx += 1
            result.append(u)
            for v_id in dependents.get(u, []):
                in_degree[v_id] -= 1
                if in_degree[v_id] == 0:
                    queue.append(v_id)
        if len(result) < len(self.tickets):
            raise ValueError("Dependency cycle detected")
        return result
```
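Here is a standalone version of the same Kahn's-algorithm pass, again over a plain dict for illustration. It uses `collections.deque` where the method above uses a list plus an index cursor. Both give FIFO order without O(n) pops from the front.

```python
from collections import deque


def topological_sort(deps: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over {ticket_id: depends_on}; dependencies come first."""
    in_degree = {tid: len(d) for tid, d in deps.items()}
    dependents: dict[str, list[str]] = {tid: [] for tid in deps}
    for tid, d in deps.items():
        for dep in d:
            if dep in dependents:
                dependents[dep].append(tid)
    queue = deque(tid for tid, n in in_degree.items() if n == 0)
    result: list[str] = []
    while queue:
        u = queue.popleft()
        result.append(u)
        for v in dependents[u]:
            in_degree[v] -= 1
            if in_degree[v] == 0:
                queue.append(v)
    # Anything never emitted sits on a cycle (or waits on an unknown ID).
    if len(result) < len(deps):
        raise ValueError("Dependency cycle detected")
    return result


plan = {"1.3": [], "2.1": ["1.3"], "2.2": ["2.1"], "2.3": ["2.2"]}
print(topological_sort(plan))  # ['1.3', '2.1', '2.2', '2.3']
```

`in_degree` counts every entry in `depends_on`, but only known IDs get a `dependents` list. As a result, a dependency on an ID that is not in the track never reaches in-degree zero. In this sketch, and as far as the excerpt above shows in `TrackDAG.topological_sort()` too, such a dangling reference raises the same "Dependency cycle detected" error as a genuine cycle.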
get_executable_tickets(track) (free function, src/dag_engine.py:165-173)
```python
def get_executable_tickets(track: "Track") -> list[Ticket]:
    """Convenience: returns the ready-to-execute tickets of a Track.

    Free function (instead of Track.get_executable_tickets) so that
    src/models.py does not need to import TrackDAG at module level,
    breaking the models<->dag_engine circular dependency.
    """
    return TrackDAG(track.tickets).get_ready_tasks()
```
Thread Safety
TrackDAG is NOT thread-safe. Callers must synchronize access if used from multiple threads (per the module docstring at src/dag_engine.py:20-22). The ConductorEngine is currently the only caller; the WorkerPool reads from self.engine under the ConductorEngine's own lock discipline.
The ExecutionEngine
The execution engine handles state machine transitions between Auto-Queue and Step Mode.
Execution Modes
```python
from enum import Enum


class ExecutionMode(Enum):
    AUTO_QUEUE = "auto_queue"  # Autonomous worker spawning
    STEP_MODE = "step_mode"  # Explicit manual approval per transition
```
ExecutionEngine.tick()
The main loop, called by the conductor at a configurable interval (default 100ms):
```python
class ExecutionEngine:
    def __init__(self, dag: TrackDAG, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE):
        self.dag = dag
        self.mode = mode
        self.pending_approval: list[str] = []  # Ticket IDs awaiting approval in Step Mode

    def tick(self) -> list[str]:
        """Returns list of ticket IDs to dispatch in this tick."""
        # TrackDAG exposes get_ready_tasks(), which returns Ticket objects; tick() deals in IDs
        ready = [t.id for t in self.dag.get_ready_tasks()]
        if self.mode == ExecutionMode.AUTO_QUEUE:
            return ready
        # Step Mode: only surface one ticket at a time, requiring explicit approval
        if not self.pending_approval and ready:
            self.pending_approval = ready[:1]
        return self.pending_approval[:1]
```
Programmable Transitions
```python
def set_mode(self, mode: ExecutionMode) -> None:
    self.mode = mode
    if mode == ExecutionMode.AUTO_QUEUE:
        # Auto-Queue re-derives ready work every tick, so staged approvals are dropped
        self.pending_approval.clear()

def approve_next(self) -> str | None:
    """Returns the approved ticket ID, or None if nothing pending."""
    if not self.pending_approval:
        return None
    return self.pending_approval.pop(0)

def reject_next(self) -> str | None:
    """Returns the rejected ticket ID after marking it blocked."""
    if not self.pending_approval:
        return None
    tid = self.pending_approval.pop(0)
    # TrackDAG has no `nodes` dict; tickets are looked up through ticket_map
    self.dag.ticket_map[tid].status = "blocked"
    return tid
```
These are exposed to the GUI as "Step Mode" controls and to the Hook API as set_execution_mode / approve_ticket / reject_ticket.
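The approval flow is easiest to see with the DAG stubbed out. The sketch below is a hypothetical variant that makes the Step Mode gate strict. `MiniEngine` and its `ready` list are stand-ins, not the real classes. Here `tick()` only stages the next ready ticket and dispatches nothing, and a ticket is released for dispatch only when `approve_next()` returns it.

```python
from enum import Enum
from typing import Optional


class ExecutionMode(Enum):
    AUTO_QUEUE = "auto_queue"
    STEP_MODE = "step_mode"


class MiniEngine:
    """Hypothetical stand-in for ExecutionEngine; `ready` replaces the DAG query."""

    def __init__(self, ready: list[str], mode: ExecutionMode) -> None:
        self.ready = list(ready)  # IDs the DAG would report as ready, in order
        self.mode = mode
        self.pending_approval: list[str] = []

    def tick(self) -> list[str]:
        if self.mode is ExecutionMode.AUTO_QUEUE:
            return list(self.ready)  # dispatch everything that is ready
        # Step Mode: stage at most one ticket and dispatch nothing until approved.
        if not self.pending_approval and self.ready:
            self.pending_approval.append(self.ready.pop(0))
        return []

    def approve_next(self) -> Optional[str]:
        # The caller dispatches whatever ID this returns.
        return self.pending_approval.pop(0) if self.pending_approval else None


engine = MiniEngine(["2.1", "2.2"], ExecutionMode.STEP_MODE)
engine.tick()                   # stages "2.1"
print(engine.approve_next())    # 2.1
engine.tick()                   # stages "2.2"
print(engine.pending_approval)  # ['2.2']
```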
The MultiAgentConductor (in src/multi_agent_conductor.py)
__init__(self, controller: AppController)
```python
import threading


class MultiAgentConductor:
    def __init__(self, controller: AppController):
        self.controller = controller
        self.dag_engine: ExecutionEngine | None = None
        self.worker_pool: WorkerPool | None = None
        self.current_track: Track | None = None
        self.tier_assignments: dict[str, str] = {}  # ticket_id -> "tier3-worker"
        self.persona_overrides: dict[str, str] = {}  # ticket_id -> persona_name
        self._stop_event = threading.Event()
        self._dispatch_thread: threading.Thread | None = None
```
load_track(track_id: str) -> Track
Reads conductor/tracks/<track_id>/plan.md and parses the ticket list:
```python
def load_track(self, track_id: str) -> Track:
    track_dir = self.controller.paths.tracks_dir / track_id
    plan_path = track_dir / "plan.md"
    tickets = parse_plan_md(plan_path)  # Parsed tickets; TrackDAG reads .id and .depends_on from each
    track = Track(id=track_id, tickets=tickets, plan_path=plan_path)
    return track
```
The track is then passed to the DAG engine:
```python
def start(self, track: Track, mode: ExecutionMode = ExecutionMode.AUTO_QUEUE) -> None:
    self.current_track = track
    self.dag_engine = ExecutionEngine(TrackDAG(track.tickets), mode)
    self.worker_pool = WorkerPool(max_concurrency=self.controller.app_state.max_concurrency)
    self.worker_pool.start()
    self._dispatch_thread = threading.Thread(target=self._dispatch_loop, daemon=True)
    self._dispatch_thread.start()
```
_dispatch_loop (Background Thread)
```python
def _dispatch_loop(self) -> None:
    while not self._stop_event.is_set():
        ready = self.dag_engine.tick()
        for ticket_id in ready:
            if self.worker_pool.has_capacity():
                self._spawn_worker(ticket_id)
        time.sleep(0.1)  # 100ms tick
```
The loop runs in a daemon thread. The main thread can call self.stop() to break out.
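`time.sleep(0.1)` cannot be interrupted, so the loop may take up to one extra tick to notice `_stop_event`. A common alternative is to wait on the event itself, sketched below with a hypothetical `dispatch_loop` helper. `threading.Event.wait(timeout)` returns `True` as soon as the event is set, so the loop exits promptly.

```python
import threading
import time
from typing import Callable


def dispatch_loop(stop: threading.Event, tick: Callable[[], None], interval: float = 0.1) -> None:
    # Event.wait() sleeps for up to `interval` seconds but returns True immediately
    # once stop is set, so shutdown does not have to wait out a full sleep.
    while not stop.wait(interval):
        tick()


calls: list[int] = []
stop = threading.Event()
worker = threading.Thread(
    target=dispatch_loop, args=(stop, lambda: calls.append(1), 0.01), daemon=True
)
worker.start()
time.sleep(0.1)
stop.set()
worker.join(timeout=1)
print(worker.is_alive())  # False
```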
_spawn_worker(ticket_id: str)
Spawns a worker via mma_exec.py:
```python
def _spawn_worker(self, ticket_id: str) -> None:
    ticket = self.current_track.get_ticket(ticket_id)
    role = self.tier_assignments.get(ticket_id, "tier3-worker")
    persona = self.persona_overrides.get(ticket_id)
    prompt = self._build_worker_prompt(ticket, persona)
    context = self._gather_context(ticket)
    future = self.worker_pool.submit(role, prompt, context)
    future.add_done_callback(lambda f: self._on_worker_done(ticket_id, f))
```
The WorkerPool is a ThreadPoolExecutor-backed pool with a semaphore for concurrency control.
_on_worker_done(ticket_id, future)
Called when a worker finishes (success or failure):
```python
def _on_worker_done(self, ticket_id: str, future: Future) -> None:
    dag = self.dag_engine.dag
    ticket = dag.ticket_map[ticket_id]  # TrackDAG has no nodes / reverse_edges dicts
    try:
        result = future.result()  # re-raises any exception raised by the worker
        ticket.status = "completed"
        ticket.result = result
        self._emit_event(MMA_TICKET_COMPLETED, ticket_id, result)
        # Dependents become ready on the next tick() via get_ready_tasks()
    except Exception as e:
        ticket.status = "blocked"
        ticket.error = str(e)
        self._emit_event(MMA_TICKET_FAILED, ticket_id, str(e))
        # Re-evaluate downstream blocking: block every transitive todo dependent
        dag.cascade_blocks()
```
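The callback pattern above can be tried in isolation with a plain `ThreadPoolExecutor`. In this sketch, `statuses`, `ok` and `boom` are hypothetical. It shows that `future.result()` re-raises the worker's exception inside the callback. It also shows that a callback registered inside a loop must bind the ticket ID as a default argument. In `_spawn_worker` the lambda closes over a function parameter, so each call already captures its own ID.

```python
from concurrent.futures import Future, ThreadPoolExecutor

statuses: dict[str, str] = {}


def on_done(ticket_id: str, future: Future) -> None:
    try:
        future.result()  # re-raises the worker's exception, if any
        statuses[ticket_id] = "completed"
    except Exception as exc:
        statuses[ticket_id] = f"blocked: {exc}"


def ok() -> dict:
    return {"ok": True}


def boom() -> dict:
    raise RuntimeError("Worker failed")


with ThreadPoolExecutor(max_workers=2) as pool:
    for tid, fn in (("2.1", ok), ("2.2", boom)):
        fut = pool.submit(fn)
        # The default argument binds the current tid; a bare closure could see a later value.
        fut.add_done_callback(lambda f, tid=tid: on_done(tid, f))

print(statuses["2.2"])  # blocked: Worker failed
```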
_emit_event(type, *args)
Pushes an event to the controller's event_queue for the GUI to consume:
```python
def _emit_event(self, event_type: str, *args) -> None:
    self.controller.event_queue.put(Event(type=event_type, args=args, timestamp=time.time()))
```
The GUI polls controller.event_queue.get_all() once per frame and dispatches to render functions.
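The standard library's `queue.Queue` has no `get_all()` method, so `controller.event_queue` is presumably a project-specific wrapper. Assuming it behaves like a non-blocking drain, the equivalent over a plain `queue.Queue` is a `get_nowait()` loop that stops at `queue.Empty`. The hypothetical `drain()` below never blocks the render thread.

```python
import queue


def drain(q: queue.Queue) -> list:
    """Hypothetical helper: return everything queued right now, without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


events: queue.Queue = queue.Queue()
for name in ("ticket_dispatched", "ticket_completed", "ticket_failed"):
    events.put(name)  # producer side (e.g. worker callbacks)

first = drain(events)   # consumer side (e.g. once per GUI frame)
second = drain(events)
print(first)   # ['ticket_dispatched', 'ticket_completed', 'ticket_failed']
print(second)  # []
```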
The WorkerPool
A thin wrapper around ThreadPoolExecutor with concurrency limiting:
```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class WorkerPool:
    def __init__(self, max_concurrency: int = 4):
        self.max_concurrency = max_concurrency
        self.semaphore = threading.Semaphore(max_concurrency)
        self.executor = ThreadPoolExecutor(max_workers=max_concurrency)
        self.active_workers: set[str] = set()  # ticket_ids submitted and not yet finished
        self._lock = threading.Lock()  # guards active_workers across threads

    def submit(self, role: str, prompt: str, context: dict) -> Future:
        """Submit a worker task. Returns a Future for the result."""
        ticket_id = context["ticket_id"]
        # Register at submit time so has_capacity() also counts work still queued
        with self._lock:
            self.active_workers.add(ticket_id)

        def _wrapped():
            with self.semaphore:
                try:
                    return run_mma_worker(role, prompt, context)
                finally:
                    with self._lock:
                        self.active_workers.discard(ticket_id)

        return self.executor.submit(_wrapped)

    def has_capacity(self) -> bool:
        with self._lock:
            return len(self.active_workers) < self.max_concurrency

    def stop(self, wait: bool = True) -> None:
        self.executor.shutdown(wait=wait)
```
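`ThreadPoolExecutor(max_workers=max_concurrency)` on its own already caps how many submitted callables run at once, so the semaphore is a second guard at the same limit. The hypothetical measurement below submits eight sleeping tasks to a two-thread executor and records the peak number running simultaneously.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest value `running` ever reached


def fake_worker(_: int) -> None:
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.02)  # stand-in for a long subprocess call
    with lock:
        running -= 1


with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(fake_worker, range(8)))  # consume results so errors surface

print(peak)  # never more than 2
```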
run_mma_worker
Invokes mma_exec.py as a subprocess (not in-process) to enforce Context Amnesia — the sub-agent has zero state from the parent:
```python
import json
import subprocess
import sys


def run_mma_worker(role: str, prompt: str, context: dict) -> dict:
    """Spawn a fresh sub-agent (tier3-worker or tier4-qa) for the ticket."""
    cmd = [
        sys.executable, "scripts/mma_exec.py",
        "--role", role,
        "--ticket-id", context["ticket_id"],
    ]
    # Pipe the prompt and context to the child as JSON on stdin
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    try:
        stdout, stderr = proc.communicate(payload, timeout=600)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; kill and reap it explicitly
        proc.kill()
        proc.communicate()
        raise
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}")
    return json.loads(stdout.decode("utf-8"))
```
This is the Token Firewall in action: each worker is a fresh subprocess with a clean context window, receiving only the prompt and the relevant context slice.
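The stdin/stdout contract can be exercised without mma_exec.py. In this sketch, a hypothetical child (an inline `python -c` script) reads the JSON payload from stdin and replies with JSON on stdout. `run_child()` uses `subprocess.run()`, whose `timeout` argument kills the child before re-raising `TimeoutExpired`. The child shares no memory with the parent: everything it knows arrives in the payload.

```python
import json
import subprocess
import sys

# Hypothetical child program: read the JSON payload from stdin, answer on stdout.
CHILD = (
    "import json, sys\n"
    "req = json.load(sys.stdin)\n"
    "json.dump({'ticket_id': req['context']['ticket_id'],"
    " 'chars': len(req['prompt'])}, sys.stdout)\n"
)


def run_child(prompt: str, context: dict, timeout: float = 30) -> dict:
    payload = json.dumps({"prompt": prompt, "context": context}).encode("utf-8")
    # subprocess.run() kills the child and re-raises TimeoutExpired on timeout.
    proc = subprocess.run(
        [sys.executable, "-c", CHILD],
        input=payload, capture_output=True, timeout=timeout,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {proc.stderr.decode('utf-8')}")
    return json.loads(proc.stdout.decode("utf-8"))


result = run_child("Add command palette", {"ticket_id": "2.1"})
print(result)  # {'ticket_id': '2.1', 'chars': 19}
```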
Track Loading
Tracks live in the project's conductor/tracks/ directory:
```
conductor/tracks/
├── 2026-05-20_initial_setup/
│   ├── plan.md          # Ticket list with dependencies
│   ├── decisions.md     # Architectural notes
│   └── tasks/
│       ├── 001_init_repo.md
│       ├── 002_install_deps.md
│       └── ...
└── 2026-06-02_command_palette/
    ├── plan.md
    └── ...
```
plan.md Format
```markdown
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)

# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]
```
The parser extracts:
- Ticket ID (`1.1`, `1.2`, ...)
- Title (text after `:`)
- Status (`[ ]` = pending, `[~]` = in progress, `[x]` = done, `[!]` = blocked)
- Dependencies (`[depends: ...]`)
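The four fields above can be pulled out of a single plan.md line with one regular expression. `parse_plan_line()` and its returned dict are hypothetical: the real `parse_plan_md` may differ, and the mapping from these status words to the `todo`/`completed`/`blocked` values used by `TrackDAG` is not documented here. Lines that are not task entries, such as phase headings, return `None`.

```python
import re
from typing import Optional

# "- [x] Task 1.2: Title" with an optional trailing "[depends: 1.1, 1.3]"
TASK_RE = re.compile(
    r"^- \[(?P<mark>[ x~!])\] Task (?P<id>\d+(?:\.\d+)*): (?P<title>.*?)"
    r"(?:\s*\[depends: (?P<deps>[^\]]*)\])?\s*$"
)
STATUS = {" ": "pending", "~": "in progress", "x": "done", "!": "blocked"}


def parse_plan_line(line: str) -> Optional[dict]:
    m = TASK_RE.match(line.strip())
    if m is None:
        return None  # headings, blank lines, free-form notes
    deps = m["deps"]
    return {
        "id": m["id"],
        "title": m["title"],
        "status": STATUS[m["mark"]],
        "depends_on": [d.strip() for d in deps.split(",")] if deps else [],
    }


print(parse_plan_line("- [ ] Task 2.1: Add command palette [depends: 1.3]"))
# {'id': '2.1', 'title': 'Add command palette', 'status': 'pending', 'depends_on': ['1.3']}
```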
Thread Safety
The conductor uses:
- `threading.Lock` for `_emit_event` (`queue.put` is atomic, but logging needs guarding)
- `threading.Event` for `_stop_event` (clean shutdown signaling)
- `ThreadPoolExecutor` for worker isolation
- `subprocess.Popen` for hard isolation (Tier 3/4 sub-agents)
threading.local() Context
The ai_client uses threading.local() to record the source tier of each request, so comms logs can be tagged with the originating tier. Because the value is per-thread, the conductor sets the tier name on the thread that calls ai_client.send(), just before the call.
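```python
# In ai_client.py
import threading

_local = threading.local()

def send(prompt, **kwargs):  # remaining parameters elided
    source = getattr(_local, "source", "main")  # "main" if this thread never set it
    comms_log(f"[{source}] {prompt[:50]}...")

# In multi_agent_conductor.py (on the calling thread)
ai_client._local.source = "tier3-worker"
ai_client.send(prompt)
```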
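Because `threading.local()` values are per-thread, each worker thread can tag its own requests without interfering with the others. The runnable sketch below uses hypothetical `send()` and `worker()` stand-ins rather than the real ai_client. A thread that never sets `source` falls back to `"main"`.

```python
import threading

_local = threading.local()
seen: dict[str, str] = {}


def send(prompt: str) -> None:
    # Hypothetical stand-in for ai_client.send(): reads the tag of the calling thread.
    seen[prompt] = getattr(_local, "source", "main")


def worker(tier: str, prompt: str) -> None:
    _local.source = tier  # visible only to this thread
    send(prompt)


threads = [
    threading.Thread(target=worker, args=("tier3-worker", "implement 2.1")),
    threading.Thread(target=worker, args=("tier4-qa", "review 2.1")),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
send("status?")  # the main thread never set _local.source
print(seen["status?"])  # main
```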
Stop & Cleanup
```python
def stop(self) -> None:
    """Stop the conductor. No new tickets are dispatched; in-flight workers run to completion."""
    self._stop_event.set()
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        self.worker_pool.stop(wait=False)
```
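The conductor is designed for graceful shutdown. stop() sets _stop_event so the dispatch loop exits and no new workers spawn, then returns without waiting for the workers. Because worker_pool.stop(wait=False) calls executor.shutdown(wait=False) without cancel_futures=True, tickets already handed to the executor, whether running or still queued, run to completion in the background.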
Observability
Logging
All conductor activity is logged to logs/mma_delegation.log in JSON Lines format: each record is a single JSON object on its own line (pretty-printed here for readability):
```json
{
  "timestamp": "2026-06-02T12:34:56.789Z",
  "event": "ticket_dispatched",
  "track_id": "2026-06-02_command_palette",
  "ticket_id": "2.1",
  "role": "tier3-worker",
  "persona": "code-implementer"
}
```
Per-worker logs are written to logs/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log.
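A JSON Lines writer only needs `json.dumps()` plus a trailing newline per record. The `log_event()` helper below is a hypothetical sketch; the conductor's real logging code is not shown on this page. It emits the record format above, with a UTC timestamp ending in `Z`, into any text stream.

```python
import io
import json
from datetime import datetime, timezone


def log_event(stream, event: str, **fields) -> None:
    """Hypothetical helper: write one JSON object per line (JSON Lines)."""
    ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
    stream.write(json.dumps({"timestamp": ts, "event": event, **fields}) + "\n")


buf = io.StringIO()  # stands in for open("logs/mma_delegation.log", "a")
log_event(buf, "ticket_dispatched", track_id="2026-06-02_command_palette",
          ticket_id="2.1", role="tier3-worker", persona="code-implementer")

# Reading back is one json.loads() per line.
records = [json.loads(line) for line in buf.getvalue().splitlines()]
print(records[0]["ticket_id"])  # 2.1
```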
GUI Dashboard
The MMA Observability Dashboard (in gui_2.py) reads from the conductor's state:
- Track list with progress bars
- Active ticket's worker output stream
- DAG visualization (via `imgui-node-editor`)
- Per-tier strategy streams
Beads Mode Integration
When the project is in Beads mode (.beads/ directory exists), the conductor delegates to the Beads CLI instead of parsing plan.md:
```python
def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)
```
The downstream conductor logic is the same — it operates on a Track object regardless of source.
See docs/guide_beads.md for Beads details.
Testing
Unit Tests
- `tests/test_dag_engine.py`: `TrackDAG` cycle detection, ready_tickets, blocking cascade
- `tests/test_execution_engine.py`: mode transitions, approval flow
- `tests/test_worker_pool.py`: concurrency limit, `has_capacity`, `stop`
- `tests/test_mma_conductor.py`: track loading, dispatch flow, error handling
Integration Tests (live_gui)
tests/test_mma_observability_dashboard.py — drives the dashboard via Hook API.
Mocking
Tests use unittest.mock.patch to mock subprocess.Popen and ai_client.send for hermetic tests.
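The hypothetical tests below illustrate the mocking approach. They patch `subprocess.Popen` around a simplified `run_worker()` stand-in (not the real `run_mma_worker`), so no child process is ever started. `mock.patch("subprocess.Popen")` replaces the attribute on the `subprocess` module, which works because the function looks up `subprocess.Popen` at call time.

```python
import json
import subprocess
from unittest import mock


def run_worker(prompt: str) -> dict:
    """Simplified, hypothetical stand-in for run_mma_worker."""
    proc = subprocess.Popen(
        ["worker"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    stdout, stderr = proc.communicate(json.dumps({"prompt": prompt}).encode("utf-8"), timeout=600)
    if proc.returncode != 0:
        raise RuntimeError(f"Worker failed: {stderr.decode('utf-8')}")
    return json.loads(stdout.decode("utf-8"))


def test_run_worker_parses_stdout() -> None:
    with mock.patch("subprocess.Popen") as popen:
        proc = popen.return_value  # the object Popen(...) returns
        proc.communicate.return_value = (b'{"status": "ok"}', b"")
        proc.returncode = 0
        assert run_worker("hi") == {"status": "ok"}
        popen.assert_called_once()


def test_run_worker_raises_on_failure() -> None:
    with mock.patch("subprocess.Popen") as popen:
        popen.return_value.communicate.return_value = (b"", b"boom")
        popen.return_value.returncode = 1
        try:
            run_worker("hi")
        except RuntimeError as exc:
            assert "boom" in str(exc)
        else:
            raise AssertionError("expected RuntimeError")
```

Both functions follow pytest's `test_` naming, so they can be collected as-is or called directly.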
See Also
- guide_architecture.md: Threading model
- guide_mma.md: MMA concepts (4-Tier hierarchy, Token Firewall)
- guide_app_controller.md: How the conductor is owned by the controller
- guide_models.md: `Ticket` and `Track` data structures
- `scripts/mma_exec.py`: The sub-agent entry point
- `scripts/mma.ps1`: PowerShell wrapper
- [conductor/workflow.md](../conductor/workflow.md): Track execution protocol