150 lines
6.0 KiB
Python
150 lines
6.0 KiB
Python
import pytest
|
|
import time
|
|
import os
|
|
import sys
|
|
import requests
|
|
import json
|
|
from api_hook_client import ApiHookClient
|
|
|
|
def test_gemini_cli_context_bleed_prevention(live_gui):
|
|
"""
|
|
Test that the GeminiCliAdapter correctly filters out echoed 'user' messages
|
|
and only shows assistant content in the GUI history.
|
|
"""
|
|
client = ApiHookClient("http://127.0.0.1:8999")
|
|
client.click("btn_reset")
|
|
client.set_value("auto_add_history", True)
|
|
# Create a specialized mock for context bleed
|
|
bleed_mock = os.path.abspath("tests/mock_context_bleed.py")
|
|
with open(bleed_mock, "w") as f:
|
|
f.write('''import sys, json
|
|
print(json.dumps({"type": "init", "session_id": "bleed-test"}), flush=True)
|
|
print(json.dumps({"type": "message", "role": "user", "content": "I am echoing you"}), flush=True)
|
|
print(json.dumps({"type": "message", "role": "assistant", "content": "Actual AI Response"}), flush=True)
|
|
print(json.dumps({"type": "result", "stats": {"total_tokens": 10}}), flush=True)
|
|
''')
|
|
cli_cmd = f'"{sys.executable}" "{bleed_mock}"'
|
|
client.set_value("current_provider", "gemini_cli")
|
|
client.set_value("gcli_path", cli_cmd)
|
|
client.set_value("ai_input", "Test context bleed")
|
|
client.click("btn_gen_send")
|
|
# Wait for completion
|
|
time.sleep(3)
|
|
session = client.get_session()
|
|
entries = session.get("session", {}).get("entries", [])
|
|
# Verify: We expect exactly one AI entry, and it must NOT contain the echoed user message
|
|
ai_entries = [e for e in entries if e.get("role") == "AI"]
|
|
assert len(ai_entries) == 1
|
|
assert ai_entries[0].get("content") == "Actual AI Response"
|
|
assert "echoing you" not in ai_entries[0].get("content")
|
|
os.remove(bleed_mock)
|
|
|
|
def test_gemini_cli_parameter_resilience(live_gui):
|
|
"""
|
|
Test that mcp_client correctly handles 'file_path' and 'dir_path' aliases
|
|
sent by the AI instead of 'path'.
|
|
"""
|
|
client = ApiHookClient("http://127.0.0.1:8999")
|
|
client.click("btn_reset")
|
|
client.set_value("auto_add_history", True)
|
|
client.select_list_item("proj_files", "manual_slop")
|
|
# Create a mock that uses dir_path for list_directory
|
|
alias_mock = os.path.abspath("tests/mock_alias_tool.py")
|
|
bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
|
|
# Avoid backslashes in f-string expression part
|
|
if sys.platform == "win32":
|
|
bridge_path_str = bridge_path.replace("\\", "/")
|
|
else:
|
|
bridge_path_str = bridge_path
|
|
with open(alias_tool_content := "tests/mock_alias_tool.py", "w") as f:
|
|
f.write(f'''import sys, json, os, subprocess
|
|
prompt = sys.stdin.read()
|
|
if '"role": "tool"' in prompt:
|
|
print(json.dumps({{"type": "message", "role": "assistant", "content": "Tool worked!"}}), flush=True)
|
|
print(json.dumps({{"type": "result", "stats": {{"total_tokens": 20}}}}), flush=True)
|
|
else:
|
|
# We must call the bridge to trigger the GUI approval!
|
|
tool_call = {{"name": "list_directory", "input": {{"dir_path": "."}}}}
|
|
bridge_cmd = [sys.executable, "{bridge_path_str}"]
|
|
proc = subprocess.Popen(bridge_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
|
|
stdout, _ = proc.communicate(input=json.dumps(tool_call))
|
|
|
|
# Even if bridge says allow, we emit the tool_use to the adapter
|
|
print(json.dumps({{"type": "message", "role": "assistant", "content": "I will list the directory."}}), flush=True)
|
|
print(json.dumps({{
|
|
"type": "tool_use",
|
|
"name": "list_directory",
|
|
"id": "alias_call",
|
|
"args": {{"dir_path": "."}}
|
|
}}), flush=True)
|
|
print(json.dumps({{"type": "result", "stats": {{"total_tokens": 10}}}}), flush=True)
|
|
''')
|
|
cli_cmd = f'"{sys.executable}" "{alias_mock}"'
|
|
client.set_value("current_provider", "gemini_cli")
|
|
client.set_value("gcli_path", cli_cmd)
|
|
client.set_value("ai_input", "Test parameter aliases")
|
|
client.click("btn_gen_send")
|
|
# Handle approval
|
|
timeout = 15
|
|
start_time = time.time()
|
|
approved = False
|
|
while time.time() - start_time < timeout:
|
|
for ev in client.get_events():
|
|
if ev.get("type") == "ask_received":
|
|
requests.post("http://127.0.0.1:8999/api/ask/respond",
|
|
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
|
|
approved = True
|
|
if approved: break
|
|
time.sleep(0.5)
|
|
assert approved, "Tool approval event never received"
|
|
# Verify tool result in history
|
|
time.sleep(2)
|
|
session = client.get_session()
|
|
entries = session.get("session", {}).get("entries", [])
|
|
# Check for "Tool worked!" which implies the tool execution was successful
|
|
found = any("Tool worked!" in e.get("content", "") for e in entries)
|
|
assert found, "Tool result indicating success not found in history"
|
|
os.remove(alias_mock)
|
|
|
|
def test_gemini_cli_loop_termination(live_gui):
|
|
"""
|
|
Test that multi-round tool calling correctly terminates and preserves
|
|
payload (session context) between rounds.
|
|
"""
|
|
client = ApiHookClient("http://127.0.0.1:8999")
|
|
client.click("btn_reset")
|
|
client.set_value("auto_add_history", True)
|
|
client.select_list_item("proj_files", "manual_slop")
|
|
# This uses the existing mock_gemini_cli.py which is already designed for 2 rounds
|
|
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
|
|
cli_cmd = f'"{sys.executable}" "{mock_script}"'
|
|
client.set_value("current_provider", "gemini_cli")
|
|
client.set_value("gcli_path", cli_cmd)
|
|
client.set_value("ai_input", "Perform multi-round tool test")
|
|
client.click("btn_gen_send")
|
|
# Handle approvals (mock does one tool call)
|
|
timeout = 20
|
|
start_time = time.time()
|
|
approved = False
|
|
while time.time() - start_time < timeout:
|
|
for ev in client.get_events():
|
|
if ev.get("type") == "ask_received":
|
|
requests.post("http://127.0.0.1:8999/api/ask/respond",
|
|
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
|
|
approved = True
|
|
if approved: break
|
|
time.sleep(0.5)
|
|
# Wait for the second round and final answer
|
|
found_final = False
|
|
start_time = time.time()
|
|
while time.time() - start_time < 15:
|
|
session = client.get_session()
|
|
entries = session.get("session", {}).get("entries", [])
|
|
for e in entries:
|
|
if "processed the tool results" in e.get("content", ""):
|
|
found_final = True
|
|
break
|
|
if found_final: break
|
|
time.sleep(1)
|
|
assert found_final, "Final message after multi-round tool loop not found"
|