Private
Public Access
0
0
Files
manual_slop/docs/guide_ai_client.md
T
ed 691dc584eb docs(phase-6): update ai_client+models guides; report + follow-up track setup
Phase 6 t6.1 + t6.2 (no archive per user directive):
- docs/guide_ai_client.md: update Overview to mention 8 providers (was 5);
  add 'Shared OpenAI-Compatible Helper' section explaining
  src/openai_compatible.py (NormalizedResponse, OpenAICompatibleRequest,
  send_openai_compatible, usage pattern); document the Qwen adapter
  and Llama multi-backend.
- docs/guide_models.md: update PROVIDERS list to 8 entries (was 5).
- conductor/tracks.md: update the Qwen track entry to reflect
  '50/79 tasks done; Phase 6 in progress; NOT archiving - has follow-up';
  add detailed status note pointing to the follow-up track + audit
  report.
- docs/reports/qwen_llama_grok_followup_audit_20260611.md: NEW report
  explaining why a follow-up is needed (7 categories of gaps; the
  Tech Lead's 'footnote for now' failure mode; the lessons learned).
- conductor/tracks/qwen_llama_grok_followup_20260611/: NEW follow-up
  track setup (spec.md, state.toml, metadata.json, TODO.md).
  5 phases: tool loop lift, PROVIDERS move, UX adaptations 2-9,
  local-first + matrix v2, Anthropic/Gemini/DeepSeek migration.

Phase 6 t6.3 (git mv to archive) and t6.4 (mark Recently Completed)
are NOT applied per user directive: 'we can then doc this we're not
archiving yet, if we have a follow up track I need this one to stay
up because there is still alot todo'.
2026-06-11 09:33:18 -04:00

20 KiB

src/ai_client.py — Multi-Provider LLM Abstraction

Top | Architecture | Testing | MMA


Overview

src/ai_client.py (~116KB) is the unified LLM client for 8 providers. It abstracts the differences between providers (Gemini, Anthropic, DeepSeek, MiniMax, Gemini CLI, Qwen, Grok, Llama) behind a single send() function.

The module is a stateful singleton — all provider state is held in module-level globals. There is no class wrapping; the module itself is the abstraction layer.

The 8 providers split into 3 API shapes:

  • Native SDK: Gemini (google-genai), Anthropic (anthropic), Qwen (DashScope)
  • OpenAI-compatible: MiniMax, Grok, Llama (Ollama/OpenRouter/custom), DeepSeek
  • Subprocess: Gemini CLI

The OpenAI-compatible vendors all call the shared helper in src/openai_compatible.py (added 2026-06-06 by the qwen_llama_grok_integration_20260606 track; see "Shared OpenAI-Compatible Helper" section below). The MiniMax provider's _send_minimax was refactored to use this helper (Phase 4 of the same track, 231 → 75 lines, 68% reduction).


Module-Level Imports

Important: The 5 provider SDKs are NOT imported at module level. import google.genai, import anthropic, import openai, and import fastapi are heavy (~430-955ms each on cold load) and are now obtained via src.module_loader._require_warmed("google.genai") and similar calls, after the WarmupManager has loaded them in the background. The module-level globals you see in the State section (_gemini_client, _anthropic_client, etc.) are typed as Optional because they're populated by _require_warmed() on first use, not at import time.

This change was part of the 2026-06-06 startup_speedup_20260606 track. Before: import src.ai_client took ~1800ms. After: ~161ms. The remaining cost is the bare module skeleton.

Architecture

┌─────────────────────────────────────────────────┐
│ ai_client.send(md_content, user_message, ...)    │
│                                                 │
│ 1. _send_lock.acquire() — serialize all calls   │
│ 2. Read _provider / _model                       │
│ 3. Route to provider-specific _send_<provider>() │
│ 4. Return str response                           │
└─────────────────┬───────────────────────────────┘
                  │ dispatches based on _provider
                  ▼
   ┌────────┬─────────┬────────┬──────────┐
   ▼        ▼         ▼        ▼          ▼
_gemini  _anthropic _deepseek _minimax  _gemini_cli
                                              (subprocess)

State

All state is module-level globals. The most important:

Variable Type Purpose
_provider: str "gemini" | "anthropic" | "deepseek" | "minimax" | "gemini_cli" Active provider
_model: str str Active model name
_temperature: float 0.0 Sampling temperature
_top_p: float 1.0 Nucleus sampling
_max_tokens: int 8192 Output token cap
_history_trunc_limit: int 8000 Char limit for truncating old tool outputs
_send_lock threading.Lock Serializes all send() calls
_current_palette: str theme Last-applied theme palette

Per-Provider State

_gemini_client: Optional[genai.Client] = None
_gemini_chat: Any = None
_gemini_cache: Any = None
_gemini_cache_md_hash: Optional[str] = None
_gemini_cache_created_at: Optional[float] = None
_gemini_cached_file_paths: list[str] = []

_anthropic_client: Optional[anthropic.Anthropic] = None
_anthropic_history: list[dict] = []
_anthropic_history_lock: threading.Lock = threading.Lock()

_deepseek_client: Any = None
_deepseek_history: list[dict] = []
_deepseek_history_lock: threading.Lock = threading.Lock()

_minimax_client: Any = None
_minimax_history: list[dict] = []
_minimax_history_lock: threading.Lock = threading.Lock()

_gemini_cli_adapter: Optional[GeminiCliAdapter] = None

The Public API

send(...) — The Main Entry Point

def send(
    md_content: str,
    user_message: str,
    base_dir: str = ".",
    file_items: list[dict] | None = None,
    discussion_history: str = "",
    stream: bool = False,
    pre_tool_callback: Optional[Callable] = None,
    qa_callback: Optional[Callable] = None,
    enable_tools: bool = True,
    stream_callback: Optional[Callable] = None,
    patch_callback: Optional[Callable] = None,
    rag_engine: Optional[Any] = None,
) -> str:

Returns the model's response as a string. All provider calls go through here.

Parameters:

  • md_content — the system prompt + context (markdown)
  • user_message — the user's message
  • base_dir — for MCP tool filesystem operations
  • file_items — files in the context (deprecated path; usually empty)
  • discussion_history — legacy parameter
  • stream / stream_callback — for streaming responses
  • pre_tool_callback — called before each tool execution (HITL gate)
  • qa_callback — called when an error occurs (Tier 4 integration)
  • enable_tools — whether to enable PowerShell + MCP tools
  • patch_callback — Tier 4 patch generation hook
  • rag_engine — optional RAG engine for context augmentation

Provider Switching

from src import ai_client
ai_client.set_provider("gemini", "gemini-3-flash-preview")
ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
ai_client.set_provider("deepseek", "deepseek-chat")
ai_client.set_provider("minimax", "grok-2-latest")
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")

Parameter Setters

ai_client.set_model_params(temp=0.7, max_tok=4096, top_p=0.9, trunc_limit=4000)

Session Management

ai_client.reset_session()  # Clears all provider state, history, cache

Event Hooks

from src import ai_client

# Confirmation hook (called before destructive tool execution)
ai_client.confirm_and_run_callback = my_gui_callback

# Comms log hook (called on every API call)
ai_client.comms_log_callback = my_logging_callback

# Tool log hook (called on every tool completion)
ai_client.tool_log_callback = my_tool_logging_callback

# Event emitter (for any subscriber)
ai_client.events.on("my_event", my_handler)

Comms Log

ai_client._append_comms(direction, kind, payload)  # Add entry
ai_client.get_comms_log()  # Read all
ai_client.clear_comms_log()  # Clear
ai_client.get_token_stats(md_content)  # Estimate token usage

Provider Error Taxonomy

class ProviderError(Exception):
    kind: str  # "quota" | "rate_limit" | "auth" | "balance" | "network" | "unknown"
    provider: str
    original: Exception

    def ui_message(self) -> str:
        """Returns a user-friendly error message."""

ProviderError is raised by provider-specific _send_* functions on failure. The caller (typically app_controller.py) catches it and surfaces the error to the user via app.ai_status.


The Tool-Call Loop

All providers follow the same high-level pattern in _send_*:

def _send_<provider>(md_content, user_message, ...):
    for round in range(MAX_TOOL_ROUNDS + 2):  # up to 10 rounds
        response = provider_api_call(md_content, user_message, history, tools)
        comms_log(direction="IN", kind="response", payload=response)

        if not has_function_calls(response):
            return extract_text(response)

        for call in response.function_calls:
            if pre_tool_callback and pre_tool_callback(...) is rejected:
                return rejection_message
            tool_result = dispatch(call.name, call.args, base_dir)
            append_tool_result_to_history(call, tool_result)

        # Context refresh: re-read all tracked files (mtime check)
        _reread_file_items(file_items)

        # Truncate tool outputs at _history_trunc_limit
        truncate_tool_outputs(history)

        # Cumulative byte check
        if cumulative_tool_bytes > 500_000:
            inject_warning()

    return final_response

The constants:

  • MAX_TOOL_ROUNDS: int = 10 — max tool-call iterations per send()
  • _MAX_TOOL_OUTPUT_BYTES: int = 500_000 — cumulative tool output budget
  • _ANTHROPIC_CHUNK_SIZE: int = 120_000 — chars per Anthropic system text block
  • _ANTHROPIC_MAX_PROMPT_TOKENS: int = 180_000 — Anthropic prompt limit (200K minus headroom)
  • _GEMINI_MAX_INPUT_TOKENS: int = 900_000 — Gemini 1M window minus headroom

Provider-Specific Behaviors

Gemini (SDK)

  • Server-side cache: genai.CachedContent with TTL management
  • Cache rebuild at 90% TTL: proactive renewal
  • Cache hash: tracks content hash for invalidation
  • Cached file paths: tracks which files are in the active cache

Anthropic

  • Ephemeral prompt caching: 4 cache_control: ephemeral breakpoints
  • Breakpoints: system prompt, context chunks, tool def, conversation prefix
  • History trimming at 180K tokens: 2-phase (strip stale file refreshes, then drop turn pairs)
  • History repair: _repair_anthropic_history handles tool_result chain breaks

DeepSeek

  • Raw HTTP: uses requests.post directly (no SDK)
  • Streaming: supports streaming responses
  • History repair: _repair_deepseek_history for tool result chains

MiniMax

  • OpenAI-compatible endpoint: uses the openai SDK
  • History trimming: similar to Anthropic (drop turn pairs at threshold)
  • History repair: _repair_minimax_history

Gemini CLI

  • Subprocess adapter: GeminiCliAdapter in src/gemini_cli_adapter.py
  • Persistent session: CLI maintains its own session ID
  • JSONL output protocol: parses streaming JSONL from the CLI subprocess
  • Full feature parity: tool calls, streaming, usage metadata

History Trimming Strategies

Gemini (40% threshold)

if total_in > _GEMINI_MAX_INPUT_TOKENS * 0.4:
    while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.3:
        hist.pop(0)  # Assistant
        hist.pop(0)  # User

Anthropic (180K limit)

_trim_anthropic_history(system_blocks, history) — two-phase:

  1. Strip stale [SYSTEM: FILES UPDATED] blocks
  2. Drop oldest turn pairs (preserving tool_result chains)

MiniMax

Same pattern as Anthropic (similar 180K limit).

DeepSeek

No built-in trimming (relies on the caller to keep history short).


Caching Strategies

Gemini Server-Side Cache

_gemini_cache_md_hash: Optional[str] = None  # Hash of cached content
_gemini_cache_created_at: Optional[float] = None  # Monotonic time

The cache decision is a 3-way branch on each _send_gemini call:

  • Hash changed: delete old, rebuild with new content
  • Cache age > 90% of TTL (3240s of 3600s): proactive renewal
  • No cache exists: create new if token count >= 2048, otherwise inline

Anthropic Cache (4-Breakpoint System)

[System prompt]─breakpoint 1
[Context chunks]─breakpoint 2
[Tool definitions]─breakpoint 3
[Last user message]─breakpoint 4

Before placing breakpoint 4, all existing cache_control is stripped to prevent exceeding the 4-breakpoint limit.


Context Refresh Mechanism

After the last tool call in each round, _reread_file_items(file_items) checks mtimes:

  1. For each file item: compare Path.stat().st_mtime against stored mtime
  2. If unchanged: pass through as-is
  3. If changed: re-read content, store old_content for diffing, update mtime
  4. Changed files are diffed via _build_file_diff_text:
    • Files ≤ 200 lines: emit full content
    • Files > 200 lines with old_content: emit difflib.unified_diff
  5. Diff is appended to the last tool's output as [SYSTEM: FILES UPDATED]\n\n{diff}
  6. Stale [FILES UPDATED] blocks are stripped from older history turns by _strip_stale_file_refreshes

This is the "agent always sees current code" mechanism.


Subagent Summarization

For Tier 4: when an error occurs, qa_callback may be invoked to get a Tier 4 AI summary of the traceback. The summary is injected back into the worker's history as a hint.

def run_tier4_analysis(stderr: str) -> str:
    """Stateless Tier 4 QA analysis of an error message."""
    # Uses a dedicated system prompt for error triage
    # Returns analysis text (root cause, suggested fix)
    # Does NOT modify any code — analysis only

For Tier 4 patch generation:

def run_tier4_patch_generation(error: str, file_context: str) -> str:
    """Generate a unified diff patch from an error and file context."""
    # Returns the patch as a string
    # The caller (typically the patch modal) presents it for human review

Public API Quick Reference

Function Purpose
send(...) The main entry point — call the active provider
set_provider(provider, model) Switch active provider and model
get_provider() -> str Get the active provider name
set_model_params(temp, max_tok, trunc_limit, top_p) Update generation params
set_custom_system_prompt(prompt) Set the per-session system prompt override
set_base_system_prompt(prompt) Set the foundational base prompt (advanced)
set_use_default_base_prompt(use: bool) Toggle whether the base prompt is included
set_project_context_marker(marker) Set the project-specific context tag
reset_session() Clear all provider state
get_comms_log() Read the in-memory comms log
clear_comms_log() Clear the in-memory comms log
get_token_stats(md_content) Estimate token usage for the given content
cleanup() Tear down (delete Gemini caches, etc.)
get_current_palette() -> str Get the current theme palette name
list_models(provider) -> list[str] List available models for a provider
run_tier4_analysis(stderr) -> str Tier 4 error analysis
run_tier4_patch_generation(error, file_context) -> str Tier 4 patch generation
run_subagent_summarization(file_path, content, is_code, outline) -> str AI summary of a file
run_discussion_compression(text) -> str AI compression of a long discussion

Thread Safety

  • _send_lock: threading.Lock — serializes all provider calls. No two send() calls run concurrently.
  • Per-provider history locks (_anthropic_history_lock, etc.) — guard the history list mutations.
  • The EventEmitter (in src/events.py) is thread-safe for subscribe/emit.

Testing

Unit Tests (no real API calls)

def test_set_provider():
    from src import ai_client
    ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
    assert ai_client.get_provider() == "anthropic"
    ai_client.reset_session()  # Cleanup

Mocked Tests

from unittest.mock import patch

def test_send_routes_to_provider(monkeypatch):
    with patch.object(ai_client, "_send_anthropic", return_value="mocked") as m:
        ai_client.set_provider("anthropic", "claude-3-5-sonnet-latest")
        result = ai_client.send("system", "user")
        assert result == "mocked"
        m.assert_called_once()
    ai_client.reset_session()

Integration (real API)

Gated by env var (e.g., RUN_REAL_AI_TESTS=1). Hits the real API. Not in default CI.


See Also


Shared OpenAI-Compatible Helper (src/openai_compatible.py)

Added 2026-06-06 by the qwen_llama_grok_integration_20260606 track. Operates on a normalized request/response data structure so 4 OpenAI-compatible vendors (MiniMax, Grok, Llama, DeepSeek) can share the same request building, response parsing, streaming aggregation, tool call detection, and error classification logic.

Data Structures

@dataclass(frozen=True)
class NormalizedResponse:
    text: str
    tool_calls: list[dict[str, Any]]
    usage_input_tokens: int
    usage_output_tokens: int
    usage_cache_read_tokens: int
    usage_cache_creation_tokens: int
    raw_response: Any

@dataclass
class OpenAICompatibleRequest:
    messages: list[dict[str, Any]]
    model: str
    temperature: float = 0.0
    top_p: float = 1.0
    max_tokens: int = 8192
    tools: Optional[list[dict[str, Any]]] = None
    tool_choice: str = "auto"
    stream: bool = False
    stream_callback: Optional[Callable[[str], None]] = None

The Function

def send_openai_compatible(
    client: Any,        # openai.OpenAI client with vendor-specific base_url + auth
    request: OpenAICompatibleRequest,
    *, capabilities: "VendorCapabilities",  # from src/vendor_capabilities.py
) -> NormalizedResponse:

The function:

  1. Translates request.messages into the OpenAI SDK's messages parameter (passthrough — already in OpenAI shape).
  2. Translates request.tools if non-None (passthrough for now; future: strip unsupported fields based on capabilities).
  3. Calls client.chat.completions.create(...) with the right parameters.
  4. If streaming: aggregates chunks; calls stream_callback(text_chunk) for each text delta; collects final usage from the last chunk.
  5. If non-streaming: parses the response in one shot.
  6. Returns a NormalizedResponse with text, tool calls (in OpenAI shape), usage stats.
  7. On exception: classifies the OpenAI exception and re-raises as ProviderError.

Usage Pattern (per vendor)

# _send_grok, _send_llama (single-shot placeholders), _send_minimax (with restored tool loop)
def _send_grok(md_content, user_message, base_dir, file_items=None, discussion_history="", stream=False, ...):
    client = _ensure_grok_client()  # openai.OpenAI(api_key=..., base_url="https://api.x.ai/v1")
    with _grok_history_lock:
        # ... build messages, append user, system + context ...
        request = OpenAICompatibleRequest(
            messages=messages, model=_model, stream=stream,
            stream_callback=stream_callback,
        )
        caps = get_capabilities("grok", _model)
        response = send_openai_compatible(client, request, capabilities=caps)
        # ... append to history, return response.text ...

Qwen Adapter (src/qwen_adapter.py)

Qwen uses Alibaba's DashScope native SDK (not OpenAI-compatible) because DashScope's OpenAI-compatible mode drops important features (Qwen-Audio, Qwen-Long custom chunking, Qwen-VL-Max enhanced vision). The adapter normalizes DashScope tool format to OpenAI shape via build_dashscope_tools() and classifies DashScope exceptions via classify_dashscope_error().

Llama Multi-Backend

_send_llama supports 3 backends via the state globals _llama_base_url and _llama_api_key:

  • Ollama (local): http://localhost:11434/v1; no auth
  • OpenRouter (cloud aggregator): https://openrouter.ai/api/v1
  • Custom URL (escape hatch): any OpenAI-compatible endpoint

The local-LLM signal is _get_llama_cost_tracking() (returns False for localhost/127.0.0.1).

Tests

  • tests/test_vendor_capabilities.py (3 tests): registry lookup, vendor-default fallback, unknown-vendor raises
  • tests/test_openai_compatible.py (6 tests): non-streaming, streaming aggregation, tool call detection, vision, error classification, frozen dataclass
  • conductor/tracks/nagent_review_20260608/report.md §15 Pitfalls #2 and #4 — Deep-dive on the per-provider history globals and the stateful singleton pattern; future-track candidate for stateless LLMClient