2026-02-21 16:51:00 -05:00
parent ee1ee1c77e
commit 330c8604c9
5 changed files with 438 additions and 141 deletions


@@ -1,4 +1,4 @@
# ai_client.py
import tomllib
import json
import datetime
@@ -29,6 +29,12 @@ tool_log_callback = None
MAX_TOOL_ROUNDS = 5
# Maximum characters per text chunk sent to Anthropic.
# Anthropic's limit is ~200k tokens; we use 180k chars as a safe ceiling
# (1 token ~ 3-4 chars, so 180k chars ~ 45-60k tokens, well within limits
# even for very large aggregated markdown files).
_ANTHROPIC_CHUNK_SIZE = 180_000
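# Quick sanity check of that arithmetic (input size here is hypothetical):
#   >>> len(_chunk_text("x" * 450_000, _ANTHROPIC_CHUNK_SIZE))
#   3        # ceil(450_000 / 180_000); each chunk <= 180_000 chars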
# Anthropic system prompt - sent with cache_control so it is cached after the
# first request and reused on every subsequent call within the TTL window.
_ANTHROPIC_SYSTEM = (
@@ -170,6 +176,7 @@ def reset_session():
_gemini_chat = None
_anthropic_client = None
_anthropic_history = []
file_cache.reset_client()
# ------------------------------------------------------------------ model listing
@@ -379,31 +386,32 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str) -> str:
# ------------------------------------------------------------------ anthropic
#
# Sending strategy for Anthropic:
#
# PRIMARY PATH (_send_anthropic_files) - used when file_items are provided
# ============
#   Each file from config is uploaded via the Anthropic Files API
#   (file_cache.get_file_id handles upload + caching by mtime/size).
#   Files are sent as individual document/image content blocks in the first
#   user message. The discussion history section of the markdown (which is
#   small and changes each session) is still sent as a text block.
#   This keeps the per-message payload lean and lets the Files API handle
#   the heavy lifting of large source files.
#
# FALLBACK PATH (_send_anthropic_chunked) - used when no file_items, or if
# =============                             the Files API path fails
#   The full aggregated markdown is split into <=_ANTHROPIC_CHUNK_SIZE char
#   chunks and sent as separate text content blocks. cache_control:ephemeral
#   is placed on the LAST chunk so the whole context prefix is cached together.
#
# Caching strategy (Anthropic prompt caching):
#   - System prompt: cache_control:ephemeral on the text block
#   - Last tool in _ANTHROPIC_TOOLS: cache_control:ephemeral
#   - Context content blocks: cache_control:ephemeral on the last block
#   These three form a stable cached prefix that survives across turns.
#
# Subsequent turns (tool results, follow-up questions) are appended to
# _anthropic_history normally, without extra cache markers.
#
# Token cost: cache creation ~25% more than normal input; cache reads ~10%
# of normal input. Steady-state use is much cheaper after the first request.
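#
# Schematically, the cached prefix of every request looks like this
# (a sketch; block contents abbreviated, not sent verbatim):
#
#   system   = [{"type": "text", "text": _ANTHROPIC_SYSTEM,
#                "cache_control": {"type": "ephemeral"}}]
#   tools    = [..., {<last tool>, "cache_control": {"type": "ephemeral"}}]
#   messages = [{"role": "user",
#                "content": [<ctx block 1>, ..., <ctx block N, cached>,
#                            <user question, uncached>]},
#               ...]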
def _ensure_anthropic_client():
global _anthropic_client
@@ -413,127 +421,337 @@ def _ensure_anthropic_client():
_anthropic_client = anthropic.Anthropic(api_key=creds["anthropic"]["api_key"])
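# For reference: creds["anthropic"]["api_key"] implies a credentials file of
# roughly this TOML shape (loaded via tomllib; the key value is a placeholder):
#
#   [anthropic]
#   api_key = "sk-ant-..."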
def _chunk_text(text: str, chunk_size: int) -> list[str]:
    """Split text into chunks of at most chunk_size characters."""
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

def _build_chunked_context_blocks(md_content: str) -> list[dict]:
    """
    Split md_content into <=_ANTHROPIC_CHUNK_SIZE char chunks and return
    a list of Anthropic text content blocks. cache_control:ephemeral is
    placed only on the LAST block so the whole prefix is cached as one unit.
    """
    chunks = _chunk_text(md_content, _ANTHROPIC_CHUNK_SIZE)
    blocks = []
    for i, chunk in enumerate(chunks):
        block: dict = {"type": "text", "text": chunk}
        if i == len(chunks) - 1:
            block["cache_control"] = {"type": "ephemeral"}
        blocks.append(block)
    return blocks

def _build_files_context_blocks(
    md_header: str,
    file_items: list[dict],
    screenshot_items: list[dict] | None = None,
) -> list[dict]:
    """
    Build content blocks for the Files API path.
    - md_header        : the Discussion History section text (small, sent as a text block)
    - file_items       : list of dicts from aggregate.build_file_items();
                         each has: path (Path|None), entry (str), content (str), error (bool)
    - screenshot_items : list of screenshot dicts whose paths are included as image blocks
    Returns a list of Anthropic content blocks; the last block gets
    cache_control:ephemeral.
    """
    blocks: list[dict] = []
    # Discussion history / header as a text block (small, always inline)
    if md_header.strip():
        blocks.append({
            "type": "text",
            "text": md_header,
        })
    # One document/image block per file
    for item in file_items:
        path: Path | None = item.get("path")
        entry: str = item.get("entry", "")
        error: bool = item.get("error", False)
        if error or path is None:
            # Fall back to inline text for error entries
            blocks.append({
                "type": "text",
                "text": f"### `{entry}`\n\nERROR: {item.get('content', 'unknown error')}",
            })
            continue
        block_type = file_cache.content_block_type(path)
        if block_type == "unsupported":
            # Inline as plain text
            blocks.append({
                "type": "text",
                "text": f"### `{entry}`\n\n```\n{item.get('content', '')}\n```",
            })
            continue
        # Try to get/upload via Files API
        file_id = file_cache.get_file_id(path)
        if file_id is None:
            # Unsupported or missing - inline fallback
            blocks.append({
                "type": "text",
                "text": f"### `{entry}`\n\n```\n{item.get('content', '')}\n```",
            })
            continue
        if block_type == "document":
            blocks.append({
                "type": "document",
                "source": {
                    "type": "file",
                    "file_id": file_id,
                },
                "title": path.name,
                "citations": {"enabled": False},
            })
        elif block_type == "image":
            blocks.append({
                "type": "image",
                "source": {
                    "type": "file",
                    "file_id": file_id,
                },
            })
    # Screenshots as image blocks
    for item in (screenshot_items or []):
        path = item.get("path")
        if path is None:
            continue
        block_type = file_cache.content_block_type(path)
        if block_type != "image":
            continue
        file_id = file_cache.get_file_id(path)
        if file_id:
            blocks.append({
                "type": "image",
                "source": {
                    "type": "file",
                    "file_id": file_id,
                },
            })
    # Put cache_control on the last block
    if blocks:
        blocks[-1]["cache_control"] = {"type": "ephemeral"}
    return blocks

def _run_anthropic_loop(
    user_content: list[dict],
    user_message: str,
    base_dir: str,
    log_summary: str,
) -> str:
    """
    Core Anthropic message loop shared by both send paths.
    Appends the user turn to _anthropic_history, runs the tool loop,
    and returns the final assistant text.
    """
    global _anthropic_history
    _anthropic_history.append({"role": "user", "content": user_content})
    _append_comms("OUT", "request", {
        "message": log_summary,
    })
    for round_idx in range(MAX_TOOL_ROUNDS):
        response = _anthropic_client.messages.create(
            model=_model,
            max_tokens=8096,
            system=[
                {
                    "type": "text",
                    "text": _ANTHROPIC_SYSTEM,
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            tools=_ANTHROPIC_TOOLS,
            messages=_anthropic_history,
            extra_headers={"anthropic-beta": "files-api-2025-04-14"},
        )
        _anthropic_history.append({
            "role": "assistant",
            "content": response.content,
        })
        text_blocks = [b.text for b in response.content if hasattr(b, "text") and b.text]
        tool_use_blocks = [
            {"id": b.id, "name": b.name, "input": b.input}
            for b in response.content
            if b.type == "tool_use"
        ]
usage_dict: dict = {}
if response.usage:
usage_dict["input_tokens"] = response.usage.input_tokens
usage_dict["output_tokens"] = response.usage.output_tokens
cache_creation = getattr(response.usage, "cache_creation_input_tokens", None)
cache_read = getattr(response.usage, "cache_read_input_tokens", None)
if cache_creation is not None:
usage_dict["cache_creation_input_tokens"] = cache_creation
if cache_read is not None:
usage_dict["cache_read_input_tokens"] = cache_read
_append_comms("IN", "response", {
"round": round_idx,
"stop_reason": response.stop_reason,
"text": "\n".join(text_blocks),
"tool_calls": tool_use_blocks,
"usage": usage_dict,
})
if response.stop_reason != "tool_use":
break
tool_results = []
for block in response.content:
if block.type == "tool_use" and block.name == TOOL_NAME:
script = block.input.get("script", "")
_append_comms("OUT", "tool_call", {
"name": TOOL_NAME,
"id": block.id,
"script": script,
})
output = _run_script(script, base_dir)
_append_comms("IN", "tool_result", {
"name": TOOL_NAME,
"id": block.id,
"output": output,
})
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
if not tool_results:
break
_anthropic_history.append({
"role": "user",
"content": tool_results,
})
_append_comms("OUT", "tool_result_send", {
"results": [
{"tool_use_id": r["tool_use_id"], "content": r["content"]}
for r in tool_results
],
})
text_parts = [
block.text
for block in response.content
if hasattr(block, "text") and block.text
]
return "\n".join(text_parts)
def _send_anthropic_files(
md_content: str,
user_message: str,
base_dir: str,
file_items: list[dict],
) -> str:
"""
Files API send path. Uploads each file individually and sends document/image
blocks instead of inlining everything as text. Falls back to chunked text
on any upload error.
The discussion history section of md_content is extracted and sent inline
as a text block (it's small and changes each session so not worth uploading).
"""
import anthropic
_ensure_anthropic_client()
# Extract just the Discussion History section to send inline.
# Everything else comes via file blocks.
discussion_section = ""
files_marker = "\n\n---\n\n## Files\n\n"
split_idx = md_content.find(files_marker)
if split_idx != -1:
discussion_section = md_content[:split_idx]
else:
# No files section - the whole thing is discussion/screenshots
discussion_section = md_content
try:
context_blocks = _build_files_context_blocks(discussion_section, file_items)
except Exception as upload_err:
_append_comms("OUT", "request", {
"message": f"[Files API upload failed: {upload_err}] falling back to chunked text",
})
return _send_anthropic_chunked(md_content, user_message, base_dir)
user_content = context_blocks + [
{
"type": "text",
"text": user_message,
}
]
log_summary = (
f"[Files API: {len(file_items)} file(s) as document/image blocks, "
f"discussion section {len(discussion_section)} chars inline]\n\n{user_message}"
)
return _run_anthropic_loop(user_content, user_message, base_dir, log_summary)
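# Example of the split above (hypothetical aggregate): given
#   "## Discussion History\n...\n\n---\n\n## Files\n\n### `app.py`\n..."
# everything before the files marker is sent inline as a text block, while
# app.py itself arrives as a document block referencing its uploaded file_id.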
def _send_anthropic_chunked(md_content: str, user_message: str, base_dir: str) -> str:
"""
Chunked text fallback path. Splits md_content into <=_ANTHROPIC_CHUNK_SIZE
char blocks, sends them all as text content blocks with cache_control on
the last one, then appends the user question.
"""
_ensure_anthropic_client()
context_blocks = _build_chunked_context_blocks(md_content)
user_content = context_blocks + [
{
"type": "text",
"text": user_message,
}
]
n_chunks = len(context_blocks)
log_summary = (
f"[Chunked text: {n_chunks} chunk(s), "
f"{len(md_content)} chars total]\n\n{user_message}"
)
return _run_anthropic_loop(user_content, user_message, base_dir, log_summary)
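# e.g. a 400_000-char md_content yields three text blocks of 180k/180k/40k
# chars; only the final 40k block carries cache_control, so all three are
# cached together as one prefix.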
def _send_anthropic(
md_content: str,
user_message: str,
base_dir: str,
file_items: list[dict] | None = None,
) -> str:
"""
Entry point for Anthropic sends. Routes to the Files API path when
file_items are provided, otherwise falls back to chunked text.
"""
try:
if file_items:
return _send_anthropic_files(md_content, user_message, base_dir, file_items)
else:
return _send_anthropic_chunked(md_content, user_message, base_dir)
except ProviderError:
raise
except Exception as exc:
@@ -541,10 +759,24 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str) -> str:
# ------------------------------------------------------------------ unified send
def send(
md_content: str,
user_message: str,
base_dir: str = ".",
file_items: list[dict] | None = None,
) -> str:
"""
Send a message to the active provider.
md_content : aggregated markdown string from aggregate.run()
user_message: the user's question / instruction
base_dir : project base directory (for PowerShell tool calls)
file_items : optional list of file dicts from aggregate.build_file_items();
when provided and provider is anthropic, files are uploaded
via the Files API rather than inlined as text
"""
if _provider == "gemini":
return _send_gemini(md_content, user_message, base_dir)
elif _provider == "anthropic":
        return _send_anthropic(md_content, user_message, base_dir, file_items)
    raise ValueError(f"unknown provider: {_provider}")
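# Typical call site, sketched (arguments are illustrative; see
# aggregate.build_file_items() for the real file_items shape):
#
#   items = aggregate.build_file_items(...)
#   reply = send(md_content, "What changed?", base_dir=".", file_items=items)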