feat(ai): support stdin for Gemini CLI and verify with integration test

This commit is contained in:
2026-02-25 14:23:20 -05:00
parent 3ce4fa0c07
commit d187a6c8d9
4 changed files with 298 additions and 12 deletions

View File

@@ -12,25 +12,36 @@ class GeminiCliAdapter:
"""
Sends a message to the Gemini CLI and processes the streaming JSON output.
"""
command = [self.binary_path, 'run', message, '--output-format', 'stream-json']
# On Windows, using shell=True allows executing .cmd/.bat files and
# handles command strings with arguments more gracefully.
# We pass the message via stdin to avoid command-line length limits.
command = f'{self.binary_path} run --output-format stream-json'
if self.session_id:
command.extend(['--resume', self.session_id])
command += f' --resume {self.session_id}'
print(f"[DEBUG] GeminiCliAdapter: Executing command: {command}")
accumulated_text = ""
# Using subprocess.Popen as requested
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True
stderr=subprocess.PIPE,
text=True,
shell=True
)
try:
# Send message to stdin and close it
process.stdin.write(message)
process.stdin.close()
# Read stdout line by line
for line in process.stdout:
line = line.strip()
if not line:
continue
print(f"[DEBUG] GeminiCliAdapter stdout: {line}")
try:
data = json.loads(line)
@@ -55,8 +66,12 @@ class GeminiCliAdapter:
continue
process.wait()
if process.returncode != 0:
err = process.stderr.read()
print(f"[DEBUG] GeminiCliAdapter failed with exit code {process.returncode}. stderr: {err}")
except Exception as e:
process.kill()
print(f"[DEBUG] GeminiCliAdapter exception: {e}")
raise e
return accumulated_text

118
gui_2.py
View File

@@ -113,8 +113,8 @@ class App:
self.config = load_config()
ai_cfg = self.config.get("ai", {})
self.current_provider: str = ai_cfg.get("provider", "gemini")
self.current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite")
self._current_provider: str = ai_cfg.get("provider", "gemini")
self._current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite")
self.available_models: list[str] = []
self.temperature: float = ai_cfg.get("temperature", 0.0)
self.max_tokens: int = ai_cfg.get("max_tokens", 8192)
@@ -193,6 +193,12 @@ class App:
self._pending_dialog_lock = threading.Lock()
self._pending_actions: dict[str, ConfirmDialog] = {}
# Ask-related state (for tool approvals from CLI)
self._pending_ask_dialog = False
self._ask_dialog_open = False
self._ask_request_id = None
self._ask_tool_data = None
self._tool_log: list[tuple[str, str]] = []
self._comms_log: list[dict] = []
@@ -258,7 +264,45 @@ class App:
self._last_autosave = time.time()
session_logger.open_session()
self._init_ai_and_hooks()
@property
def current_provider(self):
    """Identifier of the active AI provider (e.g. "gemini", "gemini_cli")."""
    return self._current_provider

@current_provider.setter
def current_provider(self, value):
    # Guard clause: an unchanged provider must not reset the session.
    if value == self._current_provider:
        return
    self._current_provider = value
    ai_client.reset_session()
    ai_client.set_provider(value, self.current_model)
    if value == "gemini_cli":
        # Keep the CLI adapter's binary path in sync with the UI setting,
        # creating the adapter on first use.
        adapter = ai_client._gemini_cli_adapter
        if adapter:
            adapter.binary_path = self.ui_gemini_cli_path
        else:
            ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
    # Model list is provider-specific; clear it and refetch for the new provider.
    self.available_models = []
    self._fetch_models(value)
@property
def current_model(self):
    """Name of the model currently selected for the active provider."""
    return self._current_model

@current_model.setter
def current_model(self, value):
    # Guard clause: skip redundant assignments so the session is not reset.
    if value == self._current_model:
        return
    self._current_model = value
    ai_client.reset_session()
    ai_client.set_provider(self.current_provider, value)
def _init_ai_and_hooks(self):
    """Configure the AI client for the saved provider/model and register GUI callbacks."""
    ai_client.set_provider(self.current_provider, self.current_model)
    if self.current_provider == "gemini_cli":
        # Ensure the CLI adapter exists and points at the configured binary.
        adapter = ai_client._gemini_cli_adapter
        if adapter:
            adapter.binary_path = self.ui_gemini_cli_path
        else:
            ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
    # Hook the client's callbacks into this App instance.
    ai_client.confirm_and_run_callback = self._confirm_and_run
    ai_client.comms_log_callback = self._on_comms_entry
    ai_client.tool_log_callback = self._on_tool_log
@@ -277,6 +321,7 @@ class App:
'auto_add_history': 'ui_auto_add_history',
'disc_new_name_input': 'ui_disc_new_name_input',
'project_main_context': 'ui_project_main_context',
'gcli_path': 'ui_gemini_cli_path',
'output_dir': 'ui_output_dir',
'files_base_dir': 'ui_files_base_dir',
'ai_status': 'ai_status',
@@ -749,6 +794,11 @@ class App:
if item == "disc_listbox":
self._switch_discussion(value)
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
@@ -789,6 +839,34 @@ class App:
else:
print("[DEBUG] No pending dialog to reject")
def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": self._ask_request_id, "response": {"approved": True}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reject_ask(self):
"""Responds with rejection for a pending /api/ask request."""
if not self._ask_request_id: return
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": self._ask_request_id, "response": {"approved": False}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reset_session(self):
"""Logic for resetting the AI session."""
ai_client.reset_session()
@@ -1398,6 +1476,36 @@ class App:
imgui.close_current_popup()
imgui.end_popup()
if self._pending_ask_dialog:
if not self._ask_dialog_open:
imgui.open_popup("Approve Tool Execution")
self._ask_dialog_open = True
else:
self._ask_dialog_open = False
if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_ask_dialog:
imgui.close_current_popup()
else:
tool_name = self._ask_tool_data.get("tool", "unknown")
tool_args = self._ask_tool_data.get("args", {})
imgui.text("The AI wants to execute a tool:")
imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}")
imgui.separator()
imgui.text("Arguments:")
imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True)
imgui.text_unformatted(json.dumps(tool_args, indent=2))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_approve_ask()
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Deny", imgui.ImVec2(120, 0)):
self._handle_reject_ask()
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False
@@ -1845,10 +1953,6 @@ class App:
for p in PROVIDERS:
if imgui.selectable(p, p == self.current_provider)[0]:
self.current_provider = p
ai_client.reset_session()
ai_client.set_provider(p, self.current_model)
self.available_models = []
self._fetch_models(p)
imgui.end_combo()
imgui.separator()
imgui.text("Model")
@@ -1860,8 +1964,6 @@ class App:
for m in self.available_models:
if imgui.selectable(m, m == self.current_model)[0]:
self.current_model = m
ai_client.reset_session()
ai_client.set_provider(self.current_provider, m)
imgui.end_list_box()
imgui.separator()
imgui.text("Parameters")

83
tests/mock_gemini_cli.py Normal file
View File

@@ -0,0 +1,83 @@
import sys
import json
import subprocess
import os
def main():
    """Mock Gemini CLI used by the integration test.

    Invoked by the GUI as ``<binary> run --output-format stream-json``; the
    prompt arrives on stdin. Simulates a 'BeforeTool' hook by running the
    tool bridge, then emits a stream-json conversation on stdout.
    """
    log = sys.stderr.write
    log(f"DEBUG: mock_gemini_cli called with args: {sys.argv}\n")
    # The prompt is delivered on stdin, not argv — drain it and log its size.
    prompt = sys.stdin.read()
    log(f"DEBUG: Received prompt via stdin ({len(prompt)} chars)\n")
    sys.stderr.flush()
    # Only the 'run' sub-command produces output.
    if "run" not in sys.argv:
        return

    # Simulate the 'BeforeTool' hook by calling the bridge directly.
    bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
    tool_call = {
        "tool_name": "read_file",
        "tool_input": {"path": "test.txt"}
    }
    log(f"DEBUG: Calling bridge at {bridge_path}\n")
    sys.stderr.flush()
    # The bridge consumes the tool-call JSON on its stdin.
    bridge = subprocess.run(
        [sys.executable, bridge_path],
        input=json.dumps(tool_call),
        capture_output=True,
        text=True
    )
    stdout, stderr = bridge.stdout, bridge.stderr
    log(f"DEBUG: Bridge stdout: {stdout}\n")
    log(f"DEBUG: Bridge stderr: {stderr}\n")
    sys.stderr.flush()
    try:
        decision = json.loads(stdout.strip()).get("decision")
    except Exception as e:
        # Unparseable bridge output is treated as a denial.
        log(f"DEBUG: Failed to parse bridge output: {e}\n")
        decision = "deny"

    def emit(obj):
        # One JSON object per line, flushed immediately for the streaming reader.
        print(json.dumps(obj), flush=True)

    if decision == "allow":
        emit({
            "type": "tool_use",
            "name": "read_file",
            "args": {"path": "test.txt"}
        })
        emit({
            "type": "message",
            "text": "I read the file. It contains: 'Hello from mock!'"
        })
        emit({
            "type": "result",
            "usage": {"total_tokens": 50},
            "session_id": "mock-session-123"
        })
    else:
        emit({
            "type": "message",
            "text": f"Tool execution was denied. Decision: {decision}"
        })
        emit({
            "type": "result",
            "usage": {"total_tokens": 10},
            "session_id": "mock-session-denied"
        })


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,86 @@
import pytest
import time
import os
import sys
import requests
from api_hook_client import ApiHookClient
def test_gemini_cli_full_integration(live_gui):
    """
    Integration test for the Gemini CLI provider and tool bridge.

    End-to-end flow:
      1. Point the GUI at a mock Gemini CLI script (prompt passed via stdin).
      2. Send a user message that triggers a tool call.
      3. Wait for the bridge's 'ask_received' permission event.
      4. Approve the request through the HTTP ask API.
      5. Verify the mock CLI's final message appears in the GUI history.
    """
    client = ApiHookClient("http://127.0.0.1:8999")
    timeout = 30

    def _poll(check, interval):
        """Call `check` every `interval`s until it returns truthy or `timeout` elapses."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            found = check()
            if found:
                return found
            time.sleep(interval)
        return None

    # 1. Setup paths and configure the GUI
    mock_script = os.path.abspath("tests/mock_gemini_cli.py")
    # Wrap in quotes for shell execution if path has spaces
    cli_cmd = f'"{sys.executable}" "{mock_script}"'
    # Set provider and binary path via GUI hooks
    # Note: set_value triggers the property setter in gui_2.py
    print(f"[TEST] Setting current_provider to gemini_cli")
    client.set_value("current_provider", "gemini_cli")
    print(f"[TEST] Setting gcli_path to {cli_cmd}")
    client.set_value("gcli_path", cli_cmd)
    # Verify settings were applied
    assert client.get_value("current_provider") == "gemini_cli"
    assert client.get_value("gcli_path") == cli_cmd
    # Drain any stale events before we start watching
    client.get_events()

    # 2. Trigger a message in the GUI
    print("[TEST] Sending user message...")
    client.set_value("ai_input", "Please read test.txt")
    client.click("btn_gen_send")

    # 3. Monitor for the 'ask_received' event
    print("[TEST] Waiting for ask_received event...")

    def _find_request_id():
        events = client.get_events()
        if events:
            print(f"[TEST] Received {len(events)} events: {[e.get('type') for e in events]}")
        for ev in events or []:
            if ev.get("type") == "ask_received":
                rid = ev.get("request_id")
                print(f"[TEST] Found request_id: {rid}")
                return rid
        return None

    request_id = _poll(_find_request_id, interval=0.5)
    assert request_id is not None, "Timed out waiting for 'ask_received' event from the bridge"

    # 4. Respond to the permission request
    print("[TEST] Responding to ask with approval")
    resp = requests.post(
        "http://127.0.0.1:8999/api/ask/respond",
        json={
            "request_id": request_id,
            "response": {"approved": True}
        },
        # Explicit timeout: without one a wedged GUI server hangs the suite forever.
        timeout=5
    )
    assert resp.status_code == 200

    # 5. Verify that the final response is displayed in the GUI
    print("[TEST] Waiting for final message in history...")

    def _final_message_seen():
        entries = client.get_session().get("session", {}).get("entries", [])
        for entry in entries:
            content = entry.get("content", "")
            if "Hello from mock!" in content:
                print(f"[TEST] Success! Found message: {content[:50]}...")
                return True
        return False

    assert _poll(_final_message_seen, interval=1.0), \
        "Final message from mock CLI was not found in the GUI history"