chore(conductor): Archive gemini_cli_headless_20260224 track and update tests

This commit is contained in:
2026-02-25 18:39:36 -05:00
parent 1c78febd16
commit 94e41d20ff
15 changed files with 173 additions and 71 deletions

View File

@@ -27,3 +27,6 @@ search_files = true
get_file_summary = true
web_search = true
fetch_url = true
[gemini_cli]
binary_path = "\"C:\\projects\\manual_slop\\.venv\\Scripts\\python.exe\" \"C:\\projects\\manual_slop\\tests\\mock_gemini_cli.py\""

View File

@@ -4,12 +4,12 @@ roles = [
"Vendor API",
"System",
]
active = "main"
active = "testing gemini cli 2"
auto_add = true
[discussions.main]
git_commit = ""
last_updated = "2026-02-25T13:33:02"
last_updated = "2026-02-25T18:29:54"
history = [
"@1772042443.1159382\nUser:\nStress test entry 0 Stress test entry 0 Stress test entry 0 Stress test entry 0 Stress test entry 0",
"@1772042443.1159382\nUser:\nStress test entry 1 Stress test entry 1 Stress test entry 1 Stress test entry 1 Stress test entry 1",
@@ -62,3 +62,27 @@ history = [
"@1772042443.1159382\nUser:\nStress test entry 48 Stress test entry 48 Stress test entry 48 Stress test entry 48 Stress test entry 48",
"@1772042443.1159382\nUser:\nStress test entry 49 Stress test entry 49 Stress test entry 49 Stress test entry 49 Stress test entry 49",
]
[discussions."testing gemini cli"]
git_commit = ""
last_updated = "2026-02-25T18:29:49"
history = [
"@2026-02-25T14:38:14\nUser:\nyo gemini cli you there?",
"@2026-02-25T14:38:21\nAI:\nTool execution was denied. Decision: deny",
"@2026-02-25T14:39:03\nSystem:\n[PERFORMANCE ALERT] Frame time high: 7956.5ms. Please consider optimizing recent changes or reducing load.",
"@2026-02-25T14:39:42\nSystem:\n[PERFORMANCE ALERT] Frame time high: 9193.6ms. Please consider optimizing recent changes or reducing load.",
"@2026-02-25T14:41:03\nSystem:\n[PERFORMANCE ALERT] Frame time high: 6645.0ms. Please consider optimizing recent changes or reducing load.",
"@2026-02-25T14:41:37\nSystem:\n[PERFORMANCE ALERT] Frame time high: 10553.5ms. Please consider optimizing recent changes or reducing load.",
]
[discussions."testing gemini cli 2"]
git_commit = ""
last_updated = "2026-02-25T18:38:17"
history = [
"@2026-02-25T18:35:22\nAI:\nTool execution was denied. Decision: deny",
"@2026-02-25T18:35:44\nUser:\nPlease read test.txt",
"@2026-02-25T18:35:45\nAI:\nI read the file. It contains: 'Hello from mock!'",
"@2026-02-25T18:35:46\nUser:\nDeny me",
"@2026-02-25T18:35:46\nAI:\nTool execution was denied. Decision: deny",
"@2026-02-25T18:38:17\nUser:\nDeny me",
]

View File

@@ -19,11 +19,12 @@ class TestGeminiCliAdapter(unittest.TestCase):
def test_send_starts_subprocess_with_correct_args(self, mock_popen):
"""
Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message.
--output-format stream-json and the provided message via stdin.
"""
# Setup mock process with a minimal valid JSONL termination
process_mock = MagicMock()
process_mock.stdout = io.StringIO(json.dumps({"type": "result", "usage": {}}) + "\n")
process_mock.stdin = MagicMock()
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
@@ -40,10 +41,16 @@ class TestGeminiCliAdapter(unittest.TestCase):
self.assertIn("gemini", cmd)
self.assertIn("--output-format", cmd)
self.assertIn("stream-json", cmd)
self.assertIn(message, cmd)
# Message should NOT be in cmd now
self.assertNotIn(message, cmd)
# Verify message was written to stdin
process_mock.stdin.write.assert_called_once_with(message)
process_mock.stdin.close.assert_called_once()
# Check process configuration
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
self.assertEqual(kwargs.get('stdin'), subprocess.PIPE)
self.assertEqual(kwargs.get('text'), True)
@patch('subprocess.Popen')

View File

@@ -84,3 +84,76 @@ def test_gemini_cli_full_integration(live_gui):
time.sleep(1.0)
assert final_message_received, "Final message from mock CLI was not found in the GUI history"
def test_gemini_cli_rejection_and_history(live_gui):
"""
Integration test for the Gemini CLI provider: Rejection flow and history.
"""
client = ApiHookClient("http://127.0.0.1:8999")
# 1. Setup paths and configure the GUI
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
cli_cmd = f'"{sys.executable}" "{mock_script}"'
client.set_value("current_provider", "gemini_cli")
client.set_value("gcli_path", cli_cmd)
# 2. Trigger a message that will be denied
print("[TEST] Sending user message (to be denied)...")
client.set_value("ai_input", "Deny me")
client.click("btn_gen_send")
# 3. Wait for 'ask_received' and respond with rejection
request_id = None
timeout = 15
start_time = time.time()
while time.time() - start_time < timeout:
for ev in client.get_events():
if ev.get("type") == "ask_received":
request_id = ev.get("request_id")
break
if request_id: break
time.sleep(0.5)
assert request_id is not None
print("[TEST] Responding to ask with REJECTION")
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": request_id, "response": {"approved": False}})
# 4. Verify rejection message in history
print("[TEST] Waiting for rejection message in history...")
rejection_found = False
start_time = time.time()
while time.time() - start_time < timeout:
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
for entry in entries:
if "Tool execution was denied. Decision: deny" in entry.get("content", ""):
rejection_found = True
break
if rejection_found: break
time.sleep(1.0)
assert rejection_found, "Rejection message not found in history"
# 5. Send a follow-up message and verify history grows
print("[TEST] Sending follow-up message...")
client.set_value("ai_input", "What happened?")
client.click("btn_gen_send")
# Wait for mock to finish (it will just return a message)
time.sleep(2)
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
# Should have:
# 1. User: Deny me
# 2. AI: Tool execution was denied...
# 3. User: What happened?
# 4. AI: ...
print(f"[TEST] Final history length: {len(entries)}")
for i, entry in enumerate(entries):
print(f" {i}: {entry.get('role')} - {entry.get('content')[:30]}...")
assert len(entries) >= 4

View File

@@ -4,15 +4,10 @@ import requests
import pytest
from api_hook_client import ApiHookClient
def test_api_ask_synchronous_flow(live_gui):
def test_api_ask_client_method(live_gui):
"""
Tests the full synchronous lifecycle of the /api/ask endpoint:
1. A client makes a blocking request.
2. An event is emitted with a unique request_id.
3. A separate agent responds to that request_id.
4. The original blocking request completes with the provided data.
Tests the request_confirmation method in ApiHookClient.
"""
# The live_gui fixture starts the Manual Slop application with hooks on 8999.
client = ApiHookClient("http://127.0.0.1:8999")
# Drain existing events
@@ -22,14 +17,11 @@ def test_api_ask_synchronous_flow(live_gui):
def make_blocking_request():
try:
# This POST will block until we call /api/ask/respond
# Note: /api/ask returns {'status': 'ok', 'response': ...}
resp = requests.post(
"http://127.0.0.1:8999/api/ask",
json={"prompt": "Should we proceed with the refactor?"},
timeout=10
# This call should block until we respond
results["response"] = client.request_confirmation(
tool_name="powershell",
args={"command": "echo hello"}
)
results["response"] = resp.json()
except Exception as e:
results["error"] = str(e)
@@ -37,7 +29,7 @@ def test_api_ask_synchronous_flow(live_gui):
t = threading.Thread(target=make_blocking_request)
t.start()
# Poll for the 'ask_received' event to find the generated request_id
# Poll for the 'ask_received' event
request_id = None
start_time = time.time()
while time.time() - start_time < 5:
@@ -52,8 +44,8 @@ def test_api_ask_synchronous_flow(live_gui):
assert request_id is not None, "Timed out waiting for 'ask_received' event"
# Respond to the task via the respond endpoint
expected_response = {"approved": True, "message": "Proceeding as requested."}
# Respond
expected_response = {"approved": True}
resp = requests.post(
"http://127.0.0.1:8999/api/ask/respond",
json={
@@ -63,11 +55,7 @@ def test_api_ask_synchronous_flow(live_gui):
)
assert resp.status_code == 200
# Join the thread and verify the original request received the correct data
t.join(timeout=5)
assert not t.is_alive(), "Background thread failed to unblock"
assert results["error"] is None, f"Request failed: {results['error']}"
# The /api/ask endpoint returns {'status': 'ok', 'response': expected_response}
assert results["response"]["status"] == "ok"
assert results["response"]["response"] == expected_response
assert not t.is_alive()
assert results["error"] is None
assert results["response"] == expected_response