import subprocess
import json
import shlex
import sys
import time
import os

import session_logger  # Project-local helper that records CLI call transcripts.


class GeminiCliAdapter:
    """Adapter around the Gemini CLI binary.

    Runs the ``gemini`` executable as a subprocess, feeds it the prompt on
    stdin, and parses its ``stream-json`` stdout into a plain dict holding
    the accumulated assistant text, any tool calls, and captured stderr.
    """

    def __init__(self, binary_path: str = "gemini") -> None:
        # Path or name of the gemini CLI executable.
        self.binary_path = binary_path
        # Usage/stats payload from the most recent 'result' event, if any.
        self.last_usage = None
        # Session id reported by the CLI; reused via --resume on later calls.
        self.session_id = None
        # Wall-clock duration (seconds) of the most recent send() call.
        self.last_latency = 0.0

    def count_tokens(self, contents: list[str]) -> int:
        """
        Counts the tokens for a list of string contents using a
        character-based estimation.

        Approximates tokens by assuming 4 characters per token. This replaces
        the broken 'gemini count' CLI call.
        """
        input_text = "\n".join(contents)
        return len(input_text) // 4

    def _build_command(self, model: str | None) -> list[str]:
        """Build the argv list for the CLI invocation.

        An argv list with shell=False avoids shell-injection and quoting bugs:
        the previous shell-string form broke (or worse) on a binary path,
        model name, or session id containing spaces or shell metacharacters.
        """
        command_parts = [self.binary_path]
        if model:
            command_parts.extend(["-m", model])
        # Empty-string placeholder for --prompt; the real prompt goes to
        # stdin. (Under the old shell form this was the literal '""', which
        # the shell collapsed to an empty argument — same effect here.)
        command_parts.extend(["--prompt", ""])
        if self.session_id:
            command_parts.extend(["--resume", self.session_id])
        command_parts.extend(["--output-format", "stream-json"])
        return command_parts

    def _parse_stream(self, stdout_content: str) -> tuple[str, list[dict]]:
        """Parse the CLI's stream-json stdout, one JSON object per line.

        Returns (accumulated_text, tool_calls). Updates self.session_id and
        self.last_usage as side effects. Lines that are not valid JSON are
        skipped silently (the CLI may interleave non-JSON noise).
        """
        accumulated_text = ""
        tool_calls: list[dict] = []
        for raw_line in stdout_content.splitlines():
            line = raw_line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            msg_type = data.get("type")
            if msg_type == "init":
                if "session_id" in data:
                    self.session_id = data.get("session_id")
            elif msg_type == "message":
                # CRITICAL: Only accumulate content from the assistant/model
                # role. The CLI echoes back the 'user' prompt in the stream,
                # which we must skip.
                role = data.get("role", "")
                if role in ["assistant", "model"]:
                    content = data.get("content", data.get("text"))
                    if content:
                        accumulated_text += content
            elif msg_type == "result":
                self.last_usage = data.get("stats") or data.get("usage")
                if "session_id" in data:
                    self.session_id = data.get("session_id")
            elif msg_type == "tool_use":
                # Standardize format for ai_client.py. Real CLI might use
                # 'tool_name'/'tool_id'/'parameters' or 'name'/'id'/'args'.
                # We'll map to 'name'/'id'/'args'.
                tc = {
                    "name": data.get("tool_name", data.get("name")),
                    "args": data.get("parameters", data.get("args", {})),
                    "id": data.get("tool_id", data.get("id")),
                }
                if tc["name"]:
                    tool_calls.append(tc)
        return accumulated_text, tool_calls

    def send(self, message: str, safety_settings: list | None = None,
             system_instruction: str | None = None,
             model: str | None = None) -> dict:
        """
        Sends a message to the Gemini CLI and processes the streaming JSON
        output.

        Logs the CLI call details using session_logger.log_cli_call.
        System instruction is prepended to the message.
        Uses the --prompt flag with an empty placeholder and sends the real
        content via stdin.

        Note: ``safety_settings`` is accepted for interface compatibility but
        is not forwarded to the CLI (no corresponding flag is passed).

        Returns:
            dict with keys "text" (accumulated assistant text), "tool_calls"
            (list of {"name", "args", "id"} dicts), and "stderr" (raw stderr).

        Raises:
            Re-raises any exception from the subprocess machinery after
            killing the child process; logging still runs via ``finally``.
        """
        start_time = time.time()
        command_parts = self._build_command(model)
        # Correctly-quoted, human-readable command string for the session log.
        command = shlex.join(command_parts)

        # Prepend the system instruction to the prompt when provided.
        prompt_text = message
        if system_instruction:
            prompt_text = f"{system_instruction}\n\n{message}"

        accumulated_text = ""
        tool_calls: list[dict] = []
        env = os.environ.copy()
        env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"
        process = None
        stdout_content = ""
        stderr_content = ""
        try:
            # shell=False + argv list: the shell never parses user-influenced
            # values (binary path, model, session id).
            process = subprocess.Popen(
                command_parts,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
            )
            stdout_content, stderr_content = process.communicate(input=prompt_text)
            accumulated_text, tool_calls = self._parse_stream(stdout_content)
        except Exception:
            if process:
                process.kill()
            raise  # Bare raise preserves the original traceback.
        finally:
            current_latency = time.time() - start_time
            # Only log when a process was actually started.
            if process:
                session_logger.open_session()
                session_logger.log_cli_call(
                    command=command,
                    stdin_content=prompt_text,
                    stdout_content=stdout_content,
                    stderr_content=stderr_content,
                    latency=current_latency,
                )
            self.last_latency = current_latency

        return {
            "text": accumulated_text,
            "tool_calls": tool_calls,
            "stderr": stderr_content,
        }