# Architecture [Top](../README.md) | [Tools & IPC](guide_tools.md) | [MMA Orchestration](guide_mma.md) | [Simulations](guide_simulations.md) --- ## Philosophy: The Decoupled State Machine Manual Slop solves a single tension: **AI reasoning is high-latency and non-deterministic; GUI interaction must be low-latency and responsive.** The engine enforces strict decoupling between four thread domains so that multi-second LLM calls never block the render loop, and every AI-generated payload passes through a human-auditable gate before execution. The architectural philosophy follows data-oriented design principles: - The GUI (`gui_2.py`, `app_controller.py`) remains a pure visualization of application state - State mutations occur only through lock-guarded queues consumed on the main render thread - Background threads never write GUI state directly — they serialize task dicts for later consumption - All cross-thread communication uses explicit synchronization primitives (Locks, Conditions, Events) ## Project Structure The codebase is organized into a `src/` layout to separate implementation from configuration and artifacts. ``` manual_slop/ ├── conductor/ # Conductor tracks, specs, and plans ├── docs/ # Deep-dive architectural documentation ├── logs/ # Session logs, agent traces, and errors ├── scripts/ # Build, migration, and IPC bridge scripts ├── src/ # Core Python implementation │ ├── ai_client.py # LLM provider abstraction │ ├── gui_2.py # Main ImGui application │ ├── mcp_client.py # MCP tool implementation │ └── ... # Other core modules ├── tests/ # Pytest suite and simulation fixtures ├── simulation/ # Workflow and agent simulation logic ├── sloppy.py # Primary application entry point ├── config.toml # Global application settings └── manual_slop.toml # Project-specific configuration ``` --- ## Thread Domains Four distinct thread domains operate concurrently: | Domain | Created By | Purpose | Lifecycle | Key Synchronization Primitives | |---|---|---|---|---| | **Main / GUI** | `immapp.run()` | Dear ImGui retained-mode render loop; sole writer of GUI state | App lifetime | None (consumer of queues) | | **Asyncio Worker** | `App.__init__` via `threading.Thread(daemon=True)` | Event queue processing, AI client calls | Daemon (dies with process) | `AsyncEventQueue`, `threading.Lock` | | **HookServer** | `api_hooks.HookServer.start()` | HTTP API on `:8999` for external automation and IPC | Daemon thread | `threading.Lock`, `threading.Event` | | **Ad-hoc** | Transient `threading.Thread` calls | Model-fetching, legacy send paths, log pruning | Short-lived | Task-specific locks | The asyncio worker is **not** the main thread's event loop. It runs a dedicated `asyncio.new_event_loop()` on its own daemon thread: ```python # AppController.__init__: self._loop = asyncio.new_event_loop() self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._loop_thread.start() # _run_event_loop: def _run_event_loop(self) -> None: asyncio.set_event_loop(self._loop) self._loop.create_task(self._process_event_queue()) self._loop.run_forever() ``` The GUI thread uses `asyncio.run_coroutine_threadsafe(coro, self._loop)` to push work into this loop. ### Thread-Local Context Isolation For concurrent multi-agent execution, the application uses `threading.local()` to manage per-thread context: ```python # ai_client.py _local_storage = threading.local() def get_current_tier() -> Optional[str]: """Returns the current tier from thread-local storage.""" return getattr(_local_storage, "current_tier", None) def set_current_tier(tier: Optional[str]) -> None: """Sets the current tier in thread-local storage.""" _local_storage.current_tier = tier ``` This ensures that comms log entries and tool calls are correctly tagged with their source tier even when multiple workers execute concurrently. --- ## Cross-Thread Data Structures All cross-thread communication uses one of three patterns: ### Pattern A: AsyncEventQueue (GUI -> Asyncio) ```python # events.py class AsyncEventQueue: _queue: asyncio.Queue # holds Tuple[str, Any] items async def put(self, event_name: str, payload: Any = None) -> None async def get(self) -> Tuple[str, Any] ``` The central event bus. Uses `asyncio.Queue`, so non-asyncio threads must enqueue via `asyncio.run_coroutine_threadsafe()`. Consumer is `App._process_event_queue()`, running as a long-lived coroutine on the asyncio loop. ### Pattern B: Guarded Lists (Any Thread -> GUI) Background threads cannot write GUI state directly. They append task dicts to lock-guarded lists; the main thread drains these once per frame: ```python # App.__init__: self._pending_gui_tasks: list[dict[str, Any]] = [] self._pending_gui_tasks_lock = threading.Lock() self._pending_comms: list[dict[str, Any]] = [] self._pending_comms_lock = threading.Lock() self._pending_tool_calls: list[tuple[str, str, float]] = [] self._pending_tool_calls_lock = threading.Lock() self._pending_history_adds: list[dict[str, Any]] = [] self._pending_history_adds_lock = threading.Lock() ``` Additional locks: ```python self._send_thread_lock = threading.Lock() # Guards send_thread creation self._pending_dialog_lock = threading.Lock() # Guards _pending_dialog + _pending_actions dict ``` ### Pattern C: Condition-Variable Dialogs (Bidirectional Blocking) Used for Human-in-the-Loop (HITL) approval. Background thread blocks on `threading.Condition`; GUI thread signals after user action. See the [HITL section](#the-execution-clutch-human-in-the-loop) below. --- ## Event System Three classes in `events.py` (89 lines, no external dependencies beyond `asyncio` and `typing`): ### EventEmitter ```python class EventEmitter: _listeners: Dict[str, List[Callable]] def on(self, event_name: str, callback: Callable) -> None def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None ``` Synchronous pub-sub. Callbacks execute in the caller's thread. Used by `ai_client.events` for lifecycle hooks (`request_start`, `response_received`, `tool_execution`). No thread safety — relies on consistent single-thread usage. ### AsyncEventQueue Described above in Pattern A. ### UserRequestEvent ```python class UserRequestEvent: prompt: str # User's raw input text stable_md: str # Generated markdown context (files, screenshots) file_items: List[Any] # File attachment items for dynamic refresh disc_text: str # Serialized discussion history base_dir: str # Working directory for shell commands def to_dict(self) -> Dict[str, Any] ``` Pure data carrier. Created on the GUI thread in `_handle_generate_send`, consumed on the asyncio thread in `_handle_request_event`. --- ## Application Lifetime ### Boot Sequence The `App.__init__` (lines 152-296) follows this precise order: 1. **Config hydration**: Reads `config.toml` (global) and `.toml` (local). Builds the initial "world view" — tracked files, discussion history, active models. 2. **Thread bootstrapping**: - Asyncio event loop thread starts (`_loop_thread`). - `HookServer` starts as a daemon if `test_hooks_enabled` or provider is `gemini_cli`. 3. **Callback wiring** (`_init_ai_and_hooks`): Connects `ai_client.confirm_and_run_callback`, `comms_log_callback`, `tool_log_callback` to GUI handlers. 4. **UI entry**: Main thread enters `immapp.run()`. GUI is now alive; background threads are ready. ### Shutdown Sequence When `immapp.run()` returns (user closed window): 1. `hook_server.stop()` — shuts down HTTP server, joins thread. 2. `perf_monitor.stop()`. 3. `ai_client.cleanup()` — destroys server-side API caches (Gemini `CachedContent`). 4. **Dual-Flush persistence**: `_flush_to_project()`, `_save_active_project()`, `_flush_to_config()`, `save_config()` — commits state back to both project and global configs. 5. `session_logger.close_session()`. The asyncio loop thread is a daemon — it dies with the process. `App.shutdown()` exists for explicit cleanup in test scenarios: ```python def shutdown(self) -> None: if self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._loop_thread.is_alive(): self._loop_thread.join(timeout=2.0) ``` --- ## The Task Pipeline: Producer-Consumer Synchronization ### Request Flow ``` GUI Thread Asyncio Thread GUI Thread (next frame) ────────── ────────────── ────────────────────── 1. User clicks "Gen + Send" 2. _handle_generate_send(): - Compiles md context - Creates UserRequestEvent - Enqueues via run_coroutine_threadsafe ──> 3. _process_event_queue(): awaits event_queue.get() routes "user_request" to _handle_request_event() 4. Configures ai_client 5. ai_client.send() BLOCKS (seconds to minutes) 6. On completion, enqueues "response" event back ──> 7. _process_pending_gui_tasks(): Drains task list under lock Sets ai_response text Triggers terminal blink ``` ### Event Types Routed by `_process_event_queue` | Event Name | Action | |---|---| | `"user_request"` | Calls `_handle_request_event(payload)` — synchronous blocking AI call | | `"response"` | Appends `{"action": "handle_ai_response", ...}` to `_pending_gui_tasks` | | `"mma_state_update"` | Appends `{"action": "mma_state_update", ...}` to `_pending_gui_tasks` | | `"mma_spawn_approval"` | Appends the raw payload for HITL dialog creation | | `"mma_step_approval"` | Appends the raw payload for HITL dialog creation | The pattern: events arriving on the asyncio thread that need GUI state changes are **serialized into `_pending_gui_tasks`** for consumption on the next render frame. ### Frame-Sync Mechanism: `_process_pending_gui_tasks` Called once per ImGui frame on the **main GUI thread**. This is the sole safe point for mutating GUI-visible state. **Locking strategy** — copy-and-clear: ```python def _process_pending_gui_tasks(self) -> None: if not self._pending_gui_tasks: return with self._pending_gui_tasks_lock: tasks = self._pending_gui_tasks[:] # Snapshot self._pending_gui_tasks.clear() # Release lock fast for task in tasks: # Process each task outside the lock ``` Acquires the lock briefly to snapshot the task list, then processes outside the lock. Minimizes lock contention with producer threads. ### Complete Action Type Catalog | Action | Source | Effect | |---|---|---| | `"refresh_api_metrics"` | asyncio/hooks | Updates API metrics display | | `"handle_ai_response"` | asyncio | Sets `ai_response`, `ai_status`, `mma_streams[stream_id]`; triggers blink; optionally auto-adds to discussion history | | `"show_track_proposal"` | asyncio | Sets `proposed_tracks` list, opens modal | | `"mma_state_update"` | asyncio | Updates `mma_status`, `active_tier`, `mma_tier_usage`, `active_tickets`, `active_track` | | `"set_value"` | HookServer | Sets any field in `_settable_fields` map via `setattr`; special-cases `current_provider`/`current_model` to reconfigure AI client | | `"click"` | HookServer | Dispatches to `_clickable_actions` map; introspects signatures to decide whether to pass `user_data` | | `"select_list_item"` | HookServer | Routes to `_switch_discussion()` for discussion listbox | | `{"type": "ask"}` | HookServer | Opens ask dialog: sets `_pending_ask_dialog = True`, stores `_ask_request_id` and `_ask_tool_data` | | `"clear_ask"` | HookServer | Clears ask dialog state if request_id matches | | `"custom_callback"` | HookServer | Executes an arbitrary callable with args | | `"mma_step_approval"` | asyncio (MMA engine) | Creates `MMAApprovalDialog`, stores in `_pending_mma_approval` | | `"mma_spawn_approval"` | asyncio (MMA engine) | Creates `MMASpawnApprovalDialog`, stores in `_pending_mma_spawn` | | `"refresh_from_project"` | HookServer/internal | Reloads all UI state from project dict | --- ## The Execution Clutch: Human-in-the-Loop The "Execution Clutch" ensures every destructive AI action passes through an auditable human gate. Three dialog types implement this, all sharing the same blocking pattern. ### Dialog Classes **`ConfirmDialog`** — PowerShell script execution approval: ```python class ConfirmDialog: _uid: str # uuid4 identifier _script: str # The PowerShell script text (editable) _base_dir: str # Working directory _condition: threading.Condition # Blocking primitive _done: bool # Signal flag _approved: bool # User's decision def wait(self) -> tuple[bool, str] # Blocks until _done; returns (approved, script) ``` **`MMAApprovalDialog`** — MMA tier step approval: ```python class MMAApprovalDialog: _ticket_id: str _payload: str # The step payload (editable) _condition: threading.Condition _done: bool _approved: bool def wait(self) -> tuple[bool, str] # Returns (approved, payload) ``` **`MMASpawnApprovalDialog`** — Sub-agent spawn approval: ```python class MMASpawnApprovalDialog: _ticket_id: str _role: str # tier3-worker, tier4-qa, etc. _prompt: str # Spawn prompt (editable) _context_md: str # Context document (editable) _condition: threading.Condition _done: bool _approved: bool _abort: bool # Can abort entire track def wait(self) -> dict[str, Any] # Returns {approved, abort, prompt, context_md} ``` ### Blocking Flow Using `ConfirmDialog` as exemplar: ``` ASYNCIO THREAD (ai_client tool callback) GUI MAIN THREAD ───────────────────────────────────────── ─────────────── 1. ai_client calls _confirm_and_run(script) 2. Creates ConfirmDialog(script, base_dir) 3. Stores dialog: - Headless: _pending_actions[uid] = dialog - GUI mode: _pending_dialog = dialog 4. If test_hooks_enabled: pushes to _api_event_queue 5. dialog.wait() BLOCKS on _condition 6. Next frame: ImGui renders _pending_dialog in modal 7. User clicks Approve/Reject 8. _handle_approve_script(): with dialog._condition: dialog._approved = True dialog._done = True dialog._condition.notify_all() 9. wait() returns (True, potentially_edited_script) 10. Executes shell_runner.run_powershell() 11. Returns output to ai_client ``` The `_condition.wait(timeout=0.1)` uses a 100ms polling interval inside a loop — a polling-with-condition hybrid that ensures the blocking thread wakes periodically. ### Resolution Paths **GUI button path** (normal interactive use): `_handle_approve_script()` / `_handle_approve_mma_step()` / `_handle_approve_spawn()` directly manipulate the dialog's condition variable from the GUI thread. **HTTP API path** (headless/automation): `resolve_pending_action(action_id, approved)` looks up the dialog by UUID in `_pending_actions` dict (headless) or `_pending_dialog` (GUI), then signals the condition: ```python def resolve_pending_action(self, action_id: str, approved: bool) -> bool: with self._pending_dialog_lock: if action_id in self._pending_actions: dialog = self._pending_actions[action_id] with dialog._condition: dialog._approved = approved dialog._done = True dialog._condition.notify_all() return True ``` **MMA approval path**: `_handle_mma_respond(approved, payload, abort, prompt, context_md)` is the unified resolver. It uses a `dialog_container` — a one-element list `[None]` used as a mutable reference shared between the MMA engine (which creates the container) and the GUI (which populates it via `_process_pending_gui_tasks`). --- ## AI Client: Multi-Provider Architecture `ai_client.py` operates as a **stateful singleton** — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer. ### Module-Level State ```python _provider: str = "gemini" # "gemini" | "anthropic" | "deepseek" | "gemini_cli" | "minimax" _model: str = "gemini-2.5-flash-lite" _temperature: float = 0.0 _top_p: float = 1.0 _max_tokens: int = 8192 _history_trunc_limit: int = 8000 # Char limit for truncating old tool outputs _send_lock: threading.Lock # Serializes ALL send() calls across providers ``` Per-provider client objects: ```python # Gemini (SDK-managed stateful chat) _gemini_client: genai.Client | None _gemini_chat: Any # Holds history internally _gemini_cache: Any # Server-side CachedContent _gemini_cache_md_hash: str | None # Hash for cache invalidation _gemini_cache_created_at: float | None # Monotonic time of cache creation _gemini_cached_file_paths: list[str] # File paths included in the active cache _GEMINI_CACHE_TTL: int = 3600 # 1-hour; rebuilt at 90% (3240s) # Anthropic (client-managed history) _anthropic_client: anthropic.Anthropic | None _anthropic_history: list[dict] # Mutable [{role, content}, ...] _anthropic_history_lock: threading.Lock # DeepSeek (raw HTTP, client-managed history) _deepseek_client: Any | None _deepseek_history: list[dict] _deepseek_history_lock: threading.Lock # MiniMax (raw HTTP, client-managed history) _minimax_client: Any | None _minimax_history: list[dict] _minimax_history_lock: threading.Lock # Gemini CLI (adapter wrapper) _gemini_cli_adapter: GeminiCliAdapter | None ``` Safety limits: ```python MAX_TOOL_ROUNDS: int = 10 # Max tool-call loop iterations per send() _MAX_TOOL_OUTPUT_BYTES: int = 500_000 # 500KB cumulative tool output budget _ANTHROPIC_CHUNK_SIZE: int = 120_000 # Max chars per system text block _ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000 # 200k limit minus headroom _GEMINI_MAX_INPUT_TOKENS: int = 900_000 # 1M window minus headroom ``` ### The `send()` Dispatcher ```python def send(md_content, user_message, base_dir=".", file_items=None, discussion_history="", stream=False, pre_tool_callback=None, qa_callback=None, enable_tools=True, stream_callback=None, patch_callback=None, rag_engine=None) -> str: with _send_lock: if _provider == "gemini": return _send_gemini(...) elif _provider == "gemini_cli": return _send_gemini_cli(...) elif _provider == "anthropic": return _send_anthropic(...) elif _provider == "deepseek": return _send_deepseek(..., stream=stream) elif _provider == "minimax": return _send_minimax(..., stream=stream) ``` `_send_lock` serializes all API calls — only one provider call can be in-flight at a time. All providers share the same callback signatures. Return type is always `str`. **Parameter evolution** (newer parameters, may be missing from older docstring mirrors): - `enable_tools: bool = True` — Per-call gate for the PowerShell + MCP tool set. Tier 4 and certain planning calls pass `enable_tools=False` to force text-only responses. - `stream_callback: Optional[Callable[[str], None]]` — Provider-specific streaming sink. The DeepSeek and MiniMax paths invoke this as tokens arrive; other providers deliver the full response after the network round-trip. - `patch_callback: Optional[Callable[[str, str], Optional[str]]]` — Tier 4 patch generation hook. Receives `(error_text, file_context)` and returns an optional diff. See [Tier 4 Patch Generation](#tier-4-patch-generation-flow) below. - `rag_engine: Optional[Any]` — When provided, the dispatcher injects RAG-retrieved context into `md_content` before the provider call. The RAG engine is owned by the caller (typically `AppController` or `multi_agent_conductor.run_worker_lifecycle`); `ai_client` does not own its lifecycle. See [RAG Integration](#rag-integration) below. `_send_lock` serializes all API calls — only one provider call can be in-flight at a time. All providers share the same callback signatures. Return type is always `str`. ### Provider Comparison | Aspect | Gemini SDK | Anthropic | DeepSeek | Gemini CLI | MiniMax | |---|---|---|---|---|---| | **Client** | `genai.Client` | `anthropic.Anthropic` | Raw `requests.post` | `GeminiCliAdapter` (subprocess) | Raw `requests.post` (OpenAI-compatible endpoint) | | **History** | SDK-managed (`_gemini_chat._history`) | Client-managed list | Client-managed list | CLI-managed (session ID) | Client-managed list | | **Caching** | Server-side `CachedContent` with TTL | Prompt caching via `cache_control: ephemeral` (4 breakpoints) | None | None | None | | **Tool format** | `types.FunctionDeclaration` | JSON Schema dict | Not declared | Same as SDK via adapter | Not declared | | **Tool results** | `Part.from_function_response(response={"output": ...})` | `{"type": "tool_result", "tool_use_id": ..., "content": ...}` | `{"role": "tool", "tool_call_id": ..., "content": ...}` | `{"role": "tool", ...}` | `{"role": "tool", "tool_call_id": ..., "content": ...}` | | **History trimming** | In-place at 40% of 900K token estimate | 2-phase: strip stale file refreshes, then drop turn pairs at 180K | None | None | 2-phase: drop turn pairs at 180K (Anthropic-equivalent) | | **Streaming** | No | No | Yes | No | Yes | | **Error classifier** | `_classify_gemini_error` | `_classify_anthropic_error` | `_classify_deepseek_error` | (inherits Gemini) | `_classify_minimax_error` | | **Repair hook** | (SDK self-heals) | `_repair_anthropic_history` | `_repair_deepseek_history` | (CLI handles) | `_repair_minimax_history` | ### Tool-Call Loop (common pattern across providers) All providers follow the same high-level loop, iterated up to `MAX_TOOL_ROUNDS + 2` times: 1. Send message (or tool results from prior round) to API. 2. Extract text response and any function calls. 3. Log to comms log; emit events. 4. If no function calls or max rounds exceeded: **break**. 5. For each function call: - If `pre_tool_callback` rejects: return rejection text. - Dispatch to `mcp_client.dispatch()` or `shell_runner.run_powershell()`. - After the **last** call of this round: run `_reread_file_items()` for context refresh. - Truncate tool output at `_history_trunc_limit` chars. - Accumulate `_cumulative_tool_bytes`. 6. If cumulative bytes > 500KB: inject warning. 7. Package tool results in provider-specific format; loop. ### Context Refresh Mechanism After the last tool call in each round, `_reread_file_items(file_items)` checks mtimes of all tracked files: 1. For each file item: compare `Path.stat().st_mtime` against stored `mtime`. 2. If unchanged: pass through as-is. 3. If changed: re-read content, store `old_content` for diffing, update `mtime`. 4. Changed files are diffed via `_build_file_diff_text`: - Files <= 200 lines: emit full content. - Files > 200 lines with `old_content`: emit `difflib.unified_diff`. 5. Diff is appended to the last tool's output as `[SYSTEM: FILES UPDATED]\n\n{diff}`. 6. Stale `[FILES UPDATED]` blocks are stripped from older history turns by `_strip_stale_file_refreshes` to prevent context bloat. ### Anthropic Cache Strategy (4-Breakpoint System) Anthropic allows a maximum of 4 `cache_control: ephemeral` breakpoints: | # | Location | Purpose | |---|---|---| | 1 | Last block of stable system prompt | Cache base instructions | | 2 | Last block of context chunks | Cache file context | | 3 | Last tool definition | Cache tool schema | | 4 | Second-to-last user message | Cache conversation prefix | Before placing breakpoint 4, all existing `cache_control` is stripped from history to prevent exceeding the limit. ### Gemini Cache Strategy (Server-Side TTL) System instruction content is hashed. On each call, a 3-way decision: - **Hash changed**: Delete old cache, rebuild with new content. - **Cache age > 90% of TTL**: Proactive renewal (delete + rebuild). `cache_created_at` is tracked via `time.monotonic()` for this check. - **No cache exists**: Create new `CachedContent` if token count >= 2048; otherwise inline. The active cache's file inclusion set is tracked in `_gemini_cached_file_paths: list[str]`. On rebuild, the list is replaced atomically. The GUI uses this list to render the "cached files" indicator in the Cache Panel. --- ## Async Tool Execution Independent tool calls within a single round execute concurrently via `asyncio.gather`. This is the major latency win: when the AI emits 3 read_file calls in one turn, they run in parallel rather than sequentially. ### Entry Point ```python async def _execute_tool_calls_concurrently( calls: list[Any], base_dir: str, pre_tool_callback: ..., qa_callback: ..., r_idx: int, provider: str, patch_callback: ... = None, ) -> list[tuple[str, str, str, str]]: # (tool_name, call_id, output, original_name) ... ``` ### Per-Call Worker ```python async def _execute_single_tool_call_async( name: str, args: dict, call_id: str, base_dir: str, pre_tool_callback, qa_callback, r_idx: int, tier: str | None = None, patch_callback = None, ) -> tuple[str, str, str, str]: ... ``` `tier: str | None` is propagated to the comms log and pre-tool callback so audit trails can attribute tool calls to a specific MMA tier (e.g., "Tier 3", "Tier 4"). Thread-local `_local_storage.current_tier` is the source; the parameter is the explicit pass-through. ### Exception Handling If any individual call raises, `asyncio.gather` with `return_exceptions=True` converts the exception to a returned value rather than cancelling siblings. The post-round loop in `_send_*` then formats the error per provider. See [guide_tools.md](guide_tools.md#parallel-tool-execution) for the full implementation pattern and the timing analysis (sequential vs concurrent latency for a typical 3-call round). --- ## RAG Integration `ai_client.send()` accepts an optional `rag_engine` parameter. When supplied, the dispatcher augments `md_content` with RAG-retrieved context before the provider call. ```python def send(md_content, user_message, base_dir=".", file_items=None, ..., rag_engine: Optional[Any] = None) -> str: if rag_engine is not None: retrieved = rag_engine.query(user_message, top_k=5) md_content = _inject_rag_context(md_content, retrieved) ... ``` The RAG engine is **not** owned by `ai_client`; the caller (typically `AppController` for the main discussion flow, or `multi_agent_conductor.run_worker_lifecycle` for Tier 3 workers) is responsible for instantiating and configuring it. This keeps `ai_client` decoupled from any specific retrieval backend (ChromaDB local, external MCP RAG server, or none). **Lifecycle**: - The `AppController` constructs a single `RAGEngine` per project load. - The RAG engine is passed through to `send()` for every AI call. - If a project disables RAG, `rag_engine=None` is passed and the integration is a no-op. - See [guide_rag.md](guide_rag.md) (placeholder; written in Task 10) for the vector store, chunking, and indexing pipeline. --- ## Tier 4 Patch Generation Flow When a Tier 3 worker's test run fails, the engine can request a Tier 4 patch instead of just an error summary. This is a structured diff, not a free-form suggestion. ### Entry Point ```python def run_tier4_patch_generation(error: str, file_context: str) -> str: ... ``` ### Flow 1. Tier 3 worker fails a test; `stderr` is captured by the test runner. 2. The conductor thread calls `run_tier4_patch_callback(stderr, base_dir)` to get a candidate patch. 3. If a patch is generated, the GUI's patch modal (`src/patch_modal.py`) presents the diff for human review. 4. User clicks Apply Patch to resume the pipeline, or Reject to send the worker back for another attempt. 5. The `patch_callback` parameter on `send()` is the Tier 4 hook; it can be `None` for callers that don't support patch generation. ### Threading `run_tier4_patch_generation` calls `send()` with `enable_tools=False` to force a text-only response. The result is parsed as a unified diff. If parsing fails, the modal shows the raw response and the user can manually copy-edit. --- ## Discussion Compression Long discussions accumulate tool outputs and intermediate reasoning that bloat the context. The `run_discussion_compression` function asks the active provider to produce a compressed summary of the discussion so far. ### Entry Point ```python def run_discussion_compression(discussion_text: str) -> str: ... ``` ### Flow 1. Caller (typically the GUI's "Compress Discussion" button or an automatic trigger when history exceeds N tokens) invokes `run_discussion_compression(current_history)`. 2. The function dispatches to the active provider with `enable_tools=False` and a fixed system prompt instructing the model to summarize while preserving key decisions, file paths, and unresolved questions. 3. The returned string replaces the discussion history in subsequent `send()` calls. 4. The original history is archived to the session log (`logs/sessions//comms.log`) for audit. ### Provider Robustness The function tolerates case- and whitespace-variation in the provider string (`" MiniMax "` is normalized to `"minimax"`). This is important because the active provider may be set via different code paths (TOML, env var, runtime override). --- ## Subagent Summarization For very large files, the heuristic `summarise_file` in `src/summarize.py` may be insufficient. The `run_subagent_summarization` function asks the active provider to produce a high-signal summary of a single file using a model call rather than a heuristic. ### Entry Point ```python def run_subagent_summarization(file_path: str, content: str, is_code: bool, outline: str) -> str: ... ``` ### When Invoked - File exceeds the heuristic summary's effective scope (configurable, typically > 5000 lines or > 100KB) - The aggregation strategy in `aggregate.py` is set to `summarize` (rather than `full` or `skeleton`) - The Tier 2 ticket generation explicitly requests a sub-agent summary for a high-priority file ### Flow 1. Caller builds a structured prompt combining the file path, content, an AST outline (if `is_code=True`), and a "summary" instruction. 2. The function dispatches to the active provider with `enable_tools=False`. 3. The returned string is the file's summary, which replaces the full content in the aggregated context. ### Cost vs Quality Trade-off Sub-agent summarization is more expensive than heuristic summarization (one full provider call per file) but produces higher-quality results for complex files. The caller decides based on the project's token budget and quality requirements. --- ## Comms Log System Every API interaction is logged to a module-level list with real-time GUI push: ```python def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None: entry = { "ts": datetime.now().strftime("%H:%M:%S"), "direction": direction, # "OUT" (to API) or "IN" (from API) "kind": kind, # "request" | "response" | "tool_call" | "tool_result" "provider": _provider, "model": _model, "payload": payload, } _comms_log.append(entry) if comms_log_callback: comms_log_callback(entry) # Real-time push to GUI ``` --- ## State Machines ### `ai_status` (Informal) ``` "idle" -> "sending..." -> [AI call in progress] -> "running powershell..." -> "powershell done, awaiting AI..." -> "fetching url..." | "searching web..." -> "done" | "error" -> "idle" (on reset) ``` ### HITL Dialog State (Binary per type) - `_pending_dialog is not None` — script confirmation active - `_pending_mma_approval is not None` — MMA step approval active - `_pending_mma_spawn is not None` — spawn approval active - `_pending_ask_dialog == True` — tool ask dialog active --- ## Security: The MCP Allowlist Every filesystem tool (read, list, search, write) is gated by the MCP Bridge (`mcp_client.py`). See [guide_tools.md](guide_tools.md) for the complete security model, tool inventory, and endpoint reference. Summary: Every path is resolved to an absolute path and checked against a dynamically-built allowlist constructed from the project's tracked files and base directories. Files named `history.toml` or `*_history.toml` are hard-blacklisted. --- ## Telemetry & Auditing Every interaction is designed to be auditable: - **JSON-L Comms Logs**: Raw API traffic logged to `logs/sessions//comms.log` for debugging and token cost analysis. - **Tool Call Logs**: Markdown-formatted sequential records to `toolcalls.log`. - **Generated Scripts**: Every PowerShell script that passes through the Execution Clutch is saved to `scripts/generated/_.ps1`. - **API Hook Logs**: All HTTP hook invocations logged to `apihooks.log`. - **CLI Call Logs**: Subprocess execution details (command, stdin, stdout, stderr, latency) to `clicalls.log` as JSON-L. - **Performance Monitor**: Real-time FPS, Frame Time, CPU, Input Lag tracked and queryable via Hook API. ### Telemetry Data Structures ```python # Comms log entry (JSON-L) { "ts": "14:32:05", "direction": "OUT", "kind": "tool_call", "provider": "gemini", "model": "gemini-2.5-flash-lite", "payload": { "name": "run_powershell", "id": "call_abc123", "script": "Get-ChildItem" }, "source_tier": "Tier 3", "local_ts": 1709875925.123 } # Performance metrics (via get_metrics()) { "fps": 60.0, "fps_avg": 58.5, "last_frame_time_ms": 16.67, "frame_time_ms_avg": 17.1, "cpu_percent": 12.5, "cpu_percent_avg": 15.2, "input_lag_ms": 2.3, "input_lag_ms_avg": 3.1, "time_render_mma_dashboard_ms": 5.2, "time_render_mma_dashboard_ms_avg": 4.8 } ``` --- ## MMA Engine Architecture ### WorkerPool: Concurrent Worker Management The `WorkerPool` class in `multi_agent_conductor.py` manages a bounded pool of worker threads: ```python class WorkerPool: def __init__(self, max_workers: int = 4): self.max_workers = max_workers self._active: dict[str, threading.Thread] = {} self._lock = threading.Lock() self._semaphore = threading.Semaphore(max_workers) def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]: with self._lock: if len(self._active) >= self.max_workers: return None def wrapper(*a, **kw): try: with self._semaphore: target(*a, **kw) finally: with self._lock: self._active.pop(ticket_id, None) t = threading.Thread(target=wrapper, args=args, daemon=True) with self._lock: self._active[ticket_id] = t t.start() return t ``` **Key behaviors**: - **Bounded concurrency**: `max_workers` (default 4) limits parallel ticket execution - **Semaphore gating**: Ensures no more than `max_workers` can execute simultaneously - **Automatic cleanup**: Thread removes itself from `_active` dict on completion - **Non-blocking spawn**: Returns `None` if pool is full, allowing the engine to defer ### ConductorEngine: Orchestration Loop The `ConductorEngine` orchestrates ticket execution within a track: ```python class ConductorEngine: def __init__(self, track: Track, event_queue: Optional[SyncEventQueue] = None, auto_queue: bool = False) -> None: self.track = track self.event_queue = event_queue self.dag = TrackDAG(self.track.tickets) self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue) self.pool = WorkerPool(max_workers=4) self._abort_events: dict[str, threading.Event] = {} self._pause_event = threading.Event() self._tier_usage_lock = threading.Lock() self.tier_usage = { "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"}, "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"}, "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"}, } ``` **Main execution loop** (`run` method): 1. **Pause check**: If `_pause_event` is set, sleep and broadcast "paused" status 2. **DAG tick**: Call `engine.tick()` to get ready tasks 3. **Completion check**: If no ready tasks and all completed, break with "done" status 4. **Wait for workers**: If tasks in-progress or pool active, sleep and continue 5. **Blockage detection**: If no ready, no in-progress, and not all done, break with "blocked" status 6. **Spawn workers**: For each ready task, spawn a worker via `pool.spawn()` 7. **Model escalation**: Workers use `models_list[min(retry_count, 2)]` for capability upgrade on retries ### Abort Event Propagation Each ticket has an associated `threading.Event` for abort signaling: ```python # Before spawning worker self._abort_events[ticket.id] = threading.Event() # Worker checks abort at three points: # 1. Before major work if abort_event.is_set(): ticket.status = "killed" return "ABORTED" # 2. Before tool execution (in clutch_callback) if abort_event.is_set(): return False # Reject tool # 3. After blocking send() returns if abort_event.is_set(): ticket.status = "killed" return "ABORTED" ``` --- ## Architectural Invariants 1. **Single-writer principle**: All GUI state mutations happen on the main thread via `_process_pending_gui_tasks`. Background threads never write GUI state directly. 2. **Copy-and-clear lock pattern**: `_process_pending_gui_tasks` snapshots and clears the task list under the lock, then processes outside the lock. 3. **Context Amnesia**: Each MMA Tier 3 Worker starts with `ai_client.reset_session()`. No conversational bleed between tickets. 4. **Send serialization**: `_send_lock` ensures only one provider call is in-flight at a time across all threads. 5. **Dual-Flush persistence**: On exit, state is committed to both project-level and global-level config files. 6. **No cross-thread GUI mutation**: Background threads must push tasks to `_pending_gui_tasks` rather than calling GUI methods directly. 7. **Abort-before-execution**: Workers check abort events before major work phases, enabling clean cancellation. 8. **Bounded worker pool**: `WorkerPool` enforces `max_workers` limit to prevent resource exhaustion. --- ## Error Classification & Recovery ### ProviderError Taxonomy The `ProviderError` class provides structured error classification: ```python class ProviderError(Exception): def __init__(self, kind: str, provider: str, original: Exception): self.kind = kind # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown" self.provider = provider self.original = original def ui_message(self) -> str: labels = { "quota": "QUOTA EXHAUSTED", "rate_limit": "RATE LIMITED", "auth": "AUTH / API KEY ERROR", "balance": "BALANCE / BILLING ERROR", "network": "NETWORK / CONNECTION ERROR", "unknown": "API ERROR", } return f"[{self.provider.upper()} {labels.get(self.kind, 'API ERROR')}]\n\n{self.original}" ``` ### Error Recovery Patterns | Error Kind | Recovery Strategy | |---|---| | `quota` | Display in UI, await user intervention | | `rate_limit` | Exponential backoff (not yet implemented) | | `auth` | Prompt for credential verification | | `balance` | Display billing alert | | `network` | Auto-retry with timeout | | `unknown` | Log full traceback, display in UI | --- ## Memory Management ### History Trimming Strategies **Gemini (40% threshold)**: ```python if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4: while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3: # Drop oldest message pairs hist.pop(0) # Assistant hist.pop(0) # User ``` **Anthropic (180K limit)**: ```python def _trim_anthropic_history(system_blocks, history): est = _estimate_prompt_tokens(system_blocks, history) while len(history) > 3 and est > _ANTHROPIC_MAX_PROMPT_TOKENS: # Drop turn pairs, preserving tool_result chains ... ``` ### Tool Output Budget ```python _MAX_TOOL_OUTPUT_BYTES: int = 500_000 # 500KB cumulative if _cumulative_tool_bytes > _MAX_TOOL_OUTPUT_BYTES: # Inject warning, force final answer parts.append("SYSTEM WARNING: Cumulative tool output exceeded 500KB budget.") ``` ### AST Cache (file_cache.py) ```python _ast_cache: Dict[str, Tuple[float, tree_sitter.Tree]] = {} def get_cached_tree(self, path: Optional[str], code: str) -> tree_sitter.Tree: mtime = p.stat().st_mtime if p.exists() else 0.0 if path in _ast_cache: cached_mtime, tree = _ast_cache[path] if cached_mtime == mtime: return tree # Parse and cache with simple LRU (max 10 entries) if len(_ast_cache) >= 10: del _ast_cache[next(iter(_ast_cache))] tree = self.parse(code) _ast_cache[path] = (mtime, tree) return tree ```