Gitea (and any case-sensitive filesystem) was rendering the [Top]
nav links in /docs as broken because of two bugs:
1. Case-sensitivity: 22 links used '../README.md' (all-uppercase)
but the actual file is 'docs/Readme.md' (capital R, lowercase
rest). 21 guide_*.md nav bars were affected, plus 1 internal
cross-link in Readme.md itself. Works on Windows (case-
insensitive) but broken on Linux/Gitea.
Fix: 22 occurrences across 22 files changed
'../README.md' -> '../Readme.md'
2. Wrong relative-path level: 16 links used '../../conductor/...'
from 'docs/guide_*.md' to reach 'conductor/'. This goes up 2
levels to 'projects/', which doesn't exist. The correct path
from 'docs/guide_*.md' to 'conductor/' is 1 level up
('../conductor/...'). 12 unique patterns across 10 files
affected.
Fix: 16 occurrences across 10 files changed
'../../conductor/' -> '../conductor/'
3. Bonus: 1 planned-guide link in guide_context_curation.md
referenced a never-written 'guide_context_presets.md'. The
ContextPreset schema is now fully covered in the new
'guide_context_aggregation.md' (per the 2026-06-08 docs
refresh). Fix: link target updated.
No content was changed, only link paths. 24 files, 37 link
replacements, 37 deletions.
Verification:
- All .md links in docs/ now resolve to existing files
(validated by path-resolution check from each file's directory)
- The 3 new guides from the previous docs refresh commit
(guide_discussions.md, guide_state_lifecycle.md,
guide_context_aggregation.md) had the case bug inherited from
guide_architecture.md's existing nav pattern; their top-of-file
nav bars are now correct
- The 21 pre-existing guide nav bars that had the same bug
(every pre-existing guide except the 3 that already used the
correct case: guide_mma.md, guide_simulations.md,
guide_tools.md) are now also fixed
- Inter-guide links (e.g. [Discussions](guide_discussions.md))
were not affected; they were always correct because both the
link text and the actual filename are lowercase
This is a docs-only fix. No code modified.
Architecture
Top | Tools & IPC | MMA Orchestration | Simulations
Philosophy: The Decoupled State Machine
Manual Slop solves a single tension: AI reasoning is high-latency and non-deterministic; GUI interaction must be low-latency and responsive. The engine enforces strict decoupling between four thread domains so that multi-second LLM calls never block the render loop, and every AI-generated payload passes through a human-auditable gate before execution.
The architectural philosophy follows data-oriented design principles:
- The GUI (gui_2.py, app_controller.py) remains a pure visualization of application state
- State mutations occur only through lock-guarded queues consumed on the main render thread
- Background threads never write GUI state directly — they serialize task dicts for later consumption
- All cross-thread communication uses explicit synchronization primitives (Locks, Conditions, Events)
Project Structure
The codebase is organized into a src/ layout to separate implementation from configuration and artifacts.
manual_slop/
├── conductor/ # Conductor tracks, specs, and plans
├── docs/ # Deep-dive architectural documentation
├── logs/ # Session logs, agent traces, and errors
├── scripts/ # Build, migration, and IPC bridge scripts
├── src/ # Core Python implementation
│ ├── ai_client.py # LLM provider abstraction
│ ├── gui_2.py # Main ImGui application
│ ├── mcp_client.py # MCP tool implementation
│ └── ... # Other core modules
├── tests/ # Pytest suite and simulation fixtures
├── simulation/ # Workflow and agent simulation logic
├── sloppy.py # Primary application entry point
├── config.toml # Global application settings
└── manual_slop.toml # Project-specific configuration
Thread Domains
Four distinct thread domains operate concurrently:
| Domain | Created By | Purpose | Lifecycle | Key Synchronization Primitives |
|---|---|---|---|---|
| Main / GUI | immapp.run() | Dear ImGui immediate-mode render loop; sole writer of GUI state | App lifetime | None (consumer of queues) |
| Asyncio Worker | App.__init__ via threading.Thread(daemon=True) | Event queue processing, AI client calls | Daemon (dies with process) | AsyncEventQueue, threading.Lock |
| HookServer | api_hooks.HookServer.start() | HTTP API on :8999 for external automation and IPC | Daemon thread | threading.Lock, threading.Event |
| Ad-hoc | Transient threading.Thread calls | Model-fetching, legacy send paths, log pruning | Short-lived | Task-specific locks |
The asyncio worker is not the main thread's event loop. It runs a dedicated asyncio.new_event_loop() on its own daemon thread:
# AppController.__init__:
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
# _run_event_loop:
def _run_event_loop(self) -> None:
    asyncio.set_event_loop(self._loop)
    self._loop.create_task(self._process_event_queue())
    self._loop.run_forever()
The GUI thread uses asyncio.run_coroutine_threadsafe(coro, self._loop) to push work into this loop.
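The hand-off described above can be sketched in isolation. This is a minimal, standalone illustration rather than the application's code: `slow_double` is a hypothetical stand-in for a long-running AI call, and the main thread plays the role of the GUI thread.

```python
import asyncio
import threading

# Dedicated event loop on its own daemon thread, as in the pattern above.
loop = asyncio.new_event_loop()

def run_loop() -> None:
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop_thread = threading.Thread(target=run_loop, daemon=True)
loop_thread.start()

async def slow_double(x: int) -> int:
    # Hypothetical stand-in for a high-latency AI request.
    await asyncio.sleep(0.05)
    return x * 2

# Called from a non-asyncio thread; returns a concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(slow_double(21), loop)
result = future.result(timeout=2.0)  # a real GUI would poll instead of blocking

# Explicit stop, similar in spirit to the test-oriented shutdown path.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join(timeout=2.0)
```

Blocking on `future.result()` is only acceptable here because the sketch has no render loop to keep responsive.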
Thread-Local Context Isolation
For concurrent multi-agent execution, the application uses threading.local() to manage per-thread context:
# ai_client.py
_local_storage = threading.local()
def get_current_tier() -> Optional[str]:
    """Returns the current tier from thread-local storage."""
    return getattr(_local_storage, "current_tier", None)

def set_current_tier(tier: Optional[str]) -> None:
    """Sets the current tier in thread-local storage."""
    _local_storage.current_tier = tier
This ensures that comms log entries and tool calls are correctly tagged with their source tier even when multiple workers execute concurrently.
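A small standalone demonstration of the isolation property, reusing the two accessors above. The worker names and the `Barrier` are assumptions added for the demo; the barrier only forces both workers to have written their tier before either one reads it back.

```python
import threading
from typing import Optional

_local_storage = threading.local()

def get_current_tier() -> Optional[str]:
    return getattr(_local_storage, "current_tier", None)

def set_current_tier(tier: Optional[str]) -> None:
    _local_storage.current_tier = tier

seen: dict[str, Optional[str]] = {}
barrier = threading.Barrier(2)

def worker(name: str, tier: str) -> None:
    set_current_tier(tier)
    barrier.wait(timeout=2.0)       # both tiers are set before either is read
    seen[name] = get_current_tier()  # each thread still sees only its own value

threads = [
    threading.Thread(target=worker, args=("a", "Tier 3")),
    threading.Thread(target=worker, args=("b", "Tier 4")),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

main_tier = get_current_tier()  # never set on the main thread
```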
Cross-Thread Data Structures
All cross-thread communication uses one of three patterns:
Pattern A: AsyncEventQueue (GUI -> Asyncio)
# events.py
class AsyncEventQueue:
    _queue: asyncio.Queue  # holds Tuple[str, Any] items
    async def put(self, event_name: str, payload: Any = None) -> None
    async def get(self) -> Tuple[str, Any]
The central event bus. Uses asyncio.Queue, so non-asyncio threads must enqueue via asyncio.run_coroutine_threadsafe(). Consumer is App._process_event_queue(), running as a long-lived coroutine on the asyncio loop.
Pattern B: Guarded Lists (Any Thread -> GUI)
Background threads cannot write GUI state directly. They append task dicts to lock-guarded lists; the main thread drains these once per frame:
# App.__init__:
self._pending_gui_tasks: list[dict[str, Any]] = []
self._pending_gui_tasks_lock = threading.Lock()
self._pending_comms: list[dict[str, Any]] = []
self._pending_comms_lock = threading.Lock()
self._pending_tool_calls: list[tuple[str, str, float]] = []
self._pending_tool_calls_lock = threading.Lock()
self._pending_history_adds: list[dict[str, Any]] = []
self._pending_history_adds_lock = threading.Lock()
Additional locks:
self._send_thread_lock = threading.Lock() # Guards send_thread creation
self._pending_dialog_lock = threading.Lock() # Guards _pending_dialog + _pending_actions dict
Pattern C: Condition-Variable Dialogs (Bidirectional Blocking)
Used for Human-in-the-Loop (HITL) approval. Background thread blocks on threading.Condition; GUI thread signals after user action. See the HITL section below.
Event System
Three classes in events.py (89 lines, no external dependencies beyond asyncio and typing):
EventEmitter
class EventEmitter:
    _listeners: Dict[str, List[Callable]]
    def on(self, event_name: str, callback: Callable) -> None
    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None
Synchronous pub-sub. Callbacks execute in the caller's thread. Used by ai_client.events for lifecycle hooks (request_start, response_received, tool_execution). No thread safety — relies on consistent single-thread usage.
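A minimal sketch of a synchronous emitter with the same `on`/`emit` surface as the outline above. The method bodies are an assumption about how such a class is typically written, not a copy of events.py.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

class EventEmitter:
    def __init__(self) -> None:
        self._listeners: Dict[str, List[Callable[..., Any]]] = defaultdict(list)

    def on(self, event_name: str, callback: Callable[..., Any]) -> None:
        self._listeners[event_name].append(callback)

    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None:
        # Callbacks run synchronously, in registration order, on the caller's thread.
        for callback in list(self._listeners.get(event_name, [])):
            callback(*args, **kwargs)

events = EventEmitter()
received: list = []
events.on("request_start", lambda provider, model: received.append((provider, model)))
events.on("request_start", lambda provider, model: received.append("second"))
events.emit("request_start", "gemini", model="gemini-2.5-flash-lite")
events.emit("unknown_event")  # no listeners registered: a no-op
```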
AsyncEventQueue
Described above in Pattern A.
UserRequestEvent
class UserRequestEvent:
    prompt: str            # User's raw input text
    stable_md: str         # Generated markdown context (files, screenshots)
    file_items: List[Any]  # File attachment items for dynamic refresh
    disc_text: str         # Serialized discussion history
    base_dir: str          # Working directory for shell commands
    def to_dict(self) -> Dict[str, Any]
Pure data carrier. Created on the GUI thread in _handle_generate_send, consumed on the asyncio thread in _handle_request_event.
Application Lifetime
Boot Sequence
The App.__init__ (lines 152-296) follows this precise order:
- Config hydration: Reads config.toml (global) and <project>.toml (local). Builds the initial "world view": tracked files, discussion history, active models.
- Thread bootstrapping:
  - Asyncio event loop thread starts (_loop_thread).
  - HookServer starts as a daemon if test_hooks_enabled or provider is gemini_cli.
- Callback wiring (_init_ai_and_hooks): Connects ai_client.confirm_and_run_callback, comms_log_callback, tool_log_callback to GUI handlers.
- UI entry: Main thread enters immapp.run(). GUI is now alive; background threads are ready.
Shutdown Sequence
When immapp.run() returns (user closed window):
- hook_server.stop(): shuts down HTTP server, joins thread.
- perf_monitor.stop().
- ai_client.cleanup(): destroys server-side API caches (Gemini CachedContent).
- Dual-Flush persistence: _flush_to_project(), _save_active_project(), _flush_to_config(), save_config(). Commits state back to both project and global configs.
- session_logger.close_session().
The asyncio loop thread is a daemon — it dies with the process. App.shutdown() exists for explicit cleanup in test scenarios:
def shutdown(self) -> None:
    if self._loop.is_running():
        self._loop.call_soon_threadsafe(self._loop.stop)
    if self._loop_thread.is_alive():
        self._loop_thread.join(timeout=2.0)
The Task Pipeline: Producer-Consumer Synchronization
Request Flow
GUI Thread                        Asyncio Thread                  GUI Thread (next frame)
──────────                        ──────────────                  ──────────────────────
1. User clicks "Gen + Send"
2. _handle_generate_send():
   - Compiles md context
   - Creates UserRequestEvent
   - Enqueues via
     run_coroutine_threadsafe ──> 3. _process_event_queue():
                                     awaits event_queue.get()
                                     routes "user_request" to
                                     _handle_request_event()
                                  4. Configures ai_client
                                  5. ai_client.send() BLOCKS
                                     (seconds to minutes)
                                  6. On completion, enqueues
                                     "response" event back ──>    7. _process_pending_gui_tasks():
                                                                     Drains task list under lock
                                                                     Sets ai_response text
                                                                     Triggers terminal blink
Event Types Routed by _process_event_queue
| Event Name | Action |
|---|---|
| "user_request" | Calls _handle_request_event(payload) (synchronous blocking AI call) |
| "response" | Appends {"action": "handle_ai_response", ...} to _pending_gui_tasks |
| "mma_state_update" | Appends {"action": "mma_state_update", ...} to _pending_gui_tasks |
| "mma_spawn_approval" | Appends the raw payload for HITL dialog creation |
| "mma_step_approval" | Appends the raw payload for HITL dialog creation |
The pattern: events arriving on the asyncio thread that need GUI state changes are serialized into _pending_gui_tasks for consumption on the next render frame.
Frame-Sync Mechanism: _process_pending_gui_tasks
Called once per ImGui frame on the main GUI thread. This is the sole safe point for mutating GUI-visible state.
Locking strategy — copy-and-clear:
def _process_pending_gui_tasks(self) -> None:
    if not self._pending_gui_tasks:
        return
    with self._pending_gui_tasks_lock:
        tasks = self._pending_gui_tasks[:]  # Snapshot under the lock
        self._pending_gui_tasks.clear()     # Lock is released when the with-block exits
    for task in tasks:
        ...  # Process each task outside the lock
Acquires the lock briefly to snapshot the task list, then processes outside the lock. Minimizes lock contention with producer threads.
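The copy-and-clear strategy can be exercised without any GUI. In this sketch the module-level `pending` list, `produce`, and `drain` are hypothetical names; the polling `while` loop stands in for the per-frame call on the render thread.

```python
import threading
from typing import Any

pending: list[dict[str, Any]] = []
pending_lock = threading.Lock()

def produce(worker_id: int, count: int) -> None:
    # Background threads only append task dicts; they never touch "GUI" state.
    for seq in range(count):
        with pending_lock:
            pending.append({"action": "demo", "worker": worker_id, "seq": seq})

def drain() -> list[dict[str, Any]]:
    # Hold the lock only long enough to snapshot and clear the list.
    with pending_lock:
        tasks = pending[:]
        pending.clear()
    return tasks

producers = [threading.Thread(target=produce, args=(w, 100)) for w in range(4)]
for t in producers:
    t.start()

processed: list[dict[str, Any]] = []
while any(t.is_alive() for t in producers):
    processed.extend(drain())  # stand-in for one call per frame
for t in producers:
    t.join()
processed.extend(drain())      # pick up anything appended after the last poll
```

Because each drain removes exactly what it copied, no task is lost or processed twice, and the order of tasks from any single producer is preserved.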
Complete Action Type Catalog
| Action | Source | Effect |
|---|---|---|
| "refresh_api_metrics" | asyncio/hooks | Updates API metrics display |
| "handle_ai_response" | asyncio | Sets ai_response, ai_status, mma_streams[stream_id]; triggers blink; optionally auto-adds to discussion history |
| "show_track_proposal" | asyncio | Sets proposed_tracks list, opens modal |
| "mma_state_update" | asyncio | Updates mma_status, active_tier, mma_tier_usage, active_tickets, active_track |
| "set_value" | HookServer | Sets any field in _settable_fields map via setattr; special-cases current_provider/current_model to reconfigure AI client |
| "click" | HookServer | Dispatches to _clickable_actions map; introspects signatures to decide whether to pass user_data |
| "select_list_item" | HookServer | Routes to _switch_discussion() for discussion listbox |
| {"type": "ask"} | HookServer | Opens ask dialog: sets _pending_ask_dialog = True, stores _ask_request_id and _ask_tool_data |
| "clear_ask" | HookServer | Clears ask dialog state if request_id matches |
| "custom_callback" | HookServer | Executes an arbitrary callable with args |
| "mma_step_approval" | asyncio (MMA engine) | Creates MMAApprovalDialog, stores in _pending_mma_approval |
| "mma_spawn_approval" | asyncio (MMA engine) | Creates MMASpawnApprovalDialog, stores in _pending_mma_spawn |
| "refresh_from_project" | HookServer/internal | Reloads all UI state from project dict |
The Execution Clutch: Human-in-the-Loop
The "Execution Clutch" ensures every destructive AI action passes through an auditable human gate. Three dialog types implement this, all sharing the same blocking pattern.
Dialog Classes
ConfirmDialog — PowerShell script execution approval:
class ConfirmDialog:
    _uid: str                        # uuid4 identifier
    _script: str                     # The PowerShell script text (editable)
    _base_dir: str                   # Working directory
    _condition: threading.Condition  # Blocking primitive
    _done: bool                      # Signal flag
    _approved: bool                  # User's decision
    def wait(self) -> tuple[bool, str]  # Blocks until _done; returns (approved, script)
MMAApprovalDialog — MMA tier step approval:
class MMAApprovalDialog:
    _ticket_id: str
    _payload: str                    # The step payload (editable)
    _condition: threading.Condition
    _done: bool
    _approved: bool
    def wait(self) -> tuple[bool, str]  # Returns (approved, payload)
MMASpawnApprovalDialog — Sub-agent spawn approval:
class MMASpawnApprovalDialog:
    _ticket_id: str
    _role: str                       # tier3-worker, tier4-qa, etc.
    _prompt: str                     # Spawn prompt (editable)
    _context_md: str                 # Context document (editable)
    _condition: threading.Condition
    _done: bool
    _approved: bool
    _abort: bool                     # Can abort entire track
    def wait(self) -> dict[str, Any]  # Returns {approved, abort, prompt, context_md}
Blocking Flow
Using ConfirmDialog as exemplar:
ASYNCIO THREAD (ai_client tool callback)         GUI MAIN THREAD
─────────────────────────────────────────        ───────────────
1. ai_client calls _confirm_and_run(script)
2. Creates ConfirmDialog(script, base_dir)
3. Stores dialog:
   - Headless: _pending_actions[uid] = dialog
   - GUI mode: _pending_dialog = dialog
4. If test_hooks_enabled:
   pushes to _api_event_queue
5. dialog.wait() BLOCKS on _condition
                                                 6. Next frame: ImGui renders
                                                    _pending_dialog in modal
                                                 7. User clicks Approve/Reject
                                                 8. _handle_approve_script():
                                                    with dialog._condition:
                                                        dialog._approved = True
                                                        dialog._done = True
                                                        dialog._condition.notify_all()
9. wait() returns (True, potentially_edited_script)
10. Executes shell_runner.run_powershell()
11. Returns output to ai_client
The _condition.wait(timeout=0.1) uses a 100ms polling interval inside a loop — a polling-with-condition hybrid that ensures the blocking thread wakes periodically.
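The blocking pattern can be reduced to a few lines. `ApprovalDialog` and its `resolve` method are hypothetical names for this sketch; it mirrors the fields listed for ConfirmDialog and the 100ms polling wait, but it is not the project's implementation.

```python
import threading
from typing import Optional

class ApprovalDialog:
    """Hypothetical minimal dialog illustrating the condition-variable handshake."""

    def __init__(self, script: str) -> None:
        self._script = script
        self._condition = threading.Condition()
        self._done = False
        self._approved = False

    def wait(self) -> tuple[bool, str]:
        # Background thread: block until the other side sets _done.
        with self._condition:
            while not self._done:
                self._condition.wait(timeout=0.1)  # wake at least every 100ms
            return self._approved, self._script

    def resolve(self, approved: bool, script: Optional[str] = None) -> None:
        # "GUI" thread: record the decision (and any edit), then wake the waiter.
        with self._condition:
            if script is not None:
                self._script = script
            self._approved = approved
            self._done = True
            self._condition.notify_all()

dialog = ApprovalDialog("Get-ChildItem")
outcome: list[tuple[bool, str]] = []
worker = threading.Thread(target=lambda: outcome.append(dialog.wait()))
worker.start()
dialog.resolve(True, "Get-ChildItem -Recurse")  # user approved an edited script
worker.join(timeout=2.0)
```

Checking `_done` inside the loop makes the wait safe whether `resolve` runs before or after the worker starts waiting.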
Resolution Paths
GUI button path (normal interactive use):
_handle_approve_script() / _handle_approve_mma_step() / _handle_approve_spawn() directly manipulate the dialog's condition variable from the GUI thread.
HTTP API path (headless/automation):
resolve_pending_action(action_id, approved) looks up the dialog by UUID in _pending_actions dict (headless) or _pending_dialog (GUI), then signals the condition:
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
    with self._pending_dialog_lock:
        if action_id in self._pending_actions:
            dialog = self._pending_actions[action_id]
            with dialog._condition:
                dialog._approved = approved
                dialog._done = True
                dialog._condition.notify_all()
            return True
MMA approval path:
_handle_mma_respond(approved, payload, abort, prompt, context_md) is the unified resolver. It uses a dialog_container — a one-element list [None] used as a mutable reference shared between the MMA engine (which creates the container) and the GUI (which populates it via _process_pending_gui_tasks).
AI Client: Multi-Provider Architecture
ai_client.py operates as a stateful singleton — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.
Module-Level State
_provider: str = "gemini" # "gemini" | "anthropic" | "deepseek" | "gemini_cli" | "minimax"
_model: str = "gemini-2.5-flash-lite"
_temperature: float = 0.0
_top_p: float = 1.0
_max_tokens: int = 8192
_history_trunc_limit: int = 8000 # Char limit for truncating old tool outputs
_send_lock: threading.Lock # Serializes ALL send() calls across providers
Per-provider client objects:
# Gemini (SDK-managed stateful chat)
_gemini_client: genai.Client | None
_gemini_chat: Any # Holds history internally
_gemini_cache: Any # Server-side CachedContent
_gemini_cache_md_hash: str | None # Hash for cache invalidation
_gemini_cache_created_at: float | None # Monotonic time of cache creation
_gemini_cached_file_paths: list[str] # File paths included in the active cache
_GEMINI_CACHE_TTL: int = 3600 # 1-hour; rebuilt at 90% (3240s)
# Anthropic (client-managed history)
_anthropic_client: anthropic.Anthropic | None
_anthropic_history: list[dict] # Mutable [{role, content}, ...]
_anthropic_history_lock: threading.Lock
# DeepSeek (raw HTTP, client-managed history)
_deepseek_client: Any | None
_deepseek_history: list[dict]
_deepseek_history_lock: threading.Lock
# MiniMax (raw HTTP, client-managed history)
_minimax_client: Any | None
_minimax_history: list[dict]
_minimax_history_lock: threading.Lock
# Gemini CLI (adapter wrapper)
_gemini_cli_adapter: GeminiCliAdapter | None
Safety limits:
MAX_TOOL_ROUNDS: int = 10 # Max tool-call loop iterations per send()
_MAX_TOOL_OUTPUT_BYTES: int = 500_000 # 500KB cumulative tool output budget
_ANTHROPIC_CHUNK_SIZE: int = 120_000 # Max chars per system text block
_ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000 # 200k limit minus headroom
_GEMINI_MAX_INPUT_TOKENS: int = 900_000 # 1M window minus headroom
The send() Dispatcher
def send(md_content, user_message, base_dir=".", file_items=None,
         discussion_history="", stream=False,
         pre_tool_callback=None, qa_callback=None,
         enable_tools=True, stream_callback=None, patch_callback=None,
         rag_engine=None) -> str:
    with _send_lock:
        if _provider == "gemini": return _send_gemini(...)
        elif _provider == "gemini_cli": return _send_gemini_cli(...)
        elif _provider == "anthropic": return _send_anthropic(...)
        elif _provider == "deepseek": return _send_deepseek(..., stream=stream)
        elif _provider == "minimax": return _send_minimax(..., stream=stream)
_send_lock serializes all API calls — only one provider call can be in-flight at a time. All providers share the same callback signatures. Return type is always str.
Parameter evolution (newer parameters, may be missing from older docstring mirrors):
- enable_tools: bool = True. Per-call gate for the PowerShell + MCP tool set. Tier 4 and certain planning calls pass enable_tools=False to force text-only responses.
- stream_callback: Optional[Callable[[str], None]]. Provider-specific streaming sink. The DeepSeek and MiniMax paths invoke this as tokens arrive; other providers deliver the full response after the network round-trip.
- patch_callback: Optional[Callable[[str, str], Optional[str]]]. Tier 4 patch generation hook. Receives (error_text, file_context) and returns an optional diff. See Tier 4 Patch Generation below.
- rag_engine: Optional[Any]. When provided, the dispatcher injects RAG-retrieved context into md_content before the provider call. The RAG engine is owned by the caller (typically AppController or multi_agent_conductor.run_worker_lifecycle); ai_client does not own its lifecycle. See RAG Integration below.
Provider Comparison
| Aspect | Gemini SDK | Anthropic | DeepSeek | Gemini CLI | MiniMax |
|---|---|---|---|---|---|
| Client | genai.Client | anthropic.Anthropic | Raw requests.post | GeminiCliAdapter (subprocess) | Raw requests.post (OpenAI-compatible endpoint) |
| History | SDK-managed (_gemini_chat._history) | Client-managed list | Client-managed list | CLI-managed (session ID) | Client-managed list |
| Caching | Server-side CachedContent with TTL | Prompt caching via cache_control: ephemeral (4 breakpoints) | None | None | None |
| Tool format | types.FunctionDeclaration | JSON Schema dict | Not declared | Same as SDK via adapter | Not declared |
| Tool results | Part.from_function_response(response={"output": ...}) | {"type": "tool_result", "tool_use_id": ..., "content": ...} | {"role": "tool", "tool_call_id": ..., "content": ...} | {"role": "tool", ...} | {"role": "tool", "tool_call_id": ..., "content": ...} |
| History trimming | In-place at 40% of 900K token estimate | 2-phase: strip stale file refreshes, then drop turn pairs at 180K | None | None | 2-phase: drop turn pairs at 180K (Anthropic-equivalent) |
| Streaming | No | No | Yes | No | Yes |
| Error classifier | _classify_gemini_error | _classify_anthropic_error | _classify_deepseek_error | (inherits Gemini) | _classify_minimax_error |
| Repair hook | (SDK self-heals) | _repair_anthropic_history | _repair_deepseek_history | (CLI handles) | _repair_minimax_history |
Tool-Call Loop (common pattern across providers)
All providers follow the same high-level loop, iterated up to MAX_TOOL_ROUNDS + 2 times:
- Send message (or tool results from prior round) to API.
- Extract text response and any function calls.
- Log to comms log; emit events.
- If no function calls or max rounds exceeded: break.
- For each function call:
  - If pre_tool_callback rejects: return rejection text.
  - Dispatch to mcp_client.dispatch() or shell_runner.run_powershell().
  - After the last call of this round: run _reread_file_items() for context refresh.
  - Truncate tool output at _history_trunc_limit chars.
  - Accumulate _cumulative_tool_bytes.
- If cumulative bytes > 500KB: inject warning.
- Package tool results in provider-specific format; loop.
Context Refresh Mechanism
After the last tool call in each round, _reread_file_items(file_items) checks mtimes of all tracked files:
- For each file item: compare Path.stat().st_mtime against stored mtime.
- If unchanged: pass through as-is.
- If changed: re-read content, store old_content for diffing, update mtime.
- Changed files are diffed via _build_file_diff_text:
  - Files <= 200 lines: emit full content.
  - Files > 200 lines with old_content: emit difflib.unified_diff.
- Diff is appended to the last tool's output as [SYSTEM: FILES UPDATED]\n\n{diff}.
- Stale [FILES UPDATED] blocks are stripped from older history turns by _strip_stale_file_refreshes to prevent context bloat.
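The size-based choice between full content and a unified diff might look like the sketch below. The function name, the header text, and the fallback to full content when no `old_content` is available are assumptions; only the 200-line threshold and the use of `difflib.unified_diff` come from the description above.

```python
import difflib
from typing import Optional

FULL_CONTENT_MAX_LINES = 200  # threshold taken from the description above

def build_file_diff_text(path: str, old_content: Optional[str], new_content: str) -> str:
    new_lines = new_content.splitlines()
    # Small files (or files with nothing to diff against) are emitted whole.
    if len(new_lines) <= FULL_CONTENT_MAX_LINES or old_content is None:
        return f"[{path}] full content:\n{new_content}"
    diff = difflib.unified_diff(
        old_content.splitlines(), new_lines,
        fromfile=f"a/{path}", tofile=f"b/{path}", lineterm="",
    )
    return "\n".join(diff)

small_text = build_file_diff_text("small.py", "x = 1", "x = 2")

old_lines = [f"line {i}" for i in range(300)]
new_lines = old_lines.copy()
new_lines[5] = "line 5 changed"
big_text = build_file_diff_text("big.py", "\n".join(old_lines), "\n".join(new_lines))
```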
Anthropic Cache Strategy (4-Breakpoint System)
Anthropic allows a maximum of 4 cache_control: ephemeral breakpoints:
| # | Location | Purpose |
|---|---|---|
| 1 | Last block of stable system prompt | Cache base instructions |
| 2 | Last block of context chunks | Cache file context |
| 3 | Last tool definition | Cache tool schema |
| 4 | Second-to-last user message | Cache conversation prefix |
Before placing breakpoint 4, all existing cache_control is stripped from history to prevent exceeding the limit.
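A sketch of the strip-then-place step for breakpoint 4, assuming history entries use list-of-blocks content in the Anthropic Messages format. `place_history_breakpoint` is a hypothetical helper name, not a function from ai_client.py.

```python
from typing import Any

def place_history_breakpoint(history: list[dict[str, Any]]) -> None:
    # 1. Strip every existing cache_control marker from the history blocks.
    for message in history:
        content = message.get("content")
        if isinstance(content, list):
            for block in content:
                if isinstance(block, dict):
                    block.pop("cache_control", None)
    # 2. Mark the last block of the second-to-last user message.
    user_indices = [i for i, m in enumerate(history) if m.get("role") == "user"]
    if len(user_indices) >= 2:
        content = history[user_indices[-2]].get("content")
        if isinstance(content, list) and content and isinstance(content[-1], dict):
            content[-1]["cache_control"] = {"type": "ephemeral"}

history = [
    {"role": "user", "content": [{"type": "text", "text": "q1",
                                  "cache_control": {"type": "ephemeral"}}]},
    {"role": "assistant", "content": [{"type": "text", "text": "a1"}]},
    {"role": "user", "content": [{"type": "text", "text": "q2"}]},
    {"role": "assistant", "content": [{"type": "text", "text": "a2"}]},
    {"role": "user", "content": [{"type": "text", "text": "q3"}]},
]
place_history_breakpoint(history)
marked = [m["content"][-1]["text"] for m in history
          if "cache_control" in m["content"][-1]]
```

Stripping first guarantees the history contributes at most one breakpoint, leaving the other three for the system prompt, context chunks, and tool definitions.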
Gemini Cache Strategy (Server-Side TTL)
System instruction content is hashed. On each call, a 3-way decision:
- Hash changed: Delete old cache, rebuild with new content.
- Cache age > 90% of TTL: Proactive renewal (delete + rebuild). cache_created_at is tracked via time.monotonic() for this check.
- No cache exists: Create new CachedContent if token count >= 2048; otherwise inline.
The active cache's file inclusion set is tracked in _gemini_cached_file_paths: list[str]. On rebuild, the list is replaced atomically. The GUI uses this list to render the "cached files" indicator in the Cache Panel.
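The decision logic can be expressed as a pure function, which makes it easy to test without touching the API. This is a sketch under assumptions: the function names, the returned labels, the choice of SHA-256, and the order in which the conditions are checked are illustrative; the TTL, the 90% renewal point, and the 2048-token minimum come from the text above.

```python
import hashlib
import time
from typing import Optional

GEMINI_CACHE_TTL = 3600   # seconds
RENEW_FRACTION = 0.9      # renew once 90% of the TTL has elapsed
MIN_CACHE_TOKENS = 2048   # below this, send the content inline

def content_hash(text: str) -> str:
    # Assumption: any stable digest works; SHA-256 is used here for illustration.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def cache_decision(system_text: str, cached_hash: Optional[str],
                   created_at: Optional[float], token_count: int,
                   now: Optional[float] = None) -> str:
    now = time.monotonic() if now is None else now
    if cached_hash is None or created_at is None:
        return "create" if token_count >= MIN_CACHE_TOKENS else "inline"
    if content_hash(system_text) != cached_hash:
        return "rebuild"   # delete old cache, build from new content
    if now - created_at > GEMINI_CACHE_TTL * RENEW_FRACTION:
        return "renew"     # proactive delete + rebuild before expiry
    return "reuse"

h = content_hash("ctx")
decisions = [
    cache_decision("ctx", None, None, 5000, now=0.0),
    cache_decision("ctx", None, None, 100, now=0.0),
    cache_decision("ctx v2", h, 0.0, 5000, now=10.0),
    cache_decision("ctx", h, 0.0, 5000, now=3300.0),
    cache_decision("ctx", h, 0.0, 5000, now=100.0),
]
```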
Async Tool Execution
Independent tool calls within a single round execute concurrently via asyncio.gather. This is the major latency win: when the AI emits 3 read_file calls in one turn, they run in parallel rather than sequentially.
Entry Point
async def _execute_tool_calls_concurrently(
    calls: list[Any],
    base_dir: str,
    pre_tool_callback: ...,
    qa_callback: ...,
    r_idx: int,
    provider: str,
    patch_callback: ... = None,
) -> list[tuple[str, str, str, str]]:  # (tool_name, call_id, output, original_name)
    ...
Per-Call Worker
async def _execute_single_tool_call_async(
    name: str, args: dict, call_id: str, base_dir: str,
    pre_tool_callback, qa_callback, r_idx: int,
    tier: str | None = None,
    patch_callback = None,
) -> tuple[str, str, str, str]:
    ...
tier: str | None is propagated to the comms log and pre-tool callback so audit trails can attribute tool calls to a specific MMA tier (e.g., "Tier 3", "Tier 4"). Thread-local _local_storage.current_tier is the source; the parameter is the explicit pass-through.
Exception Handling
If any individual call raises, asyncio.gather with return_exceptions=True converts the exception to a returned value rather than cancelling siblings. The post-round loop in _send_* then formats the error per provider. See guide_tools.md for the full implementation pattern and the timing analysis (sequential vs concurrent latency for a typical 3-call round).
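The effect of `return_exceptions=True` is easy to see in a standalone example. `fake_tool` is a hypothetical stand-in for a tool call, and the `"ERROR: ..."` formatting is illustrative rather than any provider's actual error format.

```python
import asyncio

async def fake_tool(name: str, delay: float, fail: bool = False) -> str:
    # Hypothetical tool call: sleeps, then either returns output or raises.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"

async def run_round() -> list[str]:
    results = await asyncio.gather(
        fake_tool("read_file:a", 0.05),
        fake_tool("read_file:b", 0.05, fail=True),
        fake_tool("read_file:c", 0.05),
        return_exceptions=True,  # exceptions come back as values; siblings keep running
    )
    # Results arrive in call order, so outputs can be matched back to their calls.
    return [f"ERROR: {r}" if isinstance(r, Exception) else r for r in results]

outputs = asyncio.run(run_round())
```

The three calls sleep concurrently, so the round takes roughly one delay rather than three.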
RAG Integration
ai_client.send() accepts an optional rag_engine parameter. When supplied, the dispatcher augments md_content with RAG-retrieved context before the provider call.
def send(md_content, user_message, base_dir=".", file_items=None, ...,
         rag_engine: Optional[Any] = None) -> str:
    if rag_engine is not None:
        retrieved = rag_engine.query(user_message, top_k=5)
        md_content = _inject_rag_context(md_content, retrieved)
    ...
The RAG engine is not owned by ai_client; the caller (typically AppController for the main discussion flow, or multi_agent_conductor.run_worker_lifecycle for Tier 3 workers) is responsible for instantiating and configuring it. This keeps ai_client decoupled from any specific retrieval backend (ChromaDB local, external MCP RAG server, or none).
Lifecycle:
- The AppController constructs a single RAGEngine per project load.
- The RAG engine is passed through to send() for every AI call.
- If a project disables RAG, rag_engine=None is passed and the integration is a no-op.
- See guide_rag.md (placeholder; written in Task 10) for the vector store, chunking, and indexing pipeline.
Tier 4 Patch Generation Flow
When a Tier 3 worker's test run fails, the engine can request a Tier 4 patch instead of just an error summary. This is a structured diff, not a free-form suggestion.
Entry Point
def run_tier4_patch_generation(error: str, file_context: str) -> str:
    ...
Flow
- Tier 3 worker fails a test; stderr is captured by the test runner.
- The conductor thread calls run_tier4_patch_callback(stderr, base_dir) to get a candidate patch.
- If a patch is generated, the GUI's patch modal (src/patch_modal.py) presents the diff for human review.
- User clicks Apply Patch to resume the pipeline, or Reject to send the worker back for another attempt.
- The patch_callback parameter on send() is the Tier 4 hook; it can be None for callers that don't support patch generation.
Threading
run_tier4_patch_generation calls send() with enable_tools=False to force a text-only response. The result is parsed as a unified diff. If parsing fails, the modal shows the raw response and the user can manually copy-edit.
Discussion Compression
Long discussions accumulate tool outputs and intermediate reasoning that bloat the context. The run_discussion_compression function asks the active provider to produce a compressed summary of the discussion so far.
Entry Point
def run_discussion_compression(discussion_text: str) -> str:
    ...
Flow
- Caller (typically the GUI's "Compress Discussion" button or an automatic trigger when history exceeds N tokens) invokes run_discussion_compression(current_history).
- The function dispatches to the active provider with enable_tools=False and a fixed system prompt instructing the model to summarize while preserving key decisions, file paths, and unresolved questions.
- The returned string replaces the discussion history in subsequent send() calls.
- The original history is archived to the session log (logs/sessions/<id>/comms.log) for audit.
Provider Robustness
The function tolerates case- and whitespace-variation in the provider string (" MiniMax " is normalized to "minimax"). This is important because the active provider may be set via different code paths (TOML, env var, runtime override).
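The normalization amounts to trimming and lower-casing before comparison. `normalize_provider` is a hypothetical helper name; the source only states the behavior, not how it is implemented.

```python
def normalize_provider(raw: str) -> str:
    # Trim surrounding whitespace and fold case so " MiniMax " matches "minimax".
    return raw.strip().lower()

normalized = [normalize_provider(p) for p in (" MiniMax ", "GEMINI_CLI", "anthropic\n")]
```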
Subagent Summarization
For very large files, the heuristic summarise_file in src/summarize.py may be insufficient. The run_subagent_summarization function asks the active provider to produce a high-signal summary of a single file using a model call rather than a heuristic.
Entry Point
def run_subagent_summarization(file_path: str, content: str, is_code: bool, outline: str) -> str:
    ...
When Invoked
- File exceeds the heuristic summary's effective scope (configurable, typically > 5000 lines or > 100KB)
- The aggregation strategy in aggregate.py is set to summarize (rather than full or skeleton)
- The Tier 2 ticket generation explicitly requests a sub-agent summary for a high-priority file
Flow
- Caller builds a structured prompt combining the file path, content, an AST outline (if is_code=True), and a "summary" instruction.
- The function dispatches to the active provider with enable_tools=False.
- The returned string is the file's summary, which replaces the full content in the aggregated context.
Cost vs Quality Trade-off
Sub-agent summarization is more expensive than heuristic summarization (one full provider call per file) but produces higher-quality results for complex files. The caller decides based on the project's token budget and quality requirements.
Comms Log System
Every API interaction is logged to a module-level list with real-time GUI push:
def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
    entry = {
        "ts": datetime.now().strftime("%H:%M:%S"),
        "direction": direction,  # "OUT" (to API) or "IN" (from API)
        "kind": kind,            # "request" | "response" | "tool_call" | "tool_result"
        "provider": _provider,
        "model": _model,
        "payload": payload,
    }
    _comms_log.append(entry)
    if comms_log_callback:
        comms_log_callback(entry)  # Real-time push to GUI
State Machines
ai_status (Informal)
"idle" -> "sending..." -> [AI call in progress]
-> "running powershell..." -> "powershell done, awaiting AI..."
-> "fetching url..." | "searching web..."
-> "done" | "error"
-> "idle" (on reset)
HITL Dialog State (Binary per type)
- _pending_dialog is not None: script confirmation active
- _pending_mma_approval is not None: MMA step approval active
- _pending_mma_spawn is not None: spawn approval active
- _pending_ask_dialog == True: tool ask dialog active
Security: The MCP Allowlist
Every filesystem tool (read, list, search, write) is gated by the MCP Bridge (mcp_client.py). See guide_tools.md for the complete security model, tool inventory, and endpoint reference.
Summary: Every path is resolved to an absolute path and checked against a dynamically-built allowlist constructed from the project's tracked files and base directories. Files named history.toml or *_history.toml are hard-blacklisted.
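The summary above can be sketched as a single predicate. The function names and the exact matching rules are assumptions; the real checks live in mcp_client.py and are described in guide_tools.md. No files are created: the demo only resolves paths under the system temp directory.

```python
import tempfile
from pathlib import Path

def is_history_file(name: str) -> bool:
    # history.toml and *_history.toml are always refused.
    return name == "history.toml" or name.endswith("_history.toml")

def is_path_allowed(candidate: Path, tracked_files: set[Path], base_dirs: list[Path]) -> bool:
    resolved = Path(candidate).resolve()  # collapses ".." and follows symlinks
    if is_history_file(resolved.name):
        return False
    if resolved in tracked_files:
        return True
    return any(resolved == base or base in resolved.parents for base in base_dirs)

base = Path(tempfile.gettempdir()).resolve() / "demo_project"  # hypothetical project root
tracked_outside = base.parent / "notes.md"

allowed_inside = is_path_allowed(base / "src" / "gui_2.py", set(), [base])
blocked_history = is_path_allowed(base / "chat_history.toml", set(), [base])
blocked_escape = is_path_allowed(base / ".." / "outside.txt", set(), [base])
allowed_tracked = is_path_allowed(tracked_outside, {tracked_outside}, [base])
```

Resolving before comparing is what defeats `..` traversal: the escape attempt resolves to a path outside `base` and is rejected.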
Telemetry & Auditing
Every interaction is designed to be auditable:
- JSON-L Comms Logs: Raw API traffic logged to logs/sessions/<id>/comms.log for debugging and token cost analysis.
- Tool Call Logs: Markdown-formatted sequential records to toolcalls.log.
- Generated Scripts: Every PowerShell script that passes through the Execution Clutch is saved to scripts/generated/<ts>_<seq>.ps1.
- API Hook Logs: All HTTP hook invocations logged to apihooks.log.
- CLI Call Logs: Subprocess execution details (command, stdin, stdout, stderr, latency) to clicalls.log as JSON-L.
- Performance Monitor: Real-time FPS, Frame Time, CPU, Input Lag tracked and queryable via Hook API.
Telemetry Data Structures
# Comms log entry (JSON-L)
{
    "ts": "14:32:05",
    "direction": "OUT",
    "kind": "tool_call",
    "provider": "gemini",
    "model": "gemini-2.5-flash-lite",
    "payload": {
        "name": "run_powershell",
        "id": "call_abc123",
        "script": "Get-ChildItem"
    },
    "source_tier": "Tier 3",
    "local_ts": 1709875925.123
}

# Performance metrics (via get_metrics())
{
    "fps": 60.0,
    "fps_avg": 58.5,
    "last_frame_time_ms": 16.67,
    "frame_time_ms_avg": 17.1,
    "cpu_percent": 12.5,
    "cpu_percent_avg": 15.2,
    "input_lag_ms": 2.3,
    "input_lag_ms_avg": 3.1,
    "time_render_mma_dashboard_ms": 5.2,
    "time_render_mma_dashboard_ms_avg": 4.8
}
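Because the comms log is JSON-L (one JSON object per line), it can be analyzed with a few lines of Python. The sketch below counts entries per direction and kind; summarize_comms is a hypothetical helper, and the sample lines are made up to match the entry shape shown above.

```python
import json
from collections import Counter

def summarize_comms(lines):
    """Count comms entries per (direction, kind) from JSON-L text lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between entries
        entry = json.loads(line)
        counts[(entry["direction"], entry["kind"])] += 1
    return counts

# Made-up sample entries; a real run would iterate over the comms.log file.
sample = [
    '{"ts": "14:32:05", "direction": "OUT", "kind": "tool_call", "payload": {}}',
    '{"ts": "14:32:06", "direction": "IN", "kind": "tool_result", "payload": {}}',
    '',
    '{"ts": "14:32:07", "direction": "OUT", "kind": "tool_call", "payload": {}}',
]
summary = summarize_comms(sample)
```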
MMA Engine Architecture
WorkerPool: Concurrent Worker Management
The WorkerPool class in multi_agent_conductor.py manages a bounded pool of worker threads:
class WorkerPool:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active: dict[str, threading.Thread] = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id: str, target: Callable, args: tuple) -> Optional[threading.Thread]:
        with self._lock:
            if len(self._active) >= self.max_workers:
                return None

        def wrapper(*a, **kw):
            try:
                with self._semaphore:
                    target(*a, **kw)
            finally:
                with self._lock:
                    self._active.pop(ticket_id, None)

        t = threading.Thread(target=wrapper, args=args, daemon=True)
        with self._lock:
            self._active[ticket_id] = t
        t.start()
        return t
Key behaviors:
- Bounded concurrency: max_workers (default 4) limits parallel ticket execution
- Semaphore gating: Ensures no more than max_workers can execute simultaneously
- Automatic cleanup: Thread removes itself from _active dict on completion
- Non-blocking spawn: Returns None if pool is full, allowing the engine to defer
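The non-blocking spawn behavior is easiest to see in a small run. The sketch below repeats a condensed copy of the class so that it runs standalone; the work function, the ticket ids, and the release event are invented for the demo.

```python
import threading

class WorkerPool:
    """Condensed copy of the pool described above, for a standalone demo."""
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self._active = {}
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_workers)

    def spawn(self, ticket_id, target, args):
        with self._lock:
            if len(self._active) >= self.max_workers:
                return None  # pool full: caller should defer this ticket
        def wrapper(*a):
            try:
                with self._semaphore:
                    target(*a)
            finally:
                with self._lock:
                    self._active.pop(ticket_id, None)
        t = threading.Thread(target=wrapper, args=args, daemon=True)
        with self._lock:
            self._active[ticket_id] = t
        t.start()
        return t

release = threading.Event()
done = []

def work(ticket_id):
    release.wait(timeout=5)  # hold the slot until the demo releases it
    done.append(ticket_id)

pool = WorkerPool(max_workers=2)
started, deferred = [], []
for tid in ["T1", "T2", "T3"]:
    t = pool.spawn(tid, work, (tid,))
    if t is None:
        deferred.append(tid)
    else:
        started.append(t)

release.set()
for t in started:
    t.join()  # join returns after the wrapper's cleanup has run

retry = pool.spawn(deferred[0], work, (deferred[0],))
retry.join()
```

With two slots, the third ticket is refused rather than queued, and the same spawn call succeeds once a slot has been freed.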
ConductorEngine: Orchestration Loop
The ConductorEngine orchestrates ticket execution within a track:
class ConductorEngine:
    def __init__(self, track: Track, event_queue: Optional[SyncEventQueue] = None,
                 auto_queue: bool = False) -> None:
        self.track = track
        self.event_queue = event_queue
        self.dag = TrackDAG(self.track.tickets)
        self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
        self.pool = WorkerPool(max_workers=4)
        self._abort_events: dict[str, threading.Event] = {}
        self._pause_event = threading.Event()
        self._tier_usage_lock = threading.Lock()
        self.tier_usage = {
            "Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"},
            "Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"},
            "Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
            "Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
        }
Main execution loop (run method):
- Pause check: If _pause_event is set, sleep and broadcast "paused" status
- DAG tick: Call engine.tick() to get ready tasks
- Completion check: If no ready tasks and all completed, break with "done" status
- Wait for workers: If tasks in-progress or pool active, sleep and continue
- Blockage detection: If no ready, no in-progress, and not all done, break with "blocked" status
- Spawn workers: For each ready task, spawn a worker via pool.spawn()
- Model escalation: Workers use models_list[min(retry_count, 2)] for capability upgrade on retries
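The per-iteration decision and the escalation rule can be sketched as two small pure functions. This is an interpretation, not the run method itself: loop_step and pick_model are hypothetical names, the model names are placeholders, and the sketch assumes the "wait" branch applies only when nothing is ready to spawn.

```python
def pick_model(models_list, retry_count):
    """Escalate one step per retry, capped at the third entry of the list."""
    return models_list[min(retry_count, 2)]

def loop_step(paused, ready, in_progress, all_done):
    """Return the action one loop iteration would take, in the order listed above."""
    if paused:
        return "paused"
    if not ready and all_done:
        return "done"
    if not ready and in_progress:
        return "wait"     # workers still running; sleep and re-check
    if not ready:
        return "blocked"  # nothing ready, nothing running, not finished
    return "spawn"

models = ["small-model", "medium-model", "large-model"]  # placeholder names
first_try = pick_model(models, 0)
many_retries = pick_model(models, 7)
```

Note that the index expression requires the list to hold at least three entries; a shorter list would raise IndexError on the second or third retry.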
Abort Event Propagation
Each ticket has an associated threading.Event for abort signaling:
# Before spawning worker
self._abort_events[ticket.id] = threading.Event()

# Worker checks abort at three points:
# 1. Before major work
if abort_event.is_set():
    ticket.status = "killed"
    return "ABORTED"

# 2. Before tool execution (in clutch_callback)
if abort_event.is_set():
    return False  # Reject tool

# 3. After blocking send() returns
if abort_event.is_set():
    ticket.status = "killed"
    return "ABORTED"
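A runnable miniature of the same idea, assuming a worker that is a plain list of steps: run_ticket, the dict-based ticket, and the "completed" status are stand-ins invented for the sketch. Only the check-before-each-phase pattern is taken from the fragments above.

```python
import threading

def run_ticket(ticket: dict, abort_event: threading.Event, steps) -> str:
    """Run steps in order, checking the abort event before each one."""
    for step in steps:
        if abort_event.is_set():
            ticket["status"] = "killed"
            return "ABORTED"
        step()
    ticket["status"] = "completed"  # assumed status name for the demo
    return "OK"

abort = threading.Event()
log = []
ticket = {"id": "T1", "status": "in_progress"}

# The second step simulates a user pressing abort while the worker is running.
steps = [lambda: log.append("step 1"), abort.set, lambda: log.append("step 3")]
outcome = run_ticket(ticket, abort, steps)
```

The abort is cooperative: a step that is already running finishes, and cancellation takes effect at the next checkpoint.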
Architectural Invariants
- Single-writer principle: All GUI state mutations happen on the main thread via _process_pending_gui_tasks. Background threads never write GUI state directly.
- Copy-and-clear lock pattern: _process_pending_gui_tasks snapshots and clears the task list under the lock, then processes outside the lock.
- Context Amnesia: Each MMA Tier 3 Worker starts with ai_client.reset_session(). No conversational bleed between tickets.
- Send serialization: _send_lock ensures only one provider call is in-flight at a time across all threads.
- Dual-Flush persistence: On exit, state is committed to both project-level and global-level config files.
- No cross-thread GUI mutation: Background threads must push tasks to _pending_gui_tasks rather than calling GUI methods directly.
- Abort-before-execution: Workers check abort events before major work phases, enabling clean cancellation.
- Bounded worker pool: WorkerPool enforces the max_workers limit to prevent resource exhaustion.
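The copy-and-clear pattern from the list above, reduced to a minimal sketch. GuiTaskQueue, push, and process_pending are hypothetical names; in the application the equivalent logic is described as living in _process_pending_gui_tasks.

```python
import threading

class GuiTaskQueue:
    """Background threads push callables; the main thread drains and runs them."""
    def __init__(self):
        self._pending_gui_tasks = []
        self._lock = threading.Lock()

    def push(self, task):
        with self._lock:
            self._pending_gui_tasks.append(task)

    def process_pending(self) -> int:
        # Snapshot and clear under the lock...
        with self._lock:
            tasks = self._pending_gui_tasks[:]
            self._pending_gui_tasks.clear()
        # ...then run outside it, so a task may push new tasks without deadlock.
        for task in tasks:
            task()
        return len(tasks)

queue = GuiTaskQueue()
state = []  # stands in for GUI state; only the draining thread writes to it

workers = [
    threading.Thread(target=queue.push, args=(lambda i=i: state.append(i),))
    for i in range(3)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

processed = queue.process_pending()
```

Holding the lock only for the snapshot keeps producers from stalling while slow tasks run.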
Error Classification & Recovery
ProviderError Taxonomy
The ProviderError class provides structured error classification:
class ProviderError(Exception):
    def __init__(self, kind: str, provider: str, original: Exception):
        self.kind = kind  # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
        self.provider = provider
        self.original = original

    def ui_message(self) -> str:
        labels = {
            "quota": "QUOTA EXHAUSTED",
            "rate_limit": "RATE LIMITED",
            "auth": "AUTH / API KEY ERROR",
            "balance": "BALANCE / BILLING ERROR",
            "network": "NETWORK / CONNECTION ERROR",
            "unknown": "API ERROR",
        }
        return f"[{self.provider.upper()} {labels.get(self.kind, 'API ERROR')}]\n\n{self.original}"
Error Recovery Patterns
| Error Kind | Recovery Strategy |
|---|---|
| quota | Display in UI, await user intervention |
| rate_limit | Exponential backoff (not yet implemented) |
| auth | Prompt for credential verification |
| balance | Display billing alert |
| network | Auto-retry with timeout |
| unknown | Log full traceback, display in UI |
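Since the table marks exponential backoff for rate_limit as not yet implemented, the following is only one possible shape for it, not project code. backoff_delays, call_with_backoff, and FakeProviderError are hypothetical names; the only detail borrowed from ProviderError is the kind attribute.

```python
import time

def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 30.0):
    """Yield capped exponential delays: base, 2*base, 4*base, ... up to cap."""
    for attempt in range(max_retries):
        yield min(cap, base * (2 ** attempt))

def call_with_backoff(fn, max_retries=5, base=1.0, cap=30.0, sleep=time.sleep):
    """Retry fn() while it raises errors of kind "rate_limit"; re-raise others."""
    for delay in backoff_delays(max_retries, base, cap):
        try:
            return fn()
        except Exception as exc:
            if getattr(exc, "kind", None) != "rate_limit":
                raise
            sleep(delay)
    return fn()  # final attempt; a remaining error propagates to the caller

class FakeProviderError(Exception):
    """Stand-in for ProviderError that carries only the kind field."""
    def __init__(self, kind):
        super().__init__(kind)
        self.kind = kind

attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise FakeProviderError("rate_limit")
    return "ok"

slept = []  # record delays instead of actually sleeping
result = call_with_backoff(flaky, sleep=slept.append)
```

A production version would likely add random jitter to each delay so that concurrent workers do not retry in lockstep.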
Memory Management
History Trimming Strategies
Gemini (40% threshold):
if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
    while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
        # Drop oldest message pairs
        hist.pop(0)  # Assistant
        hist.pop(0)  # User
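The two thresholds form a hysteresis band: trimming starts above 40% of the input limit and continues down to 30%, so the next few turns do not immediately trigger another trim. A standalone sketch, assuming a hypothetical trim_history helper and a caller-supplied count_tokens function (the excerpt above does not show how total_in is recomputed):

```python
def trim_history(hist, max_input_tokens, count_tokens,
                 trigger=0.4, target=0.3, min_len=4):
    """Drop oldest message pairs in place; return the remaining token total."""
    total = sum(count_tokens(m) for m in hist)
    if total <= max_input_tokens * trigger:
        return total  # below the trigger: leave history untouched
    while len(hist) > min_len and total > max_input_tokens * target:
        for _ in range(2):  # remove one pair of messages
            total -= count_tokens(hist.pop(0))
    return total

# Demo: ten 50-"token" messages against a 1000-token limit (len as the counter).
history = ["x" * 50 for _ in range(10)]
remaining = trim_history(history, 1000, len)

small = ["x" * 50 for _ in range(6)]
untouched = trim_history(small, 1000, len)
```

In the demo, 500 tokens exceeds the 400-token trigger, so two pairs are dropped to reach the 300-token target; the 300-token history is never trimmed.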
Anthropic (180K limit):
def _trim_anthropic_history(system_blocks, history):
    est = _estimate_prompt_tokens(system_blocks, history)
    while len(history) > 3 and est > _ANTHROPIC_MAX_PROMPT_TOKENS:
        # Drop turn pairs, preserving tool_result chains
        ...
Tool Output Budget
_MAX_TOOL_OUTPUT_BYTES: int = 500_000  # 500KB cumulative

if _cumulative_tool_bytes > _MAX_TOOL_OUTPUT_BYTES:
    # Inject warning, force final answer
    parts.append("SYSTEM WARNING: Cumulative tool output exceeded 500KB budget.")
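A small sketch of how such a cumulative budget might be tracked across tool calls. ToolBudget and record are hypothetical names, and counting UTF-8 encoded bytes is an assumption about how output size is measured.

```python
MAX_TOOL_OUTPUT_BYTES = 500_000  # mirrors the 500KB figure above

class ToolBudget:
    """Accumulate tool output size and report when the budget is exceeded."""
    def __init__(self, limit: int = MAX_TOOL_OUTPUT_BYTES):
        self.limit = limit
        self.used = 0

    def record(self, output: str) -> bool:
        """Add one tool output; return True once the cumulative total exceeds the limit."""
        self.used += len(output.encode("utf-8"))
        return self.used > self.limit

budget = ToolBudget(limit=10)  # tiny limit so the demo trips quickly
first = budget.record("12345")    # 5 bytes used
second = budget.record("123456")  # 11 bytes used, over the limit
```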
AST Cache (file_cache.py)
_ast_cache: Dict[str, Tuple[float, tree_sitter.Tree]] = {}

def get_cached_tree(self, path: Optional[str], code: str) -> tree_sitter.Tree:
    # p is the path object for `path`; its construction is not shown in this excerpt
    mtime = p.stat().st_mtime if p.exists() else 0.0
    if path in _ast_cache:
        cached_mtime, tree = _ast_cache[path]
        if cached_mtime == mtime:
            return tree
    # Parse and cache, capped at 10 entries; eviction removes the
    # oldest-inserted key (FIFO order, a simple stand-in for LRU)
    if len(_ast_cache) >= 10:
        del _ast_cache[next(iter(_ast_cache))]
    tree = self.parse(code)
    _ast_cache[path] = (mtime, tree)
    return tree
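The excerpt above evicts the first key in the dict, which follows insertion order rather than recency of use. If recency-based eviction were wanted, one option is an OrderedDict that moves entries to the end on every hit. This is a sketch of that alternative, not the code in file_cache.py; MtimeLRUCache is a hypothetical name and the parse function is a stand-in for the tree-sitter parser.

```python
from collections import OrderedDict

class MtimeLRUCache:
    """Cache parse results per path, invalidated by mtime, evicted by recency."""
    def __init__(self, parse, max_entries: int = 10):
        self._parse = parse
        self._max = max_entries
        self._entries = OrderedDict()  # path -> (mtime, parsed)

    def get(self, path, mtime, code):
        hit = self._entries.get(path)
        if hit is not None and hit[0] == mtime:
            self._entries.move_to_end(path)  # mark as most recently used
            return hit[1]
        parsed = self._parse(code)
        self._entries[path] = (mtime, parsed)
        self._entries.move_to_end(path)
        while len(self._entries) > self._max:
            self._entries.popitem(last=False)  # drop the least recently used
        return parsed

parse_calls = []

def fake_parse(code):
    parse_calls.append(code)
    return ("tree", code)

cache = MtimeLRUCache(fake_parse, max_entries=2)
cache.get("a.py", 1.0, "A")
cache.get("b.py", 1.0, "B")
cache.get("a.py", 1.0, "A")  # hit: a.py becomes most recent
cache.get("c.py", 1.0, "C")  # evicts b.py, the least recently used
cache.get("a.py", 1.0, "A")  # still a hit
cache.get("b.py", 1.0, "B")  # miss: b.py was evicted and is parsed again
cache.get("a.py", 2.0, "A2") # miss: the mtime changed
```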
See Also
- guide_ai_client.md: The multi-provider LLM client whose dispatch the architecture supports
- guide_app_controller.md: The headless orchestrator that owns all the AppController-owned state
- guide_mma.md: The 4-tier Multi-Model Architecture
- guide_multi_agent_conductor.md: The multi_agent_conductor.py + dag_engine.py runtime
- guide_context_aggregation.md: The aggregate.py pipeline; covers the build_tier3_context and build_markdown_from_items flows referenced in this guide's "Cache Hit Strategy"
- guide_discussions.md: The Discussion system; covers the "Discussion Compression" flow documented in this guide
- guide_state_lifecycle.md: Undo/redo and the App.__getattr__/__setattr__ state delegation pattern
- guide_hot_reload.md: Hot-reload architecture; the delegation pattern documented here is what makes hot-reload possible
- guide_meta_boundary.md: The Application vs Meta-Tooling distinction
- conductor/tracks/nagent_review_20260608/report.md: Deep-dive comparison of Manual Slop's threading model to nagent's single-process loop pattern; includes the data-oriented + thread-disciplined + GUI-decoupled philosophy in §1 and §5