checkpoint: Working on getting gemini cli to actually have parity with gemini api.

This commit is contained in:
2026-02-26 00:31:33 -05:00
parent cbe359b1a5
commit a70680b2a2
14 changed files with 710 additions and 243 deletions

View File

@@ -13,6 +13,7 @@ during chat creation to avoid massive history bloat.
# ai_client.py # ai_client.py
import tomllib import tomllib
import json import json
import sys
import time import time
import datetime import datetime
import hashlib import hashlib
@@ -267,7 +268,16 @@ def _classify_deepseek_error(exc: Exception) -> ProviderError:
def set_provider(provider: str, model: str): def set_provider(provider: str, model: str):
global _provider, _model global _provider, _model
_provider = provider _provider = provider
_model = model
if provider == "gemini_cli":
valid_models = _list_gemini_cli_models()
# If model is invalid or belongs to another provider (like deepseek), force default
if model not in valid_models or model.startswith("deepseek"):
_model = "gemini-3-flash-preview"
else:
_model = model
else:
_model = model
@@ -298,6 +308,7 @@ def reset_session():
_gemini_cache_created_at = None _gemini_cache_created_at = None
if _gemini_cli_adapter: if _gemini_cli_adapter:
_gemini_cli_adapter.session_id = None _gemini_cli_adapter.session_id = None
_gemini_cli_adapter = None
_anthropic_client = None _anthropic_client = None
with _anthropic_history_lock: with _anthropic_history_lock:
_anthropic_history = [] _anthropic_history = []
@@ -336,9 +347,26 @@ def list_models(provider: str) -> list[str]:
return _list_anthropic_models() return _list_anthropic_models()
elif provider == "deepseek": elif provider == "deepseek":
return _list_deepseek_models(creds["deepseek"]["api_key"]) return _list_deepseek_models(creds["deepseek"]["api_key"])
elif provider == "gemini_cli":
return _list_gemini_cli_models()
return [] return []
def _list_gemini_cli_models() -> list[str]:
"""
List available Gemini models for the CLI.
Since the CLI doesn't have a direct 'list models' command yet,
we return a curated list of supported models based on CLI metadata.
"""
return [
"gemini-3-flash-preview",
"gemini-3.1-pro-preview",
"gemini-2.5-pro",
"gemini-2.5-flash",
"gemini-2.5-flash-lite",
]
def _list_gemini_models(api_key: str) -> list[str]: def _list_gemini_models(api_key: str) -> list[str]:
try: try:
@@ -845,32 +873,44 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
if _gemini_cli_adapter is None: if _gemini_cli_adapter is None:
_gemini_cli_adapter = GeminiCliAdapter(binary_path="gemini") _gemini_cli_adapter = GeminiCliAdapter(binary_path="gemini")
adapter = _gemini_cli_adapter
mcp_client.configure(file_items or [], [base_dir]) mcp_client.configure(file_items or [], [base_dir])
# If it's a new session (session_id is None), we should ideally send the context. # Construct the system instruction, combining the base system prompt and the current context.
sys_instr = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>"
safety_settings = [{'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'threshold': 'BLOCK_ONLY_HIGH'}]
# Initial payload for the first message
payload = user_message payload = user_message
if _gemini_cli_adapter.session_id is None: if adapter.session_id is None:
# Prepend context and discussion history to the first message
full_prompt = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>\n\n"
if discussion_history: if discussion_history:
full_prompt += f"[DISCUSSION HISTORY]\n\n{discussion_history}\n\n---\n\n" payload = f"[DISCUSSION HISTORY]\n\n{discussion_history}\n\n---\n\n{user_message}"
full_prompt += user_message
payload = full_prompt
all_text = [] all_text = []
_cumulative_tool_bytes = 0 _cumulative_tool_bytes = 0
for r_idx in range(MAX_TOOL_ROUNDS + 2): for r_idx in range(MAX_TOOL_ROUNDS + 2):
if adapter is None:
break
events.emit("request_start", payload={"provider": "gemini_cli", "model": _model, "round": r_idx}) events.emit("request_start", payload={"provider": "gemini_cli", "model": _model, "round": r_idx})
_append_comms("OUT", "request", {"message": f"[CLI] [round {r_idx}] [msg {len(payload)}]"}) _append_comms("OUT", "request", {"message": f"[CLI] [round {r_idx}] [msg {len(payload)}]"})
resp_data = _gemini_cli_adapter.send(payload) resp_data = adapter.send(payload, safety_settings=safety_settings, system_instruction=sys_instr, model=_model)
# Log any stderr from the CLI for transparency
cli_stderr = resp_data.get("stderr", "")
if cli_stderr:
sys.stderr.write(f"\n--- Gemini CLI stderr ---\n{cli_stderr}\n-------------------------\n")
sys.stderr.flush()
txt = resp_data.get("text", "") txt = resp_data.get("text", "")
if txt: all_text.append(txt) if txt: all_text.append(txt)
calls = resp_data.get("tool_calls", []) calls = resp_data.get("tool_calls", [])
usage = _gemini_cli_adapter.last_usage or {} usage = adapter.last_usage or {}
latency = _gemini_cli_adapter.last_latency latency = adapter.last_latency
events.emit("response_received", payload={"provider": "gemini_cli", "model": _model, "usage": usage, "latency": latency, "round": r_idx}) events.emit("response_received", payload={"provider": "gemini_cli", "model": _model, "usage": usage, "latency": latency, "round": r_idx})

View File

@@ -1,6 +1,6 @@
[ai] [ai]
provider = "deepseek" provider = "gemini_cli"
model = "deepseek-chat" model = "gemini-3-flash-preview"
temperature = 0.0 temperature = 0.0
max_tokens = 8192 max_tokens = 8192
history_trunc_limit = 8000 history_trunc_limit = 8000

View File

@@ -3,6 +3,7 @@ import json
import sys import sys
import time import time
import os import os
import session_logger # Import session_logger
class GeminiCliAdapter: class GeminiCliAdapter:
def __init__(self, binary_path="gemini"): def __init__(self, binary_path="gemini"):
@@ -11,17 +12,45 @@ class GeminiCliAdapter:
self.session_id = None self.session_id = None
self.last_latency = 0.0 self.last_latency = 0.0
def send(self, message): def count_tokens(self, contents: list[str]) -> int:
"""
Counts the tokens for a list of string contents using a character-based estimation.
Approximates tokens by assuming 4 characters per token.
This replaces the broken 'gemini count' CLI call.
"""
input_text = "\n".join(contents)
total_chars = len(input_text)
estimated_tokens = total_chars // 4
return estimated_tokens
def send(self, message, safety_settings=None, system_instruction=None, model: str = None):
""" """
Sends a message to the Gemini CLI and processes the streaming JSON output. Sends a message to the Gemini CLI and processes the streaming JSON output.
Logs the CLI call details using session_logger.log_cli_call.
System instruction is prepended to the message.
Uses --prompt flag with a placeholder and sends the content via stdin.
""" """
start_time = time.time() start_time = time.time()
# On Windows, using shell=True allows executing .cmd/.bat files and
# handles command strings with arguments more gracefully. command_parts = [self.binary_path]
# We pass the message via stdin to avoid command-line length limits.
command = f'{self.binary_path} run --output-format stream-json' if model:
command_parts.extend(['-m', f'"{model}"'])
# Use an empty string placeholder.
command_parts.extend(['--prompt', '""'])
if self.session_id: if self.session_id:
command += f' --resume {self.session_id}' command_parts.extend(['--resume', self.session_id])
command_parts.extend(['--output-format', 'stream-json'])
command = " ".join(command_parts)
# Construct the prompt text by prepending system_instruction if available
prompt_text = message
if system_instruction:
prompt_text = f"{system_instruction}\n\n{message}"
accumulated_text = "" accumulated_text = ""
tool_calls = [] tool_calls = []
@@ -29,65 +58,79 @@ class GeminiCliAdapter:
env = os.environ.copy() env = os.environ.copy()
env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop" env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"
process = subprocess.Popen( process = None
command, stdout_content = ""
stdin=subprocess.PIPE, stderr_content = ""
stdout=subprocess.PIPE, stdin_content = prompt_text
stderr=subprocess.PIPE,
text=True,
shell=True,
env=env
)
try: try:
# Send message to stdin and close it process = subprocess.Popen(
process.stdin.write(message) command,
process.stdin.close() stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True,
env=env
)
# Read stdout line by line stdout_output, stderr_output = process.communicate(input=prompt_text)
for line in process.stdout:
stdout_content = stdout_output
stderr_content = stderr_output
for line in stdout_content.splitlines():
line = line.strip() line = line.strip()
if not line: if not line:
continue continue
try: try:
data = json.loads(line) data = json.loads(line)
msg_type = data.get("type") msg_type = data.get("type")
if msg_type == "message": if msg_type == "init":
# Append message text to results if "session_id" in data:
accumulated_text += data.get("text", "") self.session_id = data.get("session_id")
elif msg_type == "message":
content = data.get("content", data.get("text"))
if content:
accumulated_text += content
elif msg_type == "result": elif msg_type == "result":
# Capture final usage and session persistence self.last_usage = data.get("stats") or data.get("usage")
# Support both mock ('usage') and real ('stats') keys if "session_id" in data:
self.last_usage = data.get("usage") or data.get("stats") self.session_id = data.get("session_id")
self.session_id = data.get("session_id")
elif msg_type == "tool_use": elif msg_type == "tool_use":
# Collect tool_use messages # Standardize format for ai_client.py
tool_calls.append(data) # Real CLI might use 'tool_name'/'tool_id'/'parameters'
# Log status/tool_use to stderr for debugging # or 'name'/'id'/'args'. We'll map to 'name'/'id'/'args'.
sys.stderr.write(f"GeminiCliAdapter [{msg_type}]: {line}\n") tc = {
sys.stderr.flush() "name": data.get("tool_name", data.get("name")),
"args": data.get("parameters", data.get("args", {})),
elif msg_type == "status": "id": data.get("tool_id", data.get("id"))
# Log status to stderr for debugging }
sys.stderr.write(f"GeminiCliAdapter [{msg_type}]: {line}\n") if tc["name"]:
sys.stderr.flush() tool_calls.append(tc)
except json.JSONDecodeError: except json.JSONDecodeError:
# Skip lines that are not valid JSON
continue continue
process.wait()
except Exception as e: except Exception as e:
process.kill() if process:
process.kill()
raise e raise e
finally: finally:
self.last_latency = time.time() - start_time current_latency = time.time() - start_time
if process:
session_logger.open_session()
session_logger.log_cli_call(
command=command,
stdin_content=stdin_content,
stdout_content=stdout_content,
stderr_content=stderr_content,
latency=current_latency
)
self.last_latency = current_latency
return { return {
"text": accumulated_text, "text": accumulated_text,
"tool_calls": tool_calls "tool_calls": tool_calls,
"stderr": stderr_content
} }

28
reproduce_no_text.py Normal file
View File

@@ -0,0 +1,28 @@
import json
import subprocess
import os
import time
import sys
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
from gemini_cli_adapter import GeminiCliAdapter
def test_repro():
adapter = GeminiCliAdapter(binary_path="gemini")
# Using a simple message
message = "say hello"
print(f"Sending message: '{message}'")
result = adapter.send(message, model="gemini-3-flash-preview")
print("\n--- Result ---")
print(f"Text: '{result.get('text')}'")
print(f"Tool Calls: {result.get('tool_calls')}")
print(f"Usage: {adapter.last_usage}")
print(f"Session ID: {adapter.session_id}")
print(f"Stderr: {result.get('stderr')}")
if __name__ == "__main__":
test_repro()

View File

@@ -4,70 +4,149 @@ import logging
import os import os
# Add project root to sys.path so we can import api_hook_client # Add project root to sys.path so we can import api_hook_client
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) # This helps in cases where the script is run from different directories
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
sys.path.append(project_root)
try: try:
from api_hook_client import ApiHookClient from api_hook_client import ApiHookClient
except ImportError: except ImportError:
# Fallback for if we are running from root or other locations # Fallback if the script is run from the project root directly,
sys.path.append(os.path.abspath(os.path.dirname(__file__))) # or if the above path append didn't work for some reason.
from api_hook_client import ApiHookClient try:
from api_hook_client import ApiHookClient
except ImportError:
# Use basic print for fatal errors if logging isn't set up yet
print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr)
sys.exit(1) # Exit if the core dependency cannot be imported
def main(): def main():
# Setup basic logging to stderr so it doesn't interfere with stdout JSON # Setup basic logging to stderr.
logging.basicConfig(level=logging.ERROR, stream=sys.stderr) # Set level to DEBUG to capture all messages, including debug info.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr)
logging.debug("CLI Tool Bridge script started.")
try: try:
# 1. Read JSON from sys.stdin # 1. Read JSON from sys.stdin
input_data = sys.stdin.read() input_data = sys.stdin.read()
if not input_data: if not input_data:
logging.debug("No input received from stdin. Exiting gracefully.")
return return
hook_input = json.loads(input_data) logging.debug(f"Received raw input data: {input_data}")
# 2. Extract 'tool_name' and 'tool_input' try:
tool_name = hook_input.get('tool_name') hook_input = json.loads(input_data)
tool_args = hook_input.get('tool_input', {}) except json.JSONDecodeError:
logging.error("Failed to decode JSON from stdin.")
print(json.dumps({
"decision": "deny",
"reason": "Invalid JSON received from stdin."
}))
return
# 3. Check context — if not running via Manual Slop, we pass through (allow) # Initialize variables for tool name and arguments
tool_name = None
tool_args = {}
# 2. Try to parse input in Gemini API format ('name', 'input')
logging.debug("Attempting to parse input in Gemini API format ('name', 'input').")
if 'name' in hook_input and hook_input['name'] is not None:
tool_name = hook_input['name']
logging.debug(f"Found Gemini API format tool name: {tool_name}")
if 'input' in hook_input and hook_input['input'] is not None:
if isinstance(hook_input['input'], dict):
tool_args = hook_input['input']
logging.debug(f"Found Gemini API format tool input: {tool_args}")
else:
logging.warning("Gemini API format 'input' is not a dictionary. Ignoring.")
# 3. If Gemini format wasn't fully present, try the legacy format ('tool_name', 'tool_input')
if tool_name is None:
logging.debug("Gemini API format not fully detected. Falling back to legacy format ('tool_name', 'tool_input').")
tool_name = hook_input.get('tool_name')
if tool_name:
logging.debug(f"Found legacy format tool name: {tool_name}")
tool_input_legacy = hook_input.get('tool_input')
if tool_input_legacy is not None:
if isinstance(tool_input_legacy, dict):
tool_args = tool_input_legacy
logging.debug(f"Found legacy format tool input: {tool_args}")
else:
logging.warning("Legacy format 'tool_input' is not a dictionary. Ignoring.")
# Final checks on resolved tool_name and tool_args
if tool_name is None:
logging.error("Could not determine tool name from input.")
print(json.dumps({
"decision": "deny",
"reason": "Could not determine tool name from input. Expected 'name' or 'tool_name'."
}))
return
if not isinstance(tool_args, dict):
logging.error(f"Resolved tool_args is not a dictionary: {tool_args}")
print(json.dumps({
"decision": "deny",
"reason": "Resolved tool arguments are not in a valid dictionary format."
}))
return
logging.debug(f"Resolved tool_name: '{tool_name}', tool_args: {tool_args}")
# 4. Check context — if not running via Manual Slop, we pass through (allow)
# This prevents the hook from affecting normal CLI usage. # This prevents the hook from affecting normal CLI usage.
hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT") hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT")
if hook_context != "manual_slop": if hook_context != "manual_slop":
logging.debug("GEMINI_CLI_HOOK_CONTEXT not set to 'manual_slop'. Allowing execution without confirmation.")
print(json.dumps({ print(json.dumps({
"decision": "allow", "decision": "allow",
"reason": "Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT not set)." "reason": "Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT not set)."
})) }))
return return
# 4. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999) # 5. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999)
logging.debug("GEMINI_CLI_HOOK_CONTEXT is 'manual_slop'. Proceeding with API Hook Client.")
client = ApiHookClient(base_url="http://127.0.0.1:8999") client = ApiHookClient(base_url="http://127.0.0.1:8999")
try: try:
# 5. Request confirmation # 6. Request confirmation
# This is a blocking call that waits for the user in the GUI # This is a blocking call that waits for the user in the GUI
logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_args}")
response = client.request_confirmation(tool_name, tool_args) response = client.request_confirmation(tool_name, tool_args)
if response and response.get('approved') is True: if response and response.get('approved') is True:
# 6. Print 'allow' decision # 7. Print 'allow' decision
logging.debug("User approved tool execution.")
print(json.dumps({"decision": "allow"})) print(json.dumps({"decision": "allow"}))
else: else:
# 7. Print 'deny' decision # 8. Print 'deny' decision
reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.'
logging.debug(f"User denied tool execution. Reason: {reason}")
print(json.dumps({ print(json.dumps({
"decision": "deny", "decision": "deny",
"reason": "User rejected tool execution in GUI." "reason": reason
})) }))
except Exception as e: except Exception as e:
# 8. Handle cases where hook server is not reachable # 9. Handle cases where hook server is not reachable or other API errors
# If we ARE in manual_slop context but can't reach the server, we should DENY # If we ARE in manual_slop context but can't reach the server, we should DENY
# because the user expects to be in control. # because the user expects to be in control.
logging.error(f"API Hook Client error: {str(e)}", exc_info=True)
print(json.dumps({ print(json.dumps({
"decision": "deny", "decision": "deny",
"reason": f"Manual Slop hook server unreachable: {str(e)}" "reason": f"Manual Slop hook server unreachable or API error: {str(e)}"
})) }))
except Exception as e: except Exception as e:
# Fallback for unexpected parsing errors # Fallback for unexpected errors during initial processing (e.g., stdin read)
logging.error(f"An unexpected error occurred in the main bridge logic: {str(e)}", exc_info=True)
print(json.dumps({ print(json.dumps({
"decision": "deny", "decision": "deny",
"reason": f"Internal bridge error: {str(e)}" "reason": f"Internal bridge error: {str(e)}"

View File

@@ -1,16 +1,5 @@
# session_logger.py # session_logger.py
""" """
Note(Gemini):
Opens timestamped log/script files at startup and keeps them open for the
lifetime of the process.
File layout:
logs/comms_<ts>.log - every comms entry (direction/kind/payload) as JSON-L
logs/toolcalls_<ts>.log - sequential record of every tool invocation
scripts/generated/<ts>_<seq:04d>.ps1 - each PowerShell script the AI generated
"""
# session_logger.py
"""
Opens timestamped log/script files at startup and keeps them open for the Opens timestamped log/script files at startup and keeps them open for the
lifetime of the process. The next run of the GUI creates new files; the lifetime of the process. The next run of the GUI creates new files; the
previous run's files are simply closed when the process exits. previous run's files are simply closed when the process exits.
@@ -20,6 +9,7 @@ File layout
logs/ logs/
comms_<ts>.log - every comms entry (direction/kind/payload) as JSON-L comms_<ts>.log - every comms entry (direction/kind/payload) as JSON-L
toolcalls_<ts>.log - sequential record of every tool invocation toolcalls_<ts>.log - sequential record of every tool invocation
clicalls_<ts>.log - sequential record of every CLI subprocess call
scripts/generated/ scripts/generated/
<ts>_<seq:04d>.ps1 - each PowerShell script the AI generated, in order <ts>_<seq:04d>.ps1 - each PowerShell script the AI generated, in order
@@ -42,6 +32,7 @@ _seq_lock = threading.Lock()
_comms_fh = None # file handle: logs/comms_<ts>.log _comms_fh = None # file handle: logs/comms_<ts>.log
_tool_fh = None # file handle: logs/toolcalls_<ts>.log _tool_fh = None # file handle: logs/toolcalls_<ts>.log
_api_fh = None # file handle: logs/apihooks_<ts>.log - API hook calls _api_fh = None # file handle: logs/apihooks_<ts>.log - API hook calls
_cli_fh = None # file handle: logs/clicalls_<ts>.log - CLI subprocess calls
def _now_ts() -> str: def _now_ts() -> str:
@@ -54,7 +45,7 @@ def open_session():
opens the two log files for this session. Idempotent - a second call is opens the two log files for this session. Idempotent - a second call is
ignored. ignored.
""" """
global _ts, _comms_fh, _tool_fh, _api_fh, _seq global _ts, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq
if _comms_fh is not None: if _comms_fh is not None:
return # already open return # already open
@@ -68,16 +59,19 @@ def open_session():
_comms_fh = open(_LOG_DIR / f"comms_{_ts}.log", "w", encoding="utf-8", buffering=1) _comms_fh = open(_LOG_DIR / f"comms_{_ts}.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_LOG_DIR / f"toolcalls_{_ts}.log", "w", encoding="utf-8", buffering=1) _tool_fh = open(_LOG_DIR / f"toolcalls_{_ts}.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_LOG_DIR / f"apihooks_{_ts}.log", "w", encoding="utf-8", buffering=1) _api_fh = open(_LOG_DIR / f"apihooks_{_ts}.log", "w", encoding="utf-8", buffering=1)
_cli_fh = open(_LOG_DIR / f"clicalls_{_ts}.log", "w", encoding="utf-8", buffering=1) # New log file handle
_tool_fh.write(f"# Tool-call log — session {_ts}\n\n") _tool_fh.write(f"# Tool-call log — session {_ts}\n\n")
_tool_fh.flush() _tool_fh.flush()
_cli_fh.write(f"# CLI Subprocess Call Log — session {_ts}\n\n") # Header for new log file
_cli_fh.flush()
atexit.register(close_session) atexit.register(close_session)
def close_session(): def close_session():
"""Flush and close both log files. Called on clean exit (optional).""" """Flush and close all log files. Called on clean exit (optional)."""
global _comms_fh, _tool_fh, _api_fh global _comms_fh, _tool_fh, _api_fh, _cli_fh
if _comms_fh: if _comms_fh:
_comms_fh.close() _comms_fh.close()
_comms_fh = None _comms_fh = None
@@ -87,6 +81,9 @@ def close_session():
if _api_fh: if _api_fh:
_api_fh.close() _api_fh.close()
_api_fh = None _api_fh = None
if _cli_fh: # Close the new log file handle
_cli_fh.close()
_cli_fh = None
def log_api_hook(method: str, path: str, payload: str): def log_api_hook(method: str, path: str, payload: str):
@@ -155,3 +152,26 @@ def log_tool_call(script: str, result: str, script_path: str | None):
pass pass
return str(ps1_path) if ps1_path else None return str(ps1_path) if ps1_path else None
def log_cli_call(command: str, stdin_content: str | None, stdout_content: str | None, stderr_content: str | None, latency: float):
"""
Log details of a CLI subprocess execution.
"""
if _cli_fh is None:
return
ts_entry = datetime.datetime.now().strftime("%H:%M:%S")
try:
log_data = {
"timestamp": ts_entry,
"command": command,
"stdin": stdin_content,
"stdout": stdout_content,
"stderr": stderr_content,
"latency_sec": latency
}
_cli_fh.write(json.dumps(log_data, ensure_ascii=False, default=str) + "\n")
_cli_fh.flush()
except Exception:
pass

View File

@@ -4,93 +4,99 @@ import subprocess
import os import os
def main(): def main():
# The GUI calls: <binary> run --output-format stream-json
# The prompt is now passed via stdin.
# Debug log to stderr # Debug log to stderr
sys.stderr.write(f"DEBUG: mock_gemini_cli called with args: {sys.argv}\n") sys.stderr.write(f"DEBUG: mock_gemini_cli called with args: {sys.argv}\n")
# Read prompt from stdin for debug # Read prompt from stdin
prompt = sys.stdin.read() try:
prompt = sys.stdin.read()
except EOFError:
prompt = ""
sys.stderr.write(f"DEBUG: Received prompt via stdin ({len(prompt)} chars)\n") sys.stderr.write(f"DEBUG: Received prompt via stdin ({len(prompt)} chars)\n")
sys.stderr.flush() sys.stderr.flush()
if "run" not in sys.argv: # Skip management commands
if len(sys.argv) > 1 and sys.argv[1] in ["mcp", "extensions", "skills", "hooks"]:
return return
# If the prompt contains tool results (indicated by "role": "tool"), # If the prompt contains tool results, provide final answer
# it means we are in the second round and should provide a final answer. if '"role": "tool"' in prompt or '"tool_call_id"' in prompt:
if '"role": "tool"' in prompt:
print(json.dumps({ print(json.dumps({
"type": "message", "type": "message",
"text": "I have processed the tool results. Everything looks good!" "role": "assistant",
"content": "I have processed the tool results. Everything looks good!"
}), flush=True) }), flush=True)
print(json.dumps({ print(json.dumps({
"type": "result", "type": "result",
"usage": {"total_tokens": 100}, "status": "success",
"stats": {"total_tokens": 100, "input_tokens": 80, "output_tokens": 20},
"session_id": "mock-session-final" "session_id": "mock-session-final"
}), flush=True) }), flush=True)
return return
# Simulate the 'BeforeTool' hook by calling the bridge directly. # Default flow: simulate a tool call
bridge_path = os.path.abspath("scripts/cli_tool_bridge.py") bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
# Using format that bridge understands
tool_call = { bridge_tool_call = {
"tool_name": "read_file", "name": "read_file",
"tool_input": {"path": "test.txt"} "input": {"path": "test.txt"}
} }
sys.stderr.write(f"DEBUG: Calling bridge at {bridge_path}\n") sys.stderr.write(f"DEBUG: Calling bridge at {bridge_path}\n")
sys.stderr.flush() sys.stderr.flush()
# Bridge reads from stdin
process = subprocess.Popen(
[sys.executable, bridge_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=os.environ # Ensure environment variables are inherited
)
stdout, stderr = process.communicate(input=json.dumps(tool_call))
sys.stderr.write(f"DEBUG: Bridge stdout: {stdout}\n")
sys.stderr.write(f"DEBUG: Bridge stderr: {stderr}\n")
sys.stderr.flush()
try: try:
process = subprocess.Popen(
[sys.executable, bridge_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=os.environ
)
stdout, stderr = process.communicate(input=json.dumps(bridge_tool_call))
sys.stderr.write(f"DEBUG: Bridge stdout: {stdout}\n")
sys.stderr.write(f"DEBUG: Bridge stderr: {stderr}\n")
decision_data = json.loads(stdout.strip()) decision_data = json.loads(stdout.strip())
decision = decision_data.get("decision") decision = decision_data.get("decision")
except Exception as e: except Exception as e:
sys.stderr.write(f"DEBUG: Failed to parse bridge output: {e}\n") sys.stderr.write(f"DEBUG: Bridge failed: {e}\n")
decision = "deny" decision = "deny"
# Output JSONL to stdout
if decision == "allow": if decision == "allow":
# Simulate REAL CLI field names for adapter normalization test
print(json.dumps({ print(json.dumps({
"type": "tool_use", "type": "tool_use",
"name": "read_file", "tool_name": "read_file",
"args": {"path": "test.txt"} "tool_id": "call_123",
"parameters": {"path": "test.txt"}
}), flush=True) }), flush=True)
print(json.dumps({ print(json.dumps({
"type": "message", "type": "message",
"text": "I read the file. It contains: 'Hello from mock!'" "role": "assistant",
"content": "I am reading the file now..."
}), flush=True) }), flush=True)
print(json.dumps({ print(json.dumps({
"type": "result", "type": "result",
"usage": {"total_tokens": 50}, "status": "success",
"stats": {"total_tokens": 50, "input_tokens": 40, "output_tokens": 10},
"session_id": "mock-session-123" "session_id": "mock-session-123"
}), flush=True) }), flush=True)
else: else:
print(json.dumps({ print(json.dumps({
"type": "message", "type": "message",
"text": f"Tool execution was denied. Decision: {decision}" "role": "assistant",
"content": f"Tool execution was denied. Decision: {decision}"
}), flush=True) }), flush=True)
print(json.dumps({ print(json.dumps({
"type": "result", "type": "result",
"usage": {"total_tokens": 10}, "status": "success",
"stats": {"total_tokens": 10, "input_tokens": 10, "output_tokens": 0},
"session_id": "mock-session-denied" "session_id": "mock-session-denied"
}), flush=True) }), flush=True)

View File

@@ -20,7 +20,7 @@ base_dir = "."
paths = [] paths = []
[gemini_cli] [gemini_cli]
binary_path = "gemini" binary_path = "\"C:\\projects\\manual_slop\\.venv\\Scripts\\python.exe\" \"C:\\projects\\manual_slop\\tests\\mock_gemini_cli.py\""
[agent.tools] [agent.tools]
run_powershell = true run_powershell = true

View File

@@ -16,8 +16,5 @@ history = [
[discussions."testing deepseek"] [discussions."testing deepseek"]
git_commit = "" git_commit = ""
last_updated = "2026-02-25T23:33:45" last_updated = "2026-02-26T00:29:48"
history = [ history = []
"@2026-02-25T23:33:21\nUser:\ntesting testing 123?",
"@2026-02-25T23:33:25\nAI:\nI see you're testing with \"testing testing 123\". I'm here and ready to help! I'm a helpful coding assistant with access to PowerShell and various tools for file operations and web access.\n\nWhat would you like me to help you with today? I can assist with:\n- Writing or editing code files\n- Searching for information\n- Reading or analyzing files\n- Running commands\n- Web searches\n- And much more!\n\nJust let me know what you need help with.",
]

View File

@@ -0,0 +1,17 @@
import pytest
from unittest.mock import patch, MagicMock
import ai_client
def test_list_models_gemini_cli():
"""
Verifies that 'ai_client.list_models' correctly returns a list of models
for the 'gemini_cli' provider.
"""
models = ai_client.list_models("gemini_cli")
assert "gemini-3.1-pro-preview" in models
assert "gemini-3-flash-preview" in models
assert "gemini-2.5-pro" in models
assert "gemini-2.5-flash" in models
assert "gemini-2.5-flash-lite" in models
assert len(models) == 5

View File

@@ -0,0 +1,53 @@
import unittest
from unittest.mock import patch, MagicMock
import io
import json
import sys
import os
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# Import after path fix
from scripts.cli_tool_bridge import main
class TestCliToolBridgeMapping(unittest.TestCase):
def setUp(self):
os.environ['GEMINI_CLI_HOOK_CONTEXT'] = 'manual_slop'
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_mapping_from_api_format(self, mock_request, mock_stdout, mock_stdin):
"""
Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format)
into tool_name and tool_input for the hook client.
"""
api_tool_call = {
'id': 'call123',
'name': 'read_file',
'input': {'path': 'test.txt'}
}
# 1. Mock stdin with the API format JSON
mock_stdin.write(json.dumps(api_tool_call))
mock_stdin.seek(0)
# 2. Mock ApiHookClient to return approved
mock_request.return_value = {'approved': True}
# Run main
main()
# 3. Verify that request_confirmation was called with mapped values
# If it's not mapped, it will likely be called with None or fail
mock_request.assert_called_once_with('read_file', {'path': 'test.txt'})
# 4. Capture stdout and assert allow
output_str = mock_stdout.getvalue().strip()
self.assertTrue(output_str, "Stdout should not be empty")
output = json.loads(output_str)
self.assertEqual(output.get('decision'), 'allow')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,175 @@
import unittest
from unittest.mock import patch, MagicMock, ANY
import json
import subprocess
import io
import sys
import os
# Ensure the project root is in sys.path to resolve imports correctly
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
sys.path.append(project_root)
# Import the class to be tested
from gemini_cli_adapter import GeminiCliAdapter
# Mock the session_logger module to prevent file operations during tests.
mock_session_logger = MagicMock()
sys.modules['session_logger'] = mock_session_logger
class TestGeminiCliAdapterParity(unittest.TestCase):
def setUp(self):
"""Set up a fresh adapter instance and reset session state for each test."""
self.adapter = GeminiCliAdapter(binary_path="gemini")
self.adapter.session_id = None
self.adapter.last_usage = None
self.adapter.last_latency = 0.0
# Reset mock calls for session_logger for each test
mock_session_logger.reset_mock()
@patch('subprocess.Popen')
def test_count_tokens_uses_estimation(self, mock_popen):
"""
Test that count_tokens uses character-based estimation.
"""
contents_to_count = ["This is the first line.", "This is the second line."]
expected_chars = len("\n".join(contents_to_count))
expected_tokens = expected_chars // 4
token_count = self.adapter.count_tokens(contents=contents_to_count)
self.assertEqual(token_count, expected_tokens)
# Verify that NO subprocess was started for counting
mock_popen.assert_not_called()
@patch('subprocess.Popen')
def test_send_with_safety_settings_no_flags_added(self, mock_popen):
"""
Test that the send method does NOT add --safety flags when safety_settings are provided,
as this functionality is no longer supported via CLI flags.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "User's prompt here."
safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_ONLY_HIGH"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
]
self.adapter.send(message=message_content, safety_settings=safety_settings)
args, kwargs = mock_popen.call_args
command = args[0]
# Verify that no --safety flags were added to the command
self.assertNotIn("--safety", command)
# Verify that the message was passed correctly via stdin
process_mock.communicate.assert_called_once_with(input=message_content)
@patch('subprocess.Popen')
def test_send_without_safety_settings_no_flags(self, mock_popen):
"""
Test that when safety_settings is None or an empty list, no --safety flags are added.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "Another prompt."
self.adapter.send(message=message_content, safety_settings=None)
args_none, _ = mock_popen.call_args
self.assertNotIn("--safety", args_none[0])
mock_popen.reset_mock()
self.adapter.send(message=message_content, safety_settings=[])
args_empty, _ = mock_popen.call_args
self.assertNotIn("--safety", args_empty[0])
@patch('subprocess.Popen')
def test_send_with_system_instruction_prepended_to_stdin(self, mock_popen):
"""
Test that the send method prepends the system instruction to the prompt
sent via stdin, and does NOT add a --system flag to the command.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "User's prompt here."
system_instruction_text = "Some instruction"
expected_input = f"{system_instruction_text}\n\n{message_content}"
self.adapter.send(message=message_content, system_instruction=system_instruction_text)
args, kwargs = mock_popen.call_args
command = args[0]
# Verify that the system instruction was prepended to the input sent to communicate
process_mock.communicate.assert_called_once_with(input=expected_input)
# Verify that no --system flag was added to the command
self.assertNotIn("--system", command)
@patch('subprocess.Popen')
def test_send_with_model_parameter(self, mock_popen):
"""
Test that the send method correctly adds the -m <model> flag when a model is specified.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "User's prompt here."
model_name = "gemini-1.5-flash"
expected_command_part = f'-m "{model_name}"'
self.adapter.send(message=message_content, model=model_name)
args, kwargs = mock_popen.call_args
command = args[0]
# Verify that the -m <model> flag was added to the command
self.assertIn(expected_command_part, command)
# Verify that the message was passed correctly via stdin
process_mock.communicate.assert_called_once_with(input=message_content)
@patch('subprocess.Popen')
def test_send_kills_process_on_communicate_exception(self, mock_popen):
"""
Test that if subprocess.Popen().communicate() raises an exception,
GeminiCliAdapter.send() kills the process and re-raises the exception.
"""
mock_process = MagicMock()
mock_popen.return_value = mock_process
# Define an exception to simulate
simulated_exception = RuntimeError("Simulated communicate error")
mock_process.communicate.side_effect = simulated_exception
message_content = "User message"
# Assert that the exception is raised and process is killed
with self.assertRaises(RuntimeError) as cm:
self.adapter.send(message=message_content)
# Verify that the process's kill method was called
mock_process.kill.assert_called_once()
# Verify that the correct exception was re-raised
self.assertIs(cm.exception, simulated_exception)
if __name__ == '__main__':
unittest.main()

View File

@@ -8,6 +8,7 @@ from api_hook_client import ApiHookClient
def test_gemini_cli_full_integration(live_gui): def test_gemini_cli_full_integration(live_gui):
""" """
Integration test for the Gemini CLI provider and tool bridge. Integration test for the Gemini CLI provider and tool bridge.
Handles 'ask_received' events from the bridge and any other approval requests.
""" """
client = ApiHookClient("http://127.0.0.1:8999") client = ApiHookClient("http://127.0.0.1:8999")
@@ -18,20 +19,18 @@ def test_gemini_cli_full_integration(live_gui):
client.select_list_item("proj_files", "manual_slop") client.select_list_item("proj_files", "manual_slop")
# 1. Setup paths and configure the GUI # 1. Setup paths and configure the GUI
# Use the real gemini CLI if available, otherwise use mock
# For CI/testing we prefer mock
mock_script = os.path.abspath("tests/mock_gemini_cli.py") mock_script = os.path.abspath("tests/mock_gemini_cli.py")
# Wrap in quotes for shell execution if path has spaces
cli_cmd = f'"{sys.executable}" "{mock_script}"' cli_cmd = f'"{sys.executable}" "{mock_script}"'
# Set provider and binary path via GUI hooks
# Note: Using set_value which now triggers the property setter in gui_2.py
print(f"[TEST] Setting current_provider to gemini_cli") print(f"[TEST] Setting current_provider to gemini_cli")
client.set_value("current_provider", "gemini_cli") client.set_value("current_provider", "gemini_cli")
print(f"[TEST] Setting gcli_path to {cli_cmd}") print(f"[TEST] Setting gcli_path to {cli_cmd}")
client.set_value("gcli_path", cli_cmd) client.set_value("gcli_path", cli_cmd)
# Verify settings were applied # Verify settings
assert client.get_value("current_provider") == "gemini_cli" assert client.get_value("current_provider") == "gemini_cli"
assert client.get_value("gcli_path") == cli_cmd
# Clear events # Clear events
client.get_events() client.get_events()
@@ -41,55 +40,48 @@ def test_gemini_cli_full_integration(live_gui):
client.set_value("ai_input", "Please read test.txt") client.set_value("ai_input", "Please read test.txt")
client.click("btn_gen_send") client.click("btn_gen_send")
# 3. Monitor for the 'ask_received' event # 3. Monitor for approval events
print("[TEST] Waiting for ask_received event...") print("[TEST] Waiting for approval events...")
request_id = None timeout = 45
timeout = 30
start_time = time.time() start_time = time.time()
approved_count = 0
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
events = client.get_events() events = client.get_events()
if events: if events:
print(f"[TEST] Received {len(events)} events: {[e.get('type') for e in events]}") for ev in events:
for ev in events: etype = ev.get("type")
if ev.get("type") == "ask_received": eid = ev.get("request_id") or ev.get("action_id")
request_id = ev.get("request_id") print(f"[TEST] Received event: {etype} (ID: {eid})")
print(f"[TEST] Found request_id: {request_id}")
break
if request_id:
break
time.sleep(0.5)
assert request_id is not None, "Timed out waiting for 'ask_received' event from the bridge" if etype in ["ask_received", "glob_approval_required", "script_confirmation_required"]:
print(f"[TEST] Approving {etype} {eid}")
if etype == "script_confirmation_required":
resp = requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": True})
else:
resp = requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": eid, "response": {"approved": True}})
assert resp.status_code == 200
approved_count += 1
# 4. Respond to the permission request # Check if we got a final response in history
print("[TEST] Responding to ask with approval")
resp = requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={
"request_id": request_id,
"response": {"approved": True}
}
)
assert resp.status_code == 200
# 5. Verify that the final response is displayed in the GUI
print("[TEST] Waiting for final message in history...")
final_message_received = False
start_time = time.time()
while time.time() - start_time < timeout:
session = client.get_session() session = client.get_session()
entries = session.get("session", {}).get("entries", []) entries = session.get("session", {}).get("entries", [])
found_final = False
for entry in entries: for entry in entries:
content = entry.get("content", "") content = entry.get("content", "")
if "Hello from mock!" in content: if "Hello from mock!" in content or "processed the tool results" in content:
print(f"[TEST] Success! Found message: {content[:50]}...") print(f"[TEST] Success! Found final message in history.")
final_message_received = True found_final = True
break break
if final_message_received:
if found_final:
break break
time.sleep(1.0) time.sleep(1.0)
assert final_message_received, "Final message from mock CLI was not found in the GUI history" assert approved_count > 0, "No approval events were processed"
assert found_final, "Final message from mock CLI was not found in the GUI history"
def test_gemini_cli_rejection_and_history(live_gui): def test_gemini_cli_rejection_and_history(live_gui):
""" """
@@ -97,88 +89,53 @@ def test_gemini_cli_rejection_and_history(live_gui):
""" """
client = ApiHookClient("http://127.0.0.1:8999") client = ApiHookClient("http://127.0.0.1:8999")
# 0. Reset session and enable history # 0. Reset session
client.click("btn_reset") client.click("btn_reset")
client.set_value("auto_add_history", True) client.set_value("auto_add_history", True)
# Switch to manual_slop project explicitly
client.select_list_item("proj_files", "manual_slop") client.select_list_item("proj_files", "manual_slop")
# 1. Setup paths and configure the GUI
mock_script = os.path.abspath("tests/mock_gemini_cli.py") mock_script = os.path.abspath("tests/mock_gemini_cli.py")
cli_cmd = f'"{sys.executable}" "{mock_script}"' cli_cmd = f'"{sys.executable}" "{mock_script}"'
client.set_value("current_provider", "gemini_cli") client.set_value("current_provider", "gemini_cli")
client.set_value("gcli_path", cli_cmd) client.set_value("gcli_path", cli_cmd)
# 2. Trigger a message that will be denied # 2. Trigger a message
print("[TEST] Sending user message (to be denied)...") print("[TEST] Sending user message (to be denied)...")
client.set_value("ai_input", "Deny me") client.set_value("ai_input", "Deny me")
client.click("btn_gen_send") client.click("btn_gen_send")
# 3. Wait for 'ask_received' and respond with rejection # 3. Wait for event and reject
request_id = None timeout = 20
timeout = 15
start_time = time.time() start_time = time.time()
denied = False
while time.time() - start_time < timeout: while time.time() - start_time < timeout:
for ev in client.get_events(): for ev in client.get_events():
if ev.get("type") == "ask_received": etype = ev.get("type")
request_id = ev.get("request_id") eid = ev.get("request_id")
print(f"[TEST] Received event: {etype}")
if etype == "ask_received":
print(f"[TEST] Denying request {eid}")
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": eid, "response": {"approved": False}})
denied = True
break break
if request_id: break if denied: break
time.sleep(0.5) time.sleep(0.5)
assert request_id is not None assert denied, "No ask_received event to deny"
print("[TEST] Responding to ask with REJECTION") # 4. Verify rejection in history
requests.post("http://127.0.0.1:8999/api/ask/respond", print("[TEST] Waiting for rejection in history...")
json={"request_id": request_id, "response": {"approved": False}})
# 4. Verify rejection message in history
print("[TEST] Waiting for rejection message in history...")
rejection_found = False rejection_found = False
start_time = time.time() start_time = time.time()
while time.time() - start_time < timeout: while time.time() - start_time < 20:
session = client.get_session() session = client.get_session()
entries = session.get("session", {}).get("entries", []) entries = session.get("session", {}).get("entries", [])
for entry in entries: for entry in entries:
if "Tool execution was denied. Decision: deny" in entry.get("content", ""): if "Tool execution was denied" in entry.get("content", ""):
rejection_found = True rejection_found = True
break break
if rejection_found: break if rejection_found: break
time.sleep(1.0) time.sleep(1.0)
assert rejection_found, "Rejection message not found in history" assert rejection_found, "Rejection message not found in history"
# 5. Send a follow-up message and verify history grows
print("[TEST] Sending follow-up message...")
client.set_value("ai_input", "What happened?")
client.click("btn_gen_send")
# Wait for mock to finish (polling history)
print("[TEST] Waiting for final history entry (max 30s)...")
final_message_received = False
start_time = time.time()
while time.time() - start_time < 30:
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
if len(entries) >= 3:
final_message_received = True
break
# Print snapshot for debug
if int(time.time() - start_time) % 5 == 0:
print(f"[TEST] History length at {int(time.time() - start_time)}s: {len(entries)}")
time.sleep(1.0)
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
# Should have:
# 1. User: Deny me
# 2. AI: Tool execution was denied...
# 3. User: What happened?
# 4. AI or System: ...
print(f"[TEST] Final history length: {len(entries)}")
for i, entry in enumerate(entries):
print(f" {i}: {entry.get('role')} - {entry.get('content')[:30]}...")
assert len(entries) >= 3

View File

@@ -0,0 +1,52 @@
import pytest
from unittest.mock import patch, MagicMock
import sys
import os
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import ai_client
@pytest.fixture(autouse=True)
def setup_ai_client():
ai_client.reset_session()
ai_client.set_provider("gemini_cli", "gemini-2.5-flash")
ai_client.confirm_and_run_callback = lambda script, base_dir: "Mocked execution"
ai_client.comms_log_callback = lambda entry: None
ai_client.tool_log_callback = lambda script, result: None
yield
@patch('ai_client.GeminiCliAdapter')
@patch('ai_client._get_combined_system_prompt')
def test_send_invokes_adapter_send(mock_prompt, mock_adapter_class):
mock_prompt.return_value = "Mocked Prompt"
mock_instance = mock_adapter_class.return_value
mock_instance.send.return_value = {"text": "Done", "tool_calls": []}
mock_instance.last_usage = {"input_tokens": 10}
mock_instance.last_latency = 0.1
mock_instance.session_id = None
ai_client.send("context", "message", discussion_history="hist")
expected_payload = "[DISCUSSION HISTORY]\n\nhist\n\n---\n\nmessage"
assert mock_instance.send.called
args, kwargs = mock_instance.send.call_args
assert args[0] == expected_payload
assert kwargs['system_instruction'] == "Mocked Prompt\n\n<context>\ncontext\n</context>"
@patch('ai_client.GeminiCliAdapter')
def test_get_history_bleed_stats(mock_adapter_class):
mock_instance = mock_adapter_class.return_value
mock_instance.send.return_value = {"text": "txt", "tool_calls": []}
mock_instance.last_usage = {"input_tokens": 1500}
mock_instance.last_latency = 0.5
mock_instance.session_id = "sess"
# Initialize by sending a message
ai_client.send("context", "msg")
stats = ai_client.get_history_bleed_stats()
assert stats["provider"] == "gemini_cli"
assert stats["current"] == 1500