Architecture


Philosophy: The Decoupled State Machine

Manual Slop solves a single tension: AI reasoning is high-latency and non-deterministic; GUI interaction must be low-latency and responsive. The engine enforces strict decoupling between four thread domains so that multi-second LLM calls never block the render loop, and every AI-generated payload passes through a human-auditable gate before execution.

The architectural philosophy follows data-oriented design principles:

  • The GUI (gui_2.py, app_controller.py) remains a pure visualization of application state
  • State mutations occur only through lock-guarded queues consumed on the main render thread
  • Background threads never write GUI state directly — they serialize task dicts for later consumption
  • All cross-thread communication uses explicit synchronization primitives (Locks, Conditions, Events)

Project Structure

The codebase is organized into a src/ layout to separate implementation from configuration and artifacts.

manual_slop/
├── conductor/          # Conductor tracks, specs, and plans
├── docs/               # Deep-dive architectural documentation
├── logs/               # Session logs, agent traces, and errors
├── scripts/            # Build, migration, and IPC bridge scripts
├── src/                # Core Python implementation
│   ├── ai_client.py    # LLM provider abstraction
│   ├── gui_2.py        # Main ImGui application
│   ├── mcp_client.py   # MCP tool implementation
│   └── ...             # Other core modules
├── tests/              # Pytest suite and simulation fixtures
├── simulation/         # Workflow and agent simulation logic
├── sloppy.py           # Primary application entry point
├── config.toml         # Global application settings
└── manual_slop.toml    # Project-specific configuration

Thread Domains

Four distinct thread domains operate concurrently:

| Domain | Created By | Purpose | Lifecycle | Key Synchronization Primitives |
| --- | --- | --- | --- | --- |
| Main / GUI | immapp.run() | Dear ImGui immediate-mode render loop; sole writer of GUI state | App lifetime | None (consumer of queues) |
| Asyncio Worker | App.__init__ via threading.Thread(daemon=True) | Event queue processing, AI client calls | Daemon (dies with process) | AsyncEventQueue, threading.Lock |
| HookServer | api_hooks.HookServer.start() | HTTP API on :8999 for external automation and IPC | Daemon thread | threading.Lock, threading.Event |
| Ad-hoc | Transient threading.Thread calls | Model-fetching, legacy send paths, log pruning | Short-lived | Task-specific locks |

The asyncio worker is not the main thread's event loop. It runs a dedicated asyncio.new_event_loop() on its own daemon thread:

# AppController.__init__:
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()

# _run_event_loop:
def _run_event_loop(self) -> None:
    asyncio.set_event_loop(self._loop)
    self._loop.create_task(self._process_event_queue())
    self._loop.run_forever()

The GUI thread uses asyncio.run_coroutine_threadsafe(coro, self._loop) to push work into this loop.
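The hand-off can be sketched in isolation as follows. This is a minimal illustration, not the application's code: the module-level `loop`, `run_loop`, and the coroutine `slow_double` are hypothetical stand-ins, and only the `asyncio` and `threading` calls are standard library.

```python
import asyncio
import threading

# Hypothetical stand-in for the controller's dedicated event loop.
loop = asyncio.new_event_loop()

def run_loop() -> None:
    # Bind the loop to this worker thread and run until stop() is requested.
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop_thread = threading.Thread(target=run_loop, daemon=True)
loop_thread.start()

async def slow_double(x: int) -> int:
    # Stands in for a long-running AI call.
    await asyncio.sleep(0.01)
    return x * 2

# Called from the main (non-asyncio) thread; returns a concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(slow_double(21), loop)

# A render loop would not block like this; the sketch blocks only to show the result.
result = future.result(timeout=2.0)

# Same stop-and-join sequence as an explicit shutdown.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join(timeout=2.0)
```

The returned future belongs to `concurrent.futures`, so the calling thread can poll `future.done()` once per frame instead of waiting on it.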

Thread-Local Context Isolation

For concurrent multi-agent execution, the application uses threading.local() to manage per-thread context:

# ai_client.py
_local_storage = threading.local()

def get_current_tier() -> Optional[str]:
    """Returns the current tier from thread-local storage."""
    return getattr(_local_storage, "current_tier", None)

def set_current_tier(tier: Optional[str]) -> None:
    """Sets the current tier in thread-local storage."""
    _local_storage.current_tier = tier

This ensures that comms log entries and tool calls are correctly tagged with their source tier even when multiple workers execute concurrently.
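The isolation property can be checked with a small standalone sketch. The two accessor functions mirror the ones above; the `worker` function, the `seen` dict, and the barrier are hypothetical scaffolding added only for the demonstration.

```python
import threading
from typing import Optional

_local_storage = threading.local()

def get_current_tier() -> Optional[str]:
    return getattr(_local_storage, "current_tier", None)

def set_current_tier(tier: Optional[str]) -> None:
    _local_storage.current_tier = tier

seen: dict[str, Optional[str]] = {}
barrier = threading.Barrier(2)

def worker(name: str, tier: str) -> None:
    set_current_tier(tier)
    # Both threads have written their tier before either one reads it back.
    barrier.wait(timeout=2.0)
    seen[name] = get_current_tier()

threads = [
    threading.Thread(target=worker, args=("a", "Tier 3")),
    threading.Thread(target=worker, args=("b", "Tier 4")),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set a tier, so it sees the default.
main_tier = get_current_tier()
```

Each thread reads back only the value it wrote, and a thread that never called `set_current_tier` gets `None`.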


Cross-Thread Data Structures

All cross-thread communication uses one of three patterns:

Pattern A: AsyncEventQueue (GUI -> Asyncio)

# events.py
class AsyncEventQueue:
    _queue: asyncio.Queue  # holds Tuple[str, Any] items

    async def put(self, event_name: str, payload: Any = None) -> None: ...
    async def get(self) -> Tuple[str, Any]: ...

The central event bus. Uses asyncio.Queue, so non-asyncio threads must enqueue via asyncio.run_coroutine_threadsafe(). Consumer is App._process_event_queue(), running as a long-lived coroutine on the asyncio loop.

Pattern B: Guarded Lists (Any Thread -> GUI)

Background threads cannot write GUI state directly. They append task dicts to lock-guarded lists; the main thread drains these once per frame:

# App.__init__:
self._pending_gui_tasks: list[dict[str, Any]] = []
self._pending_gui_tasks_lock = threading.Lock()

self._pending_comms: list[dict[str, Any]] = []
self._pending_comms_lock = threading.Lock()

self._pending_tool_calls: list[tuple[str, str, float]] = []
self._pending_tool_calls_lock = threading.Lock()

self._pending_history_adds: list[dict[str, Any]] = []
self._pending_history_adds_lock = threading.Lock()

Additional locks:

self._send_thread_lock = threading.Lock()       # Guards send_thread creation
self._pending_dialog_lock = threading.Lock()     # Guards _pending_dialog + _pending_actions dict

Pattern C: Condition-Variable Dialogs (Bidirectional Blocking)

Used for Human-in-the-Loop (HITL) approval. Background thread blocks on threading.Condition; GUI thread signals after user action. See the HITL section below.


Event System

Three classes in events.py (89 lines, no external dependencies beyond asyncio and typing):

EventEmitter

class EventEmitter:
    _listeners: Dict[str, List[Callable]]

    def on(self, event_name: str, callback: Callable) -> None: ...
    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None: ...

Synchronous pub-sub. Callbacks execute in the caller's thread. Used by ai_client.events for lifecycle hooks (request_start, response_received, tool_execution). No thread safety — relies on consistent single-thread usage.
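One plausible shape for such an emitter is sketched below. The method bodies are an assumption (the guide only gives the signatures); the class is named `EventEmitterSketch` to make clear it is not the code in events.py.

```python
from typing import Any, Callable, Dict, List

class EventEmitterSketch:
    """Assumed implementation of a synchronous, single-threaded pub-sub."""

    def __init__(self) -> None:
        self._listeners: Dict[str, List[Callable[..., None]]] = {}

    def on(self, event_name: str, callback: Callable[..., None]) -> None:
        self._listeners.setdefault(event_name, []).append(callback)

    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so a callback may register listeners while emitting.
        for callback in list(self._listeners.get(event_name, [])):
            callback(*args, **kwargs)  # runs in the caller's thread

received: list[tuple[str, int]] = []
emitter = EventEmitterSketch()
emitter.on("request_start", lambda provider, round_idx=0: received.append((provider, round_idx)))
emitter.emit("request_start", "gemini", round_idx=2)
emitter.emit("tool_execution", "ignored")  # no listener registered: a no-op
```

Because `emit` calls listeners inline, a slow callback delays the emitting thread, which is one reason to keep lifecycle hooks cheap.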

AsyncEventQueue

Described above in Pattern A.

UserRequestEvent

class UserRequestEvent:
    prompt: str           # User's raw input text
    stable_md: str        # Generated markdown context (files, screenshots)
    file_items: List[Any] # File attachment items for dynamic refresh
    disc_text: str        # Serialized discussion history
    base_dir: str         # Working directory for shell commands

    def to_dict(self) -> Dict[str, Any]: ...

Pure data carrier. Created on the GUI thread in _handle_generate_send, consumed on the asyncio thread in _handle_request_event.


Application Lifetime

Boot Sequence

The App.__init__ (lines 152-296) follows this precise order:

  1. Config hydration: Reads config.toml (global) and <project>.toml (local). Builds the initial "world view" — tracked files, discussion history, active models.
  2. Thread bootstrapping:
    • Asyncio event loop thread starts (_loop_thread).
    • HookServer starts as a daemon if test_hooks_enabled or provider is gemini_cli.
  3. Callback wiring (_init_ai_and_hooks): Connects ai_client.confirm_and_run_callback, comms_log_callback, tool_log_callback to GUI handlers.
  4. UI entry: Main thread enters immapp.run(). GUI is now alive; background threads are ready.

Shutdown Sequence

When immapp.run() returns (user closed window):

  1. hook_server.stop() — shuts down HTTP server, joins thread.
  2. perf_monitor.stop().
  3. ai_client.cleanup() — destroys server-side API caches (Gemini CachedContent).
  4. Dual-Flush persistence: _flush_to_project(), _save_active_project(), _flush_to_config(), save_config() — commits state back to both project and global configs.
  5. session_logger.close_session().

The asyncio loop thread is a daemon — it dies with the process. App.shutdown() exists for explicit cleanup in test scenarios:

def shutdown(self) -> None:
    if self._loop.is_running():
        self._loop.call_soon_threadsafe(self._loop.stop)
    if self._loop_thread.is_alive():
        self._loop_thread.join(timeout=2.0)

The Task Pipeline: Producer-Consumer Synchronization

Request Flow

GUI Thread                    Asyncio Thread                      GUI Thread (next frame)
──────────                    ──────────────                      ──────────────────────
1. User clicks "Gen + Send"
2. _handle_generate_send():
   - Compiles md context
   - Creates UserRequestEvent
   - Enqueues via
     run_coroutine_threadsafe  ──>  3. _process_event_queue():
                                       awaits event_queue.get()
                                       routes "user_request" to
                                       _handle_request_event()
                                   4. Configures ai_client
                                   5. ai_client.send() BLOCKS
                                      (seconds to minutes)
                                   6. On completion, enqueues
                                      "response" event back       ──>  7. _process_pending_gui_tasks():
                                                                          Drains task list under lock
                                                                          Sets ai_response text
                                                                          Triggers terminal blink

Event Types Routed by _process_event_queue

| Event Name | Action |
| --- | --- |
| "user_request" | Calls _handle_request_event(payload); synchronous blocking AI call |
| "response" | Appends {"action": "handle_ai_response", ...} to _pending_gui_tasks |
| "mma_state_update" | Appends {"action": "mma_state_update", ...} to _pending_gui_tasks |
| "mma_spawn_approval" | Appends the raw payload for HITL dialog creation |
| "mma_step_approval" | Appends the raw payload for HITL dialog creation |

The pattern: events arriving on the asyncio thread that need GUI state changes are serialized into _pending_gui_tasks for consumption on the next render frame.

Frame-Sync Mechanism: _process_pending_gui_tasks

Called once per ImGui frame on the main GUI thread. This is the sole safe point for mutating GUI-visible state.

Locking strategy — copy-and-clear:

def _process_pending_gui_tasks(self) -> None:
    if not self._pending_gui_tasks:           # Cheap unlocked check; idle frames skip the lock
        return
    with self._pending_gui_tasks_lock:
        tasks = self._pending_gui_tasks[:]    # Snapshot under the lock
        self._pending_gui_tasks.clear()       # Clear so the lock is held only briefly
    for task in tasks:
        ...                                   # Process each task outside the lock

Acquires the lock briefly to snapshot the task list, then processes outside the lock. Minimizes lock contention with producer threads.
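The copy-and-clear pattern can be exercised outside the GUI with a short sketch. The names `pending`, `producer`, and `drain` are hypothetical; the busy loop stands in for per-frame polling.

```python
import threading
from typing import Any

pending: list[dict[str, Any]] = []
pending_lock = threading.Lock()

def producer(n: int) -> None:
    # Background threads only append task dicts; they never touch consumer state.
    for i in range(100):
        with pending_lock:
            pending.append({"action": "demo", "producer": n, "seq": i})

def drain() -> list[dict[str, Any]]:
    # Hold the lock just long enough to snapshot and clear.
    with pending_lock:
        tasks = pending[:]
        pending.clear()
    return tasks

threads = [threading.Thread(target=producer, args=(n,)) for n in range(4)]
for t in threads:
    t.start()

processed: list[dict[str, Any]] = []
while any(t.is_alive() for t in threads):
    processed.extend(drain())   # stands in for one render frame
for t in threads:
    t.join()
processed.extend(drain())       # pick up anything appended after the last frame
```

No task is lost or processed twice, because every append and every snapshot-plus-clear happens under the same lock.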

Complete Action Type Catalog

| Action | Source | Effect |
| --- | --- | --- |
| "refresh_api_metrics" | asyncio/hooks | Updates API metrics display |
| "handle_ai_response" | asyncio | Sets ai_response, ai_status, mma_streams[stream_id]; triggers blink; optionally auto-adds to discussion history |
| "show_track_proposal" | asyncio | Sets proposed_tracks list, opens modal |
| "mma_state_update" | asyncio | Updates mma_status, active_tier, mma_tier_usage, active_tickets, active_track |
| "set_value" | HookServer | Sets any field in _settable_fields map via setattr; special-cases current_provider/current_model to reconfigure AI client |
| "click" | HookServer | Dispatches to _clickable_actions map; introspects signatures to decide whether to pass user_data |
| "select_list_item" | HookServer | Routes to _switch_discussion() for discussion listbox |
| {"type": "ask"} | HookServer | Opens ask dialog: sets _pending_ask_dialog = True, stores _ask_request_id and _ask_tool_data |
| "clear_ask" | HookServer | Clears ask dialog state if request_id matches |
| "custom_callback" | HookServer | Executes an arbitrary callable with args |
| "mma_step_approval" | asyncio (MMA engine) | Creates MMAApprovalDialog, stores in _pending_mma_approval |
| "mma_spawn_approval" | asyncio (MMA engine) | Creates MMASpawnApprovalDialog, stores in _pending_mma_spawn |
| "refresh_from_project" | HookServer/internal | Reloads all UI state from project dict |

The Execution Clutch: Human-in-the-Loop

The "Execution Clutch" ensures every destructive AI action passes through an auditable human gate. Three dialog types implement this, all sharing the same blocking pattern.

Dialog Classes

ConfirmDialog — PowerShell script execution approval:

class ConfirmDialog:
    _uid: str                        # uuid4 identifier
    _script: str                     # The PowerShell script text (editable)
    _base_dir: str                   # Working directory
    _condition: threading.Condition  # Blocking primitive
    _done: bool                      # Signal flag
    _approved: bool                  # User's decision

    def wait(self) -> tuple[bool, str]: ...   # Blocks until _done; returns (approved, script)

MMAApprovalDialog — MMA tier step approval:

class MMAApprovalDialog:
    _ticket_id: str
    _payload: str                    # The step payload (editable)
    _condition: threading.Condition
    _done: bool
    _approved: bool

    def wait(self) -> tuple[bool, str]: ...   # Returns (approved, payload)

MMASpawnApprovalDialog — Sub-agent spawn approval:

class MMASpawnApprovalDialog:
    _ticket_id: str
    _role: str                       # tier3-worker, tier4-qa, etc.
    _prompt: str                     # Spawn prompt (editable)
    _context_md: str                 # Context document (editable)
    _condition: threading.Condition
    _done: bool
    _approved: bool
    _abort: bool                     # Can abort entire track

    def wait(self) -> dict[str, Any]: ...   # Returns {approved, abort, prompt, context_md}

Blocking Flow

Using ConfirmDialog as exemplar:

   ASYNCIO THREAD (ai_client tool callback)         GUI MAIN THREAD
   ─────────────────────────────────────────         ───────────────
   1. ai_client calls _confirm_and_run(script)
   2. Creates ConfirmDialog(script, base_dir)
   3. Stores dialog:
      - Headless: _pending_actions[uid] = dialog
      - GUI mode: _pending_dialog = dialog
   4. If test_hooks_enabled:
      pushes to _api_event_queue
   5. dialog.wait() BLOCKS on _condition
                                                    6. Next frame: ImGui renders
                                                       _pending_dialog in modal
                                                    7. User clicks Approve/Reject
                                                    8. _handle_approve_script():
                                                       with dialog._condition:
                                                           dialog._approved = True
                                                           dialog._done = True
                                                           dialog._condition.notify_all()
   9. wait() returns (True, potentially_edited_script)
   10. Executes shell_runner.run_powershell()
   11. Returns output to ai_client

wait() calls _condition.wait(timeout=0.1) inside a loop that rechecks _done, so the blocked thread wakes at least every 100 ms: a polling-with-condition hybrid.

Resolution Paths

GUI button path (normal interactive use): _handle_approve_script() / _handle_approve_mma_step() / _handle_approve_spawn() directly manipulate the dialog's condition variable from the GUI thread.

HTTP API path (headless/automation): resolve_pending_action(action_id, approved) looks up the dialog by UUID in _pending_actions dict (headless) or _pending_dialog (GUI), then signals the condition:

def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
    with self._pending_dialog_lock:
        if action_id in self._pending_actions:
            dialog = self._pending_actions[action_id]
            with dialog._condition:
                dialog._approved = approved
                dialog._done = True
                dialog._condition.notify_all()
            return True
    return False  # No matching dialog (the GUI-mode _pending_dialog lookup is not shown in this excerpt)

MMA approval path: _handle_mma_respond(approved, payload, abort, prompt, context_md) is the unified resolver. It uses a dialog_container — a one-element list [None] used as a mutable reference shared between the MMA engine (which creates the container) and the GUI (which populates it via _process_pending_gui_tasks).


AI Client: Multi-Provider Architecture

ai_client.py operates as a stateful singleton — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.

Module-Level State

_provider: str = "gemini"              # "gemini" | "anthropic" | "deepseek" | "gemini_cli" | "minimax"
_model: str = "gemini-2.5-flash-lite"
_temperature: float = 0.0
_top_p: float = 1.0
_max_tokens: int = 8192
_history_trunc_limit: int = 8000       # Char limit for truncating old tool outputs

_send_lock: threading.Lock             # Serializes ALL send() calls across providers

Per-provider client objects:

# Gemini (SDK-managed stateful chat)
_gemini_client: genai.Client | None
_gemini_chat: Any                      # Holds history internally
_gemini_cache: Any                     # Server-side CachedContent
_gemini_cache_md_hash: str | None      # Hash for cache invalidation
_gemini_cache_created_at: float | None # Monotonic time of cache creation
_gemini_cached_file_paths: list[str]   # File paths included in the active cache
_GEMINI_CACHE_TTL: int = 3600          # 1-hour; rebuilt at 90% (3240s)

# Anthropic (client-managed history)
_anthropic_client: anthropic.Anthropic | None
_anthropic_history: list[dict]         # Mutable [{role, content}, ...]
_anthropic_history_lock: threading.Lock

# DeepSeek (raw HTTP, client-managed history)
_deepseek_client: Any | None
_deepseek_history: list[dict]
_deepseek_history_lock: threading.Lock

# MiniMax (raw HTTP, client-managed history)
_minimax_client: Any | None
_minimax_history: list[dict]
_minimax_history_lock: threading.Lock

# Gemini CLI (adapter wrapper)
_gemini_cli_adapter: GeminiCliAdapter | None

Safety limits:

MAX_TOOL_ROUNDS: int = 10              # Max tool-call loop iterations per send()
_MAX_TOOL_OUTPUT_BYTES: int = 500_000  # 500KB cumulative tool output budget
_ANTHROPIC_CHUNK_SIZE: int = 120_000   # Max chars per system text block
_ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000  # 200k limit minus headroom
_GEMINI_MAX_INPUT_TOKENS: int = 900_000      # 1M window minus headroom

The send() Dispatcher

def send(md_content, user_message, base_dir=".", file_items=None,
         discussion_history="", stream=False,
         pre_tool_callback=None, qa_callback=None,
         enable_tools=True, stream_callback=None, patch_callback=None,
         rag_engine=None) -> str:
    with _send_lock:
        if _provider == "gemini":      return _send_gemini(...)
        elif _provider == "gemini_cli": return _send_gemini_cli(...)
        elif _provider == "anthropic":  return _send_anthropic(...)
        elif _provider == "deepseek":   return _send_deepseek(..., stream=stream)
        elif _provider == "minimax":    return _send_minimax(..., stream=stream)

_send_lock serializes all API calls — only one provider call can be in-flight at a time. All providers share the same callback signatures. Return type is always str.

Parameter evolution (newer parameters, may be missing from older docstring mirrors):

  • enable_tools: bool = True — Per-call gate for the PowerShell + MCP tool set. Tier 4 and certain planning calls pass enable_tools=False to force text-only responses.
  • stream_callback: Optional[Callable[[str], None]] — Provider-specific streaming sink. The DeepSeek and MiniMax paths invoke this as tokens arrive; other providers deliver the full response after the network round-trip.
  • patch_callback: Optional[Callable[[str, str], Optional[str]]] — Tier 4 patch generation hook. Receives (error_text, file_context) and returns an optional diff. See Tier 4 Patch Generation below.
  • rag_engine: Optional[Any] — When provided, the dispatcher injects RAG-retrieved context into md_content before the provider call. The RAG engine is owned by the caller (typically AppController or multi_agent_conductor.run_worker_lifecycle); ai_client does not own its lifecycle. See RAG Integration below.

Provider Comparison

| Aspect | Gemini SDK | Anthropic | DeepSeek | Gemini CLI | MiniMax |
| --- | --- | --- | --- | --- | --- |
| Client | genai.Client | anthropic.Anthropic | Raw requests.post | GeminiCliAdapter (subprocess) | Raw requests.post (OpenAI-compatible endpoint) |
| History | SDK-managed (_gemini_chat._history) | Client-managed list | Client-managed list | CLI-managed (session ID) | Client-managed list |
| Caching | Server-side CachedContent with TTL | Prompt caching via cache_control: ephemeral (4 breakpoints) | None | None | None |
| Tool format | types.FunctionDeclaration | JSON Schema dict | Not declared | Same as SDK via adapter | Not declared |
| Tool results | Part.from_function_response(response={"output": ...}) | {"type": "tool_result", "tool_use_id": ..., "content": ...} | {"role": "tool", "tool_call_id": ..., "content": ...} | {"role": "tool", ...} | {"role": "tool", "tool_call_id": ..., "content": ...} |
| History trimming | In-place at 40% of 900K token estimate | 2-phase: strip stale file refreshes, then drop turn pairs at 180K | None | None | 2-phase: drop turn pairs at 180K (Anthropic-equivalent) |
| Streaming | No | No | Yes | No | Yes |
| Error classifier | _classify_gemini_error | _classify_anthropic_error | _classify_deepseek_error | (inherits Gemini) | _classify_minimax_error |
| Repair hook | (SDK self-heals) | _repair_anthropic_history | _repair_deepseek_history | (CLI handles) | _repair_minimax_history |

Tool-Call Loop (common pattern across providers)

All providers follow the same high-level loop, iterated up to MAX_TOOL_ROUNDS + 2 times:

  1. Send message (or tool results from prior round) to API.
  2. Extract text response and any function calls.
  3. Log to comms log; emit events.
  4. If no function calls or max rounds exceeded: break.
  5. For each function call:
    • If pre_tool_callback rejects: return rejection text.
    • Dispatch to mcp_client.dispatch() or shell_runner.run_powershell().
    • After the last call of this round: run _reread_file_items() for context refresh.
    • Truncate tool output at _history_trunc_limit chars.
    • Accumulate _cumulative_tool_bytes.
  6. If cumulative bytes > 500KB: inject warning.
  7. Package tool results in provider-specific format; loop.
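The loop above can be sketched provider-agnostically. Everything here is illustrative: `tool_loop`, `call_api`, and `dispatch` are hypothetical names, the exact break condition and the warning text are assumptions, and the pre_tool_callback and file-refresh steps are omitted for brevity.

```python
from typing import Any, Callable

MAX_TOOL_ROUNDS = 10
HISTORY_TRUNC_LIMIT = 8000          # chars, mirrors _history_trunc_limit
MAX_TOOL_OUTPUT_BYTES = 500_000     # cumulative budget

ToolCall = tuple[str, dict[str, Any]]

def tool_loop(
    call_api: Callable[[list[dict]], tuple[str, list[ToolCall]]],
    dispatch: Callable[[str, dict[str, Any]], str],
) -> tuple[str, int]:
    tool_results: list[dict] = []
    cumulative_bytes = 0
    text = ""
    for round_idx in range(MAX_TOOL_ROUNDS + 2):
        # Steps 1-2: send the message (or prior tool results), read text and calls.
        text, calls = call_api(tool_results)
        # Step 4: stop when the model is done or the round cap is hit (assumed condition).
        if not calls or round_idx >= MAX_TOOL_ROUNDS:
            break
        tool_results = []
        for name, args in calls:
            # Step 5: run the tool, truncate its output, track the byte budget.
            output = dispatch(name, args)[:HISTORY_TRUNC_LIMIT]
            cumulative_bytes += len(output.encode("utf-8"))
            tool_results.append({"name": name, "output": output})
        if cumulative_bytes > MAX_TOOL_OUTPUT_BYTES:
            # Step 6: hypothetical warning wording.
            tool_results[-1]["output"] += "\n[SYSTEM: tool output budget exceeded]"
    return text, cumulative_bytes

# Scripted fake API: one round with a tool call, then a final text answer.
script = iter([("", [("read_file", {"path": "a.py"})]), ("done", [])])
final_text, used_bytes = tool_loop(
    lambda results: next(script),
    lambda name, args: "x" * 10_000,
)
```

In this run the 10,000-character tool output is cut to 8,000 characters before it is counted against the budget.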

Context Refresh Mechanism

After the last tool call in each round, _reread_file_items(file_items) checks mtimes of all tracked files:

  1. For each file item: compare Path.stat().st_mtime against stored mtime.
  2. If unchanged: pass through as-is.
  3. If changed: re-read content, store old_content for diffing, update mtime.
  4. Changed files are diffed via _build_file_diff_text:
    • Files <= 200 lines: emit full content.
    • Files > 200 lines with old_content: emit difflib.unified_diff.
  5. Diff is appended to the last tool's output as [SYSTEM: FILES UPDATED]\n\n{diff}.
  6. Stale [FILES UPDATED] blocks are stripped from older history turns by _strip_stale_file_refreshes to prevent context bloat.

Anthropic Cache Strategy (4-Breakpoint System)

Anthropic allows a maximum of 4 cache_control: ephemeral breakpoints:

| # | Location | Purpose |
| --- | --- | --- |
| 1 | Last block of stable system prompt | Cache base instructions |
| 2 | Last block of context chunks | Cache file context |
| 3 | Last tool definition | Cache tool schema |
| 4 | Second-to-last user message | Cache conversation prefix |

Before placing breakpoint 4, all existing cache_control is stripped from history to prevent exceeding the limit.
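The strip-then-place step for breakpoint 4 might look like the following sketch. The function name `place_history_breakpoint` is hypothetical, and the history is assumed to hold `{role, content}` dicts whose content is either a string or a list of block dicts.

```python
def place_history_breakpoint(history: list[dict]) -> None:
    # Remove every existing marker so the total never exceeds the limit.
    for message in history:
        if isinstance(message["content"], list):
            for block in message["content"]:
                block.pop("cache_control", None)

    user_indexes = [i for i, m in enumerate(history) if m["role"] == "user"]
    if len(user_indexes) < 2:
        return  # nothing worth caching yet

    target = history[user_indexes[-2]]
    if isinstance(target["content"], str):
        # Promote a plain string to a block list so it can carry the marker.
        target["content"] = [{"type": "text", "text": target["content"]}]
    target["content"][-1]["cache_control"] = {"type": "ephemeral"}

history = [
    {"role": "user", "content": "first"},
    {"role": "assistant", "content": [
        {"type": "text", "text": "reply", "cache_control": {"type": "ephemeral"}},
    ]},
    {"role": "user", "content": "second"},
]
place_history_breakpoint(history)
```

After the call, only the second-to-last user message carries a marker; the stale one on the assistant turn is gone.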

Gemini Cache Strategy (Server-Side TTL)

System instruction content is hashed. On each call, a 3-way decision:

  • Hash changed: Delete old cache, rebuild with new content.
  • Cache age > 90% of TTL: Proactive renewal (delete + rebuild). cache_created_at is tracked via time.monotonic() for this check.
  • No cache exists: Create new CachedContent if token count >= 2048; otherwise inline.

The active cache's file inclusion set is tracked in _gemini_cached_file_paths: list[str]. On rebuild, the list is replaced atomically. The GUI uses this list to render the "cached files" indicator in the Cache Panel.
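The decision logic can be written as a pure function for illustration. The name `cache_decision`, its return labels, and the "reuse" branch are assumptions; the thresholds (3600 s TTL, 90% renewal, 2048-token minimum) come from the text above.

```python
import time
from typing import Optional

GEMINI_CACHE_TTL = 3600                     # seconds
RENEW_AFTER = GEMINI_CACHE_TTL * 9 // 10    # 3240 s, i.e. 90% of the TTL
MIN_CACHE_TOKENS = 2048

def cache_decision(
    md_hash: str,
    cached_hash: Optional[str],
    created_at: Optional[float],
    token_count: int,
    now: Optional[float] = None,
) -> str:
    now = time.monotonic() if now is None else now
    if cached_hash is None or created_at is None:
        # No cache exists: create one only if the content is large enough.
        return "create" if token_count >= MIN_CACHE_TOKENS else "inline"
    if md_hash != cached_hash:
        return "rebuild"        # content changed: delete and rebuild
    if now - created_at > RENEW_AFTER:
        return "renew"          # proactive renewal before the TTL expires
    return "reuse"              # assumed default when nothing needs to change

decisions = [
    cache_decision("a", None, None, 3000),
    cache_decision("a", None, None, 100),
    cache_decision("b", "a", 0.0, 3000, now=10.0),
    cache_decision("a", "a", 0.0, 3000, now=3241.0),
    cache_decision("a", "a", 0.0, 3000, now=3240.0),
]
```

Using `time.monotonic()` for the age check keeps the renewal timer immune to wall-clock adjustments.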


Async Tool Execution

Independent tool calls within a single round execute concurrently via asyncio.gather. This is the major latency win: when the AI emits 3 read_file calls in one turn, they run in parallel rather than sequentially.

Entry Point

async def _execute_tool_calls_concurrently(
    calls: list[Any],
    base_dir: str,
    pre_tool_callback: ...,
    qa_callback: ...,
    r_idx: int,
    provider: str,
    patch_callback: ... = None,
) -> list[tuple[str, str, str, str]]:  # (tool_name, call_id, output, original_name)
    ...

Per-Call Worker

async def _execute_single_tool_call_async(
    name: str, args: dict, call_id: str, base_dir: str,
    pre_tool_callback, qa_callback, r_idx: int,
    tier: str | None = None,
    patch_callback = None,
) -> tuple[str, str, str, str]:
    ...

tier: str | None is propagated to the comms log and pre-tool callback so audit trails can attribute tool calls to a specific MMA tier (e.g., "Tier 3", "Tier 4"). Thread-local _local_storage.current_tier is the source; the parameter is the explicit pass-through.

Exception Handling

If any individual call raises, asyncio.gather with return_exceptions=True converts the exception to a returned value rather than cancelling siblings. The post-round loop in _send_* then formats the error per provider. See guide_tools.md for the full implementation pattern and the timing analysis (sequential vs concurrent latency for a typical 3-call round).
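The behaviour of `return_exceptions=True` can be seen in a small sketch. The coroutine `run_tool`, the tool names, and the error format are hypothetical; only the `asyncio.gather` call reflects the mechanism described above.

```python
import asyncio

async def run_tool(name: str, delay: float) -> str:
    await asyncio.sleep(delay)          # stands in for I/O-bound tool work
    if name == "bad_tool":
        raise ValueError("boom")
    return f"{name}: ok"

async def run_round(calls: list[tuple[str, float]]) -> list[str]:
    results = await asyncio.gather(
        *(run_tool(name, delay) for name, delay in calls),
        return_exceptions=True,         # failures come back as values
    )
    # Results arrive in call order, regardless of completion order.
    return [f"ERROR: {r!r}" if isinstance(r, Exception) else r for r in results]

outputs = asyncio.run(
    run_round([("read_file", 0.02), ("bad_tool", 0.01), ("list_dir", 0.0)])
)
```

The failing call does not cancel its siblings, and each output stays aligned with the call that produced it.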


RAG Integration

ai_client.send() accepts an optional rag_engine parameter. When supplied, the dispatcher augments md_content with RAG-retrieved context before the provider call.

def send(md_content, user_message, base_dir=".", file_items=None, ...,
         rag_engine: Optional[Any] = None) -> str:
    if rag_engine is not None:
        retrieved = rag_engine.query(user_message, top_k=5)
        md_content = _inject_rag_context(md_content, retrieved)
    ...

The RAG engine is not owned by ai_client; the caller (typically AppController for the main discussion flow, or multi_agent_conductor.run_worker_lifecycle for Tier 3 workers) is responsible for instantiating and configuring it. This keeps ai_client decoupled from any specific retrieval backend (ChromaDB local, external MCP RAG server, or none).

Lifecycle:

  • The AppController constructs a single RAGEngine per project load.
  • The RAG engine is passed through to send() for every AI call.
  • If a project disables RAG, rag_engine=None is passed and the integration is a no-op.
  • See guide_rag.md (placeholder; written in Task 10) for the vector store, chunking, and indexing pipeline.

Tier 4 Patch Generation Flow

When a Tier 3 worker's test run fails, the engine can request a Tier 4 patch instead of just an error summary. This is a structured diff, not a free-form suggestion.

Entry Point

def run_tier4_patch_generation(error: str, file_context: str) -> str:
    ...

Flow

  1. Tier 3 worker fails a test; stderr is captured by the test runner.
  2. The conductor thread calls run_tier4_patch_callback(stderr, base_dir) to get a candidate patch.
  3. If a patch is generated, the GUI's patch modal (src/patch_modal.py) presents the diff for human review.
  4. User clicks Apply Patch to resume the pipeline, or Reject to send the worker back for another attempt.
  5. The patch_callback parameter on send() is the Tier 4 hook; it can be None for callers that don't support patch generation.

Threading

run_tier4_patch_generation calls send() with enable_tools=False to force a text-only response. The result is parsed as a unified diff. If parsing fails, the modal shows the raw response and the user can manually copy-edit.


Discussion Compression

Long discussions accumulate tool outputs and intermediate reasoning that bloat the context. The run_discussion_compression function asks the active provider to produce a compressed summary of the discussion so far.

Entry Point

def run_discussion_compression(discussion_text: str) -> str:
    ...

Flow

  1. Caller (typically the GUI's "Compress Discussion" button or an automatic trigger when history exceeds N tokens) invokes run_discussion_compression(current_history).
  2. The function dispatches to the active provider with enable_tools=False and a fixed system prompt instructing the model to summarize while preserving key decisions, file paths, and unresolved questions.
  3. The returned string replaces the discussion history in subsequent send() calls.
  4. The original history is archived to the session log (logs/sessions/<id>/comms.log) for audit.

Provider Robustness

The function tolerates case- and whitespace-variation in the provider string (" MiniMax " is normalized to "minimax"). This is important because the active provider may be set via different code paths (TOML, env var, runtime override).
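A normalization of this kind is typically a one-liner. The helper name `normalize_provider` is hypothetical and the real function may do more.

```python
def normalize_provider(name: str) -> str:
    # Strip surrounding whitespace and fold case before comparing.
    return name.strip().lower()

normalized = [normalize_provider(p) for p in (" MiniMax ", "GEMINI", "anthropic\n")]
```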


Subagent Summarization

For very large files, the heuristic summarise_file in src/summarize.py may be insufficient. The run_subagent_summarization function asks the active provider to produce a high-signal summary of a single file using a model call rather than a heuristic.

Entry Point

def run_subagent_summarization(file_path: str, content: str, is_code: bool, outline: str) -> str:
    ...

When Invoked

  • File exceeds the heuristic summary's effective scope (configurable, typically > 5000 lines or > 100KB)
  • The aggregation strategy in aggregate.py is set to summarize (rather than full or skeleton)
  • The Tier 2 ticket generation explicitly requests a sub-agent summary for a high-priority file

Flow

  1. Caller builds a structured prompt combining the file path, content, an AST outline (if is_code=True), and a "summary" instruction.
  2. The function dispatches to the active provider with enable_tools=False.
  3. The returned string is the file's summary, which replaces the full content in the aggregated context.

Cost vs Quality Trade-off

Sub-agent summarization is more expensive than heuristic summarization (one full provider call per file) but produces higher-quality results for complex files. The caller decides based on the project's token budget and quality requirements.


Comms Log System

Every API interaction is logged to a module-level list with real-time GUI push:

def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
    entry = {
        "ts":        datetime.now().strftime("%H:%M:%S"),
        "direction": direction,     # "OUT" (to API) or "IN" (from API)
        "kind":      kind,          # "request" | "response" | "tool_call" | "tool_result"
        "provider":  _provider,
        "model":     _model,
        "payload":   payload,
    }
    _comms_log.append(entry)
    if comms_log_callback:
        comms_log_callback(entry)   # Real-time push to GUI

State Machines

ai_status (Informal)

"idle" -> "sending..." -> [AI call in progress]
    -> "running powershell..." -> "powershell done, awaiting AI..."
    -> "fetching url..." | "searching web..."
    -> "done" | "error"
    -> "idle" (on reset)

HITL Dialog State (Binary per type)

  • _pending_dialog is not None — script confirmation active
  • _pending_mma_approval is not None — MMA step approval active
  • _pending_mma_spawn is not None — spawn approval active
  • _pending_ask_dialog == True — tool ask dialog active

Security: The MCP Allowlist

Every filesystem tool (read, list, search, write) is gated by the MCP Bridge (mcp_client.py). See guide_tools.md for the complete security model, tool inventory, and endpoint reference.

Summary: Every path is resolved to an absolute path and checked against a dynamically-built allowlist constructed from the project's tracked files and base directories. Files named history.toml or *_history.toml are hard-blacklisted.
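A simplified version of such a check is sketched below. The function `is_allowed` and its parameters are hypothetical, and the real allowlist in mcp_client.py may be built and matched differently; see guide_tools.md for the actual model.

```python
import fnmatch
from pathlib import Path

def is_allowed(path: str, allowed_files: set[Path], allowed_dirs: list[Path]) -> bool:
    # Resolve first so ".." segments and relative paths cannot bypass the check.
    resolved = Path(path).resolve()
    name = resolved.name
    # History files are blocked regardless of location.
    if name == "history.toml" or fnmatch.fnmatch(name, "*_history.toml"):
        return False
    if resolved in allowed_files:
        return True
    return any(resolved.is_relative_to(directory) for directory in allowed_dirs)

base = (Path.cwd() / "proj_demo").resolve()   # hypothetical project root
inside = is_allowed(str(base / "src" / "a.py"), set(), [base])
history_file = is_allowed(str(base / "main_history.toml"), set(), [base])
escaped = is_allowed(str(base / ".." / "secret.txt"), set(), [base])
```

Resolving before matching is the important design choice: the traversal attempt lands outside the base directory and is rejected.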


Telemetry & Auditing

Every interaction is designed to be auditable:

  • JSON-L Comms Logs: Raw API traffic logged to logs/sessions/<id>/comms.log for debugging and token cost analysis.
  • Tool Call Logs: Markdown-formatted sequential records to toolcalls.log.
  • Generated Scripts: Every PowerShell script that passes through the Execution Clutch is saved to scripts/generated/<ts>_<seq>.ps1.
  • API Hook Logs: All HTTP hook invocations logged to apihooks.log.
  • CLI Call Logs: Subprocess execution details (command, stdin, stdout, stderr, latency) to clicalls.log as JSON-L.
  • Performance Monitor: Real-time FPS, Frame Time, CPU, Input Lag tracked and queryable via Hook API.

Telemetry Data Structures

# Comms log entry (JSON-L)
{
    "ts": "14:32:05",
    "direction": "OUT",
    "kind": "tool_call",
    "provider": "gemini",
    "model": "gemini-2.5-flash-lite",
    "payload": {
        "name": "run_powershell",
        "id": "call_abc123",
        "script": "Get-ChildItem"
    },
    "source_tier": "Tier 3",
    "local_ts": 1709875925.123
}

# Performance metrics (via get_metrics())
{
    "fps": 60.0,
    "fps_avg": 58.5,
    "last_frame_time_ms": 16.67,
    "frame_time_ms_avg": 17.1,
    "cpu_percent": 12.5,
    "cpu_percent_avg": 15.2,
    "input_lag_ms": 2.3,
    "input_lag_ms_avg": 3.1,
    "time_render_mma_dashboard_ms": 5.2,
    "time_render_mma_dashboard_ms_avg": 4.8
}
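
Since the comms log is JSON-L, each line can be parsed independently. The following sketch counts entries per direction and kind; `summarize_comms` is a hypothetical helper, and the sample lines only mimic the entry shape shown above.

```python
import json
from collections import Counter


def summarize_comms(lines) -> Counter:
    """Count entries per (direction, kind) pair in JSON-L comms log lines."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        entry = json.loads(line)
        counts[(entry["direction"], entry["kind"])] += 1
    return counts


sample = [
    '{"ts": "14:32:05", "direction": "OUT", "kind": "tool_call", "payload": {}}',
    '{"ts": "14:32:06", "direction": "IN", "kind": "tool_result", "payload": {}}',
    '{"ts": "14:32:09", "direction": "OUT", "kind": "tool_call", "payload": {}}',
]
summary = summarize_comms(sample)
```

For a real session the same function could be fed an open file handle for logs/sessions/<id>/comms.log, since iterating a file yields one line at a time.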

MMA Engine Architecture

WorkerPool: Concurrent Worker Management

The WorkerPool class in multi_agent_conductor.py manages a bounded pool of worker threads:

import threading
from typing import Callable, Optional

class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]:
        def wrapper(*a, **kw):
            try:
                with self._semaphore:
                    target(*a, **kw)
            finally:
                # Deregister on completion, even if target raised
                with self._lock:
                    self._active.pop(ticket_id, None)

        t = threading.Thread(target=wrapper, args=args, daemon=True)
        # Check capacity and register under a single lock acquisition so that
        # concurrent spawn() calls cannot both pass the capacity check.
        with self._lock:
            if len(self._active) >= self.max_workers:
                return None
            self._active[ticket_id] = t
        t.start()
        return t

Key behaviors:

  • Bounded concurrency: max_workers (default 4) limits parallel ticket execution
  • Semaphore gating: Ensures no more than max_workers targets execute simultaneously
  • Automatic cleanup: Thread removes itself from _active dict on completion
  • Non-blocking spawn: Returns None if pool is full, allowing the engine to defer
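
The non-blocking behavior is easiest to see with a small pool and workers that block on an event. The class below is a condensed restatement for demonstration only (the semaphore is omitted because the capacity check alone bounds this sketch); the ticket IDs are placeholders.

```python
import threading
from typing import Callable, Optional


class WorkerPool:
    """Condensed restatement of the pool, for demonstration only."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()

    def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]:
        def wrapper(*a):
            try:
                target(*a)
            finally:
                with self._lock:
                    self._active.pop(ticket_id, None)

        t = threading.Thread(target=wrapper, args=args, daemon=True)
        with self._lock:
            if len(self._active) >= self.max_workers:
                return None  # pool full: the caller is expected to retry later
            self._active[ticket_id] = t
        t.start()
        return t


release = threading.Event()
pool = WorkerPool(max_workers=2)

first = pool.spawn("T-1", release.wait, ())
second = pool.spawn("T-2", release.wait, ())
third = pool.spawn("T-3", release.wait, ())   # rejected while both slots are busy

release.set()        # let the two running workers finish
first.join()
second.join()
fourth = pool.spawn("T-3", release.wait, ())  # accepted now that slots are free
fourth.join()
```

Returning `None` instead of blocking lets the orchestration loop keep ticking and retry the deferred ticket on a later pass.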

ConductorEngine: Orchestration Loop

The ConductorEngine orchestrates ticket execution within a track:

class ConductorEngine:
    def __init__(self, track: Track, event_queue: Optional[SyncEventQueue] = None, 
                 auto_queue: bool = False) -> None:
        self.track = track
        self.event_queue = event_queue
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        self.pool = WorkerPool(max_workers=4)
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event = threading.Event()
        self._tier_usage_lock = threading.Lock()
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
        }

Main execution loop (run method):

  1. Pause check: If _pause_event is set, sleep and broadcast "paused" status
  2. DAG tick: Call engine.tick() to get ready tasks
  3. Completion check: If no ready tasks and all completed, break with "done" status
  4. Wait for workers: If tasks in-progress or pool active, sleep and continue
  5. Blockage detection: If no ready, no in-progress, and not all done, break with "blocked" status
  6. Spawn workers: For each ready task, spawn a worker via pool.spawn()
  7. Model escalation: Workers use models_list[min(retry_count, 2)] for capability upgrade on retries
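
The escalation rule in step 7 and the branch order of steps 1 to 6 can be condensed into two small functions. This is a sketch, not the engine's code: `pick_model` and `loop_step` are hypothetical names, the model names are placeholders, and the sketch assumes that waiting for workers applies only when no task is ready.

```python
def pick_model(models_list: list[str], retry_count: int) -> str:
    """Clamp the retry count to index 2 so later retries reuse the third model."""
    return models_list[min(retry_count, 2)]


def loop_step(paused: bool, ready: int, in_progress: int,
              pool_active: int, all_done: bool) -> str:
    """Map one iteration's observations to an action name (illustrative only)."""
    if paused:
        return "paused"
    if ready == 0 and all_done:
        return "done"
    if ready == 0 and (in_progress > 0 or pool_active > 0):
        return "wait"      # assumption: wait only when nothing is ready to spawn
    if ready == 0:
        return "blocked"   # nothing ready, nothing running, work remaining
    return "spawn"


models = ["small-model", "medium-model", "large-model"]  # placeholder names
print(pick_model(models, 0))             # small-model
print(pick_model(models, 5))             # large-model
print(loop_step(False, 0, 0, 0, False))  # blocked
```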

Abort Event Propagation

Each ticket has an associated threading.Event for abort signaling:

# Before spawning worker
self._abort_events[ticket.id] = threading.Event()

# Worker checks abort at three points:
# 1. Before major work
if abort_event.is_set():
    ticket.status = "killed"
    return "ABORTED"

# 2. Before tool execution (in clutch_callback)
if abort_event.is_set():
    return False  # Reject tool

# 3. After blocking send() returns
if abort_event.is_set():
    ticket.status = "killed"
    return "ABORTED"
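
Put together, the first and third checks bracket the blocking call. The sketch below simulates an abort arriving while the call is in flight; `run_ticket`, `fake_send`, and the "completed" status are illustrative assumptions, not names taken from the codebase.

```python
import threading
from types import SimpleNamespace


def run_ticket(ticket, abort_event: threading.Event, send) -> str:
    """Hypothetical worker body showing abort checks around a blocking call."""
    if abort_event.is_set():       # check before major work
        ticket.status = "killed"
        return "ABORTED"
    result = send(ticket)          # blocking provider call (stand-in)
    if abort_event.is_set():       # re-check once the call has returned
        ticket.status = "killed"
        return "ABORTED"
    ticket.status = "completed"    # assumed success status
    return result


abort = threading.Event()
ticket = SimpleNamespace(id="T-1", status="todo")


def fake_send(t):
    abort.set()                    # simulate the user aborting mid-call
    return "response"


outcome = run_ticket(ticket, abort, fake_send)
print(outcome, ticket.status)      # ABORTED killed
```

The blocking call itself is not interrupted; the event only guarantees that its result is discarded as soon as control returns to the worker.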

Architectural Invariants

  1. Single-writer principle: All GUI state mutations happen on the main thread via _process_pending_gui_tasks. Background threads never write GUI state directly.

  2. Copy-and-clear lock pattern: _process_pending_gui_tasks snapshots and clears the task list under the lock, then processes outside the lock.

  3. Context Amnesia: Each MMA Tier 3 Worker starts with ai_client.reset_session(). No conversational bleed between tickets.

  4. Send serialization: _send_lock ensures only one provider call is in-flight at a time across all threads.

  5. Dual-Flush persistence: On exit, state is committed to both project-level and global-level config files.

  6. No cross-thread GUI mutation: Background threads must push tasks to _pending_gui_tasks rather than calling GUI methods directly.

  7. Abort-before-execution: Workers check abort events before major work phases, enabling clean cancellation.

  8. Bounded worker pool: WorkerPool enforces max_workers limit to prevent resource exhaustion.


Error Classification & Recovery

ProviderError Taxonomy

The ProviderError class provides structured error classification:

class ProviderError(Exception):
    def __init__(self, kind: str, provider: str, original: Exception):
        self.kind = kind      # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
        self.provider = provider
        self.original = original
    
    def ui_message(self) -> str:
        labels = {
            "quota": "QUOTA EXHAUSTED",
            "rate_limit": "RATE LIMITED",
            "auth": "AUTH / API KEY ERROR",
            "balance": "BALANCE / BILLING ERROR",
            "network": "NETWORK / CONNECTION ERROR",
            "unknown": "API ERROR",
        }
        return f"[{self.provider.upper()} {labels.get(self.kind, 'API ERROR')}]\n\n{self.original}"

Error Recovery Patterns

| Error Kind | Recovery Strategy |
|------------|-------------------|
| quota | Display in UI, await user intervention |
| rate_limit | Exponential backoff (not yet implemented) |
| auth | Prompt for credential verification |
| balance | Display billing alert |
| network | Auto-retry with timeout |
| unknown | Log full traceback, display in UI |
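
Exponential backoff for `rate_limit` is listed as not yet implemented. One possible shape is sketched below, assuming a minimal stand-in for `ProviderError` that carries only `kind`; `call_with_backoff` and its parameters are hypothetical.

```python
import time


class ProviderError(Exception):
    """Minimal stand-in carrying only the `kind` field used below."""

    def __init__(self, kind: str) -> None:
        super().__init__(kind)
        self.kind = kind


def call_with_backoff(fn, max_attempts: int = 4, base_delay: float = 1.0, sleep=time.sleep):
    """Retry fn() on rate_limit errors, doubling the delay after each failure."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except ProviderError as exc:
            # Only rate limiting is retried; other kinds surface immediately.
            if exc.kind != "rate_limit" or attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...


delays = []
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ProviderError("rate_limit")
    return "ok"


result = call_with_backoff(flaky, sleep=delays.append)  # record delays, do not sleep
```

Injecting `sleep` keeps the helper testable without real waiting; production use would leave the default `time.sleep` in place.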

Memory Management

History Trimming Strategies

Gemini (trimming starts above 40% of the input limit and continues until usage falls below 30%):

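if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
    while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
        # Drop the oldest message pair (two entries from the front of the list)
        hist.pop(0)
        hist.pop(0)
        # total_in must then be lowered by the dropped messages' token estimate
        # (omitted in this excerpt); otherwise the loop always trims down to 4 entries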
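
The excerpt leaves out how `total_in` shrinks as messages are dropped. A self-contained sketch of the same hysteresis, assuming each history entry carries a precomputed "tokens" estimate (a hypothetical field), could look like this:

```python
def trim_history(hist: list[dict], total_in: int, max_tokens: int) -> int:
    """Drop oldest pairs once usage passes 40% of the limit, until it is under 30%.

    Mutates `hist` in place and returns the updated token total.
    """
    if total_in > max_tokens * 0.4:
        while len(hist) > 4 and total_in > max_tokens * 0.3:
            for _ in range(2):  # one pair = two entries from the front
                total_in -= hist.pop(0)["tokens"]
    return total_in


# Ten entries of 100 tokens each against a 2000-token limit.
history = [{"role": role, "tokens": 100} for role in ["user", "model"] * 5]
remaining = trim_history(history, total_in=1000, max_tokens=2000)
print(remaining, len(history))  # 600 6
```

Using two thresholds avoids trimming on every call: once usage drops below 30%, nothing is removed again until it climbs back above 40%.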

Anthropic (180K limit):

def _trim_anthropic_history(system_blocks, history):
    est = _estimate_prompt_tokens(system_blocks, history)
    while len(history) > 3 and est > _ANTHROPIC_MAX_PROMPT_TOKENS:
        # Drop turn pairs, preserving tool_result chains
        ...

Tool Output Budget

_MAX_TOOL_OUTPUT_BYTES: int = 500_000  # 500KB cumulative

if _cumulative_tool_bytes > _MAX_TOOL_OUTPUT_BYTES:
    # Inject warning, force final answer
    parts.append("SYSTEM WARNING: Cumulative tool output exceeded 500KB budget.")
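
The cumulative accounting can be isolated in a small tracker. The sketch below measures UTF-8 encoded size, which is an assumption: the excerpt does not show how `_cumulative_tool_bytes` is computed. `ToolBudget` is a hypothetical name.

```python
MAX_TOOL_OUTPUT_BYTES = 500_000  # mirrors the 500KB budget above


class ToolBudget:
    """Track cumulative tool output size across one request."""

    def __init__(self, limit: int = MAX_TOOL_OUTPUT_BYTES) -> None:
        self.limit = limit
        self.used = 0

    def record(self, output: str) -> bool:
        """Add one tool result; return True once the budget is exceeded."""
        self.used += len(output.encode("utf-8"))
        return self.used > self.limit


budget = ToolBudget(limit=10)         # tiny limit to make the example visible
first_over = budget.record("12345")   # 5 bytes used
second_over = budget.record("123456") # 11 bytes used, over the limit
```

When `record` returns True, the caller would append the system warning and ask the model for a final answer instead of issuing further tool calls.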

AST Cache (file_cache.py)

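from pathlib import Path
from typing import Dict, Optional, Tuple

import tree_sitter

_ast_cache: Dict[str, Tuple[float, tree_sitter.Tree]] = {}

def get_cached_tree(self, path: Optional[str], code: str) -> tree_sitter.Tree:
    if path is None:
        return self.parse(code)  # no key to cache under
    p = Path(path)
    mtime = p.stat().st_mtime if p.exists() else 0.0
    if path in _ast_cache:
        cached_mtime, tree = _ast_cache[path]
        if cached_mtime == mtime:
            return tree
    # Parse and cache; once 10 entries are held, evict the oldest-inserted one.
    # Hits do not refresh recency, so this is FIFO rather than true LRU.
    if len(_ast_cache) >= 10:
        del _ast_cache[next(iter(_ast_cache))]
    tree = self.parse(code)
    _ast_cache[path] = (mtime, tree)
    return tree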
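
The eviction shown above removes the oldest-inserted key and does not refresh recency on a hit, so it behaves as FIFO. If true LRU behavior were wanted, `collections.OrderedDict` offers one way to get it. The sketch below is an alternative, not the implementation in file_cache.py; `MtimeLRUCache` is a hypothetical name and the parser is passed in as a plain callable so the example runs without tree_sitter.

```python
from collections import OrderedDict
from typing import Any, Callable


class MtimeLRUCache:
    """Cache parse results per path, invalidated by mtime, evicted by recency."""

    def __init__(self, parse: Callable[[str], Any], max_entries: int = 10) -> None:
        self._parse = parse
        self._max = max_entries
        self._entries = OrderedDict()  # path -> (mtime, parsed result)

    def get(self, path: str, mtime: float, code: str) -> Any:
        hit = self._entries.get(path)
        if hit is not None and hit[0] == mtime:
            self._entries.move_to_end(path)    # refresh recency on a hit
            return hit[1]
        tree = self._parse(code)
        self._entries[path] = (mtime, tree)
        self._entries.move_to_end(path)        # a re-parsed path counts as recent
        while len(self._entries) > self._max:
            self._entries.popitem(last=False)  # evict the least recently used
        return tree


parsed = []
cache = MtimeLRUCache(parse=lambda code: parsed.append(code) or len(code), max_entries=2)
cache.get("a.py", 1.0, "aa")    # miss: parsed
cache.get("a.py", 1.0, "aa")    # hit: served from cache
cache.get("a.py", 2.0, "aaa")   # mtime changed: parsed again
```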