import json
import os
import shlex
import subprocess
import time
from typing import Any, Callable, Optional

import session_logger


class GeminiCliAdapter:
    """
    Adapter for the Gemini CLI that parses streaming JSON output.

    Spawns the ``gemini`` binary as a subprocess, feeds the prompt on
    stdin, and decodes the line-delimited JSON events emitted under
    ``--output-format stream-json``. Conversation continuity is kept by
    capturing a session id from the stream and passing it back to the
    CLI via ``--resume`` on subsequent calls.
    """

    def __init__(self, binary_path: str = "gemini"):
        # Command name or full path of the Gemini CLI executable.
        self.binary_path = binary_path
        # Session id from the last "init"/"result" event; reused via --resume.
        self.session_id: Optional[str] = None
        # Usage/stats dict from the most recent "result" event, if any.
        self.last_usage: Optional[dict] = None
        # Wall-clock duration of the most recent send() call, in seconds.
        self.last_latency: float = 0.0

    def _build_argv(self, model: Optional[str]) -> list[str]:
        """Build the argv list handed to subprocess.Popen."""
        # On Windows, posix=True splitting would swallow backslashes in
        # paths, so split non-posix there and strip the quote characters
        # that posix=False leaves attached to tokens.
        posix = os.name != "nt"
        argv = [p.strip('"')
                for p in shlex.split(self.binary_path, posix=posix) if p]
        if model:
            argv.extend(["-m", model])
        # Empty --prompt argument: the real prompt is supplied on stdin.
        argv.extend(["--prompt", ""])
        if self.session_id:
            argv.extend(["--resume", self.session_id])
        argv.extend(["--output-format", "stream-json"])
        return argv

    def send(self,
             message: str,
             safety_settings: list | None = None,
             system_instruction: str | None = None,
             model: str | None = None,
             stream_callback: Optional[Callable[[str], None]] = None
             ) -> dict[str, Any]:
        """
        Send *message* to the Gemini CLI and parse its stream-json output.

        The child process is driven with ``communicate()`` to avoid pipe
        deadlocks with large input/output, so the stream is processed
        only after the CLI exits; *stream_callback* is then invoked per
        decoded chunk (simulated streaming, not real-time).

        Args:
            message: User prompt text.
            safety_settings: Accepted for interface compatibility but
                not forwarded to the CLI. TODO(review): confirm whether
                the CLI exposes an equivalent flag.
            system_instruction: Optional text prepended to the prompt,
                separated by a blank line.
            model: Optional model name passed via ``-m``.
            stream_callback: Called with each assistant text chunk.

        Returns:
            Dict with keys ``text`` (accumulated assistant text),
            ``tool_calls`` (list of ``{name, args, id}`` dicts) and
            ``stderr`` (raw stderr of the CLI process).
        """
        start_time = time.time()

        prompt_text = message
        if system_instruction:
            prompt_text = f"{system_instruction}\n\n{message}"

        argv = self._build_argv(model)

        env = os.environ.copy()
        env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"

        process = subprocess.Popen(
            argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            shell=False,
            env=env,
        )
        # communicate() blocks until exit but cannot deadlock on full pipes.
        stdout_final, stderr_final = process.communicate(input=prompt_text)

        accumulated_text = ""
        tool_calls: list[dict[str, Any]] = []
        stdout_content: list[str] = []

        for line in stdout_final.splitlines():
            line = line.strip()
            if not line:
                continue
            stdout_content.append(line)
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                # Non-JSON noise (banners, warnings) is kept in the log
                # via stdout_content but otherwise ignored.
                continue
            msg_type = data.get("type")
            if msg_type == "init":
                if "session_id" in data:
                    self.session_id = data.get("session_id")
            elif msg_type in ("message", "chunk"):
                role = data.get("role", "")
                # Some CLI versions omit the role on assistant chunks,
                # hence the "or not role" fallback.
                if role in ("assistant", "model") or not role:
                    content = data.get("content", data.get("text"))
                    if content:
                        accumulated_text += content
                        if stream_callback:
                            stream_callback(content)
            elif msg_type == "result":
                self.last_usage = data.get("stats") or data.get("usage")
                if "session_id" in data:
                    self.session_id = data.get("session_id")
            elif msg_type == "tool_use":
                tc = {
                    "name": data.get("tool_name", data.get("name")),
                    "args": data.get("parameters", data.get("args", {})),
                    "id": data.get("tool_id", data.get("id")),
                }
                if tc["name"]:
                    tool_calls.append(tc)

        current_latency = time.time() - start_time
        # Log the command actually executed. (Previously a separate,
        # hand-assembled shell string was logged, which could drift from
        # the real argv.)
        session_logger.open_session()
        session_logger.log_cli_call(
            command=shlex.join(argv),
            stdin_content=prompt_text,
            stdout_content="\n".join(stdout_content),
            stderr_content=stderr_final,
            latency=current_latency,
        )
        self.last_latency = current_latency

        return {
            "text": accumulated_text,
            "tool_calls": tool_calls,
            "stderr": stderr_final,
        }

    def count_tokens(self, contents: list[str]) -> int:
        """
        Estimate the token count of *contents* by character length.

        Uses a conservative 4-chars-per-token average; the pieces are
        joined with newlines to mirror prompt concatenation.
        """
        total_chars = len("\n".join(contents))
        return total_chars // 4