src/ai_client.py — Multi-Provider LLM Abstraction
Top | Architecture | Testing | MMA
Overview
src/ai_client.py (~116KB) is the unified LLM client for 8 providers. It abstracts the differences between providers (Gemini, Anthropic, DeepSeek, MiniMax, Gemini CLI, Qwen, Grok, Llama) behind a single send() function.
The module is a stateful singleton — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.
The 8 providers split into 3 API shapes:
- Native SDK: Gemini (google-genai), Anthropic (anthropic), Qwen (DashScope)
- OpenAI-compatible: MiniMax, Grok, Llama (Ollama/OpenRouter/custom), DeepSeek
- Subprocess: Gemini CLI
The OpenAI-compatible vendors all call the shared helper in src/openai_compatible.py (added 2026-06-06 by the qwen_llama_grok_integration_20260606 track; see "Shared OpenAI-Compatible Helper" section below). The MiniMax provider's _send_minimax was refactored to use this helper (Phase 4 of the same track, 231 → 75 lines, 68% reduction).
Module-Level Imports
Important: The 5 provider SDKs are NOT imported at module level.
import google.genai, import anthropic, import openai, and import fastapi are heavy (~430-955ms each on cold load) and are now obtained via src.module_loader._require_warmed("google.genai") and similar calls, after the WarmupManager has loaded them in the background. The module-level globals you see in the State section (_gemini_client, _anthropic_client, etc.) are typed as Optional because they're populated by _require_warmed() on first use, not at import time.
This change was part of the 2026-06-06 startup_speedup_20260606 track. Before: import src.ai_client took ~1800ms. After: ~161ms. The remaining cost is the bare module skeleton.
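The deferred-import pattern described above can be sketched roughly as follows. This is a minimal illustration, not the project's src.module_loader: the WarmupSketch class and its require_warmed method are hypothetical stand-ins, and standard-library modules take the place of the heavy SDKs.

```python
import importlib
import threading
from types import ModuleType


class WarmupSketch:
    """Hypothetical stand-in for WarmupManager: imports modules on a background thread."""

    def __init__(self, names: list[str]) -> None:
        self._names = names
        self._loaded: dict[str, ModuleType] = {}
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # Import each module in the background so the first real use is cheap.
        for name in self._names:
            module = importlib.import_module(name)
            with self._lock:
                self._loaded[name] = module

    def require_warmed(self, name: str) -> ModuleType:
        """Return the module, importing on demand if warm-up has not reached it yet."""
        with self._lock:
            module = self._loaded.get(name)
        if module is None:
            # importlib caches modules in sys.modules, so a racing import is harmless.
            module = importlib.import_module(name)
            with self._lock:
                self._loaded[name] = module
        return module


warmup = WarmupSketch(["json", "decimal"])  # stdlib modules stand in for heavy SDKs
warmup.start()
json_module = warmup.require_warmed("json")
```

The point of the design is that the importing module pays only for its own skeleton; the expensive imports happen either in the background or at first use, whichever comes first.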
Architecture
┌──────────────────────────────────────────────────┐
│ ai_client.send(md_content, user_message, ...)    │
│                                                  │
│ 1. _send_lock.acquire() - serialize all calls    │
│ 2. Read _provider / _model                       │
│ 3. Route to provider-specific _send_<provider>() │
│ 4. Return str response                           │
└─────────────────┬────────────────────────────────┘
                  │ dispatches based on _provider
                  ▼
   ┌─────────┬───────────┬─────────┬───────────┐
   ▼         ▼           ▼         ▼           ▼
_gemini  _anthropic  _deepseek  _minimax  _gemini_cli
                                          (subprocess)
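The four steps in the diagram can be sketched as a lock-guarded dispatch. This is an illustrative sketch only: the dispatch table, the two toy handlers, and the placeholder model names are assumptions, and the real module may route with a different mechanism.

```python
import threading
from typing import Callable

_send_lock = threading.Lock()
_provider = "gemini"
_model = "example-model"  # placeholder model name, not a real model ID


def _send_gemini(md_content: str, user_message: str) -> str:
    # Toy handler: a real one would call the provider API.
    return f"[gemini:{_model}] {user_message}"


def _send_anthropic(md_content: str, user_message: str) -> str:
    return f"[anthropic:{_model}] {user_message}"


# Hypothetical dispatch table keyed by provider name.
_DISPATCH: dict[str, Callable[[str, str], str]] = {
    "gemini": _send_gemini,
    "anthropic": _send_anthropic,
}


def set_provider(provider: str, model: str) -> None:
    global _provider, _model
    if provider not in _DISPATCH:
        raise ValueError(f"unknown provider: {provider}")
    _provider, _model = provider, model


def send(md_content: str, user_message: str) -> str:
    # 1. serialize, 2. read active provider, 3. route, 4. return str
    with _send_lock:
        handler = _DISPATCH[_provider]
        return handler(md_content, user_message)


reply = send("system context", "hello")
```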
State
All state is module-level globals. The most important:
| Variable | Type / default | Purpose |
|---|---|---|
| _provider: str | "gemini" \| "anthropic" \| "deepseek" \| "minimax" \| "gemini_cli" | Active provider |
| _model: str | str | Active model name |
| _temperature: float | 0.0 | Sampling temperature |
| _top_p: float | 1.0 | Nucleus sampling |
| _max_tokens: int | 8192 | Output token cap |
| _history_trunc_limit: int | 8000 | Char limit for truncating old tool outputs |
| _send_lock | threading.Lock | Serializes all send() calls |
| _current_palette: str | theme | Last-applied theme palette |
Per-Provider State
_gemini_client: Optional[genai.Client] = None
_gemini_chat: Any = None
_gemini_cache: Any = None
_gemini_cache_md_hash: Optional[str] = None
_gemini_cache_created_at: Optional[float] = None
_gemini_cached_file_paths: list[str] = []
_anthropic_client: Optional[anthropic.Anthropic] = None
_anthropic_history: list[dict] = []
_anthropic_history_lock: threading.Lock = threading.Lock()
_deepseek_client: Any = None
_deepseek_history: list[dict] = []
_deepseek_history_lock: threading.Lock = threading.Lock()
_minimax_client: Any = None
_minimax_history: list[dict] = []
_minimax_history_lock: threading.Lock = threading.Lock()
_gemini_cli_adapter: Optional[GeminiCliAdapter] = None
The Public API
send(...) — The Main Entry Point
def send(
    md_content: str,
    user_message: str,
    base_dir: str = ".",
    file_items: list[dict] | None = None,
    discussion_history: str = "",
    stream: bool = False,
    pre_tool_callback: Optional[Callable] = None,
    qa_callback: Optional[Callable] = None,
    enable_tools: bool = True,
    stream_callback: Optional[Callable] = None,
    patch_callback: Optional[Callable] = None,
    rag_engine: Optional[Any] = None,
) -> str:
    ...
Returns the model's response as a string. All provider calls go through here.
Parameters:
- md_content: the system prompt + context (markdown)
- user_message: the user's message
- base_dir: for MCP tool filesystem operations
- file_items: files in the context (deprecated path; usually empty)
- discussion_history: legacy parameter
- stream / stream_callback: for streaming responses
- pre_tool_callback: called before each tool execution (HITL gate)
- qa_callback: called when an error occurs (Tier 4 integration)
- enable_tools: whether to enable PowerShell + MCP tools
- patch_callback: Tier 4 patch generation hook
- rag_engine: optional RAG engine for context augmentation
Provider Switching
from src import ai_client
ai_client.set_provider("gemini", "gemini-3-flash-preview")
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
ai_client.set_provider("deepseek", "deepseek-chat")
ai_client.set_provider("minimax", "MiniMax-M2")
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
Parameter Setters
ai_client.set_model_params(temp=0.7, max_tok=4096, top_p=0.9, trunc_limit=4000)
Session Management
ai_client.reset_session() # Clears all provider state, history, cache
Event Hooks
from src import ai_client
# Confirmation hook (called before destructive tool execution)
ai_client.confirm_and_run_callback = my_gui_callback
# Comms log hook (called on every API call)
ai_client.comms_log_callback = my_logging_callback
# Tool log hook (called on every tool completion)
ai_client.tool_log_callback = my_tool_logging_callback
# Event emitter (for any subscriber)
ai_client.events.on("my_event", my_handler)
Comms Log
ai_client._append_comms(direction, kind, payload) # Add entry
ai_client.get_comms_log() # Read all
ai_client.clear_comms_log() # Clear
ai_client.get_token_stats(md_content) # Estimate token usage
Provider Error Taxonomy
class ProviderError(Exception):
    kind: str  # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
    provider: str
    original: Exception

    def ui_message(self) -> str:
        """Returns a user-friendly error message."""
ProviderError is raised by provider-specific _send_* functions on failure. The caller (typically app_controller.py) catches it and surfaces the error to the user via app.ai_status.
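A runnable sketch of how such an error type could be built and classified is shown below. The constructor, the classify_exception helper, the status-code mapping, and the message wording are all assumptions made for illustration; only the field names and the kind values come from the skeleton above.

```python
# Hypothetical user-facing text per error kind.
_UI_TEXT = {
    "quota": "{provider}: quota exhausted.",
    "rate_limit": "{provider}: rate limited, retry shortly.",
    "auth": "{provider}: authentication failed, check the API key.",
    "balance": "{provider}: insufficient balance.",
    "network": "{provider}: network error.",
    "unknown": "{provider}: unexpected error.",
}


class ProviderError(Exception):
    def __init__(self, kind: str, provider: str, original: Exception) -> None:
        super().__init__(f"{provider}: {kind}: {original}")
        self.kind = kind
        self.provider = provider
        self.original = original

    def ui_message(self) -> str:
        template = _UI_TEXT.get(self.kind, _UI_TEXT["unknown"])
        return template.format(provider=self.provider)


def classify_exception(exc: Exception) -> str:
    """Map an exception to a kind. The status-code mapping is an assumption."""
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return "network"
    status = getattr(exc, "status_code", None)
    if status in (401, 403):
        return "auth"
    if status == 402:
        return "balance"
    if status == 429:
        return "rate_limit"
    return "unknown"


try:
    raise TimeoutError("request timed out")
except TimeoutError as exc:
    error = ProviderError(classify_exception(exc), "deepseek", exc)
```

A caller would then catch ProviderError once and show error.ui_message() rather than branching on provider-specific exception types.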
The Tool-Call Loop
All providers follow the same high-level pattern in _send_*:
def _send_<provider>(md_content, user_message, ...):
    for round_idx in range(MAX_TOOL_ROUNDS + 2):  # up to 12 iterations with MAX_TOOL_ROUNDS = 10
        response = provider_api_call(md_content, user_message, history, tools)
        comms_log(direction="IN", kind="response", payload=response)
        if not has_function_calls(response):
            return extract_text(response)
        for call in response.function_calls:
            if pre_tool_callback and pre_tool_callback(...) is rejected:
                return rejection_message
            tool_result = dispatch(call.name, call.args, base_dir)
            append_tool_result_to_history(call, tool_result)
        # Context refresh: re-read all tracked files (mtime check)
        _reread_file_items(file_items)
        # Truncate tool outputs at _history_trunc_limit
        truncate_tool_outputs(history)
        # Cumulative byte check
        if cumulative_tool_bytes > 500_000:
            inject_warning()
    return final_response
The constants:
- MAX_TOOL_ROUNDS: int = 10: max tool-call iterations per send()
- _MAX_TOOL_OUTPUT_BYTES: int = 500_000: cumulative tool output budget
- _ANTHROPIC_CHUNK_SIZE: int = 120_000: chars per Anthropic system text block
- _ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000: Anthropic prompt limit (200K minus headroom)
- _GEMINI_MAX_INPUT_TOKENS: int = 900_000: Gemini 1M window minus headroom
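To make the loop shape concrete, here is a small self-contained sketch driven by a scripted fake model. The function name run_tool_loop, the dict-based response shape, and the fake model and dispatcher are hypothetical; context refresh, truncation, and the byte budget are left out to keep it short.

```python
from typing import Any, Callable, Optional

MAX_TOOL_ROUNDS = 10

Message = dict[str, Any]


def run_tool_loop(
    call_model: Callable[[list[Message]], Message],
    dispatch_tool: Callable[[str, dict], str],
    history: list[Message],
    approve: Optional[Callable[[Message], bool]] = None,
) -> str:
    for _ in range(MAX_TOOL_ROUNDS + 2):
        response = call_model(history)
        calls = response.get("tool_calls", [])
        if not calls:
            # No tool calls: the text is the final answer.
            return response.get("text", "")
        for call in calls:
            if approve is not None and not approve(call):
                return f"Tool call rejected: {call['name']}"
            result = dispatch_tool(call["name"], call["args"])
            history.append({"role": "tool", "name": call["name"], "content": result})
    return "Tool round limit reached."


def fake_model(history: list[Message]) -> Message:
    # First round asks for a tool; once a tool result exists, answer in text.
    if not any(m["role"] == "tool" for m in history):
        return {"text": "", "tool_calls": [{"name": "read_file", "args": {"path": "a.txt"}}]}
    return {"text": "done: " + history[-1]["content"], "tool_calls": []}


def fake_dispatch(name: str, args: dict) -> str:
    return f"{name}({args['path']}) -> 12 bytes"


history: list[Message] = [{"role": "user", "content": "read a.txt"}]
answer = run_tool_loop(fake_model, fake_dispatch, history)
```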
Provider-Specific Behaviors
Gemini (SDK)
- Server-side cache: genai.CachedContent with TTL management
- Cache rebuild at 90% TTL: proactive renewal
- Cache hash: tracks content hash for invalidation
- Cached file paths: tracks which files are in the active cache
Anthropic
- Ephemeral prompt caching: 4 cache_control: ephemeral breakpoints
- Breakpoints: system prompt, context chunks, tool def, conversation prefix
- History trimming at 180K tokens: 2-phase (strip stale file refreshes, then drop turn pairs)
- History repair: _repair_anthropic_history handles tool_result chain breaks
DeepSeek
- Raw HTTP: uses requests.post directly (no SDK)
- Streaming: supports streaming responses
- History repair: _repair_deepseek_history for tool result chains
MiniMax
- OpenAI-compatible endpoint: uses the openai SDK
- History trimming: similar to Anthropic (drop turn pairs at threshold)
- History repair: _repair_minimax_history
Gemini CLI
- Subprocess adapter: GeminiCliAdapter in src/gemini_cli_adapter.py
- Persistent session: CLI maintains its own session ID
- JSONL output protocol: parses streaming JSONL from the CLI subprocess
- Full feature parity: tool calls, streaming, usage metadata
History Trimming Strategies
Gemini (40% threshold)
if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
    while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
        hist.pop(0)  # oldest user turn
        hist.pop(0)  # its assistant reply
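The snippet above leaves out how total_in shrinks as turns are dropped. A runnable sketch of the same 40%/30% hysteresis follows, assuming a crude 4-characters-per-token estimate and a hypothetical message shape with a "text" key; neither is taken from the module.

```python
_GEMINI_MAX_INPUT_TOKENS = 900_000


def estimate_tokens(hist: list[dict]) -> int:
    # Assumption: roughly 4 characters per token.
    return sum(len(message["text"]) for message in hist) // 4


def trim_history(hist: list[dict], max_tokens: int = _GEMINI_MAX_INPUT_TOKENS) -> list[dict]:
    total_in = estimate_tokens(hist)
    if total_in > max_tokens * 0.4:
        # Start trimming above 40%, keep going until below 30% or 4 messages remain.
        while len(hist) > 4 and total_in > max_tokens * 0.3:
            hist.pop(0)  # oldest message of the pair
            hist.pop(0)  # its partner
            total_in = estimate_tokens(hist)
    return hist


# Ten 40-character messages are about 100 tokens against a toy 100-token budget.
demo = [{"text": f"{i:02d}" + "x" * 38} for i in range(10)]
trimmed = trim_history(demo, max_tokens=100)
```

In this toy run the loop stops at four messages because the length floor is reached before the estimate falls under 30%.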
Anthropic (180K limit)
_trim_anthropic_history(system_blocks, history) runs in two phases:
- Strip stale [SYSTEM: FILES UPDATED] blocks
- Drop oldest turn pairs (preserving tool_result chains)
MiniMax
Same pattern as Anthropic (similar 180K limit).
DeepSeek
No built-in trimming (relies on the caller to keep history short).
Caching Strategies
Gemini Server-Side Cache
_gemini_cache_md_hash: Optional[str] = None # Hash of cached content
_gemini_cache_created_at: Optional[float] = None # Monotonic time
The cache decision is a 3-way branch on each _send_gemini call:
- Hash changed: delete old, rebuild with new content
- Cache age > 90% of TTL (3240s of 3600s): proactive renewal
- No cache exists: create new if token count >= 2048, otherwise inline
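The branch logic can be written as a small pure function. This is a sketch under stated assumptions: the function name, the returned action labels, and the extra "reuse" outcome for a fresh matching cache are illustrative, and how the real code treats a hash change on small content is not specified here.

```python
from typing import Optional

CACHE_TTL_SECONDS = 3600
RENEW_FRACTION = 0.9      # renew once the cache is older than 3240s
MIN_CACHE_TOKENS = 2048   # below this, send the content inline


def decide_cache_action(
    cached_hash: Optional[str],
    created_at: Optional[float],
    new_hash: str,
    now: float,
    token_count: int,
) -> str:
    if cached_hash is None or created_at is None:
        # No cache exists: create one only if the content is large enough.
        return "create" if token_count >= MIN_CACHE_TOKENS else "inline"
    if cached_hash != new_hash:
        # Content changed: delete the old cache and rebuild.
        return "rebuild"
    if now - created_at > CACHE_TTL_SECONDS * RENEW_FRACTION:
        # Same content but close to expiry: renew proactively.
        return "renew"
    return "reuse"


action = decide_cache_action("abc", 0.0, "abc", 3300.0, 5000)
```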
Anthropic Cache (4-Breakpoint System)
[System prompt]─breakpoint 1
[Context chunks]─breakpoint 2
[Tool definitions]─breakpoint 3
[Last user message]─breakpoint 4
Before placing breakpoint 4, all existing cache_control is stripped to prevent exceeding the 4-breakpoint limit.
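The strip-then-place step can be sketched as below. The function name is hypothetical, and the sketch assumes each message's content is a list of block dicts; the cache_control value {"type": "ephemeral"} is the marker format used by the Anthropic Messages API.

```python
from typing import Any


def place_last_breakpoint(messages: list[dict[str, Any]]) -> None:
    """Remove old cache_control markers, then mark the last user message."""
    for message in messages:
        content = message.get("content")
        if isinstance(content, list):
            for block in content:
                if isinstance(block, dict):
                    block.pop("cache_control", None)
    for message in reversed(messages):
        content = message.get("content")
        if message.get("role") == "user" and isinstance(content, list) and content:
            # Mark the final block of the most recent user turn.
            content[-1]["cache_control"] = {"type": "ephemeral"}
            break


conversation = [
    {"role": "user", "content": [{"type": "text", "text": "first", "cache_control": {"type": "ephemeral"}}]},
    {"role": "assistant", "content": [{"type": "text", "text": "reply"}]},
    {"role": "user", "content": [{"type": "text", "text": "second"}]},
]
place_last_breakpoint(conversation)
marked = [block["text"] for m in conversation for block in m["content"] if "cache_control" in block]
```

Stripping first keeps the conversation at exactly one marker regardless of how many turns have accumulated, which leaves the other three breakpoints available for the system prompt, context chunks, and tool definitions.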
Context Refresh Mechanism
After the last tool call in each round, _reread_file_items(file_items) checks mtimes:
- For each file item: compare Path.stat().st_mtime against stored mtime
- If unchanged: pass through as-is
- If changed: re-read content, store old_content for diffing, update mtime
- Changed files are diffed via _build_file_diff_text:
  - Files ≤ 200 lines: emit full content
  - Files > 200 lines with old_content: emit difflib.unified_diff
- Diff is appended to the last tool's output as [SYSTEM: FILES UPDATED]\n\n{diff}
- Stale [FILES UPDATED] blocks are stripped from older history turns by _strip_stale_file_refreshes
This is the "agent always sees current code" mechanism.
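A compact sketch of the mtime check and the size-dependent diff is given below. The helper names refresh_item and build_diff_text and the dict keys are hypothetical stand-ins for the module's own helpers; os.utime is used only to force a visible mtime change in the demo.

```python
import difflib
import os
import tempfile
from pathlib import Path


def refresh_item(item: dict) -> bool:
    """Re-read the file if its mtime changed. Returns True when content was refreshed."""
    path = Path(item["path"])
    mtime = path.stat().st_mtime
    if mtime == item["mtime"]:
        return False
    item["old_content"] = item["content"]
    item["content"] = path.read_text(encoding="utf-8")
    item["mtime"] = mtime
    return True


def build_diff_text(item: dict, max_full_lines: int = 200) -> str:
    new_lines = item["content"].splitlines()
    old = item.get("old_content")
    if len(new_lines) <= max_full_lines or old is None:
        return item["content"]  # small file: emit full content
    diff = difflib.unified_diff(old.splitlines(), new_lines, fromfile="old", tofile="new", lineterm="")
    return "\n".join(diff)


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "example.py"
    target.write_text("a = 1\n", encoding="utf-8")
    item = {"path": str(target), "content": "a = 1\n", "mtime": target.stat().st_mtime}
    unchanged = refresh_item(item)
    target.write_text("a = 2\n", encoding="utf-8")
    os.utime(target, (item["mtime"] + 10, item["mtime"] + 10))  # force an mtime change
    changed = refresh_item(item)
    refresh_text = "[SYSTEM: FILES UPDATED]\n\n" + build_diff_text(item)
```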
Subagent Summarization
For Tier 4: when an error occurs, qa_callback may be invoked to get a Tier 4 AI summary of the traceback. The summary is injected back into the worker's history as a hint.
def run_tier4_analysis(stderr: str) -> str:
    """Stateless Tier 4 QA analysis of an error message."""
    # Uses a dedicated system prompt for error triage
    # Returns analysis text (root cause, suggested fix)
    # Does NOT modify any code; analysis only
For Tier 4 patch generation:
def run_tier4_patch_generation(error: str, file_context: str) -> str:
    """Generate a unified diff patch from an error and file context."""
    # Returns the patch as a string
    # The caller (typically the patch modal) presents it for human review
Public API Quick Reference
| Function | Purpose |
|---|---|
| send(...) | The main entry point: call the active provider |
| set_provider(provider, model) | Switch active provider and model |
| get_provider() -> str | Get the active provider name |
| set_model_params(temp, max_tok, trunc_limit, top_p) | Update generation params |
| set_custom_system_prompt(prompt) | Set the per-session system prompt override |
| set_base_system_prompt(prompt) | Set the foundational base prompt (advanced) |
| set_use_default_base_prompt(use: bool) | Toggle whether the base prompt is included |
| set_project_context_marker(marker) | Set the project-specific context tag |
| reset_session() | Clear all provider state |
| get_comms_log() | Read the in-memory comms log |
| clear_comms_log() | Clear the in-memory comms log |
| get_token_stats(md_content) | Estimate token usage for the given content |
| cleanup() | Tear down (delete Gemini caches, etc.) |
| get_current_palette() -> str | Get the current theme palette name |
| list_models(provider) -> list[str] | List available models for a provider |
| run_tier4_analysis(stderr) -> str | Tier 4 error analysis |
| run_tier4_patch_generation(error, file_context) -> str | Tier 4 patch generation |
| run_subagent_summarization(file_path, content, is_code, outline) -> str | AI summary of a file |
| run_discussion_compression(text) -> str | AI compression of a long discussion |
Thread Safety
- _send_lock: threading.Lock serializes all provider calls. No two send() calls run concurrently.
- Per-provider history locks (_anthropic_history_lock, etc.) guard the history list mutations.
- The EventEmitter (in src/events.py) is thread-safe for subscribe/emit.
Testing
Unit Tests (no real API calls)
def test_set_provider():
    from src import ai_client
    ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
    assert ai_client.get_provider() == "anthropic"
    ai_client.reset_session()  # Cleanup
Mocked Tests
from unittest.mock import patch

from src import ai_client

def test_send_routes_to_provider():
    # Patch the provider-specific sender so no real API call is made
    with patch.object(ai_client, "_send_anthropic", return_value="mocked") as m:
        ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
        result = ai_client.send("system", "user")
        assert result == "mocked"
        m.assert_called_once()
    ai_client.reset_session()
Integration (real API)
Gated by env var (e.g., RUN_REAL_AI_TESTS=1). Hits the real API. Not in default CI.
See Also
- guide_architecture.md: Threading model and provider dispatch
- guide_mma.md: How Tier 3 workers use ai_client
- guide_mcp_client.md: The 46 tools that ai_client can invoke (canonical list in models.AGENT_TOOL_NAMES)
- guide_rag.md: RAG engine integration via the rag_engine parameter
- guide_state_lifecycle.md: The per-provider history globals (_anthropic_history, etc.) are managed here; their locking and reset behavior is documented
- guide_context_aggregation.md: The aggregate.py pipeline that produces the markdown the AI client sends
- conductor/product.md: Product-level overview of providers
- docs/reports/qwen_llama_grok_followup_audit_20260611.md: Audit of the parent track's gaps; follow-up track qwen_llama_grok_followup_20260611 covers them
Shared OpenAI-Compatible Helper (src/openai_compatible.py)
Added 2026-06-06 by the qwen_llama_grok_integration_20260606 track. Operates on a normalized request/response data structure so 4 OpenAI-compatible vendors (MiniMax, Grok, Llama, DeepSeek) can share the same request building, response parsing, streaming aggregation, tool call detection, and error classification logic.
Data Structures
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass(frozen=True)
class NormalizedResponse:
    text: str
    tool_calls: list[dict[str, Any]]
    usage_input_tokens: int
    usage_output_tokens: int
    usage_cache_read_tokens: int
    usage_cache_creation_tokens: int
    raw_response: Any

@dataclass
class OpenAICompatibleRequest:
    messages: list[dict[str, Any]]
    model: str
    temperature: float = 0.0
    top_p: float = 1.0
    max_tokens: int = 8192
    tools: Optional[list[dict[str, Any]]] = None
    tool_choice: str = "auto"
    stream: bool = False
    stream_callback: Optional[Callable[[str], None]] = None
The Function
def send_openai_compatible(
    client: Any,  # openai.OpenAI client with vendor-specific base_url + auth
    request: OpenAICompatibleRequest,
    *,
    capabilities: "VendorCapabilities",  # from src/vendor_capabilities.py
) -> NormalizedResponse:
    ...
The function:
- Translates request.messages into the OpenAI SDK's messages parameter (passthrough; already in OpenAI shape).
- Translates request.tools if non-None (passthrough for now; future: strip unsupported fields based on capabilities).
- Calls client.chat.completions.create(...) with the right parameters.
- If streaming: aggregates chunks; calls stream_callback(text_chunk) for each text delta; collects final usage from the last chunk.
- If non-streaming: parses the response in one shot.
- Returns a NormalizedResponse with text, tool calls (in OpenAI shape), usage stats.
- On exception: classifies the OpenAI exception and re-raises as ProviderError.
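The streaming branch can be illustrated with a small aggregator over fake chunks. This sketch assumes the usual OpenAI streaming shape (text in chunk.choices[i].delta.content, usage on a final chunk with no choices); the aggregate_stream function and the _chunk test helper are hypothetical, and tool-call deltas are omitted.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable, Optional


def aggregate_stream(
    chunks: Iterable[Any],
    stream_callback: Optional[Callable[[str], None]] = None,
) -> tuple[str, int, int]:
    """Join text deltas and pick up usage from whichever chunk carries it."""
    parts: list[str] = []
    input_tokens = 0
    output_tokens = 0
    for chunk in chunks:
        for choice in chunk.choices:
            text = getattr(choice.delta, "content", None)
            if text:
                parts.append(text)
                if stream_callback is not None:
                    stream_callback(text)
        usage = getattr(chunk, "usage", None)
        if usage is not None:
            input_tokens = usage.prompt_tokens
            output_tokens = usage.completion_tokens
    return "".join(parts), input_tokens, output_tokens


def _chunk(text: Optional[str] = None, usage: Any = None) -> SimpleNamespace:
    # Hypothetical helper that imitates one streamed chunk.
    choices = [SimpleNamespace(delta=SimpleNamespace(content=text))] if text is not None else []
    return SimpleNamespace(choices=choices, usage=usage)


seen: list[str] = []
fake_stream = [
    _chunk("Hel"),
    _chunk("lo"),
    _chunk(usage=SimpleNamespace(prompt_tokens=12, completion_tokens=2)),
]
result = aggregate_stream(fake_stream, seen.append)
```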
Usage Pattern (per vendor)
# _send_grok, _send_llama (single-shot placeholders), _send_minimax (with restored tool loop)
def _send_grok(md_content, user_message, base_dir, file_items=None, discussion_history="", stream=False, ...):
    client = _ensure_grok_client()  # openai.OpenAI(api_key=..., base_url="https://api.x.ai/v1")
    with _grok_history_lock:
        # ... build messages, append user, system + context ...
        request = OpenAICompatibleRequest(
            messages=messages, model=_model, stream=stream,
            stream_callback=stream_callback,
        )
        caps = get_capabilities("grok", _model)
        response = send_openai_compatible(client, request, capabilities=caps)
        # ... append to history, return response.text ...
Qwen Adapter (src/qwen_adapter.py)
Qwen uses Alibaba's DashScope native SDK (not OpenAI-compatible) because DashScope's OpenAI-compatible mode drops important features (Qwen-Audio, Qwen-Long custom chunking, Qwen-VL-Max enhanced vision). The adapter normalizes DashScope tool format to OpenAI shape via build_dashscope_tools() and classifies DashScope exceptions via classify_dashscope_error().
Llama Multi-Backend
_send_llama supports 3 backends via the state globals _llama_base_url and _llama_api_key:
- Ollama (local): http://localhost:11434/v1; no auth
- OpenRouter (cloud aggregator): https://openrouter.ai/api/v1
- Custom URL (escape hatch): any OpenAI-compatible endpoint
The local-LLM signal is _get_llama_cost_tracking() (returns False for localhost/127.0.0.1).
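A host-based check of this kind could look like the sketch below. The function name is_cost_tracked and the exact host set are assumptions; the real _get_llama_cost_tracking() may inspect the URL differently.

```python
from urllib.parse import urlparse

_LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def is_cost_tracked(base_url: str) -> bool:
    """Return False for local backends, True for anything else."""
    host = urlparse(base_url).hostname or ""
    return host.lower() not in _LOCAL_HOSTS


ollama_tracked = is_cost_tracked("http://localhost:11434/v1")
openrouter_tracked = is_cost_tracked("https://openrouter.ai/api/v1")
```

Parsing the hostname rather than substring-matching the URL avoids false positives such as a remote host whose path happens to contain "localhost".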
Tests
- tests/test_vendor_capabilities.py (3 tests): registry lookup, vendor-default fallback, unknown-vendor raises
- tests/test_openai_compatible.py (6 tests): non-streaming, streaming aggregation, tool call detection, vision, error classification, frozen dataclass
- conductor/tracks/nagent_review_20260608/report.md §15 Pitfalls #2 and #4: Deep-dive on the per-provider history globals and the stateful singleton pattern; future-track candidate for stateless LLMClient