docs: Complete documentation rewrite at gencpp/VEFontCache reference quality

Rewrites all docs from Gemini's 330-line executive summaries to 1874 lines
of expert-level architectural reference matching the pedagogical depth of
gencpp (Parser_Algo.md, AST_Types.md) and VEFontCache-Odin (guide_architecture.md).

Changes:
- guide_architecture.md: 73 -> 542 lines. Adds inline data structures for all
  dialog classes, cross-thread communication patterns, complete action type
  catalog, provider comparison table, 4-breakpoint Anthropic cache strategy,
  Gemini server-side cache lifecycle, context refresh algorithm.
- guide_tools.md: 66 -> 385 lines. Full 26-tool inventory with parameters,
  3-layer MCP security model walkthrough, all Hook API GET/POST endpoints
  with request/response formats, ApiHookClient method reference, /api/ask
  synchronous HITL protocol, shell runner with env config.
- guide_mma.md: NEW (368 lines). Fills major documentation gap — complete
  Ticket/Track/WorkerContext data structures, DAG engine algorithms (cycle
  detection, topological sort), ConductorEngine execution loop, Tier 2 ticket
  generation, Tier 3 worker lifecycle with context amnesia, token firewalling.
- guide_simulations.md: 64 -> 377 lines. 8-stage Puppeteer simulation
  lifecycle, mock_gemini_cli.py JSON-L protocol, approval automation pattern,
  ASTParser tree-sitter vs stdlib ast comparison, VerificationLogger.
- Readme.md: Rewritten with module map, architecture summary, config examples.
- docs/Readme.md: Proper index with guide contents table and GUI panel docs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-01 09:44:50 -05:00
parent bf4468f125
commit 08e003a137
6 changed files with 1742 additions and 193 deletions

Readme.md

@@ -1,66 +1,128 @@
# Manual Slop
A GUI orchestrator for local LLM-driven coding sessions. Manual Slop bridges high-latency AI reasoning with a low-latency ImGui render loop via a thread-safe asynchronous pipeline, ensuring every AI-generated payload passes through a human-auditable gate before execution.
**Tech Stack**: Python 3.11+, Dear PyGui / ImGui, FastAPI, Uvicorn
**Providers**: Gemini API, Anthropic API, DeepSeek, Gemini CLI (headless)
**Platform**: Windows (PowerShell) — single developer, local use
---
## Architecture at a Glance
Four thread domains operate concurrently: the ImGui main loop, an asyncio worker for AI calls, a `HookServer` (HTTP on `:8999`) for external automation, and transient threads for model fetching. Background threads never write GUI state directly — they serialize task dicts into lock-guarded lists that the main thread drains once per frame ([details](./docs/guide_architecture.md#the-task-pipeline-producer-consumer-synchronization)).
The **Execution Clutch** suspends the AI execution thread on a `threading.Condition` when a destructive action (PowerShell script, sub-agent spawn) is requested. The GUI renders a modal where the user can read, edit, or reject the payload. On approval, the condition is signaled and execution resumes ([details](./docs/guide_architecture.md#the-execution-clutch-human-in-the-loop)).
The **MMA (Multi-Model Agent)** system decomposes epics into tracks, tracks into DAG-ordered tickets, and executes each ticket with a stateless Tier 3 worker that starts from `ai_client.reset_session()` — no conversational bleed between tickets ([details](./docs/guide_mma.md)).
---
## Documentation
| Guide | Scope |
|---|---|
| [Architecture](./docs/guide_architecture.md) | Threading model, event system, AI client multi-provider architecture, HITL mechanism, comms logging |
| [Tools & IPC](./docs/guide_tools.md) | MCP Bridge security model, all 26 native tools, Hook API endpoints, ApiHookClient reference, shell runner |
| [MMA Orchestration](./docs/guide_mma.md) | 4-tier hierarchy, Ticket/Track data structures, DAG engine, ConductorEngine execution loop, worker lifecycle |
| [Simulations](./docs/guide_simulations.md) | `live_gui` fixture, Puppeteer pattern, mock provider, visual verification patterns, ASTParser / summarizer |
---
## Module Map
| File | Lines | Role |
|---|---|---|
| `gui_2.py` | ~3080 | Primary ImGui interface — App class, frame-sync, HITL dialogs |
| `ai_client.py` | ~1800 | Multi-provider LLM abstraction (Gemini, Anthropic, DeepSeek, Gemini CLI) |
| `mcp_client.py` | ~870 | 26 MCP tools with filesystem sandboxing and tool dispatch |
| `api_hooks.py` | ~330 | HookServer — REST API for external automation on `:8999` |
| `api_hook_client.py` | ~245 | Python client for the Hook API (used by tests and external tooling) |
| `multi_agent_conductor.py` | ~250 | ConductorEngine — Tier 2 orchestration loop with DAG execution |
| `conductor_tech_lead.py` | ~100 | Tier 2 ticket generation from track briefs |
| `dag_engine.py` | ~100 | TrackDAG (dependency graph) + ExecutionEngine (tick-based state machine) |
| `models.py` | ~100 | Ticket, Track, WorkerContext dataclasses |
| `events.py` | ~89 | EventEmitter, AsyncEventQueue, UserRequestEvent |
| `project_manager.py` | ~300 | TOML config persistence, discussion management, track state |
| `session_logger.py` | ~200 | JSON-L + markdown audit trails (comms, tools, CLI, hooks) |
| `shell_runner.py` | ~100 | PowerShell execution with timeout, env config, QA callback |
| `file_cache.py` | ~150 | ASTParser (tree-sitter) — skeleton and curated views |
| `summarize.py` | ~120 | Heuristic file summaries (imports, classes, functions) |
| `outline_tool.py` | ~80 | Hierarchical code outline via stdlib `ast` |
---
## Setup
### Prerequisites
- Python 3.11+
- [`uv`](https://github.com/astral-sh/uv) for package management
### Installation
```powershell
git clone <repo>
cd manual_slop
uv sync
```
### Credentials
Configure in `credentials.toml`:
```toml
[gemini]
api_key = "YOUR_KEY"
[anthropic]
api_key = "YOUR_KEY"
[deepseek]
api_key = "YOUR_KEY"
```
### Running
```powershell
uv run gui_2.py                      # Normal mode
uv run gui_2.py --enable-test-hooks  # With Hook API on :8999
```
### Running Tests
```powershell
uv run pytest tests/ -v
```
---
## Project Configuration
Projects are stored as `<name>.toml` files. The discussion history is split into a sibling `<name>_history.toml` to keep the main config lean.
```toml
[project]
name = "my_project"
git_dir = "./my_repo"
system_prompt = ""
[files]
base_dir = "./my_repo"
paths = ["src/**/*.py", "README.md"]
[screenshots]
base_dir = "./my_repo"
paths = []
[output]
output_dir = "./md_gen"
[gemini_cli]
binary_path = "gemini"
[agent.tools]
run_powershell = true
read_file = true
# ... 26 tool flags
```

docs/Readme.md

@@ -1,59 +1,74 @@
# Documentation Index
[Top](../Readme.md)
---
## Guides
| Guide | Contents |
|---|---|
| [Architecture](guide_architecture.md) | Thread domains, cross-thread data structures, event system, application lifetime, task pipeline (producer-consumer), Execution Clutch (HITL), AI client multi-provider architecture, Anthropic/Gemini caching strategies, context refresh, comms logging, state machines |
| [Tools & IPC](guide_tools.md) | MCP Bridge 3-layer security model, all 26 native tool signatures, Hook API GET/POST endpoints with request/response formats, ApiHookClient method reference, `/api/ask` synchronous HITL protocol, session logging, shell runner |
| [MMA Orchestration](guide_mma.md) | Ticket/Track/WorkerContext data structures, DAG engine (cycle detection, topological sort), ConductorEngine execution loop, Tier 2 ticket generation, Tier 3 worker lifecycle with context amnesia, Tier 4 QA integration, token firewalling, track state persistence |
| [Simulations](guide_simulations.md) | `live_gui` pytest fixture lifecycle, `VerificationLogger`, process cleanup, Puppeteer pattern (8-stage MMA simulation), approval automation, mock provider (`mock_gemini_cli.py`) with JSON-L protocol, visual verification patterns, ASTParser (tree-sitter) vs summarizer (stdlib `ast`) |
---
## GUI Panels
### Projects Panel
Configuration and context management. Specifies the Git Directory (for commit tracking) and tracked file paths. Project switching swaps the active file list, discussion history, and settings via `<project>.toml` profiles.
> **Note:** The Config panel has been removed. Output directory and auto-add history settings are now integrated into the Projects and Discussion History panels respectively.
- **Word-Wrap Toggle**: Dynamically swaps text rendering in large read-only panels (Responses, Comms Log) between unwrapped (code formatting) and wrapped (prose).
### Discussion History
Manages conversational branches to prevent context poisoning across tasks.
- **Discussions Sub-Menu**: Create separate timelines for different tasks (e.g., "Refactoring Auth" vs. "Adding API Endpoints").
- **Git Commit Tracking**: "Update Commit" reads HEAD from the project's git directory and stamps the discussion.
- **Entry Management**: Each turn has a Role (User, AI, System). Toggle between Read/Edit modes, collapse entries, or open in the Global Text Viewer via `[+ Max]`.
- **Auto-Add**: When toggled, Message panel sends and Response panel returns are automatically appended to the current discussion.
### Files & Screenshots
Controls what is fed into the context compiler.
- **Base Dir**: Defines the root for path resolution and MCP tool constraints.
- **Paths**: Explicit files or wildcard globs (`src/**/*.rs`).
- Full file contents are inlined by default. The AI can call `get_file_summary` for compact structural views.
### Provider
Switches between API backends (Gemini, Anthropic, DeepSeek, Gemini CLI). "Fetch Models" queries the active provider for the latest model list.
### Message & Response
- **Message**: User input field.
- **Gen + Send**: Compiles markdown context and dispatches to the AI via `AsyncEventQueue`.
- **MD Only**: Dry-runs the compiler for context inspection without API cost.
- **Response**: Read-only output; flashes green on new response.
### Global Text Viewer & Script Outputs
- **Last Script Output**: Pops up (flashing blue) whenever the AI executes a script. Shows both the executed script and stdout/stderr. `[+ Maximize]` reads from stored instance variables, not DPG widget tags, so it works regardless of word-wrap state.
- **Text Viewer**: Large resizable popup invoked by `[+]` / `[+ Maximize]` buttons. For deep-reading long logs, discussion entries, or script bodies.
- **Confirm Dialog**: The `[+ Maximize]` button in the script approval modal passes script text as `user_data` at button-creation time, so it remains safe to click even after the dialog is dismissed.
### Tool Calls & Comms History
Real-time display of MCP tool invocations and raw API traffic. Each comms entry: timestamp, direction (OUT/IN), kind, provider, model, payload.
### MMA Dashboard
Displays the 4-tier orchestration state: active track, ticket DAG with status indicators, per-tier token usage, output streams. Approval buttons for spawn/step/tool gates.
### System Prompts
Two text inputs for instruction overrides:
1. **Global**: Applied across every project.
2. **Project**: Specific to the active workspace.
Concatenated onto the base tool-usage guidelines.

docs/guide_architecture.md

@@ -1,72 +1,542 @@
# Architecture
[Top](../Readme.md) | [Tools & IPC](guide_tools.md) | [MMA Orchestration](guide_mma.md) | [Simulations](guide_simulations.md)
---
## Philosophy: The Decoupled State Machine
Manual Slop solves a single tension: **AI reasoning is high-latency and non-deterministic; GUI interaction must be low-latency and responsive.** The engine enforces strict decoupling between three thread domains so that multi-second LLM calls never block the render loop, and every AI-generated payload passes through a human-auditable gate before execution.
---
## Thread Domains
Four distinct thread domains operate concurrently:
| Domain | Created By | Purpose | Lifecycle |
|---|---|---|---|
| **Main / GUI** | `immapp.run()` | Dear ImGui retained-mode render loop; sole writer of GUI state | App lifetime |
| **Asyncio Worker** | `App.__init__` via `threading.Thread(daemon=True)` | Event queue processing, AI client calls | Daemon (dies with process) |
| **HookServer** | `api_hooks.HookServer.start()` | HTTP API on `:8999` for external automation and IPC | Daemon thread |
| **Ad-hoc** | Transient `threading.Thread` calls | Model-fetching, legacy send paths | Short-lived |
The asyncio worker is **not** the main thread's event loop. It runs a dedicated `asyncio.new_event_loop()` on its own daemon thread:
```python
# App.__init__:
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._loop_thread.start()
# _run_event_loop:
def _run_event_loop(self) -> None:
    asyncio.set_event_loop(self._loop)
    self._loop.create_task(self._process_event_queue())
    self._loop.run_forever()
```
The GUI thread uses `asyncio.run_coroutine_threadsafe(coro, self._loop)` to push work into this loop.
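This bootstrap reduces to a self-contained sketch. The `LoopThread` wrapper and the `compute` coroutine are illustrative names, not from the codebase; only the loop-on-a-daemon-thread and `run_coroutine_threadsafe` mechanics mirror `App`:

```python
import asyncio
import threading

class LoopThread:
    """Runs a dedicated asyncio loop on a daemon thread, as App does."""
    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def submit(self, coro):
        # Safe from any thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=2.0)

async def compute(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for a long AI call
    return x * 2

lt = LoopThread()
result = lt.submit(compute(21)).result(timeout=5)  # blocks the caller, not the loop
lt.stop()
print(result)  # 42
```

In the real app the caller never blocks on `.result()`; completion is reported back through the guarded-list pattern described below.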
---
## Cross-Thread Data Structures
All cross-thread communication uses one of three patterns:
### Pattern A: AsyncEventQueue (GUI -> Asyncio)
```python
# events.py
class AsyncEventQueue:
    _queue: asyncio.Queue  # holds Tuple[str, Any] items
    async def put(self, event_name: str, payload: Any = None) -> None
    async def get(self) -> Tuple[str, Any]
```
The central event bus. Uses `asyncio.Queue`, so non-asyncio threads must enqueue via `asyncio.run_coroutine_threadsafe()`. Consumer is `App._process_event_queue()`, running as a long-lived coroutine on the asyncio loop.
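Filling in those signatures yields a runnable sketch (the event names and payloads are illustrative):

```python
import asyncio
from typing import Any, Tuple

class AsyncEventQueue:
    """Sketch matching the events.py signatures above."""
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, event_name: str, payload: Any = None) -> None:
        await self._queue.put((event_name, payload))

    async def get(self) -> Tuple[str, Any]:
        return await self._queue.get()

async def main() -> list:
    q = AsyncEventQueue()
    await q.put("user_request", {"prompt": "hi"})
    await q.put("response", "done")
    # Consumer side: what _process_event_queue does in a loop.
    return [await q.get(), await q.get()]

events = asyncio.run(main())
print(events[0])  # ('user_request', {'prompt': 'hi'})
```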
### Pattern B: Guarded Lists (Any Thread -> GUI)
Background threads cannot write GUI state directly. They append task dicts to lock-guarded lists; the main thread drains these once per frame:
```python
# App.__init__:
self._pending_gui_tasks: list[dict[str, Any]] = []
self._pending_gui_tasks_lock = threading.Lock()
self._pending_comms: list[dict[str, Any]] = []
self._pending_comms_lock = threading.Lock()
self._pending_tool_calls: list[tuple[str, str, float]] = []
self._pending_tool_calls_lock = threading.Lock()
self._pending_history_adds: list[dict[str, Any]] = []
self._pending_history_adds_lock = threading.Lock()
```
Additional locks:
```python
self._send_thread_lock = threading.Lock() # Guards send_thread creation
self._pending_dialog_lock = threading.Lock() # Guards _pending_dialog + _pending_actions dict
```
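A minimal reproduction of the guarded-list handoff; the module-level names and worker payload are illustrative stand-ins for the `App` fields:

```python
import threading

pending_gui_tasks: list[dict] = []
pending_gui_tasks_lock = threading.Lock()

def background_worker(text: str) -> None:
    # Producer: never touches GUI state, only appends to the guarded backlog.
    with pending_gui_tasks_lock:
        pending_gui_tasks.append({"action": "handle_ai_response", "text": text})

def drain_once() -> list[dict]:
    # Consumer: what the main thread does once per frame.
    with pending_gui_tasks_lock:
        tasks = pending_gui_tasks[:]
        pending_gui_tasks.clear()
    return tasks

t = threading.Thread(target=background_worker, args=("hello",))
t.start()
t.join()
drained = drain_once()
print(drained[0]["action"])  # handle_ai_response
```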
### Pattern C: Condition-Variable Dialogs (Bidirectional Blocking)
Used for Human-in-the-Loop (HITL) approval. Background thread blocks on `threading.Condition`; GUI thread signals after user action. See the [HITL section](#the-execution-clutch-human-in-the-loop) below.
---
## Event System
The "Execution Clutch" is our answer to the "Black Box" problem of AI. It allows you to shift from automatic execution to a manual, deterministic step-through mode.
Three classes in `events.py` (89 lines, no external dependencies beyond `asyncio` and `typing`):
### EventEmitter
```python
class EventEmitter:
    _listeners: Dict[str, List[Callable]]
    def on(self, event_name: str, callback: Callable) -> None
    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None
```
Synchronous pub-sub. Callbacks execute in the caller's thread. Used by `ai_client.events` for lifecycle hooks (`request_start`, `response_received`, `tool_execution`). No thread safety — relies on consistent single-thread usage.
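A minimal working version of this pub-sub might look like the following (the listener and event payload are illustrative):

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

class EventEmitter:
    """Sketch of the synchronous pub-sub described above."""
    def __init__(self) -> None:
        self._listeners: Dict[str, List[Callable]] = defaultdict(list)

    def on(self, event_name: str, callback: Callable) -> None:
        self._listeners[event_name].append(callback)

    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None:
        # Callbacks run synchronously, in the emitting caller's thread.
        for cb in self._listeners.get(event_name, []):
            cb(*args, **kwargs)

seen: list = []
emitter = EventEmitter()
emitter.on("request_start", lambda model: seen.append(model))
emitter.emit("request_start", "gemini-2.0")
print(seen)  # ['gemini-2.0']
```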
### AsyncEventQueue
Described above in Pattern A.
### UserRequestEvent
```python
class UserRequestEvent:
    prompt: str            # User's raw input text
    stable_md: str         # Generated markdown context (files, screenshots)
    file_items: List[Any]  # File attachment items for dynamic refresh
    disc_text: str         # Serialized discussion history
    base_dir: str          # Working directory for shell commands
    def to_dict(self) -> Dict[str, Any]
```
Pure data carrier. Created on the GUI thread in `_handle_generate_send`, consumed on the asyncio thread in `_handle_request_event`.
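As a dataclass sketch — the defaults and the `asdict`-based `to_dict` are assumptions for illustration; only the field names come from the listing above:

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List

@dataclass
class UserRequestEvent:
    """Sketch of the pure data carrier; field names mirror events.py."""
    prompt: str
    stable_md: str = ""
    file_items: List[Any] = field(default_factory=list)
    disc_text: str = ""
    base_dir: str = "."

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

evt = UserRequestEvent(prompt="Refactor auth", stable_md="# Context", base_dir="./my_repo")
print(evt.to_dict()["prompt"])  # Refactor auth
```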
---
## Application Lifetime
### Boot Sequence
The `App.__init__` (lines 152-296) follows this precise order:
1. **Config hydration**: Reads `config.toml` (global) and `<project>.toml` (local). Builds the initial "world view" — tracked files, discussion history, active models.
2. **Thread bootstrapping**:
- Asyncio event loop thread starts (`_loop_thread`).
- `HookServer` starts as a daemon if `test_hooks_enabled` or provider is `gemini_cli`.
3. **Callback wiring** (`_init_ai_and_hooks`): Connects `ai_client.confirm_and_run_callback`, `comms_log_callback`, `tool_log_callback` to GUI handlers.
4. **UI entry**: Main thread enters `immapp.run()`. GUI is now alive; background threads are ready.
### Shutdown Sequence
When `immapp.run()` returns (user closed window):
1. `hook_server.stop()` — shuts down HTTP server, joins thread.
2. `perf_monitor.stop()`.
3. `ai_client.cleanup()` — destroys server-side API caches (Gemini `CachedContent`).
4. **Dual-Flush persistence**: `_flush_to_project()`, `_save_active_project()`, `_flush_to_config()`, `save_config()` — commits state back to both project and global configs.
5. `session_logger.close_session()`.
The asyncio loop thread is a daemon — it dies with the process. `App.shutdown()` exists for explicit cleanup in test scenarios:
```python
def shutdown(self) -> None:
    if self._loop.is_running():
        self._loop.call_soon_threadsafe(self._loop.stop)
    if self._loop_thread.is_alive():
        self._loop_thread.join(timeout=2.0)
```
---
## The Task Pipeline: Producer-Consumer Synchronization
### Request Flow
```
GUI Thread                      Asyncio Thread                   GUI Thread (next frame)
──────────                      ──────────────                   ──────────────────────
1. User clicks "Gen + Send"
2. _handle_generate_send():
   - Compiles md context
   - Creates UserRequestEvent
   - Enqueues via
     run_coroutine_threadsafe ──> 3. _process_event_queue():
                                      awaits event_queue.get()
                                      routes "user_request" to
                                      _handle_request_event()
                                  4. Configures ai_client
                                  5. ai_client.send() BLOCKS
                                     (seconds to minutes)
                                  6. On completion, enqueues
                                     "response" event back ────> 7. _process_pending_gui_tasks():
                                                                    Drains task list under lock
                                                                    Sets ai_response text
                                                                    Triggers terminal blink
```
### Event Types Routed by `_process_event_queue`
| Event Name | Action |
|---|---|
| `"user_request"` | Calls `_handle_request_event(payload)` — synchronous blocking AI call |
| `"response"` | Appends `{"action": "handle_ai_response", ...}` to `_pending_gui_tasks` |
| `"mma_state_update"` | Appends `{"action": "mma_state_update", ...}` to `_pending_gui_tasks` |
| `"mma_spawn_approval"` | Appends the raw payload for HITL dialog creation |
| `"mma_step_approval"` | Appends the raw payload for HITL dialog creation |
The pattern: events arriving on the asyncio thread that need GUI state changes are **serialized into `_pending_gui_tasks`** for consumption on the next render frame.
### Frame-Sync Mechanism: `_process_pending_gui_tasks`
Called once per ImGui frame on the **main GUI thread**. This is the sole safe point for mutating GUI-visible state.
**Locking strategy** — copy-and-clear:
```python
def _process_pending_gui_tasks(self) -> None:
    if not self._pending_gui_tasks:
        return
    with self._pending_gui_tasks_lock:
        tasks = self._pending_gui_tasks[:]  # Snapshot
        self._pending_gui_tasks.clear()     # Release lock fast
    for task in tasks:
        ...  # Process each task outside the lock
```
Acquires the lock briefly to snapshot the task list, then processes outside the lock. Minimizes lock contention with producer threads.
### Complete Action Type Catalog
| Action | Source | Effect |
|---|---|---|
| `"refresh_api_metrics"` | asyncio/hooks | Updates API metrics display |
| `"handle_ai_response"` | asyncio | Sets `ai_response`, `ai_status`, `mma_streams[stream_id]`; triggers blink; optionally auto-adds to discussion history |
| `"show_track_proposal"` | asyncio | Sets `proposed_tracks` list, opens modal |
| `"mma_state_update"` | asyncio | Updates `mma_status`, `active_tier`, `mma_tier_usage`, `active_tickets`, `active_track` |
| `"set_value"` | HookServer | Sets any field in `_settable_fields` map via `setattr`; special-cases `current_provider`/`current_model` to reconfigure AI client |
| `"click"` | HookServer | Dispatches to `_clickable_actions` map; introspects signatures to decide whether to pass `user_data` |
| `"select_list_item"` | HookServer | Routes to `_switch_discussion()` for discussion listbox |
| `{"type": "ask"}` | HookServer | Opens ask dialog: sets `_pending_ask_dialog = True`, stores `_ask_request_id` and `_ask_tool_data` |
| `"clear_ask"` | HookServer | Clears ask dialog state if request_id matches |
| `"custom_callback"` | HookServer | Executes an arbitrary callable with args |
| `"mma_step_approval"` | asyncio (MMA engine) | Creates `MMAApprovalDialog`, stores in `_pending_mma_approval` |
| `"mma_spawn_approval"` | asyncio (MMA engine) | Creates `MMASpawnApprovalDialog`, stores in `_pending_mma_spawn` |
| `"refresh_from_project"` | HookServer/internal | Reloads all UI state from project dict |
---
## The Execution Clutch: Human-in-the-Loop
The "Execution Clutch" ensures every destructive AI action passes through an auditable human gate. Three dialog types implement this, all sharing the same blocking pattern.
### Dialog Classes
**`ConfirmDialog`** — PowerShell script execution approval:
```python
class ConfirmDialog:
    _uid: str                        # uuid4 identifier
    _script: str                     # The PowerShell script text (editable)
    _base_dir: str                   # Working directory
    _condition: threading.Condition  # Blocking primitive
    _done: bool                      # Signal flag
    _approved: bool                  # User's decision
    def wait(self) -> tuple[bool, str]  # Blocks until _done; returns (approved, script)
```
**`MMAApprovalDialog`** — MMA tier step approval:
```python
class MMAApprovalDialog:
_ticket_id: str
_payload: str # The step payload (editable)
_condition: threading.Condition
_done: bool
_approved: bool
def wait(self) -> tuple[bool, str] # Returns (approved, payload)
```
**`MMASpawnApprovalDialog`** — Sub-agent spawn approval:
```python
class MMASpawnApprovalDialog:
_ticket_id: str
_role: str # tier3-worker, tier4-qa, etc.
_prompt: str # Spawn prompt (editable)
_context_md: str # Context document (editable)
_condition: threading.Condition
_done: bool
_approved: bool
_abort: bool # Can abort entire track
def wait(self) -> dict[str, Any] # Returns {approved, abort, prompt, context_md}
```
### Blocking Flow
Using `ConfirmDialog` as exemplar:
```
ASYNCIO THREAD (ai_client tool callback) GUI MAIN THREAD
───────────────────────────────────────── ───────────────
1. ai_client calls _confirm_and_run(script)
2. Creates ConfirmDialog(script, base_dir)
3. Stores dialog:
- Headless: _pending_actions[uid] = dialog
- GUI mode: _pending_dialog = dialog
4. If test_hooks_enabled:
pushes to _api_event_queue
5. dialog.wait() BLOCKS on _condition
6. Next frame: ImGui renders
_pending_dialog in modal
7. User clicks Approve/Reject
8. _handle_approve_script():
with dialog._condition:
dialog._approved = True
dialog._done = True
dialog._condition.notify_all()
9. wait() returns (True, potentially_edited_script)
10. Executes shell_runner.run_powershell()
11. Returns output to ai_client
```
The `_condition.wait(timeout=0.1)` uses a 100ms polling interval inside a loop — a polling-with-condition hybrid that ensures the blocking thread wakes periodically.
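A minimal, standalone sketch of this wait/resolve handshake (field names taken from `ConfirmDialog` above; the `resolve` helper is hypothetical, standing in for `_handle_approve_script` and `resolve_pending_action`):

```python
import threading

class ConfirmDialog:
    def __init__(self, script: str):
        self._script = script
        self._condition = threading.Condition()
        self._done = False
        self._approved = False

    def wait(self) -> tuple:
        # Wake every 100ms so the blocked thread can re-check periodically
        with self._condition:
            while not self._done:
                self._condition.wait(timeout=0.1)
        return self._approved, self._script

    def resolve(self, approved: bool, script=None):
        # Called from the GUI thread (or the HTTP resolver) to unblock wait()
        with self._condition:
            if script is not None:
                self._script = script  # the user may have edited the script
            self._approved = approved
            self._done = True
            self._condition.notify_all()
```

The same wait/notify shape underlies all three dialog classes; only the payload returned by `wait()` differs.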
### Resolution Paths
**GUI button path** (normal interactive use):
`_handle_approve_script()` / `_handle_approve_mma_step()` / `_handle_approve_spawn()` directly manipulate the dialog's condition variable from the GUI thread.
**HTTP API path** (headless/automation):
`resolve_pending_action(action_id, approved)` looks up the dialog by UUID in `_pending_actions` dict (headless) or `_pending_dialog` (GUI), then signals the condition:
```python
def resolve_pending_action(self, action_id: str, approved: bool) -> bool:
with self._pending_dialog_lock:
if action_id in self._pending_actions:
dialog = self._pending_actions[action_id]
with dialog._condition:
dialog._approved = approved
dialog._done = True
dialog._condition.notify_all()
return True
```
**MMA approval path**:
`_handle_mma_respond(approved, payload, abort, prompt, context_md)` is the unified resolver. It uses a `dialog_container` — a one-element list `[None]` used as a mutable reference shared between the MMA engine (which creates the container) and the GUI (which populates it via `_process_pending_gui_tasks`).
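A condensed sketch of that container handshake (the `push_gui_task` callable and the 60-second timeout are illustrative assumptions):

```python
import time

def confirm_spawn(push_gui_task):
    """Engine side: create the container, hand it to the GUI, poll for the dialog."""
    dialog_container = [None]            # one-element list as a mutable cross-thread slot
    push_gui_task("mma_spawn_approval", dialog_container)
    deadline = time.time() + 60
    while time.time() < deadline:
        dialog = dialog_container[0]
        if dialog is not None:
            return dialog.wait()         # blocks until the user resolves the dialog
        time.sleep(0.1)
    raise TimeoutError("GUI never created the approval dialog")
```

The container indirection exists because the engine must hand the GUI a place to put a dialog object that does not exist yet at push time.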
---
## AI Client: Multi-Provider Architecture
`ai_client.py` operates as a **stateful singleton** — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.
### Module-Level State
```python
_provider: str = "gemini" # "gemini" | "anthropic" | "deepseek" | "gemini_cli"
_model: str = "gemini-2.5-flash-lite"
_temperature: float = 0.0
_max_tokens: int = 8192
_history_trunc_limit: int = 8000 # Char limit for truncating old tool outputs
_send_lock: threading.Lock # Serializes ALL send() calls across providers
```
Per-provider client objects:
```python
# Gemini (SDK-managed stateful chat)
_gemini_client: genai.Client | None
_gemini_chat: Any # Holds history internally
_gemini_cache: Any # Server-side CachedContent
_gemini_cache_md_hash: int | None # For cache invalidation
_GEMINI_CACHE_TTL: int = 3600 # 1-hour; rebuilt at 90% (3240s)
# Anthropic (client-managed history)
_anthropic_client: anthropic.Anthropic | None
_anthropic_history: list[dict] # Mutable [{role, content}, ...]
_anthropic_history_lock: threading.Lock
# DeepSeek (raw HTTP, client-managed history)
_deepseek_history: list[dict]
_deepseek_history_lock: threading.Lock
# Gemini CLI (adapter wrapper)
_gemini_cli_adapter: GeminiCliAdapter | None
```
Safety limits:
```python
MAX_TOOL_ROUNDS: int = 10 # Max tool-call loop iterations per send()
_MAX_TOOL_OUTPUT_BYTES: int = 500_000 # 500KB cumulative tool output budget
_ANTHROPIC_CHUNK_SIZE: int = 120_000 # Max chars per system text block
_ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000 # 200k limit minus headroom
_GEMINI_MAX_INPUT_TOKENS: int = 900_000 # 1M window minus headroom
```
### The `send()` Dispatcher
```python
def send(md_content, user_message, base_dir=".", file_items=None,
discussion_history="", stream=False,
pre_tool_callback=None, qa_callback=None) -> str:
with _send_lock:
if _provider == "gemini": return _send_gemini(...)
elif _provider == "gemini_cli": return _send_gemini_cli(...)
elif _provider == "anthropic": return _send_anthropic(...)
elif _provider == "deepseek": return _send_deepseek(..., stream=stream)
```
`_send_lock` serializes all API calls — only one provider call can be in-flight at a time. All providers share the same callback signatures. Return type is always `str`.
### Provider Comparison
| Aspect | Gemini SDK | Anthropic | DeepSeek | Gemini CLI |
|---|---|---|---|---|
| **Client** | `genai.Client` | `anthropic.Anthropic` | Raw `requests.post` | `GeminiCliAdapter` (subprocess) |
| **History** | SDK-managed (`_gemini_chat._history`) | Client-managed list | Client-managed list | CLI-managed (session ID) |
| **Caching** | Server-side `CachedContent` with TTL | Prompt caching via `cache_control: ephemeral` (4 breakpoints) | None | None |
| **Tool format** | `types.FunctionDeclaration` | JSON Schema dict | Not declared | Same as SDK via adapter |
| **Tool results** | `Part.from_function_response(response={"output": ...})` | `{"type": "tool_result", "tool_use_id": ..., "content": ...}` | `{"role": "tool", "tool_call_id": ..., "content": ...}` | `{"role": "tool", ...}` |
| **History trimming** | In-place at 40% of 900K token estimate | 2-phase: strip stale file refreshes, then drop turn pairs at 180K | None | None |
| **Streaming** | No | No | Yes | No |
### Tool-Call Loop (common pattern across providers)
All providers follow the same high-level loop, iterated up to `MAX_TOOL_ROUNDS + 2` times:
1. Send message (or tool results from prior round) to API.
2. Extract text response and any function calls.
3. Log to comms log; emit events.
4. If no function calls or max rounds exceeded: **break**.
5. For each function call:
- If `pre_tool_callback` rejects: return rejection text.
- Dispatch to `mcp_client.dispatch()` or `shell_runner.run_powershell()`.
- After the **last** call of this round: run `_reread_file_items()` for context refresh.
- Truncate tool output at `_history_trunc_limit` chars.
- Accumulate `_cumulative_tool_bytes`.
6. If cumulative bytes > 500KB: inject warning.
7. Package tool results in provider-specific format; loop.
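The steps above can be sketched as a provider-agnostic loop (the `send_to_api` and `dispatch_tool` callables are stand-ins for the real API call and `mcp_client.dispatch`; the truncation constant mirrors `_history_trunc_limit`):

```python
MAX_TOOL_ROUNDS = 10
TRUNC_LIMIT = 8000  # mirrors _history_trunc_limit

def tool_loop(send_to_api, dispatch_tool, pre_tool_callback=None):
    """Generic shape of the per-provider tool-call loop."""
    pending = None  # tool results to feed back on the next round
    text = ""
    for round_no in range(MAX_TOOL_ROUNDS + 2):
        text, calls = send_to_api(pending)           # steps 1-2
        if not calls or round_no >= MAX_TOOL_ROUNDS:
            return text                              # step 4: done
        results = []
        for call in calls:                           # step 5
            if pre_tool_callback and not pre_tool_callback(call):
                return "Tool call rejected by user."
            output = dispatch_tool(call)
            results.append((call["id"], output[:TRUNC_LIMIT]))
        pending = results                            # step 7: loop with results
    return text
```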
### Context Refresh Mechanism
After the last tool call in each round, `_reread_file_items(file_items)` checks mtimes of all tracked files:
1. For each file item: compare `Path.stat().st_mtime` against stored `mtime`.
2. If unchanged: pass through as-is.
3. If changed: re-read content, store `old_content` for diffing, update `mtime`.
4. Changed files are diffed via `_build_file_diff_text`:
- Files <= 200 lines: emit full content.
- Files > 200 lines with `old_content`: emit `difflib.unified_diff`.
5. Diff is appended to the last tool's output as `[SYSTEM: FILES UPDATED]\n\n{diff}`.
6. Stale `[FILES UPDATED]` blocks are stripped from older history turns by `_strip_stale_file_refreshes` to prevent context bloat.
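A simplified sketch of the mtime check and diff construction (the `file_items` dict keys are assumed from the description above; the real `_build_file_diff_text` may differ in detail):

```python
import difflib
from pathlib import Path

def reread_file_items(file_items):
    """Sketch: detect on-disk changes to tracked files and build a diff blob."""
    diffs = []
    for item in file_items:
        path = Path(item["path"])
        mtime = path.stat().st_mtime
        if mtime == item["mtime"]:
            continue  # unchanged: pass through as-is
        old = item["content"]
        new = path.read_text(encoding="utf-8")
        item.update(content=new, old_content=old, mtime=mtime)
        if new.count("\n") <= 200:   # small file: emit full content
            diffs.append(f"--- {path} (full) ---\n{new}")
        else:                        # large file: emit a unified diff
            diffs.append("".join(difflib.unified_diff(
                old.splitlines(keepends=True), new.splitlines(keepends=True),
                fromfile=str(path), tofile=str(path))))
    return ("[SYSTEM: FILES UPDATED]\n\n" + "\n".join(diffs)) if diffs else ""
```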
### Anthropic Cache Strategy (4-Breakpoint System)
Anthropic allows a maximum of 4 `cache_control: ephemeral` breakpoints:
| # | Location | Purpose |
|---|---|---|
| 1 | Last block of stable system prompt | Cache base instructions |
| 2 | Last block of context chunks | Cache file context |
| 3 | Last tool definition | Cache tool schema |
| 4 | Second-to-last user message | Cache conversation prefix |
Before placing breakpoint 4, all existing `cache_control` is stripped from history to prevent exceeding the limit.
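The placement order under that constraint can be sketched as follows (block structures are simplified dicts, not the SDK's message types):

```python
def place_cache_breakpoints(system_blocks, context_blocks, tool_defs, history):
    """Sketch of the 4-breakpoint placement over simplified structures."""
    eph = {"type": "ephemeral"}
    system_blocks[-1]["cache_control"] = eph     # 1: end of stable system prompt
    context_blocks[-1]["cache_control"] = eph    # 2: end of file-context chunks
    tool_defs[-1]["cache_control"] = eph         # 3: last tool definition
    # Strip stale markers so the 4-breakpoint limit is never exceeded
    for msg in history:
        for block in msg["content"]:
            block.pop("cache_control", None)
    user_idx = [i for i, m in enumerate(history) if m["role"] == "user"]
    if len(user_idx) >= 2:
        history[user_idx[-2]]["content"][-1]["cache_control"] = eph  # 4
```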
### Gemini Cache Strategy (Server-Side TTL)
System instruction content is hashed. On each call, a 3-way decision:
- **Hash changed**: Delete old cache, rebuild with new content.
- **Cache age > 90% of TTL**: Proactive renewal (delete + rebuild).
- **No cache exists**: Create new `CachedContent` if token count >= 2048; otherwise inline.
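That 3-way decision can be sketched as a pure function (the signature and the cache dict shape are illustrative assumptions; the real code works against the SDK's `CachedContent` object):

```python
import time

def decide_cache_action(md_hash, cache, now=None, ttl=3600,
                        renew_frac=0.9, min_tokens=2048, token_count=0):
    """Sketch of the 3-way cache decision. `cache` is a dict stand-in."""
    now = time.time() if now is None else now
    if cache is None:
        # No cache: only worth creating above the server's minimum token count
        return "create" if token_count >= min_tokens else "inline"
    if cache["md_hash"] != md_hash:
        return "rebuild"                      # content changed: delete + recreate
    if now - cache["created_at"] > ttl * renew_frac:
        return "rebuild"                      # proactive renewal at 90% of TTL
    return "reuse"
```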
---
## Comms Log System
Every API interaction is logged to a module-level list with real-time GUI push:
```python
def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
entry = {
"ts": datetime.now().strftime("%H:%M:%S"),
"direction": direction, # "OUT" (to API) or "IN" (from API)
"kind": kind, # "request" | "response" | "tool_call" | "tool_result"
"provider": _provider,
"model": _model,
"payload": payload,
}
_comms_log.append(entry)
if comms_log_callback:
comms_log_callback(entry) # Real-time push to GUI
```
---
## State Machines
### `ai_status` (Informal)
```
"idle" -> "sending..." -> [AI call in progress]
-> "running powershell..." -> "powershell done, awaiting AI..."
-> "fetching url..." | "searching web..."
-> "done" | "error"
-> "idle" (on reset)
```
### HITL Dialog State (Binary per type)
- `_pending_dialog is not None` — script confirmation active
- `_pending_mma_approval is not None` — MMA step approval active
- `_pending_mma_spawn is not None` — spawn approval active
- `_pending_ask_dialog == True` — tool ask dialog active
---
## Security: The MCP Allowlist
Every filesystem tool (read, list, search, write) is gated by the MCP Bridge (`mcp_client.py`). See [guide_tools.md](guide_tools.md) for the complete security model, tool inventory, and endpoint reference.
Summary: Every path is resolved to an absolute path and checked against a dynamically-built allowlist constructed from the project's tracked files and base directories. Files named `history.toml` or `*_history.toml` are hard-blacklisted.
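A minimal sketch of such a gate (names are assumptions; the real `_resolve_and_check` lives in `mcp_client.py`):

```python
from pathlib import Path

def resolve_and_check(path_str, allowlist):
    """Sketch of the gate: resolve, blacklist check, then allowlist containment."""
    path = Path(path_str).resolve()
    if path.name == "history.toml" or path.name.endswith("_history.toml"):
        raise PermissionError(f"blacklisted file: {path.name}")
    for root in allowlist:
        root = Path(root).resolve()
        if path == root or root in path.parents:
            return path
    raise PermissionError(f"path outside allowlist: {path}")
```

Resolving before checking is the important design choice: it defeats `..` traversal and symlink tricks that would otherwise slip a relative path past a prefix comparison.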
---
## Telemetry & Auditing
Every interaction is designed to be auditable:
- **JSON-L Comms Logs**: Raw API traffic logged to `logs/sessions/<id>/comms.log` for debugging and token cost analysis.
- **Tool Call Logs**: Markdown-formatted sequential records to `toolcalls.log`.
- **Generated Scripts**: Every PowerShell script that passes through the Execution Clutch is saved to `scripts/generated/<ts>_<seq>.ps1`.
- **API Hook Logs**: All HTTP hook invocations logged to `apihooks.log`.
- **CLI Call Logs**: Subprocess execution details (command, stdin, stdout, stderr, latency) to `clicalls.log` as JSON-L.
- **Performance Monitor**: Real-time FPS, Frame Time, CPU, Input Lag tracked and queryable via Hook API.
---
## Architectural Invariants
1. **Single-writer principle**: All GUI state mutations happen on the main thread via `_process_pending_gui_tasks`. Background threads never write GUI state directly.
2. **Copy-and-clear lock pattern**: `_process_pending_gui_tasks` snapshots and clears the task list under the lock, then processes outside the lock.
3. **Context Amnesia**: Each MMA Tier 3 Worker starts with `ai_client.reset_session()`. No conversational bleed between tickets.
4. **Send serialization**: `_send_lock` ensures only one provider call is in-flight at a time across all threads.
5. **Dual-Flush persistence**: On exit, state is committed to both project-level and global-level config files.
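Invariants 1 and 2 together can be sketched as a tiny queue (the class name is hypothetical; the real logic is inlined in the GUI):

```python
import threading

class GuiTaskQueue:
    """Sketch of the single-writer, copy-and-clear pattern."""
    def __init__(self):
        self._tasks = []
        self._lock = threading.Lock()

    def push(self, task):                 # called from any background thread
        with self._lock:
            self._tasks.append(task)

    def process_pending(self, handler):   # called only on the GUI main thread
        with self._lock:
            batch = self._tasks[:]        # snapshot under the lock
            self._tasks.clear()
        for task in batch:                # process outside the lock
            handler(task)
```

Processing outside the lock keeps handlers free to push follow-up tasks without deadlocking, and the snapshot guarantees each task is handled exactly once.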

`docs/guide_mma.md` (new file, 368 lines):
# MMA: 4-Tier Multi-Model Agent Orchestration
[Top](../Readme.md) | [Architecture](guide_architecture.md) | [Tools & IPC](guide_tools.md) | [Simulations](guide_simulations.md)
---
## Overview
The MMA (Multi-Model Agent) system is a hierarchical task decomposition and execution engine. A high-level "epic" is broken into tracks, tracks are decomposed into tickets with dependency relationships, and tickets are executed by stateless workers with human-in-the-loop approval at every destructive boundary.
```
Tier 1: Orchestrator — product alignment, epic → tracks
Tier 2: Tech Lead — track → tickets (DAG), architectural oversight
Tier 3: Worker — stateless TDD implementation per ticket
Tier 4: QA — stateless error analysis, no fixes
```
---
## Data Structures (`models.py`)
### Ticket
The atomic unit of work. All MMA execution revolves around transitioning tickets through their state machine.
```python
@dataclass
class Ticket:
id: str # e.g., "T-001"
description: str # Human-readable task description
status: str # "todo" | "in_progress" | "completed" | "blocked"
assigned_to: str # Tier assignment: "tier3-worker", "tier4-qa"
target_file: Optional[str] = None # File this ticket modifies
context_requirements: List[str] = field() # Files needed for context injection
depends_on: List[str] = field() # Ticket IDs that must complete first
blocked_reason: Optional[str] = None # Why this ticket is blocked
step_mode: bool = False # If True, requires manual approval before execution
def mark_blocked(self, reason: str) -> None # Sets status="blocked", stores reason
def mark_complete(self) -> None # Sets status="completed"
def to_dict(self) -> Dict[str, Any]
@classmethod
def from_dict(cls, data) -> "Ticket"
```
**Status state machine:**
```
todo ──> in_progress ──> completed
 |            |
 v            v
blocked    blocked
```
### Track
A collection of tickets with a shared goal.
```python
@dataclass
class Track:
id: str # Track identifier
description: str # Track-level brief
tickets: List[Ticket] = field() # Ordered list of tickets
def get_executable_tickets(self) -> List[Ticket]
# Returns all 'todo' tickets whose depends_on are all 'completed'
```
### WorkerContext
```python
@dataclass
class WorkerContext:
ticket_id: str # Which ticket this worker is processing
model_name: str # LLM model to use (e.g., "gemini-2.5-flash-lite")
messages: List[dict] # Conversation history for this worker
```
---
## DAG Engine (`dag_engine.py`)
Two classes: `TrackDAG` (graph) and `ExecutionEngine` (state machine).
### TrackDAG
```python
class TrackDAG:
def __init__(self, tickets: List[Ticket]):
self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets} # O(1) lookup by ID
```
**`get_ready_tasks()`**: Returns tickets where `status == 'todo'` AND all `depends_on` have `status == 'completed'`. Missing dependencies are treated as NOT completed (fail-safe).
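Rendered as a standalone function (with a minimal `Ticket` stand-in), the rule looks like:

```python
from dataclasses import dataclass, field

@dataclass
class Ticket:
    id: str
    status: str = "todo"
    depends_on: list = field(default_factory=list)

def get_ready_tasks(tickets):
    """Fail-safe readiness: unknown dependency IDs count as incomplete."""
    ticket_map = {t.id: t for t in tickets}
    return [
        t for t in tickets
        if t.status == "todo"
        and all(d in ticket_map and ticket_map[d].status == "completed"
                for d in t.depends_on)
    ]
```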
**`has_cycle()`**: Classic DFS cycle detection using visited set + recursion stack:
```python
def has_cycle(self) -> bool:
    visited = set()
    rec_stack = set()
    def is_cyclic(ticket_id):
        if ticket_id in rec_stack: return True   # Back edge = cycle
        if ticket_id in visited: return False    # Already explored
        visited.add(ticket_id)
        rec_stack.add(ticket_id)
        for neighbor in self.ticket_map[ticket_id].depends_on:
            if neighbor in self.ticket_map and is_cyclic(neighbor):
                return True
        rec_stack.remove(ticket_id)
        return False
    for ticket in self.tickets:
        if ticket.id not in visited:
            if is_cyclic(ticket.id): return True
    return False
```
**`topological_sort()`**: Calls `has_cycle()` first — raises `ValueError` if cycle found. Standard DFS post-order topological sort. Returns list of ticket ID strings in dependency order.
### ExecutionEngine
```python
class ExecutionEngine:
def __init__(self, dag: TrackDAG, auto_queue: bool = False):
self.dag = dag
self.auto_queue = auto_queue
```
**`tick()`** — the heartbeat. On each call:
1. Queries `dag.get_ready_tasks()` for eligible tickets.
2. If `auto_queue` is enabled: non-`step_mode` tasks are automatically promoted to `in_progress`.
3. `step_mode` tasks remain in `todo` until `approve_task()` is called.
4. Returns the list of ready tasks.
**`approve_task(task_id)`**: Manually transitions `todo` → `in_progress` if all dependencies are met.
**`update_task_status(task_id, status)`**: Force-sets status (used by workers to mark `completed` or `blocked`).
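A condensed, runnable sketch of the three methods (with a minimal in-block `Ticket` stand-in; the real engine wraps a `TrackDAG` rather than holding the map itself):

```python
from dataclasses import dataclass, field

@dataclass
class Ticket:
    id: str
    status: str = "todo"
    step_mode: bool = False
    depends_on: list = field(default_factory=list)

class ExecutionEngine:
    def __init__(self, tickets, auto_queue=False):
        self.ticket_map = {t.id: t for t in tickets}
        self.auto_queue = auto_queue

    def _ready(self, t):
        return t.status == "todo" and all(
            d in self.ticket_map and self.ticket_map[d].status == "completed"
            for d in t.depends_on)

    def tick(self):
        ready = [t for t in self.ticket_map.values() if self._ready(t)]
        if self.auto_queue:
            for t in ready:
                if not t.step_mode:
                    t.status = "in_progress"   # auto-promote non-step tasks
        return ready

    def approve_task(self, task_id):
        t = self.ticket_map[task_id]
        if self._ready(t):
            t.status = "in_progress"

    def update_task_status(self, task_id, status):
        self.ticket_map[task_id].status = status
```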
---
## ConductorEngine (`multi_agent_conductor.py`)
The Tier 2 orchestrator. Owns the execution loop that drives tickets through the DAG.
```python
class ConductorEngine:
def __init__(self, track: Track, event_queue=None, auto_queue=False):
self.track = track
self.event_queue = event_queue
self.tier_usage = {
"Tier 1": {"input": 0, "output": 0},
"Tier 2": {"input": 0, "output": 0},
"Tier 3": {"input": 0, "output": 0},
"Tier 4": {"input": 0, "output": 0},
}
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
```
### State Broadcast (`_push_state`)
On every state change, the engine pushes the full orchestration state to the GUI via `AsyncEventQueue`:
```python
async def _push_state(self, status="running", active_tier=None):
payload = {
"status": status, # "running" | "done" | "blocked"
"active_tier": active_tier, # e.g., "Tier 2 (Tech Lead)", "Tier 3 (Worker): T-001"
"tier_usage": self.tier_usage,
"track": {"id": self.track.id, "title": self.track.description},
"tickets": [asdict(t) for t in self.track.tickets]
}
await self.event_queue.put("mma_state_update", payload)
```
This payload is consumed by the GUI's `_process_pending_gui_tasks` handler for `"mma_state_update"`, which updates `mma_status`, `active_tier`, `mma_tier_usage`, `active_tickets`, and `active_track`.
### Ticket Ingestion (`parse_json_tickets`)
Parses a JSON array of ticket dicts (from Tier 2 LLM output) into `Ticket` objects, appends to `self.track.tickets`, then rebuilds the `TrackDAG` and `ExecutionEngine`.
### Main Execution Loop (`run`)
```python
async def run(self):
    loop = asyncio.get_running_loop()
    while True:
        ready_tasks = self.engine.tick()
        if not ready_tasks:
            if all(t.status == "completed" for t in self.track.tickets):
                await self._push_state("done")
                break
            if any(t.status == "in_progress" for t in self.track.tickets):
                await asyncio.sleep(1)  # Waiting for async workers
                continue
            await self._push_state("blocked")
            break
        for ticket in ready_tasks:
            if ticket.status == "in_progress" or (self.engine.auto_queue and not ticket.step_mode):
                ticket.status = "in_progress"
                await self._push_state("running", f"Tier 3 (Worker): {ticket.id}")
                # Create worker context
                context = WorkerContext(
                    ticket_id=ticket.id,
                    model_name="gemini-2.5-flash-lite",
                    messages=[]
                )
                # Execute in thread pool (blocking AI call)
                await loop.run_in_executor(
                    None, run_worker_lifecycle, ticket, context, ...
                )
                await self._push_state("running", "Tier 2 (Tech Lead)")
            elif ticket.step_mode or not self.engine.auto_queue:
                await self._push_state("running", f"Awaiting Approval: {ticket.id}")
                await asyncio.sleep(1)  # Pause for HITL approval
```
---
## Tier 2: Tech Lead (`conductor_tech_lead.py`)
The Tier 2 AI call converts a high-level Track brief into discrete Tier 3 tickets.
### `generate_tickets(track_brief, module_skeletons) -> list[dict]`
```python
def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
user_message = (
f"### TRACK BRIEF:\n{track_brief}\n\n"
f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
"Please generate the implementation tickets for this track."
)
# Temporarily override system prompt
old_system_prompt = ai_client._custom_system_prompt
ai_client.set_custom_system_prompt(system_prompt)
try:
response = ai_client.send(md_content="", user_message=user_message)
# Multi-layer JSON extraction:
# 1. Try ```json ... ``` blocks
# 2. Try ``` ... ``` blocks
# 3. Regex search for [ { ... } ] pattern
tickets = json.loads(json_match)
return tickets
finally:
ai_client.set_custom_system_prompt(old_system_prompt)
```
The JSON extraction is defensive — handles markdown code fences, bare JSON, and regex fallback for embedded arrays.
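A sketch of that layered extraction (the regexes are illustrative, not the module's exact patterns):

```python
import json
import re

def extract_json_array(text):
    """Layered fallback: fenced ```json blocks, bare fences, then a regex scan."""
    for pattern in (r"```json\s*(.*?)```", r"```\s*(.*?)```"):
        m = re.search(pattern, text, re.DOTALL)
        if m:
            try:
                return json.loads(m.group(1))
            except json.JSONDecodeError:
                pass  # fall through to the next layer
    m = re.search(r"\[\s*\{.*\}\s*\]", text, re.DOTALL)
    if m:
        return json.loads(m.group(0))
    raise ValueError("no JSON array found in model output")
```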
### `topological_sort(tickets: list[dict]) -> list[dict]`
Convenience wrapper: converts raw dicts to `Ticket` objects, builds a `TrackDAG`, calls `dag.topological_sort()`, returns the original dicts reordered by sorted IDs.
---
## Tier 3: Worker Lifecycle (`run_worker_lifecycle`)
This free function executes a single ticket. Key behaviors:
### Context Amnesia
```python
ai_client.reset_session() # Each ticket starts with a clean slate
```
No conversational bleed between tickets. Every worker is stateless.
### Context Injection
For `context_requirements` files:
- First file: `parser.get_curated_view(content)` — full skeleton with `@core_logic` and `[HOT]` bodies preserved.
- Subsequent files: `parser.get_skeleton(content)` — cheaper, signatures + docstrings only.
### Prompt Construction
```python
user_message = (
f"You are assigned to Ticket {ticket.id}.\n"
f"Task Description: {ticket.description}\n"
f"\nContext Files:\n{context_injection}\n"
"Please complete this task. If you are blocked and cannot proceed, "
"start your response with 'BLOCKED' and explain why."
)
```
### HITL Clutch Integration
If `event_queue` is provided, `confirm_spawn()` is called before executing, allowing the user to:
- Read the prompt and context.
- Edit both the prompt and context markdown.
- Approve, reject, or abort the entire track.
The `confirm_spawn` function uses the `dialog_container` pattern:
1. Create `dialog_container = [None]` (mutable container for thread communication).
2. Push `"mma_spawn_approval"` task to event queue with the container.
3. Poll `dialog_container[0]` every 100ms for up to 60 seconds.
4. When the GUI fills in the dialog, call `.wait()` to get the result.
5. Returns `(approved, modified_prompt, modified_context)`.
---
## Tier 4: QA Error Analysis
Stateless error analysis. Invoked via the `qa_callback` parameter in `shell_runner.run_powershell()` when a command fails.
```python
def run_tier4_analysis(error_message: str) -> str:
"""Stateless Tier 4 QA analysis of an error message."""
# Uses a dedicated system prompt for error triage
# Returns analysis text (root cause, suggested fix)
# Does NOT modify any code — analysis only
```
Integrated directly into the shell execution pipeline: if `qa_callback` is provided and the command has non-zero exit or stderr output, the callback result is appended to the tool output as `QA ANALYSIS:\n<result>`.
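The append logic can be sketched as a pure function over the command result (the name and signature are hypothetical; the real check lives inside `shell_runner.run_powershell`):

```python
def attach_qa(exit_code, stdout, stderr, qa_callback=None):
    """Append Tier 4 analysis to the tool output when the command failed."""
    output = stdout + stderr
    failed = exit_code != 0 or bool(stderr)
    if qa_callback and failed:
        output += "\nQA ANALYSIS:\n" + qa_callback(stderr or stdout)
    return output
```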
---
## Cross-System Data Flow
The full MMA lifecycle from epic to completion:
1. **Tier 1 (Orchestrator)**: User enters an epic description in the GUI. Creates a `Track` with a brief.
2. **Tier 2 (Tech Lead)**: `conductor_tech_lead.generate_tickets()` calls `ai_client.send()` with the `tier2_sprint_planning` prompt, producing a JSON ticket list.
3. **Ingestion**: `ConductorEngine.parse_json_tickets()` ingests the JSON, builds `Ticket` objects, constructs `TrackDAG` + `ExecutionEngine`.
4. **Execution loop**: `ConductorEngine.run()` enters the async loop, calling `engine.tick()` each iteration.
5. **Worker dispatch**: For each ready ticket, `run_worker_lifecycle()` is called in a thread executor. It uses `ai_client.send()` with MCP tools (dispatched through `mcp_client.dispatch()`).
6. **Security enforcement**: MCP tools enforce the allowlist via `_resolve_and_check()` on every filesystem operation.
7. **State broadcast**: `_push_state()` → `AsyncEventQueue` → GUI renders DAG + ticket status.
8. **External visibility**: `ApiHookClient.get_mma_status()` queries the Hook API for the full orchestration state.
9. **HITL gates**: `confirm_spawn()` pushes to event queue → GUI renders dialog → user approves/edits → `dialog_container[0].wait()` returns the decision.
---
## Token Firewalling
Each tier operates within its own token budget:
- **Tier 3 workers** use lightweight models (default: `gemini-2.5-flash-lite`) and receive only the files listed in `context_requirements`.
- **Context Amnesia** ensures no accumulated history bleeds between tickets.
- **Tier 2** tracks cumulative `tier_usage` per tier: `{"input": N, "output": N}` for token cost monitoring.
- **First file vs subsequent files**: The first `context_requirements` file gets a curated view (preserving hot paths); subsequent files get only skeletons.
---
## Track State Persistence
Track state can be persisted to disk via `project_manager.py`:
```
conductor/tracks/<track_id>/
spec.md # Track specification (human-authored)
plan.md # Implementation plan with checkbox tasks
metadata.json # Track metadata (id, type, status, timestamps)
state.toml # Structured TrackState with task list
```
`project_manager.get_all_tracks(base_dir)` scans the tracks directory with a three-tier metadata fallback:
1. `state.toml` (structured `TrackState`) — counts tasks with `status == "completed"`.
2. `metadata.json` (legacy) — gets id/title/status only.
3. `plan.md` (regex) — counts `- [x]` vs `- [ ]` checkboxes for progress.
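A simplified sketch of that fallback (the `state.toml` layer is approximated with a string count here rather than real TOML parsing):

```python
import json
import re
from pathlib import Path

def read_track_progress(track_dir: Path):
    """Three-tier fallback: state.toml, then metadata.json, then plan.md checkboxes."""
    state, meta, plan = (track_dir / n for n in ("state.toml", "metadata.json", "plan.md"))
    if state.exists():
        # Stand-in for real TOML parsing: count completed task entries
        done = state.read_text().count('status = "completed"')
        return {"source": "state.toml", "completed": done}
    if meta.exists():
        data = json.loads(meta.read_text())
        return {"source": "metadata.json", "id": data.get("id"), "status": data.get("status")}
    if plan.exists():
        text = plan.read_text()
        done = len(re.findall(r"- \[x\]", text))
        todo = len(re.findall(r"- \[ \]", text))
        return {"source": "plan.md", "completed": done, "total": done + todo}
    return None
```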

`docs/guide_simulations.md` (rewritten, 63 → 377 lines):
# Verification & Simulation Framework
Detailed specification of the live GUI testing infrastructure, simulation lifecycle, and the mock provider strategy.
[Top](../Readme.md) | [Architecture](guide_architecture.md) | [Tools & IPC](guide_tools.md) | [MMA Orchestration](guide_mma.md)
---
## Infrastructure
To verify complex UI state and asynchronous interactions, Manual Slop employs a **Live Verification** strategy using the application's built-in API hooks.
### `--enable-test-hooks`
When launched with this flag, the application starts the `HookServer` on port `8999`, exposing its internal state to external HTTP requests. This is the foundation for all automated verification. Without this flag, the Hook API is only available when the provider is `gemini_cli`.
### The `live_gui` pytest Fixture
Defined in `tests/conftest.py`, this session-scoped fixture manages the lifecycle of the application under test.
**Spawning:**
```python
@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
process = subprocess.Popen(
["uv", "run", "python", "-u", gui_script, "--enable-test-hooks"],
stdout=log_file, stderr=log_file, text=True,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0
)
```
- **`-u` flag**: Disables output buffering for real-time log capture.
- **Process group**: On Windows, uses `CREATE_NEW_PROCESS_GROUP` so the entire tree (GUI + child processes) can be killed cleanly.
- **Logging**: Stdout/stderr redirected to `logs/gui_2_py_test.log`.
**Readiness polling:**
```python
max_retries = 15 # seconds
while time.time() - start_time < max_retries:
response = requests.get("http://127.0.0.1:8999/status", timeout=0.5)
if response.status_code == 200:
ready = True; break
if process.poll() is not None: break # Process died early
time.sleep(0.5)
```
Polls `GET /status` every 500ms for up to 15 seconds. Checks `process.poll()` each iteration to detect early crashes (avoids waiting the full timeout if the GUI exits). Pre-check: tests if port 8999 is already occupied.
**Failure path:** If the hook server never responds, kills the process tree and calls `pytest.fail()` to abort the entire test session. Diagnostic telemetry (startup time, PID, success/fail) is written via `VerificationLogger`.
**Teardown:**
```python
finally:
client = ApiHookClient()
client.reset_session() # Clean GUI state before killing
time.sleep(0.5)
kill_process_tree(process.pid)
log_file.close()
```
Sends `reset_session()` via `ApiHookClient` before killing to prevent stale state files.
**Yield value:** `(process: subprocess.Popen, gui_script: str)`.
### Session Isolation
```python
@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
ai_client.reset_session()
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
yield
```
Runs automatically before every test. Resets the `ai_client` module state and defaults to a safe model, preventing state pollution between tests.
### Process Cleanup
```python
def kill_process_tree(pid: int | None) -> None:
```
- **Windows**: `taskkill /F /T /PID <pid>` — force-kills the process and all children (`/T` is critical since the GUI spawns child processes).
- **Unix**: `os.killpg(os.getpgid(pid), SIGKILL)` to kill the entire process group.
### VerificationLogger
Structured diagnostic logging for test telemetry:
```python
class VerificationLogger:
def __init__(self, test_name: str, script_name: str):
self.logs_dir = Path(f"logs/test/{datetime.now().strftime('%Y%m%d_%H%M%S')}")
def log_state(self, field: str, before: Any, after: Any, delta: Any = None)
def finalize(self, description: str, status: str, result_msg: str)
```
Output format: fixed-width column table (`Field | Before | After | Delta`) written to `logs/test/<timestamp>/<script_name>.txt`. Dual output: file + tagged stdout lines for CI visibility.
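A sketch of one fixed-width row as `log_state` might format it (the column widths are assumptions):

```python
def format_state_row(field, before, after, delta=None, widths=(24, 20, 20, 12)):
    """Render one fixed-width `Field | Before | After | Delta` row."""
    cells = [str(field), str(before), str(after), "" if delta is None else str(delta)]
    # Pad each cell to its column width, truncating anything too long
    return " | ".join(c.ljust(w)[:w] for c, w in zip(cells, widths))
```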
---
## Simulation Lifecycle: The "Puppeteer" Pattern
Simulations act as external puppeteers, driving the GUI through the `ApiHookClient` HTTP interface. The canonical example is `tests/visual_sim_mma_v2.py`.
### Stage 1: Mock Provider Setup
```python
client = ApiHookClient()
client.set_value('current_provider', 'gemini_cli')
mock_cli_path = f'{sys.executable} {os.path.abspath("tests/mock_gemini_cli.py")}'
client.set_value('gcli_path', mock_cli_path)
client.set_value('files_base_dir', 'tests/artifacts/temp_workspace')
client.click('btn_project_save')
```
- Switches the GUI's LLM provider to `gemini_cli` (the CLI adapter).
- Points the CLI binary to `python tests/mock_gemini_cli.py` — all LLM calls go to the mock.
- Redirects `files_base_dir` to a temp workspace to prevent polluting real project directories.
- Saves the project configuration.
### Stage 2: Epic Planning
```python
client.set_value('mma_epic_input', 'Develop a new feature')
client.click('btn_mma_plan_epic')
```
Enters an epic description and triggers planning. The GUI invokes the LLM (which hits the mock).
### Stage 3: Poll for Proposed Tracks (60s timeout)
```python
for _ in range(60):
status = client.get_mma_status()
if status.get('pending_mma_spawn_approval'): client.click('btn_approve_spawn')
elif status.get('pending_mma_step_approval'): client.click('btn_approve_mma_step')
elif status.get('pending_tool_approval'): client.click('btn_approve_tool')
if status.get('proposed_tracks') and len(status['proposed_tracks']) > 0: break
time.sleep(1)
```
The **approval automation** is a critical pattern repeated in every polling loop. The MMA engine has three approval gates:
- **Spawn approval**: Permission to create a new worker subprocess.
- **Step approval**: Permission to proceed with the next orchestration step.
- **Tool approval**: Permission to execute a tool call.
All three are auto-approved by clicking the corresponding button. Without this, the engine would block indefinitely at each gate.
### Stage 4: Accept Tracks
```python
client.click('btn_mma_accept_tracks')
```
### Stage 5: Poll for Tracks Populated (30s timeout)
Waits until `status['tracks']` contains a track with `'Mock Goal 1'` in its title.
### Stage 6: Load Track and Verify Tickets (60s timeout)
```python
client.click('btn_mma_load_track', user_data=track_id_to_load)
```
Then polls until:
- `active_track` matches the loaded track ID.
- `active_tickets` list is non-empty.
### Stage 7: Verify MMA Status Transitions (120s timeout)
Polls until `mma_status == 'running'` or `'done'`. Continues auto-approving all gates.
### Stage 8: Verify Worker Output in Streams (60s timeout)
```python
streams = status.get('mma_streams', {})
if any("Tier 3" in k for k in streams.keys()):
    tier3_key = [k for k in streams.keys() if "Tier 3" in k][0]
    if "SUCCESS: Mock Tier 3 worker" in streams[tier3_key]:
        streams_found = True
```
Verifies that `mma_streams` contains a key with "Tier 3" and the value contains the exact mock output string.
### Assertions Summary
1. Mock provider setup succeeds (try/except with `pytest.fail`).
2. `proposed_tracks` appears within 60 seconds.
3. `'Mock Goal 1'` track exists in tracks list within 30 seconds.
4. Track loads and `active_tickets` populate within 60 seconds.
5. MMA status becomes `'running'` or `'done'` within 120 seconds.
6. Tier 3 worker output with specific mock content appears in `mma_streams` within 60 seconds.
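All six timeout-bounded stages share the same shape. A generic sketch of the polling pattern (helper name illustrative; the real scripts inline the loop and call `pytest.fail` on expiry):

```python
import time

def poll_until(predicate, timeout: float = 60, interval: float = 1.0, on_tick=None):
    """Poll `predicate()` until it returns a truthy value or `timeout` elapses.

    `on_tick` runs once per iteration -- the natural place to auto-approve
    pending gates. Raises TimeoutError on expiry.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if on_tick is not None:
            on_tick()
        result = predicate()
        if result:
            return result
        time.sleep(interval)
    raise TimeoutError(f"condition not met within {timeout}s")
```

Usage: `tracks = poll_until(lambda: client.get_mma_status().get('proposed_tracks'), timeout=60)`.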
---
## Mock Provider Strategy
To exercise the 4-Tier MMA hierarchy without incurring API costs or latency, Manual Slop substitutes a scripted mock for the LLM via the `gemini_cli` adapter.
### `tests/mock_gemini_cli.py`
A fake Gemini CLI executable that replaces the real `gemini` binary during integration tests. Outputs JSON-L messages matching the real CLI's streaming output protocol.
**Input mechanism:**
```python
prompt = sys.stdin.read() # Primary: prompt via stdin
sys.argv # Secondary: management command detection
os.environ.get('GEMINI_CLI_HOOK_CONTEXT') # Tertiary: environment variable
```
**Management command bypass:**
```python
if len(sys.argv) > 1 and sys.argv[1] in ["mcp", "extensions", "skills", "hooks"]:
    return  # Silent exit
```
**Response routing** — keyword matching on stdin content:
| Prompt Contains | Response | Session ID |
|---|---|---|
| `'PATH: Epic Initialization'` | Two mock Track objects (`mock-track-1`, `mock-track-2`) | `mock-session-epic` |
| `'PATH: Sprint Planning'` | Two mock Ticket objects (`mock-ticket-1` independent, `mock-ticket-2` depends on `mock-ticket-1`) | `mock-session-sprint` |
| `'"role": "tool"'` or `'"tool_call_id"'` | Success message (simulates post-tool-call final answer) | `mock-session-final` |
| Default (Tier 3 worker prompts) | `"SUCCESS: Mock Tier 3 worker implemented the change. [MOCK OUTPUT]"` | `mock-session-default` |
**Output protocol** — every response is exactly two JSON-L lines:
```json
{"type": "message", "role": "assistant", "content": "<response>"}
{"type": "result", "status": "success", "stats": {"total_tokens": N, ...}, "session_id": "mock-session-*"}
```
This matches the real Gemini CLI's streaming output format. `flush=True` on every `print()` ensures the consuming process receives data immediately.
**Tool call simulation:** The mock does **not** emit tool calls. It detects tool results in the prompt (`'"role": "tool"'` check) and responds with a final answer, simulating the second turn of a tool-call conversation without actually issuing calls.
**Debug output:** All debug information goes to stderr, keeping stdout clean for the JSON-L protocol.
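The routing table and the two-line output protocol can be condensed into a sketch (payload bodies are abbreviated; the real mock emits full Track/Ticket objects and complete `stats`):

```python
import json

def route(prompt: str) -> tuple[str, str]:
    """Map prompt keywords to (content, session_id), per the routing table."""
    if 'PATH: Epic Initialization' in prompt:
        return '[{"id": "mock-track-1"}, {"id": "mock-track-2"}]', 'mock-session-epic'
    if 'PATH: Sprint Planning' in prompt:
        return '[{"id": "mock-ticket-1"}, {"id": "mock-ticket-2"}]', 'mock-session-sprint'
    if '"role": "tool"' in prompt or '"tool_call_id"' in prompt:
        return 'Tool executed successfully.', 'mock-session-final'
    return 'SUCCESS: Mock Tier 3 worker implemented the change. [MOCK OUTPUT]', 'mock-session-default'

def respond(prompt: str) -> list[str]:
    """Build the two JSON-L lines the mock streams on stdout."""
    content, session = route(prompt)
    return [
        json.dumps({"type": "message", "role": "assistant", "content": content}),
        json.dumps({"type": "result", "status": "success",
                    "stats": {"total_tokens": len(prompt) // 4},
                    "session_id": session}),
    ]
```

In the real script each line is printed with `flush=True` so the consuming subprocess reader sees it immediately.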
---
## Visual Verification Patterns
Tests in this framework don't just check return values; they verify the **rendered state** of the application via the Hook API.
### DAG Integrity
Verify that `active_tickets` in the MMA status matches the expected task graph:
```python
status = client.get_mma_status()
tickets = status.get('active_tickets', [])
assert len(tickets) >= 2
assert any(t['id'] == 'mock-ticket-1' for t in tickets)
```
### Stream Telemetry
Check `mma_streams` to ensure output from multiple tiers is correctly captured and routed:
```python
streams = status.get('mma_streams', {})
tier3_keys = [k for k in streams.keys() if "Tier 3" in k]
assert len(tier3_keys) > 0
assert "SUCCESS" in streams[tier3_keys[0]]
```
### Modal State
Assert that the correct dialog is active during a pending tool call:
```python
status = client.get_mma_status()
assert status.get('pending_tool_approval') == True
# or
diag = client.get_indicator_state('thinking')
assert diag.get('thinking') == True
```
### Performance Monitoring
Verify UI responsiveness under load:
```python
perf = client.get_performance()
assert perf['fps'] > 30
assert perf['input_lag_ms'] < 100
```
---
## Supporting Analysis Modules
### `file_cache.py` — ASTParser (tree-sitter)
```python
class ASTParser:
    def __init__(self, language: str = "python"):
        self.language = tree_sitter.Language(tree_sitter_python.language())
        self.parser = tree_sitter.Parser(self.language)

    def parse(self, code: str) -> tree_sitter.Tree
    def get_skeleton(self, code: str) -> str
    def get_curated_view(self, code: str) -> str
```
**`get_skeleton` algorithm:**
1. Parse code to tree-sitter AST.
2. Walk all `function_definition` nodes.
3. For each body (`block` node):
- If first non-comment child is a docstring: preserve docstring, replace rest with `...`.
- Otherwise: replace entire body with `...`.
4. Apply edits in reverse byte order (maintains valid offsets).
**`get_curated_view` algorithm:**
Enhanced skeleton that preserves bodies under two conditions:
- Function has `@core_logic` decorator.
- Function body contains a `# [HOT]` comment anywhere in its descendants.
If either condition is true, the body is preserved verbatim. This enables a two-tier code view: hot paths shown in full, boilerplate compressed.
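The edit-in-reverse-order trick generalizes beyond tree-sitter. A stdlib-`ast` approximation of `get_skeleton` (the shipped version is byte-accurate via tree-sitter; this line-based sketch assumes no nested functions):

```python
import ast

def skeleton(code: str) -> str:
    """Collapse top-level function/method bodies to '...', keeping docstrings."""
    lines = code.splitlines()
    tree = ast.parse(code)
    funcs = []
    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            funcs.append(node)
        elif isinstance(node, ast.ClassDef):
            funcs += [n for n in node.body
                      if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))]
    edits = []
    for fn in funcs:
        body = fn.body
        # Keep a leading docstring, collapse everything after it.
        has_doc = (isinstance(body[0], ast.Expr)
                   and isinstance(body[0].value, ast.Constant)
                   and isinstance(body[0].value.value, str))
        rest = body[1:] if has_doc else body
        if rest:
            edits.append((rest[0].lineno, rest[-1].end_lineno, fn.col_offset + 4))
    # Apply edits bottom-up so earlier line numbers stay valid.
    for start, end, indent in sorted(edits, reverse=True):
        lines[start - 1:end] = [" " * indent + "..."]
    return "\n".join(lines)
```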
### `summarize.py` — Heuristic File Summaries
Token-efficient structural descriptions without AI calls:
```python
_SUMMARISERS: dict[str, Callable] = {
    ".py": _summarise_python,     # imports, classes, methods, functions, constants
    ".toml": _summarise_toml,     # table keys + array lengths
    ".md": _summarise_markdown,   # h1-h3 headings
    ".ini": _summarise_generic,   # line count + preview
}
```
**`_summarise_python`** uses stdlib `ast`:
1. Parse with `ast.parse()`.
2. Extract deduplicated imports (top-level module names only).
3. Extract `ALL_CAPS` constants (both `Assign` and `AnnAssign`).
4. Extract classes with their method names.
5. Extract top-level function names.
Output:
```
**Python** — 150 lines
imports: ast, json, pathlib
constants: TIMEOUT_SECONDS
class ASTParser: __init__, parse, get_skeleton
functions: summarise_file, build_summary_markdown
```
### `outline_tool.py` — Hierarchical Code Outline
```python
class CodeOutliner:
    def outline(self, code: str) -> str
```
Walks top-level `ast` nodes:
- `ClassDef``[Class] Name (Lines X-Y)` + docstring + recurse for methods
- `FunctionDef``[Func] Name (Lines X-Y)` or `[Method] Name` if nested
- `AsyncFunctionDef``[Async Func] Name (Lines X-Y)`
Only extracts first line of docstrings. Uses indentation depth as heuristic for method vs function.
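A minimal sketch of the outline walk (top level only; the real tool also recurses into deeper nesting and extracts docstring first lines):

```python
import ast

def outline(code: str) -> str:
    """Emit '[Class]/[Func]/[Async Func] Name (Lines X-Y)' rows."""
    rows = []
    for node in ast.parse(code).body:
        if isinstance(node, ast.ClassDef):
            rows.append(f"[Class] {node.name} (Lines {node.lineno}-{node.end_lineno})")
            for sub in node.body:
                if isinstance(sub, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    rows.append(f"  [Method] {sub.name} "
                                f"(Lines {sub.lineno}-{sub.end_lineno})")
        elif isinstance(node, ast.AsyncFunctionDef):
            rows.append(f"[Async Func] {node.name} "
                        f"(Lines {node.lineno}-{node.end_lineno})")
        elif isinstance(node, ast.FunctionDef):
            rows.append(f"[Func] {node.name} (Lines {node.lineno}-{node.end_lineno})")
    return "\n".join(rows)
```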
---
## Two Parallel Code Analysis Implementations
The codebase has two parallel approaches for structural code analysis:
| Aspect | `file_cache.py` (tree-sitter) | `summarize.py` / `outline_tool.py` (stdlib `ast`) |
|---|---|---|
| Parser | tree-sitter with `tree_sitter_python` | Python's built-in `ast` module |
| Precision | Byte-accurate, preserves exact syntax | Line-level, may lose formatting nuance |
| `@core_logic` / `[HOT]` | Supported (selective body preservation) | Not supported |
| Used by | `py_get_skeleton` MCP tool, worker context injection | `get_file_summary` MCP tool, `py_get_code_outline` |
| Performance | Slightly slower (C extension + tree walk) | Faster (pure Python, simpler walk) |
# Tooling & IPC Technical Reference
A deep-dive into the Model Context Protocol (MCP) bridge, the Hook API, and the "Human-in-the-Loop" communication protocol.
[Top](../Readme.md) | [Architecture](guide_architecture.md) | [MMA Orchestration](guide_mma.md) | [Simulations](guide_simulations.md)
---
## The MCP Bridge: Filesystem Security
The AI's ability to interact with the filesystem is mediated by a three-layer security model in `mcp_client.py`. Every tool accessing the disk passes through `_resolve_and_check(path)` before any I/O occurs.
### Global State
```python
_allowed_paths: set[Path] = set() # Explicit file allowlist (resolved absolutes)
_base_dirs: set[Path] = set() # Directory roots for containment checks
_primary_base_dir: Path | None = None # Used for resolving relative paths
perf_monitor_callback: Optional[Callable[[], dict[str, Any]]] = None
```
### Layer 1: Allowlist Construction (`configure`)
Called by `ai_client` before each send cycle. Takes `file_items` (from `aggregate.build_file_items()`) and optional `extra_base_dirs`.
1. Resets `_allowed_paths` and `_base_dirs` to empty sets on every call.
2. Sets `_primary_base_dir` from `extra_base_dirs[0]` (resolved) or falls back to `Path.cwd()`.
3. Iterates all `file_items`, resolving each `item["path"]` to an absolute path. Each resolved path is added to `_allowed_paths`; its parent directory is added to `_base_dirs`.
4. Any entries in `extra_base_dirs` that are valid directories are also added to `_base_dirs`.
### Layer 2: Path Validation (`_is_allowed`)
Checks run in this exact order:
1. **Blacklist** (hard deny): If filename is `history.toml` or ends with `_history.toml`, return `False`. Prevents the AI from reading conversation history.
2. **Explicit allowlist**: If resolved path is in `_allowed_paths`, return `True`.
3. **CWD fallback**: If `_base_dirs` is empty, any path under `cwd()` is allowed.
4. **Base directory containment**: Path must be a subpath of at least one entry in `_base_dirs` (via `relative_to()`).
5. **Default deny**: All other paths are rejected.
All paths are resolved (following symlinks) before comparison, preventing symlink-based traversal.
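The five checks can be sketched directly (standalone function for illustration; the real code reads the module-level globals above):

```python
from pathlib import Path

def is_allowed(path, allowed_paths: set, base_dirs: set) -> bool:
    """Sketch of the _is_allowed check order."""
    p = Path(path).resolve()
    if p.name == "history.toml" or p.name.endswith("_history.toml"):
        return False                                  # 1. hard blacklist
    if p in allowed_paths:
        return True                                   # 2. explicit allowlist
    if not base_dirs:
        return p.is_relative_to(Path.cwd())           # 3. CWD fallback
    for base in base_dirs:
        try:
            p.relative_to(Path(base).resolve())       # 4. base-dir containment
            return True
        except ValueError:
            continue
    return False                                      # 5. default deny
```

Resolving both sides before `relative_to` is what defeats symlink-based traversal.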
### Layer 3: Resolution Gate (`_resolve_and_check`)
Every tool call passes through this:
1. Convert raw path string to `Path`.
2. If not absolute, prepend `_primary_base_dir`.
3. Resolve to absolute.
4. Call `_is_allowed()`.
5. Return `(resolved_path, "")` on success or `(None, error_message)` on failure.
The error message includes the full list of allowed base directories for debugging.
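Layer 3 composes the resolution steps with the Layer 2 check. A sketch (the allowlist check is injected as a callable here; the error wording is illustrative):

```python
from pathlib import Path

def resolve_and_check(raw: str, primary_base_dir: Path, is_allowed) -> tuple:
    """Sketch of the resolution gate: returns (path, "") or (None, error)."""
    p = Path(raw)
    if not p.is_absolute():
        p = primary_base_dir / p      # relative paths anchor to the primary base
    p = p.resolve()
    if is_allowed(p):
        return p, ""
    return None, (f"ACCESS DENIED: '{p}' is outside the allowed base directories "
                  f"(primary base: {primary_base_dir})")
```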
---
## Native Tool Inventory
The `dispatch` function (line 806) is a flat if/elif chain mapping 26 tool names to implementations. All tools are categorized below with their parameters and behavior.
### File I/O Tools
| Tool | Parameters | Description |
|---|---|---|
| `read_file` | `path` | UTF-8 file content extraction |
| `list_directory` | `path` | Compact table: `[file/dir] name size`. Applies blacklist filter to entries. |
| `search_files` | `path`, `pattern` | Glob pattern matching within an allowed directory. Applies blacklist filter. |
| `get_file_slice` | `path`, `start_line`, `end_line` | Returns specific line range (1-based, inclusive) |
| `set_file_slice` | `path`, `start_line`, `end_line`, `new_content` | Replaces a line range with new content (surgical edit) |
| `get_tree` | `path`, `max_depth` | Directory structure up to `max_depth` levels |
### AST-Based Tools (Python only)
These use `file_cache.ASTParser` (tree-sitter) or stdlib `ast` for structural code analysis:
| Tool | Parameters | Description |
|---|---|---|
| `py_get_skeleton` | `path` | Signatures + docstrings, bodies replaced with `...`. Uses tree-sitter. |
| `py_get_code_outline` | `path` | Hierarchical outline: `[Class] Name (Lines X-Y)` with nested methods. Uses stdlib `ast`. |
| `py_get_definition` | `path`, `name` | Full source of a specific class/function/method. Supports `ClassName.method` dot notation. |
| `py_update_definition` | `path`, `name`, `new_content` | Surgical replacement: locates symbol via `ast`, delegates to `set_file_slice`. |
| `py_get_signature` | `path`, `name` | Only the `def` line through the colon. |
| `py_set_signature` | `path`, `name`, `new_signature` | Replaces only the signature, preserving body. |
| `py_get_class_summary` | `path`, `name` | Class docstring + list of method signatures. |
| `py_get_var_declaration` | `path`, `name` | Module-level or class-level variable assignment line(s). |
| `py_set_var_declaration` | `path`, `name`, `new_declaration` | Surgical variable replacement. |
| `py_find_usages` | `path`, `name` | Exact string match search across a file or directory. |
| `py_get_imports` | `path` | Parses AST, returns strict dependency list. |
| `py_check_syntax` | `path` | Quick syntax validation via `ast.parse()`. |
| `py_get_hierarchy` | `path`, `class_name` | Scans directory for subclasses of a given class. |
| `py_get_docstring` | `path`, `name` | Extracts docstring for module, class, or function. |
### Analysis Tools
| Tool | Parameters | Description |
|---|---|---|
| `get_file_summary` | `path` | Heuristic summary via `summarize.py`: imports, classes, functions, constants for `.py`; table keys for `.toml`; headings for `.md`. |
| `get_git_diff` | `path`, `base_rev`, `head_rev` | Git diff output for a file or directory. |
### Network Tools
| Tool | Parameters | Description |
|---|---|---|
| `web_search` | `query` | Scrapes DuckDuckGo HTML via dependency-free `_DDGParser` (HTMLParser subclass). Returns top 5 results with title, URL, snippet. |
| `fetch_url` | `url` | Fetches URL content, strips HTML tags via `_TextExtractor`. |
### Runtime Tools
| Tool | Parameters | Description |
|---|---|---|
| `get_ui_performance` | (none) | Returns FPS, Frame Time, CPU, Input Lag via injected `perf_monitor_callback`. No security check (no filesystem access). |
### Tool Implementation Patterns
**AST-based read tools** follow this pattern:
```python
def py_get_skeleton(path: str) -> str:
    p, err = _resolve_and_check(path)
    if err: return err
    if not p.exists(): return f"ERROR: file not found: {path}"
    if not p.is_file() or p.suffix != ".py": return f"ERROR: not a python file: {path}"
    from file_cache import ASTParser
    code = p.read_text(encoding="utf-8")
    parser = ASTParser("python")
    return parser.get_skeleton(code)
```
**AST-based write tools** use stdlib `ast` (not tree-sitter) to locate symbols, then delegate to `set_file_slice`:
```python
def py_update_definition(path: str, name: str, new_content: str) -> str:
    p, err = _resolve_and_check(path)
    if err: return err
    code = p.read_text(encoding="utf-8").lstrip(chr(0xFEFF))  # Strip BOM
    tree = ast.parse(code)
    node = _get_symbol_node(tree, name)  # Walks AST for matching node
    if not node: return f"ERROR: could not find definition '{name}'"
    start = getattr(node, "lineno")
    end = getattr(node, "end_lineno")
    return set_file_slice(path, start, end, new_content)
```
The `_get_symbol_node` helper supports dot notation (`ClassName.method_name`) by first finding the class, then searching its body for the method.
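A sketch of that lookup (the helper's exact shape is illustrative; only top-level symbols and one level of class nesting are handled here):

```python
import ast

def get_symbol_node(tree: ast.Module, name: str):
    """Find a top-level def/class, or a method via 'ClassName.method' notation."""
    if "." in name:
        cls_name, member = name.split(".", 1)
        for node in tree.body:
            if isinstance(node, ast.ClassDef) and node.name == cls_name:
                for sub in node.body:
                    if getattr(sub, "name", None) == member:
                        return sub
        return None
    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)) \
                and node.name == name:
            return node
    return None
```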
---
## The Hook API: Remote Control & Telemetry
Manual Slop exposes a REST-based IPC interface on `127.0.0.1:8999` using Python's `ThreadingHTTPServer`. Each incoming request gets its own thread.
### Server Architecture
```python
class HookServerInstance(ThreadingHTTPServer):
    app: Any  # Reference to main App instance

class HookHandler(BaseHTTPRequestHandler):
    # Accesses self.server.app for all state

class HookServer:
    app: Any
    port: int = 8999
    server: HookServerInstance | None
    thread: threading.Thread | None
```
**Start conditions**: Only starts if `app.test_hooks_enabled == True` OR current provider is `'gemini_cli'`. Otherwise `start()` silently returns.
**Initialization**: On start, ensures the app has `_pending_gui_tasks` + lock, `_pending_asks` + `_ask_responses` dicts, and `_api_event_queue` + lock.
### GUI Thread Trampoline Pattern
The HookServer **never reads GUI state directly** (thread safety). For state reads, it uses a trampoline:
1. Create a `threading.Event()` and a `result` dict.
2. Push a `custom_callback` closure into `_pending_gui_tasks` that reads state and calls `event.set()`.
3. Block on `event.wait(timeout=60)`.
4. Return `result` as JSON, or 504 on timeout.
This ensures all state reads happen on the GUI main thread during `_process_pending_gui_tasks`.
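The trampoline reduces to a few lines. A sketch with the GUI side simulated (function and task-dict names are illustrative):

```python
import threading

def read_via_gui_thread(pending_tasks: list, tasks_lock, read_state,
                        timeout: float = 60.0):
    """Schedule `read_state` to run on the GUI thread; block until it completes."""
    done = threading.Event()
    result: dict = {}

    def callback():
        result.update(read_state())   # runs inside _process_pending_gui_tasks
        done.set()

    with tasks_lock:
        pending_tasks.append({"type": "custom_callback", "fn": callback})
    if not done.wait(timeout):
        return None                   # handler maps this to HTTP 504
    return result
```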
### GET Endpoints
| Endpoint | Thread Safety | Response |
|---|---|---|
| `GET /status` | Direct (stateless) | `{"status": "ok"}` |
| `GET /api/project` | Direct read | `{"project": <flat_config>}` via `project_manager.flat_config()` |
| `GET /api/session` | Direct read | `{"session": {"entries": [...]}}` from `app.disc_entries` |
| `GET /api/performance` | Direct read | `{"performance": <metrics>}` from `app.perf_monitor.get_metrics()` |
| `GET /api/events` | Lock-guarded drain | `{"events": [...]}` — drains and clears `_api_event_queue` |
| `GET /api/gui/value` | GUI trampoline | `{"value": <val>}` — reads from `_settable_fields` map |
| `GET /api/gui/value/<tag>` | GUI trampoline | Same, via URL path param |
| `GET /api/gui/mma_status` | GUI trampoline | Full MMA state dict (see below) |
| `GET /api/gui/diagnostics` | GUI trampoline | `{thinking, live, prior}` booleans |
**`/api/gui/mma_status` response fields:**
```python
{
    "mma_status": str,                   # "idle" | "planning" | "executing" | "done"
    "ai_status": str,                    # "idle" | "sending..." | etc.
    "active_tier": str | None,
    "active_track": str,                 # Track ID or raw value
    "active_tickets": list,              # Serialized ticket dicts
    "mma_step_mode": bool,
    "pending_tool_approval": bool,       # _pending_ask_dialog
    "pending_mma_step_approval": bool,   # _pending_mma_approval is not None
    "pending_mma_spawn_approval": bool,  # _pending_mma_spawn is not None
    "pending_approval": bool,            # Backward compat: step OR tool
    "pending_spawn": bool,               # Alias for spawn approval
    "tracks": list,
    "proposed_tracks": list,
    "mma_streams": dict,                 # {stream_id: output_text}
}
```
**`/api/gui/diagnostics` response fields:**
```python
{
    "thinking": bool,  # ai_status in ["sending...", "running powershell..."]
    "live": bool,      # ai_status in ["running powershell...", "fetching url...", ...]
    "prior": bool,     # app.is_viewing_prior_session
}
```
### POST Endpoints
| Endpoint | Body | Response | Effect |
|---|---|---|---|
| `POST /api/project` | `{"project": {...}}` | `{"status": "updated"}` | Sets `app.project` |
| `POST /api/session` | `{"session": {"entries": [...]}}` | `{"status": "updated"}` | Sets `app.disc_entries` |
| `POST /api/gui` | Any JSON dict | `{"status": "queued"}` | Appends to `_pending_gui_tasks` |
| `POST /api/ask` | Any JSON dict | `{"status": "ok", "response": ...}` or 504 | Blocking ask dialog |
| `POST /api/ask/respond` | `{"request_id": ..., "response": ...}` | `{"status": "ok"}` or 404 | Resolves a pending ask |
### The `/api/ask` Protocol (Synchronous HITL via HTTP)
This is the most complex endpoint — it implements a blocking request-response dialog over HTTP:
1. Generate a UUID `request_id`.
2. Create a `threading.Event`.
3. Register in `app._pending_asks[request_id] = event`.
4. Push an `ask_received` event to `_api_event_queue` (for client discovery).
5. Append `{"type": "ask", "request_id": ..., "data": ...}` to `_pending_gui_tasks`.
6. Block on `event.wait(timeout=60.0)`.
7. On signal: read `app._ask_responses[request_id]`, clean up, return 200.
8. On timeout: clean up, return 504.
The counterpart `/api/ask/respond`:
1. Look up `request_id` in `app._pending_asks`.
2. Store `response` in `app._ask_responses[request_id]`.
3. Signal the event (`event.set()`).
4. Queue a `clear_ask` GUI task.
5. Return 200 (or 404 if `request_id` not found).
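The pairing of the two endpoints can be sketched as a small broker (class name illustrative; the real handler also queues the `ask` GUI task and the `ask_received` discovery event):

```python
import threading
import uuid

class AskBroker:
    """Sketch of the blocking ask/respond pairing over shared state."""

    def __init__(self):
        self.pending: dict = {}     # request_id -> threading.Event
        self.responses: dict = {}   # request_id -> response payload

    def ask(self, data: dict, timeout: float = 60.0):
        request_id = str(uuid.uuid4())
        event = threading.Event()
        self.pending[request_id] = event
        try:
            if not event.wait(timeout):
                return None                          # handler returns HTTP 504
            return self.responses.pop(request_id)    # handler returns HTTP 200
        finally:
            self.pending.pop(request_id, None)

    def respond(self, request_id: str, response) -> bool:
        event = self.pending.get(request_id)
        if event is None:
            return False                             # handler returns HTTP 404
        self.responses[request_id] = response
        event.set()
        return True
```

The event-per-request design lets any number of agents block on independent asks while the GUI resolves them in any order.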
---
## ApiHookClient: The Automation Interface
`api_hook_client.py` provides a synchronous Python client for the Hook API, used by test scripts and external tooling.
```python
class ApiHookClient:
    def __init__(self, base_url="http://127.0.0.1:8999", max_retries=5, retry_delay=0.2)
```
### Connection Methods
| Method | Description |
|---|---|
| `wait_for_server(timeout=3)` | Polls `/status` with exponential backoff until server is ready. |
| `_make_request(method, endpoint, data, timeout)` | Core HTTP client with retry logic. |
### State Query Methods
| Method | Endpoint | Description |
|---|---|---|
| `get_status()` | `GET /status` | Health check |
| `get_project()` | `GET /api/project` | Full project config |
| `get_session()` | `GET /api/session` | Discussion entries |
| `get_mma_status()` | `GET /api/gui/mma_status` | Full MMA orchestration state |
| `get_performance()` | `GET /api/performance` | UI metrics (FPS, CPU, etc.) |
| `get_value(item)` | `GET /api/gui/value/<item>` | Read any `_settable_fields` value |
| `get_text_value(item_tag)` | Wraps `get_value` | Returns string representation or None |
| `get_events()` | `GET /api/events` | Fetches and clears the event queue |
| `get_indicator_state(tag)` | `GET /api/gui/diagnostics` | Checks if an indicator is shown |
| `get_node_status(node_tag)` | Two-phase: `get_value` then `diagnostics` | DAG node status with fallback |
### GUI Manipulation Methods
| Method | Endpoint | Description |
|---|---|---|
| `set_value(item, value)` | `POST /api/gui` | Sets any `_settable_fields` value; special-cases `current_provider` and `gcli_path` |
| `click(item, *args, **kwargs)` | `POST /api/gui` | Simulates button click; passes optional `user_data` |
| `select_tab(tab_bar, tab)` | `POST /api/gui` | Switches to a specific tab |
| `select_list_item(listbox, item_value)` | `POST /api/gui` | Selects an item in a listbox |
| `push_event(event_type, payload)` | `POST /api/gui` | Pushes event into `AsyncEventQueue` |
| `post_gui(gui_data)` | `POST /api/gui` | Raw task dict injection |
| `reset_session()` | Clicks `btn_reset_session` | Simulates clicking the Reset Session button |
### Polling Methods
| Method | Description |
|---|---|
| `wait_for_event(event_type, timeout=5)` | Polls `get_events()` until a matching event type appears. |
| `wait_for_value(item, expected, timeout=5)` | Polls `get_value(item)` until it equals `expected`. |
### HITL Method
| Method | Description |
|---|---|
| `request_confirmation(tool_name, args)` | Sends to `/api/ask`, blocks until user responds via the GUI dialog. |
---
## Synthetic Context Refresh
To minimize token churn and redundant `read_file` calls, the `ai_client` performs a post-tool-execution context refresh. See [guide_architecture.md](guide_architecture.md#context-refresh-mechanism) for the full algorithm.
Summary:
1. **Detection**: Triggered after the final tool call in each reasoning round.
2. **Collection**: Re-reads all project-tracked files, comparing mtimes.
3. **Injection**: Changed files are diffed and appended as `[SYSTEM: FILES UPDATED]` to the last tool output.
4. **Pruning**: Older `[FILES UPDATED]` blocks are stripped from history in subsequent rounds.
---
## Session Logging
`session_logger.py` opens timestamped log files at GUI startup and keeps them open for the process lifetime.
### File Layout
```
logs/sessions/<session_id>/
comms.log # JSON-L: every API interaction (direction, kind, payload)
toolcalls.log # Markdown: sequential tool invocation records
apihooks.log # API hook invocations
clicalls.log # JSON-L: CLI subprocess details (command, stdin, stdout, stderr, latency)
scripts/generated/
<ts>_<seq:04d>.ps1 # Each AI-generated PowerShell script, preserved in order
```
### Logging Functions
| Function | Target | Format |
|---|---|---|
| `log_comms(entry)` | `comms.log` | JSON-L line per entry |
| `log_tool_call(script, result, script_path)` | `toolcalls.log` + `scripts/generated/` | Markdown record + preserved `.ps1` file |
| `log_api_hook(method, path, body)` | `apihooks.log` | Timestamped text line |
| `log_cli_call(command, stdin, stdout, stderr, latency)` | `clicalls.log` | JSON-L with latency tracking |
### Lifecycle
- `open_session(label)`: Called once at GUI startup. Idempotent (checks if already open). Registers `atexit.register(close_session)`.
- `close_session()`: Flushes and closes all file handles.
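A minimal sketch of the idempotent open / JSON-L write / atexit lifecycle (simplified to a single log file; directory and field names follow the layout above):

```python
import atexit
import json
import time
from pathlib import Path

_handle = None

def open_session(root: Path, label: str = "session"):
    """Open comms.log once per process; repeat calls are no-ops."""
    global _handle
    if _handle is not None:
        return
    session_dir = root / f"{label}_{int(time.time())}"
    session_dir.mkdir(parents=True, exist_ok=True)
    _handle = (session_dir / "comms.log").open("a", encoding="utf-8")
    atexit.register(close_session)

def log_comms(entry: dict):
    """Append one JSON-L line per API interaction."""
    if _handle is not None:
        _handle.write(json.dumps(entry) + "\n")
        _handle.flush()

def close_session():
    global _handle
    if _handle is not None:
        _handle.close()
        _handle = None
```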
---
## Shell Runner
`shell_runner.py` executes PowerShell scripts with environment configuration, timeout handling, and optional QA integration.
### Environment Configuration via `mcp_env.toml`
```toml
[path]
prepend = ["C:/custom/bin", "C:/other/tools"]
[env]
MY_VAR = "some_value"
EXPANDED = "${HOME}/subdir"
```
`_build_subprocess_env()` copies `os.environ`, prepends `[path].prepend` entries to `PATH`, and sets `[env]` key-value pairs with `${VAR}` expansion.
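A sketch of that merge, assuming `cfg` is the already-parsed TOML dict (the real function also handles missing sections and Windows path quirks):

```python
import os

def build_subprocess_env(cfg: dict) -> dict:
    """Apply mcp_env.toml [path].prepend and [env] entries to a copy of os.environ."""
    env = dict(os.environ)
    prepend = cfg.get("path", {}).get("prepend", [])
    if prepend:
        env["PATH"] = os.pathsep.join(prepend) + os.pathsep + env.get("PATH", "")
    for key, value in cfg.get("env", {}).items():
        env[key] = os.path.expandvars(value)  # handles ${VAR} expansion
    return env
```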
### `run_powershell(script, base_dir, qa_callback=None) -> str`
1. Prepends `Set-Location -LiteralPath '<base_dir>'` (with escaped single quotes).
2. Locates PowerShell: tries `powershell.exe`, `pwsh.exe`, `powershell`, `pwsh` in order.
3. Runs via `subprocess.Popen([exe, "-NoProfile", "-NonInteractive", "-Command", full_script])`.
4. `process.communicate(timeout=60)` — 60-second hard timeout.
5. On `TimeoutExpired`: kills process tree via `taskkill /F /T /PID`, returns `"ERROR: timed out after 60s"`.
6. Returns combined output: `STDOUT:\n<out>\nSTDERR:\n<err>\nEXIT CODE: <code>`.
7. If `qa_callback` provided and command failed: appends `QA ANALYSIS:\n<qa_callback(stderr)>` — integrates Tier 4 QA error analysis directly.
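The timeout-and-format flow of steps 4-6 looks roughly like this (a cross-platform stand-in: the real runner builds the PowerShell argv, uses a 60s timeout, and kills the process tree with `taskkill /F /T`):

```python
import subprocess

def run_command(argv: list, timeout: int = 60) -> str:
    """Run a command, returning the combined STDOUT/STDERR/EXIT CODE report."""
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, text=True)
    try:
        out, err = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()          # real runner kills the whole tree via taskkill
        proc.communicate()   # reap the dead process
        return f"ERROR: timed out after {timeout}s"
    return f"STDOUT:\n{out}\nSTDERR:\n{err}\nEXIT CODE: {proc.returncode}"
```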