6 Commits

Author SHA1 Message Date
ed a52f3a2ef8 conductor(plan): Mark phase 'Phase 1: Metric Extraction and Logic Review' as complete 2026-02-23 13:35:15 -05:00
ed 2668f88e8a conductor(checkpoint): Checkpoint end of Phase 1 2026-02-23 13:34:18 -05:00
ed ac51ded52b docs(conductor): Update plan for api_metrics_20260223 2026-02-23 13:29:22 -05:00
ed f10a2f2ffa feat(conductor): Expose history bleed flags
This change introduces a new function, get_history_bleed_stats, to calculate and expose how close the current conversation history is to the provider's token limit. The initial implementation supports Anthropic, with a placeholder for Gemini.
2026-02-23 13:29:06 -05:00
ed c61fcc6333 docs(conductor): Update plan for api_metrics_20260223 2026-02-23 13:28:20 -05:00
ed 8aa70e287f fix(conductor): Implement Gemini cache metrics
This change corrects the implementation of get_gemini_cache_stats to use the Gemini client instance and updates the corresponding test to use proper mocking.
2026-02-23 13:27:49 -05:00
6 changed files with 184 additions and 32 deletions
BIN
View File
Binary file not shown.
+73 -22
View File
@@ -18,6 +18,7 @@ import datetime
from pathlib import Path
import file_cache
import mcp_client
import google.genai
_provider: str = "gemini"
_model: str = "gemini-2.5-flash"
@@ -241,6 +242,22 @@ def reset_session():
_CACHED_ANTHROPIC_TOOLS = None
file_cache.reset_client()
def get_gemini_cache_stats() -> dict:
"""
Retrieves statistics about the Gemini caches, such as count and total size.
"""
_ensure_gemini_client()
caches_iterator = _gemini_client.caches.list()
caches = list(caches_iterator)
total_size_bytes = sum(c.size_bytes for c in caches)
return {
"cache_count": len(list(caches)),
"total_size_bytes": total_size_bytes,
}
# ------------------------------------------------------------------ model listing
@@ -254,9 +271,9 @@ def list_models(provider: str) -> list[str]:
def _list_gemini_models(api_key: str) -> list[str]:
from google import genai
# from google import genai # Removed
try:
client = genai.Client(api_key=api_key)
client = google.genai.Client(api_key=api_key)
models = []
for m in client.models.list():
name = m.name
@@ -348,7 +365,7 @@ def _get_anthropic_tools() -> list[dict]:
def _gemini_tool_declaration():
from google.genai import types
# from google.genai import types # Removed
declarations = []
@@ -358,15 +375,15 @@ def _gemini_tool_declaration():
continue
props = {}
for pname, pdef in spec["parameters"].get("properties", {}).items():
props[pname] = types.Schema(
type=types.Type.STRING,
props[pname] = google.genai.types.Schema(
type=google.genai.types.Type.STRING,
description=pdef.get("description", ""),
)
declarations.append(types.FunctionDeclaration(
declarations.append(google.genai.types.FunctionDeclaration(
name=spec["name"],
description=spec["description"],
parameters=types.Schema(
type=types.Type.OBJECT,
parameters=google.genai.types.Schema(
type=google.genai.types.Type.OBJECT,
properties=props,
required=spec["parameters"].get("required", []),
),
@@ -374,7 +391,7 @@ def _gemini_tool_declaration():
# PowerShell tool
if _agent_tools.get(TOOL_NAME, True):
declarations.append(types.FunctionDeclaration(
declarations.append(google.genai.types.FunctionDeclaration(
name=TOOL_NAME,
description=(
"Run a PowerShell script within the project base_dir. "
@@ -382,11 +399,11 @@ def _gemini_tool_declaration():
"The working directory is set to base_dir automatically. "
"stdout and stderr are returned to you as the result."
),
parameters=types.Schema(
type=types.Type.OBJECT,
parameters=google.genai.types.Schema(
type=google.genai.types.Type.OBJECT,
properties={
"script": types.Schema(
type=types.Type.STRING,
"script": google.genai.types.Schema(
type=google.genai.types.Type.STRING,
description="The PowerShell script to execute."
)
},
@@ -394,7 +411,7 @@ def _gemini_tool_declaration():
),
))
return types.Tool(function_declarations=declarations) if declarations else None
return google.genai.types.Tool(function_declarations=declarations) if declarations else None
def _run_script(script: str, base_dir: str) -> str:
@@ -489,9 +506,9 @@ def _content_block_to_dict(block) -> dict:
def _ensure_gemini_client():
global _gemini_client
if _gemini_client is None:
from google import genai
# from google import genai # Removed
creds = _load_credentials()
_gemini_client = genai.Client(api_key=creds["gemini"]["api_key"])
_gemini_client = google.genai.Client(api_key=creds["gemini"]["api_key"])
@@ -508,7 +525,7 @@ def _get_gemini_history_list(chat):
def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at
from google.genai import types
# from google.genai import types # Removed
try:
_ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir])
sys_instr = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>"
@@ -541,29 +558,29 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items:
_append_comms("OUT", "request", {"message": f"[CACHE TTL] Rebuilding cache (expired after {int(elapsed)}s)..."})
if not _gemini_chat:
chat_config = types.GenerateContentConfig(
chat_config = google.genai.types.GenerateContentConfig(
system_instruction=sys_instr,
tools=tools_decl,
temperature=_temperature,
max_output_tokens=_max_tokens,
safety_settings=[types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
safety_settings=[google.genai.types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
)
try:
# Gemini requires 1024 (Flash) or 4096 (Pro) tokens to cache.
_gemini_cache = _gemini_client.caches.create(
model=_model,
config=types.CreateCachedContentConfig(
config=google.genai.types.CreateCachedContentConfig(
system_instruction=sys_instr,
tools=tools_decl,
ttl=f"{_GEMINI_CACHE_TTL}s",
)
)
_gemini_cache_created_at = time.time()
chat_config = types.GenerateContentConfig(
chat_config = google.genai.types.GenerateContentConfig(
cached_content=_gemini_cache.name,
temperature=_temperature,
max_output_tokens=_max_tokens,
safety_settings=[types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
safety_settings=[google.genai.types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
)
_append_comms("OUT", "request", {"message": f"[CACHE CREATED] {_gemini_cache.name}"})
except Exception as e:
@@ -1129,3 +1146,37 @@ def send(
elif _provider == "anthropic":
return _send_anthropic(md_content, user_message, base_dir, file_items)
raise ValueError(f"unknown provider: {_provider}")
def get_history_bleed_stats() -> dict:
"""
Calculates how close the current conversation history is to the token limit.
"""
if _provider == "anthropic":
# For Anthropic, we have a robust estimator
current_tokens = _estimate_prompt_tokens([], _anthropic_history)
limit_tokens = _ANTHROPIC_MAX_PROMPT_TOKENS
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
return {
"provider": "anthropic",
"limit": limit_tokens,
"current": current_tokens,
"percentage": percentage,
}
elif _provider == "gemini":
# For Gemini, token estimation is complex and handled by the server.
# We don't have a reliable client-side estimate, so we return a
# "not implemented" state for now.
return {
"provider": "gemini",
"limit": _GEMINI_MAX_INPUT_TOKENS,
"current": 0,
"percentage": 0,
}
# Default empty state
return {
"provider": _provider,
"limit": 0,
"current": 0,
"percentage": 0,
}
+1 -1
View File
@@ -9,6 +9,6 @@ This file tracks all major tracks for the project. Each track has its own detail
---
- [ ] **Track: Review vendor api usage in regards to conservative context handling**
- [~] **Track: Review vendor api usage in regards to conservative context handling**
*Link: [./tracks/api_metrics_20260223/](./tracks/api_metrics_20260223/)*
@@ -1,13 +1,13 @@
# Implementation Plan
## Phase 1: Metric Extraction and Logic Review
- [ ] Task: Extract explicit cache counts and lifecycle states from Gemini SDK
- [ ] Sub-task: Write Tests
- [ ] Sub-task: Implement Feature
- [ ] Task: Review and expose 'history bleed' (token limit proximity) flags
- [ ] Sub-task: Write Tests
- [ ] Sub-task: Implement Feature
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Metric Extraction and Logic Review' (Protocol in workflow.md)
## Phase 1: Metric Extraction and Logic Review [checkpoint: 2668f88]
- [x] Task: Extract explicit cache counts and lifecycle states from Gemini SDK
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Review and expose 'history bleed' (token limit proximity) flags
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 1: Metric Extraction and Logic Review' (Protocol in workflow.md)
## Phase 2: GUI Telemetry and Plotting
- [ ] Task: Implement token budget visualizer (e.g., Progress bars for limits) in Dear PyGui
+45
View File
@@ -0,0 +1,45 @@
import pytest
from unittest.mock import MagicMock, patch
# Import the necessary functions from ai_client, including the reset helper
from ai_client import get_gemini_cache_stats, reset_session
def test_get_gemini_cache_stats_with_mock_client():
"""
Test that get_gemini_cache_stats correctly processes cache lists
from a mocked client instance.
"""
# Ensure a clean state before the test by resetting the session
reset_session()
# 1. Create a mock for the cache object that the client will return
mock_cache = MagicMock()
mock_cache.name = "cachedContents/test-cache"
mock_cache.display_name = "Test Cache"
mock_cache.model = "models/gemini-1.5-pro-001"
mock_cache.size_bytes = 1024
# 2. Create a mock for the client instance
mock_client_instance = MagicMock()
# Configure its `caches.list` method to return our mock cache
mock_client_instance.caches.list.return_value = [mock_cache]
# 3. Patch the Client constructor to return our mock instance
# This intercepts the `_ensure_gemini_client` call inside the function
with patch('google.genai.Client', return_value=mock_client_instance) as mock_client_constructor:
# 4. Call the function under test
stats = get_gemini_cache_stats()
# 5. Assert that the function behaved as expected
# It should have constructed the client
mock_client_constructor.assert_called_once()
# It should have called the `list` method on the `caches` attribute
mock_client_instance.caches.list.assert_called_once()
# The returned stats dictionary should be correct
assert "cache_count" in stats
assert "total_size_bytes" in stats
assert stats["cache_count"] == 1
assert stats["total_size_bytes"] == 1024
+56
View File
@@ -0,0 +1,56 @@
import pytest
from unittest.mock import patch, MagicMock
# Import the module to be tested
import ai_client
@pytest.fixture(autouse=True)
def reset_ai_client_session():
"""Fixture to automatically reset the ai_client session before each test."""
ai_client.reset_session()
def test_anthropic_history_bleed_calculation():
"""
Tests that get_history_bleed_stats calculates the token usage
percentage correctly for the Anthropic provider.
"""
# 1. Set up the test environment
ai_client.set_provider("anthropic", "claude-3-opus-20240229")
# Define the mock return value for the token estimator
mock_token_count = 150_000
# The hardcoded limit in the module is 180_000
expected_percentage = (mock_token_count / 180_000) * 100
# 2. Mock the internal dependencies
# We patch _estimate_prompt_tokens as it's the core of the calculation for anthropic
with patch('ai_client._estimate_prompt_tokens', return_value=mock_token_count) as mock_estimator:
# 3. Call the function under test (which doesn't exist yet)
stats = ai_client.get_history_bleed_stats()
# 4. Assert the results
assert stats["provider"] == "anthropic"
assert stats["limit"] == 180_000
assert stats["current"] == mock_token_count
assert stats["percentage"] == pytest.approx(expected_percentage)
# Ensure the mock was called
mock_estimator.assert_called_once()
def test_gemini_history_bleed_not_implemented():
"""
Tests that get_history_bleed_stats returns a 'not implemented' state
for Gemini, as its token calculation is different.
"""
# 1. Set up the test environment
ai_client.set_provider("gemini", "gemini-1.5-pro-latest")
# 2. Call the function
stats = ai_client.get_history_bleed_stats()
# 3. Assert the 'not implemented' state
assert stats["provider"] == "gemini"
assert stats["limit"] == 900_000 # The constant _GEMINI_MAX_INPUT_TOKENS
assert stats["current"] == 0
assert stats["percentage"] == 0