Files
manual_slop/docs/guide_mma.md
Ed_ 08e003a137 docs: Complete documentation rewrite at gencpp/VEFontCache reference quality
Rewrites all docs from Gemini's 330-line executive summaries to 1874 lines
of expert-level architectural reference matching the pedagogical depth of
gencpp (Parser_Algo.md, AST_Types.md) and VEFontCache-Odin (guide_architecture.md).

Changes:
- guide_architecture.md: 73 -> 542 lines. Adds inline data structures for all
  dialog classes, cross-thread communication patterns, complete action type
  catalog, provider comparison table, 4-breakpoint Anthropic cache strategy,
  Gemini server-side cache lifecycle, context refresh algorithm.
- guide_tools.md: 66 -> 385 lines. Full 26-tool inventory with parameters,
  3-layer MCP security model walkthrough, all Hook API GET/POST endpoints
  with request/response formats, ApiHookClient method reference, /api/ask
  synchronous HITL protocol, shell runner with env config.
- guide_mma.md: NEW (368 lines). Fills major documentation gap — complete
  Ticket/Track/WorkerContext data structures, DAG engine algorithms (cycle
  detection, topological sort), ConductorEngine execution loop, Tier 2 ticket
  generation, Tier 3 worker lifecycle with context amnesia, token firewalling.
- guide_simulations.md: 64 -> 377 lines. 8-stage Puppeteer simulation
  lifecycle, mock_gemini_cli.py JSON-L protocol, approval automation pattern,
  ASTParser tree-sitter vs stdlib ast comparison, VerificationLogger.
- Readme.md: Rewritten with module map, architecture summary, config examples.
- docs/Readme.md: Proper index with guide contents table and GUI panel docs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-01 09:44:50 -05:00

14 KiB

MMA: 4-Tier Multi-Model Agent Orchestration

Top | Architecture | Tools & IPC | Simulations


Overview

The MMA (Multi-Model Agent) system is a hierarchical task decomposition and execution engine. A high-level "epic" is broken into tracks, tracks are decomposed into tickets with dependency relationships, and tickets are executed by stateless workers with human-in-the-loop approval at every destructive boundary.

Tier 1: Orchestrator   — product alignment, epic → tracks
Tier 2: Tech Lead      — track → tickets (DAG), architectural oversight
Tier 3: Worker         — stateless TDD implementation per ticket
Tier 4: QA             — stateless error analysis, no fixes

Data Structures (models.py)

Ticket

The atomic unit of work. All MMA execution revolves around transitioning tickets through their state machine.

@dataclass
class Ticket:
    id: str                                    # e.g., "T-001"
    description: str                           # Human-readable task description
    status: str                                # "todo" | "in_progress" | "completed" | "blocked"
    assigned_to: str                           # Tier assignment: "tier3-worker", "tier4-qa"
    target_file: Optional[str] = None          # File this ticket modifies
    context_requirements: List[str] = field()  # Files needed for context injection
    depends_on: List[str] = field()            # Ticket IDs that must complete first
    blocked_reason: Optional[str] = None       # Why this ticket is blocked
    step_mode: bool = False                    # If True, requires manual approval before execution

    def mark_blocked(self, reason: str) -> None   # Sets status="blocked", stores reason
    def mark_complete(self) -> None                # Sets status="completed"
    def to_dict(self) -> Dict[str, Any]
    @classmethod
    def from_dict(cls, data) -> "Ticket"

Status state machine:

todo ──> in_progress ──> completed
 |            |
 v            v
blocked     blocked

Track

A collection of tickets with a shared goal.

@dataclass
class Track:
    id: str                                    # Track identifier
    description: str                           # Track-level brief
    tickets: List[Ticket] = field()            # Ordered list of tickets

    def get_executable_tickets(self) -> List[Ticket]
        # Returns all 'todo' tickets whose depends_on are all 'completed'

WorkerContext

@dataclass
class WorkerContext:
    ticket_id: str          # Which ticket this worker is processing
    model_name: str         # LLM model to use (e.g., "gemini-2.5-flash-lite")
    messages: List[dict]    # Conversation history for this worker

DAG Engine (dag_engine.py)

Two classes: TrackDAG (graph) and ExecutionEngine (state machine).

TrackDAG

class TrackDAG:
    def __init__(self, tickets: List[Ticket]):
        self.tickets = tickets
        self.ticket_map = {t.id: t for t in tickets}  # O(1) lookup by ID

get_ready_tasks(): Returns tickets where status == 'todo' AND all depends_on have status == 'completed'. Missing dependencies are treated as NOT completed (fail-safe).

has_cycle(): Classic DFS cycle detection using visited set + recursion stack:

def has_cycle(self) -> bool:
    visited = set()
    rec_stack = set()
    def is_cyclic(ticket_id):
        if ticket_id in rec_stack: return True    # Back edge = cycle
        if ticket_id in visited: return False      # Already explored
        visited.add(ticket_id)
        rec_stack.add(ticket_id)
        for neighbor in ticket.depends_on:
            if is_cyclic(neighbor): return True
        rec_stack.remove(ticket_id)
        return False
    for ticket in self.tickets:
        if ticket.id not in visited:
            if is_cyclic(ticket.id): return True
    return False

topological_sort(): Calls has_cycle() first — raises ValueError if cycle found. Standard DFS post-order topological sort. Returns list of ticket ID strings in dependency order.

ExecutionEngine

class ExecutionEngine:
    def __init__(self, dag: TrackDAG, auto_queue: bool = False):
        self.dag = dag
        self.auto_queue = auto_queue

tick() — the heartbeat. On each call:

  1. Queries dag.get_ready_tasks() for eligible tickets.
  2. If auto_queue is enabled: non-step_mode tasks are automatically promoted to in_progress.
  3. step_mode tasks remain in todo until approve_task() is called.
  4. Returns the list of ready tasks.

approve_task(task_id): Manually transitions todoin_progress if all dependencies are met.

update_task_status(task_id, status): Force-sets status (used by workers to mark completed or blocked).


ConductorEngine (multi_agent_conductor.py)

The Tier 2 orchestrator. Owns the execution loop that drives tickets through the DAG.

class ConductorEngine:
    def __init__(self, track: Track, event_queue=None, auto_queue=False):
        self.track = track
        self.event_queue = event_queue
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0},
            "Tier 2": {"input": 0, "output": 0},
            "Tier 3": {"input": 0, "output": 0},
            "Tier 4": {"input": 0, "output": 0},
        }
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)

State Broadcast (_push_state)

On every state change, the engine pushes the full orchestration state to the GUI via AsyncEventQueue:

async def _push_state(self, status="running", active_tier=None):
    payload = {
        "status": status,           # "running" | "done" | "blocked"
        "active_tier": active_tier,  # e.g., "Tier 2 (Tech Lead)", "Tier 3 (Worker): T-001"
        "tier_usage": self.tier_usage,
        "track": {"id": self.track.id, "title": self.track.description},
        "tickets": [asdict(t) for t in self.track.tickets]
    }
    await self.event_queue.put("mma_state_update", payload)

This payload is consumed by the GUI's _process_pending_gui_tasks handler for "mma_state_update", which updates mma_status, active_tier, mma_tier_usage, active_tickets, and active_track.

Ticket Ingestion (parse_json_tickets)

Parses a JSON array of ticket dicts (from Tier 2 LLM output) into Ticket objects, appends to self.track.tickets, then rebuilds the TrackDAG and ExecutionEngine.

Main Execution Loop (run)

async def run(self):
    while True:
        ready_tasks = self.engine.tick()

        if not ready_tasks:
            if all tickets completed:
                await self._push_state("done")
                break
            if any in_progress:
                await asyncio.sleep(1)    # Waiting for async workers
                continue
            else:
                await self._push_state("blocked")
                break

        for ticket in ready_tasks:
            if in_progress or (auto_queue and not step_mode):
                ticket.status = "in_progress"
                await self._push_state("running", f"Tier 3 (Worker): {ticket.id}")

                # Create worker context
                context = WorkerContext(
                    ticket_id=ticket.id,
                    model_name="gemini-2.5-flash-lite",
                    messages=[]
                )

                # Execute in thread pool (blocking AI call)
                await loop.run_in_executor(
                    None, run_worker_lifecycle, ticket, context, ...
                )

                await self._push_state("running", "Tier 2 (Tech Lead)")

            elif todo and (step_mode or not auto_queue):
                await self._push_state("running", f"Awaiting Approval: {ticket.id}")
                await asyncio.sleep(1)    # Pause for HITL approval

Tier 2: Tech Lead (conductor_tech_lead.py)

The Tier 2 AI call converts a high-level Track brief into discrete Tier 3 tickets.

generate_tickets(track_brief, module_skeletons) -> list[dict]

def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
    system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
    user_message = (
        f"### TRACK BRIEF:\n{track_brief}\n\n"
        f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
        "Please generate the implementation tickets for this track."
    )
    # Temporarily override system prompt
    old_system_prompt = ai_client._custom_system_prompt
    ai_client.set_custom_system_prompt(system_prompt)
    try:
        response = ai_client.send(md_content="", user_message=user_message)
        # Multi-layer JSON extraction:
        #   1. Try ```json ... ``` blocks
        #   2. Try ``` ... ``` blocks
        #   3. Regex search for [ { ... } ] pattern
        tickets = json.loads(json_match)
        return tickets
    finally:
        ai_client.set_custom_system_prompt(old_system_prompt)

The JSON extraction is defensive — handles markdown code fences, bare JSON, and regex fallback for embedded arrays.

topological_sort(tickets: list[dict]) -> list[dict]

Convenience wrapper: converts raw dicts to Ticket objects, builds a TrackDAG, calls dag.topological_sort(), returns the original dicts reordered by sorted IDs.


Tier 3: Worker Lifecycle (run_worker_lifecycle)

This free function executes a single ticket. Key behaviors:

Context Amnesia

ai_client.reset_session()  # Each ticket starts with a clean slate

No conversational bleed between tickets. Every worker is stateless.

Context Injection

For context_requirements files:

  • First file: parser.get_curated_view(content) — full skeleton with @core_logic and [HOT] bodies preserved.
  • Subsequent files: parser.get_skeleton(content) — cheaper, signatures + docstrings only.

Prompt Construction

user_message = (
    f"You are assigned to Ticket {ticket.id}.\n"
    f"Task Description: {ticket.description}\n"
    f"\nContext Files:\n{context_injection}\n"
    "Please complete this task. If you are blocked and cannot proceed, "
    "start your response with 'BLOCKED' and explain why."
)

HITL Clutch Integration

If event_queue is provided, confirm_spawn() is called before executing, allowing the user to:

  • Read the prompt and context.
  • Edit both the prompt and context markdown.
  • Approve, reject, or abort the entire track.

The confirm_spawn function uses the dialog_container pattern:

  1. Create dialog_container = [None] (mutable container for thread communication).
  2. Push "mma_spawn_approval" task to event queue with the container.
  3. Poll dialog_container[0] every 100ms for up to 60 seconds.
  4. When the GUI fills in the dialog, call .wait() to get the result.
  5. Returns (approved, modified_prompt, modified_context).

Tier 4: QA Error Analysis

Stateless error analysis. Invoked via the qa_callback parameter in shell_runner.run_powershell() when a command fails.

def run_tier4_analysis(error_message: str) -> str:
    """Stateless Tier 4 QA analysis of an error message."""
    # Uses a dedicated system prompt for error triage
    # Returns analysis text (root cause, suggested fix)
    # Does NOT modify any code — analysis only

Integrated directly into the shell execution pipeline: if qa_callback is provided and the command has non-zero exit or stderr output, the callback result is appended to the tool output as QA ANALYSIS:\n<result>.


Cross-System Data Flow

The full MMA lifecycle from epic to completion:

  1. Tier 1 (Orchestrator): User enters an epic description in the GUI. Creates a Track with a brief.
  2. Tier 2 (Tech Lead): conductor_tech_lead.generate_tickets() calls ai_client.send() with the tier2_sprint_planning prompt, producing a JSON ticket list.
  3. Ingestion: ConductorEngine.parse_json_tickets() ingests the JSON, builds Ticket objects, constructs TrackDAG + ExecutionEngine.
  4. Execution loop: ConductorEngine.run() enters the async loop, calling engine.tick() each iteration.
  5. Worker dispatch: For each ready ticket, run_worker_lifecycle() is called in a thread executor. It uses ai_client.send() with MCP tools (dispatched through mcp_client.dispatch()).
  6. Security enforcement: MCP tools enforce the allowlist via _resolve_and_check() on every filesystem operation.
  7. State broadcast: _push_state()AsyncEventQueue → GUI renders DAG + ticket status.
  8. External visibility: ApiHookClient.get_mma_status() queries the Hook API for the full orchestration state.
  9. HITL gates: confirm_spawn() pushes to event queue → GUI renders dialog → user approves/edits → dialog_container[0].wait() returns the decision.

Token Firewalling

Each tier operates within its own token budget:

  • Tier 3 workers use lightweight models (default: gemini-2.5-flash-lite) and receive only the files listed in context_requirements.
  • Context Amnesia ensures no accumulated history bleeds between tickets.
  • Tier 2 tracks cumulative tier_usage per tier: {"input": N, "output": N} for token cost monitoring.
  • First file vs subsequent files: The first context_requirements file gets a curated view (preserving hot paths); subsequent files get only skeletons.

Track State Persistence

Track state can be persisted to disk via project_manager.py:

conductor/tracks/<track_id>/
    spec.md          # Track specification (human-authored)
    plan.md          # Implementation plan with checkbox tasks
    metadata.json    # Track metadata (id, type, status, timestamps)
    state.toml       # Structured TrackState with task list

project_manager.get_all_tracks(base_dir) scans the tracks directory with a three-tier metadata fallback:

  1. state.toml (structured TrackState) — counts tasks with status == "completed".
  2. metadata.json (legacy) — gets id/title/status only.
  3. plan.md (regex) — counts - [x] vs - [ ] checkboxes for progress.