feat(ai): integrate GeminiCliAdapter into ai_client

This commit is contained in:
2026-02-25 14:02:06 -05:00
parent 211000c926
commit b762a80482
5 changed files with 280 additions and 1 deletions

10
.gemini/settings.json Normal file
View File

@@ -0,0 +1,10 @@
{
"hooks": [
{
"name": "manual-slop-bridge",
"type": "command",
"event": "BeforeTool",
"command": "python C:/projects/manual_slop/scripts/cli_tool_bridge.py"
}
]
}

View File

@@ -23,6 +23,7 @@ import os
import file_cache
import mcp_client
import anthropic
from gemini_cli_adapter import GeminiCliAdapter
from google import genai
from google.genai import types
from events import EventEmitter
@@ -58,6 +59,8 @@ _anthropic_history: list[dict] = []
_anthropic_history_lock = threading.Lock()
_send_lock = threading.Lock()
_gemini_cli_adapter = None
# Injected by gui.py - called when AI wants to run a command.
# Signature: (script: str, base_dir: str) -> str | None
confirm_and_run_callback = None
@@ -253,6 +256,7 @@ def reset_session():
global _gemini_cache_md_hash, _gemini_cache_created_at
global _anthropic_client, _anthropic_history
global _CACHED_ANTHROPIC_TOOLS
global _gemini_cli_adapter
if _gemini_client and _gemini_cache:
try:
_gemini_client.caches.delete(name=_gemini_cache.name)
@@ -263,6 +267,8 @@ def reset_session():
_gemini_cache = None
_gemini_cache_md_hash = None
_gemini_cache_created_at = None
if _gemini_cli_adapter:
_gemini_cli_adapter.session_id = None
_anthropic_client = None
with _anthropic_history_lock:
_anthropic_history = []
@@ -787,8 +793,46 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str,
return "\n\n".join(all_text) if all_text else "(No text returned)"
except Exception as e: raise _classify_gemini_error(e) from e
def _send_gemini_cli(md_content: str, user_message: str, base_dir: str,
file_items: list[dict] | None = None,
discussion_history: str = "") -> str:
global _gemini_cli_adapter
try:
if _gemini_cli_adapter is None:
_gemini_cli_adapter = GeminiCliAdapter(binary_path="gemini")
events.emit("request_start", payload={"provider": "gemini_cli", "model": _model, "round": 0})
# If it's a new session (session_id is None), we should ideally send the context.
# For now, following the simple pattern:
payload = user_message
if _gemini_cli_adapter.session_id is None:
# Prepend context and discussion history to the first message
full_prompt = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>\n\n"
if discussion_history:
full_prompt += f"[DISCUSSION HISTORY]\n\n{discussion_history}\n\n---\n\n"
full_prompt += user_message
payload = full_prompt
_append_comms("OUT", "request", {"message": f"[CLI] [msg {len(payload)}]"})
result_text = _gemini_cli_adapter.send(payload)
usage = _gemini_cli_adapter.last_usage or {}
events.emit("response_received", payload={"provider": "gemini_cli", "model": _model, "usage": usage, "round": 0})
_append_comms("IN", "response", {
"round": 0,
"stop_reason": "STOP",
"text": result_text,
"tool_calls": [],
"usage": usage
})
return result_text
except Exception as e:
# Basic error classification for CLI
raise ProviderError("unknown", "gemini_cli", e)
# ------------------------------------------------------------------ anthropic history management
@@ -1276,6 +1320,8 @@ def send(
with _send_lock:
if _provider == "gemini":
return _send_gemini(md_content, user_message, base_dir, file_items, discussion_history)
elif _provider == "gemini_cli":
return _send_gemini_cli(md_content, user_message, base_dir, file_items, discussion_history)
elif _provider == "anthropic":
return _send_anthropic(md_content, user_message, base_dir, file_items, discussion_history)
raise ValueError(f"unknown provider: {_provider}")

62
gemini_cli_adapter.py Normal file
View File

@@ -0,0 +1,62 @@
import subprocess
import json
import sys
class GeminiCliAdapter:
def __init__(self, binary_path="gemini"):
self.binary_path = binary_path
self.last_usage = None
self.session_id = None
def send(self, message):
"""
Sends a message to the Gemini CLI and processes the streaming JSON output.
"""
command = [self.binary_path, 'run', message, '--output-format', 'stream-json']
if self.session_id:
command.extend(['--resume', self.session_id])
accumulated_text = ""
# Using subprocess.Popen as requested
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
text=True
)
try:
# Read stdout line by line
for line in process.stdout:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
msg_type = data.get("type")
if msg_type == "message":
# Append message text to results
accumulated_text += data.get("text", "")
elif msg_type == "result":
# Capture final usage and session persistence
self.last_usage = data.get("usage")
self.session_id = data.get("session_id")
elif msg_type in ("status", "tool_use"):
# Log status/tool_use to stderr for debugging
sys.stderr.write(f"GeminiCliAdapter [{msg_type}]: {line}\n")
sys.stderr.flush()
except json.JSONDecodeError:
# Skip lines that are not valid JSON
continue
process.wait()
except Exception as e:
process.kill()
raise e
return accumulated_text

View File

@@ -0,0 +1,39 @@
import pytest
from unittest.mock import MagicMock, patch
import ai_client
def test_ai_client_send_gemini_cli():
"""
Verifies that 'ai_client.send' correctly interacts with 'GeminiCliAdapter'
when the 'gemini_cli' provider is specified.
"""
test_message = "Hello, this is a test prompt for the CLI adapter."
test_response = "This is a dummy response from the Gemini CLI."
# Set provider to gemini_cli
ai_client.set_provider("gemini_cli", "gemini-2.0-flash")
# 1. Mock 'ai_client.GeminiCliAdapter' (which we will add)
with patch('ai_client.GeminiCliAdapter') as MockAdapterClass:
mock_adapter_instance = MockAdapterClass.return_value
mock_adapter_instance.send.return_value = test_response
mock_adapter_instance.last_usage = {"total_tokens": 100}
# Verify that 'events' are emitted correctly
with patch.object(ai_client.events, 'emit') as mock_emit:
response = ai_client.send(
md_content="<context></context>",
user_message=test_message,
base_dir="."
)
# Check that the adapter's send method was called.
mock_adapter_instance.send.assert_called()
# Verify that the expected lifecycle events were emitted.
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
assert 'request_start' in emitted_event_names
assert 'response_received' in emitted_event_names
# Verify that the combined text returned by the adapter is returned by 'ai_client.send'.
assert response == test_response

View File

@@ -0,0 +1,122 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import subprocess
import io
import sys
import os
# Ensure the project root is in sys.path to resolve imports correctly
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapter(unittest.TestCase):
def setUp(self):
self.adapter = GeminiCliAdapter(binary_path="gemini")
@patch('subprocess.Popen')
def test_send_starts_subprocess_with_correct_args(self, mock_popen):
"""
Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message.
"""
# Setup mock process with a minimal valid JSONL termination
process_mock = MagicMock()
process_mock.stdout = io.StringIO(json.dumps({"type": "result", "usage": {}}) + "\n")
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
message = "Hello Gemini CLI"
self.adapter.send(message)
# Verify subprocess.Popen call
mock_popen.assert_called_once()
args, kwargs = mock_popen.call_args
cmd = args[0]
# Check mandatory CLI components
self.assertIn("gemini", cmd)
self.assertIn("--output-format", cmd)
self.assertIn("stream-json", cmd)
self.assertIn(message, cmd)
# Check process configuration
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
self.assertEqual(kwargs.get('text'), True)
@patch('subprocess.Popen')
def test_send_parses_jsonl_output(self, mock_popen):
"""
Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text.
"""
jsonl_output = [
json.dumps({"type": "message", "text": "The quick brown "}),
json.dumps({"type": "message", "text": "fox jumps."}),
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.stdout = io.StringIO(stdout_content)
# Mock poll sequence: running, running, finished
process_mock.poll.side_effect = [None, None, 0]
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("test message")
self.assertEqual(result, "The quick brown fox jumps.")
@patch('subprocess.Popen')
def test_send_handles_tool_use_events(self, mock_popen):
"""
Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event.
"""
jsonl_output = [
json.dumps({"type": "message", "text": "Calling tool..."}),
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}),
json.dumps({"type": "message", "text": "\nFile read successfully."}),
json.dumps({"type": "result", "usage": {}})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.stdout = io.StringIO(stdout_content)
process_mock.poll.side_effect = [None, None, None, 0]
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("read test.txt")
# Result should contain the combined text from all 'message' events
self.assertEqual(result, "Calling tool...\nFile read successfully.")
@patch('subprocess.Popen')
def test_send_captures_usage_metadata(self, mock_popen):
"""
Verify that usage data is extracted from the 'result' event.
"""
usage_data = {"total_tokens": 42}
jsonl_output = [
json.dumps({"type": "message", "text": "Finalizing"}),
json.dumps({"type": "result", "usage": usage_data})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.stdout = io.StringIO(stdout_content)
process_mock.poll.side_effect = [None, 0]
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
self.adapter.send("usage test")
# Verify the usage was captured in the adapter instance
self.assertEqual(self.adapter.last_usage, usage_data)
if __name__ == '__main__':
unittest.main()