Per plan Task 7.1: removed all deprecation language about ai_client.send() from docs/guide_ai_client.md: - Removed the 'Public API > ai_client.send(...) deprecated' section - Updated 'Migration Notes for Existing Callers' to reflect the public_api_migration_and_ui_polish_20260615 completion - Updated 'Public API Result Migration' line in the see-also section to mark the follow-up track as COMPLETED (not 'planned') Verification: rg -i 'deprecat.*send|send.*deprecat' docs/guide_ai_client.md returns 0 hits (the only remaining 'deprecat' mention is the resolved Public API Result Migration bullet which now describes the resolution path, not a deprecation).
37 KiB
src/ai_client.py — Multi-Provider LLM Abstraction
Top | Architecture | Testing | MMA
Overview
src/ai_client.py (~116KB) is the unified LLM client for 8 providers. It abstracts the differences between providers (Gemini, Anthropic, DeepSeek, MiniMax, Gemini CLI, Qwen, Grok, Llama) behind a single send() function.
The module is a stateful singleton — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.
The 8 providers split into 3 API shapes:
- Native SDK: Gemini (google-genai), Anthropic (anthropic), Qwen (DashScope)
- OpenAI-compatible: MiniMax, Grok, Llama (Ollama/OpenRouter/custom), DeepSeek
- Subprocess: Gemini CLI
The OpenAI-compatible vendors all call the shared helper in src/openai_compatible.py (added 2026-06-06 by the qwen_llama_grok_integration_20260606 track; see "Shared OpenAI-Compatible Helper" section below). The MiniMax provider's _send_minimax was refactored to use this helper (Phase 4 of the same track, 231 → 75 lines, 68% reduction).
Module-Level Imports
Important: The 5 provider SDKs are NOT imported at module level.
import google.genai,import anthropic,import openai, andimport fastapiare heavy (~430-955ms each on cold load) and are now obtained viasrc.module_loader._require_warmed("google.genai")and similar calls, after theWarmupManagerhas loaded them in the background. The module-level globals you see in the State section (_gemini_client,_anthropic_client, etc.) are typed asOptionalbecause they're populated by_require_warmed()on first use, not at import time.
This change was part of the 2026-06-06 startup_speedup_20260606 track. Before: import src.ai_client took ~1800ms. After: ~161ms. The remaining cost is the bare module skeleton.
Architecture
┌─────────────────────────────────────────────────┐
│ ai_client.send(md_content, user_message, ...) │
│ │
│ 1. _send_lock.acquire() — serialize all calls │
│ 2. Read _provider / _model │
│ 3. Route to provider-specific _send_<provider>() │
│ 4. Return str response │
└─────────────────┬───────────────────────────────┘
│ dispatches based on _provider
▼
┌────────┬─────────┬────────┬──────────┐
▼ ▼ ▼ ▼ ▼
_gemini _anthropic _deepseek _minimax _gemini_cli
(subprocess)
State
All state is module-level globals. The most important:
| Variable | Type | Purpose |
|---|---|---|
_provider: str |
"gemini" | "anthropic" | "deepseek" | "minimax" | "gemini_cli" |
Active provider |
_model: str |
str |
Active model name |
_temperature: float |
0.0 |
Sampling temperature |
_top_p: float |
1.0 |
Nucleus sampling |
_max_tokens: int |
8192 |
Output token cap |
_history_trunc_limit: int |
8000 |
Char limit for truncating old tool outputs |
_send_lock |
threading.Lock |
Serializes all send() calls |
_current_palette: str |
theme | Last-applied theme palette |
Per-Provider State
_gemini_client: Optional[genai.Client] = None
_gemini_chat: Any = None
_gemini_cache: Any = None
_gemini_cache_md_hash: Optional[str] = None
_gemini_cache_created_at: Optional[float] = None
_gemini_cached_file_paths: list[str] = []
_anthropic_client: Optional[anthropic.Anthropic] = None
_anthropic_history: list[dict] = []
_anthropic_history_lock: threading.Lock = threading.Lock()
_deepseek_client: Any = None
_deepseek_history: list[dict] = []
_deepseek_history_lock: threading.Lock = threading.Lock()
_minimax_client: Any = None
_minimax_history: list[dict] = []
_minimax_history_lock: threading.Lock = threading.Lock()
_gemini_cli_adapter: Optional[GeminiCliAdapter] = None
The Public API
send(...) — The Main Entry Point
def send(
md_content: str,
user_message: str,
base_dir: str = ".",
file_items: list[dict] | None = None,
discussion_history: str = "",
stream: bool = False,
pre_tool_callback: Optional[Callable] = None,
qa_callback: Optional[Callable] = None,
enable_tools: bool = True,
stream_callback: Optional[Callable] = None,
patch_callback: Optional[Callable] = None,
rag_engine: Optional[Any] = None,
) -> str:
Returns the model's response as a string. All provider calls go through here.
Parameters:
md_content— the system prompt + context (markdown)user_message— the user's messagebase_dir— for MCP tool filesystem operationsfile_items— files in the context (deprecated path; usually empty)discussion_history— legacy parameterstream/stream_callback— for streaming responsespre_tool_callback— called before each tool execution (HITL gate)qa_callback— called when an error occurs (Tier 4 integration)enable_tools— whether to enable PowerShell + MCP toolspatch_callback— Tier 4 patch generation hookrag_engine— optional RAG engine for context augmentation
Provider Switching
from src import ai_client
ai_client.set_provider("gemini", "gemini-3-flash-preview")
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
ai_client.set_provider("deepseek", "deepseek-chat")
ai_client.set_provider("minimax", "grok-2-latest")
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
Parameter Setters
ai_client.set_model_params(temp=0.7, max_tok=4096, top_p=0.9, trunc_limit=4000)
Session Management
ai_client.reset_session() # Clears all provider state, history, cache
Event Hooks
from src import ai_client
# Confirmation hook (called before destructive tool execution)
ai_client.confirm_and_run_callback = my_gui_callback
# Comms log hook (called on every API call)
ai_client.comms_log_callback = my_logging_callback
# Tool log hook (called on every tool completion)
ai_client.tool_log_callback = my_tool_logging_callback
# Event emitter (for any subscriber)
ai_client.events.on("my_event", my_handler)
Comms Log
ai_client._append_comms(direction, kind, payload) # Add entry
ai_client.get_comms_log() # Read all
ai_client.clear_comms_log() # Clear
ai_client.get_token_stats(md_content) # Estimate token usage
Provider Error Taxonomy — Legacy (Pre-Refactor)
As of 2026-06-11: This section describes the pre-refactor exception-based pattern. The
ProviderErrorclass is removed in thedata_oriented_error_handling_20260606track. See the new Data-Oriented Error Handling (Fleury Pattern) section below for the current convention.
class ProviderError(Exception):
kind: str # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
provider: str
original: Exception
def ui_message(self) -> str:
"""Returns a user-friendly error message."""
ProviderError was raised by provider-specific _send_* functions on failure.
The caller (typically app_controller.py) caught it and surfaced the error to
the user via app.ai_status. Post-refactor, the same flow uses ErrorInfo
dataclasses inside Result[str] returns — see the new section below.
The Tool-Call Loop
All providers follow the same high-level pattern in _send_*:
def _send_<provider>(md_content, user_message, ...):
for round in range(MAX_TOOL_ROUNDS + 2): # up to 10 rounds
response = provider_api_call(md_content, user_message, history, tools)
comms_log(direction="IN", kind="response", payload=response)
if not has_function_calls(response):
return extract_text(response)
for call in response.function_calls:
if pre_tool_callback and pre_tool_callback(...) is rejected:
return rejection_message
tool_result = dispatch(call.name, call.args, base_dir)
append_tool_result_to_history(call, tool_result)
# Context refresh: re-read all tracked files (mtime check)
_reread_file_items(file_items)
# Truncate tool outputs at _history_trunc_limit
truncate_tool_outputs(history)
# Cumulative byte check
if cumulative_tool_bytes > 500_000:
inject_warning()
return final_response
The constants:
MAX_TOOL_ROUNDS: int = 10— max tool-call iterations persend()_MAX_TOOL_OUTPUT_BYTES: int = 500_000— cumulative tool output budget_ANTHROPIC_CHUNK_SIZE: int = 120_000— chars per Anthropic system text block_ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000— Anthropic prompt limit (200K minus headroom)_GEMINI_MAX_INPUT_TOKENS: int = 900_000— Gemini 1M window minus headroom
Provider-Specific Behaviors
Gemini (SDK)
- Server-side cache:
genai.CachedContentwith TTL management - Cache rebuild at 90% TTL: proactive renewal
- Cache hash: tracks content hash for invalidation
- Cached file paths: tracks which files are in the active cache
Anthropic
- Ephemeral prompt caching: 4
cache_control: ephemeralbreakpoints - Breakpoints: system prompt, context chunks, tool def, conversation prefix
- History trimming at 180K tokens: 2-phase (strip stale file refreshes, then drop turn pairs)
- History repair:
_repair_anthropic_historyhandles tool_result chain breaks
DeepSeek
- Raw HTTP: uses
requests.postdirectly (no SDK) - Streaming: supports streaming responses
- History repair:
_repair_deepseek_historyfor tool result chains
MiniMax
- OpenAI-compatible endpoint: uses the
openaiSDK - History trimming: similar to Anthropic (drop turn pairs at threshold)
- History repair:
_repair_minimax_history
Gemini CLI
- Subprocess adapter:
GeminiCliAdapterinsrc/gemini_cli_adapter.py - Persistent session: CLI maintains its own session ID
- JSONL output protocol: parses streaming JSONL from the CLI subprocess
- Full feature parity: tool calls, streaming, usage metadata
History Trimming Strategies
Gemini (40% threshold)
if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
hist.pop(0) # Assistant
hist.pop(0) # User
Anthropic (180K limit)
_trim_anthropic_history(system_blocks, history) — two-phase:
- Strip stale
[SYSTEM: FILES UPDATED]blocks - Drop oldest turn pairs (preserving tool_result chains)
MiniMax
Same pattern as Anthropic (similar 180K limit).
DeepSeek
No built-in trimming (relies on the caller to keep history short).
Caching Strategies
Gemini Server-Side Cache
_gemini_cache_md_hash: Optional[str] = None # Hash of cached content
_gemini_cache_created_at: Optional[float] = None # Monotonic time
The cache decision is a 3-way branch on each _send_gemini call:
- Hash changed: delete old, rebuild with new content
- Cache age > 90% of TTL (3240s of 3600s): proactive renewal
- No cache exists: create new if token count >= 2048, otherwise inline
Anthropic Cache (4-Breakpoint System)
[System prompt]─breakpoint 1
[Context chunks]─breakpoint 2
[Tool definitions]─breakpoint 3
[Last user message]─breakpoint 4
Before placing breakpoint 4, all existing cache_control is stripped to prevent exceeding the 4-breakpoint limit.
Context Refresh Mechanism
After the last tool call in each round, _reread_file_items(file_items) checks mtimes:
- For each file item: compare
Path.stat().st_mtimeagainst storedmtime - If unchanged: pass through as-is
- If changed: re-read content, store
old_contentfor diffing, updatemtime - Changed files are diffed via
_build_file_diff_text:- Files ≤ 200 lines: emit full content
- Files > 200 lines with
old_content: emitdifflib.unified_diff
- Diff is appended to the last tool's output as
[SYSTEM: FILES UPDATED]\n\n{diff} - Stale
[FILES UPDATED]blocks are stripped from older history turns by_strip_stale_file_refreshes
This is the "agent always sees current code" mechanism.
Subagent Summarization
For Tier 4: when an error occurs, qa_callback may be invoked to get a Tier 4 AI summary of the traceback. The summary is injected back into the worker's history as a hint.
def run_tier4_analysis(stderr: str) -> str:
"""Stateless Tier 4 QA analysis of an error message."""
# Uses a dedicated system prompt for error triage
# Returns analysis text (root cause, suggested fix)
# Does NOT modify any code — analysis only
For Tier 4 patch generation:
def run_tier4_patch_generation(error: str, file_context: str) -> str:
"""Generate a unified diff patch from an error and file context."""
# Returns the patch as a string
# The caller (typically the patch modal) presents it for human review
Public API Quick Reference
| Function | Purpose |
|---|---|
send(...) |
The main entry point — call the active provider |
set_provider(provider, model) |
Switch active provider and model |
get_provider() -> str |
Get the active provider name |
set_model_params(temp, max_tok, trunc_limit, top_p) |
Update generation params |
set_custom_system_prompt(prompt) |
Set the per-session system prompt override |
set_base_system_prompt(prompt) |
Set the foundational base prompt (advanced) |
set_use_default_base_prompt(use: bool) |
Toggle whether the base prompt is included |
set_project_context_marker(marker) |
Set the project-specific context tag |
reset_session() |
Clear all provider state |
get_comms_log() |
Read the in-memory comms log |
clear_comms_log() |
Clear the in-memory comms log |
get_token_stats(md_content) |
Estimate token usage for the given content |
cleanup() |
Tear down (delete Gemini caches, etc.) |
get_current_palette() -> str |
Get the current theme palette name |
list_models(provider) -> list[str] |
List available models for a provider |
run_tier4_analysis(stderr) -> str |
Tier 4 error analysis |
run_tier4_patch_generation(error, file_context) -> str |
Tier 4 patch generation |
run_subagent_summarization(file_path, content, is_code, outline) -> str |
AI summary of a file |
run_discussion_compression(text) -> str |
AI compression of a long discussion |
Thread Safety
_send_lock: threading.Lock— serializes all provider calls. No twosend()calls run concurrently.- Per-provider history locks (
_anthropic_history_lock, etc.) — guard the history list mutations. - The
EventEmitter(insrc/events.py) is thread-safe for subscribe/emit.
Testing
Unit Tests (no real API calls)
def test_set_provider():
from src import ai_client
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
assert ai_client.get_provider() == "anthropic"
ai_client.reset_session() # Cleanup
Mocked Tests
from unittest.mock import patch
def test_send_routes_to_provider(monkeypatch):
with patch.object(ai_client, "_send_anthropic", return_value="mocked") as m:
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
result = ai_client.send("system", "user")
assert result == "mocked"
m.assert_called_once()
ai_client.reset_session()
Integration (real API)
Gated by env var (e.g., RUN_REAL_AI_TESTS=1). Hits the real API. Not in default CI.
Data-Oriented Error Handling (Fleury Pattern)
The provider layer follows the "errors are just cases" framework
(Ryan Fleury, The Easiest Way To Handle
Errors). The
canonical reference is
conductor/code_styleguides/error_handling.md.
Result-Based Returns
All _send_<vendor>_result() functions (8 vendors: Gemini, Anthropic,
DeepSeek, MiniMax, Gemini CLI, Qwen, Llama, Grok — plus the
_send_llama_native Ollama adapter) return Result[str, ErrorInfo]. SDK
exceptions are caught at the boundary (src/openai_compatible.py,
src/qwen_adapter.py) and converted to ErrorInfo dataclasses. The
_classify_<vendor>_error() functions return ErrorInfo (not raise
ProviderError, which has been removed).
The 12 canonical ErrorKind values: NETWORK, AUTH, QUOTA,
RATE_LIMIT, BALANCE, PERMISSION, NOT_FOUND, INVALID_INPUT,
NOT_READY, UNKNOWN, CONFIG, INTERNAL. Each has exactly one
meaning — do not overload UNKNOWN when a new failure mode surfaces
(Lottes's anti-pattern). ErrorInfo.source is one of
"ai_client.<vendor>" (e.g., "ai_client.gemini",
"ai_client.anthropic") for diagnostic routing.
Public API
ai_client.send_result(...)— the public API. ReturnsResult[str, ErrorInfo]. Accepts 13+ parameters including 8 callbacks. Internally calls_send_<vendor>()for the active provider (the vendor functions returnResult[str]directly).
Example
from src import ai_client
from src.result_types import ErrorKind
r = ai_client.send_result("system prompt", "user message")
if not r.ok:
for err in r.errors:
log.error(err.ui_message())
# err.kind is one of ErrorKind.*; err.source is "ai_client.<vendor>"
# use r.data regardless (it's the zero-initialized "" on failure)
print(r.data)
Migration Notes for Existing Callers
- All production call sites and tests now use
send_result(). The legacysend()function was removed in thepublic_api_migration_and_ui_polish_20260615track. - Tests that mock
ai_client._send_<vendor>should use theResult(data=...)return value pattern.
See Also (in-doc)
conductor/code_styleguides/error_handling.md— canonical styleguide (5 patterns, data model, decision tree, anti-patterns)conductor/tracks/data_oriented_error_handling_20260606/spec.md— the spec that introduced this patterndocs/guide_mcp_client.md— same pattern in the MCP tool layerdocs/guide_rag.md— same pattern in the RAG engine
See Also
- guide_architecture.md — Threading model and provider dispatch
- guide_mma.md — How Tier 3 workers use ai_client
- guide_mcp_client.md — The 46 tools that ai_client can invoke (canonical list in
models.AGENT_TOOL_NAMES) - guide_rag.md — RAG engine integration via
rag_engineparameter - guide_state_lifecycle.md — The per-provider history globals (
_anthropic_history, etc.) are managed here; their locking and reset behavior is documented - guide_context_aggregation.md — The
aggregate.pypipeline that produces the markdown the AI client sends - conductor/product.md — Product-level overview of providers
- docs/reports/qwen_llama_grok_followup_audit_20260611.md — Audit of the parent track's gaps; follow-up track
qwen_llama_grok_followup_20260611covers them - Gemini / Gemini CLI thinking-format compatibility (deferred from
ai_loop_regressions_20260614) — the user's complaint included Gemini; the likely cause is a format mismatch between the Gemini SDK output andparse_thinking_trace. Empirically investigate by running a Gemini request that produces reasoning and inspecting the rawresp.text. Resolved 2026-06-15 bydoeh_test_thinking_cleanup_20260615: thegoogle-genaiSDK filtersthought=Trueparts out ofresp.text. The new helper_extract_gemini_thoughtsinsrc/ai_client.pyscansresp.candidates[0].content.partsforthought=Trueand prepends the concatenated text as<thinking>...</thinking>soparse_thinking_traceextracts it. 5 regression tests intests/test_gemini_thinking_format.pycover the helper and the wrap path. See track spec §3.2 G15. <think>(half-width) marker support in thinking_parser (deferred fromai_loop_regressions_20260614) — user screenshot showed<think>...</think>format; currentparse_thinking_tracerequires<thinking>. The change is small (~3 lines insrc/thinking_parser.py:9). Resolved 2026-06-15 bydoeh_test_thinking_cleanup_20260615: thetag_patternregex insrc/thinking_parser.py:20now also matches<think>...</think>(the backreference\1matches the closing tag). New testtest_parse_half_width_think_tagintests/test_thinking_trace.py. All 8 thinking_trace tests pass.- Public API Result Migration (planned, separate track
public_api_migration_20260606) — the 5 production + 63 test call sites not migrated in this track; the follow-up removes the deprecatedai_client.send(). See parent track spec §12.1. Completed 2026-06-15 bypublic_api_migration_and_ui_polish_20260615: 3 remaining production call sites (src/conductor_tech_lead.py:68, src/orchestrator_pm.py:86, src/multi_agent_conductor.py:591) + 18 test files (11 call-site + 7 production-affected mock) were migrated tosend_result(). The deprecatedsend()function was removed fromsrc/ai_client.py. See track spec. doeh_test_thinking_cleanup_20260615(shipped 2026-06-15) — cleanup follow-up todata_oriented_error_handling_20260606andai_loop_regressions_20260614. Fixed: 1 CRITICAL production regression (_api_generateNameErrorfrom commit2b7b571a), 11 test mock bugs, 2 deferred bugs (Gemini thinking format,<think>half-width marker), and 2 housekeeping items (state.toml duplicate keys, tracks.md row 24). See track spec + plan.
Shared OpenAI-Compatible Helper (src/openai_compatible.py)
Added 2026-06-06 by the qwen_llama_grok_integration_20260606 track. Operates on a normalized request/response data structure so 4 OpenAI-compatible vendors (MiniMax, Grok, Llama, DeepSeek) can share the same request building, response parsing, streaming aggregation, tool call detection, and error classification logic.
Data Structures
@dataclass(frozen=True)
class NormalizedResponse:
text: str
tool_calls: list[dict[str, Any]]
usage_input_tokens: int
usage_output_tokens: int
usage_cache_read_tokens: int
usage_cache_creation_tokens: int
raw_response: Any
@dataclass
class OpenAICompatibleRequest:
messages: list[dict[str, Any]]
model: str
temperature: float = 0.0
top_p: float = 1.0
max_tokens: int = 8192
tools: Optional[list[dict[str, Any]]] = None
tool_choice: str = "auto"
stream: bool = False
stream_callback: Optional[Callable[[str], None]] = None
The Function
def send_openai_compatible(
client: Any, # openai.OpenAI client with vendor-specific base_url + auth
request: OpenAICompatibleRequest,
*, capabilities: "VendorCapabilities", # from src/vendor_capabilities.py
) -> NormalizedResponse:
The function:
- Translates
request.messagesinto the OpenAI SDK'smessagesparameter (passthrough — already in OpenAI shape). - Translates
request.toolsif non-None (passthrough for now; future: strip unsupported fields based oncapabilities). - Calls
client.chat.completions.create(...)with the right parameters. - If streaming: aggregates chunks; calls
stream_callback(text_chunk)for each text delta; collects final usage from the last chunk. - If non-streaming: parses the response in one shot.
- Returns a
NormalizedResponsewith text, tool calls (in OpenAI shape), usage stats. - On exception: classifies the OpenAI exception and re-raises as
ProviderError.
Usage Pattern (per vendor)
# _send_grok, _send_llama (single-shot placeholders), _send_minimax (with restored tool loop)
def _send_grok(md_content, user_message, base_dir, file_items=None, discussion_history="", stream=False, ...):
client = _ensure_grok_client() # openai.OpenAI(api_key=..., base_url="https://api.x.ai/v1")
with _grok_history_lock:
# ... build messages, append user, system + context ...
request = OpenAICompatibleRequest(
messages=messages, model=_model, stream=stream,
stream_callback=stream_callback,
)
caps = get_capabilities("grok", _model)
response = send_openai_compatible(client, request, capabilities=caps)
# ... append to history, return response.text ...
Qwen Adapter (src/qwen_adapter.py)
Qwen uses Alibaba's DashScope native SDK (not OpenAI-compatible) because DashScope's OpenAI-compatible mode drops important features (Qwen-Audio, Qwen-Long custom chunking, Qwen-VL-Max enhanced vision). The adapter normalizes DashScope tool format to OpenAI shape via build_dashscope_tools() and classifies DashScope exceptions via classify_dashscope_error().
Llama Multi-Backend
_send_llama supports 3 backends via the state globals _llama_base_url and _llama_api_key:
- Ollama (local):
http://localhost:11434/v1; no auth - OpenRouter (cloud aggregator):
https://openrouter.ai/api/v1 - Custom URL (escape hatch): any OpenAI-compatible endpoint
run_with_tool_loop — Shared Tool-Call Loop Helper
Added 2026-06-11 by the qwen_llama_grok_followup_20260611 track. Wraps send_openai_compatible with the tool-call loop, so 4+ OpenAI-compatible vendors share the same dispatch + history logic instead of each having their own inline loop.
Signature (in src/ai_client.py:806):
def run_with_tool_loop(
client: Any,
request: OpenAICompatibleRequest | Callable[[int], OpenAICompatibleRequest],
*,
capabilities: "VendorCapabilities",
pre_tool_callback: Optional[Callable] = None,
qa_callback: Optional[Callable] = None,
stream_callback: Optional[Callable[[str], None]] = None,
patch_callback: Optional[Callable] = None,
base_dir: str,
vendor_name: str,
history_lock: Optional[threading.Lock] = None,
history: Optional[list] = None,
trim_func: Optional[Callable] = None,
send_func: Optional[Callable[[int], "NormalizedResponse"]] = None,
on_pre_dispatch: Optional[Callable] = None,
) -> str:
Two extensions were added beyond the original signature:
requestaccepts aCallable[[int], OpenAICompatibleRequest](per-round history rebuild). Use this when the vendor mutates history between rounds (e.g., MiniMax's per-round append).send_func + on_pre_dispatchallows vendored call paths (e.g., Gemini CLI'sGeminiCliAdapter) to share the loop + dispatch without going throughsend_openai_compatible.
Vendors applied (as of 2026-06-11):
_send_minimax(was inline, now uses helper)_send_grok(was single-shot, now has loop)_send_llama(was single-shot, now has loop)_send_gemini_cli(usessend_func+on_pre_dispatch)
Vendors still deferred (multi-day refactor; see conductor/tracks/qwen_llama_grok_followup_20260611/state.toml t5_6/7/8):
_send_anthropic(uses anthropic SDK)_send_gemini(uses google-genai streaming)_send_deepseek(uses requests.post)
Audit enforcement: scripts/audit_no_inline_tool_loops.py fails if any non-deferred _send_<vendor>() has an inline for ... in range(MAX_TOOL_ROUNDS) loop.
Native Ollama Adapter (Phase 4)
Added 2026-06-11. When _llama_base_url is localhost / 127.0.0.1 (Ollama default), _send_llama routes to _send_llama_native (which wraps ollama_chat). The native adapter POSTs to /api/chat (NOT /v1/chat/completions) and supports Ollama's vendor-specific fields:
think:low|medium|high— reasoning depth hintimages: list of base64-encoded images (for vision-capable models)thinking: returned field; captured in history for subsequent rounds
The dispatcher check is in _send_llama at the function head:
if "localhost" in _llama_base_url or "127.0.0.1" in _llama_base_url:
return _send_llama_native(...)
For OpenRouter, custom URLs, and other cloud Llama endpoints, the existing OpenAI-compat path is unchanged.
V2 Capability Matrix (Phase 4)
Added 2026-06-11. The VendorCapabilities dataclass in src/vendor_capabilities.py now has 12 v2 fields beyond the original 7 v1 fields:
V1 fields (unchanged):
vision,tool_calling,caching,streaming,model_discovery,context_window,cost_tracking
V2 fields (added):
local— backend is on-device (Ollama, etc.); consumed by_apply_runtime_caps_overridefor llama+localhostreasoning— model supportsthinking/ reasoning traces (e.g., MiniMax-M2.5/M2.7, DeepSeek R1, llama-3.1-405b-reasoning)structured_output— model supports JSON / tool-use output formatcode_execution— model can run code (server-side; e.g., gemini-2.0-experimental)web_search— model can do live web search (e.g., grok-2, gemini-grounded)x_search— X/Twitter search (grok-specific)file_search— model has a file_search tool (Anthropic)mcp_support— model supports the Model Context Protocol (Anthropic, gemini)audio— model accepts audio input (gemini-2.5+, qwen-audio)video— model accepts video input (gemini-2.5+, qwen-vl-max)grounding— model supports grounding (gemini)computer_use— model can drive a computer (Anthropic claude-3.5+)
GUI rendering: src/gui_2.py:_render_v2_capability_badges renders small green badges in the provider panel for each field where caps.<field> = True. The user can see at a glance which capabilities their active vendor+model supports.
Static + runtime: Most v2 fields are per-model properties in the registry. caps.local is unique — it's runtime state (URL-dependent), so the GUI uses dataclasses.replace(caps, local=True) to override when the active backend is Ollama.
PROVIDERS Location (Phase 2)
The PROVIDERS list moved from src/models.py to src/ai_client.py:56 per the AGENTS.md HARD RULE (no new src/<thing>.py files). A PEP 562 __getattr__ re-export in src/models.py:261 maintains backward compatibility (lazy import; breaks the circular dependency where src/ai_client.py imports ToolPreset from src/models.py).
Audit: scripts/audit_providers_source_of_truth.py fails if PROVIDERS is declared in src/models.py.
Tests
tests/test_vendor_capabilities.py(3 tests): registry lookup, vendor-default fallback, unknown-vendor raisestests/test_openai_compatible.py(6 tests): non-streaming, streaming aggregation, tool call detection, vision, error classification, frozen dataclass- conductor/tracks/nagent_review_20260608/report.md §15 Pitfalls #2 and #4 — Deep-dive on the per-provider history globals and the stateful singleton pattern; future-track candidate for stateless LLMClient
Addition (2026-06-12) — Cache strategy and the 12-layer model
The nagent review (v2.3, §3.2 + §5) formalizes the cache strategy that this client implements. The strategy: stable-to-volatile context ordering, where layers 1-7 of the initial context are byte-identical across turns and across discussions of the same mode (and therefore cacheable), and layers 8-12 are per-turn (and therefore not cached).
The 12-layer model (the recap)
The canonical reference is conductor/code_styleguides/cache_friendly_context.md §1 (the full 12-layer table with the stable/volatile classification + the ─── data markings + the byte-comparison test). This section is a pointer.
The one-line summary: layers 1-7 (role instructions, function-calling schema, tool descriptions, system prompt, persona, project context, knowledge digest) are byte-identical across turns and cacheable. Layers 8-12 are per-turn and NOT cached. The cache boundary is at layer 7/8.
The byte-comparison test (the design contract)
The test in tests/test_aggregate_caching.py ensures the first N characters of the context are byte-identical across turns:
def test_aggregate_stable_to_volatile_ordering():
ctrl = mock_app_controller()
turn1 = aggregate.build_initial_context(ctrl, user_message="first")
turn2 = aggregate.build_initial_context(ctrl, user_message="second")
N = aggregate.stable_prefix_length(ctrl)
assert turn1[:N] == turn2[:N], f"Stable prefix mismatch: {turn1[:N]!r} != {turn2[:N]!r}"
The test is the contract. If a new layer is added in the wrong position, the test fails; the agent must move the layer to the stable position or update the test with written justification.
The provider-specific cache strategies
Anthropic (5-min ephemeral, 4 breakpoints max)
def _send_anthropic(messages, *, cache_prefix_chars=None):
if cache_prefix_chars is not None:
content_blocks = cache_prefix_blocks(messages, cache_prefix_chars)
else:
content_blocks = messages
response = anthropic_client.messages.create(
model=model,
max_tokens=8192,
messages=[{"role": "user", "content": content_blocks}],
)
return _result_with_usage(response.content, response.usage, messages)
The cache_prefix_blocks helper splits the message at the given char offsets and marks each prefix with cache_control: {"type": "ephemeral"}. Max 3 prefix blocks (provider limit is 4 breakpoints per request).
The Anthropic usage accounting (in _result_with_usage): cache_read_input_tokens + cache_creation_input_tokens are added to input_tokens so the accounting stays "tokens sent" across providers. Caching is invisible in the user-facing number.
Gemini (1-h explicit, configurable TTL)
def _send_gemini(messages, *, cache_ttl_seconds=3600):
if cache_ttl_seconds > 0:
cached_content = genai_client.caches.create(
model=model, contents=stable_prefix_messages, ttl=f"{cache_ttl_seconds}s",
)
response = genai_client.models.generate_content(
model=model, contents=volatile_messages,
config=genai.types.GenerateContentConfig(cached_content=cached_content.name),
)
else:
response = genai_client.models.generate_content(model=model, contents=messages)
return _result_with_usage(response.text, response.usage_metadata, messages)
The default TTL is 1 hour; configurable per-discussion via the GUI.
OpenAI (5-10 min implicit, provider-managed)
No application-side control; the provider handles caching. The GUI just shows "Cached by OpenAI; TTL: provider-managed."
The GUI exposure (the "Caching" Operations Hub sub-panel)
| Provider | Default TTL | Configurable? |
|---|---|---|
| Anthropic ephemeral | 5 min | yes (per-discussion state) |
| Gemini explicit | 1 h | yes (TTL override) |
| OpenAI implicit | 5-10 min (provider-managed) | no |
| claude-code (Claude Agent SDK) | varies (provider-managed) | no |
The new AI client state:
@dataclass
class DiscussionCacheState:
discussion_id: str
provider: str
cached_at: datetime
expires_at: Optional[datetime] # None for OpenAI implicit
hit_count: int = 0
tokens_cached: int = 0
last_invalidated_at: Optional[datetime] = None
caching_enabled: bool = True
The Hook API additions:
GET /api/cache # list all discussion cache states
GET /api/cache/<discussion_id> # get one
POST /api/cache/<discussion_id>/invalidate
POST /api/cache/<discussion_id>/disable
POST /api/cache/<discussion_id>/enable
The 5th provider (claude-code)
claude-code uses the Claude Agent SDK with local Claude Code authentication (no API key). The caching behavior is provider-managed.
def _send_claude_code(message, model, *, allowed_tools=None, max_turns=1):
options = ClaudeAgentOptions(
model=None if not model or model == "default" else model,
max_turns=max_turns,
tools=list(allowed_tools) if allowed_tools else [],
allowed_tools=list(allowed_tools) if allowed_tools else [],
cwd=os.getcwd(),
)
# ... claude_agent_sdk.query(prompt=message, options=options)
return _result_with_usage(text, usage, message)
The cross-references
docs/guide_caching_strategy.md— the user-facing deep-diveconductor/code_styleguides/cache_friendly_context.md— the canonical styleguidedocs/guide_agent_memory_dimensions.md— the 4 dims (where the cache hits)conductor/tracks/nagent_review_20260608/nagent_review_v2_3_20260612.md§3.2, §5 — the nagent pattern