# ai_client.py
"""
Note(Gemini): Acts as the unified interface for multiple LLM providers
(Anthropic, Gemini). Abstracts away the differences in how they handle tool
schemas, history, and caching.

For Anthropic: aggressively manages the ~200k token limit by manually culling
stale [FILES UPDATED] entries and dropping the oldest message pairs.

For Gemini: injects the initial context directly into system_instruction
during chat creation to avoid massive history bloat.
"""
from __future__ import annotations

import asyncio
import datetime
import difflib
import hashlib
import json
import os
import sys
import threading
import time
import tomllib
from collections import deque
from pathlib import Path
from typing import Optional, Callable, Any, List, Union, cast, Iterable

import anthropic
import requests  # type: ignore[import-untyped]
from google import genai
from google.genai import types

from src.events import EventEmitter
from src import file_cache
from src import mcp_client
from src import mma_prompts
from src import performance_monitor
from src import project_manager
from src.tool_bias import ToolBiasEngine
from src.models import ToolPreset, BiasProfile, Tool
from src.gemini_cli_adapter import GeminiCliAdapter

_provider: str = "gemini"
_model: str = "gemini-2.5-flash-lite"
_temperature: float = 0.0
_top_p: float = 1.0
_max_tokens: int = 8192
_history_trunc_limit: int = 8000

# Global event emitter for API lifecycle events.
events: EventEmitter = EventEmitter()


class ProviderError(Exception):
    def __init__(self, kind: str, provider: str, original: Exception) -> None:
        """
        [C: src/api_hooks.py:HookServerInstance.__init__, src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
        """
        self.kind = kind
        self.provider = provider
        self.original = original
        super().__init__(str(original))

    def ui_message(self) -> str:
        """
        [C: src/app_controller.py:AppController._handle_request_event, src/app_controller.py:_api_generate]
        """
        labels = {
            "quota": "QUOTA EXHAUSTED",
            "rate_limit": "RATE LIMITED",
            "auth": "AUTH / API KEY ERROR",
            "balance": "BALANCE / BILLING ERROR",
            "network": "NETWORK / CONNECTION ERROR",
            "unknown": "API ERROR",
        }
        label = labels.get(self.kind, "API ERROR")
        return f"[{self.provider.upper()} {label}]\n\n{self.original}"


#region: Provider Configuration
def set_model_params(temp: float, max_tok: int, trunc_limit: int = 8000, top_p: float = 1.0) -> None:
    """
    Sets global generation parameters like temperature and max tokens.
    [C: src/app_controller.py:AppController._handle_request_event, src/app_controller.py:_api_generate]
    """
    global _temperature, _max_tokens, _history_trunc_limit, _top_p
    _temperature = temp
    _max_tokens = max_tok
    _history_trunc_limit = trunc_limit
    _top_p = top_p
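
# Illustrative sketch (not called anywhere in this module): how a caller is
# expected to branch on ProviderError.kind. Which kinds are worth retrying is
# an assumption here; the real wiring lives in src/app_controller.py.
def _example_provider_error_handling(exc: ProviderError) -> str:
    transient = {"rate_limit", "network"}
    if exc.kind in transient:
        # A real caller would re-issue the request after a backoff delay.
        return f"retrying after transient {exc.kind} error from {exc.provider}"
    # Non-transient kinds (quota, auth, balance, unknown) are surfaced
    # verbatim via the labelled UI message.
    return exc.ui_message()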

_gemini_client: Optional[genai.Client] = None
_gemini_chat: Any = None
_gemini_cache: Any = None
_gemini_cache_md_hash: Optional[str] = None
_gemini_cache_created_at: Optional[float] = None
_gemini_cached_file_paths: list[str] = []

# Gemini cache TTL in seconds. Caches are created with this TTL and
# proactively rebuilt at 90% of this value to avoid stale-reference errors.
_GEMINI_CACHE_TTL: int = 3600

_anthropic_client: Optional[anthropic.Anthropic] = None
_anthropic_history: list[dict[str, Any]] = []
_anthropic_history_lock: threading.Lock = threading.Lock()

_deepseek_client: Any = None
_deepseek_history: list[dict[str, Any]] = []
_deepseek_history_lock: threading.Lock = threading.Lock()

_minimax_client: Any = None
_minimax_history: list[dict[str, Any]] = []
_minimax_history_lock: threading.Lock = threading.Lock()

_send_lock: threading.Lock = threading.Lock()

_BIAS_ENGINE = ToolBiasEngine()
_active_tool_preset: Optional[ToolPreset] = None
_active_bias_profile: Optional[BiasProfile] = None

_gemini_cli_adapter: Optional[GeminiCliAdapter] = None

# Injected by gui.py - called when the AI wants to run a command.
confirm_and_run_callback: Optional[Callable[[str, str, Optional[Callable[[str], str]], Optional[Callable[[str, str], Optional[str]]]], Optional[str]]] = None

# Injected by gui.py - called whenever a comms entry is appended.
# Use get_comms_log_callback/set_comms_log_callback for thread-safe access.
comms_log_callback: Optional[Callable[[dict[str, Any]], None]] = None

# Injected by gui.py - called whenever a tool call completes.
tool_log_callback: Optional[Callable[[str, str], None]] = None

_local_storage = threading.local()
_tool_approval_modes: dict[str, str] = {}


def get_current_tier() -> Optional[str]:
    """
    Returns the current tier from thread-local storage.
    [C: src/app_controller.py:AppController._on_tool_log, tests/test_ai_client_concurrency.py:intercepted_append]
    """
    return getattr(_local_storage, "current_tier", None)


def set_current_tier(tier: Optional[str]) -> None:
    """
    Sets the current tier in thread-local storage.
    [C: src/app_controller.py:AppController._handle_request_event, src/conductor_tech_lead.py:generate_tickets, src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ai_client_concurrency.py:run_t1, tests/test_ai_client_concurrency.py:run_t2, tests/test_mma_agent_focus_phase1.py:reset_tier, tests/test_mma_agent_focus_phase1.py:test_append_comms_source_tier_none_when_unset, tests/test_mma_agent_focus_phase1.py:test_append_comms_source_tier_set_when_current_tier_set, tests/test_mma_agent_focus_phase1.py:test_append_comms_source_tier_tier2]
    """
    _local_storage.current_tier = tier


def get_comms_log_callback() -> Optional[Callable[[dict[str, Any]], None]]:
    """
    Returns the comms log callback (thread-local, falling back to the global).
    [C: src/multi_agent_conductor.py:run_worker_lifecycle]
    """
    tl_cb = getattr(_local_storage, "comms_log_callback", None)
    if tl_cb:
        return tl_cb
    return comms_log_callback


def set_comms_log_callback(cb: Optional[Callable[[dict[str, Any]], None]]) -> None:
    """
    Sets the comms log callback (both global and thread-local).
    [C: src/app_controller.py:AppController._init_ai_and_hooks, src/multi_agent_conductor.py:run_worker_lifecycle]
    """
    global comms_log_callback
    comms_log_callback = cb
    _local_storage.comms_log_callback = cb


# Increased to allow thorough code exploration before forcing a summary.
MAX_TOOL_ROUNDS: int = 10

# Maximum cumulative bytes of tool output allowed per send() call.
_MAX_TOOL_OUTPUT_BYTES: int = 500_000

# Maximum characters per text chunk sent to Anthropic.
_ANTHROPIC_CHUNK_SIZE: int = 120_000
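
# Illustrative sketch (not used by this module): why get_comms_log_callback()
# checks thread-local storage before the global. A worker thread can capture
# its own comms entries without touching the GUI's global callback, while
# threads that never set one fall back to it. The tier value tagged here is
# an assumed stand-in for what a per-worker conductor would set.
def _example_worker_comms_routing() -> list[dict[str, Any]]:
    captured: list[dict[str, Any]] = []

    def worker() -> None:
        set_current_tier("tier2")
        # Assign the thread-local slot directly so only this thread is
        # affected (set_comms_log_callback would also replace the global).
        _local_storage.comms_log_callback = captured.append
        cb = get_comms_log_callback()
        if cb is not None:
            cb({"source_tier": get_current_tier(), "text": "worker started"})

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return captured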
" "When calling file/directory tools, always use the 'path' parameter for the target path. " "When asked to create or edit files, prefer targeted edits over full rewrites. " "Always explain what you are doing before invoking the tool.\n\n" "When writing or rewriting large files (especially those containing quotes, backticks, or special characters), " "avoid python -c with inline strings. Instead: (1) write a .py helper script to disk using a PS here-string " "(@'...'@ for literal content), (2) run it with `python