# MMA: 4-Tier Multi-Model Agent Orchestration

[Top](../Readme.md) | [Architecture](guide_architecture.md) | [Tools & IPC](guide_tools.md) | [Simulations](guide_simulations.md)

---

## Overview

The MMA (Multi-Model Agent) system is a hierarchical task decomposition and execution engine. A high-level "epic" is broken into tracks, tracks are decomposed into tickets with dependency relationships, and tickets are executed by stateless workers with human-in-the-loop approval at every destructive boundary.

```
Tier 1: Orchestrator  - product alignment, epic → tracks
Tier 2: Tech Lead     - track → tickets (DAG), architectural oversight
Tier 3: Worker        - stateless TDD implementation per ticket
Tier 4: QA            - stateless error analysis, no fixes
```

---

## Data Structures (`models.py`)

### Ticket

The atomic unit of work. All MMA execution revolves around transitioning tickets through their state machine.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class Ticket:
    id: str                                  # e.g., "T-001"
    description: str                         # Human-readable task description
    status: str                              # "todo" | "in_progress" | "completed" | "blocked"
    assigned_to: str                         # Tier assignment: "tier3-worker", "tier4-qa"
    target_file: Optional[str] = None        # File this ticket modifies
    context_requirements: List[str] = field(default_factory=list)  # Files needed for context injection
    depends_on: List[str] = field(default_factory=list)            # Ticket IDs that must complete first
    blocked_reason: Optional[str] = None     # Why this ticket is blocked
    step_mode: bool = False                  # If True, requires manual approval before execution
    persona_id: Optional[str] = None         # Per-ticket persona override; see Persona Application
    retry_count: int = 0                     # Increments on failure; drives model escalation
    model_override: Optional[str] = None     # If set, bypasses persona/model_list selection

    # Method bodies are elided; only the signatures are documented here.
    def mark_blocked(self, reason: str) -> None: ...   # Sets status="blocked", stores reason
    def mark_complete(self) -> None: ...               # Sets status="completed"
    def to_dict(self) -> Dict[str, Any]: ...
    @classmethod
    def from_dict(cls, data) -> "Ticket": ...
```

**Status state machine:**

```
todo ──> in_progress ──> completed
  |           |
  v           v
blocked    blocked
```

### Track

A collection of tickets with a shared goal.

```python
@dataclass
class Track:
    id: str                                              # Track identifier
    description: str                                     # Track-level brief
    tickets: List[Ticket] = field(default_factory=list)  # Ordered list of tickets

    def get_executable_tickets(self) -> List[Ticket]:
        """Return all 'todo' tickets whose depends_on are all 'completed'."""
        ...
```

### WorkerContext

```python
@dataclass
class WorkerContext:
    ticket_id: str                     # Which ticket this worker is processing
    model_name: str                    # LLM model to use (e.g., "gemini-2.5-flash-lite")
    messages: List[dict]               # Conversation history for this worker
    persona_id: Optional[str] = None   # Per-worker persona (set in run_worker_lifecycle)
    tool_preset: Optional[str] = None  # Fallback tool preset if persona has none
```

---

## DAG Engine (`dag_engine.py`)

Two classes: `TrackDAG` (graph) and `ExecutionEngine` (state machine).

### TrackDAG

```python
class TrackDAG:
    def __init__(self, tickets: List[Ticket]):
        self.tickets = tickets
        self.ticket_map = {t.id: t for t in tickets}  # O(1) lookup by ID
```

**`get_ready_tasks()`**: Returns tickets where `status == 'todo'` AND all `depends_on` have `status == 'completed'`. Missing dependencies are treated as NOT completed (fail-safe).

**`has_cycle()`**: Classic DFS cycle detection using visited set + recursion stack:

```python
def has_cycle(self) -> bool:
    visited = set()
    rec_stack = set()

    def is_cyclic(ticket_id):
        if ticket_id in rec_stack:
            return True   # Back edge = cycle
        if ticket_id in visited:
            return False  # Already explored
        visited.add(ticket_id)
        rec_stack.add(ticket_id)
        # Look the ticket up by ID; an unknown dependency has no outgoing edges.
        ticket = self.ticket_map.get(ticket_id)
        if ticket is not None:
            for neighbor in ticket.depends_on:
                if is_cyclic(neighbor):
                    return True
        rec_stack.remove(ticket_id)
        return False

    for ticket in self.tickets:
        if ticket.id not in visited:
            if is_cyclic(ticket.id):
                return True
    return False
```

**`topological_sort()`**: Calls `has_cycle()` first and raises `ValueError` if a cycle is found. Standard DFS post-order topological sort.
Returns list of ticket ID strings in dependency order.

### ExecutionEngine

```python
class ExecutionEngine:
    def __init__(self, dag: TrackDAG, auto_queue: bool = False):
        self.dag = dag
        self.auto_queue = auto_queue
```

**`tick()`**: the heartbeat. On each call:

1. Queries `dag.get_ready_tasks()` for eligible tickets.
2. If `auto_queue` is enabled: non-`step_mode` tasks are automatically promoted to `in_progress`.
3. `step_mode` tasks remain in `todo` until `approve_task()` is called.
4. Returns the list of ready tasks.

**`approve_task(task_id)`**: Manually transitions `todo` → `in_progress` if all dependencies are met.

**`update_task_status(task_id, status)`**: Force-sets status (used by workers to mark `completed` or `blocked`).

---

## WorkerPool (`multi_agent_conductor.py`)

Bounded concurrent worker pool with semaphore gating.

```python
class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)
```

**Key Methods:**

- `spawn(ticket_id, target, args)`: Spawns a worker thread if pool has capacity. Returns `None` if full.
- `join_all(timeout)`: Waits for all active workers to complete.
- `get_active_count()`: Returns current number of active workers.
- `is_full()`: Returns `True` if at capacity.

**Thread Safety:** All state mutations are protected by `_lock`. The semaphore ensures at most `max_workers` threads execute concurrently.

**Configuration:** `max_workers` is loaded from `config.toml` → `[mma].max_workers` (default: 4).

---

## ConductorEngine (`multi_agent_conductor.py`)

The Tier 2 orchestrator. Owns the execution loop that drives tickets through the DAG.
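
To make "driving tickets through the DAG" concrete, the readiness rule and the `tick()` promotion described under DAG Engine can be sketched as a standalone toy. `MiniTicket`, `ready_tasks`, and the free-function `tick` are illustrative names assumed for this sketch, not the project's classes; only the rules themselves (ready means `todo` with all dependencies `completed`, missing dependencies never count as completed, `step_mode` tickets are not auto-promoted) come from this guide.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MiniTicket:
    """Illustrative stand-in for Ticket; only the fields tick() needs."""
    id: str
    status: str = "todo"
    depends_on: List[str] = field(default_factory=list)
    step_mode: bool = False


def ready_tasks(tickets: List[MiniTicket]) -> List[MiniTicket]:
    """'todo' tickets whose dependencies all exist and are 'completed'."""
    by_id: Dict[str, MiniTicket] = {t.id: t for t in tickets}
    return [
        t for t in tickets
        if t.status == "todo"
        and all(d in by_id and by_id[d].status == "completed" for d in t.depends_on)
    ]


def tick(tickets: List[MiniTicket], auto_queue: bool) -> List[MiniTicket]:
    """One heartbeat: find ready tickets, auto-promote the non-step_mode ones."""
    ready = ready_tasks(tickets)
    if auto_queue:
        for t in ready:
            if not t.step_mode:
                t.status = "in_progress"
    return ready


tickets = [
    MiniTicket("T-001"),
    MiniTicket("T-002", depends_on=["T-001"]),
    MiniTicket("T-003", step_mode=True),        # waits for manual approval
    MiniTicket("T-004", depends_on=["T-999"]),  # unknown dependency: never ready
]
first = [t.id for t in tick(tickets, auto_queue=True)]
```

In this toy run the first tick reports `T-001` and `T-003` as ready but promotes only `T-001`; `T-003` stays in `todo` because of `step_mode`, and `T-004` is never ready because its dependency does not exist.
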
```python
class ConductorEngine:
    def __init__(self, track: Track, event_queue=None, auto_queue=False):
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        # max_workers comes from config.toml ([mma].max_workers, default 4); loading is elided here
        self.pool = WorkerPool(max_workers=max_workers)
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event: threading.Event = threading.Event()
```

**Per-tier `tier_usage` schema** (each tier entry):

| Key | Type | Purpose |
|---|---|---|
| `input` | `int` | Cumulative input tokens for this tier |
| `output` | `int` | Cumulative output tokens for this tier |
| `model` | `str` | Default model name (overridable per ticket via `model_override` or persona) |
| `tool_preset` | `Optional[str]` | Active tool preset name (set via `set_tool_preset` or persona) |
| `persona` | `Optional[str]` | Active persona name (set when a ticket's persona is applied) |

### State Broadcast (`_push_state`)

On every state change, the engine pushes the full orchestration state to the GUI via `AsyncEventQueue`:

```python
async def _push_state(self, status="running", active_tier=None):
    payload = {
        "status": status,            # "running" | "done" | "blocked" | "paused"
        "active_tier": active_tier,  # e.g., "Tier 2 (Tech Lead)", "Tier 3 (Worker): T-001"
        "tier_usage": self.tier_usage,
        "track": {"id": self.track.id, "title": self.track.description},
        "tickets": [asdict(t) for t in self.track.tickets],
    }
    await self.event_queue.put("mma_state_update", payload)
```

This payload is consumed by the GUI's
`_process_pending_gui_tasks` handler for `"mma_state_update"`, which updates `mma_status`, `active_tier`, `mma_tier_usage`, `active_tickets`, and `active_track`.

### Ticket Ingestion (`parse_json_tickets`)

Parses a JSON array of ticket dicts (from Tier 2 LLM output) into `Ticket` objects, appends to `self.track.tickets`, then rebuilds the `TrackDAG` and `ExecutionEngine`.

### Main Execution Loop (`run`)

```python
async def run(self):
    loop = asyncio.get_running_loop()
    while True:
        ready_tasks = self.engine.tick()
        if not ready_tasks:
            tickets = self.track.tickets
            if all(t.status == "completed" for t in tickets):
                await self._push_state("done")
                break
            if any(t.status == "in_progress" for t in tickets):
                await asyncio.sleep(1)  # Waiting for async workers
                continue
            await self._push_state("blocked")
            break

        for ticket in ready_tasks:
            if ticket.status == "in_progress" or (self.engine.auto_queue and not ticket.step_mode):
                ticket.status = "in_progress"
                await self._push_state("running", f"Tier 3 (Worker): {ticket.id}")

                # Create worker context
                context = WorkerContext(
                    ticket_id=ticket.id,
                    model_name="gemini-2.5-flash-lite",
                    messages=[],
                )

                # Execute in thread pool (blocking AI call); trailing arguments are elided
                await loop.run_in_executor(
                    None, run_worker_lifecycle, ticket, context, ...
                )
                await self._push_state("running", "Tier 2 (Tech Lead)")

            elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
                await self._push_state("running", f"Awaiting Approval: {ticket.id}")
                await asyncio.sleep(1)  # Pause for HITL approval
```

---

## Tier 2: Tech Lead (`conductor_tech_lead.py`)

The Tier 2 AI call converts a high-level Track brief into discrete Tier 3 tickets.

### `generate_tickets(track_brief, module_skeletons) -> list[dict]`

```python
def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
    system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
    user_message = (
        f"### TRACK BRIEF:\n{track_brief}\n\n"
        f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
        "Please generate the implementation tickets for this track."
    )

    # Temporarily override system prompt
    old_system_prompt = ai_client._custom_system_prompt
    ai_client.set_custom_system_prompt(system_prompt)
    try:
        response = ai_client.send(md_content="", user_message=user_message)
        # Multi-layer JSON extraction:
        # 1. Try ```json ... ``` blocks
        # 2. Try ``` ... ``` blocks
        # 3. Regex search for [ { ... } ] pattern
        # json_match is the JSON text recovered by the layers above (extraction elided)
        tickets = json.loads(json_match)
        return tickets
    finally:
        # Always restore the previous system prompt, even if parsing fails
        ai_client.set_custom_system_prompt(old_system_prompt)
```

The JSON extraction is defensive: it handles markdown code fences, bare JSON, and a regex fallback for embedded arrays.

### `topological_sort(tickets: list[dict]) -> list[dict]`

Convenience wrapper: converts raw dicts to `Ticket` objects, builds a `TrackDAG`, calls `dag.topological_sort()`, returns the original dicts reordered by sorted IDs.

---

## Tier 3: Worker Lifecycle (`run_worker_lifecycle`)

This free function executes a single ticket. Key behaviors:

### Context Amnesia

```python
ai_client.reset_session()  # Each ticket starts with a clean slate
```

No conversational bleed between tickets. Every worker is stateless.

### Context Injection

For `context_requirements` files:

- First file: `parser.get_curated_view(content)`, a full skeleton with `@core_logic` and `[HOT]` bodies preserved.
- Subsequent files: `parser.get_skeleton(content)`, which is cheaper (signatures + docstrings only).

### Prompt Construction

```python
user_message = (
    f"You are assigned to Ticket {ticket.id}.\n"
    f"Task Description: {ticket.description}\n"
    f"\nContext Files:\n{context_injection}\n"
    "Please complete this task. If you are blocked and cannot proceed, "
    "start your response with 'BLOCKED' and explain why."
)
```

### HITL Clutch Integration

If `event_queue` is provided, `confirm_spawn()` is called before executing, allowing the user to:

- Read the prompt and context.
- Edit both the prompt and context markdown.
- Approve, reject, or abort the entire track.

The `confirm_spawn` function uses the `dialog_container` pattern:

1. Create `dialog_container = [None]` (mutable container for thread communication).
2. Push `"mma_spawn_approval"` task to event queue with the container.
3. Poll `dialog_container[0]` every 100ms for up to 60 seconds.
4. When the GUI fills in the dialog, call `.wait()` to get the result.
5. Returns `(approved, modified_prompt, modified_context)`.

### Persona Application

When a ticket has `persona_id` set (or a tier-level persona is active), `run_worker_lifecycle` resolves the persona from `PersonaManager` and applies it before the AI call:

```python
# Apply Persona if specified
persona = None
preferred_models = None      # initialised so the fallbacks below are always defined
persona_tool_preset = None
if context.persona_id:
    pm = PersonaManager(...)
    personas = pm.load_all()
    if context.persona_id in personas:
        persona = personas[context.persona_id]
        if persona.system_prompt:
            ai_client.set_custom_system_prompt(persona.system_prompt)
        if persona.bias_profile:
            ai_client.set_bias_profile(persona.bias_profile)
        if persona.preferred_models:
            preferred_models = persona.preferred_models
        if persona.tool_preset:
            persona_tool_preset = persona.tool_preset

# Apply tool preset: use persona's tool_preset if available, otherwise fall back to context.tool_preset
effective_tool_preset = persona_tool_preset or context.tool_preset
```

A single persona may override:

- **`system_prompt`**: replaces the default system prompt for the worker
- **`bias_profile`**: influences tool selection via semantic nudging
- **`preferred_models`**: list used for model escalation (replaces the default `models_list`)
- **`tool_preset`**: applied via `set_tool_preset()`; takes precedence over the ticket's `context.tool_preset`
- **`aggregation_strategy`**: sets the file aggregation strategy (`auto`/`full`/`summarize`/`skeleton`) for the worker's context

**Resolution order at model selection time** (in `run_worker_lifecycle`):

1. `ticket.model_override` (if set): used unconditionally
2. `persona.preferred_models` (if persona applied): first item is the initial model
3. `ticket.retry_count`-indexed entry in the resolved `models_list`: escalates on retries

If the persona fails to load (file not found, parse error), the worker logs a warning and falls back to the default model list. The persona is **not** a hard failure point.

See [guide_personas.md](guide_personas.md) (placeholder; written in Task 10) for the full persona schema, scope inheritance rules, and editor modal.

---

## Tier 4: QA Error Analysis

Stateless error analysis. Invoked via the `qa_callback` parameter in `shell_runner.run_powershell()` when a command fails.

```python
def run_tier4_analysis(error_message: str) -> str:
    """Stateless Tier 4 QA analysis of an error message."""
    # Uses a dedicated system prompt for error triage
    # Returns analysis text (root cause, suggested fix)
    # Does NOT modify any code; analysis only
```

Integrated directly into the shell execution pipeline: if `qa_callback` is provided and the command has non-zero exit or stderr output, the callback result is appended to the tool output as `QA ANALYSIS:\n`.

---

## Cross-System Data Flow

The full MMA lifecycle from epic to completion:

1. **Tier 1 (Orchestrator)**: User enters an epic description in the GUI. Creates a `Track` with a brief.
2. **Tier 2 (Tech Lead)**: `conductor_tech_lead.generate_tickets()` calls `ai_client.send()` with the `tier2_sprint_planning` prompt, producing a JSON ticket list.
3. **Ingestion**: `ConductorEngine.parse_json_tickets()` ingests the JSON, builds `Ticket` objects, constructs `TrackDAG` + `ExecutionEngine`.
4. **Execution loop**: `ConductorEngine.run()` enters the async loop, calling `engine.tick()` each iteration.
5. **Worker dispatch**: For each ready ticket, `run_worker_lifecycle()` is called in a thread executor. It uses `ai_client.send()` with MCP tools (dispatched through `mcp_client.dispatch()`).
6. **Security enforcement**: MCP tools enforce the allowlist via `_resolve_and_check()` on every filesystem operation.
7. **State broadcast**: `_push_state()` → `AsyncEventQueue` → GUI renders DAG + ticket status.
8. **External visibility**: `ApiHookClient.get_mma_status()` queries the Hook API for the full orchestration state.
9. **HITL gates**: `confirm_spawn()` pushes to event queue → GUI renders dialog → user approves/edits → `dialog_container[0].wait()` returns the decision.

---

## Token Firewalling

Each tier operates within its own token budget:

- **Tier 3 workers** use lightweight models (default: `gemini-2.5-flash-lite`) and receive only the files listed in `context_requirements`.
- **Context Amnesia** ensures no accumulated history bleeds between tickets.
- **Tier 2** tracks cumulative `tier_usage` per tier: `{"input": N, "output": N, "model": ..., "tool_preset": ..., "persona": ...}` for token cost monitoring and persona attribution.
- **First file vs subsequent files**: The first `context_requirements` file gets a curated view (preserving hot paths); subsequent files get only skeletons.
- **RAG augmentation is caller-injected**: The ConductorEngine does not own a RAG engine. The caller (typically `AppController` for the main discussion, or the GUI's RAG panel for project-wide queries) is responsible for instantiating an `RAGEngine` and passing it through to `ai_client.send(rag_engine=...)` for each worker call. See [guide_architecture.md](guide_architecture.md#rag-integration) for the dispatch flow.

---

## Abort Event Propagation

Workers can be killed mid-execution via abort events:

```python
# In ConductorEngine.__init__:
self._abort_events: dict[str, threading.Event] = {}

# When spawning a worker:
self._abort_events[ticket.id] = threading.Event()

# To kill a worker:
def kill_worker(self, ticket_id: str) -> None:
    if ticket_id in self._abort_events:
        self._abort_events[ticket_id].set()  # Signal abort
    thread = self._active_workers.get(ticket_id)
    if thread:
        thread.join(timeout=1.0)  # Wait for graceful shutdown
```

**Abort Check Points in `run_worker_lifecycle`:**

1. **Before major work**: checked immediately after `ai_client.reset_session()`
2. **During clutch_callback**: checked before each tool execution
3. **After blocking send()**: checked after AI call returns

When abort is detected, the ticket status is set to `"killed"` and the worker exits immediately.

---

## Pause/Resume Control

The engine supports pausing the entire orchestration pipeline:

```python
def pause(self) -> None:
    self._pause_event.set()

def resume(self) -> None:
    self._pause_event.clear()
```

In the main `run()` loop:

```python
while True:
    if self._pause_event.is_set():
        # _push_state is a coroutine, so it must be awaited; asyncio.sleep keeps the event loop responsive
        await self._push_state(status="paused", active_tier="Paused")
        await asyncio.sleep(0.5)
        continue
    # ... normal execution
```

This allows the user to pause execution without killing workers.

---

## Model Escalation

Workers automatically escalate to more capable models on retry:

```python
models_list = [
    "gemini-2.5-flash-lite",   # First attempt
    "gemini-2.5-flash",        # Second attempt
    "gemini-3.1-pro-preview",  # Third+ attempt
]
# Clamp the index so retries beyond the list length reuse the last (most capable) model
model_idx = min(ticket.retry_count, len(models_list) - 1)
model_name = models_list[model_idx]
```

The `ticket.model_override` field can bypass this logic with a specific model.

---

## Track State Persistence

Track state can be persisted to disk via `project_manager.py`:

```
conductor/tracks//
  spec.md        # Track specification (human-authored)
  plan.md        # Implementation plan with checkbox tasks
  metadata.json  # Track metadata (id, type, status, timestamps)
  state.toml     # Structured TrackState with task list
```

`project_manager.get_all_tracks(base_dir)` scans the tracks directory with a three-tier metadata fallback:

1. `state.toml` (structured `TrackState`): counts tasks with `status == "completed"`.
2. `metadata.json` (legacy): gets id/title/status only.
3. `plan.md` (regex): counts `- [x]` vs `- [ ]` checkboxes for progress.

---

## Beads Integration (Roadmap)

[Beads](https://github.com/steveyegge/beads) is a Dolt-backed issue tracking system.
The `src/beads_client.py` module provides a Python client for `bd` CLI calls (`bd_create`, `bd_list`, `bd_ready`, `bd_update`). The client is functional but not yet integrated into the `ConductorEngine` execution loop.

**Current state (as of 2026-06-02):**

- `BeadsClient` is instantiable; it detects whether a project's `.beads/` directory exists and falls back to no-op if not.
- Tools `bd_create`, `bd_list`, `bd_ready`, `bd_update` are exposed via the MCP bridge (see [guide_tools.md](guide_tools.md)).
- The ConductorEngine still writes track state to `conductor/tracks//` (markdown-based), not to a Beads repo.
- A project's TOML may specify a conductor directory override (`[conductor].dir`) but does not yet support a Beads repository path.

**Planned integration:**

- The ConductorEngine's `parse_json_tickets` would optionally forward ingested tickets to `BeadsClient.bd_create` when Beads mode is active.
- `save_track_state` would write to `.beads/` instead of `conductor/tracks//state.toml` when Beads is active.
- The Visual DAG would query `bd_list` for real-time ticket status instead of the in-memory `TrackDAG`.

See [guide_beads.md](guide_beads.md) (placeholder; written in Task 10) for the full Beads client API and the toolset exposed to agents.

---

## Workspace Profile Auto-Switching (Roadmap)

The `WorkspaceManager` (`src/workspace_manager.py`) supports binding workspace profiles to MMA tier context. Currently, profiles can be saved and loaded manually; the auto-switch hook is implemented but not yet wired into `ConductorEngine`.

**Current state:**

- `WorkspaceProfile` (named docking + window state) can be saved/loaded via the GUI.
- Scope inheritance (Global vs Project) is supported.
- A `bind_to_context(context_id: str, profile_name: str)` method exists on `WorkspaceManager`.

**Planned integration:**

- On Tier transition (`tier1 → tier2 → tier3`), `ConductorEngine` would call `WorkspaceManager.bind_to_context("tier3", active_profile)` to reshape the UI for the current cognitive load.
- This is opt-in via `[conductor].auto_switch_profiles = true` in `config.toml`.

See [guide_workspace_profiles.md](guide_workspace_profiles.md) (placeholder; written in Task 10) for the full profile schema.
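
The planned opt-in hook could look roughly like the sketch below. Only the `bind_to_context(context_id, profile_name)` signature and the `[conductor].auto_switch_profiles` flag come from this guide; `StubWorkspaceManager`, `on_tier_transition`, the dict-shaped config, and the `"focus"` profile name are hypothetical and stand in for whatever the eventual wiring uses.

```python
from typing import Any, Dict


class StubWorkspaceManager:
    """Hypothetical stand-in; only the bind_to_context signature comes from this guide."""

    def __init__(self) -> None:
        self.bindings: Dict[str, str] = {}

    def bind_to_context(self, context_id: str, profile_name: str) -> None:
        # Record which profile is bound to which tier context.
        self.bindings[context_id] = profile_name


def on_tier_transition(config: Dict[str, Any], manager: StubWorkspaceManager,
                       tier_id: str, profile_name: str) -> bool:
    """Bind the profile for the new tier only when the opt-in flag is set."""
    enabled = config.get("conductor", {}).get("auto_switch_profiles", False)
    if not enabled:
        return False  # Feature is opt-in: do nothing by default
    manager.bind_to_context(tier_id, profile_name)
    return True


manager = StubWorkspaceManager()
# Flag absent: the transition leaves the workspace untouched.
skipped = on_tier_transition({}, manager, "tier3", "focus")
# Flag present (as if parsed from config.toml): the profile is bound.
applied = on_tier_transition(
    {"conductor": {"auto_switch_profiles": True}}, manager, "tier3", "focus"
)
```

Keeping the flag check inside the hook means an engine could call it on every tier transition without first checking whether the feature is enabled.
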