Private
Public Access
0
0

docs(ai-client): add guide_ai_client.md

This commit is contained in:
2026-06-02 23:25:28 -04:00
parent 779eb006ba
commit 0426239a13
+424
View File
@@ -0,0 +1,424 @@
# `src/ai_client.py` — Multi-Provider LLM Abstraction
[Top](../README.md) | [Architecture](guide_architecture.md) | [Testing](guide_testing.md) | [MMA](guide_mma.md)
---
## Overview
`src/ai_client.py` (~116KB) is the **unified LLM client** for 5 providers. It abstracts the differences between providers (Gemini, Anthropic, DeepSeek, MiniMax, Gemini CLI) behind a single `send()` function.
The module is a **stateful singleton** — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.
---
## Architecture
```
┌─────────────────────────────────────────────────┐
│ ai_client.send(md_content, user_message, ...) │
│ │
│ 1. _send_lock.acquire() — serialize all calls │
│ 2. Read _provider / _model │
│ 3. Route to provider-specific _send_<provider>() │
│ 4. Return str response │
└─────────────────┬───────────────────────────────┘
│ dispatches based on _provider
┌────────┬─────────┬────────┬──────────┐
▼ ▼ ▼ ▼ ▼
_gemini _anthropic _deepseek _minimax _gemini_cli
(subprocess)
```
---
## State
All state is module-level globals. The most important:
| Variable | Type | Purpose |
|---|---|---|
| `_provider: str` | `"gemini" \| "anthropic" \| "deepseek" \| "minimax" \| "gemini_cli"` | Active provider |
| `_model: str` | `str` | Active model name |
| `_temperature: float` | `0.0` | Sampling temperature |
| `_top_p: float` | `1.0` | Nucleus sampling |
| `_max_tokens: int` | `8192` | Output token cap |
| `_history_trunc_limit: int` | `8000` | Char limit for truncating old tool outputs |
| `_send_lock` | `threading.Lock` | Serializes all send() calls |
| `_current_palette: str` | theme | Last-applied theme palette |
### Per-Provider State
```python
_gemini_client: Optional[genai.Client] = None
_gemini_chat: Any = None
_gemini_cache: Any = None
_gemini_cache_md_hash: Optional[str] = None
_gemini_cache_created_at: Optional[float] = None
_gemini_cached_file_paths: list[str] = []
_anthropic_client: Optional[anthropic.Anthropic] = None
_anthropic_history: list[dict] = []
_anthropic_history_lock: threading.Lock = threading.Lock()
_deepseek_client: Any = None
_deepseek_history: list[dict] = []
_deepseek_history_lock: threading.Lock = threading.Lock()
_minimax_client: Any = None
_minimax_history: list[dict] = []
_minimax_history_lock: threading.Lock = threading.Lock()
_gemini_cli_adapter: Optional[GeminiCliAdapter] = None
```
---
## The Public API
### `send(...)` — The Main Entry Point
```python
def send(
md_content: str,
user_message: str,
base_dir: str = ".",
file_items: list[dict] | None = None,
discussion_history: str = "",
stream: bool = False,
pre_tool_callback: Optional[Callable] = None,
qa_callback: Optional[Callable] = None,
enable_tools: bool = True,
stream_callback: Optional[Callable] = None,
patch_callback: Optional[Callable] = None,
rag_engine: Optional[Any] = None,
) -> str:
```
Returns the model's response as a string. All provider calls go through here.
**Parameters:**
- `md_content` — the system prompt + context (markdown)
- `user_message` — the user's message
- `base_dir` — for MCP tool filesystem operations
- `file_items` — files in the context (deprecated path; usually empty)
- `discussion_history` — legacy parameter
- `stream` / `stream_callback` — for streaming responses
- `pre_tool_callback` — called before each tool execution (HITL gate)
- `qa_callback` — called when an error occurs (Tier 4 integration)
- `enable_tools` — whether to enable PowerShell + MCP tools
- `patch_callback` — Tier 4 patch generation hook
- `rag_engine` — optional RAG engine for context augmentation
### Provider Switching
```python
from src import ai_client
ai_client.set_provider("gemini", "gemini-3-flash-preview")
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
ai_client.set_provider("deepseek", "deepseek-chat")
ai_client.set_provider("minimax", "grok-2-latest")
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
```
### Parameter Setters
```python
ai_client.set_model_params(temp=0.7, max_tok=4096, top_p=0.9, trunc_limit=4000)
```
### Session Management
```python
ai_client.reset_session() # Clears all provider state, history, cache
```
### Event Hooks
```python
from src import ai_client
# Confirmation hook (called before destructive tool execution)
ai_client.confirm_and_run_callback = my_gui_callback
# Comms log hook (called on every API call)
ai_client.comms_log_callback = my_logging_callback
# Tool log hook (called on every tool completion)
ai_client.tool_log_callback = my_tool_logging_callback
# Event emitter (for any subscriber)
ai_client.events.on("my_event", my_handler)
```
### Comms Log
```python
ai_client._append_comms(direction, kind, payload) # Add entry
ai_client.get_comms_log() # Read all
ai_client.clear_comms_log() # Clear
ai_client.get_token_stats(md_content) # Estimate token usage
```
### Provider Error Taxonomy
```python
class ProviderError(Exception):
kind: str # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
provider: str
original: Exception
def ui_message(self) -> str:
"""Returns a user-friendly error message."""
```
`ProviderError` is raised by provider-specific `_send_*` functions on failure. The caller (typically `app_controller.py`) catches it and surfaces the error to the user via `app.ai_status`.
---
## The Tool-Call Loop
All providers follow the same high-level pattern in `_send_*`:
```python
def _send_<provider>(md_content, user_message, ...):
for round in range(MAX_TOOL_ROUNDS + 2): # up to 10 rounds
response = provider_api_call(md_content, user_message, history, tools)
comms_log(direction="IN", kind="response", payload=response)
if not has_function_calls(response):
return extract_text(response)
for call in response.function_calls:
if pre_tool_callback and pre_tool_callback(...) is rejected:
return rejection_message
tool_result = dispatch(call.name, call.args, base_dir)
append_tool_result_to_history(call, tool_result)
# Context refresh: re-read all tracked files (mtime check)
_reread_file_items(file_items)
# Truncate tool outputs at _history_trunc_limit
truncate_tool_outputs(history)
# Cumulative byte check
if cumulative_tool_bytes > 500_000:
inject_warning()
return final_response
```
The constants:
- `MAX_TOOL_ROUNDS: int = 10` — max tool-call iterations per `send()`
- `_MAX_TOOL_OUTPUT_BYTES: int = 500_000` — cumulative tool output budget
- `_ANTHROPIC_CHUNK_SIZE: int = 120_000` — chars per Anthropic system text block
- `_ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000` — Anthropic prompt limit (200K minus headroom)
- `_GEMINI_MAX_INPUT_TOKENS: int = 900_000` — Gemini 1M window minus headroom
---
## Provider-Specific Behaviors
### Gemini (SDK)
- **Server-side cache**: `genai.CachedContent` with TTL management
- **Cache rebuild at 90% TTL**: proactive renewal
- **Cache hash**: tracks content hash for invalidation
- **Cached file paths**: tracks which files are in the active cache
### Anthropic
- **Ephemeral prompt caching**: 4 `cache_control: ephemeral` breakpoints
- **Breakpoints**: system prompt, context chunks, tool def, conversation prefix
- **History trimming at 180K tokens**: 2-phase (strip stale file refreshes, then drop turn pairs)
- **History repair**: `_repair_anthropic_history` handles tool_result chain breaks
### DeepSeek
- **Raw HTTP**: uses `requests.post` directly (no SDK)
- **Streaming**: supports streaming responses
- **History repair**: `_repair_deepseek_history` for tool result chains
### MiniMax
- **OpenAI-compatible endpoint**: uses the `openai` SDK
- **History trimming**: similar to Anthropic (drop turn pairs at threshold)
- **History repair**: `_repair_minimax_history`
### Gemini CLI
- **Subprocess adapter**: `GeminiCliAdapter` in `src/gemini_cli_adapter.py`
- **Persistent session**: CLI maintains its own session ID
- **JSONL output protocol**: parses streaming JSONL from the CLI subprocess
- **Full feature parity**: tool calls, streaming, usage metadata
---
## History Trimming Strategies
### Gemini (40% threshold)
```python
if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
hist.pop(0) # Assistant
hist.pop(0) # User
```
### Anthropic (180K limit)
`_trim_anthropic_history(system_blocks, history)` — two-phase:
1. Strip stale `[SYSTEM: FILES UPDATED]` blocks
2. Drop oldest turn pairs (preserving tool_result chains)
### MiniMax
Same pattern as Anthropic (similar 180K limit).
### DeepSeek
No built-in trimming (relies on the caller to keep history short).
---
## Caching Strategies
### Gemini Server-Side Cache
```python
_gemini_cache_md_hash: Optional[str] = None # Hash of cached content
_gemini_cache_created_at: Optional[float] = None # Monotonic time
```
The cache decision is a 3-way branch on each `_send_gemini` call:
- **Hash changed**: delete old, rebuild with new content
- **Cache age > 90% of TTL** (3240s of 3600s): proactive renewal
- **No cache exists**: create new if token count >= 2048, otherwise inline
### Anthropic Cache (4-Breakpoint System)
```
[System prompt]─breakpoint 1
[Context chunks]─breakpoint 2
[Tool definitions]─breakpoint 3
[Last user message]─breakpoint 4
```
Before placing breakpoint 4, all existing `cache_control` is stripped to prevent exceeding the 4-breakpoint limit.
---
## Context Refresh Mechanism
After the last tool call in each round, `_reread_file_items(file_items)` checks mtimes:
1. For each file item: compare `Path.stat().st_mtime` against stored `mtime`
2. If unchanged: pass through as-is
3. If changed: re-read content, store `old_content` for diffing, update `mtime`
4. Changed files are diffed via `_build_file_diff_text`:
- Files ≤ 200 lines: emit full content
- Files > 200 lines with `old_content`: emit `difflib.unified_diff`
5. Diff is appended to the last tool's output as `[SYSTEM: FILES UPDATED]\n\n{diff}`
6. Stale `[FILES UPDATED]` blocks are stripped from older history turns by `_strip_stale_file_refreshes`
This is the "agent always sees current code" mechanism.
---
## Subagent Summarization
For Tier 4: when an error occurs, `qa_callback` may be invoked to get a Tier 4 AI summary of the traceback. The summary is injected back into the worker's history as a hint.
```python
def run_tier4_analysis(stderr: str) -> str:
"""Stateless Tier 4 QA analysis of an error message."""
# Uses a dedicated system prompt for error triage
# Returns analysis text (root cause, suggested fix)
# Does NOT modify any code — analysis only
```
For Tier 4 patch generation:
```python
def run_tier4_patch_generation(error: str, file_context: str) -> str:
"""Generate a unified diff patch from an error and file context."""
# Returns the patch as a string
# The caller (typically the patch modal) presents it for human review
```
---
## Public API Quick Reference
| Function | Purpose |
|---|---|
| `send(...)` | The main entry point — call the active provider |
| `set_provider(provider, model)` | Switch active provider and model |
| `get_provider() -> str` | Get the active provider name |
| `set_model_params(temp, max_tok, trunc_limit, top_p)` | Update generation params |
| `set_custom_system_prompt(prompt)` | Set the per-session system prompt override |
| `set_base_system_prompt(prompt)` | Set the foundational base prompt (advanced) |
| `set_use_default_base_prompt(use: bool)` | Toggle whether the base prompt is included |
| `set_project_context_marker(marker)` | Set the project-specific context tag |
| `reset_session()` | Clear all provider state |
| `get_comms_log()` | Read the in-memory comms log |
| `clear_comms_log()` | Clear the in-memory comms log |
| `get_token_stats(md_content)` | Estimate token usage for the given content |
| `cleanup()` | Tear down (delete Gemini caches, etc.) |
| `get_current_palette() -> str` | Get the current theme palette name |
| `list_models(provider) -> list[str]` | List available models for a provider |
| `run_tier4_analysis(stderr) -> str` | Tier 4 error analysis |
| `run_tier4_patch_generation(error, file_context) -> str` | Tier 4 patch generation |
| `run_subagent_summarization(file_path, content, is_code, outline) -> str` | AI summary of a file |
| `run_discussion_compression(text) -> str` | AI compression of a long discussion |
---
## Thread Safety
- `_send_lock: threading.Lock` — serializes all provider calls. No two `send()` calls run concurrently.
- Per-provider history locks (`_anthropic_history_lock`, etc.) — guard the history list mutations.
- The `EventEmitter` (in `src/events.py`) is thread-safe for subscribe/emit.
---
## Testing
### Unit Tests (no real API calls)
```python
def test_set_provider():
from src import ai_client
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
assert ai_client.get_provider() == "anthropic"
ai_client.reset_session() # Cleanup
```
### Mocked Tests
```python
from unittest.mock import patch
def test_send_routes_to_provider(monkeypatch):
with patch.object(ai_client, "_send_anthropic", return_value="mocked") as m:
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
result = ai_client.send("system", "user")
assert result == "mocked"
m.assert_called_once()
ai_client.reset_session()
```
### Integration (real API)
Gated by env var (e.g., `RUN_REAL_AI_TESTS=1`). Hits the real API. Not in default CI.
---
## See Also
- **[guide_architecture.md](guide_architecture.md#ai-client-multi-provider-architecture)** — Threading model and provider dispatch
- **[guide_mma.md](guide_mma.md#tier-3-worker-lifecycle-run_worker_lifecycle)** — How Tier 3 workers use ai_client
- **[guide_mcp_client.md](guide_mcp_client.md)** — The 45 tools that ai_client can invoke
- **[guide_rag.md](guide_rag.md)** — RAG engine integration via `rag_engine` parameter
- **[conductor/product.md](../../conductor/product.md#multi-provider-integration)** — Product-level overview of providers