Compare commits

..

7 Commits

13 changed files with 766 additions and 27 deletions

10
.gemini/settings.json Normal file
View File

@@ -0,0 +1,10 @@
{
"hooks": [
{
"name": "manual-slop-bridge",
"type": "command",
"event": "BeforeTool",
"command": "python C:/projects/manual_slop/scripts/cli_tool_bridge.py"
}
]
}

View File

@@ -23,6 +23,7 @@ import os
import file_cache
import mcp_client
import anthropic
from gemini_cli_adapter import GeminiCliAdapter
from google import genai
from google.genai import types
from events import EventEmitter
@@ -58,6 +59,8 @@ _anthropic_history: list[dict] = []
_anthropic_history_lock = threading.Lock()
_send_lock = threading.Lock()
_gemini_cli_adapter = None
# Injected by gui.py - called when AI wants to run a command.
# Signature: (script: str, base_dir: str) -> str | None
confirm_and_run_callback = None
@@ -253,6 +256,7 @@ def reset_session():
global _gemini_cache_md_hash, _gemini_cache_created_at
global _anthropic_client, _anthropic_history
global _CACHED_ANTHROPIC_TOOLS
global _gemini_cli_adapter
if _gemini_client and _gemini_cache:
try:
_gemini_client.caches.delete(name=_gemini_cache.name)
@@ -263,6 +267,8 @@ def reset_session():
_gemini_cache = None
_gemini_cache_md_hash = None
_gemini_cache_created_at = None
if _gemini_cli_adapter:
_gemini_cli_adapter.session_id = None
_anthropic_client = None
with _anthropic_history_lock:
_anthropic_history = []
@@ -787,8 +793,47 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
return "\n\n".join(all_text) if all_text else "(No text returned)"
except Exception as e: raise _classify_gemini_error(e) from e
def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
file_items: list[dict] | None = None,
discussion_history: str = "") -> str:
global _gemini_cli_adapter
try:
if _gemini_cli_adapter is None:
_gemini_cli_adapter = GeminiCliAdapter(binary_path="gemini")
events.emit("request_start", payload={"provider": "gemini_cli", "model": _model, "round": 0})
# If it's a new session (session_id is None), we should ideally send the context.
# For now, following the simple pattern:
payload = user_message
if _gemini_cli_adapter.session_id is None:
# Prepend context and discussion history to the first message
full_prompt = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>\n\n"
if discussion_history:
full_prompt += f"[DISCUSSION HISTORY]\n\n{discussion_history}\n\n---\n\n"
full_prompt += user_message
payload = full_prompt
_append_comms("OUT", "request", {"message": f"[CLI] [msg {len(payload)}]"})
result_text = _gemini_cli_adapter.send(payload)
usage = _gemini_cli_adapter.last_usage or {}
latency = _gemini_cli_adapter.last_latency
events.emit("response_received", payload={"provider": "gemini_cli", "model": _model, "usage": usage, "latency": latency, "round": 0})
_append_comms("IN", "response", {
"round": 0,
"stop_reason": "STOP",
"text": result_text,
"tool_calls": [],
"usage": usage
})
return result_text
except Exception as e:
# Basic error classification for CLI
raise ProviderError("unknown", "gemini_cli", e)
# ------------------------------------------------------------------ anthropic history management
@@ -1276,6 +1321,8 @@ def send(
with _send_lock:
if _provider == "gemini":
return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history)
elif _provider == "gemini_cli":
return _send_gemini_cli(md_content, user_message, base_dir, file_items, discussion_history)
elif _provider == "anthropic":
return _send_anthropic(md_content, user_message, base_dir, file_items, discussion_history)
raise ValueError(f"unknown provider: {_provider}")

View File

@@ -35,7 +35,7 @@ This file tracks all major tracks for the project. Each track has its own detail
---
- [ ] **Track: Support gemini cli headless as an alternative to the raw client_api route. So that they user may use their gemini subscription and gemini cli features within manual slop for a more discliplined and visually enriched UX.**
- [x] **Track: Support gemini cli headless as an alternative to the raw client_api route. So that they user may use their gemini subscription and gemini cli features within manual slop for a more discliplined and visually enriched UX.**
*Link: [./tracks/gemini_cli_headless_20260224/](./tracks/gemini_cli_headless_20260224/)*

View File

@@ -7,20 +7,20 @@
- [x] Task: Conductor - User Manual Verification 'Phase 1: IPC Infrastructure Extension' (Protocol in workflow.md) (c0bccce)
## Phase 2: Gemini CLI Adapter & Tool Bridge
- [ ] Task: Implement `scripts/cli_tool_bridge.py`. This script will be called by the Gemini CLI `BeforeTool` hook and use `ApiHookClient` to talk to the GUI.
- [ ] Task: Implement the `GeminiCliAdapter` in `ai_client.py` (or a new `gemini_cli_adapter.py`). It must handle the `subprocess` lifecycle and parse the `stream-json` output.
- [ ] Task: Integrate `GeminiCliAdapter` into the main `ai_client.send()` logic.
- [ ] Task: Write unit tests for the JSON parsing and subprocess management in `GeminiCliAdapter`.
- [ ] Task: Conductor - User Manual Verification 'Phase 2: Gemini CLI Adapter & Tool Bridge' (Protocol in workflow.md)
- [x] Task: Implement `scripts/cli_tool_bridge.py`. This script will be called by the Gemini CLI `BeforeTool` hook and use `ApiHookClient` to talk to the GUI. (211000c)
- [x] Task: Implement the `GeminiCliAdapter` in `ai_client.py` (or a new `gemini_cli_adapter.py`). It must handle the `subprocess` lifecycle and parse the `stream-json` output. (b762a80)
- [x] Task: Integrate `GeminiCliAdapter` into the main `ai_client.send()` logic. (b762a80)
- [x] Task: Write unit tests for the JSON parsing and subprocess management in `GeminiCliAdapter`. (b762a80)
- [~] Task: Conductor - User Manual Verification 'Phase 2: Gemini CLI Adapter & Tool Bridge' (Protocol in workflow.md)
## Phase 3: GUI Integration & Provider Support
- [ ] Task: Update `gui_2.py` (and `gui_legacy.py`) to add "Gemini CLI" to the provider dropdown.
- [ ] Task: Implement UI elements for "Gemini CLI Session Management" (Login button, session ID display).
- [ ] Task: Update the `manual_slop.toml` logic to persist Gemini CLI specific settings (e.g., path to CLI, approval mode).
- [ ] Task: Conductor - User Manual Verification 'Phase 3: GUI Integration & Provider Support' (Protocol in workflow.md)
- [x] Task: Update `gui_2.py` to add "Gemini CLI" to the provider dropdown. (3ce4fa0)
- [x] Task: Implement UI elements for "Gemini CLI Session Management" (Login button, session ID display). (3ce4fa0)
- [x] Task: Update the `manual_slop.toml` logic to persist Gemini CLI specific settings (e.g., path to CLI, approval mode). (3ce4fa0)
- [~] Task: Conductor - User Manual Verification 'Phase 3: GUI Integration & Provider Support' (Protocol in workflow.md)
## Phase 4: Integration Testing & UX Polish
- [ ] Task: Create a comprehensive integration test `tests/test_gemini_cli_integration.py` that uses the `live_gui` fixture to simulate a full session.
- [ ] Task: Verify tool confirmation flow: CLI Tool -> Bridge -> GUI Modal -> User Approval -> CLI Execution.
- [ ] Task: Polish the display of CLI telemetry (tokens/latency) in the GUI diagnostics panel.
- [ ] Task: Conductor - User Manual Verification 'Phase 4: Integration Testing & UX Polish' (Protocol in workflow.md)
- [x] Task: Create a comprehensive integration test `tests/test_gemini_cli_integration.py` that uses the `live_gui` fixture to simulate a full session. (d187a6c)
- [x] Task: Verify tool confirmation flow: CLI Tool -> Bridge -> GUI Modal -> User Approval -> CLI Execution. (d187a6c)
- [x] Task: Polish the display of CLI telemetry (tokens/latency) in the GUI diagnostics panel. (1e5b43e)
- [x] Task: Conductor - User Manual Verification 'Phase 4: Integration Testing & UX Polish' (Protocol in workflow.md) (1e5b43e)

76
gemini_cli_adapter.py Normal file
View File

@@ -0,0 +1,76 @@
import subprocess
import json
import sys
import time
class GeminiCliAdapter:
def __init__(self, binary_path="gemini"):
self.binary_path = binary_path
self.last_usage = None
self.session_id = None
self.last_latency = 0.0
def send(self, message):
"""
Sends a message to the Gemini CLI and processes the streaming JSON output.
"""
start_time = time.time()
# On Windows, using shell=True allows executing .cmd/.bat files and
# handles command strings with arguments more gracefully.
# We pass the message via stdin to avoid command-line length limits.
command = f'{self.binary_path} run --output-format stream-json'
if self.session_id:
command += f' --resume {self.session_id}'
accumulated_text = ""
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True
)
try:
# Send message to stdin and close it
process.stdin.write(message)
process.stdin.close()
# Read stdout line by line
for line in process.stdout:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
msg_type = data.get("type")
if msg_type == "message":
# Append message text to results
accumulated_text += data.get("text", "")
elif msg_type == "result":
# Capture final usage and session persistence
self.last_usage = data.get("usage")
self.session_id = data.get("session_id")
elif msg_type in ("status", "tool_use"):
# Log status/tool_use to stderr for debugging
sys.stderr.write(f"GeminiCliAdapter [{msg_type}]: {line}\n")
sys.stderr.flush()
except json.JSONDecodeError:
# Skip lines that are not valid JSON
continue
process.wait()
except Exception as e:
process.kill()
raise e
finally:
self.last_latency = time.time() - start_time
return accumulated_text

160
gui_2.py
View File

@@ -29,7 +29,7 @@ from pydantic import BaseModel
from imgui_bundle import imgui, hello_imgui, immapp
CONFIG_PATH = Path("config.toml")
PROVIDERS = ["gemini", "anthropic"]
PROVIDERS = ["gemini", "anthropic", "gemini_cli"]
COMMS_CLAMP_CHARS = 300
def load_config() -> dict:
@@ -113,8 +113,8 @@ class App:
self.config = load_config()
ai_cfg = self.config.get("ai", {})
self.current_provider: str = ai_cfg.get("provider", "gemini")
self.current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite")
self._current_provider: str = ai_cfg.get("provider", "gemini")
self._current_model: str = ai_cfg.get("model", "gemini-2.5-flash-lite")
self.available_models: list[str] = []
self.temperature: float = ai_cfg.get("temperature", 0.0)
self.max_tokens: int = ai_cfg.get("max_tokens", 8192)
@@ -148,6 +148,7 @@ class App:
self.ui_project_git_dir = proj_meta.get("git_dir", "")
self.ui_project_main_context = proj_meta.get("main_context", "")
self.ui_project_system_prompt = proj_meta.get("system_prompt", "")
self.ui_gemini_cli_path = self.project.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_word_wrap = proj_meta.get("word_wrap", True)
self.ui_summary_only = proj_meta.get("summary_only", False)
self.ui_auto_add_history = disc_sec.get("auto_add", False)
@@ -192,6 +193,12 @@ class App:
self._pending_dialog_lock = threading.Lock()
self._pending_actions: dict[str, ConfirmDialog] = {}
# Ask-related state (for tool approvals from CLI)
self._pending_ask_dialog = False
self._ask_dialog_open = False
self._ask_request_id = None
self._ask_tool_data = None
self._tool_log: list[tuple[str, str]] = []
self._comms_log: list[dict] = []
@@ -257,7 +264,45 @@ class App:
self._last_autosave = time.time()
session_logger.open_session()
self._init_ai_and_hooks()
@property
def current_provider(self):
return self._current_provider
@current_provider.setter
def current_provider(self, value):
if value != self._current_provider:
self._current_provider = value
ai_client.reset_session()
ai_client.set_provider(value, self.current_model)
if value == "gemini_cli":
# Ensure the adapter is initialized with the current path
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
else:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
self.available_models = []
self._fetch_models(value)
@property
def current_model(self):
return self._current_model
@current_model.setter
def current_model(self, value):
if value != self._current_model:
self._current_model = value
ai_client.reset_session()
ai_client.set_provider(self.current_provider, value)
def _init_ai_and_hooks(self):
ai_client.set_provider(self.current_provider, self.current_model)
if self.current_provider == "gemini_cli":
if not ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter = ai_client.GeminiCliAdapter(binary_path=self.ui_gemini_cli_path)
else:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
ai_client.confirm_and_run_callback = self._confirm_and_run
ai_client.comms_log_callback = self._on_comms_entry
ai_client.tool_log_callback = self._on_tool_log
@@ -276,6 +321,7 @@ class App:
'auto_add_history': 'ui_auto_add_history',
'disc_new_name_input': 'ui_disc_new_name_input',
'project_main_context': 'ui_project_main_context',
'gcli_path': 'ui_gemini_cli_path',
'output_dir': 'ui_output_dir',
'files_base_dir': 'ui_files_base_dir',
'ai_status': 'ai_status',
@@ -592,6 +638,7 @@ class App:
self.ui_project_git_dir = proj.get("project", {}).get("git_dir", "")
self.ui_project_system_prompt = proj.get("project", {}).get("system_prompt", "")
self.ui_project_main_context = proj.get("project", {}).get("main_context", "")
self.ui_gemini_cli_path = proj.get("gemini_cli", {}).get("binary_path", "gemini")
self.ui_auto_add_history = proj.get("discussion", {}).get("auto_add", False)
self.ui_auto_scroll_comms = proj.get("project", {}).get("auto_scroll_comms", True)
self.ui_auto_scroll_tool_calls = proj.get("project", {}).get("auto_scroll_tool_calls", True)
@@ -747,6 +794,11 @@ class App:
if item == "disc_listbox":
self._switch_discussion(value)
elif task.get("type") == "ask":
self._pending_ask_dialog = True
self._ask_request_id = task.get("request_id")
self._ask_tool_data = task.get("data", {})
elif action == "custom_callback":
cb = task.get("callback")
args = task.get("args", [])
@@ -787,6 +839,34 @@ class App:
else:
print("[DEBUG] No pending dialog to reject")
def _handle_approve_ask(self):
"""Responds with approval for a pending /api/ask request."""
if not self._ask_request_id: return
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": self._ask_request_id, "response": {"approved": True}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reject_ask(self):
"""Responds with rejection for a pending /api/ask request."""
if not self._ask_request_id: return
try:
requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={"request_id": self._ask_request_id, "response": {"approved": False}},
timeout=2
)
except Exception as e: print(f"Error responding to ask: {e}")
self._pending_ask_dialog = False
self._ask_request_id = None
self._ask_tool_data = None
def _handle_reset_session(self):
"""Logic for resetting the AI session."""
ai_client.reset_session()
@@ -883,15 +963,18 @@ class App:
f.write(data)
def _recalculate_session_usage(self):
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0}
usage = {"input_tokens": 0, "output_tokens": 0, "cache_read_input_tokens": 0, "cache_creation_input_tokens": 0, "total_tokens": 0, "last_latency": 0.0}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
u = entry["payload"]["usage"]
for k in usage.keys():
usage[k] += u.get(k, 0) or 0
for k in ["input_tokens", "output_tokens", "cache_read_input_tokens", "cache_creation_input_tokens", "total_tokens"]:
if k in usage:
usage[k] += u.get(k, 0) or 0
self.session_usage = usage
def _refresh_api_metrics(self, payload: dict, md_content: str | None = None):
if "latency" in payload:
self.session_usage["last_latency"] = payload["latency"]
self._recalculate_session_usage()
def fetch_stats():
@@ -1036,6 +1119,8 @@ class App:
proj["project"]["auto_scroll_comms"] = self.ui_auto_scroll_comms
proj["project"]["auto_scroll_tool_calls"] = self.ui_auto_scroll_tool_calls
proj.setdefault("gemini_cli", {})["binary_path"] = self.ui_gemini_cli_path
proj.setdefault("agent", {}).setdefault("tools", {})
for t_name in AGENT_TOOL_NAMES:
proj["agent"]["tools"][t_name] = self.ui_agent_tools.get(t_name, True)
@@ -1394,6 +1479,36 @@ class App:
imgui.close_current_popup()
imgui.end_popup()
if self._pending_ask_dialog:
if not self._ask_dialog_open:
imgui.open_popup("Approve Tool Execution")
self._ask_dialog_open = True
else:
self._ask_dialog_open = False
if imgui.begin_popup_modal("Approve Tool Execution", None, imgui.WindowFlags_.always_auto_resize)[0]:
if not self._pending_ask_dialog:
imgui.close_current_popup()
else:
tool_name = self._ask_tool_data.get("tool", "unknown")
tool_args = self._ask_tool_data.get("args", {})
imgui.text("The AI wants to execute a tool:")
imgui.text_colored(vec4(200, 200, 100), f"Tool: {tool_name}")
imgui.separator()
imgui.text("Arguments:")
imgui.begin_child("ask_args_child", imgui.ImVec2(400, 200), True)
imgui.text_unformatted(json.dumps(tool_args, indent=2))
imgui.end_child()
imgui.separator()
if imgui.button("Approve", imgui.ImVec2(120, 0)):
self._handle_approve_ask()
imgui.close_current_popup()
imgui.same_line()
if imgui.button("Deny", imgui.ImVec2(120, 0)):
self._handle_reject_ask()
imgui.close_current_popup()
imgui.end_popup()
if self.show_script_output:
if self._trigger_script_blink:
self._trigger_script_blink = False
@@ -1841,10 +1956,6 @@ class App:
for p in PROVIDERS:
if imgui.selectable(p, p == self.current_provider)[0]:
self.current_provider = p
ai_client.reset_session()
ai_client.set_provider(p, self.current_model)
self.available_models = []
self._fetch_models(p)
imgui.end_combo()
imgui.separator()
imgui.text("Model")
@@ -1856,8 +1967,6 @@ class App:
for m in self.available_models:
if imgui.selectable(m, m == self.current_model)[0]:
self.current_model = m
ai_client.reset_session()
ai_client.set_provider(self.current_provider, m)
imgui.end_list_box()
imgui.separator()
imgui.text("Parameters")
@@ -1865,11 +1974,38 @@ class App:
ch, self.max_tokens = imgui.input_int("Max Tokens (Output)", self.max_tokens, 1024)
ch, self.history_trunc_limit = imgui.input_int("History Truncation Limit", self.history_trunc_limit, 1024)
if self.current_provider == "gemini_cli":
imgui.separator()
imgui.text("Gemini CLI")
sid = "None"
if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter:
sid = ai_client._gemini_cli_adapter.session_id or "None"
imgui.text(f"Session ID: {sid}")
if imgui.button("Reset CLI Session"):
ai_client.reset_session()
imgui.text("Binary Path")
ch, self.ui_gemini_cli_path = imgui.input_text("##gcli_path", self.ui_gemini_cli_path)
imgui.same_line()
if imgui.button("Browse##gcli"):
r = hide_tk_root()
p = filedialog.askopenfilename(title="Select gemini CLI binary")
r.destroy()
if p:
self.ui_gemini_cli_path = p
if ch:
if hasattr(ai_client, "_gemini_cli_adapter") and ai_client._gemini_cli_adapter:
ai_client._gemini_cli_adapter.binary_path = self.ui_gemini_cli_path
imgui.separator()
imgui.text("Telemetry")
usage = self.session_usage
total = usage["input_tokens"] + usage["output_tokens"]
if total == 0 and usage.get("total_tokens", 0) > 0:
total = usage["total_tokens"]
imgui.text_colored(C_RES, f"Tokens: {total:,} (In: {usage['input_tokens']:,} Out: {usage['output_tokens']:,})")
if usage.get("last_latency", 0.0) > 0:
imgui.text_colored(C_LBL, f" Last Latency: {usage['last_latency']:.2f}s")
if usage["cache_read_input_tokens"]:
imgui.text_colored(C_LBL, f" Cache Read: {usage['cache_read_input_tokens']:,} Creation: {usage['cache_creation_input_tokens']:,}")
imgui.text("Token Budget:")

View File

@@ -100,6 +100,7 @@ def default_project(name: str = "unnamed") -> dict:
"output": {"output_dir": "./md_gen"},
"files": {"base_dir": ".", "paths": []},
"screenshots": {"base_dir": ".", "paths": []},
"gemini_cli": {"binary_path": "gemini"},
"agent": {
"tools": {
"run_powershell": True,

View File

@@ -0,0 +1,65 @@
import sys
import json
import logging
import os
# Add project root to sys.path so we can import api_hook_client
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
try:
from api_hook_client import ApiHookClient
except ImportError:
# Fallback for if we are running from root or other locations
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
from api_hook_client import ApiHookClient
def main():
# Setup basic logging to stderr so it doesn't interfere with stdout JSON
logging.basicConfig(level=logging.ERROR, stream=sys.stderr)
try:
# 1. Read JSON from sys.stdin
input_data = sys.stdin.read()
if not input_data:
return
hook_input = json.loads(input_data)
# 2. Extract 'tool_name' and 'tool_input'
tool_name = hook_input.get('tool_name')
tool_args = hook_input.get('tool_input', {})
# 3. Use 'ApiHookClient' (assuming GUI is on http://127.0.0.1:8999)
client = ApiHookClient(base_url="http://127.0.0.1:8999")
try:
# 5. Request confirmation
# This is a blocking call that waits for the user in the GUI
response = client.request_confirmation(tool_name, tool_args)
if response and response.get('approved') is True:
# 5. Print 'allow' decision
print(json.dumps({"decision": "allow"}))
else:
# 6. Print 'deny' decision
print(json.dumps({
"decision": "deny",
"reason": "User rejected tool execution."
}))
except Exception as e:
# 7. Handle cases where hook server is not reachable
print(json.dumps({
"decision": "deny",
"reason": f"Hook server unreachable or error occurred: {str(e)}"
}))
except Exception as e:
# Fallback for unexpected parsing errors
print(json.dumps({
"decision": "deny",
"reason": f"Internal bridge error: {str(e)}"
}))
if __name__ == "__main__":
main()

83
tests/mock_gemini_cli.py Normal file
View File

@@ -0,0 +1,83 @@
import sys
import json
import subprocess
import os
def main():
# The GUI calls: <binary> run --output-format stream-json
# The prompt is now passed via stdin.
# Debug log to stderr
sys.stderr.write(f"DEBUG: mock_gemini_cli called with args: {sys.argv}\n")
# Read prompt from stdin for debug
prompt = sys.stdin.read()
sys.stderr.write(f"DEBUG: Received prompt via stdin ({len(prompt)} chars)\n")
sys.stderr.flush()
if "run" not in sys.argv:
return
# Simulate the 'BeforeTool' hook by calling the bridge directly.
bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
tool_call = {
"tool_name": "read_file",
"tool_input": {"path": "test.txt"}
}
sys.stderr.write(f"DEBUG: Calling bridge at {bridge_path}\n")
sys.stderr.flush()
# Bridge reads from stdin
process = subprocess.Popen(
[sys.executable, bridge_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input=json.dumps(tool_call))
sys.stderr.write(f"DEBUG: Bridge stdout: {stdout}\n")
sys.stderr.write(f"DEBUG: Bridge stderr: {stderr}\n")
sys.stderr.flush()
try:
decision_data = json.loads(stdout.strip())
decision = decision_data.get("decision")
except Exception as e:
sys.stderr.write(f"DEBUG: Failed to parse bridge output: {e}\n")
decision = "deny"
# Output JSONL to stdout
if decision == "allow":
print(json.dumps({
"type": "tool_use",
"name": "read_file",
"args": {"path": "test.txt"}
}), flush=True)
print(json.dumps({
"type": "message",
"text": "I read the file. It contains: 'Hello from mock!'"
}), flush=True)
print(json.dumps({
"type": "result",
"usage": {"total_tokens": 50},
"session_id": "mock-session-123"
}), flush=True)
else:
print(json.dumps({
"type": "message",
"text": f"Tool execution was denied. Decision: {decision}"
}), flush=True)
print(json.dumps({
"type": "result",
"usage": {"total_tokens": 10},
"session_id": "mock-session-denied"
}), flush=True)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,39 @@
import pytest
from unittest.mock import MagicMock, patch
import ai_client
def test_ai_client_send_gemini_cli():
"""
Verifies that 'ai_client.send' correctly interacts with 'GeminiCliAdapter'
when the 'gemini_cli' provider is specified.
"""
test_message = "Hello, this is a test prompt for the CLI adapter."
test_response = "This is a dummy response from the Gemini CLI."
# Set provider to gemini_cli
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
# 1. Mock 'ai_client.GeminiCliAdapter' (which we will add)
with patch('ai_client.GeminiCliAdapter') as MockAdapterClass:
mock_adapter_instance = MockAdapterClass.return_value
mock_adapter_instance.send.return_value = test_response
mock_adapter_instance.last_usage = {"total_tokens": 100}
# Verify that 'events' are emitted correctly
with patch.object(ai_client.events, 'emit') as mock_emit:
response = ai_client.send(
md_content="<context></context>",
user_message=test_message,
base_dir="."
)
# Check that the adapter's send method was called.
mock_adapter_instance.send.assert_called()
# Verify that the expected lifecycle events were emitted.
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
assert 'request_start' in emitted_event_names
assert 'response_received' in emitted_event_names
# Verify that the combined text returned by the adapter is returned by 'ai_client.send'.
assert response == test_response

View File

@@ -0,0 +1,74 @@
import unittest
from unittest.mock import patch, MagicMock
import io
import json
import sys
import os
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# Import after path fix
from scripts.cli_tool_bridge import main
class TestCliToolBridge(unittest.TestCase):
def setUp(self):
self.tool_call = {
'tool_name': 'read_file',
'tool_input': {'path': 'test.txt'}
}
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_allow_decision(self, mock_request, mock_stdout, mock_stdin):
# 1. Mock stdin with a JSON string tool call
mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0)
# 2. Mock ApiHookClient to return approved
mock_request.return_value = {'approved': True}
# Run main
main()
# 3. Capture stdout and assert allow
output = json.loads(mock_stdout.getvalue().strip())
self.assertEqual(output.get('decision'), 'allow')
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_deny_decision(self, mock_request, mock_stdout, mock_stdin):
# Mock stdin
mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0)
# 4. Mock ApiHookClient to return denied
mock_request.return_value = {'approved': False}
main()
# Assert deny
output = json.loads(mock_stdout.getvalue().strip())
self.assertEqual(output.get('decision'), 'deny')
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_unreachable_hook_server(self, mock_request, mock_stdout, mock_stdin):
# Mock stdin
mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0)
# 5. Test case where hook server is unreachable (exception)
mock_request.side_effect = Exception("Connection refused")
main()
# Assert deny on error
output = json.loads(mock_stdout.getvalue().strip())
self.assertEqual(output.get('decision'), 'deny')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,122 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import subprocess
import io
import sys
import os
# Ensure the project root is in sys.path to resolve imports correctly
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapter(unittest.TestCase):
def setUp(self):
self.adapter = GeminiCliAdapter(binary_path="gemini")
@patch('subprocess.Popen')
def test_send_starts_subprocess_with_correct_args(self, mock_popen):
"""
Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message.
"""
# Setup mock process with a minimal valid JSONL termination
process_mock = MagicMock()
process_mock.stdout = io.StringIO(json.dumps({"type": "result", "usage": {}}) + "\n")
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
message = "Hello Gemini CLI"
self.adapter.send(message)
# Verify subprocess.Popen call
mock_popen.assert_called_once()
args, kwargs = mock_popen.call_args
cmd = args[0]
# Check mandatory CLI components
self.assertIn("gemini", cmd)
self.assertIn("--output-format", cmd)
self.assertIn("stream-json", cmd)
self.assertIn(message, cmd)
# Check process configuration
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
self.assertEqual(kwargs.get('text'), True)
@patch('subprocess.Popen')
def test_send_parses_jsonl_output(self, mock_popen):
"""
Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text.
"""
jsonl_output = [
json.dumps({"type": "message", "text": "The quick brown "}),
json.dumps({"type": "message", "text": "fox jumps."}),
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.stdout = io.StringIO(stdout_content)
# Mock poll sequence: running, running, finished
process_mock.poll.side_effect = [None, None, 0]
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("test message")
self.assertEqual(result, "The quick brown fox jumps.")
@patch('subprocess.Popen')
def test_send_handles_tool_use_events(self, mock_popen):
"""
Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event.
"""
jsonl_output = [
json.dumps({"type": "message", "text": "Calling tool..."}),
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}),
json.dumps({"type": "message", "text": "\nFile read successfully."}),
json.dumps({"type": "result", "usage": {}})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.stdout = io.StringIO(stdout_content)
process_mock.poll.side_effect = [None, None, None, 0]
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("read test.txt")
# Result should contain the combined text from all 'message' events
self.assertEqual(result, "Calling tool...\nFile read successfully.")
@patch('subprocess.Popen')
def test_send_captures_usage_metadata(self, mock_popen):
"""
Verify that usage data is extracted from the 'result' event.
"""
usage_data = {"total_tokens": 42}
jsonl_output = [
json.dumps({"type": "message", "text": "Finalizing"}),
json.dumps({"type": "result", "usage": usage_data})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.stdout = io.StringIO(stdout_content)
process_mock.poll.side_effect = [None, 0]
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
self.adapter.send("usage test")
# Verify the usage was captured in the adapter instance
self.assertEqual(self.adapter.last_usage, usage_data)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,86 @@
import pytest
import time
import os
import sys
import requests
from api_hook_client import ApiHookClient
def test_gemini_cli_full_integration(live_gui):
"""
Integration test for the Gemini CLI provider and tool bridge.
"""
client = ApiHookClient("http://127.0.0.1:8999")
# 1. Setup paths and configure the GUI
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
# Wrap in quotes for shell execution if path has spaces
cli_cmd = f'"{sys.executable}" "{mock_script}"'
# Set provider and binary path via GUI hooks
# Note: Using set_value which now triggers the property setter in gui_2.py
print(f"[TEST] Setting current_provider to gemini_cli")
client.set_value("current_provider", "gemini_cli")
print(f"[TEST] Setting gcli_path to {cli_cmd}")
client.set_value("gcli_path", cli_cmd)
# Verify settings were applied
assert client.get_value("current_provider") == "gemini_cli"
assert client.get_value("gcli_path") == cli_cmd
# Clear events
client.get_events()
# 2. Trigger a message in the GUI
print("[TEST] Sending user message...")
client.set_value("ai_input", "Please read test.txt")
client.click("btn_gen_send")
# 3. Monitor for the 'ask_received' event
print("[TEST] Waiting for ask_received event...")
request_id = None
timeout = 30
start_time = time.time()
while time.time() - start_time < timeout:
events = client.get_events()
if events:
print(f"[TEST] Received {len(events)} events: {[e.get('type') for e in events]}")
for ev in events:
if ev.get("type") == "ask_received":
request_id = ev.get("request_id")
print(f"[TEST] Found request_id: {request_id}")
break
if request_id:
break
time.sleep(0.5)
assert request_id is not None, "Timed out waiting for 'ask_received' event from the bridge"
# 4. Respond to the permission request
print("[TEST] Responding to ask with approval")
resp = requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={
"request_id": request_id,
"response": {"approved": True}
}
)
assert resp.status_code == 200
# 5. Verify that the final response is displayed in the GUI
print("[TEST] Waiting for final message in history...")
final_message_received = False
start_time = time.time()
while time.time() - start_time < timeout:
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
for entry in entries:
content = entry.get("content", "")
if "Hello from mock!" in content:
print(f"[TEST] Success! Found message: {content[:50]}...")
final_message_received = True
break
if final_message_received:
break
time.sleep(1.0)
assert final_message_received, "Final message from mock CLI was not found in the GUI history"