import json
import os
import shlex
import subprocess
import threading
import time
from typing import Any, Callable, Optional

import session_logger


class GeminiCliAdapter:
    """Adapter for the Gemini CLI binary.

    Spawns the CLI as a subprocess, feeds the prompt over stdin, and parses
    the ``--output-format stream-json`` line-delimited JSON events as they
    arrive, optionally forwarding text chunks to a caller-supplied callback.
    """

    def __init__(self, binary_path: str = "gemini"):
        self.binary_path = binary_path
        # Session id reported by the CLI; reused to --resume conversations.
        self.session_id: Optional[str] = None
        # Token/usage stats from the most recent "result" event, if any.
        self.last_usage: Optional[dict] = None
        # Wall-clock duration (seconds) of the most recent send() call.
        self.last_latency: float = 0.0

    def _build_argv(self, model: Optional[str]) -> list[str]:
        """Build the CLI argument vector.

        Returns a plain argv list executed with ``shell=False`` — no shell
        quoting is involved, so values such as ``model`` and ``session_id``
        are passed verbatim (fixes the previous shell-injection risk).
        """
        argv = [self.binary_path]
        if model:
            argv.extend(["-m", model])
        # Empty --prompt: the real prompt text is written to stdin instead.
        argv.extend(["--prompt", ""])
        if self.session_id:
            argv.extend(["--resume", self.session_id])
        argv.extend(["--output-format", "stream-json"])
        return argv

    def _process_event(self, data: dict, tool_calls: list[dict]) -> Optional[str]:
        """Apply one stream-json event; return any new assistant text.

        Side effects: may update ``self.session_id`` / ``self.last_usage``
        and append to ``tool_calls``.
        """
        msg_type = data.get("type")
        if msg_type == "init":
            if "session_id" in data:
                self.session_id = data.get("session_id")
        elif msg_type in ("message", "chunk"):
            role = data.get("role", "")
            # Accept assistant/model output; a missing role is treated as
            # model text (some CLI versions omit it on chunks).
            if role in ("assistant", "model") or not role:
                content = data.get("content", data.get("text"))
                if content:
                    return content
        elif msg_type == "result":
            self.last_usage = data.get("stats") or data.get("usage")
            if "session_id" in data:
                self.session_id = data.get("session_id")
        elif msg_type == "tool_use":
            # Field names vary across CLI versions; try both spellings.
            tc = {
                "name": data.get("tool_name", data.get("name")),
                "args": data.get("parameters", data.get("args", {})),
                "id": data.get("tool_id", data.get("id")),
            }
            if tc["name"]:
                tool_calls.append(tc)
        return None

    def send(self, message: str, safety_settings: list | None = None,
             system_instruction: str | None = None, model: str | None = None,
             stream_callback: Optional[Callable[[str], None]] = None) -> dict[str, Any]:
        """Send a message to the Gemini CLI and collect its streamed reply.

        Reads stdout line by line so ``stream_callback`` (if given) fires as
        each text chunk arrives. ``system_instruction`` is prepended to the
        prompt text. ``safety_settings`` is accepted for interface
        compatibility with other adapters but is not used by the CLI path.

        Returns a dict with keys ``text`` (accumulated assistant text),
        ``tool_calls`` (list of parsed tool-use events), and ``stderr``.
        """
        start_time = time.time()
        argv = self._build_argv(model)
        # Human-readable command line, for logging only; execution uses argv.
        command = shlex.join(argv)

        prompt_text = message
        if system_instruction:
            prompt_text = f"{system_instruction}\n\n{message}"

        accumulated_text = ""
        tool_calls: list[dict] = []
        stdout_content: list[str] = []

        env = os.environ.copy()
        env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"

        process = subprocess.Popen(
            argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env=env,
            bufsize=1,  # line buffered so events arrive promptly
        )

        # Write the prompt on a thread so we never deadlock against a child
        # that blocks writing stdout before it has drained stdin.
        def write_stdin() -> None:
            try:
                process.stdin.write(prompt_text)
                process.stdin.close()
            except (BrokenPipeError, OSError):
                # Child may exit before reading all of stdin; best effort.
                pass

        threading.Thread(target=write_stdin, daemon=True).start()

        # Drain stderr concurrently as well: if the child writes enough to
        # stderr to fill the pipe buffer while we are blocked reading stdout,
        # both sides deadlock. A reader thread prevents that.
        stderr_chunks: list[str] = []

        def read_stderr() -> None:
            try:
                stderr_chunks.append(process.stderr.read())
            except (ValueError, OSError):
                pass

        stderr_thread = threading.Thread(target=read_stderr, daemon=True)
        stderr_thread.start()

        # Consume stdout line by line; process ends when readline() returns
        # EOF and the child has exited.
        while True:
            line = process.stdout.readline()
            if not line and process.poll() is not None:
                break
            if not line:
                continue
            line = line.strip()
            stdout_content.append(line)
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                # Non-JSON noise on stdout is kept for the log but ignored.
                continue
            content = self._process_event(data, tool_calls)
            if content:
                accumulated_text += content
                if stream_callback:
                    stream_callback(content)

        process.wait()
        stderr_thread.join(timeout=5)
        stderr_final = "".join(stderr_chunks)

        current_latency = time.time() - start_time
        session_logger.open_session()
        session_logger.log_cli_call(
            command=command,
            stdin_content=prompt_text,
            stdout_content="\n".join(stdout_content),
            stderr_content=stderr_final,
            latency=current_latency,
        )
        self.last_latency = current_latency

        return {
            "text": accumulated_text,
            "tool_calls": tool_calls,
            "stderr": stderr_final,
        }

    def count_tokens(self, contents: list[str]) -> int:
        """Estimate the token count of *contents*.

        The CLI exposes no token-counting endpoint, so this uses a rough
        4-characters-per-token heuristic over the joined text.
        """
        total_chars = len("\n".join(contents))
        return total_chars // 4