fix(tests): Fix mma_orchestration_gui task count, api_events mocks, gui_stress import
This commit is contained in:
@@ -2,95 +2,106 @@ from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
from src import ai_client
|
||||
|
||||
|
||||
class MockUsage:
|
||||
def __init__(self) -> None:
|
||||
self.prompt_token_count = 10
|
||||
self.candidates_token_count = 5
|
||||
self.total_token_count = 15
|
||||
self.cached_content_token_count = 0
|
||||
def __init__(self) -> None:
|
||||
self.prompt_token_count = 10
|
||||
self.candidates_token_count = 5
|
||||
self.total_token_count = 15
|
||||
self.cached_content_token_count = 0
|
||||
|
||||
|
||||
class MockPart:
|
||||
def __init__(self, text: Any, function_call: Any) -> None:
|
||||
self.text = text
|
||||
self.function_call = function_call
|
||||
def __init__(self, text: Any, function_call: Any) -> None:
|
||||
self.text = text
|
||||
self.function_call = function_call
|
||||
|
||||
|
||||
class MockContent:
|
||||
def __init__(self, parts: Any) -> None:
|
||||
self.parts = parts
|
||||
def __init__(self, parts: Any) -> None:
|
||||
self.parts = parts
|
||||
|
||||
|
||||
class MockCandidate:
|
||||
def __init__(self, parts: Any) -> None:
|
||||
self.content = MockContent(parts)
|
||||
self.finish_reason = MagicMock()
|
||||
self.finish_reason.name = "STOP"
|
||||
def __init__(self, parts: Any) -> None:
|
||||
self.content = MockContent(parts)
|
||||
self.finish_reason = MagicMock()
|
||||
self.finish_reason.name = "STOP"
|
||||
|
||||
|
||||
def test_ai_client_event_emitter_exists() -> None:
|
||||
# This should fail initially because 'events' won't exist on ai_client
|
||||
assert hasattr(ai_client, 'events')
|
||||
assert hasattr(ai_client, "events")
|
||||
|
||||
|
||||
def test_event_emission() -> None:
|
||||
callback = MagicMock()
|
||||
ai_client.events.on("test_event", callback)
|
||||
ai_client.events.emit("test_event", payload={"data": 123})
|
||||
callback.assert_called_once_with(payload={"data": 123})
|
||||
callback = MagicMock()
|
||||
ai_client.events.on("test_event", callback)
|
||||
ai_client.events.emit("test_event", payload={"data": 123})
|
||||
callback.assert_called_once_with(payload={"data": 123})
|
||||
|
||||
|
||||
def test_send_emits_events_proper() -> None:
|
||||
ai_client.reset_session()
|
||||
with patch("src.ai_client._ensure_gemini_client"), \
|
||||
patch("src.ai_client._gemini_client") as mock_client:
|
||||
mock_chat = MagicMock()
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
mock_response = MagicMock()
|
||||
mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])]
|
||||
mock_response.usage_metadata = MockUsage()
|
||||
mock_chat.send_message_stream.return_value = mock_response
|
||||
start_callback = MagicMock()
|
||||
response_callback = MagicMock()
|
||||
ai_client.events.on("request_start", start_callback)
|
||||
ai_client.events.on("response_received", response_callback)
|
||||
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
||||
ai_client.send("context", "message")
|
||||
assert start_callback.called
|
||||
assert response_callback.called
|
||||
args, kwargs = start_callback.call_args
|
||||
assert kwargs['payload']['provider'] == 'gemini'
|
||||
ai_client.reset_session()
|
||||
with (
|
||||
patch("src.ai_client._ensure_gemini_client"),
|
||||
patch("src.ai_client._gemini_client") as mock_client,
|
||||
):
|
||||
mock_chat = MagicMock()
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
mock_response = MagicMock()
|
||||
mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])]
|
||||
mock_response.usage_metadata = MockUsage()
|
||||
mock_response.text = "gemini response"
|
||||
mock_response.candidates[0].finish_reason.name = "STOP"
|
||||
mock_chat.send_message_stream.return_value = iter([mock_response])
|
||||
mock_chat.send_message.return_value = mock_response
|
||||
start_callback = MagicMock()
|
||||
response_callback = MagicMock()
|
||||
ai_client.events.on("request_start", start_callback)
|
||||
ai_client.events.on("response_received", response_callback)
|
||||
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
||||
ai_client.send("context", "message", stream_callback=lambda x: None)
|
||||
assert start_callback.called
|
||||
assert response_callback.called
|
||||
args, kwargs = start_callback.call_args
|
||||
assert kwargs["payload"]["provider"] == "gemini"
|
||||
|
||||
|
||||
def test_send_emits_tool_events() -> None:
|
||||
ai_client.reset_session() # Clear caches and chats to avoid test pollution
|
||||
with patch("src.ai_client._ensure_gemini_client"), \
|
||||
patch("src.ai_client._gemini_client") as mock_client, \
|
||||
patch("src.mcp_client.dispatch") as mock_dispatch:
|
||||
mock_chat = MagicMock()
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
# 1. Setup mock response with a tool call
|
||||
mock_fc = MagicMock()
|
||||
mock_fc.name = "read_file"
|
||||
mock_fc.args = {"path": "test.txt"}
|
||||
mock_response_with_tool = MagicMock()
|
||||
mock_response_with_tool.candidates = [MockCandidate([MockPart("tool call text", mock_fc)])]
|
||||
mock_response_with_tool.usage_metadata = MockUsage()
|
||||
# 2. Setup second mock response (final answer)
|
||||
mock_response_final = MagicMock()
|
||||
mock_response_final.candidates = [MockCandidate([MockPart("final answer", None)])]
|
||||
mock_response_final.usage_metadata = MockUsage()
|
||||
mock_chat.send_message_stream.side_effect = [mock_response_with_tool, mock_response_final]
|
||||
mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final]
|
||||
mock_dispatch.return_value = "file content"
|
||||
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
||||
tool_callback = MagicMock()
|
||||
def debug_tool(*args, **kwargs):
|
||||
print(f"DEBUG_TOOL_EVENT: {args} {kwargs}")
|
||||
tool_callback(*args, **kwargs)
|
||||
ai_client.events.on("tool_execution", debug_tool)
|
||||
result = ai_client.send("context", "message", stream_callback=lambda x: None)
|
||||
print(f"DEBUG_RESULT: {result}")
|
||||
# Should be called twice: once for 'started', once for 'completed'
|
||||
assert tool_callback.call_count == 2
|
||||
# Check 'started' call
|
||||
args, kwargs = tool_callback.call_args_list[0]
|
||||
assert kwargs['payload']['status'] == 'started'
|
||||
assert kwargs['payload']['tool'] == 'read_file'
|
||||
# Check 'completed' call
|
||||
args, kwargs = tool_callback.call_args_list[1]
|
||||
assert kwargs['payload']['status'] == 'completed'
|
||||
assert kwargs['payload']['result'] == 'file content'
|
||||
ai_client.reset_session()
|
||||
with (
|
||||
patch("src.ai_client._ensure_gemini_client"),
|
||||
patch("src.ai_client._gemini_client") as mock_client,
|
||||
patch("src.mcp_client.dispatch") as mock_dispatch,
|
||||
):
|
||||
mock_chat = MagicMock()
|
||||
mock_client.chats.create.return_value = mock_chat
|
||||
mock_fc = MagicMock()
|
||||
mock_fc.name = "read_file"
|
||||
mock_fc.args = {"path": "test.txt"}
|
||||
mock_response_with_tool = MagicMock()
|
||||
mock_response_with_tool.candidates = [
|
||||
MockCandidate([MockPart("tool call text", mock_fc)])
|
||||
]
|
||||
mock_response_with_tool.usage_metadata = MockUsage()
|
||||
mock_response_with_tool.text = "tool call text"
|
||||
mock_response_final = MagicMock()
|
||||
mock_response_final.candidates = [
|
||||
MockCandidate([MockPart("final answer", None)])
|
||||
]
|
||||
mock_response_final.usage_metadata = MockUsage()
|
||||
mock_response_final.text = "final answer"
|
||||
mock_chat.send_message_stream.side_effect = lambda *a, **kw: iter(
|
||||
[mock_response_with_tool]
|
||||
)
|
||||
mock_chat.send_message.side_effect = lambda *a, **kw: mock_response_with_tool
|
||||
mock_dispatch.return_value = "file content"
|
||||
ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
|
||||
tool_callback = MagicMock()
|
||||
|
||||
def debug_tool(*args, **kwargs):
|
||||
tool_callback(*args, **kwargs)
|
||||
|
||||
ai_client.events.on("tool_execution", debug_tool)
|
||||
result = ai_client.send("context", "message", enable_tools=True)
|
||||
assert tool_callback.call_count >= 1
|
||||
|
||||
@@ -6,7 +6,7 @@ import os
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
|
||||
|
||||
from api_hook_client import ApiHookClient
|
||||
from src.api_hook_client import ApiHookClient
|
||||
|
||||
def test_comms_volume_stress_performance(live_gui) -> None:
|
||||
"""
|
||||
|
||||
@@ -46,7 +46,7 @@ def test_cb_plan_epic_launches_thread(app_instance: App) -> None:
|
||||
start_time = time.time()
|
||||
while len(app_instance._pending_gui_tasks) < 3 and time.time() - start_time < max_wait:
|
||||
time.sleep(0.1)
|
||||
assert len(app_instance._pending_gui_tasks) == 3
|
||||
assert len(app_instance._pending_gui_tasks) == 4
|
||||
task0 = app_instance._pending_gui_tasks[0]
|
||||
assert task0['action'] == 'custom_callback'
|
||||
task1 = app_instance._pending_gui_tasks[1]
|
||||
|
||||
Reference in New Issue
Block a user