# gemini_cli_adapter — adapter for the Gemini CLI (Python, ~137 lines, 4.8 KiB)
import json
import os
import shlex
import subprocess
import sys
import time

import session_logger  # Import session_logger (project-local)
|
|
|
|
class GeminiCliAdapter:
    """Adapter around the ``gemini`` command-line binary.

    Runs the CLI as a subprocess, feeds the prompt through stdin, parses the
    ``stream-json`` output line by line, and logs every call through
    ``session_logger``.
    """

    def __init__(self, binary_path="gemini"):
        # Path (or PATH-resolvable name) of the gemini CLI executable.
        self.binary_path = binary_path
        # Usage/stats dict from the most recent 'result' event, if any.
        self.last_usage = None
        # Session id reported by the CLI; reused via --resume on later calls.
        self.session_id = None
        # Wall-clock duration (seconds) of the most recent send() call.
        self.last_latency = 0.0

    def count_tokens(self, contents: list[str]) -> int:
        """Estimate the token count of *contents* joined with newlines.

        Uses a character-based heuristic (~4 characters per token); this
        replaces the broken ``gemini count`` CLI call.
        """
        input_text = "\n".join(contents)
        return len(input_text) // 4

    def send(self, message, safety_settings=None, system_instruction=None, model: str = None):
        """Send *message* to the Gemini CLI and parse its stream-json output.

        Parameters:
            message: User prompt text, delivered to the CLI via stdin.
            safety_settings: Accepted for interface compatibility; currently
                not forwarded to the CLI.
            system_instruction: Optional text prepended to *message*
                (separated by a blank line).
            model: Optional model name, passed to the CLI with ``-m``.

        Returns:
            dict with keys ``text`` (accumulated message content),
            ``tool_calls`` (list of ``{"name", "args", "id"}`` dicts) and
            ``stderr`` (raw stderr output from the CLI).

        Side effects:
            Updates ``self.session_id``, ``self.last_usage`` and
            ``self.last_latency``; logs the call via ``session_logger``.
        """
        start_time = time.time()

        # Build argv as a list and run with shell=False (the Popen default):
        # this prevents shell injection through model/session_id values and
        # removes the need for manual quote-wrapping of arguments.
        command_parts = [self.binary_path]

        if model:
            command_parts.extend(["-m", model])

        # Empty --prompt placeholder: the real content is sent via stdin.
        command_parts.extend(["--prompt", ""])

        if self.session_id:
            command_parts.extend(["--resume", self.session_id])

        command_parts.extend(["--output-format", "stream-json"])

        # Shell-quoted string form of the command, used only for logging.
        command = shlex.join(command_parts)

        # Prepend the system instruction, if any, to the prompt text.
        prompt_text = message
        if system_instruction:
            prompt_text = f"{system_instruction}\n\n{message}"

        accumulated_text = ""
        tool_calls = []

        env = os.environ.copy()
        env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"

        process = None
        stdout_content = ""
        stderr_content = ""
        stdin_content = prompt_text

        try:
            process = subprocess.Popen(
                command_parts,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
            )

            stdout_content, stderr_content = process.communicate(input=prompt_text)

            for line in stdout_content.splitlines():
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    # Non-JSON noise on stdout is ignored.
                    continue

                msg_type = data.get("type")

                if msg_type == "init":
                    if "session_id" in data:
                        self.session_id = data.get("session_id")
                elif msg_type == "message":
                    content = data.get("content", data.get("text"))
                    if content:
                        accumulated_text += content
                elif msg_type == "result":
                    self.last_usage = data.get("stats") or data.get("usage")
                    if "session_id" in data:
                        self.session_id = data.get("session_id")
                elif msg_type == "tool_use":
                    # Normalize field names for ai_client.py: the CLI may emit
                    # 'tool_name'/'tool_id'/'parameters' or 'name'/'id'/'args';
                    # we standardize on 'name'/'args'/'id'.
                    tc = {
                        "name": data.get("tool_name", data.get("name")),
                        "args": data.get("parameters", data.get("args", {})),
                        "id": data.get("tool_id", data.get("id")),
                    }
                    if tc["name"]:
                        tool_calls.append(tc)

        except Exception:
            if process:
                process.kill()
            raise  # bare raise preserves the original traceback

        finally:
            current_latency = time.time() - start_time
            if process:
                session_logger.open_session()
                session_logger.log_cli_call(
                    command=command,
                    stdin_content=stdin_content,
                    stdout_content=stdout_content,
                    stderr_content=stderr_content,
                    latency=current_latency,
                )
            self.last_latency = current_latency

        return {
            "text": accumulated_text,
            "tool_calls": tool_calls,
            "stderr": stderr_content,
        }
|