src/multi_agent_conductor.py & src/dag_engine.py — MMA Engine
Overview
The MMA (Multi-Model Architecture) engine orchestrates parallel AI worker execution for implementing multi-ticket tracks. Two files:
- src/multi_agent_conductor.py (~28KB) — the high-level orchestrator, worker pool, ticket dispatch
- src/dag_engine.py (~10KB) — the DAG resolution, cycle detection, topological sort
Together they implement a non-blocking execution engine with thread-safe state management, configurable concurrency, and programmable execution modes (Auto-Queue vs Step Mode).
Architecture
┌─────────────────────────────────────────────────┐
│ User: "Implement Track X"                       │
└─────────────────┬───────────────────────────────┘
                  │ controller.dispatch_mma_track(track_id)
                  ▼
┌─────────────────────────────────────────────────┐
│ ConductorEngine                                 │
│ - Loads track from conductor/ directory         │
│ - Builds TrackDAG from ticket dependencies      │
│ - Detects cycles                                │
│ - Starts WorkerPool                             │
└─────────────────┬───────────────────────────────┘
                  │ enqueues ready tickets
                  ▼
┌─────────────────────────────────────────────────┐
│ WorkerPool                                      │
│ - Configurable concurrency (default 4)          │
│ - Threads pull ready tickets, spawn workers     │
│ - Workers call mma_exec.py with sub-prompt      │
└─────────────────┬───────────────────────────────┘
                  │ per ticket
                  ▼
┌─────────────────────────────────────────────────┐
│ mma_exec.py (tier3-worker / tier4-qa)           │
│ - Stateless Tier 3 or Tier 4 sub-agent          │
│ - TDD cycle: red → green → refactor             │
│ - Commits per task                              │
└─────────────────┬───────────────────────────────┘
                  │ completes
                  ▼
┌─────────────────────────────────────────────────┐
│ Conductor:                                      │
│ - Records ticket result                         │
│ - Updates DAG (downstream tickets unblocked)    │
│ - Emits events to controller.event_queue        │
│ - Writes logs to logs/mma_delegation.log        │
└─────────────────────────────────────────────────┘
The TrackDAG (in src/dag_engine.py)
The actual TrackDAG is a flat list of Ticket references with an O(1) id→ticket map, not a graph with separate node/edge dicts. The graph structure is implicit in each Ticket.depends_on: list[str].
Data Structure
class TrackDAG:
    """Directed acyclic graph of tickets in a track."""
    def __init__(self, tickets: list[Ticket]):
        self.tickets = tickets  # list[Ticket] (the source of truth)
        self.ticket_map = {t.id: t for t in tickets}  # dict[str, Ticket] (O(1) lookup)
The Ticket dataclass itself is defined in src/models.py (see guide_models.md). It carries the dependencies and status directly — there is no separate TicketNode wrapper, no edges dict, no reverse_edges dict. Adjacency lists are computed on demand inside cascade_blocks() and topological_sort().
Public Methods (actual signatures from src/dag_engine.py:41-163)
| Method | Returns | Purpose |
|---|---|---|
| cascade_blocks() | None | BFS-propagate blocked status from currently-blocked tickets to transitive todo dependents (mutates tickets in place) |
| is_ticket_ready(t) | bool | True if all t.depends_on exist in ticket_map AND have status == 'completed' |
| get_ready_tasks() | list[Ticket] | All todo tickets whose dependencies are all completed (and whose status hasn't been blocked by cascade_blocks()) |
| has_cycle() | bool | Iterative DFS cycle detection (returns True/False — NOT a list of cycles) |
| topological_sort() | list[str] (ticket IDs in dep-first order) | Kahn's algorithm (BFS + in-degree counter); raises ValueError("Dependency cycle detected") if len(result) < len(self.tickets) after the BFS drain |
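The source below reproduces only has_cycle() and topological_sort(). The following minimal sketch shows how the other three rows of the table could fit together. It assumes the status strings "todo", "completed", and "blocked", and it uses a stand-in Ticket dataclass that carries only id, status, and depends_on. The real bodies in src/dag_engine.py may differ.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Ticket:
    # Minimal stand-in for src/models.Ticket (assumed fields only).
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


class TrackDAG:
    def __init__(self, tickets: list[Ticket]):
        self.tickets = tickets
        self.ticket_map = {t.id: t for t in tickets}

    def is_ticket_ready(self, t: Ticket) -> bool:
        # Every dependency must exist in the track and be completed.
        return all(
            dep in self.ticket_map and self.ticket_map[dep].status == "completed"
            for dep in t.depends_on
        )

    def get_ready_tasks(self) -> list[Ticket]:
        return [t for t in self.tickets if t.status == "todo" and self.is_ticket_ready(t)]

    def cascade_blocks(self) -> None:
        # Reverse adjacency (dependency -> dependents), computed on demand.
        dependents: dict[str, list[str]] = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep in t.depends_on:
                if dep in dependents:
                    dependents[dep].append(t.id)
        # BFS outward from every ticket that is already blocked.
        queue = deque(t.id for t in self.tickets if t.status == "blocked")
        while queue:
            for child_id in dependents[queue.popleft()]:
                child = self.ticket_map[child_id]
                if child.status == "todo":
                    child.status = "blocked"  # mutate in place, then keep propagating
                    queue.append(child_id)


dag = TrackDAG([
    Ticket("1.1", "completed"),
    Ticket("1.2", "blocked"),
    Ticket("1.3", depends_on=["1.2"]),
    Ticket("1.4", depends_on=["1.1"]),
    Ticket("1.5", depends_on=["1.3"]),
])
dag.cascade_blocks()  # 1.3 and, transitively, 1.5 become "blocked"
ready_ids = [t.id for t in dag.get_ready_tasks()]  # ['1.4']
```

This ordering also explains why ExecutionEngine.tick() calls cascade_blocks() before get_ready_tasks(). Once the blocked status has propagated, the status == "todo" filter alone excludes every downstream ticket.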
has_cycle() — Iterative DFS
Implements iterative DFS to avoid recursion overhead. The path set is the iterative equivalent of the recursion stack:
def has_cycle(self) -> bool:
    with get_monitor().scope("dag_has_cycle"):
        visited = set()
        for start_ticket in self.tickets:
            if start_ticket.id in visited:
                continue
            stack = [(start_ticket.id, False)]  # (id, is_backtracking)
            path = set()
            while stack:
                node_id, is_backtracking = stack.pop()
                if is_backtracking:
                    path.remove(node_id)
                    continue
                if node_id in path:
                    return True
                if node_id in visited:
                    continue
                visited.add(node_id)
                path.add(node_id)
                stack.append((node_id, True))
                ticket = self.ticket_map.get(node_id)
                if ticket:
                    for neighbor_id in ticket.depends_on:
                        stack.append((neighbor_id, False))
        return False
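Here is the same iterative DFS as a standalone function that can be tried outside the project. It drops the get_monitor() scope and replaces the ticket map with a hypothetical plain dict that maps each ticket ID to its depends_on list, so it runs without src/ on the path.

```python
def has_cycle(graph: dict[str, list[str]]) -> bool:
    """Iterative DFS; graph maps ticket id -> list of dependency ids."""
    visited: set[str] = set()
    for start in graph:
        if start in visited:
            continue
        stack = [(start, False)]  # (node id, is_backtracking)
        path: set[str] = set()  # nodes on the current DFS path (the "recursion stack")
        while stack:
            node, backtracking = stack.pop()
            if backtracking:
                path.remove(node)  # leaving node: it is no longer an ancestor
                continue
            if node in path:
                return True  # edge back to an ancestor: cycle
            if node in visited:
                continue  # already fully explored via another route
            visited.add(node)
            path.add(node)
            stack.append((node, True))  # backtrack marker popped after all children
            for dep in graph.get(node, []):  # unknown ids simply have no edges
                stack.append((dep, False))
    return False


diamond = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
loop = {"a": ["b"], "b": ["c"], "c": ["a"]}
```

The diamond has shared dependencies but no cycle, while the loop does. That is the distinction the visited/path split exists to make.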
topological_sort() — Kahn's Algorithm
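BFS-based topological sort. The cycle detection is implicit: after the BFS drain, if not every ticket was emitted, a cycle exists. Because in_degree counts every entry in depends_on, a dependency on an ID that is not in the track also keeps its ticket (and everything downstream of it) out of the result. A dangling reference therefore raises the same ValueError.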
def topological_sort(self) -> list[str]:
    with get_monitor().scope("dag_topological_sort"):
        in_degree = {t.id: len(t.depends_on) for t in self.tickets}
        dependents = {t.id: [] for t in self.tickets}
        for t in self.tickets:
            for dep_id in t.depends_on:
                if dep_id in dependents:
                    dependents[dep_id].append(t.id)
        queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
        result = []
        idx = 0
        while idx < len(queue):
            u = queue[idx]
            idx += 1
            result.append(u)
            for v_id in dependents.get(u, []):
                in_degree[v_id] -= 1
                if in_degree[v_id] == 0:
                    queue.append(v_id)
        if len(result) < len(self.tickets):
            raise ValueError("Dependency cycle detected")
        return result
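Python 3.9+ ships graphlib.TopologicalSorter, which produces the same dependency-first ordering and raises graphlib.CycleError on cycles. The sketch below uses it as a cross-check. It is a swapped-in stdlib technique, not the project's code, and it is wrapped so that a cycle surfaces with the same ValueError message. One behavioural difference matters: graphlib adds unknown dependency IDs as nodes of their own. The Kahn loop above never emits a ticket whose dependency is missing.

```python
from graphlib import CycleError, TopologicalSorter


def dep_first_order(deps: dict[str, list[str]]) -> list[str]:
    """deps maps ticket id -> ids it depends on (its predecessors)."""
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # Mirror TrackDAG.topological_sort()'s error type and message.
        raise ValueError("Dependency cycle detected") from exc


chain = {"2.1": ["1.3"], "2.2": ["2.1"], "1.3": []}
order = dep_first_order(chain)  # a linear chain has exactly one valid order
```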
get_executable_tickets(track) (free function, src/dag_engine.py:165-173)
def get_executable_tickets(track: "Track") -> list[Ticket]:
    """Convenience: returns the ready-to-execute tickets of a Track.
    Free function (instead of Track.get_executable_tickets) so that
    src/models.py does not need to import TrackDAG at module level,
    breaking the models<->dag_engine circular dependency.
    """
    return TrackDAG(track.tickets).get_ready_tasks()
Thread Safety
TrackDAG is NOT thread-safe. Callers must synchronize access if used from multiple threads (per the module docstring at src/dag_engine.py:20-22). The ConductorEngine is currently the only caller; the WorkerPool reads from self.engine under the ConductorEngine's own lock discipline.
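TrackDAG leaves locking to its callers, so a second caller could share a DAG safely by routing every access through a single lock. The wrapper below is hypothetical (Synchronized is not a project class). The demo uses a counter in place of a DAG to show that read-modify-write sequences stay consistent under contention.

```python
import threading
from typing import Any, Callable, TypeVar

T = TypeVar("T")


class Synchronized:
    """Hypothetical wrapper: every access to the wrapped object holds one lock."""

    def __init__(self, obj: Any) -> None:
        self._obj = obj
        self._lock = threading.Lock()

    def call(self, fn: Callable[[Any], T]) -> T:
        with self._lock:
            return fn(self._obj)


class _Counter:
    # Demo stand-in for a non-thread-safe object such as a TrackDAG.
    def __init__(self) -> None:
        self.n = 0


def _bump(counter: _Counter) -> None:
    current = counter.n  # read
    counter.n = current + 1  # modify-write; safe only because the lock is held


def hammer(shared: Synchronized, times: int) -> None:
    for _ in range(times):
        shared.call(_bump)


shared = Synchronized(_Counter())
threads = [threading.Thread(target=hammer, args=(shared, 1000)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With a real DAG, passing a function that calls cascade_blocks() and then get_ready_tasks() would make that pair atomic with respect to other callers.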
The ExecutionEngine (in src/dag_engine.py:176-228)
ExecutionEngine is a state machine facade around TrackDAG, not an enum-driven auto/step dispatcher. There is no ExecutionMode enum; the engine takes a single auto_queue: bool flag.
class ExecutionEngine:
    """A state machine that governs the progression of tasks within a TrackDAG.
    Handles automatic queueing and manual task approval."""
    def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
        self.dag = dag
        self.auto_queue = auto_queue
Methods (actual signatures):
| Method | Returns | Purpose |
|---|---|---|
| tick() | list[Ticket] | Calls dag.cascade_blocks() then dag.get_ready_tasks(). Returns the ready list. Does NOT auto-promote tickets to in_progress. |
| approve_task(task_id) | None | Manual transition: ticket.status "todo" → "in_progress" IFF dependencies are met (is_ticket_ready returns True). |
| update_task_status(task_id, status) | None | Force-update a ticket's status (e.g. to "completed" or "blocked"). |
Step Mode / Auto-Queue is implemented at the caller layer (ConductorEngine.run in multi_agent_conductor.py), not inside ExecutionEngine. The auto_queue parameter is consulted there, not in tick(). The ConductorEngine pushes tickets to in_progress based on auto_queue and the per-ticket step_mode flag.
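The function below is a rough illustration of that caller-layer policy: it decides which of the tickets returned by tick() to start. Both dispatch_round and the step_mode field on this stand-in Ticket are assumptions made for illustration. The actual logic lives in ConductorEngine.run.

```python
from dataclasses import dataclass, field


@dataclass
class Ticket:
    # Stand-in; step_mode models the assumed per-ticket flag.
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)
    step_mode: bool = False


def dispatch_round(ready: list[Ticket], auto_queue: bool) -> list[Ticket]:
    """Hypothetical caller-side policy applied to the list returned by tick()."""
    started = []
    for ticket in ready:
        if auto_queue and not ticket.step_mode:
            ticket.status = "in_progress"  # auto-promote without user approval
            started.append(ticket)
        # Otherwise the ticket stays "todo" until approve_task() is called.
    return started
```

Keeping this decision outside ExecutionEngine means tick() stays a pure "what is ready?" query. One engine can then serve both modes.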
The ConductorEngine (in src/multi_agent_conductor.py:116+)
The actual class is named ConductorEngine, not MultiAgentConductor. It owns the DAG, the engine, and a WorkerPool; pushes state to the GUI; and runs the main async dispatch loop.
__init__(track, event_queue=None, auto_queue=False, max_workers=4)
class ConductorEngine:
    def __init__(
        self,
        track: Track,
        event_queue: Optional[events.AsyncEventQueue] = None,
        auto_queue: bool = False,
        max_workers: int = 4,
    ) -> None:
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        self.pool = WorkerPool(max_workers=max_workers)
        self._workers_lock = threading.Lock()
        self._active_workers: dict[str, threading.Thread] = {}
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event: threading.Event = threading.Event()
        self._tier_usage_lock = threading.Lock()
        self._dirty: bool = True
max_workers is not read from config.toml by the engine — it's a constructor parameter. The 3 call sites in AppController (src/app_controller.py:4132-4133, 4145-4146, 4223-4224) all read [mma].max_workers from TOML and pass it in. Default is 4.
Key methods
| Method | Returns | Purpose |
|---|---|---|
| update_usage(tier, input_tokens, output_tokens) | None | Cumulative token accounting per tier (under _tier_usage_lock) |
| pause() / resume() | None | Set/clear _pause_event |
| approve_task(task_id) | None | Delegates to engine.approve_task; sets _dirty = True |
| update_task_status(task_id, status) | None | Delegates to engine.update_task_status; sets _dirty = True |
| kill_worker(ticket_id) | None | Sets the per-ticket abort event; joins the thread with a 1.0 s timeout |
| _push_state(status, active_tier) | None (async) | Builds a payload dict and awaits event_queue.put("mma_state_update", payload) |
| parse_json_tickets(json_str) | list[dict] | Parses Tier 2 LLM output into a list of ticket dicts (the ingestion path) |
| run() | (async coroutine) | The main async dispatch loop; see below |
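kill_worker() relies on cooperative cancellation. Setting the event only helps if the worker checks it, which is presumably why the join is bounded at 1.0 s. The stand-in class below shows the pattern end to end. Its names are hypothetical, and it is not ConductorEngine.

```python
import threading


class Workers:
    """Hypothetical stand-in for the per-ticket abort-event bookkeeping."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._threads: dict[str, threading.Thread] = {}
        self._abort: dict[str, threading.Event] = {}

    def start(self, ticket_id: str) -> None:
        abort = threading.Event()
        thread = threading.Thread(target=self._work, args=(abort,), daemon=True)
        with self._lock:
            self._threads[ticket_id] = thread
            self._abort[ticket_id] = abort
        thread.start()

    @staticmethod
    def _work(abort: threading.Event) -> None:
        # Cooperative cancellation: check the event between units of work.
        while not abort.wait(timeout=0.01):
            pass  # one unit of work would run here

    def kill_worker(self, ticket_id: str) -> None:
        with self._lock:
            abort = self._abort.get(ticket_id)
            thread = self._threads.get(ticket_id)
        if abort is not None:
            abort.set()
        if thread is not None:
            # Bounded join: returns even if the worker never checks the event.
            thread.join(timeout=1.0)
```

A worker stuck inside one long blocking call would outlive the join. Because it is a daemon thread, though, it would not keep the process alive at exit.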
run() — The Main Async Dispatch Loop
The actual run() is an async coroutine that runs the dispatch loop, NOT a _dispatch_loop background thread. It uses asyncio and loop.run_in_executor to bridge to the blocking run_worker_lifecycle call.
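The bridge can be sketched with loop.run_in_executor(None, ...), which runs a blocking callable on the loop's default thread pool and returns an awaitable. run_worker_lifecycle_stub is a hypothetical stand-in that only sleeps. The real run() also handles readiness, pausing, and state pushes.

```python
import asyncio
import time


def run_worker_lifecycle_stub(ticket_id: str) -> str:
    """Hypothetical blocking stand-in for run_worker_lifecycle."""
    time.sleep(0.05)  # simulates a blocking AI round-trip
    return f"{ticket_id}: completed"


async def run(ticket_ids: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # executor=None -> the loop's default ThreadPoolExecutor; the event loop stays responsive.
    futures = [loop.run_in_executor(None, run_worker_lifecycle_stub, tid) for tid in ticket_ids]
    return await asyncio.gather(*futures)  # results come back in input order


results = asyncio.run(run(["1.1", "1.2"]))
```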
_push_state() payload shape
async def _push_state(self, status: str = "running", active_tier: str = None) -> None:
    if not self.event_queue:
        return
    payload = {
        "status": status,
        "active_tier": active_tier,
        "tier_usage": self.tier_usage,
        "track": {"id": self.track.id, "title": self.track.description},
        "tickets": [asdict(t) for t in self.track.tickets],
    }
    await self.event_queue.put("mma_state_update", payload)
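To see what a consumer receives, here is a trimmed-down version of the payload sent through a stand-in queue. The AsyncEventQueue below is hypothetical: it wraps an asyncio.Queue behind a put(name, payload) signature that mirrors the call above. asdict() is what turns each Ticket dataclass into a plain, JSON-friendly dict.

```python
import asyncio
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class Ticket:
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)


class AsyncEventQueue:
    """Hypothetical stand-in for events.AsyncEventQueue."""

    def __init__(self) -> None:
        self._q: asyncio.Queue = asyncio.Queue()

    async def put(self, name: str, payload: Any) -> None:
        await self._q.put((name, payload))

    async def get(self) -> tuple[str, Any]:
        return await self._q.get()


async def demo() -> tuple[str, Any]:
    queue = AsyncEventQueue()
    tickets = [Ticket("1.1", "completed"), Ticket("1.2", depends_on=["1.1"])]
    payload = {
        "status": "running",
        "active_tier": "Tier 3",
        "tickets": [asdict(t) for t in tickets],  # dataclasses -> plain dicts
    }
    await queue.put("mma_state_update", payload)
    return await queue.get()


name, payload = asyncio.run(demo())
```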
The WorkerPool (in src/multi_agent_conductor.py:50-114)
A dict[str, Thread] + threading.Lock + threading.Semaphore, NOT a ThreadPoolExecutor wrapper.
class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)
Methods (actual signatures):
| Method | Returns | Purpose |
|---|---|---|
| spawn(ticket_id, target, args=()) | None | Spawns a daemon thread running target(*args) if the pool has capacity. No-op if full. Tracks the thread in self._active[ticket_id]. |
| join_all(timeout=None) | None | Joins every active thread. |
| get_active_count() | int | Returns len(self._active). |
| is_full() | bool | True if get_active_count() >= self.max_workers. |
Concurrency control is a Semaphore, not a thread-pool executor. The semaphore is acquired in spawn and released in the spawned thread's _run wrapper. self._active is the source of truth for get_active_count/is_full; the semaphore prevents over-spawning.
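The following minimal sketch shows that design. It assumes that spawn() uses a non-blocking acquire, so that a full pool makes spawn() a no-op rather than a blocking call. It also assumes that each thread removes itself from _active when it finishes. The real method bodies at src/multi_agent_conductor.py:50-114 may differ in these details.

```python
import threading
from typing import Any, Callable, Optional


class WorkerPool:
    """Sketch: dict of threads + Lock + Semaphore (assumed details)."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id: str, target: Callable[..., Any], args: tuple = ()) -> None:
        # Non-blocking acquire: when every slot is taken, spawn() does nothing.
        if not self._semaphore.acquire(blocking=False):
            return

        def _run() -> None:
            try:
                target(*args)
            finally:
                # Always free the slot, even if target raised.
                with self._lock:
                    self._active.pop(ticket_id, None)
                self._semaphore.release()

        thread = threading.Thread(target=_run, daemon=True)
        with self._lock:
            self._active[ticket_id] = thread  # registered before start, so _run can remove it
        thread.start()

    def get_active_count(self) -> int:
        with self._lock:
            return len(self._active)

    def is_full(self) -> bool:
        return self.get_active_count() >= self.max_workers

    def join_all(self, timeout: Optional[float] = None) -> None:
        with self._lock:
            threads = list(self._active.values())  # snapshot; threads remove themselves
        for t in threads:
            t.join(timeout)
```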
Sub-Agent Invocation (mma_exec.py)
The ConductorEngine does not spawn mma_exec.py directly. Sub-agent invocation is a synchronous CLI bridge at scripts/mma_exec.py invoked from a Tier 3 worker (see conductor/workflow.md "MMA Bridge" section). Each sub-agent is invoked via:
uv run python scripts/mma_exec.py --role tier3-worker "[PROMPT]"
The --role flag selects between tier1-orchestrator, tier2-tech-lead, tier3-worker, and tier4-qa. Sub-agents receive context via stdin (or as additional CLI args) and exit after one round-trip. The actual prompt construction lives in run_worker_lifecycle at src/multi_agent_conductor.py (the free function referenced by both ConductorEngine.run and the worker spawn flow).
The "Token Firewall" effect — each worker starts with a clean context window — is achieved by the ai_client.reset_session() call at the start of run_worker_lifecycle (see guide_mma.md "Context Amnesia").
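For reference, the CLI invocation shown above maps onto a subprocess argument list like the one below. build_mma_command and invoke are hypothetical helpers, and the four role names come from the --role description above. Passing a list rather than a shell string keeps prompts that contain quotes or spaces intact.

```python
import subprocess

ROLES = ("tier1-orchestrator", "tier2-tech-lead", "tier3-worker", "tier4-qa")


def build_mma_command(role: str, prompt: str) -> list[str]:
    """Hypothetical helper mirroring: uv run python scripts/mma_exec.py --role ROLE PROMPT"""
    if role not in ROLES:
        raise ValueError(f"unknown role {role!r}; expected one of {ROLES}")
    return ["uv", "run", "python", "scripts/mma_exec.py", "--role", role, prompt]


def invoke(role: str, prompt: str) -> str:
    # One synchronous round-trip; check=True raises CalledProcessError on a non-zero exit.
    result = subprocess.run(build_mma_command(role, prompt), capture_output=True, text=True, check=True)
    return result.stdout
```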
Track Loading
Tracks live in the project's conductor/tracks/ directory:
conductor/tracks/
├── 2026-05-20_initial_setup/
│ ├── plan.md # Ticket list with dependencies
│ ├── decisions.md # Architectural notes
│ └── tasks/
│ ├── 001_init_repo.md
│ ├── 002_install_deps.md
│ └── ...
└── 2026-06-02_command_palette/
├── plan.md
└── ...
plan.md Format
# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)
# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
- [ ] Task 2.2: Hook API integration [depends: 2.1]
- [ ] Task 2.3: Documentation [depends: 2.2]
The parser extracts:
- Ticket ID (1.1, 1.2, ...)
- Title (text after :)
- Status ([ ] = pending, [~] = in progress, [x] = done, [!] = blocked)
- Dependencies ([depends: ...])
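Here is a regex-based sketch of that parser, run against lines in the format shown above. The names ParsedTicket and parse_plan are assumptions. So is the mapping of checkbox marks onto the "todo"/"in_progress"/"completed"/"blocked" status strings that TrackDAG uses.

```python
import re
from dataclasses import dataclass, field

# Assumed mapping from checkbox mark to TrackDAG status string.
STATUS = {" ": "todo", "~": "in_progress", "x": "completed", "!": "blocked"}
LINE_RE = re.compile(
    r"^- \[(?P<mark>[ x~!])\] Task (?P<id>[\w.]+): (?P<title>.*?)"
    r"(?: \[depends: (?P<deps>[^\]]*)\])?\s*$"
)


@dataclass
class ParsedTicket:
    id: str
    title: str
    status: str
    depends_on: list[str] = field(default_factory=list)


def parse_plan(text: str) -> list[ParsedTicket]:
    tickets = []
    for line in text.splitlines():
        m = LINE_RE.match(line.strip())
        if not m:
            continue  # phase headings, prose, blank lines
        deps = [d.strip() for d in (m["deps"] or "").split(",") if d.strip()]
        tickets.append(ParsedTicket(m["id"], m["title"], STATUS[m["mark"]], deps))
    return tickets


PLAN = """# Phase 1: Foundation
- [ ] Task 1.1: Initialize the project
- [x] Task 1.2: Install dependencies (done)
- [~] Task 1.3: Configure paths (in progress)
# Phase 2: Implementation
- [ ] Task 2.1: Add command palette [depends: 1.3]
"""
tickets = parse_plan(PLAN)
```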
Thread Safety
The conductor uses:
- threading.Lock for _emit_event (queue.put is atomic but logging needs guarding)
- threading.Event for _stop_event (clean shutdown signaling)
- ThreadPoolExecutor for worker isolation
- subprocess.Popen for hard isolation (Tier 3/4 sub-agents)
threading.local() Context
The ai_client uses threading.local() to track the source tier of each request (so comms logs can be tagged with the originating tier). The conductor passes the tier name when calling ai_client.send().
# In ai_client.py
_local = threading.local()

def send(prompt, ...):
    source = getattr(_local, 'source', 'main')
    comms_log(f"[{source}] {prompt[:50]}...")

# In multi_agent_conductor.py
_local.source = "tier3-worker"
ai_client.send(prompt)
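The snippet above relies on each thread seeing its own attributes on the threading.local() object. This runnable demo shows two tagged worker threads and an untagged main thread that falls back to "main". Its send, worker, and in-memory log list are hypothetical stand-ins for ai_client.send and comms_log.

```python
import threading

_local = threading.local()
log: list[str] = []
_log_lock = threading.Lock()


def send(prompt: str) -> None:
    # Each thread reads its own _local.source; threads that never set it get "main".
    source = getattr(_local, "source", "main")
    with _log_lock:
        log.append(f"[{source}] {prompt[:50]}")


def worker(tier: str) -> None:
    _local.source = tier  # visible only inside this thread
    send(f"hello from {tier}")


threads = [threading.Thread(target=worker, args=(tier,)) for tier in ("tier3-worker", "tier4-qa")]
for t in threads:
    t.start()
for t in threads:
    t.join()
send("hello from the main thread")
```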
Stop & Cleanup
def stop(self) -> None:
    """Stop the conductor. Workers continue to completion or are cancelled."""
    self._stop_event.set()
    if self._dispatch_thread:
        self._dispatch_thread.join(timeout=5)
    if self.worker_pool:
        self.worker_pool.stop(wait=False)
The conductor is designed for graceful shutdown: in-flight workers complete, no new ones spawn.
Observability
Logging
All conductor activity is logged to logs/mma_delegation.log in JSON Lines format: one JSON object per line. The record below is pretty-printed for readability.
{
  "timestamp": "2026-06-02T12:34:56.789Z",
  "event": "ticket_dispatched",
  "track_id": "2026-06-02_command_palette",
  "ticket_id": "2.1",
  "role": "tier3-worker",
  "persona": "code-implementer"
}
Per-worker logs are written to logs/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log.
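Here is a sketch of a JSON Lines writer for records shaped like the one above. Each record is serialised with json.dumps (no indent, so it stays on one line) and appended with a trailing newline. format_event, append_event, and agent_log_path are hypothetical names. The timestamp format inside per-worker file names is not specified here, so the caller supplies it.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def format_event(event: str, **fields: str) -> str:
    """Serialise one record as a single JSON Lines entry (exactly one trailing newline)."""
    ts = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
    return json.dumps({"timestamp": ts, "event": event, **fields}) + "\n"


def append_event(log_path: Path, event: str, **fields: str) -> None:
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as f:  # append mode: one line per call
        f.write(format_event(event, **fields))


def agent_log_path(logs_dir: Path, track_id: str, ticket_id: str, role: str, timestamp: str) -> Path:
    """<logs_dir>/agents/<track_id>_<ticket_id>_<role>_<timestamp>.log"""
    return logs_dir / "agents" / f"{track_id}_{ticket_id}_{role}_{timestamp}.log"
```

Because every record is one line, the log can be tailed or streamed and parsed line by line without reading the whole file.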
GUI Dashboard
The MMA Observability Dashboard (in gui_2.py) reads from the conductor's state:
- Track list with progress bars
- Active ticket's worker output stream
- DAG visualization (via imgui-node-editor)
- Per-tier strategy streams
Beads Mode Integration
When the project is in Beads mode (.beads/ directory exists), the conductor delegates to the Beads CLI instead of parsing plan.md:
def load_track(self, track_id: str) -> Track:
    if self.controller.app_state.use_beads:
        from src.beads_client import BeadsClient
        client = BeadsClient()
        tickets = client.list_tickets(track_id=track_id)
        return Track(id=track_id, tickets=tickets)
    else:
        return self._load_markdown_track(track_id)
The downstream conductor logic is the same — it operates on a Track object regardless of source.
See docs/guide_beads.md for Beads details.
Testing
Unit Tests
- tests/test_dag_engine.py — TrackDAG cycle detection, ready_tickets, blocking cascade
- tests/test_execution_engine.py — mode transitions, approval flow
- tests/test_worker_pool.py — concurrency limit, has_capacity, stop
- tests/test_mma_conductor.py — track loading, dispatch flow, error handling
Integration Tests (live_gui)
tests/test_mma_observability_dashboard.py — drives the dashboard via Hook API.
Mocking
Tests use unittest.mock.patch to mock subprocess.Popen and ai_client.send for hermetic tests.
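Here is a minimal example of that mocking approach: patching subprocess.Popen stops the test from launching a real sub-agent. run_sub_agent is a hypothetical function under test, not project code. The same pattern would apply to ai_client.send by patching its import path (for example "src.ai_client.send", an assumed module path).

```python
import subprocess
from unittest import mock


def run_sub_agent(role: str, prompt: str) -> str:
    """Hypothetical code under test: spawns a sub-agent via Popen."""
    proc = subprocess.Popen(
        ["uv", "run", "python", "scripts/mma_exec.py", "--role", role, prompt],
        stdout=subprocess.PIPE,
        text=True,
    )
    out, _ = proc.communicate()
    return out


def test_run_sub_agent_is_hermetic() -> None:
    # Patch the attribute on the subprocess module; run_sub_agent looks it up at call time.
    with mock.patch("subprocess.Popen") as popen:
        popen.return_value.communicate.return_value = ("ok\n", None)
        assert run_sub_agent("tier3-worker", "do X") == "ok\n"
        argv = popen.call_args.args[0]
        assert argv[-3:] == ["--role", "tier3-worker", "do X"]
```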
See Also
- guide_architecture.md — Threading model
- guide_mma.md — MMA concepts (4-Tier hierarchy, Token Firewall)
- guide_app_controller.md — How the conductor is owned by the controller
- guide_models.md — Ticket and Track data structures
- scripts/mma_exec.py — The sub-agent entry point
- scripts/mma.ps1 — PowerShell wrapper
- conductor/workflow.md — Track execution protocol