# `src/ai_client.py` — Multi-Provider LLM Abstraction [Top](../README.md) | [Architecture](guide_architecture.md) | [Testing](guide_testing.md) | [MMA](guide_mma.md) --- ## Overview `src/ai_client.py` (~116KB) is the **unified LLM client** for 5 providers. It abstracts the differences between providers (Gemini, Anthropic, DeepSeek, MiniMax, Gemini CLI) behind a single `send()` function. The module is a **stateful singleton** — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer. --- ## Architecture ``` ┌─────────────────────────────────────────────────┐ │ ai_client.send(md_content, user_message, ...) │ │ │ │ 1. _send_lock.acquire() — serialize all calls │ │ 2. Read _provider / _model │ │ 3. Route to provider-specific _send_() │ │ 4. Return str response │ └─────────────────┬───────────────────────────────┘ │ dispatches based on _provider ▼ ┌────────┬─────────┬────────┬──────────┐ ▼ ▼ ▼ ▼ ▼ _gemini _anthropic _deepseek _minimax _gemini_cli (subprocess) ``` --- ## State All state is module-level globals. The most important: | Variable | Type | Purpose | |---|---|---| | `_provider: str` | `"gemini" \| "anthropic" \| "deepseek" \| "minimax" \| "gemini_cli"` | Active provider | | `_model: str` | `str` | Active model name | | `_temperature: float` | `0.0` | Sampling temperature | | `_top_p: float` | `1.0` | Nucleus sampling | | `_max_tokens: int` | `8192` | Output token cap | | `_history_trunc_limit: int` | `8000` | Char limit for truncating old tool outputs | | `_send_lock` | `threading.Lock` | Serializes all send() calls | | `_current_palette: str` | theme | Last-applied theme palette | ### Per-Provider State ```python _gemini_client: Optional[genai.Client] = None _gemini_chat: Any = None _gemini_cache: Any = None _gemini_cache_md_hash: Optional[str] = None _gemini_cache_created_at: Optional[float] = None _gemini_cached_file_paths: list[str] = [] _anthropic_client: Optional[anthropic.Anthropic] = None _anthropic_history: list[dict] = [] _anthropic_history_lock: threading.Lock = threading.Lock() _deepseek_client: Any = None _deepseek_history: list[dict] = [] _deepseek_history_lock: threading.Lock = threading.Lock() _minimax_client: Any = None _minimax_history: list[dict] = [] _minimax_history_lock: threading.Lock = threading.Lock() _gemini_cli_adapter: Optional[GeminiCliAdapter] = None ``` --- ## The Public API ### `send(...)` — The Main Entry Point ```python def send( md_content: str, user_message: str, base_dir: str = ".", file_items: list[dict] | None = None, discussion_history: str = "", stream: bool = False, pre_tool_callback: Optional[Callable] = None, qa_callback: Optional[Callable] = None, enable_tools: bool = True, stream_callback: Optional[Callable] = None, patch_callback: Optional[Callable] = None, rag_engine: Optional[Any] = None, ) -> str: ``` Returns the model's response as a string. All provider calls go through here. **Parameters:** - `md_content` — the system prompt + context (markdown) - `user_message` — the user's message - `base_dir` — for MCP tool filesystem operations - `file_items` — files in the context (deprecated path; usually empty) - `discussion_history` — legacy parameter - `stream` / `stream_callback` — for streaming responses - `pre_tool_callback` — called before each tool execution (HITL gate) - `qa_callback` — called when an error occurs (Tier 4 integration) - `enable_tools` — whether to enable PowerShell + MCP tools - `patch_callback` — Tier 4 patch generation hook - `rag_engine` — optional RAG engine for context augmentation ### Provider Switching ```python from src import ai_client ai_client.set_provider("gemini", "gemini-3-flash-preview") ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest") ai_client.set_provider("deepseek", "deepseek-chat") ai_client.set_provider("minimax", "grok-2-latest") ai_client.set_provider("gemini_cli", "gemini-2.0-flash") ``` ### Parameter Setters ```python ai_client.set_model_params(temp=0.7, max_tok=4096, top_p=0.9, trunc_limit=4000) ``` ### Session Management ```python ai_client.reset_session() # Clears all provider state, history, cache ``` ### Event Hooks ```python from src import ai_client # Confirmation hook (called before destructive tool execution) ai_client.confirm_and_run_callback = my_gui_callback # Comms log hook (called on every API call) ai_client.comms_log_callback = my_logging_callback # Tool log hook (called on every tool completion) ai_client.tool_log_callback = my_tool_logging_callback # Event emitter (for any subscriber) ai_client.events.on("my_event", my_handler) ``` ### Comms Log ```python ai_client._append_comms(direction, kind, payload) # Add entry ai_client.get_comms_log() # Read all ai_client.clear_comms_log() # Clear ai_client.get_token_stats(md_content) # Estimate token usage ``` ### Provider Error Taxonomy ```python class ProviderError(Exception): kind: str # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown" provider: str original: Exception def ui_message(self) -> str: """Returns a user-friendly error message.""" ``` `ProviderError` is raised by provider-specific `_send_*` functions on failure. The caller (typically `app_controller.py`) catches it and surfaces the error to the user via `app.ai_status`. --- ## The Tool-Call Loop All providers follow the same high-level pattern in `_send_*`: ```python def _send_(md_content, user_message, ...): for round in range(MAX_TOOL_ROUNDS + 2): # up to 10 rounds response = provider_api_call(md_content, user_message, history, tools) comms_log(direction="IN", kind="response", payload=response) if not has_function_calls(response): return extract_text(response) for call in response.function_calls: if pre_tool_callback and pre_tool_callback(...) is rejected: return rejection_message tool_result = dispatch(call.name, call.args, base_dir) append_tool_result_to_history(call, tool_result) # Context refresh: re-read all tracked files (mtime check) _reread_file_items(file_items) # Truncate tool outputs at _history_trunc_limit truncate_tool_outputs(history) # Cumulative byte check if cumulative_tool_bytes > 500_000: inject_warning() return final_response ``` The constants: - `MAX_TOOL_ROUNDS: int = 10` — max tool-call iterations per `send()` - `_MAX_TOOL_OUTPUT_BYTES: int = 500_000` — cumulative tool output budget - `_ANTHROPIC_CHUNK_SIZE: int = 120_000` — chars per Anthropic system text block - `_ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000` — Anthropic prompt limit (200K minus headroom) - `_GEMINI_MAX_INPUT_TOKENS: int = 900_000` — Gemini 1M window minus headroom --- ## Provider-Specific Behaviors ### Gemini (SDK) - **Server-side cache**: `genai.CachedContent` with TTL management - **Cache rebuild at 90% TTL**: proactive renewal - **Cache hash**: tracks content hash for invalidation - **Cached file paths**: tracks which files are in the active cache ### Anthropic - **Ephemeral prompt caching**: 4 `cache_control: ephemeral` breakpoints - **Breakpoints**: system prompt, context chunks, tool def, conversation prefix - **History trimming at 180K tokens**: 2-phase (strip stale file refreshes, then drop turn pairs) - **History repair**: `_repair_anthropic_history` handles tool_result chain breaks ### DeepSeek - **Raw HTTP**: uses `requests.post` directly (no SDK) - **Streaming**: supports streaming responses - **History repair**: `_repair_deepseek_history` for tool result chains ### MiniMax - **OpenAI-compatible endpoint**: uses the `openai` SDK - **History trimming**: similar to Anthropic (drop turn pairs at threshold) - **History repair**: `_repair_minimax_history` ### Gemini CLI - **Subprocess adapter**: `GeminiCliAdapter` in `src/gemini_cli_adapter.py` - **Persistent session**: CLI maintains its own session ID - **JSONL output protocol**: parses streaming JSONL from the CLI subprocess - **Full feature parity**: tool calls, streaming, usage metadata --- ## History Trimming Strategies ### Gemini (40% threshold) ```python if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4: while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3: hist.pop(0) # Assistant hist.pop(0) # User ``` ### Anthropic (180K limit) `_trim_anthropic_history(system_blocks, history)` — two-phase: 1. Strip stale `[SYSTEM: FILES UPDATED]` blocks 2. Drop oldest turn pairs (preserving tool_result chains) ### MiniMax Same pattern as Anthropic (similar 180K limit). ### DeepSeek No built-in trimming (relies on the caller to keep history short). --- ## Caching Strategies ### Gemini Server-Side Cache ```python _gemini_cache_md_hash: Optional[str] = None # Hash of cached content _gemini_cache_created_at: Optional[float] = None # Monotonic time ``` The cache decision is a 3-way branch on each `_send_gemini` call: - **Hash changed**: delete old, rebuild with new content - **Cache age > 90% of TTL** (3240s of 3600s): proactive renewal - **No cache exists**: create new if token count >= 2048, otherwise inline ### Anthropic Cache (4-Breakpoint System) ``` [System prompt]─breakpoint 1 [Context chunks]─breakpoint 2 [Tool definitions]─breakpoint 3 [Last user message]─breakpoint 4 ``` Before placing breakpoint 4, all existing `cache_control` is stripped to prevent exceeding the 4-breakpoint limit. --- ## Context Refresh Mechanism After the last tool call in each round, `_reread_file_items(file_items)` checks mtimes: 1. For each file item: compare `Path.stat().st_mtime` against stored `mtime` 2. If unchanged: pass through as-is 3. If changed: re-read content, store `old_content` for diffing, update `mtime` 4. Changed files are diffed via `_build_file_diff_text`: - Files ≤ 200 lines: emit full content - Files > 200 lines with `old_content`: emit `difflib.unified_diff` 5. Diff is appended to the last tool's output as `[SYSTEM: FILES UPDATED]\n\n{diff}` 6. Stale `[FILES UPDATED]` blocks are stripped from older history turns by `_strip_stale_file_refreshes` This is the "agent always sees current code" mechanism. --- ## Subagent Summarization For Tier 4: when an error occurs, `qa_callback` may be invoked to get a Tier 4 AI summary of the traceback. The summary is injected back into the worker's history as a hint. ```python def run_tier4_analysis(stderr: str) -> str: """Stateless Tier 4 QA analysis of an error message.""" # Uses a dedicated system prompt for error triage # Returns analysis text (root cause, suggested fix) # Does NOT modify any code — analysis only ``` For Tier 4 patch generation: ```python def run_tier4_patch_generation(error: str, file_context: str) -> str: """Generate a unified diff patch from an error and file context.""" # Returns the patch as a string # The caller (typically the patch modal) presents it for human review ``` --- ## Public API Quick Reference | Function | Purpose | |---|---| | `send(...)` | The main entry point — call the active provider | | `set_provider(provider, model)` | Switch active provider and model | | `get_provider() -> str` | Get the active provider name | | `set_model_params(temp, max_tok, trunc_limit, top_p)` | Update generation params | | `set_custom_system_prompt(prompt)` | Set the per-session system prompt override | | `set_base_system_prompt(prompt)` | Set the foundational base prompt (advanced) | | `set_use_default_base_prompt(use: bool)` | Toggle whether the base prompt is included | | `set_project_context_marker(marker)` | Set the project-specific context tag | | `reset_session()` | Clear all provider state | | `get_comms_log()` | Read the in-memory comms log | | `clear_comms_log()` | Clear the in-memory comms log | | `get_token_stats(md_content)` | Estimate token usage for the given content | | `cleanup()` | Tear down (delete Gemini caches, etc.) | | `get_current_palette() -> str` | Get the current theme palette name | | `list_models(provider) -> list[str]` | List available models for a provider | | `run_tier4_analysis(stderr) -> str` | Tier 4 error analysis | | `run_tier4_patch_generation(error, file_context) -> str` | Tier 4 patch generation | | `run_subagent_summarization(file_path, content, is_code, outline) -> str` | AI summary of a file | | `run_discussion_compression(text) -> str` | AI compression of a long discussion | --- ## Thread Safety - `_send_lock: threading.Lock` — serializes all provider calls. No two `send()` calls run concurrently. - Per-provider history locks (`_anthropic_history_lock`, etc.) — guard the history list mutations. - The `EventEmitter` (in `src/events.py`) is thread-safe for subscribe/emit. --- ## Testing ### Unit Tests (no real API calls) ```python def test_set_provider(): from src import ai_client ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest") assert ai_client.get_provider() == "anthropic" ai_client.reset_session() # Cleanup ``` ### Mocked Tests ```python from unittest.mock import patch def test_send_routes_to_provider(monkeypatch): with patch.object(ai_client, "_send_anthropic", return_value="mocked") as m: ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest") result = ai_client.send("system", "user") assert result == "mocked" m.assert_called_once() ai_client.reset_session() ``` ### Integration (real API) Gated by env var (e.g., `RUN_REAL_AI_TESTS=1`). Hits the real API. Not in default CI. --- ## See Also - **[guide_architecture.md](guide_architecture.md#ai-client-multi-provider-architecture)** — Threading model and provider dispatch - **[guide_mma.md](guide_mma.md#tier-3-worker-lifecycle-run_worker_lifecycle)** — How Tier 3 workers use ai_client - **[guide_mcp_client.md](guide_mcp_client.md)** — The 45 tools that ai_client can invoke - **[guide_rag.md](guide_rag.md)** — RAG engine integration via `rag_engine` parameter - **[conductor/product.md](../../conductor/product.md#multi-provider-integration)** — Product-level overview of providers