From a70680b2a269dd96f4d964d9b6b9f231b4af0488 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Thu, 26 Feb 2026 00:31:33 -0500 Subject: [PATCH] checkpoint: Working on getting gemini cli to actually have parity with gemini api. --- ai_client.py | 62 ++++++-- config.toml | 4 +- gemini_cli_adapter.py | 137 ++++++++++------ reproduce_no_text.py | 28 ++++ scripts/cli_tool_bridge.py | 131 ++++++++++++--- session_logger.py | 48 ++++-- tests/mock_gemini_cli.py | 84 +++++----- tests/temp_project.toml | 2 +- tests/temp_project_history.toml | 7 +- tests/test_ai_client_list_models.py | 17 ++ tests/test_cli_tool_bridge_mapping.py | 53 +++++++ tests/test_gemini_cli_adapter_parity.py | 175 +++++++++++++++++++++ tests/test_gemini_cli_integration.py | 153 +++++++----------- tests/test_gemini_cli_parity_regression.py | 52 ++++++ 14 files changed, 710 insertions(+), 243 deletions(-) create mode 100644 reproduce_no_text.py create mode 100644 tests/test_ai_client_list_models.py create mode 100644 tests/test_cli_tool_bridge_mapping.py create mode 100644 tests/test_gemini_cli_adapter_parity.py create mode 100644 tests/test_gemini_cli_parity_regression.py diff --git a/ai_client.py b/ai_client.py index a68e100..7574e61 100644 --- a/ai_client.py +++ b/ai_client.py @@ -13,6 +13,7 @@ during chat creation to avoid massive history bloat. 
# ai_client.py import tomllib import json +import sys import time import datetime import hashlib @@ -267,7 +268,16 @@ def _classify_deepseek_error(exc: Exception) -> ProviderError: def set_provider(provider: str, model: str): global _provider, _model _provider = provider - _model = model + + if provider == "gemini_cli": + valid_models = _list_gemini_cli_models() + # If model is invalid or belongs to another provider (like deepseek), force default + if model not in valid_models or model.startswith("deepseek"): + _model = "gemini-3-flash-preview" + else: + _model = model + else: + _model = model @@ -298,6 +308,7 @@ def reset_session(): _gemini_cache_created_at = None if _gemini_cli_adapter: _gemini_cli_adapter.session_id = None + _gemini_cli_adapter = None _anthropic_client = None with _anthropic_history_lock: _anthropic_history = [] @@ -336,9 +347,26 @@ def list_models(provider: str) -> list[str]: return _list_anthropic_models() elif provider == "deepseek": return _list_deepseek_models(creds["deepseek"]["api_key"]) + elif provider == "gemini_cli": + return _list_gemini_cli_models() return [] +def _list_gemini_cli_models() -> list[str]: + """ + List available Gemini models for the CLI. + Since the CLI doesn't have a direct 'list models' command yet, + we return a curated list of supported models based on CLI metadata. + """ + return [ + "gemini-3-flash-preview", + "gemini-3.1-pro-preview", + "gemini-2.5-pro", + "gemini-2.5-flash", + "gemini-2.5-flash-lite", + ] + + def _list_gemini_models(api_key: str) -> list[str]: try: @@ -844,33 +872,45 @@ def _send_gemini_cli(md_content: str, user_message: str, base_dir: str, try: if _gemini_cli_adapter is None: _gemini_cli_adapter = GeminiCliAdapter(binary_path="gemini") + + adapter = _gemini_cli_adapter mcp_client.configure(file_items or [], [base_dir]) - # If it's a new session (session_id is None), we should ideally send the context. 
+ # Construct the system instruction, combining the base system prompt and the current context. + sys_instr = f"{_get_combined_system_prompt()}\n\n\n{md_content}\n" + safety_settings = [{'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'threshold': 'BLOCK_ONLY_HIGH'}] + + # Initial payload for the first message payload = user_message - if _gemini_cli_adapter.session_id is None: - # Prepend context and discussion history to the first message - full_prompt = f"{_get_combined_system_prompt()}\n\n\n{md_content}\n\n\n" + if adapter.session_id is None: if discussion_history: - full_prompt += f"[DISCUSSION HISTORY]\n\n{discussion_history}\n\n---\n\n" - full_prompt += user_message - payload = full_prompt + payload = f"[DISCUSSION HISTORY]\n\n{discussion_history}\n\n---\n\n{user_message}" all_text = [] _cumulative_tool_bytes = 0 for r_idx in range(MAX_TOOL_ROUNDS + 2): + if adapter is None: + break + events.emit("request_start", payload={"provider": "gemini_cli", "model": _model, "round": r_idx}) _append_comms("OUT", "request", {"message": f"[CLI] [round {r_idx}] [msg {len(payload)}]"}) - resp_data = _gemini_cli_adapter.send(payload) + resp_data = adapter.send(payload, safety_settings=safety_settings, system_instruction=sys_instr, model=_model) + + # Log any stderr from the CLI for transparency + cli_stderr = resp_data.get("stderr", "") + if cli_stderr: + sys.stderr.write(f"\n--- Gemini CLI stderr ---\n{cli_stderr}\n-------------------------\n") + sys.stderr.flush() + txt = resp_data.get("text", "") if txt: all_text.append(txt) calls = resp_data.get("tool_calls", []) - usage = _gemini_cli_adapter.last_usage or {} - latency = _gemini_cli_adapter.last_latency + usage = adapter.last_usage or {} + latency = adapter.last_latency events.emit("response_received", payload={"provider": "gemini_cli", "model": _model, "usage": usage, "latency": latency, "round": r_idx}) diff --git a/config.toml b/config.toml index 9cbf71e..5b6b34e 100644 --- a/config.toml +++ b/config.toml @@ -1,6 +1,6 
@@ [ai] -provider = "deepseek" -model = "deepseek-chat" +provider = "gemini_cli" +model = "gemini-3-flash-preview" temperature = 0.0 max_tokens = 8192 history_trunc_limit = 8000 diff --git a/gemini_cli_adapter.py b/gemini_cli_adapter.py index 9722dc7..2870656 100644 --- a/gemini_cli_adapter.py +++ b/gemini_cli_adapter.py @@ -3,6 +3,7 @@ import json import sys import time import os +import session_logger # Import session_logger class GeminiCliAdapter: def __init__(self, binary_path="gemini"): @@ -11,83 +12,125 @@ class GeminiCliAdapter: self.session_id = None self.last_latency = 0.0 - def send(self, message): + def count_tokens(self, contents: list[str]) -> int: + """ + Counts the tokens for a list of string contents using a character-based estimation. + Approximates tokens by assuming 4 characters per token. + This replaces the broken 'gemini count' CLI call. + """ + input_text = "\n".join(contents) + total_chars = len(input_text) + estimated_tokens = total_chars // 4 + return estimated_tokens + + def send(self, message, safety_settings=None, system_instruction=None, model: str = None): """ Sends a message to the Gemini CLI and processes the streaming JSON output. + Logs the CLI call details using session_logger.log_cli_call. + System instruction is prepended to the message. + Uses --prompt flag with a placeholder and sends the content via stdin. """ start_time = time.time() - # On Windows, using shell=True allows executing .cmd/.bat files and - # handles command strings with arguments more gracefully. - # We pass the message via stdin to avoid command-line length limits. - command = f'{self.binary_path} run --output-format stream-json' + + command_parts = [self.binary_path] + + if model: + command_parts.extend(['-m', f'"{model}"']) + + # Use an empty string placeholder. 
+ command_parts.extend(['--prompt', '""']) + if self.session_id: - command += f' --resume {self.session_id}' + command_parts.extend(['--resume', self.session_id]) + + command_parts.extend(['--output-format', 'stream-json']) + + command = " ".join(command_parts) + + # Construct the prompt text by prepending system_instruction if available + prompt_text = message + if system_instruction: + prompt_text = f"{system_instruction}\n\n{message}" accumulated_text = "" tool_calls = [] - + env = os.environ.copy() env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop" - process = subprocess.Popen( - command, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - shell=True, - env=env - ) + process = None + stdout_content = "" + stderr_content = "" + stdin_content = prompt_text try: - # Send message to stdin and close it - process.stdin.write(message) - process.stdin.close() + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + shell=True, + env=env + ) - # Read stdout line by line - for line in process.stdout: + stdout_output, stderr_output = process.communicate(input=prompt_text) + + stdout_content = stdout_output + stderr_content = stderr_output + + for line in stdout_content.splitlines(): line = line.strip() if not line: continue - try: data = json.loads(line) msg_type = data.get("type") - if msg_type == "message": - # Append message text to results - accumulated_text += data.get("text", "") - + if msg_type == "init": + if "session_id" in data: + self.session_id = data.get("session_id") + elif msg_type == "message": + content = data.get("content", data.get("text")) + if content: + accumulated_text += content elif msg_type == "result": - # Capture final usage and session persistence - # Support both mock ('usage') and real ('stats') keys - self.last_usage = data.get("usage") or data.get("stats") - self.session_id = data.get("session_id") - + self.last_usage = 
data.get("stats") or data.get("usage") + if "session_id" in data: + self.session_id = data.get("session_id") elif msg_type == "tool_use": - # Collect tool_use messages - tool_calls.append(data) - # Log status/tool_use to stderr for debugging - sys.stderr.write(f"GeminiCliAdapter [{msg_type}]: {line}\n") - sys.stderr.flush() - - elif msg_type == "status": - # Log status to stderr for debugging - sys.stderr.write(f"GeminiCliAdapter [{msg_type}]: {line}\n") - sys.stderr.flush() - + # Standardize format for ai_client.py + # Real CLI might use 'tool_name'/'tool_id'/'parameters' + # or 'name'/'id'/'args'. We'll map to 'name'/'id'/'args'. + tc = { + "name": data.get("tool_name", data.get("name")), + "args": data.get("parameters", data.get("args", {})), + "id": data.get("tool_id", data.get("id")) + } + if tc["name"]: + tool_calls.append(tc) except json.JSONDecodeError: - # Skip lines that are not valid JSON continue - process.wait() except Exception as e: - process.kill() + if process: + process.kill() raise e finally: - self.last_latency = time.time() - start_time + current_latency = time.time() - start_time + if process: + session_logger.open_session() + session_logger.log_cli_call( + command=command, + stdin_content=stdin_content, + stdout_content=stdout_content, + stderr_content=stderr_content, + latency=current_latency + ) + self.last_latency = current_latency return { "text": accumulated_text, - "tool_calls": tool_calls + "tool_calls": tool_calls, + "stderr": stderr_content } diff --git a/reproduce_no_text.py b/reproduce_no_text.py new file mode 100644 index 0000000..d07cd68 --- /dev/null +++ b/reproduce_no_text.py @@ -0,0 +1,28 @@ +import json +import subprocess +import os +import time +import sys + +# Add project root to sys.path +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "."))) + +from gemini_cli_adapter import GeminiCliAdapter + +def test_repro(): + adapter = GeminiCliAdapter(binary_path="gemini") + # Using a simple message + message 
= "say hello" + print(f"Sending message: '{message}'") + + result = adapter.send(message, model="gemini-3-flash-preview") + + print("\n--- Result ---") + print(f"Text: '{result.get('text')}'") + print(f"Tool Calls: {result.get('tool_calls')}") + print(f"Usage: {adapter.last_usage}") + print(f"Session ID: {adapter.session_id}") + print(f"Stderr: {result.get('stderr')}") + +if __name__ == "__main__": + test_repro() diff --git a/scripts/cli_tool_bridge.py b/scripts/cli_tool_bridge.py index 691ac72..abf0148 100644 --- a/scripts/cli_tool_bridge.py +++ b/scripts/cli_tool_bridge.py @@ -4,72 +4,151 @@ import logging import os # Add project root to sys.path so we can import api_hook_client -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) +# This helps in cases where the script is run from different directories +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if project_root not in sys.path: + sys.path.append(project_root) try: from api_hook_client import ApiHookClient except ImportError: - # Fallback for if we are running from root or other locations - sys.path.append(os.path.abspath(os.path.dirname(__file__))) - from api_hook_client import ApiHookClient + # Fallback if the script is run from the project root directly, + # or if the above path append didn't work for some reason. + try: + from api_hook_client import ApiHookClient + except ImportError: + # Use basic print for fatal errors if logging isn't set up yet + print("FATAL: Failed to import ApiHookClient. Ensure it's in the Python path.", file=sys.stderr) + sys.exit(1) # Exit if the core dependency cannot be imported + def main(): - # Setup basic logging to stderr so it doesn't interfere with stdout JSON - logging.basicConfig(level=logging.ERROR, stream=sys.stderr) - + # Setup basic logging to stderr. + # Set level to DEBUG to capture all messages, including debug info. 
+ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stderr) + + logging.debug("CLI Tool Bridge script started.") + try: # 1. Read JSON from sys.stdin input_data = sys.stdin.read() + if not input_data: + logging.debug("No input received from stdin. Exiting gracefully.") return - hook_input = json.loads(input_data) - - # 2. Extract 'tool_name' and 'tool_input' - tool_name = hook_input.get('tool_name') - tool_args = hook_input.get('tool_input', {}) + logging.debug(f"Received raw input data: {input_data}") - # 3. Check context — if not running via Manual Slop, we pass through (allow) + try: + hook_input = json.loads(input_data) + except json.JSONDecodeError: + logging.error("Failed to decode JSON from stdin.") + print(json.dumps({ + "decision": "deny", + "reason": "Invalid JSON received from stdin." + })) + return + + # Initialize variables for tool name and arguments + tool_name = None + tool_args = {} + + # 2. Try to parse input in Gemini API format ('name', 'input') + logging.debug("Attempting to parse input in Gemini API format ('name', 'input').") + if 'name' in hook_input and hook_input['name'] is not None: + tool_name = hook_input['name'] + logging.debug(f"Found Gemini API format tool name: {tool_name}") + + if 'input' in hook_input and hook_input['input'] is not None: + if isinstance(hook_input['input'], dict): + tool_args = hook_input['input'] + logging.debug(f"Found Gemini API format tool input: {tool_args}") + else: + logging.warning("Gemini API format 'input' is not a dictionary. Ignoring.") + + # 3. If Gemini format wasn't fully present, try the legacy format ('tool_name', 'tool_input') + if tool_name is None: + logging.debug("Gemini API format not fully detected. 
Falling back to legacy format ('tool_name', 'tool_input').") + tool_name = hook_input.get('tool_name') + if tool_name: + logging.debug(f"Found legacy format tool name: {tool_name}") + + tool_input_legacy = hook_input.get('tool_input') + if tool_input_legacy is not None: + if isinstance(tool_input_legacy, dict): + tool_args = tool_input_legacy + logging.debug(f"Found legacy format tool input: {tool_args}") + else: + logging.warning("Legacy format 'tool_input' is not a dictionary. Ignoring.") + + # Final checks on resolved tool_name and tool_args + if tool_name is None: + logging.error("Could not determine tool name from input.") + print(json.dumps({ + "decision": "deny", + "reason": "Could not determine tool name from input. Expected 'name' or 'tool_name'." + })) + return + + if not isinstance(tool_args, dict): + logging.error(f"Resolved tool_args is not a dictionary: {tool_args}") + print(json.dumps({ + "decision": "deny", + "reason": "Resolved tool arguments are not in a valid dictionary format." + })) + return + + logging.debug(f"Resolved tool_name: '{tool_name}', tool_args: {tool_args}") + + # 4. Check context — if not running via Manual Slop, we pass through (allow) # This prevents the hook from affecting normal CLI usage. hook_context = os.environ.get("GEMINI_CLI_HOOK_CONTEXT") if hook_context != "manual_slop": + logging.debug("GEMINI_CLI_HOOK_CONTEXT not set to 'manual_slop'. Allowing execution without confirmation.") print(json.dumps({ - "decision": "allow", + "decision": "allow", "reason": "Non-programmatic usage (GEMINI_CLI_HOOK_CONTEXT not set)." })) return - # 4. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999) + # 5. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999) + logging.debug("GEMINI_CLI_HOOK_CONTEXT is 'manual_slop'. Proceeding with API Hook Client.") client = ApiHookClient(base_url="http://127.0.0.1:8999") - + try: - # 5. Request confirmation + # 6. 
Request confirmation # This is a blocking call that waits for the user in the GUI + logging.debug(f"Requesting confirmation for tool '{tool_name}' with args: {tool_args}") response = client.request_confirmation(tool_name, tool_args) if response and response.get('approved') is True: - # 6. Print 'allow' decision + # 7. Print 'allow' decision + logging.debug("User approved tool execution.") print(json.dumps({"decision": "allow"})) else: - # 7. Print 'deny' decision + # 8. Print 'deny' decision + reason = response.get('reason', 'User rejected tool execution in GUI.') if response else 'No response from GUI.' + logging.debug(f"User denied tool execution. Reason: {reason}") print(json.dumps({ - "decision": "deny", - "reason": "User rejected tool execution in GUI." + "decision": "deny", + "reason": reason })) except Exception as e: - # 8. Handle cases where hook server is not reachable + # 9. Handle cases where hook server is not reachable or other API errors # If we ARE in manual_slop context but can't reach the server, we should DENY # because the user expects to be in control. 
+ logging.error(f"API Hook Client error: {str(e)}", exc_info=True) print(json.dumps({ - "decision": "deny", - "reason": f"Manual Slop hook server unreachable: {str(e)}" + "decision": "deny", + "reason": f"Manual Slop hook server unreachable or API error: {str(e)}" })) except Exception as e: - # Fallback for unexpected parsing errors + # Fallback for unexpected errors during initial processing (e.g., stdin read) + logging.error(f"An unexpected error occurred in the main bridge logic: {str(e)}", exc_info=True) print(json.dumps({ - "decision": "deny", + "decision": "deny", "reason": f"Internal bridge error: {str(e)}" })) diff --git a/session_logger.py b/session_logger.py index bf3c859..80fb4c4 100644 --- a/session_logger.py +++ b/session_logger.py @@ -1,16 +1,5 @@ # session_logger.py """ -Note(Gemini): -Opens timestamped log/script files at startup and keeps them open for the -lifetime of the process. - -File layout: -logs/comms_.log - every comms entry (direction/kind/payload) as JSON-L -logs/toolcalls_.log - sequential record of every tool invocation -scripts/generated/_.ps1 - each PowerShell script the AI generated -""" -# session_logger.py -""" Opens timestamped log/script files at startup and keeps them open for the lifetime of the process. The next run of the GUI creates new files; the previous run's files are simply closed when the process exits. 
@@ -20,6 +9,7 @@ File layout logs/ comms_.log - every comms entry (direction/kind/payload) as JSON-L toolcalls_.log - sequential record of every tool invocation + clicalls_.log - sequential record of every CLI subprocess call scripts/generated/ _.ps1 - each PowerShell script the AI generated, in order @@ -42,6 +32,7 @@ _seq_lock = threading.Lock() _comms_fh = None # file handle: logs/comms_.log _tool_fh = None # file handle: logs/toolcalls_.log _api_fh = None # file handle: logs/apihooks_.log - API hook calls +_cli_fh = None # file handle: logs/clicalls_.log - CLI subprocess calls def _now_ts() -> str: @@ -54,7 +45,7 @@ def open_session(): opens the two log files for this session. Idempotent - a second call is ignored. """ - global _ts, _comms_fh, _tool_fh, _api_fh, _seq + global _ts, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq if _comms_fh is not None: return # already open @@ -68,16 +59,19 @@ def open_session(): _comms_fh = open(_LOG_DIR / f"comms_{_ts}.log", "w", encoding="utf-8", buffering=1) _tool_fh = open(_LOG_DIR / f"toolcalls_{_ts}.log", "w", encoding="utf-8", buffering=1) _api_fh = open(_LOG_DIR / f"apihooks_{_ts}.log", "w", encoding="utf-8", buffering=1) + _cli_fh = open(_LOG_DIR / f"clicalls_{_ts}.log", "w", encoding="utf-8", buffering=1) # New log file handle _tool_fh.write(f"# Tool-call log — session {_ts}\n\n") _tool_fh.flush() + _cli_fh.write(f"# CLI Subprocess Call Log — session {_ts}\n\n") # Header for new log file + _cli_fh.flush() atexit.register(close_session) def close_session(): - """Flush and close both log files. Called on clean exit (optional).""" - global _comms_fh, _tool_fh, _api_fh + """Flush and close all log files. 
Called on clean exit (optional).""" + global _comms_fh, _tool_fh, _api_fh, _cli_fh if _comms_fh: _comms_fh.close() _comms_fh = None @@ -87,6 +81,9 @@ def close_session(): if _api_fh: _api_fh.close() _api_fh = None + if _cli_fh: # Close the new log file handle + _cli_fh.close() + _cli_fh = None def log_api_hook(method: str, path: str, payload: str): @@ -155,3 +152,26 @@ def log_tool_call(script: str, result: str, script_path: str | None): pass return str(ps1_path) if ps1_path else None + + +def log_cli_call(command: str, stdin_content: str | None, stdout_content: str | None, stderr_content: str | None, latency: float): + """ + Log details of a CLI subprocess execution. + """ + if _cli_fh is None: + return + + ts_entry = datetime.datetime.now().strftime("%H:%M:%S") + try: + log_data = { + "timestamp": ts_entry, + "command": command, + "stdin": stdin_content, + "stdout": stdout_content, + "stderr": stderr_content, + "latency_sec": latency + } + _cli_fh.write(json.dumps(log_data, ensure_ascii=False, default=str) + "\n") + _cli_fh.flush() + except Exception: + pass diff --git a/tests/mock_gemini_cli.py b/tests/mock_gemini_cli.py index c83863f..a58a0cb 100644 --- a/tests/mock_gemini_cli.py +++ b/tests/mock_gemini_cli.py @@ -4,93 +4,99 @@ import subprocess import os def main(): - # The GUI calls: run --output-format stream-json - # The prompt is now passed via stdin. 
- # Debug log to stderr sys.stderr.write(f"DEBUG: mock_gemini_cli called with args: {sys.argv}\n") - # Read prompt from stdin for debug - prompt = sys.stdin.read() + # Read prompt from stdin + try: + prompt = sys.stdin.read() + except EOFError: + prompt = "" + sys.stderr.write(f"DEBUG: Received prompt via stdin ({len(prompt)} chars)\n") sys.stderr.flush() - if "run" not in sys.argv: + # Skip management commands + if len(sys.argv) > 1 and sys.argv[1] in ["mcp", "extensions", "skills", "hooks"]: return - # If the prompt contains tool results (indicated by "role": "tool"), - # it means we are in the second round and should provide a final answer. - if '"role": "tool"' in prompt: + # If the prompt contains tool results, provide final answer + if '"role": "tool"' in prompt or '"tool_call_id"' in prompt: print(json.dumps({ "type": "message", - "text": "I have processed the tool results. Everything looks good!" + "role": "assistant", + "content": "I have processed the tool results. Everything looks good!" }), flush=True) print(json.dumps({ "type": "result", - "usage": {"total_tokens": 100}, + "status": "success", + "stats": {"total_tokens": 100, "input_tokens": 80, "output_tokens": 20}, "session_id": "mock-session-final" }), flush=True) return - # Simulate the 'BeforeTool' hook by calling the bridge directly. 
+ # Default flow: simulate a tool call bridge_path = os.path.abspath("scripts/cli_tool_bridge.py") - - tool_call = { - "tool_name": "read_file", - "tool_input": {"path": "test.txt"} + # Using format that bridge understands + bridge_tool_call = { + "name": "read_file", + "input": {"path": "test.txt"} } sys.stderr.write(f"DEBUG: Calling bridge at {bridge_path}\n") sys.stderr.flush() - # Bridge reads from stdin - process = subprocess.Popen( - [sys.executable, bridge_path], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - env=os.environ # Ensure environment variables are inherited - ) - stdout, stderr = process.communicate(input=json.dumps(tool_call)) - - sys.stderr.write(f"DEBUG: Bridge stdout: {stdout}\n") - sys.stderr.write(f"DEBUG: Bridge stderr: {stderr}\n") - sys.stderr.flush() - try: + process = subprocess.Popen( + [sys.executable, bridge_path], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env=os.environ + ) + stdout, stderr = process.communicate(input=json.dumps(bridge_tool_call)) + + sys.stderr.write(f"DEBUG: Bridge stdout: {stdout}\n") + sys.stderr.write(f"DEBUG: Bridge stderr: {stderr}\n") + decision_data = json.loads(stdout.strip()) decision = decision_data.get("decision") except Exception as e: - sys.stderr.write(f"DEBUG: Failed to parse bridge output: {e}\n") + sys.stderr.write(f"DEBUG: Bridge failed: {e}\n") decision = "deny" - # Output JSONL to stdout if decision == "allow": + # Simulate REAL CLI field names for adapter normalization test print(json.dumps({ "type": "tool_use", - "name": "read_file", - "args": {"path": "test.txt"} + "tool_name": "read_file", + "tool_id": "call_123", + "parameters": {"path": "test.txt"} }), flush=True) print(json.dumps({ "type": "message", - "text": "I read the file. It contains: 'Hello from mock!'" + "role": "assistant", + "content": "I am reading the file now..." 
}), flush=True) print(json.dumps({ "type": "result", - "usage": {"total_tokens": 50}, + "status": "success", + "stats": {"total_tokens": 50, "input_tokens": 40, "output_tokens": 10}, "session_id": "mock-session-123" }), flush=True) else: print(json.dumps({ "type": "message", - "text": f"Tool execution was denied. Decision: {decision}" + "role": "assistant", + "content": f"Tool execution was denied. Decision: {decision}" }), flush=True) print(json.dumps({ "type": "result", - "usage": {"total_tokens": 10}, + "status": "success", + "stats": {"total_tokens": 10, "input_tokens": 10, "output_tokens": 0}, "session_id": "mock-session-denied" }), flush=True) diff --git a/tests/temp_project.toml b/tests/temp_project.toml index e1c0d76..ec9b612 100644 --- a/tests/temp_project.toml +++ b/tests/temp_project.toml @@ -20,7 +20,7 @@ base_dir = "." paths = [] [gemini_cli] -binary_path = "gemini" +binary_path = "\"C:\\projects\\manual_slop\\.venv\\Scripts\\python.exe\" \"C:\\projects\\manual_slop\\tests\\mock_gemini_cli.py\"" [agent.tools] run_powershell = true diff --git a/tests/temp_project_history.toml b/tests/temp_project_history.toml index c3214c6..fd6e190 100644 --- a/tests/temp_project_history.toml +++ b/tests/temp_project_history.toml @@ -16,8 +16,5 @@ history = [ [discussions."testing deepseek"] git_commit = "" -last_updated = "2026-02-25T23:33:45" -history = [ - "@2026-02-25T23:33:21\nUser:\ntesting testing 123?", - "@2026-02-25T23:33:25\nAI:\nI see you're testing with \"testing testing 123\". I'm here and ready to help! I'm a helpful coding assistant with access to PowerShell and various tools for file operations and web access.\n\nWhat would you like me to help you with today? 
I can assist with:\n- Writing or editing code files\n- Searching for information\n- Reading or analyzing files\n- Running commands\n- Web searches\n- And much more!\n\nJust let me know what you need help with.", -] +last_updated = "2026-02-26T00:29:48" +history = [] diff --git a/tests/test_ai_client_list_models.py b/tests/test_ai_client_list_models.py new file mode 100644 index 0000000..30de38b --- /dev/null +++ b/tests/test_ai_client_list_models.py @@ -0,0 +1,17 @@ +import pytest +from unittest.mock import patch, MagicMock +import ai_client + +def test_list_models_gemini_cli(): + """ + Verifies that 'ai_client.list_models' correctly returns a list of models + for the 'gemini_cli' provider. + """ + models = ai_client.list_models("gemini_cli") + + assert "gemini-3.1-pro-preview" in models + assert "gemini-3-flash-preview" in models + assert "gemini-2.5-pro" in models + assert "gemini-2.5-flash" in models + assert "gemini-2.5-flash-lite" in models + assert len(models) == 5 diff --git a/tests/test_cli_tool_bridge_mapping.py b/tests/test_cli_tool_bridge_mapping.py new file mode 100644 index 0000000..991eed5 --- /dev/null +++ b/tests/test_cli_tool_bridge_mapping.py @@ -0,0 +1,53 @@ +import unittest +from unittest.mock import patch, MagicMock +import io +import json +import sys +import os + +# Add project root to sys.path +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +# Import after path fix +from scripts.cli_tool_bridge import main + +class TestCliToolBridgeMapping(unittest.TestCase): + def setUp(self): + os.environ['GEMINI_CLI_HOOK_CONTEXT'] = 'manual_slop' + + @patch('sys.stdin', new_callable=io.StringIO) + @patch('sys.stdout', new_callable=io.StringIO) + @patch('api_hook_client.ApiHookClient.request_confirmation') + def test_mapping_from_api_format(self, mock_request, mock_stdout, mock_stdin): + """ + Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format) + into tool_name and tool_input for the hook client. 
+ """ + api_tool_call = { + 'id': 'call123', + 'name': 'read_file', + 'input': {'path': 'test.txt'} + } + + # 1. Mock stdin with the API format JSON + mock_stdin.write(json.dumps(api_tool_call)) + mock_stdin.seek(0) + + # 2. Mock ApiHookClient to return approved + mock_request.return_value = {'approved': True} + + # Run main + main() + + # 3. Verify that request_confirmation was called with mapped values + # If it's not mapped, it will likely be called with None or fail + mock_request.assert_called_once_with('read_file', {'path': 'test.txt'}) + + # 4. Capture stdout and assert allow + output_str = mock_stdout.getvalue().strip() + self.assertTrue(output_str, "Stdout should not be empty") + output = json.loads(output_str) + self.assertEqual(output.get('decision'), 'allow') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_gemini_cli_adapter_parity.py b/tests/test_gemini_cli_adapter_parity.py new file mode 100644 index 0000000..0488168 --- /dev/null +++ b/tests/test_gemini_cli_adapter_parity.py @@ -0,0 +1,175 @@ +import unittest +from unittest.mock import patch, MagicMock, ANY +import json +import subprocess +import io +import sys +import os + +# Ensure the project root is in sys.path to resolve imports correctly +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if project_root not in sys.path: + sys.path.append(project_root) + +# Import the class to be tested +from gemini_cli_adapter import GeminiCliAdapter + +# Mock the session_logger module to prevent file operations during tests. 
mock_session_logger = MagicMock()
# NOTE(review): this entry is installed after gemini_cli_adapter has already been
# imported and is never removed from sys.modules. Presumably the adapter imports
# session_logger lazily at call time -- confirm; otherwise the mock has no effect
# on the adapter and only leaks into other test modules in the same process.
sys.modules['session_logger'] = mock_session_logger


class TestGeminiCliAdapterParity(unittest.TestCase):
    """Tests for GeminiCliAdapter that run with ``subprocess.Popen`` patched out."""

    @staticmethod
    def _stub_popen(mock_popen):
        """Make the patched ``subprocess.Popen`` return a process that exits cleanly.

        The fake process has return code 0, and its ``communicate()`` yields one
        JSON ``result`` line with an empty ``usage`` object on stdout and an
        empty string on stderr.

        Args:
            mock_popen: The mock injected by ``@patch('subprocess.Popen')``.

        Returns:
            The ``MagicMock`` standing in for the process, so a test can inspect
            the calls made to its ``communicate``.
        """
        process_mock = MagicMock()
        process_mock.communicate.return_value = (
            json.dumps({"type": "result", "usage": {}}) + "\n",
            "",
        )
        process_mock.returncode = 0
        mock_popen.return_value = process_mock
        return process_mock

    def setUp(self):
        """Set up a fresh adapter instance and reset session state for each test."""
        self.adapter = GeminiCliAdapter(binary_path="gemini")
        self.adapter.session_id = None
        self.adapter.last_usage = None
        self.adapter.last_latency = 0.0
        # Reset mock calls for session_logger for each test
        mock_session_logger.reset_mock()

    @patch('subprocess.Popen')
    def test_count_tokens_uses_estimation(self, mock_popen):
        """
        Test that count_tokens uses character-based estimation.
        """
        contents_to_count = ["This is the first line.", "This is the second line."]
        # Expected value: length of the newline-joined contents, four chars per token.
        expected_tokens = len("\n".join(contents_to_count)) // 4

        token_count = self.adapter.count_tokens(contents=contents_to_count)
        self.assertEqual(token_count, expected_tokens)

        # Verify that NO subprocess was started for counting
        mock_popen.assert_not_called()

    @patch('subprocess.Popen')
    def test_send_with_safety_settings_no_flags_added(self, mock_popen):
        """
        Test that the send method does NOT add --safety flags when safety_settings are provided,
        as this functionality is no longer supported via CLI flags.
        """
        process_mock = self._stub_popen(mock_popen)

        message_content = "User's prompt here."
        safety_settings = [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_ONLY_HIGH"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        ]

        self.adapter.send(message=message_content, safety_settings=safety_settings)

        # First positional argument handed to Popen is the command.
        command = mock_popen.call_args[0][0]

        # Verify that no --safety flags were added to the command
        self.assertNotIn("--safety", command)
        # Verify that the message was passed correctly via stdin
        process_mock.communicate.assert_called_once_with(input=message_content)

    @patch('subprocess.Popen')
    def test_send_without_safety_settings_no_flags(self, mock_popen):
        """
        Test that when safety_settings is None or an empty list, no --safety flags are added.
        """
        self._stub_popen(mock_popen)
        message_content = "Another prompt."

        # Each case runs as its own sub-test so a failure for ``None`` does not
        # hide the result for ``[]``.
        for safety_settings in (None, []):
            with self.subTest(safety_settings=safety_settings):
                # Clears recorded calls only; the configured return_value is kept.
                mock_popen.reset_mock()
                self.adapter.send(message=message_content, safety_settings=safety_settings)
                command = mock_popen.call_args[0][0]
                self.assertNotIn("--safety", command)

    @patch('subprocess.Popen')
    def test_send_with_system_instruction_prepended_to_stdin(self, mock_popen):
        """
        Test that the send method prepends the system instruction to the prompt
        sent via stdin, and does NOT add a --system flag to the command.
        """
        process_mock = self._stub_popen(mock_popen)

        message_content = "User's prompt here."
        system_instruction_text = "Some instruction"
        expected_input = f"{system_instruction_text}\n\n{message_content}"

        self.adapter.send(message=message_content, system_instruction=system_instruction_text)

        command = mock_popen.call_args[0][0]

        # Verify that the system instruction was prepended to the input sent to communicate
        process_mock.communicate.assert_called_once_with(input=expected_input)

        # Verify that no --system flag was added to the command
        self.assertNotIn("--system", command)

    @patch('subprocess.Popen')
    def test_send_with_model_parameter(self, mock_popen):
        """
        Test that the send method correctly adds the -m flag when a model is specified.
        """
        process_mock = self._stub_popen(mock_popen)

        message_content = "User's prompt here."
        model_name = "gemini-1.5-flash"
        expected_command_part = f'-m "{model_name}"'

        self.adapter.send(message=message_content, model=model_name)

        command = mock_popen.call_args[0][0]

        # Verify that the -m flag was added to the command
        self.assertIn(expected_command_part, command)
        # Verify that the message was passed correctly via stdin
        process_mock.communicate.assert_called_once_with(input=message_content)

    @patch('subprocess.Popen')
    def test_send_kills_process_on_communicate_exception(self, mock_popen):
        """
        Test that if subprocess.Popen().communicate() raises an exception,
        GeminiCliAdapter.send() kills the process and re-raises the exception.
        """
        mock_process = MagicMock()
        mock_popen.return_value = mock_process

        # Define an exception to simulate
        simulated_exception = RuntimeError("Simulated communicate error")
        mock_process.communicate.side_effect = simulated_exception

        # Assert that the exception is raised and process is killed
        with self.assertRaises(RuntimeError) as cm:
            self.adapter.send(message="User message")

        # Verify that the process's kill method was called
        mock_process.kill.assert_called_once()

        # Verify that the correct exception was re-raised
        self.assertIs(cm.exception, simulated_exception)


if __name__ == '__main__':
    unittest.main()
Setup paths and configure the GUI + # Use the real gemini CLI if available, otherwise use mock + # For CI/testing we prefer mock mock_script = os.path.abspath("tests/mock_gemini_cli.py") - # Wrap in quotes for shell execution if path has spaces cli_cmd = f'"{sys.executable}" "{mock_script}"' - # Set provider and binary path via GUI hooks - # Note: Using set_value which now triggers the property setter in gui_2.py print(f"[TEST] Setting current_provider to gemini_cli") client.set_value("current_provider", "gemini_cli") print(f"[TEST] Setting gcli_path to {cli_cmd}") client.set_value("gcli_path", cli_cmd) - # Verify settings were applied + # Verify settings assert client.get_value("current_provider") == "gemini_cli" - assert client.get_value("gcli_path") == cli_cmd - + # Clear events client.get_events() @@ -41,55 +40,48 @@ def test_gemini_cli_full_integration(live_gui): client.set_value("ai_input", "Please read test.txt") client.click("btn_gen_send") - # 3. Monitor for the 'ask_received' event - print("[TEST] Waiting for ask_received event...") - request_id = None - timeout = 30 + # 3. Monitor for approval events + print("[TEST] Waiting for approval events...") + timeout = 45 start_time = time.time() + approved_count = 0 + while time.time() - start_time < timeout: events = client.get_events() if events: - print(f"[TEST] Received {len(events)} events: {[e.get('type') for e in events]}") - for ev in events: - if ev.get("type") == "ask_received": - request_id = ev.get("request_id") - print(f"[TEST] Found request_id: {request_id}") - break - if request_id: - break - time.sleep(0.5) - - assert request_id is not None, "Timed out waiting for 'ask_received' event from the bridge" - - # 4. Respond to the permission request - print("[TEST] Responding to ask with approval") - resp = requests.post( - "http://127.0.0.1:8999/api/ask/respond", - json={ - "request_id": request_id, - "response": {"approved": True} - } - ) - assert resp.status_code == 200 - - # 5. 
Verify that the final response is displayed in the GUI - print("[TEST] Waiting for final message in history...") - final_message_received = False - start_time = time.time() - while time.time() - start_time < timeout: + for ev in events: + etype = ev.get("type") + eid = ev.get("request_id") or ev.get("action_id") + print(f"[TEST] Received event: {etype} (ID: {eid})") + + if etype in ["ask_received", "glob_approval_required", "script_confirmation_required"]: + print(f"[TEST] Approving {etype} {eid}") + if etype == "script_confirmation_required": + resp = requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": True}) + else: + resp = requests.post("http://127.0.0.1:8999/api/ask/respond", + json={"request_id": eid, "response": {"approved": True}}) + assert resp.status_code == 200 + approved_count += 1 + + # Check if we got a final response in history session = client.get_session() entries = session.get("session", {}).get("entries", []) + found_final = False for entry in entries: content = entry.get("content", "") - if "Hello from mock!" in content: - print(f"[TEST] Success! Found message: {content[:50]}...") - final_message_received = True + if "Hello from mock!" in content or "processed the tool results" in content: + print(f"[TEST] Success! Found final message in history.") + found_final = True break - if final_message_received: + + if found_final: break + time.sleep(1.0) - assert final_message_received, "Final message from mock CLI was not found in the GUI history" + assert approved_count > 0, "No approval events were processed" + assert found_final, "Final message from mock CLI was not found in the GUI history" def test_gemini_cli_rejection_and_history(live_gui): """ @@ -97,88 +89,53 @@ def test_gemini_cli_rejection_and_history(live_gui): """ client = ApiHookClient("http://127.0.0.1:8999") - # 0. Reset session and enable history + # 0. 
Reset session client.click("btn_reset") client.set_value("auto_add_history", True) - # Switch to manual_slop project explicitly client.select_list_item("proj_files", "manual_slop") - # 1. Setup paths and configure the GUI mock_script = os.path.abspath("tests/mock_gemini_cli.py") cli_cmd = f'"{sys.executable}" "{mock_script}"' - client.set_value("current_provider", "gemini_cli") client.set_value("gcli_path", cli_cmd) - - # 2. Trigger a message that will be denied + + # 2. Trigger a message print("[TEST] Sending user message (to be denied)...") client.set_value("ai_input", "Deny me") client.click("btn_gen_send") - # 3. Wait for 'ask_received' and respond with rejection - request_id = None - timeout = 15 + # 3. Wait for event and reject + timeout = 20 start_time = time.time() + denied = False while time.time() - start_time < timeout: for ev in client.get_events(): - if ev.get("type") == "ask_received": - request_id = ev.get("request_id") + etype = ev.get("type") + eid = ev.get("request_id") + print(f"[TEST] Received event: {etype}") + if etype == "ask_received": + print(f"[TEST] Denying request {eid}") + requests.post("http://127.0.0.1:8999/api/ask/respond", + json={"request_id": eid, "response": {"approved": False}}) + denied = True break - if request_id: break + if denied: break time.sleep(0.5) - assert request_id is not None + assert denied, "No ask_received event to deny" - print("[TEST] Responding to ask with REJECTION") - requests.post("http://127.0.0.1:8999/api/ask/respond", - json={"request_id": request_id, "response": {"approved": False}}) - - # 4. Verify rejection message in history - print("[TEST] Waiting for rejection message in history...") + # 4. 
Verify rejection in history + print("[TEST] Waiting for rejection in history...") rejection_found = False start_time = time.time() - while time.time() - start_time < timeout: + while time.time() - start_time < 20: session = client.get_session() entries = session.get("session", {}).get("entries", []) for entry in entries: - if "Tool execution was denied. Decision: deny" in entry.get("content", ""): + if "Tool execution was denied" in entry.get("content", ""): rejection_found = True break if rejection_found: break time.sleep(1.0) assert rejection_found, "Rejection message not found in history" - - # 5. Send a follow-up message and verify history grows - print("[TEST] Sending follow-up message...") - client.set_value("ai_input", "What happened?") - client.click("btn_gen_send") - - # Wait for mock to finish (polling history) - print("[TEST] Waiting for final history entry (max 30s)...") - final_message_received = False - start_time = time.time() - while time.time() - start_time < 30: - session = client.get_session() - entries = session.get("session", {}).get("entries", []) - if len(entries) >= 3: - final_message_received = True - break - # Print snapshot for debug - if int(time.time() - start_time) % 5 == 0: - print(f"[TEST] History length at {int(time.time() - start_time)}s: {len(entries)}") - time.sleep(1.0) - - session = client.get_session() - entries = session.get("session", {}).get("entries", []) - # Should have: - # 1. User: Deny me - # 2. AI: Tool execution was denied... - # 3. User: What happened? - # 4. AI or System: ... 
import pytest
from unittest.mock import patch, MagicMock
import sys
import os

# Add project root to sys.path (guarded so repeated imports do not add duplicates)
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
    sys.path.append(project_root)

import ai_client


@pytest.fixture(autouse=True)
def setup_ai_client(monkeypatch):
    """Put ai_client into a known gemini_cli state for every test in this module.

    Resets the session, selects the ``gemini_cli`` provider with the
    ``gemini-2.5-flash`` model, and replaces the three module-level callbacks
    with inert stubs. The stubs are installed through ``monkeypatch`` so the
    previous values come back after the test instead of leaking into other
    test modules.
    """
    ai_client.reset_session()
    ai_client.set_provider("gemini_cli", "gemini-2.5-flash")
    # raising=False mirrors plain assignment: the attribute is created if absent.
    monkeypatch.setattr(
        ai_client, "confirm_and_run_callback",
        lambda script, base_dir: "Mocked execution", raising=False,
    )
    monkeypatch.setattr(ai_client, "comms_log_callback", lambda entry: None, raising=False)
    monkeypatch.setattr(ai_client, "tool_log_callback", lambda script, result: None, raising=False)
    yield
    # Drop the adapter created during the test (a mock instance) so it does not
    # stay cached inside ai_client.
    # NOTE(review): the provider/model chosen above is not restored -- confirm
    # no other test module depends on the previous selection.
    ai_client.reset_session()


@patch('ai_client.GeminiCliAdapter')
@patch('ai_client._get_combined_system_prompt')
def test_send_invokes_adapter_send(mock_prompt, mock_adapter_class):
    """ai_client.send() forwards the history-prefixed message and system prompt to the adapter."""
    mock_prompt.return_value = "Mocked Prompt"
    mock_instance = mock_adapter_class.return_value
    mock_instance.send.return_value = {"text": "Done", "tool_calls": []}
    mock_instance.last_usage = {"input_tokens": 10}
    mock_instance.last_latency = 0.1
    mock_instance.session_id = None

    ai_client.send("context", "message", discussion_history="hist")

    expected_payload = "[DISCUSSION HISTORY]\n\nhist\n\n---\n\nmessage"
    assert mock_instance.send.called
    args, kwargs = mock_instance.send.call_args
    assert args[0] == expected_payload
    assert kwargs['system_instruction'] == "Mocked Prompt\n\n\ncontext\n"


@patch('ai_client.GeminiCliAdapter')
def test_get_history_bleed_stats(mock_adapter_class):
    """get_history_bleed_stats() reports the provider and the adapter's last input token count."""
    mock_instance = mock_adapter_class.return_value
    mock_instance.send.return_value = {"text": "txt", "tool_calls": []}
    mock_instance.last_usage = {"input_tokens": 1500}
    mock_instance.last_latency = 0.5
    mock_instance.session_id = "sess"

    # Initialize by sending a message
    ai_client.send("context", "msg")

    stats = ai_client.get_history_bleed_stats()

    assert stats["provider"] == "gemini_cli"
    assert stats["current"] == 1500