Per the docs Refresh Protocol (conductor/workflow.md), after a
reference/analysis track ships, the affected guides must be updated
to reflect new module structure or new conventions. The nagent_review
track (9cc51ca9) produced a deep-dive + 10 actionable takeaways that
named 3 documentation gaps in /docs. This commit fills them.
3 new guides (1,122 lines total):
1. guide_discussions.md (353 lines) — The Discussion system
- 23-operation matrix: A1-A7 per-entry + B1-B11 discussion-level
+ C1-C5 undo/redo
- Take naming convention (<base>_take_<n>), branching, promotion
- User-managed role list (app.disc_roles)
- Per-role filter linked to MMA persona focus
- _disc_entries_lock thread-safety contract
- Hook API session endpoints
- Persistence: _flush_to_project, _flush_disc_entries_to_project,
context_snapshot
- 9 file:line refs into gui_2.py:3770-4260 + history.py
2. guide_state_lifecycle.md (375 lines) — Undo/redo + reset + state
delegation
- HistoryManager + UISnapshot (13 captured fields, 100-snapshot
capacity, debounced change-detection at render frame)
- _handle_reset_session (clears 30+ fields, replaces project,
preserves active_project_path per the 2026-06-08 regression fix)
- App.__getattr__/__setattr__ state delegation to Controller
- 4-thread access pattern with 7 lock-protected regions
- State persistence: in-memory vs project TOML vs config TOML
- Hot-reload integration
- Hook API registries (_predefined_callbacks, _gettable_fields)
- 14 file:line refs into gui_2.py:1140-1170, history.py,
app_controller.py:3286-3356
3. guide_context_aggregation.md (394 lines) — The aggregate.py
pipeline
- 3 aggregation strategies (auto, summarize, full)
- 7 per-file view modes (full, summary, skeleton, outline,
masked, custom, none)
- Full FileItem schema (9 fields + __post_init__ normalizer)
at models.py:510-559
- ContextPreset schema and ContextPresetManager
- Tier 3 worker variant (build_tier3_context with FuzzyAnchor
re-resolution and focus-file handling)
- force_full / auto_aggregate short-circuits
- Cache strategy (static prefix + dynamic history)
- 23 file:line refs into aggregate.py:36-518 + models.py:909-937
8 existing guides cross-linked to the 3 new guides and to the
nagent_review track:
- guide_gui_2.md (+ See Also entries for discussions,
state lifecycle, context aggregation,
nagent_review report)
- guide_app_controller.md (+ See Also entries for discussions,
state lifecycle, context aggregation,
nagent_review report)
- guide_context_curation.md (+ new See Also section pointing to
context aggregation + nagent_review)
- guide_architecture.md (+ new See Also section listing all 10
guides + nagent_review report)
- guide_ai_client.md (+ See Also entries for state lifecycle,
context aggregation, nagent_review
pitfalls #2 and #4)
- guide_mma.md (+ new See Also section pointing to
context aggregation, discussions,
nagent_review report §9 + takeaways §3/§10
for SubConversationRunner priority)
- guide_models.md (+ See Also entries for context
aggregation, discussions, nagent_review
report §6 on FileItem as strongest
curation dimension)
- Readme.md (+ 3 new guide entries in the index
table, with one-line summaries)
No code modified. This is documentation only.
Why these 3 guides specifically:
- guide_discussions.md: The discussion system is the user's most
edited surface. nagent_review's report §3 enumerated 23 operations
(A1-C5) that previously existed only as scattered file:line refs
across gui_2.py. A dedicated guide makes the operation matrix
discoverable.
- guide_state_lifecycle.md: The undo/redo + reset + state delegation
machinery is architecturally load-bearing but scattered across 4
files. After nagent_review identified the provider-side history
divergence as Pitfall #4, the relationship between Manual Slop's
state and the provider's state needs explicit documentation.
- guide_context_aggregation.md: aggregate.py (518 lines) is the
most-touched module after ai_client.py but had no dedicated
guide. nagent_review confirmed it's Manual Slop's strongest
curation dimension. A dedicated guide makes the 7 view modes
and 3 strategies discoverable.
The 3 new guides total 1,122 lines and follow the existing
per-source-file deep-dive style (architectural, data-oriented,
state-management-focused).
MMA: 4-Tier Multi-Model Agent Orchestration
Overview
The MMA (Multi-Model Agent) system is a hierarchical task decomposition and execution engine. A high-level "epic" is broken into tracks, tracks are decomposed into tickets with dependency relationships, and tickets are executed by stateless workers with human-in-the-loop approval at every destructive boundary.
Tier 1: Orchestrator — product alignment, epic → tracks
Tier 2: Tech Lead — track → tickets (DAG), architectural oversight
Tier 3: Worker — stateless TDD implementation per ticket
Tier 4: QA — stateless error analysis, no fixes
Data Structures (models.py)
Ticket
The atomic unit of work. All MMA execution revolves around transitioning tickets through their state machine.
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class Ticket:
    id: str  # e.g., "T-001"
    description: str  # Human-readable task description
    status: str  # "todo" | "in_progress" | "completed" | "blocked"
    assigned_to: str  # Tier assignment: "tier3-worker", "tier4-qa"
    target_file: Optional[str] = None  # File this ticket modifies
    context_requirements: List[str] = field(default_factory=list)  # Files needed for context injection
    depends_on: List[str] = field(default_factory=list)  # Ticket IDs that must complete first
    blocked_reason: Optional[str] = None  # Why this ticket is blocked
    step_mode: bool = False  # If True, requires manual approval before execution
    persona_id: Optional[str] = None  # Per-ticket persona override; see Persona Application
    retry_count: int = 0  # Increments on failure; drives model escalation
    model_override: Optional[str] = None  # If set, bypasses persona/model_list selection
    def mark_blocked(self, reason: str) -> None: ...  # Sets status="blocked", stores reason
    def mark_complete(self) -> None: ...  # Sets status="completed"
    def to_dict(self) -> Dict[str, Any]: ...
    @classmethod
    def from_dict(cls, data) -> "Ticket": ...
Status state machine:
todo ──> in_progress ──> completed
 |            |
 v            v
blocked    blocked
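The diagram can be read as a transition table. The sketch below transcribes it literally. `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names. The sketch deliberately ignores `update_task_status()`, which force-sets any status, and the `"killed"` status used by abort handling.

```python
# Hypothetical transcription of the status diagram; not the real Ticket API.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "todo": {"in_progress", "blocked"},
    "in_progress": {"completed", "blocked"},
    "completed": set(),  # terminal
    "blocked": set(),    # the diagram shows no outgoing arrow from blocked
}

def can_transition(current: str, new: str) -> bool:
    """True if the diagram has an arrow from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```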
Track
A collection of tickets with a shared goal.
@dataclass
class Track:
    id: str  # Track identifier
    description: str  # Track-level brief
    tickets: List[Ticket] = field(default_factory=list)  # Ordered list of tickets
    def get_executable_tickets(self) -> List[Ticket]: ...
    # Returns all 'todo' tickets whose depends_on are all 'completed'
WorkerContext
@dataclass
class WorkerContext:
    ticket_id: str  # Which ticket this worker is processing
    model_name: str  # LLM model to use (e.g., "gemini-2.5-flash-lite")
    messages: List[dict]  # Conversation history for this worker
    persona_id: Optional[str] = None  # Per-worker persona (set in run_worker_lifecycle)
    tool_preset: Optional[str] = None  # Fallback tool preset if persona has none
DAG Engine (dag_engine.py)
Two classes: TrackDAG (graph) and ExecutionEngine (state machine).
TrackDAG
class TrackDAG:
    def __init__(self, tickets: List[Ticket]):
        self.tickets = tickets
        self.ticket_map = {t.id: t for t in tickets}  # O(1) lookup by ID
get_ready_tasks(): Returns tickets where status == 'todo' AND all depends_on have status == 'completed'. Missing dependencies are treated as NOT completed (fail-safe).
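Here is a minimal sketch of the ready-task rule. It assumes a stripped-down `MiniTicket` stand-in, which is hypothetical and not `models.Ticket`. A dependency ID that is absent from the map never counts as completed, which is the fail-safe described above.

```python
from dataclasses import dataclass, field

@dataclass
class MiniTicket:
    # Hypothetical stand-in carrying only the fields readiness depends on
    id: str
    status: str = "todo"
    depends_on: list[str] = field(default_factory=list)

def get_ready_tasks(tickets: list[MiniTicket]) -> list[MiniTicket]:
    ticket_map = {t.id: t for t in tickets}  # O(1) lookup by ID
    return [
        t for t in tickets
        if t.status == "todo"
        # A missing dependency fails the membership test, so it blocks (fail-safe)
        and all(dep in ticket_map and ticket_map[dep].status == "completed"
                for dep in t.depends_on)
    ]
```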
has_cycle(): Classic DFS cycle detection using visited set + recursion stack:
def has_cycle(self) -> bool:
    visited = set()
    rec_stack = set()
    def is_cyclic(ticket_id):
        if ticket_id in rec_stack: return True  # Back edge = cycle
        if ticket_id in visited: return False  # Already explored
        visited.add(ticket_id)
        rec_stack.add(ticket_id)
        ticket = self.ticket_map.get(ticket_id)  # Missing dependency: no outgoing edges
        for neighbor in (ticket.depends_on if ticket else []):
            if is_cyclic(neighbor): return True
        rec_stack.remove(ticket_id)
        return False
    for ticket in self.tickets:
        if ticket.id not in visited:
            if is_cyclic(ticket.id): return True
    return False
topological_sort(): Calls has_cycle() first — raises ValueError if cycle found. Standard DFS post-order topological sort. Returns list of ticket ID strings in dependency order.
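For reference, here is a DFS post-order topological sort over a plain `{ticket_id: depends_on}` mapping. It is a generic sketch, not the `TrackDAG` code. It folds the cycle check into the same traversal, raising `ValueError` on a back edge, instead of calling `has_cycle()` first. It also skips dependency IDs that are not in the mapping, which is an assumption.

```python
def topological_sort(deps: dict[str, list[str]]) -> list[str]:
    """DFS post-order sort; `deps` maps each ticket ID to the IDs it depends on."""
    visited: set[str] = set()
    on_stack: set[str] = set()   # nodes on the current DFS path
    order: list[str] = []

    def visit(node: str) -> None:
        if node in on_stack:
            raise ValueError(f"Dependency cycle involving {node}")
        if node in visited:
            return
        on_stack.add(node)
        for dep in deps[node]:
            if dep in deps:          # assumption: unknown IDs are skipped
                visit(dep)
        on_stack.remove(node)
        visited.add(node)
        order.append(node)           # post-order: emitted after its dependencies

    for node in deps:
        visit(node)
    return order
```

Post-order guarantees that a ticket is appended only after everything it depends on, so the result can be executed front to back.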
ExecutionEngine
class ExecutionEngine:
    def __init__(self, dag: TrackDAG, auto_queue: bool = False):
        self.dag = dag
        self.auto_queue = auto_queue
tick() — the heartbeat. On each call:
- Queries dag.get_ready_tasks() for eligible tickets.
- If auto_queue is enabled, non-step_mode tasks are automatically promoted to in_progress. Tasks with step_mode set remain in todo until approve_task() is called.
- Returns the list of ready tasks.
approve_task(task_id): Manually transitions todo → in_progress if all dependencies are met.
update_task_status(task_id, status): Force-sets status (used by workers to mark completed or blocked).
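The `tick()` / `approve_task()` contract can be sketched with a hypothetical `MiniEngine` over an in-memory ticket dict. It models only the behavior described above: auto-queue promotion, step-mode tickets waiting for approval, and force-set status updates. The real `ExecutionEngine` delegates readiness to `TrackDAG`.

```python
from dataclasses import dataclass, field

@dataclass
class MiniTicket:
    # Hypothetical stand-in for models.Ticket
    id: str
    status: str = "todo"
    step_mode: bool = False
    depends_on: list[str] = field(default_factory=list)

class MiniEngine:
    """Hypothetical reduction of ExecutionEngine over a plain ticket dict."""

    def __init__(self, tickets: list[MiniTicket], auto_queue: bool = False):
        self.tickets = {t.id: t for t in tickets}
        self.auto_queue = auto_queue

    def _deps_met(self, t: MiniTicket) -> bool:
        return all(d in self.tickets and self.tickets[d].status == "completed"
                   for d in t.depends_on)

    def tick(self) -> list[MiniTicket]:
        ready = [t for t in self.tickets.values()
                 if t.status == "todo" and self._deps_met(t)]
        if self.auto_queue:
            for t in ready:
                if not t.step_mode:
                    t.status = "in_progress"  # step_mode tickets stay 'todo'
        return ready

    def approve_task(self, task_id: str) -> None:
        t = self.tickets[task_id]
        if t.status == "todo" and self._deps_met(t):
            t.status = "in_progress"

    def update_task_status(self, task_id: str, status: str) -> None:
        self.tickets[task_id].status = status  # force-set, no validation
```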
WorkerPool (multi_agent_conductor.py)
Bounded concurrent worker pool with semaphore gating.
class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)
Key Methods:
- spawn(ticket_id, target, args): Spawns a worker thread if the pool has capacity. Returns None if full.
- join_all(timeout): Waits for all active workers to complete.
- get_active_count(): Returns the current number of active workers.
- is_full(): Returns True if at capacity.
Thread Safety: All state mutations are protected by _lock. The semaphore ensures at most max_workers threads execute concurrently.
Configuration: max_workers is loaded from config.toml → [mma].max_workers (default: 4).
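Here is a minimal sketch of a semaphore-gated pool with the same method names. It assumes `spawn()` refuses rather than blocks when the pool is full, which is consistent with it returning `None`. `MiniWorkerPool` is hypothetical; the real class may register and release workers differently.

```python
import threading

class MiniWorkerPool:
    """Hypothetical sketch; the real WorkerPool's internals may differ."""

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id, target, args=()):
        # Non-blocking acquire: refuse instead of waiting when at capacity
        if not self._semaphore.acquire(blocking=False):
            return None

        def runner():
            try:
                target(*args)
            finally:
                with self._lock:
                    self._active.pop(ticket_id, None)
                self._semaphore.release()

        thread = threading.Thread(target=runner, daemon=True)
        with self._lock:
            self._active[ticket_id] = thread  # register before start
        thread.start()
        return thread

    def get_active_count(self) -> int:
        with self._lock:
            return len(self._active)

    def is_full(self) -> bool:
        return self.get_active_count() >= self.max_workers

    def join_all(self, timeout=None) -> None:
        with self._lock:
            threads = list(self._active.values())  # snapshot under the lock
        for t in threads:
            t.join(timeout)
```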
ConductorEngine (multi_agent_conductor.py)
The Tier 2 orchestrator. Owns the execution loop that drives tickets through the DAG.
class ConductorEngine:
    def __init__(self, track: Track, event_queue=None, auto_queue=False):
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        self.pool = WorkerPool(max_workers=max_workers)  # max_workers comes from config.toml → [mma].max_workers (loading not shown)
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event: threading.Event = threading.Event()
Per-tier tier_usage schema (each tier entry):
| Key | Type | Purpose |
|---|---|---|
| input | int | Cumulative input tokens for this tier |
| output | int | Cumulative output tokens for this tier |
| model | str | Default model name (overridable per ticket via model_override or persona) |
| tool_preset | Optional[str] | Active tool preset name (set via set_tool_preset or persona) |
| persona | Optional[str] | Active persona name (set when a ticket's persona is applied) |
State Broadcast (_push_state)
On every state change, the engine pushes the full orchestration state to the GUI via AsyncEventQueue:
async def _push_state(self, status="running", active_tier=None):
    payload = {
        "status": status,  # "running" | "done" | "blocked" | "paused"
        "active_tier": active_tier,  # e.g., "Tier 2 (Tech Lead)", "Tier 3 (Worker): T-001"
        "tier_usage": self.tier_usage,
        "track": {"id": self.track.id, "title": self.track.description},
        "tickets": [asdict(t) for t in self.track.tickets]
    }
    await self.event_queue.put("mma_state_update", payload)
This payload is consumed by the GUI's _process_pending_gui_tasks handler for "mma_state_update", which updates mma_status, active_tier, mma_tier_usage, active_tickets, and active_track.
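The producer/consumer hand-off can be sketched with an `asyncio.Queue`. `MiniEventQueue` and `apply_mma_state_update` are hypothetical stand-ins for `AsyncEventQueue` and the GUI handler. Mapping `track` to `active_track` is an assumption based on the field list above.

```python
import asyncio

class MiniEventQueue:
    """Hypothetical stand-in for AsyncEventQueue: carries (event_name, payload) pairs."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, event_name: str, payload: dict) -> None:
        await self._queue.put((event_name, payload))

    async def get(self) -> tuple[str, dict]:
        return await self._queue.get()

def apply_mma_state_update(gui_state: dict, payload: dict) -> None:
    """Copy the broadcast fields into GUI-side state (a plain dict here)."""
    gui_state["mma_status"] = payload["status"]
    gui_state["active_tier"] = payload["active_tier"]
    gui_state["mma_tier_usage"] = payload["tier_usage"]
    gui_state["active_tickets"] = payload["tickets"]
    gui_state["active_track"] = payload["track"]  # assumption: track dict maps here
```

Because the full state is pushed on every change, the consumer can overwrite its fields wholesale instead of merging deltas.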
Ticket Ingestion (parse_json_tickets)
Parses a JSON array of ticket dicts (from Tier 2 LLM output) into Ticket objects, appends to self.track.tickets, then rebuilds the TrackDAG and ExecutionEngine.
Main Execution Loop (run)
async def run(self):
    loop = asyncio.get_running_loop()
    while True:
        ready_tasks = self.engine.tick()
        if not ready_tasks:
            if all(t.status == "completed" for t in self.track.tickets):
                await self._push_state("done")
                break
            if any(t.status == "in_progress" for t in self.track.tickets):
                await asyncio.sleep(1)  # Waiting for async workers
                continue
            await self._push_state("blocked")
            break
        for ticket in ready_tasks:
            if ticket.status == "in_progress" or (self.engine.auto_queue and not ticket.step_mode):
                ticket.status = "in_progress"
                await self._push_state("running", f"Tier 3 (Worker): {ticket.id}")
                # Create worker context
                context = WorkerContext(
                    ticket_id=ticket.id,
                    model_name="gemini-2.5-flash-lite",
                    messages=[]
                )
                # Execute in thread pool (blocking AI call)
                await loop.run_in_executor(
                    None, run_worker_lifecycle, ticket, context, ...
                )
                await self._push_state("running", "Tier 2 (Tech Lead)")
            elif ticket.status == "todo" and (ticket.step_mode or not self.engine.auto_queue):
                await self._push_state("running", f"Awaiting Approval: {ticket.id}")
                await asyncio.sleep(1)  # Pause for HITL approval
Tier 2: Tech Lead (conductor_tech_lead.py)
The Tier 2 AI call converts a high-level Track brief into discrete Tier 3 tickets.
generate_tickets(track_brief, module_skeletons) -> list[dict]
def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
    system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
    user_message = (
        f"### TRACK BRIEF:\n{track_brief}\n\n"
        f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
        "Please generate the implementation tickets for this track."
    )
    # Temporarily override system prompt
    old_system_prompt = ai_client._custom_system_prompt
    ai_client.set_custom_system_prompt(system_prompt)
    try:
        response = ai_client.send(md_content="", user_message=user_message)
        # Multi-layer JSON extraction (elided here) sets json_match from response:
        # 1. Try ```json ... ``` blocks
        # 2. Try ``` ... ``` blocks
        # 3. Regex search for [ { ... } ] pattern
        tickets = json.loads(json_match)
        return tickets
    finally:
        ai_client.set_custom_system_prompt(old_system_prompt)
The JSON extraction is defensive — handles markdown code fences, bare JSON, and regex fallback for embedded arrays.
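Here is a sketch of that three-layer fallback. It assumes the layers are tried in order and that a layer whose match fails to parse falls through to the next one. `extract_json_tickets` is a hypothetical name. To keep the example readable, the patterns spell the code fence as a backtick repeated three times.

```python
import json
import re

FENCE = "`{3}"  # a literal triple backtick, spelled as a regex repeat

def extract_json_tickets(response: str) -> list[dict]:
    """Try a json-tagged fence, then any fence, then a bare [ { ... } ] span."""
    patterns = [
        FENCE + r"json\s*(.*?)" + FENCE,   # layer 1: fence tagged json
        FENCE + r"\s*(.*?)" + FENCE,       # layer 2: any fence
        r"(\[\s*\{.*\}\s*\])",             # layer 3: first '[ {' to last '} ]'
    ]
    for pattern in patterns:
        match = re.search(pattern, response, re.DOTALL)
        if not match:
            continue
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            continue  # this layer matched but did not parse; try the next one
    raise ValueError("no JSON ticket array found in response")
```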
topological_sort(tickets: list[dict]) -> list[dict]
Convenience wrapper: converts raw dicts to Ticket objects, builds a TrackDAG, calls dag.topological_sort(), returns the original dicts reordered by sorted IDs.
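The same wrapper can be sketched with the standard library's `graphlib.TopologicalSorter` (Python 3.9+) in place of `TrackDAG`. `topological_sort_dicts` is a hypothetical name. As an assumption, unknown dependency IDs are ignored for ordering purposes. A cycle surfaces as `graphlib.CycleError`, which is a `ValueError` subclass.

```python
from graphlib import TopologicalSorter

def topological_sort_dicts(tickets: list[dict]) -> list[dict]:
    """Reorder raw ticket dicts so dependencies come first (graphlib swap-in)."""
    by_id = {t["id"]: t for t in tickets}
    # graphlib expects node -> iterable of predecessors, i.e. depends_on
    graph = {
        t["id"]: [d for d in t.get("depends_on", []) if d in by_id]
        for t in tickets
    }
    # static_order() raises graphlib.CycleError (a ValueError subclass) on cycles
    return [by_id[tid] for tid in TopologicalSorter(graph).static_order()]
```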
Tier 3: Worker Lifecycle (run_worker_lifecycle)
This free function executes a single ticket. Key behaviors:
Context Amnesia
ai_client.reset_session() # Each ticket starts with a clean slate
No conversational bleed between tickets. Every worker is stateless.
Context Injection
For context_requirements files:
- First file: parser.get_curated_view(content), the full skeleton with @core_logic and [HOT] bodies preserved.
- Subsequent files: parser.get_skeleton(content), which is cheaper (signatures + docstrings only).
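Here is a sketch of the rule that the first file gets the curated view and the rest get skeletons. The two view functions are passed in as callables because the parser's exact interface is not shown here. `build_context_injection` and the `### <path>` section header are hypothetical.

```python
from typing import Callable

def build_context_injection(
    files: dict[str, str],
    curated_view: Callable[[str], str],
    skeleton: Callable[[str], str],
) -> str:
    """First file gets the curated view; every later file only a skeleton."""
    parts = []
    for i, (path, content) in enumerate(files.items()):  # dicts keep insertion order
        view = curated_view(content) if i == 0 else skeleton(content)
        parts.append(f"### {path}\n{view}")
    return "\n\n".join(parts)
```

The ordering of `context_requirements` therefore matters: whichever file is listed first receives the richest view.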
Prompt Construction
user_message = (
    f"You are assigned to Ticket {ticket.id}.\n"
    f"Task Description: {ticket.description}\n"
    f"\nContext Files:\n{context_injection}\n"
    "Please complete this task. If you are blocked and cannot proceed, "
    "start your response with 'BLOCKED' and explain why."
)
HITL Clutch Integration
If event_queue is provided, confirm_spawn() is called before executing, allowing the user to:
- Read the prompt and context.
- Edit both the prompt and context markdown.
- Approve, reject, or abort the entire track.
The confirm_spawn function uses the dialog_container pattern:
- Create dialog_container = [None] (a mutable container for thread communication).
- Push an "mma_spawn_approval" task to the event queue with the container.
- Poll dialog_container[0] every 100ms for up to 60 seconds.
- When the GUI fills in the dialog, call .wait() to get the result.
- Return (approved, modified_prompt, modified_context).
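The polling steps above can be sketched with plain threads. `confirm_spawn_sketch`, the task dict keys, and treating a timeout as a rejection are all assumptions. The only contract relied on is that the GUI eventually stores an object with a blocking `.wait()` in slot 0.

```python
import time

def confirm_spawn_sketch(push_task, timeout_s: float = 60.0, poll_s: float = 0.1):
    """Worker-side half of the dialog_container hand-off (hypothetical sketch)."""
    dialog_container = [None]  # mutable slot the GUI thread fills with a dialog object
    push_task({"type": "mma_spawn_approval", "dialog_container": dialog_container})
    deadline = time.monotonic() + timeout_s
    while dialog_container[0] is None:   # poll every poll_s seconds
        if time.monotonic() >= deadline:
            return (False, None, None)   # assumption: timing out means "not approved"
        time.sleep(poll_s)
    # The dialog's wait() blocks until the user approves, rejects, or aborts
    return dialog_container[0].wait()
```

The list works as a container because the worker and GUI threads share a reference to the same object, so an assignment to index 0 is visible to the poller.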
Persona Application
When a ticket has persona_id set (or a tier-level persona is active), run_worker_lifecycle resolves the persona from PersonaManager and applies it before the AI call:
# Apply Persona if specified
persona = None
preferred_models = None  # Defaults so both names exist when no persona applies
persona_tool_preset = None
if context.persona_id:
    pm = PersonaManager(...)
    personas = pm.load_all()
    if context.persona_id in personas:
        persona = personas[context.persona_id]
        if persona.system_prompt:
            ai_client.set_custom_system_prompt(persona.system_prompt)
        if persona.bias_profile:
            ai_client.set_bias_profile(persona.bias_profile)
        if persona.preferred_models:
            preferred_models = persona.preferred_models
        if persona.tool_preset:
            persona_tool_preset = persona.tool_preset
# Apply tool preset: use persona's tool_preset if available, otherwise fall back to context.tool_preset
effective_tool_preset = persona_tool_preset or context.tool_preset
A single persona may override:
- system_prompt: replaces the default system prompt for the worker
- bias_profile: influences tool selection via semantic nudging
- preferred_models: list used for model escalation (replaces the default models_list)
- tool_preset: applied via set_tool_preset(); takes precedence over the ticket's context.tool_preset
- aggregation_strategy: sets the file aggregation strategy (auto/full/summarize/skeleton) for the worker's context
Resolution order at model selection time (in run_worker_lifecycle):
1. ticket.model_override (if set): used unconditionally
2. persona.preferred_models (if a persona is applied): the first item is the initial model
3. The ticket.retry_count-indexed entry in the resolved models_list: escalates on retries
If the persona fails to load (file not found, parse error), the worker logs a warning and falls back to the default model list. The persona is not a hard failure point.
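The resolution order can be condensed into one function. This sketch assumes that an empty or missing `preferred_models` falls back to the default escalation list from the Model Escalation section. `resolve_model` is a hypothetical name.

```python
# Default escalation list, copied from the Model Escalation section
DEFAULT_MODELS = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"]

def resolve_model(model_override, preferred_models, retry_count, default_models=DEFAULT_MODELS):
    """Sketch of the resolution order: override, then persona list, then escalation."""
    if model_override:
        return model_override  # 1. used unconditionally
    # 2. a persona's preferred_models replaces the default list
    models_list = preferred_models or default_models
    # 3. retry_count indexes into the list, clamped to the last entry
    return models_list[min(retry_count, len(models_list) - 1)]
```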
See guide_personas.md (placeholder; written in Task 10) for the full persona schema, scope inheritance rules, and editor modal.
Tier 4: QA Error Analysis
Stateless error analysis. Invoked via the qa_callback parameter in shell_runner.run_powershell() when a command fails.
def run_tier4_analysis(error_message: str) -> str:
    """Stateless Tier 4 QA analysis of an error message."""
    # Uses a dedicated system prompt for error triage
    # Returns analysis text (root cause, suggested fix)
    # Does NOT modify any code; analysis only
Integrated directly into the shell execution pipeline: if qa_callback is provided and the command has non-zero exit or stderr output, the callback result is appended to the tool output as QA ANALYSIS:\n<result>.
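Here is a sketch of that append rule. `append_qa_analysis` is hypothetical. Passing `stderr` to the callback (or the whole output when `stderr` is empty) is an assumption about what counts as the error message, and so is the newline separator.

```python
from typing import Callable, Optional

def append_qa_analysis(tool_output: str, exit_code: int, stderr: str,
                       qa_callback: Optional[Callable[[str], str]] = None) -> str:
    """Append Tier 4 analysis only when the command failed (hypothetical helper)."""
    failed = exit_code != 0 or bool(stderr.strip())
    if qa_callback is None or not failed:
        return tool_output
    error_message = stderr or tool_output  # assumption: what gets sent to Tier 4
    return f"{tool_output}\nQA ANALYSIS:\n{qa_callback(error_message)}"
```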
Cross-System Data Flow
The full MMA lifecycle from epic to completion:
- Tier 1 (Orchestrator): The user enters an epic description in the GUI. Tier 1 creates a Track with a brief.
- Tier 2 (Tech Lead): conductor_tech_lead.generate_tickets() calls ai_client.send() with the tier2_sprint_planning prompt, producing a JSON ticket list.
- Ingestion: ConductorEngine.parse_json_tickets() ingests the JSON, builds Ticket objects, and constructs TrackDAG + ExecutionEngine.
- Execution loop: ConductorEngine.run() enters the async loop, calling engine.tick() each iteration.
- Worker dispatch: For each ready ticket, run_worker_lifecycle() is called in a thread executor. It uses ai_client.send() with MCP tools (dispatched through mcp_client.dispatch()).
- Security enforcement: MCP tools enforce the allowlist via _resolve_and_check() on every filesystem operation.
- State broadcast: _push_state() → AsyncEventQueue → the GUI renders the DAG + ticket status.
- External visibility: ApiHookClient.get_mma_status() queries the Hook API for the full orchestration state.
- HITL gates: confirm_spawn() pushes to the event queue → the GUI renders a dialog → the user approves/edits → dialog_container[0].wait() returns the decision.
Token Firewalling
Each tier operates within its own token budget:
- Tier 3 workers use lightweight models (default: gemini-2.5-flash-lite) and receive only the files listed in context_requirements.
- Context Amnesia ensures no accumulated history bleeds between tickets.
- Tier 2 tracks cumulative tier_usage per tier: {"input": N, "output": N, "model": ..., "tool_preset": ..., "persona": ...} for token cost monitoring and persona attribution.
- First file vs subsequent files: The first context_requirements file gets a curated view (preserving hot paths); subsequent files get only skeletons.
- RAG augmentation is caller-injected: The ConductorEngine does not own a RAG engine. The caller (typically AppController for the main discussion, or the GUI's RAG panel for project-wide queries) is responsible for instantiating a RAGEngine and passing it through to ai_client.send(rag_engine=...) for each worker call. See guide_architecture.md for the dispatch flow.
Abort Event Propagation
Workers can be killed mid-execution via abort events:
# In ConductorEngine.__init__:
self._abort_events: dict[str, threading.Event] = {}
# When spawning a worker:
self._abort_events[ticket.id] = threading.Event()
# To kill a worker:
def kill_worker(self, ticket_id: str) -> None:
    if ticket_id in self._abort_events:
        self._abort_events[ticket_id].set()  # Signal abort
    thread = self._active_workers.get(ticket_id)
    if thread:
        thread.join(timeout=1.0)  # Wait for graceful shutdown
Abort Check Points in run_worker_lifecycle:
- Before major work: checked immediately after ai_client.reset_session()
- During clutch_callback: checked before each tool execution
- After the blocking send(): checked after the AI call returns
When abort is detected, the ticket status is set to "killed" and the worker exits immediately.
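The three checkpoints can be sketched as follows. The `Aborted` exception used to unwind out of the tool callback is a hypothetical mechanism. `send` stands in for the blocking `ai_client.send()` call, which runs tools through `clutch_callback`.

```python
import threading

class Aborted(Exception):
    """Hypothetical: raised inside the tool callback when the abort event is set."""

def worker_sketch(ticket: dict, abort: threading.Event, send) -> str:
    def killed() -> str:
        ticket["status"] = "killed"
        return "killed"

    # (the real worker calls ai_client.reset_session() here)
    if abort.is_set():            # checkpoint 1: before major work
        return killed()

    def clutch_callback(tool_name: str) -> None:
        if abort.is_set():        # checkpoint 2: before each tool execution
            raise Aborted(tool_name)

    try:
        response = send(clutch_callback)   # blocking AI call that runs tools
    except Aborted:
        return killed()
    if abort.is_set():            # checkpoint 3: after send() returns
        return killed()
    ticket["status"] = "completed"
    return response
```

Checkpoint 3 matters because a kill request can arrive while the model is still generating, when no tool call gives the worker a chance to notice it.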
Pause/Resume Control
The engine supports pausing the entire orchestration pipeline:
def pause(self) -> None:
    self._pause_event.set()
def resume(self) -> None:
    self._pause_event.clear()
In the main run() loop:
while True:
    if self._pause_event.is_set():
        await self._push_state(status="paused", active_tier="Paused")
        await asyncio.sleep(0.5)  # Yield to the event loop instead of blocking it
        continue
    # ... normal execution
This allows the user to pause execution without killing workers.
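Because `run()` is a coroutine, the paused branch has to yield with `await asyncio.sleep(...)`. A `time.sleep()` there would freeze every other task on the event loop. The runnable sketch below uses a hypothetical `run_loop_sketch`. It uses a `threading.Event` on the assumption that another thread (the GUI) toggles the pause flag.

```python
import asyncio
import threading

async def run_loop_sketch(pause_event: threading.Event, steps: list[str], log: list[str]) -> None:
    """Hypothetical loop: idles while paused, otherwise processes one step per pass."""
    while steps:
        if pause_event.is_set():
            log.append("paused")
            await asyncio.sleep(0.01)   # yield instead of blocking the event loop
            continue
        log.append(steps.pop(0))        # stand-in for one tick of real work
        await asyncio.sleep(0)
```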
Model Escalation
Workers automatically escalate to more capable models on retry:
models_list = [
"gemini-2.5-flash-lite", # First attempt
"gemini-2.5-flash", # Second attempt
"gemini-3.1-pro-preview" # Third+ attempt
]
model_idx = min(ticket.retry_count, len(models_list) - 1)
model_name = models_list[model_idx]
The ticket.model_override field can bypass this logic with a specific model.
Track State Persistence
Track state can be persisted to disk via project_manager.py:
conductor/tracks/<track_id>/
    spec.md          # Track specification (human-authored)
    plan.md          # Implementation plan with checkbox tasks
    metadata.json    # Track metadata (id, type, status, timestamps)
    state.toml       # Structured TrackState with task list
project_manager.get_all_tracks(base_dir) scans the tracks directory with a three-tier metadata fallback:
1. state.toml (structured TrackState): counts tasks with status == "completed".
2. metadata.json (legacy): gets id/title/status only.
3. plan.md (regex): counts - [x] vs - [ ] checkboxes for progress.
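Here is a sketch of the fallback order and the checkbox count. `track_summary` and `plan_progress` are hypothetical. `state.toml` parsing is omitted, and the regex counts only checkboxes that begin a (possibly indented) list line.

```python
import json
import re
from pathlib import Path

def plan_progress(plan_text: str) -> tuple[int, int]:
    """Return (done, total) from '- [x]' and '- [ ]' checkbox lines."""
    done = len(re.findall(r"^\s*- \[x\]", plan_text, re.MULTILINE | re.IGNORECASE))
    todo = len(re.findall(r"^\s*- \[ \]", plan_text, re.MULTILINE))
    return done, done + todo

def track_summary(track_dir: Path) -> dict:
    """Pick the richest available metadata source, in the documented order."""
    if (track_dir / "state.toml").exists():
        return {"source": "state.toml"}  # real code counts status == "completed"
    meta = track_dir / "metadata.json"
    if meta.exists():
        data = json.loads(meta.read_text(encoding="utf-8"))
        return {"source": "metadata.json", "id": data.get("id"),
                "title": data.get("title"), "status": data.get("status")}
    plan = track_dir / "plan.md"
    if plan.exists():
        done, total = plan_progress(plan.read_text(encoding="utf-8"))
        return {"source": "plan.md", "done": done, "total": total}
    return {"source": None}
```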
Beads Integration (Roadmap)
Beads is a Dolt-backed issue tracking system. The src/beads_client.py module provides a Python client for bd CLI calls (bd_create, bd_list, bd_ready, bd_update). The client is functional but not yet integrated into the ConductorEngine execution loop.
Current state (as of 2026-06-02):
- BeadsClient is instantiable; it detects whether a project's .beads/ directory exists and falls back to a no-op if not.
- Tools bd_create, bd_list, bd_ready, and bd_update are exposed via the MCP bridge (see guide_tools.md).
- The ConductorEngine still writes track state to conductor/tracks/<id>/ (markdown-based), not to a Beads repo.
- A project's TOML may specify a conductor directory override ([conductor].dir) but does not yet support a Beads repository path.
Planned integration:
- The ConductorEngine's parse_json_tickets would optionally forward ingested tickets to BeadsClient.bd_create when Beads mode is active.
- save_track_state would write to .beads/ instead of conductor/tracks/<id>/state.toml when Beads is active.
- The Visual DAG would query bd_list for real-time ticket status instead of the in-memory TrackDAG.
See guide_beads.md (placeholder; written in Task 10) for the full Beads client API and the toolset exposed to agents.
Workspace Profile Auto-Switching (Roadmap)
The WorkspaceManager (src/workspace_manager.py) supports binding workspace profiles to MMA tier context. Currently, profiles can be saved and loaded manually; the auto-switch hook is implemented but not yet wired into ConductorEngine.
Current state:
- WorkspaceProfile (named docking + window state) can be saved/loaded via the GUI.
- Scope inheritance (Global vs Project) is supported.
- A bind_to_context(context_id: str, profile_name: str) method exists on WorkspaceManager.
Planned integration:
- On Tier transition (tier1 → tier2 → tier3), ConductorEngine would call WorkspaceManager.bind_to_context("tier3", active_profile) to reshape the UI for the current cognitive load.
- This is opt-in via [conductor].auto_switch_profiles = true in config.toml.
See guide_workspace_profiles.md (placeholder; written in Task 10) for the full profile schema.
See Also
- guide_architecture.md: Threading model that MMA's worker pool respects
- guide_multi_agent_conductor.md: The multi_agent_conductor.py + dag_engine.py runtime
- guide_app_controller.md: How the AppController drives MMA via _cb_start_track, _do_generate, _process_event_queue
- guide_context_aggregation.md: The aggregate.py:build_tier3_context variant used by MMA workers
- guide_discussions.md: The Discussion system; MMA worker prompts are built from the active discussion
- conductor/tracks/nagent_review_20260608/report.md §9: Deep-dive on the MMA sub-conversation pattern vs nagent's <nagent-conversation> tag; the highest-priority future track is to extract MMA's run_worker_lifecycle into a reusable SubConversationRunner for 1:1 discussions (per a user-flagged want)
- conductor/tracks/nagent_review_20260608/nagent_takeaways_20260608.md §3 and §10: Actionable patterns for the SubConversationRunner; the design constraint that sub-agents return a concise artifact (not a full transcript) is baked into the recommendation