manual_slop/docs/guide_ai_client.md
ed 88aea3199c docs(guides): document run_with_tool_loop, native Ollama, v2 matrix, PROVIDERS
Updates docs/guide_ai_client.md and docs/guide_models.md
to document the follow-up track's Phase 1-4 work:

guide_ai_client.md (added 3 sections + 1 inline note):
  - run_with_tool_loop shared helper (signature, the
    2 extensions for vendored call paths, the
    4 applied + 3 deferred vendors, audit script)
  - Native Ollama adapter (the dispatcher check in
    _send_llama, the think/images/thinking fields,
    the /api/chat endpoint difference)
  - V2 Capability Matrix (12 fields, GUI rendering,
    static vs runtime caps.local)
  - PROVIDERS Location (Phase 2 move, PEP 562 re-export)

guide_models.md (added 2 sections):
  - PROVIDERS Constant (location change + circular
    import rationale + audit)
  - V2 Capability Matrix (v2 field list, how to add
    a new v2 field per the HARD RULE on no new
    src/<thing>.py files)

These docs were previously stale; they still described the
v1 matrix only and the old 'inline tool loop' pattern.
Phase 5 t5_5 is the docs step that brings them in sync
with the current code.

Verification: 118/118 vendor+tool+provider+import-isolation
tests pass (no regressions; docs changes do not affect code)
2026-06-11 21:51:55 -04:00


src/ai_client.py — Multi-Provider LLM Abstraction

Top | Architecture | Testing | MMA


Overview

src/ai_client.py (~116KB) is the unified LLM client for 8 providers. It abstracts the differences between providers (Gemini, Anthropic, DeepSeek, MiniMax, Gemini CLI, Qwen, Grok, Llama) behind a single send() function.

The module is a stateful singleton — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.

The 8 providers split into 3 API shapes:

  • Native SDK: Gemini (google-genai), Anthropic (anthropic), Qwen (DashScope)
  • OpenAI-compatible: MiniMax, Grok, Llama (Ollama/OpenRouter/custom), DeepSeek
  • Subprocess: Gemini CLI

The OpenAI-compatible vendors are meant to share the helper in src/openai_compatible.py (added 2026-06-06 by the qwen_llama_grok_integration_20260606 track; see "Shared OpenAI-Compatible Helper" section below). DeepSeek speaks the same wire format but, per the DeepSeek notes further down, still issues raw requests.post calls. The MiniMax provider's _send_minimax was refactored to use this helper in Phase 4 of the same track (231 → 75 lines, a 68% reduction).


Module-Level Imports

Important: The heavy third-party packages are NOT imported at module level. import google.genai, import anthropic, import openai, and import fastapi are expensive (~430-955ms each on cold load), so they are obtained via src.module_loader._require_warmed("google.genai") and similar calls after the WarmupManager has loaded them in the background. The module-level globals listed in the State section (_gemini_client, _anthropic_client, etc.) are typed as Optional because they are populated on first use, not at import time.

This change was part of the 2026-06-06 startup_speedup_20260606 track. Before: import src.ai_client took ~1800ms. After: ~161ms. The remaining cost is the bare module skeleton.
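The warm-then-require idea can be illustrated with a minimal sketch. The names `warm_in_background` and `require_warmed` are hypothetical stand-ins rather than the real src.module_loader API, the synchronous fallback is an assumption, and the stdlib `json` module stands in for a heavy SDK.

```python
import importlib
import threading
from types import ModuleType

_warmed: dict[str, ModuleType] = {}
_warm_lock = threading.Lock()


def warm_in_background(names: list[str]) -> threading.Thread:
    """Import modules on a daemon thread (hypothetical WarmupManager stand-in)."""
    def _worker() -> None:
        for name in names:
            module = importlib.import_module(name)
            with _warm_lock:
                _warmed[name] = module

    thread = threading.Thread(target=_worker, daemon=True)
    thread.start()
    return thread


def require_warmed(name: str) -> ModuleType:
    """Return the warmed module; import synchronously if warm-up has not finished.

    The synchronous fallback is an assumption about how a cold call could behave.
    """
    with _warm_lock:
        module = _warmed.get(name)
    if module is None:
        module = importlib.import_module(name)  # pay the import cost now
        with _warm_lock:
            _warmed[name] = module
    return module


# "json" stands in for a heavy SDK such as google.genai.
warm_thread = warm_in_background(["json"])
warm_thread.join()
json_module = require_warmed("json")
```

The caller never writes a top-level `import` for the heavy package, so importing the client module itself stays cheap.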

Architecture

┌─────────────────────────────────────────────────┐
│ ai_client.send(md_content, user_message, ...)    │
│                                                 │
│ 1. _send_lock.acquire() — serialize all calls   │
│ 2. Read _provider / _model                       │
│ 3. Route to provider-specific _send_<provider>() │
│ 4. Return str response                           │
└─────────────────┬───────────────────────────────┘
                  │ dispatches based on _provider
                  ▼
   ┌────────┬─────────┬────────┬──────────┐
   ▼        ▼         ▼        ▼          ▼
_gemini  _anthropic _deepseek _minimax  _gemini_cli
                                              (subprocess)
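A minimal sketch of the lock-then-route flow in the diagram, under stated assumptions: the routing dict and the two toy handlers are hypothetical, and the real module may dispatch with an if/elif chain instead.

```python
import threading
from typing import Callable

_send_lock = threading.Lock()
_provider = "gemini"


def _send_gemini(md_content: str, user_message: str) -> str:
    # Toy handler; the real function calls the provider API.
    return f"[gemini] {user_message}"


def _send_anthropic(md_content: str, user_message: str) -> str:
    return f"[anthropic] {user_message}"


# Hypothetical routing table keyed by provider name.
_ROUTES: dict[str, Callable[[str, str], str]] = {
    "gemini": _send_gemini,
    "anthropic": _send_anthropic,
}


def set_provider(provider: str) -> None:
    global _provider
    if provider not in _ROUTES:
        raise ValueError(f"unknown provider: {provider}")
    _provider = provider


def send(md_content: str, user_message: str) -> str:
    with _send_lock:  # step 1: serialize all calls
        handler = _ROUTES[_provider]  # steps 2-3: read state, pick the handler
        return handler(md_content, user_message)  # step 4: return str
```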

State

All state is module-level globals. The most important:

| Variable | Type / default | Purpose |
| --- | --- | --- |
| _provider: str | "gemini" \| "anthropic" \| "deepseek" \| "minimax" \| "gemini_cli" | Active provider |
| _model: str | str | Active model name |
| _temperature: float | 0.0 | Sampling temperature |
| _top_p: float | 1.0 | Nucleus sampling |
| _max_tokens: int | 8192 | Output token cap |
| _history_trunc_limit: int | 8000 | Char limit for truncating old tool outputs |
| _send_lock | threading.Lock | Serializes all send() calls |
| _current_palette: str | theme | Last-applied theme palette |

Per-Provider State

_gemini_client: Optional[genai.Client] = None
_gemini_chat: Any = None
_gemini_cache: Any = None
_gemini_cache_md_hash: Optional[str] = None
_gemini_cache_created_at: Optional[float] = None
_gemini_cached_file_paths: list[str] = []

_anthropic_client: Optional[anthropic.Anthropic] = None
_anthropic_history: list[dict] = []
_anthropic_history_lock: threading.Lock = threading.Lock()

_deepseek_client: Any = None
_deepseek_history: list[dict] = []
_deepseek_history_lock: threading.Lock = threading.Lock()

_minimax_client: Any = None
_minimax_history: list[dict] = []
_minimax_history_lock: threading.Lock = threading.Lock()

_gemini_cli_adapter: Optional[GeminiCliAdapter] = None

The Public API

send(...) — The Main Entry Point

def send(
    md_content: str,
    user_message: str,
    base_dir: str = ".",
    file_items: list[dict] | None = None,
    discussion_history: str = "",
    stream: bool = False,
    pre_tool_callback: Optional[Callable] = None,
    qa_callback: Optional[Callable] = None,
    enable_tools: bool = True,
    stream_callback: Optional[Callable] = None,
    patch_callback: Optional[Callable] = None,
    rag_engine: Optional[Any] = None,
) -> str:

Returns the model's response as a string. All provider calls go through here.

Parameters:

  • md_content — the system prompt + context (markdown)
  • user_message — the user's message
  • base_dir — for MCP tool filesystem operations
  • file_items — files in the context (deprecated path; usually empty)
  • discussion_history — legacy parameter
  • stream / stream_callback — for streaming responses
  • pre_tool_callback — called before each tool execution (HITL gate)
  • qa_callback — called when an error occurs (Tier 4 integration)
  • enable_tools — whether to enable PowerShell + MCP tools
  • patch_callback — Tier 4 patch generation hook
  • rag_engine — optional RAG engine for context augmentation

Provider Switching

from src import ai_client
ai_client.set_provider("gemini", "gemini-3-flash-preview")
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
ai_client.set_provider("deepseek", "deepseek-chat")
ai_client.set_provider("minimax", "MiniMax-M2.5")
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")

Parameter Setters

ai_client.set_model_params(temp=0.7, max_tok=4096, top_p=0.9, trunc_limit=4000)

Session Management

ai_client.reset_session()  # Clears all provider state, history, cache

Event Hooks

from src import ai_client

# Confirmation hook (called before destructive tool execution)
ai_client.confirm_and_run_callback = my_gui_callback

# Comms log hook (called on every API call)
ai_client.comms_log_callback = my_logging_callback

# Tool log hook (called on every tool completion)
ai_client.tool_log_callback = my_tool_logging_callback

# Event emitter (for any subscriber)
ai_client.events.on("my_event", my_handler)

Comms Log

ai_client._append_comms(direction, kind, payload)  # Add entry
ai_client.get_comms_log()  # Read all
ai_client.clear_comms_log()  # Clear
ai_client.get_token_stats(md_content)  # Estimate token usage

Provider Error Taxonomy

class ProviderError(Exception):
    kind: str  # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
    provider: str
    original: Exception

    def ui_message(self) -> str:
        """Returns a user-friendly error message."""

ProviderError is raised by provider-specific _send_* functions on failure. The caller (typically app_controller.py) catches it and surfaces the error to the user via app.ai_status.
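A minimal sketch of how such an error type could be built and surfaced. The constructor and the hint strings are assumptions for illustration; only the three attributes and ui_message() come from the class outline above.

```python
class ProviderError(Exception):
    """Illustrative version of the error type; message wording is hypothetical."""

    def __init__(self, kind: str, provider: str, original: Exception) -> None:
        super().__init__(f"{provider}: {kind}: {original}")
        self.kind = kind
        self.provider = provider
        self.original = original

    def ui_message(self) -> str:
        # Hypothetical user-facing hints, one per documented kind.
        hints = {
            "quota": "Quota exhausted",
            "rate_limit": "Rate limited, retry shortly",
            "auth": "Authentication failed, check the API key",
            "balance": "Insufficient account balance",
            "network": "Network problem",
        }
        hint = hints.get(self.kind, "Unexpected error")
        return f"[{self.provider}] {hint}: {self.original}"


def call_and_report() -> str:
    """Shows the catch-and-surface pattern a caller might use."""
    try:
        raise ProviderError("rate_limit", "anthropic", RuntimeError("HTTP 429"))
    except ProviderError as err:
        return err.ui_message()
```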


The Tool-Call Loop

All providers follow the same high-level pattern in _send_*:

def _send_<provider>(md_content, user_message, ...):
    # Pseudocode. MAX_TOOL_ROUNDS is 10, so this allows up to 12 iterations.
    for round_idx in range(MAX_TOOL_ROUNDS + 2):
        response = provider_api_call(md_content, user_message, history, tools)
        comms_log(direction="IN", kind="response", payload=response)

        # No tool calls means the model has produced its final answer
        if not has_function_calls(response):
            return extract_text(response)

        for call in response.function_calls:
            # HITL gate: a rejected call ends send() early
            if pre_tool_callback and is_rejected(pre_tool_callback(...)):
                return rejection_message
            tool_result = dispatch(call.name, call.args, base_dir)
            append_tool_result_to_history(call, tool_result)

        # Context refresh: re-read all tracked files (mtime check)
        _reread_file_items(file_items)

        # Truncate tool outputs at _history_trunc_limit
        truncate_tool_outputs(history)

        # Cumulative byte check against the tool output budget
        if cumulative_tool_bytes > _MAX_TOOL_OUTPUT_BYTES:  # 500_000
            inject_warning()

    return final_response

The constants:

  • MAX_TOOL_ROUNDS: int = 10 — max tool-call iterations per send()
  • _MAX_TOOL_OUTPUT_BYTES: int = 500_000 — cumulative tool output budget
  • _ANTHROPIC_CHUNK_SIZE: int = 120_000 — chars per Anthropic system text block
  • _ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000 — Anthropic prompt limit (200K minus headroom)
  • _GEMINI_MAX_INPUT_TOKENS: int = 900_000 — Gemini 1M window minus headroom
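To make the loop concrete, here is a runnable toy version driven by a fake model. Everything in it (`run_tool_loop`, `fake_model`, `fake_dispatch`, the dict-shaped responses) is hypothetical scaffolding; it leaves out the HITL gate, context refresh, and truncation, and it is not the code in src/ai_client.py.

```python
from typing import Any, Callable

MAX_TOOL_ROUNDS = 10
MAX_TOOL_OUTPUT_BYTES = 500_000


def run_tool_loop(
    call_model: Callable[[list[dict[str, Any]]], dict[str, Any]],
    dispatch: Callable[[str, dict[str, Any]], str],
    history: list[dict[str, Any]],
) -> str:
    cumulative_bytes = 0
    for _round_idx in range(MAX_TOOL_ROUNDS + 2):
        response = call_model(history)
        history.append({"role": "assistant", "content": response.get("text", "")})
        calls = response.get("tool_calls") or []
        if not calls:
            return response.get("text", "")  # final answer, no tools requested
        for call in calls:
            output = dispatch(call["name"], call["args"])
            cumulative_bytes += len(output.encode("utf-8"))
            history.append({"role": "tool", "name": call["name"], "content": output})
        if cumulative_bytes > MAX_TOOL_OUTPUT_BYTES:
            # Stand-in for inject_warning()
            history.append({"role": "user", "content": "[SYSTEM: tool output budget exceeded]"})
    return "[tool loop ended without a final answer]"


def fake_model(history: list[dict[str, Any]]) -> dict[str, Any]:
    # Ask for one tool call first, then answer once a tool result is present.
    if any(message["role"] == "tool" for message in history):
        return {"text": "The file has 3 lines.", "tool_calls": []}
    return {"text": "", "tool_calls": [{"name": "count_lines", "args": {"path": "a.txt"}}]}


def fake_dispatch(name: str, args: dict[str, Any]) -> str:
    return "3"


history = [{"role": "user", "content": "How many lines are in a.txt?"}]
answer = run_tool_loop(fake_model, fake_dispatch, history)
```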

Provider-Specific Behaviors

Gemini (SDK)

  • Server-side cache: genai.CachedContent with TTL management
  • Cache rebuild at 90% TTL: proactive renewal
  • Cache hash: tracks content hash for invalidation
  • Cached file paths: tracks which files are in the active cache

Anthropic

  • Ephemeral prompt caching: 4 cache_control: ephemeral breakpoints
  • Breakpoints: system prompt, context chunks, tool def, conversation prefix
  • History trimming at 180K tokens: 2-phase (strip stale file refreshes, then drop turn pairs)
  • History repair: _repair_anthropic_history handles tool_result chain breaks

DeepSeek

  • Raw HTTP: uses requests.post directly (no SDK)
  • Streaming: supports streaming responses
  • History repair: _repair_deepseek_history for tool result chains

MiniMax

  • OpenAI-compatible endpoint: uses the openai SDK
  • History trimming: similar to Anthropic (drop turn pairs at threshold)
  • History repair: _repair_minimax_history

Gemini CLI

  • Subprocess adapter: GeminiCliAdapter in src/gemini_cli_adapter.py
  • Persistent session: CLI maintains its own session ID
  • JSONL output protocol: parses streaming JSONL from the CLI subprocess
  • Full feature parity: tool calls, streaming, usage metadata

History Trimming Strategies

Gemini (40% threshold)

if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
    while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
        hist.pop(0)  # Assistant
        hist.pop(0)  # User
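A runnable sketch of the same threshold logic, assuming the token total is re-estimated after each dropped pair (the snippet above does not show that step). The 4-characters-per-token estimate and the function names are illustrative assumptions.

```python
GEMINI_MAX_INPUT_TOKENS = 900_000


def estimate_tokens(history: list[dict[str, str]]) -> int:
    """Rough estimate of ~4 characters per token (assumption, not the real counter)."""
    return sum(len(message["content"]) for message in history) // 4


def trim_history(history: list[dict[str, str]], max_tokens: int = GEMINI_MAX_INPUT_TOKENS) -> int:
    """Trim in place once usage passes 40%, stopping at 30% or at 4 entries."""
    total_in = estimate_tokens(history)
    if total_in > max_tokens * 0.4:
        while len(history) > 4 and total_in > max_tokens * 0.3:
            del history[:2]  # drop the oldest pair of entries
            total_in = estimate_tokens(history)
    return total_in


# 12 entries of 160 chars each is about 480 tokens against a 1000-token budget.
demo_history = [{"role": "user", "content": "x" * 160} for _ in range(12)]
remaining = trim_history(demo_history, max_tokens=1000)
```

Starting at 40% and trimming down to 30% leaves a gap between the two thresholds, so the next few turns do not immediately trigger another trim.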

Anthropic (180K limit)

_trim_anthropic_history(system_blocks, history) — two-phase:

  1. Strip stale [SYSTEM: FILES UPDATED] blocks
  2. Drop oldest turn pairs (preserving tool_result chains)

MiniMax

Same pattern as Anthropic (similar 180K limit).

DeepSeek

No built-in trimming (relies on the caller to keep history short).


Caching Strategies

Gemini Server-Side Cache

_gemini_cache_md_hash: Optional[str] = None  # Hash of cached content
_gemini_cache_created_at: Optional[float] = None  # Monotonic time

The cache decision is a 3-way branch on each _send_gemini call:

  • Hash changed: delete old, rebuild with new content
  • Cache age > 90% of TTL (3240s of 3600s): proactive renewal
  • No cache exists: create new if token count >= 2048, otherwise inline
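The branch above can be sketched as a pure decision function. The SHA-256 hash, the action names, and the extra "reuse" outcome for a fresh, matching cache are assumptions; the 3600s TTL, 90% renewal point, and 2048-token minimum come from the list above.

```python
import hashlib
from typing import Optional

CACHE_TTL_SECONDS = 3600
RENEW_FRACTION = 0.9  # renew after 3240s
MIN_CACHE_TOKENS = 2048


def content_hash(md_content: str) -> str:
    # Assumption: any stable digest works; the real hash function is not specified.
    return hashlib.sha256(md_content.encode("utf-8")).hexdigest()


def cache_action(
    md_content: str,
    token_count: int,
    cached_hash: Optional[str],
    cached_at: Optional[float],
    now: float,
) -> str:
    """Return 'create', 'inline', 'rebuild', 'renew', or 'reuse'."""
    if cached_hash is None or cached_at is None:
        return "create" if token_count >= MIN_CACHE_TOKENS else "inline"
    if cached_hash != content_hash(md_content):
        return "rebuild"  # content changed: delete the old cache, build a new one
    if now - cached_at > CACHE_TTL_SECONDS * RENEW_FRACTION:
        return "renew"  # proactive renewal before the TTL expires
    return "reuse"
```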

Anthropic Cache (4-Breakpoint System)

[System prompt]─breakpoint 1
[Context chunks]─breakpoint 2
[Tool definitions]─breakpoint 3
[Last user message]─breakpoint 4

Before placing breakpoint 4, all existing cache_control is stripped to prevent exceeding the 4-breakpoint limit.
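A minimal sketch of the strip-then-place step for breakpoint 4. It assumes the stripping applies to the message history only (the system, context, and tool breakpoints are left alone), and `place_final_breakpoint` is a hypothetical name; the `cache_control: {"type": "ephemeral"}` block field follows Anthropic's prompt-caching format.

```python
from typing import Any


def place_final_breakpoint(messages: list[dict[str, Any]]) -> None:
    """Remove cache_control from all history blocks, then mark the last user message."""
    for message in messages:
        content = message.get("content")
        if isinstance(content, list):
            for block in content:
                block.pop("cache_control", None)

    for message in reversed(messages):
        if message.get("role") != "user":
            continue
        content = message["content"]
        if isinstance(content, str):
            # Normalize plain-string content into a block list so it can carry the marker.
            content = [{"type": "text", "text": content}]
            message["content"] = content
        if content:
            content[-1]["cache_control"] = {"type": "ephemeral"}
        return


demo_messages = [
    {"role": "user", "content": [
        {"type": "text", "text": "first", "cache_control": {"type": "ephemeral"}},
    ]},
    {"role": "assistant", "content": [{"type": "text", "text": "reply"}]},
    {"role": "user", "content": "latest question"},
]
place_final_breakpoint(demo_messages)
```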


Context Refresh Mechanism

After the last tool call in each round, _reread_file_items(file_items) checks mtimes:

  1. For each file item: compare Path.stat().st_mtime against stored mtime
  2. If unchanged: pass through as-is
  3. If changed: re-read content, store old_content for diffing, update mtime
  4. Changed files are diffed via _build_file_diff_text:
    • Files ≤ 200 lines: emit full content
    • Files > 200 lines with old_content: emit difflib.unified_diff
  5. Diff is appended to the last tool's output as [SYSTEM: FILES UPDATED]\n\n{diff}
  6. Stale [FILES UPDATED] blocks are stripped from older history turns by _strip_stale_file_refreshes

This is the "agent always sees current code" mechanism.
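Step 4 can be sketched with the standard library. This is an illustration rather than the real _build_file_diff_text: the output headers are invented, and falling back to full content when a large file has no old_content is an assumption.

```python
import difflib
from typing import Optional

FULL_CONTENT_MAX_LINES = 200


def build_file_diff_text(path: str, new_content: str, old_content: Optional[str]) -> str:
    new_lines = new_content.splitlines()
    # Small files, or files with no previous snapshot, are emitted in full.
    if len(new_lines) <= FULL_CONTENT_MAX_LINES or old_content is None:
        return f"--- {path} (full content) ---\n{new_content}"
    diff = difflib.unified_diff(
        old_content.splitlines(),
        new_lines,
        fromfile=f"{path} (before)",
        tofile=f"{path} (after)",
        lineterm="",
    )
    return "\n".join(diff)


old_text = "\n".join(f"line {i}" for i in range(300))
changed_lines = old_text.splitlines()
changed_lines[150] = "changed"
big_diff = build_file_diff_text("big.py", "\n".join(changed_lines), old_text)
small_full = build_file_diff_text("small.py", "a\nb", None)
```

For a one-line change in a 300-line file the diff is a handful of lines, which is the point of switching away from full content above the threshold.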


Subagent Summarization

For Tier 4: when an error occurs, qa_callback may be invoked to get a Tier 4 AI summary of the traceback. The summary is injected back into the worker's history as a hint.

def run_tier4_analysis(stderr: str) -> str:
    """Stateless Tier 4 QA analysis of an error message."""
    # Uses a dedicated system prompt for error triage
    # Returns analysis text (root cause, suggested fix)
    # Does NOT modify any code — analysis only

For Tier 4 patch generation:

def run_tier4_patch_generation(error: str, file_context: str) -> str:
    """Generate a unified diff patch from an error and file context."""
    # Returns the patch as a string
    # The caller (typically the patch modal) presents it for human review

Public API Quick Reference

| Function | Purpose |
| --- | --- |
| send(...) | The main entry point; calls the active provider |
| set_provider(provider, model) | Switch active provider and model |
| get_provider() -> str | Get the active provider name |
| set_model_params(temp, max_tok, trunc_limit, top_p) | Update generation params |
| set_custom_system_prompt(prompt) | Set the per-session system prompt override |
| set_base_system_prompt(prompt) | Set the foundational base prompt (advanced) |
| set_use_default_base_prompt(use: bool) | Toggle whether the base prompt is included |
| set_project_context_marker(marker) | Set the project-specific context tag |
| reset_session() | Clear all provider state |
| get_comms_log() | Read the in-memory comms log |
| clear_comms_log() | Clear the in-memory comms log |
| get_token_stats(md_content) | Estimate token usage for the given content |
| cleanup() | Tear down (delete Gemini caches, etc.) |
| get_current_palette() -> str | Get the current theme palette name |
| list_models(provider) -> list[str] | List available models for a provider |
| run_tier4_analysis(stderr) -> str | Tier 4 error analysis |
| run_tier4_patch_generation(error, file_context) -> str | Tier 4 patch generation |
| run_subagent_summarization(file_path, content, is_code, outline) -> str | AI summary of a file |
| run_discussion_compression(text) -> str | AI compression of a long discussion |

Thread Safety

  • _send_lock: threading.Lock — serializes all provider calls. No two send() calls run concurrently.
  • Per-provider history locks (_anthropic_history_lock, etc.) — guard the history list mutations.
  • The EventEmitter (in src/events.py) is thread-safe for subscribe/emit.

Testing

Unit Tests (no real API calls)

def test_set_provider():
    from src import ai_client
    ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
    assert ai_client.get_provider() == "anthropic"
    ai_client.reset_session()  # Cleanup

Mocked Tests

from unittest.mock import patch

from src import ai_client

def test_send_routes_to_provider():
    ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
    try:
        # Replace the provider function so no real API call is made
        with patch.object(ai_client, "_send_anthropic", return_value="mocked") as m:
            result = ai_client.send("system", "user")
        assert result == "mocked"
        m.assert_called_once()
    finally:
        ai_client.reset_session()  # Cleanup even if an assertion fails

Integration (real API)

Gated by env var (e.g., RUN_REAL_AI_TESTS=1). Hits the real API. Not in default CI.


See Also


Shared OpenAI-Compatible Helper (src/openai_compatible.py)

Added 2026-06-06 by the qwen_llama_grok_integration_20260606 track. Operates on a normalized request/response data structure so 4 OpenAI-compatible vendors (MiniMax, Grok, Llama, DeepSeek) can share the same request building, response parsing, streaming aggregation, tool call detection, and error classification logic.

Data Structures

@dataclass(frozen=True)
class NormalizedResponse:
    text: str
    tool_calls: list[dict[str, Any]]
    usage_input_tokens: int
    usage_output_tokens: int
    usage_cache_read_tokens: int
    usage_cache_creation_tokens: int
    raw_response: Any

@dataclass
class OpenAICompatibleRequest:
    messages: list[dict[str, Any]]
    model: str
    temperature: float = 0.0
    top_p: float = 1.0
    max_tokens: int = 8192
    tools: Optional[list[dict[str, Any]]] = None
    tool_choice: str = "auto"
    stream: bool = False
    stream_callback: Optional[Callable[[str], None]] = None

The Function

def send_openai_compatible(
    client: Any,        # openai.OpenAI client with vendor-specific base_url + auth
    request: OpenAICompatibleRequest,
    *, capabilities: "VendorCapabilities",  # from src/vendor_capabilities.py
) -> NormalizedResponse:

The function:

  1. Translates request.messages into the OpenAI SDK's messages parameter (passthrough — already in OpenAI shape).
  2. Translates request.tools if non-None (passthrough for now; future: strip unsupported fields based on capabilities).
  3. Calls client.chat.completions.create(...) with the right parameters.
  4. If streaming: aggregates chunks; calls stream_callback(text_chunk) for each text delta; collects final usage from the last chunk.
  5. If non-streaming: parses the response in one shot.
  6. Returns a NormalizedResponse with text, tool calls (in OpenAI shape), usage stats.
  7. On exception: classifies the OpenAI exception and re-raises as ProviderError.
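Step 4 (streaming aggregation) can be illustrated without the OpenAI SDK. `FakeChunk` is a hypothetical stand-in for a streamed delta, since real SDK chunks are shaped differently; the sketch only shows the accumulate, callback, and last-usage-wins logic.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class FakeChunk:
    """Hypothetical stand-in for one streamed delta."""
    text: str = ""
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None


def aggregate_stream(
    chunks: Iterable[FakeChunk],
    stream_callback: Optional[Callable[[str], None]] = None,
) -> tuple[str, int, int]:
    """Join text deltas, forward each to the callback, keep the last usage seen."""
    parts: list[str] = []
    usage_in = 0
    usage_out = 0
    for chunk in chunks:
        if chunk.text:
            parts.append(chunk.text)
            if stream_callback is not None:
                stream_callback(chunk.text)
        if chunk.input_tokens is not None:
            usage_in = chunk.input_tokens
        if chunk.output_tokens is not None:
            usage_out = chunk.output_tokens
    return "".join(parts), usage_in, usage_out


seen: list[str] = []
stream_result = aggregate_stream(
    [FakeChunk("Hel"), FakeChunk("lo"), FakeChunk(input_tokens=12, output_tokens=2)],
    seen.append,
)
```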

Usage Pattern (per vendor)

# Single-shot form of _send_grok / _send_llama as first added; _send_minimax follows the same shape with its tool loop restored
def _send_grok(md_content, user_message, base_dir, file_items=None, discussion_history="", stream=False, ...):
    client = _ensure_grok_client()  # openai.OpenAI(api_key=..., base_url="https://api.x.ai/v1")
    with _grok_history_lock:
        # ... build messages, append user, system + context ...
        request = OpenAICompatibleRequest(
            messages=messages, model=_model, stream=stream,
            stream_callback=stream_callback,
        )
        caps = get_capabilities("grok", _model)
        response = send_openai_compatible(client, request, capabilities=caps)
        # ... append to history, return response.text ...

Qwen Adapter (src/qwen_adapter.py)

Qwen uses Alibaba's DashScope native SDK (not OpenAI-compatible) because DashScope's OpenAI-compatible mode drops important features (Qwen-Audio, Qwen-Long custom chunking, Qwen-VL-Max enhanced vision). The adapter normalizes DashScope tool format to OpenAI shape via build_dashscope_tools() and classifies DashScope exceptions via classify_dashscope_error().

Llama Multi-Backend

_send_llama supports 3 backends via the state globals _llama_base_url and _llama_api_key:

  • Ollama (local): http://localhost:11434/v1; no auth
  • OpenRouter (cloud aggregator): https://openrouter.ai/api/v1
  • Custom URL (escape hatch): any OpenAI-compatible endpoint

run_with_tool_loop — Shared Tool-Call Loop Helper

Added 2026-06-11 by the qwen_llama_grok_followup_20260611 track. Wraps send_openai_compatible with the tool-call loop so that vendors share one dispatch + history implementation instead of each carrying its own inline loop. Four vendors use it as of 2026-06-11 (listed below).

Signature (in src/ai_client.py:806):

def run_with_tool_loop(
    client: Any,
    request: OpenAICompatibleRequest | Callable[[int], OpenAICompatibleRequest],
    *,
    capabilities: "VendorCapabilities",
    pre_tool_callback: Optional[Callable] = None,
    qa_callback: Optional[Callable] = None,
    stream_callback: Optional[Callable[[str], None]] = None,
    patch_callback: Optional[Callable] = None,
    base_dir: str,
    vendor_name: str,
    history_lock: Optional[threading.Lock] = None,
    history: Optional[list] = None,
    trim_func: Optional[Callable] = None,
    send_func: Optional[Callable[[int], "NormalizedResponse"]] = None,
    on_pre_dispatch: Optional[Callable] = None,
) -> str:

Two extensions were added beyond the original signature:

  1. request accepts a Callable[[int], OpenAICompatibleRequest] (per-round history rebuild). Use this when the vendor mutates history between rounds (e.g., MiniMax's per-round append).
  2. send_func + on_pre_dispatch allow vendored call paths (e.g., Gemini CLI's GeminiCliAdapter) to share the loop + dispatch without going through send_openai_compatible.

Vendors applied (as of 2026-06-11):

  • _send_minimax (was inline, now uses helper)
  • _send_grok (was single-shot, now has loop)
  • _send_llama (was single-shot, now has loop)
  • _send_gemini_cli (uses send_func + on_pre_dispatch)

Vendors still deferred (multi-day refactor; see conductor/tracks/qwen_llama_grok_followup_20260611/state.toml t5_6/7/8):

  • _send_anthropic (uses anthropic SDK)
  • _send_gemini (uses google-genai streaming)
  • _send_deepseek (uses requests.post)

Audit enforcement: scripts/audit_no_inline_tool_loops.py fails if any non-deferred _send_<vendor>() has an inline for ... in range(MAX_TOOL_ROUNDS) loop.
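One way such an audit could work is an AST scan. This is a sketch under assumptions, not the contents of scripts/audit_no_inline_tool_loops.py: the function name is hypothetical, and the deferred set simply mirrors the list above.

```python
import ast

DEFERRED = {"_send_anthropic", "_send_gemini", "_send_deepseek"}


def find_inline_tool_loops(source: str) -> list[str]:
    """Return names of non-deferred _send_* functions containing a MAX_TOOL_ROUNDS for-loop."""
    offenders: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.FunctionDef):
            continue
        if not node.name.startswith("_send_") or node.name in DEFERRED:
            continue
        for inner in ast.walk(node):
            # Flag any for-loop whose iterable expression mentions MAX_TOOL_ROUNDS.
            if isinstance(inner, ast.For) and "MAX_TOOL_ROUNDS" in ast.dump(inner.iter):
                offenders.append(node.name)
                break
    return offenders


SAMPLE_SOURCE = '''
def _send_grok():
    for i in range(MAX_TOOL_ROUNDS + 2):
        pass

def _send_anthropic():
    for i in range(MAX_TOOL_ROUNDS):
        pass

def _send_llama():
    return run_with_tool_loop()
'''

offenders = find_inline_tool_loops(SAMPLE_SOURCE)
```

A script built on this would exit non-zero when the returned list is non-empty.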

Native Ollama Adapter (Phase 4)

Added 2026-06-11. When _llama_base_url contains localhost or 127.0.0.1 (the Ollama default), _send_llama routes to _send_llama_native (which wraps ollama_chat). The native adapter POSTs to /api/chat (NOT /v1/chat/completions) and supports Ollama's vendor-specific fields:

  • think: low | medium | high — reasoning depth hint
  • images: list of base64-encoded images (for vision-capable models)
  • thinking: returned field; captured in history for subsequent rounds

The dispatcher check is in _send_llama at the function head:

if "localhost" in _llama_base_url or "127.0.0.1" in _llama_base_url:
    return _send_llama_native(...)

For OpenRouter, custom URLs, and other cloud Llama endpoints, the existing OpenAI-compat path is unchanged.
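The substring test shown above also matches any URL that merely contains "localhost" somewhere in it. A stricter variant, which is not what the source does and is offered only as a sketch, compares the parsed hostname; `is_local_ollama` is a hypothetical name.

```python
from urllib.parse import urlparse

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def is_local_ollama(base_url: str) -> bool:
    """True only when the URL's hostname is exactly a known local host."""
    return urlparse(base_url).hostname in LOCAL_HOSTS


def substring_check(base_url: str) -> bool:
    """The check as written in the source, kept here for comparison."""
    return "localhost" in base_url or "127.0.0.1" in base_url
```

Both checks agree on the three documented backends; they differ only on hosts such as notlocalhost.example.com.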

V2 Capability Matrix (Phase 4)

Added 2026-06-11. The VendorCapabilities dataclass in src/vendor_capabilities.py now has 12 v2 fields beyond the original 7 v1 fields:

V1 fields (unchanged):

  • vision, tool_calling, caching, streaming, model_discovery, context_window, cost_tracking

V2 fields (added):

  • local — backend is on-device (Ollama, etc.); consumed by _apply_runtime_caps_override for llama+localhost
  • reasoning — model supports thinking / reasoning traces (e.g., MiniMax-M2.5/M2.7, DeepSeek R1, llama-3.1-405b-reasoning)
  • structured_output — model supports JSON / tool-use output format
  • code_execution — model can run code (server-side; e.g., gemini-2.0-experimental)
  • web_search — model can do live web search (e.g., grok-2, gemini-grounded)
  • x_search — X/Twitter search (grok-specific)
  • file_search — model has a file_search tool (Anthropic)
  • mcp_support — model supports the Model Context Protocol (Anthropic, gemini)
  • audio — model accepts audio input (gemini-2.5+, qwen-audio)
  • video — model accepts video input (gemini-2.5+, qwen-vl-max)
  • grounding — model supports grounding (gemini)
  • computer_use — model can drive a computer (Anthropic claude-3.5+)

GUI rendering: src/gui_2.py:_render_v2_capability_badges renders a small green badge in the provider panel for each field where caps.<field> is True, so the user can see at a glance which capabilities the active vendor+model supports.

Static + runtime: Most v2 fields are per-model properties in the registry. caps.local is unique — it's runtime state (URL-dependent), so the GUI uses dataclasses.replace(caps, local=True) to override when the active backend is Ollama.
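A minimal sketch of the runtime override using dataclasses.replace. The three-field `VendorCapabilities` here is a trimmed stand-in (and assumed frozen), and the function body is an illustration of the llama+localhost rule, not the real _apply_runtime_caps_override.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class VendorCapabilities:
    """Trimmed stand-in; the real dataclass has 7 v1 and 12 v2 fields."""
    vision: bool = False
    tool_calling: bool = False
    local: bool = False


def apply_runtime_caps_override(
    caps: VendorCapabilities, vendor: str, base_url: str
) -> VendorCapabilities:
    # local depends on the active URL, so it is set at runtime, not in the registry.
    if vendor == "llama" and ("localhost" in base_url or "127.0.0.1" in base_url):
        return replace(caps, local=True)  # returns a copy; the registry entry is untouched
    return caps


registry_caps = VendorCapabilities(tool_calling=True)
local_caps = apply_runtime_caps_override(registry_caps, "llama", "http://localhost:11434/v1")
cloud_caps = apply_runtime_caps_override(registry_caps, "grok", "https://api.x.ai/v1")
```

Because replace builds a new instance, the static registry value stays valid for the next provider switch.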

PROVIDERS Location (Phase 2)

The PROVIDERS list moved from src/models.py to src/ai_client.py:56 per the AGENTS.md HARD RULE (no new src/<thing>.py files). A PEP 562 __getattr__ re-export in src/models.py:261 maintains backward compatibility (lazy import; breaks the circular dependency where src/ai_client.py imports ToolPreset from src/models.py).

Audit: scripts/audit_providers_source_of_truth.py fails if PROVIDERS is declared in src/models.py.
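The PEP 562 re-export can be demonstrated in a single file by building two stand-in modules. The module names `ai_client_stub` and `models_stub` and the two-item list are hypothetical; in the real layout the `__getattr__` function would simply be defined at the top level of src/models.py.

```python
import importlib
import sys
import types

# Stand-in for src/ai_client.py, the new home of PROVIDERS.
ai_client_stub = types.ModuleType("ai_client_stub")
ai_client_stub.PROVIDERS = ["gemini", "anthropic"]
sys.modules["ai_client_stub"] = ai_client_stub

# Stand-in for src/models.py, which no longer declares PROVIDERS itself.
models_stub = types.ModuleType("models_stub")


def _models_getattr(name: str):
    """Module-level __getattr__ (PEP 562): runs only when normal lookup fails."""
    if name == "PROVIDERS":
        # Lazy import: resolved on first access, not when models_stub is imported,
        # which is what sidesteps the circular import.
        return importlib.import_module("ai_client_stub").PROVIDERS
    raise AttributeError(f"module 'models_stub' has no attribute {name!r}")


models_stub.__getattr__ = _models_getattr
sys.modules["models_stub"] = models_stub

# Old call sites keep working unchanged.
from models_stub import PROVIDERS
```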

Tests

  • tests/test_vendor_capabilities.py (3 tests): registry lookup, vendor-default fallback, unknown-vendor raises
  • tests/test_openai_compatible.py (6 tests): non-streaming, streaming aggregation, tool call detection, vision, error classification, frozen dataclass
  • conductor/tracks/nagent_review_20260608/report.md §15 Pitfalls #2 and #4 — Deep-dive on the per-provider history globals and the stateful singleton pattern; future-track candidate for stateless LLMClient