teststests in wrong spot.
This commit is contained in:
@@ -18,7 +18,7 @@ history = [
|
||||
|
||||
[discussions.AutoDisc]
|
||||
git_commit = ""
|
||||
last_updated = "2026-02-27T19:27:19"
|
||||
last_updated = "2026-02-27T23:54:05"
|
||||
history = [
|
||||
"@2026-02-27T19:08:37\nSystem:\n[PERFORMANCE ALERT] Frame time high: 62.2ms. Please consider optimizing recent changes or reducing load.",
|
||||
]
|
||||
|
||||
41
tests/test_ai_client_cli.py
Normal file
41
tests/test_ai_client_cli.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
import ai_client
|
||||
|
||||
def test_ai_client_send_gemini_cli():
|
||||
"""
|
||||
Verifies that 'ai_client.send' correctly interacts with 'GeminiCliAdapter'
|
||||
when the 'gemini_cli' provider is specified.
|
||||
"""
|
||||
test_message = "Hello, this is a test prompt for the CLI adapter."
|
||||
test_response = "This is a dummy response from the Gemini CLI."
|
||||
|
||||
# Set provider to gemini_cli
|
||||
ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite")
|
||||
|
||||
# 1. Mock 'ai_client.GeminiCliAdapter' (which we will add)
|
||||
with patch('ai_client.GeminiCliAdapter') as MockAdapterClass:
|
||||
mock_adapter_instance = MockAdapterClass.return_value
|
||||
mock_adapter_instance.send.return_value = {"text": test_response, "tool_calls": []}
|
||||
mock_adapter_instance.last_usage = {"total_tokens": 100}
|
||||
mock_adapter_instance.last_latency = 0.5
|
||||
mock_adapter_instance.session_id = "test-session"
|
||||
|
||||
# Verify that 'events' are emitted correctly
|
||||
with patch.object(ai_client.events, 'emit') as mock_emit:
|
||||
response = ai_client.send(
|
||||
md_content="<context></context>",
|
||||
user_message=test_message,
|
||||
base_dir="."
|
||||
)
|
||||
|
||||
# Check that the adapter's send method was called.
|
||||
mock_adapter_instance.send.assert_called()
|
||||
|
||||
# Verify that the expected lifecycle events were emitted.
|
||||
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
|
||||
assert 'request_start' in emitted_event_names
|
||||
assert 'response_received' in emitted_event_names
|
||||
|
||||
# Verify that the combined text returned by the adapter is returned by 'ai_client.send'.
|
||||
assert response == test_response
|
||||
75
tests/test_cli_tool_bridge.py
Normal file
75
tests/test_cli_tool_bridge.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import io
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add project root to sys.path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
# Import after path fix
|
||||
from scripts.cli_tool_bridge import main
|
||||
|
||||
class TestCliToolBridge(unittest.TestCase):
|
||||
def setUp(self):
|
||||
os.environ['GEMINI_CLI_HOOK_CONTEXT'] = 'manual_slop'
|
||||
self.tool_call = {
|
||||
'tool_name': 'read_file',
|
||||
'tool_input': {'path': 'test.txt'}
|
||||
}
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
@patch('sys.stdout', new_callable=io.StringIO)
|
||||
@patch('api_hook_client.ApiHookClient.request_confirmation')
|
||||
def test_allow_decision(self, mock_request, mock_stdout, mock_stdin):
|
||||
# 1. Mock stdin with a JSON string tool call
|
||||
mock_stdin.write(json.dumps(self.tool_call))
|
||||
mock_stdin.seek(0)
|
||||
|
||||
# 2. Mock ApiHookClient to return approved
|
||||
mock_request.return_value = {'approved': True}
|
||||
|
||||
# Run main
|
||||
main()
|
||||
|
||||
# 3. Capture stdout and assert allow
|
||||
output = json.loads(mock_stdout.getvalue().strip())
|
||||
self.assertEqual(output.get('decision'), 'allow')
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
@patch('sys.stdout', new_callable=io.StringIO)
|
||||
@patch('api_hook_client.ApiHookClient.request_confirmation')
|
||||
def test_deny_decision(self, mock_request, mock_stdout, mock_stdin):
|
||||
# Mock stdin
|
||||
mock_stdin.write(json.dumps(self.tool_call))
|
||||
mock_stdin.seek(0)
|
||||
|
||||
# 4. Mock ApiHookClient to return denied
|
||||
mock_request.return_value = {'approved': False}
|
||||
|
||||
main()
|
||||
|
||||
# Assert deny
|
||||
output = json.loads(mock_stdout.getvalue().strip())
|
||||
self.assertEqual(output.get('decision'), 'deny')
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
@patch('sys.stdout', new_callable=io.StringIO)
|
||||
@patch('api_hook_client.ApiHookClient.request_confirmation')
|
||||
def test_unreachable_hook_server(self, mock_request, mock_stdout, mock_stdin):
|
||||
# Mock stdin
|
||||
mock_stdin.write(json.dumps(self.tool_call))
|
||||
mock_stdin.seek(0)
|
||||
|
||||
# 5. Test case where hook server is unreachable (exception)
|
||||
mock_request.side_effect = Exception("Connection refused")
|
||||
|
||||
main()
|
||||
|
||||
# Assert deny on error
|
||||
output = json.loads(mock_stdout.getvalue().strip())
|
||||
self.assertEqual(output.get('decision'), 'deny')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
53
tests/test_cli_tool_bridge_mapping.py
Normal file
53
tests/test_cli_tool_bridge_mapping.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import io
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add project root to sys.path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
# Import after path fix
|
||||
from scripts.cli_tool_bridge import main
|
||||
|
||||
class TestCliToolBridgeMapping(unittest.TestCase):
|
||||
def setUp(self):
|
||||
os.environ['GEMINI_CLI_HOOK_CONTEXT'] = 'manual_slop'
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
@patch('sys.stdout', new_callable=io.StringIO)
|
||||
@patch('api_hook_client.ApiHookClient.request_confirmation')
|
||||
def test_mapping_from_api_format(self, mock_request, mock_stdout, mock_stdin):
|
||||
"""
|
||||
Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format)
|
||||
into tool_name and tool_input for the hook client.
|
||||
"""
|
||||
api_tool_call = {
|
||||
'id': 'call123',
|
||||
'name': 'read_file',
|
||||
'input': {'path': 'test.txt'}
|
||||
}
|
||||
|
||||
# 1. Mock stdin with the API format JSON
|
||||
mock_stdin.write(json.dumps(api_tool_call))
|
||||
mock_stdin.seek(0)
|
||||
|
||||
# 2. Mock ApiHookClient to return approved
|
||||
mock_request.return_value = {'approved': True}
|
||||
|
||||
# Run main
|
||||
main()
|
||||
|
||||
# 3. Verify that request_confirmation was called with mapped values
|
||||
# If it's not mapped, it will likely be called with None or fail
|
||||
mock_request.assert_called_once_with('read_file', {'path': 'test.txt'})
|
||||
|
||||
# 4. Capture stdout and assert allow
|
||||
output_str = mock_stdout.getvalue().strip()
|
||||
self.assertTrue(output_str, "Stdout should not be empty")
|
||||
output = json.loads(output_str)
|
||||
self.assertEqual(output.get('decision'), 'allow')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
130
tests/test_gemini_cli_adapter.py
Normal file
130
tests/test_gemini_cli_adapter.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import json
|
||||
import subprocess
|
||||
import io
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Ensure the project root is in sys.path to resolve imports correctly
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
from gemini_cli_adapter import GeminiCliAdapter
|
||||
|
||||
class TestGeminiCliAdapter(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.adapter = GeminiCliAdapter(binary_path="gemini")
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_starts_subprocess_with_correct_args(self, mock_popen):
|
||||
"""
|
||||
Verify that send(message) correctly starts the subprocess with
|
||||
--output-format stream-json and the provided message via stdin using communicate.
|
||||
"""
|
||||
# Setup mock process with a minimal valid JSONL termination
|
||||
process_mock = MagicMock()
|
||||
stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
|
||||
process_mock.communicate.return_value = (stdout_content, "")
|
||||
process_mock.poll.return_value = 0
|
||||
process_mock.wait.return_value = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
message = "Hello Gemini CLI"
|
||||
self.adapter.send(message)
|
||||
|
||||
# Verify subprocess.Popen call
|
||||
mock_popen.assert_called_once()
|
||||
args, kwargs = mock_popen.call_args
|
||||
cmd = args[0]
|
||||
|
||||
# Check mandatory CLI components
|
||||
self.assertIn("gemini", cmd)
|
||||
self.assertIn("--output-format", cmd)
|
||||
self.assertIn("stream-json", cmd)
|
||||
# Message should NOT be in cmd now
|
||||
self.assertNotIn(message, cmd)
|
||||
|
||||
# Verify message was sent via communicate
|
||||
process_mock.communicate.assert_called_once_with(input=message)
|
||||
|
||||
# Check process configuration
|
||||
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
|
||||
self.assertEqual(kwargs.get('stdin'), subprocess.PIPE)
|
||||
self.assertEqual(kwargs.get('text'), True)
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_parses_jsonl_output(self, mock_popen):
|
||||
"""
|
||||
Verify that it correctly parses multiple JSONL 'message' events
|
||||
and returns the combined text.
|
||||
"""
|
||||
jsonl_output = [
|
||||
json.dumps({"type": "message", "role": "model", "text": "The quick brown "}),
|
||||
json.dumps({"type": "message", "role": "model", "text": "fox jumps."}),
|
||||
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}})
|
||||
]
|
||||
stdout_content = "\n".join(jsonl_output) + "\n"
|
||||
|
||||
process_mock = MagicMock()
|
||||
process_mock.communicate.return_value = (stdout_content, "")
|
||||
process_mock.poll.return_value = 0
|
||||
process_mock.wait.return_value = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
result = self.adapter.send("test message")
|
||||
|
||||
self.assertEqual(result["text"], "The quick brown fox jumps.")
|
||||
self.assertEqual(result["tool_calls"], [])
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_handles_tool_use_events(self, mock_popen):
|
||||
"""
|
||||
Verify that it correctly handles 'tool_use' events in the stream
|
||||
by continuing to read until the final 'result' event.
|
||||
"""
|
||||
jsonl_output = [
|
||||
json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}),
|
||||
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}),
|
||||
json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}),
|
||||
json.dumps({"type": "result", "usage": {}})
|
||||
]
|
||||
stdout_content = "\n".join(jsonl_output) + "\n"
|
||||
|
||||
process_mock = MagicMock()
|
||||
process_mock.communicate.return_value = (stdout_content, "")
|
||||
process_mock.poll.return_value = 0
|
||||
process_mock.wait.return_value = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
result = self.adapter.send("read test.txt")
|
||||
|
||||
# Result should contain the combined text from all 'message' events
|
||||
self.assertEqual(result["text"], "Calling tool...\nFile read successfully.")
|
||||
self.assertEqual(len(result["tool_calls"]), 1)
|
||||
self.assertEqual(result["tool_calls"][0]["name"], "read_file")
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_captures_usage_metadata(self, mock_popen):
|
||||
"""
|
||||
Verify that usage data is extracted from the 'result' event.
|
||||
"""
|
||||
usage_data = {"total_tokens": 42}
|
||||
jsonl_output = [
|
||||
json.dumps({"type": "message", "text": "Finalizing"}),
|
||||
json.dumps({"type": "result", "usage": usage_data})
|
||||
]
|
||||
stdout_content = "\n".join(jsonl_output) + "\n"
|
||||
|
||||
process_mock = MagicMock()
|
||||
process_mock.communicate.return_value = (stdout_content, "")
|
||||
process_mock.poll.return_value = 0
|
||||
process_mock.wait.return_value = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
self.adapter.send("usage test")
|
||||
|
||||
# Verify the usage was captured in the adapter instance
|
||||
self.assertEqual(self.adapter.last_usage, usage_data)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
176
tests/test_gemini_cli_adapter_parity.py
Normal file
176
tests/test_gemini_cli_adapter_parity.py
Normal file
@@ -0,0 +1,176 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock, ANY
|
||||
import json
|
||||
import subprocess
|
||||
import io
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Ensure the project root is in sys.path to resolve imports correctly
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
if project_root not in sys.path:
|
||||
sys.path.append(project_root)
|
||||
|
||||
# Import the class to be tested
|
||||
from gemini_cli_adapter import GeminiCliAdapter
|
||||
|
||||
class TestGeminiCliAdapterParity(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Set up a fresh adapter instance and reset session state for each test."""
|
||||
# Patch session_logger to prevent file operations during tests
|
||||
self.session_logger_patcher = patch('gemini_cli_adapter.session_logger')
|
||||
self.mock_session_logger = self.session_logger_patcher.start()
|
||||
|
||||
self.adapter = GeminiCliAdapter(binary_path="gemini")
|
||||
self.adapter.session_id = None
|
||||
self.adapter.last_usage = None
|
||||
self.adapter.last_latency = 0.0
|
||||
|
||||
def tearDown(self):
|
||||
self.session_logger_patcher.stop()
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_count_tokens_uses_estimation(self, mock_popen):
|
||||
"""
|
||||
Test that count_tokens uses character-based estimation.
|
||||
"""
|
||||
contents_to_count = ["This is the first line.", "This is the second line."]
|
||||
expected_chars = len("\n".join(contents_to_count))
|
||||
expected_tokens = expected_chars // 4
|
||||
|
||||
token_count = self.adapter.count_tokens(contents=contents_to_count)
|
||||
self.assertEqual(token_count, expected_tokens)
|
||||
|
||||
# Verify that NO subprocess was started for counting
|
||||
mock_popen.assert_not_called()
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_with_safety_settings_no_flags_added(self, mock_popen):
|
||||
"""
|
||||
Test that the send method does NOT add --safety flags when safety_settings are provided,
|
||||
as this functionality is no longer supported via CLI flags.
|
||||
"""
|
||||
process_mock = MagicMock()
|
||||
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
|
||||
process_mock.communicate.return_value = (mock_stdout_content, "")
|
||||
process_mock.returncode = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
message_content = "User's prompt here."
|
||||
safety_settings = [
|
||||
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_ONLY_HIGH"},
|
||||
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
|
||||
]
|
||||
|
||||
self.adapter.send(message=message_content, safety_settings=safety_settings)
|
||||
|
||||
args, kwargs = mock_popen.call_args
|
||||
command = args[0]
|
||||
|
||||
# Verify that no --safety flags were added to the command
|
||||
self.assertNotIn("--safety", command)
|
||||
# Verify that the message was passed correctly via stdin
|
||||
process_mock.communicate.assert_called_once_with(input=message_content)
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_without_safety_settings_no_flags(self, mock_popen):
|
||||
"""
|
||||
Test that when safety_settings is None or an empty list, no --safety flags are added.
|
||||
"""
|
||||
process_mock = MagicMock()
|
||||
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
|
||||
process_mock.communicate.return_value = (mock_stdout_content, "")
|
||||
process_mock.returncode = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
message_content = "Another prompt."
|
||||
|
||||
self.adapter.send(message=message_content, safety_settings=None)
|
||||
args_none, _ = mock_popen.call_args
|
||||
self.assertNotIn("--safety", args_none[0])
|
||||
mock_popen.reset_mock()
|
||||
|
||||
self.adapter.send(message=message_content, safety_settings=[])
|
||||
args_empty, _ = mock_popen.call_args
|
||||
self.assertNotIn("--safety", args_empty[0])
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_with_system_instruction_prepended_to_stdin(self, mock_popen):
|
||||
"""
|
||||
Test that the send method prepends the system instruction to the prompt
|
||||
sent via stdin, and does NOT add a --system flag to the command.
|
||||
"""
|
||||
process_mock = MagicMock()
|
||||
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
|
||||
process_mock.communicate.return_value = (mock_stdout_content, "")
|
||||
process_mock.returncode = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
message_content = "User's prompt here."
|
||||
system_instruction_text = "Some instruction"
|
||||
expected_input = f"{system_instruction_text}\n\n{message_content}"
|
||||
|
||||
self.adapter.send(message=message_content, system_instruction=system_instruction_text)
|
||||
|
||||
args, kwargs = mock_popen.call_args
|
||||
command = args[0]
|
||||
|
||||
# Verify that the system instruction was prepended to the input sent to communicate
|
||||
process_mock.communicate.assert_called_once_with(input=expected_input)
|
||||
|
||||
# Verify that no --system flag was added to the command
|
||||
self.assertNotIn("--system", command)
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_with_model_parameter(self, mock_popen):
|
||||
"""
|
||||
Test that the send method correctly adds the -m <model> flag when a model is specified.
|
||||
"""
|
||||
process_mock = MagicMock()
|
||||
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
|
||||
process_mock.communicate.return_value = (mock_stdout_content, "")
|
||||
process_mock.returncode = 0
|
||||
mock_popen.return_value = process_mock
|
||||
|
||||
message_content = "User's prompt here."
|
||||
model_name = "gemini-1.5-flash"
|
||||
expected_command_part = f'-m "{model_name}"'
|
||||
|
||||
self.adapter.send(message=message_content, model=model_name)
|
||||
|
||||
args, kwargs = mock_popen.call_args
|
||||
command = args[0]
|
||||
|
||||
# Verify that the -m <model> flag was added to the command
|
||||
self.assertIn(expected_command_part, command)
|
||||
# Verify that the message was passed correctly via stdin
|
||||
process_mock.communicate.assert_called_once_with(input=message_content)
|
||||
|
||||
@patch('subprocess.Popen')
|
||||
def test_send_kills_process_on_communicate_exception(self, mock_popen):
|
||||
"""
|
||||
Test that if subprocess.Popen().communicate() raises an exception,
|
||||
GeminiCliAdapter.send() kills the process and re-raises the exception.
|
||||
"""
|
||||
mock_process = MagicMock()
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
# Define an exception to simulate
|
||||
simulated_exception = RuntimeError("Simulated communicate error")
|
||||
mock_process.communicate.side_effect = simulated_exception
|
||||
|
||||
message_content = "User message"
|
||||
|
||||
# Assert that the exception is raised and process is killed
|
||||
with self.assertRaises(RuntimeError) as cm:
|
||||
self.adapter.send(message=message_content)
|
||||
|
||||
# Verify that the process's kill method was called
|
||||
mock_process.kill.assert_called_once()
|
||||
|
||||
# Verify that the correct exception was re-raised
|
||||
self.assertIs(cm.exception, simulated_exception)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
170
tests/test_gemini_cli_edge_cases.py
Normal file
170
tests/test_gemini_cli_edge_cases.py
Normal file
@@ -0,0 +1,170 @@
|
||||
import pytest
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
import requests
|
||||
import json
|
||||
from api_hook_client import ApiHookClient
|
||||
|
||||
def test_gemini_cli_context_bleed_prevention(live_gui):
|
||||
"""
|
||||
Test that the GeminiCliAdapter correctly filters out echoed 'user' messages
|
||||
and only shows assistant content in the GUI history.
|
||||
"""
|
||||
client = ApiHookClient("http://127.0.0.1:8999")
|
||||
client.click("btn_reset")
|
||||
client.set_value("auto_add_history", True)
|
||||
|
||||
# Create a specialized mock for context bleed
|
||||
bleed_mock = os.path.abspath("tests/mock_context_bleed.py")
|
||||
with open(bleed_mock, "w") as f:
|
||||
f.write('''import sys, json
|
||||
print(json.dumps({"type": "init", "session_id": "bleed-test"}), flush=True)
|
||||
print(json.dumps({"type": "message", "role": "user", "content": "I am echoing you"}), flush=True)
|
||||
print(json.dumps({"type": "message", "role": "assistant", "content": "Actual AI Response"}), flush=True)
|
||||
print(json.dumps({"type": "result", "stats": {"total_tokens": 10}}), flush=True)
|
||||
''')
|
||||
|
||||
cli_cmd = f'"{sys.executable}" "{bleed_mock}"'
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
client.set_value("gcli_path", cli_cmd)
|
||||
|
||||
client.set_value("ai_input", "Test context bleed")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# Wait for completion
|
||||
time.sleep(3)
|
||||
|
||||
session = client.get_session()
|
||||
entries = session.get("session", {}).get("entries", [])
|
||||
|
||||
# Verify: We expect exactly one AI entry, and it must NOT contain the echoed user message
|
||||
ai_entries = [e for e in entries if e.get("role") == "AI"]
|
||||
assert len(ai_entries) == 1
|
||||
assert ai_entries[0].get("content") == "Actual AI Response"
|
||||
assert "echoing you" not in ai_entries[0].get("content")
|
||||
|
||||
os.remove(bleed_mock)
|
||||
|
||||
def test_gemini_cli_parameter_resilience(live_gui):
|
||||
"""
|
||||
Test that mcp_client correctly handles 'file_path' and 'dir_path' aliases
|
||||
sent by the AI instead of 'path'.
|
||||
"""
|
||||
client = ApiHookClient("http://127.0.0.1:8999")
|
||||
client.click("btn_reset")
|
||||
client.set_value("auto_add_history", True)
|
||||
client.select_list_item("proj_files", "manual_slop")
|
||||
|
||||
# Create a mock that uses dir_path for list_directory
|
||||
alias_mock = os.path.abspath("tests/mock_alias_tool.py")
|
||||
bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
|
||||
# Avoid backslashes in f-string expression part
|
||||
if sys.platform == "win32":
|
||||
bridge_path_str = bridge_path.replace("\\", "/")
|
||||
else:
|
||||
bridge_path_str = bridge_path
|
||||
|
||||
with open(alias_tool_content := "tests/mock_alias_tool.py", "w") as f:
|
||||
f.write(f'''import sys, json, os, subprocess
|
||||
prompt = sys.stdin.read()
|
||||
if '"role": "tool"' in prompt:
|
||||
print(json.dumps({{"type": "message", "role": "assistant", "content": "Tool worked!"}}), flush=True)
|
||||
print(json.dumps({{"type": "result", "stats": {{"total_tokens": 20}}}}), flush=True)
|
||||
else:
|
||||
# We must call the bridge to trigger the GUI approval!
|
||||
tool_call = {{"name": "list_directory", "input": {{"dir_path": "."}}}}
|
||||
bridge_cmd = [sys.executable, "{bridge_path_str}"]
|
||||
proc = subprocess.Popen(bridge_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
|
||||
stdout, _ = proc.communicate(input=json.dumps(tool_call))
|
||||
|
||||
# Even if bridge says allow, we emit the tool_use to the adapter
|
||||
print(json.dumps({{"type": "message", "role": "assistant", "content": "I will list the directory."}}), flush=True)
|
||||
print(json.dumps({{
|
||||
"type": "tool_use",
|
||||
"name": "list_directory",
|
||||
"id": "alias_call",
|
||||
"args": {{"dir_path": "."}}
|
||||
}}), flush=True)
|
||||
print(json.dumps({{"type": "result", "stats": {{"total_tokens": 10}}}}), flush=True)
|
||||
''')
|
||||
|
||||
cli_cmd = f'"{sys.executable}" "{alias_mock}"'
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
client.set_value("gcli_path", cli_cmd)
|
||||
|
||||
client.set_value("ai_input", "Test parameter aliases")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# Handle approval
|
||||
timeout = 15
|
||||
start_time = time.time()
|
||||
approved = False
|
||||
while time.time() - start_time < timeout:
|
||||
for ev in client.get_events():
|
||||
if ev.get("type") == "ask_received":
|
||||
requests.post("http://127.0.0.1:8999/api/ask/respond",
|
||||
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
|
||||
approved = True
|
||||
if approved: break
|
||||
time.sleep(0.5)
|
||||
|
||||
assert approved, "Tool approval event never received"
|
||||
|
||||
# Verify tool result in history
|
||||
time.sleep(2)
|
||||
session = client.get_session()
|
||||
entries = session.get("session", {}).get("entries", [])
|
||||
|
||||
# Check for "Tool worked!" which implies the tool execution was successful
|
||||
found = any("Tool worked!" in e.get("content", "") for e in entries)
|
||||
assert found, "Tool result indicating success not found in history"
|
||||
|
||||
os.remove(alias_mock)
|
||||
|
||||
def test_gemini_cli_loop_termination(live_gui):
|
||||
"""
|
||||
Test that multi-round tool calling correctly terminates and preserves
|
||||
payload (session context) between rounds.
|
||||
"""
|
||||
client = ApiHookClient("http://127.0.0.1:8999")
|
||||
client.click("btn_reset")
|
||||
client.set_value("auto_add_history", True)
|
||||
client.select_list_item("proj_files", "manual_slop")
|
||||
|
||||
# This uses the existing mock_gemini_cli.py which is already designed for 2 rounds
|
||||
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
|
||||
cli_cmd = f'"{sys.executable}" "{mock_script}"'
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
client.set_value("gcli_path", cli_cmd)
|
||||
|
||||
client.set_value("ai_input", "Perform multi-round tool test")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# Handle approvals (mock does one tool call)
|
||||
timeout = 20
|
||||
start_time = time.time()
|
||||
approved = False
|
||||
while time.time() - start_time < timeout:
|
||||
for ev in client.get_events():
|
||||
if ev.get("type") == "ask_received":
|
||||
requests.post("http://127.0.0.1:8999/api/ask/respond",
|
||||
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
|
||||
approved = True
|
||||
if approved: break
|
||||
time.sleep(0.5)
|
||||
|
||||
# Wait for the second round and final answer
|
||||
found_final = False
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 15:
|
||||
session = client.get_session()
|
||||
entries = session.get("session", {}).get("entries", [])
|
||||
for e in entries:
|
||||
if "processed the tool results" in e.get("content", ""):
|
||||
found_final = True
|
||||
break
|
||||
if found_final: break
|
||||
time.sleep(1)
|
||||
|
||||
assert found_final, "Final message after multi-round tool loop not found"
|
||||
141
tests/test_gemini_cli_integration.py
Normal file
141
tests/test_gemini_cli_integration.py
Normal file
@@ -0,0 +1,141 @@
|
||||
import pytest
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
import requests
|
||||
from api_hook_client import ApiHookClient
|
||||
|
||||
def test_gemini_cli_full_integration(live_gui):
|
||||
"""
|
||||
Integration test for the Gemini CLI provider and tool bridge.
|
||||
Handles 'ask_received' events from the bridge and any other approval requests.
|
||||
"""
|
||||
client = ApiHookClient("http://127.0.0.1:8999")
|
||||
|
||||
# 0. Reset session and enable history
|
||||
client.click("btn_reset")
|
||||
client.set_value("auto_add_history", True)
|
||||
# Switch to manual_slop project explicitly
|
||||
client.select_list_item("proj_files", "manual_slop")
|
||||
|
||||
# 1. Setup paths and configure the GUI
|
||||
# Use the real gemini CLI if available, otherwise use mock
|
||||
# For CI/testing we prefer mock
|
||||
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
|
||||
cli_cmd = f'"{sys.executable}" "{mock_script}"'
|
||||
|
||||
print(f"[TEST] Setting current_provider to gemini_cli")
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
print(f"[TEST] Setting gcli_path to {cli_cmd}")
|
||||
client.set_value("gcli_path", cli_cmd)
|
||||
|
||||
# Verify settings
|
||||
assert client.get_value("current_provider") == "gemini_cli"
|
||||
|
||||
# Clear events
|
||||
client.get_events()
|
||||
|
||||
# 2. Trigger a message in the GUI
|
||||
print("[TEST] Sending user message...")
|
||||
client.set_value("ai_input", "Please read test.txt")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# 3. Monitor for approval events
|
||||
print("[TEST] Waiting for approval events...")
|
||||
timeout = 45
|
||||
start_time = time.time()
|
||||
approved_count = 0
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
events = client.get_events()
|
||||
if events:
|
||||
for ev in events:
|
||||
etype = ev.get("type")
|
||||
eid = ev.get("request_id") or ev.get("action_id")
|
||||
print(f"[TEST] Received event: {etype} (ID: {eid})")
|
||||
|
||||
if etype in ["ask_received", "glob_approval_required", "script_confirmation_required"]:
|
||||
print(f"[TEST] Approving {etype} {eid}")
|
||||
if etype == "script_confirmation_required":
|
||||
resp = requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": True})
|
||||
else:
|
||||
resp = requests.post("http://127.0.0.1:8999/api/ask/respond",
|
||||
json={"request_id": eid, "response": {"approved": True}})
|
||||
assert resp.status_code == 200
|
||||
approved_count += 1
|
||||
|
||||
# Check if we got a final response in history
|
||||
session = client.get_session()
|
||||
entries = session.get("session", {}).get("entries", [])
|
||||
found_final = False
|
||||
for entry in entries:
|
||||
content = entry.get("content", "")
|
||||
if "Hello from mock!" in content or "processed the tool results" in content:
|
||||
print(f"[TEST] Success! Found final message in history.")
|
||||
found_final = True
|
||||
break
|
||||
|
||||
if found_final:
|
||||
break
|
||||
|
||||
time.sleep(1.0)
|
||||
|
||||
assert approved_count > 0, "No approval events were processed"
|
||||
assert found_final, "Final message from mock CLI was not found in the GUI history"
|
||||
|
||||
def test_gemini_cli_rejection_and_history(live_gui):
|
||||
"""
|
||||
Integration test for the Gemini CLI provider: Rejection flow and history.
|
||||
"""
|
||||
client = ApiHookClient("http://127.0.0.1:8999")
|
||||
|
||||
# 0. Reset session
|
||||
client.click("btn_reset")
|
||||
client.set_value("auto_add_history", True)
|
||||
client.select_list_item("proj_files", "manual_slop")
|
||||
|
||||
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
|
||||
cli_cmd = f'"{sys.executable}" "{mock_script}"'
|
||||
client.set_value("current_provider", "gemini_cli")
|
||||
client.set_value("gcli_path", cli_cmd)
|
||||
|
||||
# 2. Trigger a message
|
||||
print("[TEST] Sending user message (to be denied)...")
|
||||
client.set_value("ai_input", "Deny me")
|
||||
client.click("btn_gen_send")
|
||||
|
||||
# 3. Wait for event and reject
|
||||
timeout = 20
|
||||
start_time = time.time()
|
||||
denied = False
|
||||
while time.time() - start_time < timeout:
|
||||
for ev in client.get_events():
|
||||
etype = ev.get("type")
|
||||
eid = ev.get("request_id")
|
||||
print(f"[TEST] Received event: {etype}")
|
||||
if etype == "ask_received":
|
||||
print(f"[TEST] Denying request {eid}")
|
||||
requests.post("http://127.0.0.1:8999/api/ask/respond",
|
||||
json={"request_id": eid, "response": {"approved": False}})
|
||||
denied = True
|
||||
break
|
||||
if denied: break
|
||||
time.sleep(0.5)
|
||||
|
||||
assert denied, "No ask_received event to deny"
|
||||
|
||||
# 4. Verify rejection in history
|
||||
print("[TEST] Waiting for rejection in history...")
|
||||
rejection_found = False
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 20:
|
||||
session = client.get_session()
|
||||
entries = session.get("session", {}).get("entries", [])
|
||||
for entry in entries:
|
||||
if "Tool execution was denied" in entry.get("content", ""):
|
||||
rejection_found = True
|
||||
break
|
||||
if rejection_found: break
|
||||
time.sleep(1.0)
|
||||
|
||||
assert rejection_found, "Rejection message not found in history"
|
||||
52
tests/test_gemini_cli_parity_regression.py
Normal file
52
tests/test_gemini_cli_parity_regression.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add project root to sys.path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
import ai_client
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_ai_client():
|
||||
ai_client.reset_session()
|
||||
ai_client.set_provider("gemini_cli", "gemini-2.5-flash")
|
||||
ai_client.confirm_and_run_callback = lambda script, base_dir: "Mocked execution"
|
||||
ai_client.comms_log_callback = lambda entry: None
|
||||
ai_client.tool_log_callback = lambda script, result: None
|
||||
yield
|
||||
|
||||
@patch('ai_client.GeminiCliAdapter')
|
||||
@patch('ai_client._get_combined_system_prompt')
|
||||
def test_send_invokes_adapter_send(mock_prompt, mock_adapter_class):
|
||||
mock_prompt.return_value = "Mocked Prompt"
|
||||
mock_instance = mock_adapter_class.return_value
|
||||
mock_instance.send.return_value = {"text": "Done", "tool_calls": []}
|
||||
mock_instance.last_usage = {"input_tokens": 10}
|
||||
mock_instance.last_latency = 0.1
|
||||
mock_instance.session_id = None
|
||||
|
||||
ai_client.send("context", "message", discussion_history="hist")
|
||||
|
||||
expected_payload = "[DISCUSSION HISTORY]\n\nhist\n\n---\n\nmessage"
|
||||
assert mock_instance.send.called
|
||||
args, kwargs = mock_instance.send.call_args
|
||||
assert args[0] == expected_payload
|
||||
assert kwargs['system_instruction'] == "Mocked Prompt\n\n<context>\ncontext\n</context>"
|
||||
|
||||
@patch('ai_client.GeminiCliAdapter')
|
||||
def test_get_history_bleed_stats(mock_adapter_class):
|
||||
mock_instance = mock_adapter_class.return_value
|
||||
mock_instance.send.return_value = {"text": "txt", "tool_calls": []}
|
||||
mock_instance.last_usage = {"input_tokens": 1500}
|
||||
mock_instance.last_latency = 0.5
|
||||
mock_instance.session_id = "sess"
|
||||
|
||||
# Initialize by sending a message
|
||||
ai_client.send("context", "msg")
|
||||
|
||||
stats = ai_client.get_history_bleed_stats()
|
||||
|
||||
assert stats["provider"] == "gemini_cli"
|
||||
assert stats["current"] == 1500
|
||||
50
tests/test_gemini_metrics.py
Normal file
50
tests/test_gemini_metrics.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import pytest
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Ensure project root is in path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
# Import the necessary functions from ai_client, including the reset helper
|
||||
from ai_client import get_gemini_cache_stats, reset_session
|
||||
|
||||
def test_get_gemini_cache_stats_with_mock_client():
|
||||
"""
|
||||
Test that get_gemini_cache_stats correctly processes cache lists
|
||||
from a mocked client instance.
|
||||
"""
|
||||
# Ensure a clean state before the test by resetting the session
|
||||
reset_session()
|
||||
|
||||
# 1. Create a mock for the cache object that the client will return
|
||||
mock_cache = MagicMock()
|
||||
mock_cache.name = "cachedContents/test-cache"
|
||||
mock_cache.display_name = "Test Cache"
|
||||
mock_cache.model = "models/gemini-1.5-pro-001"
|
||||
mock_cache.size_bytes = 1024
|
||||
|
||||
# 2. Create a mock for the client instance
|
||||
mock_client_instance = MagicMock()
|
||||
# Configure its `caches.list` method to return our mock cache
|
||||
mock_client_instance.caches.list.return_value = [mock_cache]
|
||||
|
||||
# 3. Patch the Client constructor to return our mock instance
|
||||
# This intercepts the `_ensure_gemini_client` call inside the function
|
||||
with patch('google.genai.Client', return_value=mock_client_instance) as mock_client_constructor:
|
||||
|
||||
# 4. Call the function under test
|
||||
stats = get_gemini_cache_stats()
|
||||
|
||||
# 5. Assert that the function behaved as expected
|
||||
|
||||
# It should have constructed the client
|
||||
mock_client_constructor.assert_called_once()
|
||||
# It should have called the `list` method on the `caches` attribute
|
||||
mock_client_instance.caches.list.assert_called_once()
|
||||
|
||||
# The returned stats dictionary should be correct
|
||||
assert "cache_count" in stats
|
||||
assert "total_size_bytes" in stats
|
||||
assert stats["cache_count"] == 1
|
||||
assert stats["total_size_bytes"] == 1024
|
||||
96
tests/verify_mma_gui_robust.py
Normal file
96
tests/verify_mma_gui_robust.py
Normal file
@@ -0,0 +1,96 @@
|
||||
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import unittest
|
||||
|
||||
# Calculate project root
|
||||
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
if PROJECT_ROOT not in sys.path:
|
||||
sys.path.insert(0, PROJECT_ROOT)
|
||||
|
||||
from api_hook_client import ApiHookClient
|
||||
|
||||
class TestMMAGUIRobust(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# 1. Launch gui_2.py with --enable-test-hooks
|
||||
cls.gui_command = [sys.executable, "gui_2.py", "--enable-test-hooks"]
|
||||
print(f"Launching GUI: {' '.join(cls.gui_command)}")
|
||||
cls.gui_process = subprocess.Popen(
|
||||
cls.gui_command,
|
||||
cwd=PROJECT_ROOT,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True
|
||||
)
|
||||
cls.client = ApiHookClient()
|
||||
print("Waiting for GUI to start...")
|
||||
if not cls.client.wait_for_server(timeout=10):
|
||||
cls.gui_process.terminate()
|
||||
raise RuntimeError("GUI failed to start or hook server not responsive.")
|
||||
print("GUI started.")
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
if cls.gui_process:
|
||||
cls.gui_process.terminate()
|
||||
cls.gui_process.wait(timeout=5)
|
||||
|
||||
def test_mma_state_ingestion(self):
|
||||
"""Verify that mma_state_update event correctly updates GUI state."""
|
||||
track_data = {
|
||||
"id": "robust_test_track",
|
||||
"title": "Robust Verification Track",
|
||||
"description": "Verifying internal state ingestion"
|
||||
}
|
||||
tickets_data = [
|
||||
{"id": "T1", "target_file": "file1.py", "status": "todo"},
|
||||
{"id": "T2", "target_file": "file2.py", "status": "running"},
|
||||
{"id": "T3", "target_file": "file3.py", "status": "complete"},
|
||||
]
|
||||
|
||||
payload = {
|
||||
"status": "active",
|
||||
"active_tier": "Tier 2",
|
||||
"track": track_data,
|
||||
"tickets": tickets_data
|
||||
}
|
||||
|
||||
print("Pushing mma_state_update...")
|
||||
self.client.push_event("mma_state_update", payload)
|
||||
|
||||
# Give GUI a moment to process the async task
|
||||
time.sleep(1.0)
|
||||
|
||||
print("Querying mma_status...")
|
||||
status = self.client.get_mma_status()
|
||||
|
||||
self.assertEqual(status["mma_status"], "active")
|
||||
self.assertEqual(status["active_tier"], "Tier 2")
|
||||
self.assertEqual(status["active_track"]["id"], "robust_test_track")
|
||||
self.assertEqual(len(status["active_tickets"]), 3)
|
||||
self.assertEqual(status["active_tickets"][2]["status"], "complete")
|
||||
print("MMA state ingestion verified successfully.")
|
||||
|
||||
def test_mma_step_approval_trigger(self):
|
||||
"""Verify that mma_step_approval event sets the pending approval flag."""
|
||||
payload = {
|
||||
"ticket_id": "T2",
|
||||
"payload": "echo 'Robust Test'"
|
||||
}
|
||||
|
||||
print("Pushing mma_step_approval...")
|
||||
self.client.push_event("mma_step_approval", payload)
|
||||
|
||||
time.sleep(1.0)
|
||||
|
||||
print("Querying mma_status for pending approval...")
|
||||
status = self.client.get_mma_status()
|
||||
|
||||
self.assertTrue(status["pending_approval"], "GUI did not register pending MMA approval")
|
||||
print("MMA step approval trigger verified successfully.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
141
tests/visual_mma_verification.py
Normal file
141
tests/visual_mma_verification.py
Normal file
@@ -0,0 +1,141 @@
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import glob
|
||||
|
||||
# --- Configuration ---
|
||||
GUI_SCRIPT = 'gui_2.py'
|
||||
TEST_HOOKS_FLAG = '--enable-test-hooks'
|
||||
API_HOOK_CLIENT_MODULE = 'api_hook_client'
|
||||
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
# Ensure project root is in sys.path to import modules like api_hook_client
|
||||
if PROJECT_ROOT not in sys.path:
|
||||
sys.path.insert(0, PROJECT_ROOT)
|
||||
print(f"Added '{PROJECT_ROOT}' to sys.path for imports.")
|
||||
|
||||
try:
|
||||
from api_hook_client import ApiHookClient
|
||||
except ImportError as e:
|
||||
print(f"Error: Could not import ApiHookClient from '{API_HOOK_CLIENT_MODULE}'.")
|
||||
print(f"Please ensure '{API_HOOK_CLIENT_MODULE}.py' is accessible and '{PROJECT_ROOT}' is correctly added to sys.path.")
|
||||
print(f"Import error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def run_visual_mma_verification():
|
||||
print("Starting visual MMA verification test...")
|
||||
|
||||
# Change current directory to project root
|
||||
original_dir = os.getcwd()
|
||||
if original_dir != PROJECT_ROOT:
|
||||
try:
|
||||
os.chdir(PROJECT_ROOT)
|
||||
print(f"Changed current directory to: {PROJECT_ROOT}")
|
||||
except FileNotFoundError:
|
||||
print(f"Error: Project root directory '{PROJECT_ROOT}' not found.")
|
||||
return
|
||||
|
||||
# 1. Launch gui_2.py with --enable-test-hooks
|
||||
gui_command = [sys.executable, GUI_SCRIPT, TEST_HOOKS_FLAG]
|
||||
print(f"Launching GUI with command: {' '.join(gui_command)}")
|
||||
|
||||
try:
|
||||
gui_process = subprocess.Popen(
|
||||
gui_command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
cwd=PROJECT_ROOT
|
||||
)
|
||||
print(f"GUI process started with PID: {gui_process.pid}")
|
||||
except FileNotFoundError:
|
||||
print(f"Error: Could not find Python executable '{sys.executable}' or script '{GUI_SCRIPT}'.")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"Error starting GUI process: {e}")
|
||||
return
|
||||
|
||||
# Wait for GUI to start
|
||||
print("Waiting for GUI to initialize and hook server to start (5 seconds)...")
|
||||
time.sleep(5)
|
||||
|
||||
if gui_process.poll() is not None:
|
||||
print(f"Error: GUI process exited prematurely with return code {gui_process.returncode}.")
|
||||
return
|
||||
|
||||
# 2. Use ApiHookClient
|
||||
try:
|
||||
client = ApiHookClient()
|
||||
print("ApiHookClient initialized successfully.")
|
||||
except Exception as e:
|
||||
print(f"Failed to initialize ApiHookClient. Error: {e}")
|
||||
if gui_process:
|
||||
gui_process.terminate()
|
||||
return
|
||||
|
||||
# 3. Setup MMA data
|
||||
track_data = {
|
||||
"id": "visual_test_track",
|
||||
"title": "Visual Verification Track",
|
||||
"description": "A track to verify MMA UI components"
|
||||
}
|
||||
tickets_data = [
|
||||
{"id": "TICKET-001", "target_file": "core.py", "status": "todo"},
|
||||
{"id": "TICKET-002", "target_file": "utils.py", "status": "running"},
|
||||
{"id": "TICKET-003", "target_file": "tests.py", "status": "complete"},
|
||||
{"id": "TICKET-004", "target_file": "api.py", "status": "blocked"},
|
||||
{"id": "TICKET-005", "target_file": "gui.py", "status": "paused"},
|
||||
]
|
||||
|
||||
print("\nPushing MMA state update...")
|
||||
try:
|
||||
payload = {
|
||||
"status": "running",
|
||||
"active_tier": "Tier 3",
|
||||
"track": track_data,
|
||||
"tickets": tickets_data
|
||||
}
|
||||
client.push_event("mma_state_update", payload)
|
||||
print(" - MMA state update pushed.")
|
||||
except Exception as e:
|
||||
print(f" - Warning: Failed to push mma_state_update: {e}")
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
print("Pushing 'mma_step_approval' event to trigger HITL modal...")
|
||||
try:
|
||||
approval_payload = {
|
||||
"ticket_id": "TICKET-002",
|
||||
"payload": "powershell -Command \"Write-Host 'Hello from Tier 3'\""
|
||||
}
|
||||
client.push_event("mma_step_approval", approval_payload)
|
||||
print("mma_step_approval event pushed successfully.")
|
||||
except Exception as e:
|
||||
print(f"Error pushing mma_step_approval event: {e}")
|
||||
|
||||
# 5. Provide clear print statements for manual verification
|
||||
print("\n--- Manual Verification Instructions ---")
|
||||
print("Please visually inspect the running GUI application:")
|
||||
print("1. MMA Dashboard: Ensure the 'MMA Dashboard' panel is visible and active.")
|
||||
print("2. Ticket Queue: Verify the 'Ticket Queue' section displays all 5 tickets with correct statuses.")
|
||||
print("3. Progress Bar: Check that the progress bar correctly reflects the completed/total tickets.")
|
||||
print("4. Approval Modal: Confirm that an 'MMA Step Approval' modal has appeared.")
|
||||
print("\n--------------------------------------")
|
||||
print("The test script has finished its automated actions.")
|
||||
print("The GUI application is still running. Press Enter to exit.")
|
||||
|
||||
try:
|
||||
input()
|
||||
except EOFError:
|
||||
pass
|
||||
|
||||
print("\nStopping GUI process...")
|
||||
if gui_process:
|
||||
gui_process.terminate()
|
||||
gui_process.wait(timeout=5)
|
||||
|
||||
print("Visual MMA verification test script finished.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_visual_mma_verification()
|
||||
95
tests/visual_orchestration_verification.py
Normal file
95
tests/visual_orchestration_verification.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import pytest
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure project root is in path
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
|
||||
from api_hook_client import ApiHookClient
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_mma_epic_lifecycle(live_gui):
|
||||
"""
|
||||
Integration test for the full MMA Epic lifecycle.
|
||||
1. Start App.
|
||||
2. Trigger 'New Epic' request.
|
||||
3. Verify Tier 1 generates tracks.
|
||||
4. Trigger 'Start Track' for one of the tracks.
|
||||
5. Verify Tier 2 generates tickets.
|
||||
6. Verify execution loop starts.
|
||||
"""
|
||||
client = ApiHookClient()
|
||||
assert client.wait_for_server(timeout=15), "API hook server failed to start."
|
||||
|
||||
print("[Test] Initializing MMA Epic lifecycle test...")
|
||||
|
||||
# 0. Setup: Ensure we have a project and are in a clean state
|
||||
client.click("btn_reset")
|
||||
time.sleep(1)
|
||||
|
||||
# 1. Set Epic input
|
||||
epic_text = "Improve the logging system to include timestamps in all tool calls."
|
||||
print(f"[Test] Setting Epic input: {epic_text}")
|
||||
client.set_value("mma_epic_input", epic_text)
|
||||
|
||||
# 2. Trigger 'New Epic' (Plan Epic)
|
||||
print("[Test] Clicking 'Plan Epic (Tier 1)'...")
|
||||
client.click("btn_mma_plan_epic")
|
||||
|
||||
# 3. Verify that Tier 1 generates tracks
|
||||
print("[Test] Polling for Tier 1 tracks...")
|
||||
tracks_generated = False
|
||||
for i in range(120):
|
||||
status = client.get_value("ai_status")
|
||||
# Check if the proposal modal is shown or status changed
|
||||
if status and "Epic tracks generated" in str(status):
|
||||
tracks_generated = True
|
||||
print(f"[Test] Tracks generated after {i}s")
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
assert tracks_generated, "Tier 1 failed to generate tracks within 60 seconds."
|
||||
|
||||
# 4. Trigger 'Start Track' for the first track
|
||||
print("[Test] Triggering 'Start Track' for track index 0...")
|
||||
client.click("btn_mma_start_track", user_data={"index": 0})
|
||||
|
||||
# 5. Verify that Tier 2 generates tickets and starts execution
|
||||
print("[Test] Polling for Tier 2 ticket generation and execution start...")
|
||||
execution_started = False
|
||||
for i in range(60):
|
||||
mma_status = client.get_mma_status()
|
||||
status_str = mma_status.get("mma_status", "idle")
|
||||
active_tier = mma_status.get("active_tier", "")
|
||||
|
||||
if status_str == "running" or "Tier 3" in str(active_tier):
|
||||
execution_started = True
|
||||
print(f"[Test] Execution started (Status: {status_str}, Tier: {active_tier}) after {i}s")
|
||||
break
|
||||
|
||||
current_ai_status = client.get_value("ai_status")
|
||||
if i % 5 == 0:
|
||||
print(f" ... still waiting. Current AI Status: {current_ai_status}")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
assert execution_started, "Tier 2 failed to generate tickets or execution failed to start within 60 seconds."
|
||||
|
||||
# 6. Final verification of MMA state
|
||||
final_mma = client.get_mma_status()
|
||||
print(f"[Test] Final MMA Status: {final_mma.get('mma_status')}")
|
||||
print(f"[Test] Active Tier: {final_mma.get('active_tier')}")
|
||||
print(f"[Test] Ticket Count: {len(final_mma.get('active_tickets', []))}")
|
||||
|
||||
assert final_mma.get("mma_status") in ["running", "done", "blocked"]
|
||||
assert len(final_mma.get("active_tickets", [])) > 0
|
||||
|
||||
print("[Test] MMA Epic lifecycle verification successful!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# If run directly, try to use pytest
|
||||
import subprocess
|
||||
# Using sys.executable to ensure we use the same environment
|
||||
subprocess.run([sys.executable, "-m", "pytest", "-v", __file__])
|
||||
@@ -17,4 +17,28 @@ def test_mma_epic_simulation(live_gui):
|
||||
client = ApiHookClient()
|
||||
assert client.wait_for_server(timeout=10)
|
||||
|
||||
assert False, "Red Phase: Not yet implemented"
|
||||
# Try selecting MMA Dashboard tab if applicable (using typical naming convention)
|
||||
try:
|
||||
client.select_tab('main_tab_bar', 'tab_mma')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Set model to mock to avoid real API calls and timeouts
|
||||
try:
|
||||
client.set_value('current_model', 'mock')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
client.set_value('mma_epic_input', 'Build a simple calculator')
|
||||
client.click('btn_mma_plan_epic')
|
||||
|
||||
# Poll client.get_mma_status() every 1 second (up to 30 seconds)
|
||||
success = False
|
||||
for i in range(30):
|
||||
status = client.get_mma_status()
|
||||
if status and status.get('tracks') and len(status['tracks']) > 0:
|
||||
success = True
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
assert success, "Failed to generate at least one track."
|
||||
|
||||
Reference in New Issue
Block a user