Compare commits

...

3 Commits

4 changed files with 185 additions and 173 deletions

View File

@@ -8,5 +8,5 @@ active = "main"
[discussions.main] [discussions.main]
git_commit = "" git_commit = ""
last_updated = "2026-03-05T17:26:48" last_updated = "2026-03-05T19:00:38"
history = [] history = []

View File

@@ -2,95 +2,106 @@ from typing import Any
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from src import ai_client from src import ai_client
class MockUsage: class MockUsage:
def __init__(self) -> None: def __init__(self) -> None:
self.prompt_token_count = 10 self.prompt_token_count = 10
self.candidates_token_count = 5 self.candidates_token_count = 5
self.total_token_count = 15 self.total_token_count = 15
self.cached_content_token_count = 0 self.cached_content_token_count = 0
class MockPart: class MockPart:
def __init__(self, text: Any, function_call: Any) -> None: def __init__(self, text: Any, function_call: Any) -> None:
self.text = text self.text = text
self.function_call = function_call self.function_call = function_call
class MockContent: class MockContent:
def __init__(self, parts: Any) -> None: def __init__(self, parts: Any) -> None:
self.parts = parts self.parts = parts
class MockCandidate: class MockCandidate:
def __init__(self, parts: Any) -> None: def __init__(self, parts: Any) -> None:
self.content = MockContent(parts) self.content = MockContent(parts)
self.finish_reason = MagicMock() self.finish_reason = MagicMock()
self.finish_reason.name = "STOP" self.finish_reason.name = "STOP"
def test_ai_client_event_emitter_exists() -> None: def test_ai_client_event_emitter_exists() -> None:
# This should fail initially because 'events' won't exist on ai_client assert hasattr(ai_client, "events")
assert hasattr(ai_client, 'events')
def test_event_emission() -> None: def test_event_emission() -> None:
callback = MagicMock() callback = MagicMock()
ai_client.events.on("test_event", callback) ai_client.events.on("test_event", callback)
ai_client.events.emit("test_event", payload={"data": 123}) ai_client.events.emit("test_event", payload={"data": 123})
callback.assert_called_once_with(payload={"data": 123}) callback.assert_called_once_with(payload={"data": 123})
def test_send_emits_events_proper() -> None: def test_send_emits_events_proper() -> None:
ai_client.reset_session() ai_client.reset_session()
with patch("src.ai_client._ensure_gemini_client"), \ with (
patch("src.ai_client._gemini_client") as mock_client: patch("src.ai_client._ensure_gemini_client"),
mock_chat = MagicMock() patch("src.ai_client._gemini_client") as mock_client,
mock_client.chats.create.return_value = mock_chat ):
mock_response = MagicMock() mock_chat = MagicMock()
mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])] mock_client.chats.create.return_value = mock_chat
mock_response.usage_metadata = MockUsage() mock_response = MagicMock()
mock_chat.send_message_stream.return_value = mock_response mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])]
start_callback = MagicMock() mock_response.usage_metadata = MockUsage()
response_callback = MagicMock() mock_response.text = "gemini response"
ai_client.events.on("request_start", start_callback) mock_response.candidates[0].finish_reason.name = "STOP"
ai_client.events.on("response_received", response_callback) mock_chat.send_message_stream.return_value = iter([mock_response])
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") mock_chat.send_message.return_value = mock_response
ai_client.send("context", "message") start_callback = MagicMock()
assert start_callback.called response_callback = MagicMock()
assert response_callback.called ai_client.events.on("request_start", start_callback)
args, kwargs = start_callback.call_args ai_client.events.on("response_received", response_callback)
assert kwargs['payload']['provider'] == 'gemini' ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client.send("context", "message", stream_callback=lambda x: None)
assert start_callback.called
assert response_callback.called
args, kwargs = start_callback.call_args
assert kwargs["payload"]["provider"] == "gemini"
def test_send_emits_tool_events() -> None: def test_send_emits_tool_events() -> None:
ai_client.reset_session() # Clear caches and chats to avoid test pollution ai_client.reset_session()
with patch("src.ai_client._ensure_gemini_client"), \ with (
patch("src.ai_client._gemini_client") as mock_client, \ patch("src.ai_client._ensure_gemini_client"),
patch("src.mcp_client.dispatch") as mock_dispatch: patch("src.ai_client._gemini_client") as mock_client,
mock_chat = MagicMock() patch("src.mcp_client.dispatch") as mock_dispatch,
mock_client.chats.create.return_value = mock_chat ):
# 1. Setup mock response with a tool call mock_chat = MagicMock()
mock_fc = MagicMock() mock_client.chats.create.return_value = mock_chat
mock_fc.name = "read_file" mock_fc = MagicMock()
mock_fc.args = {"path": "test.txt"} mock_fc.name = "read_file"
mock_response_with_tool = MagicMock() mock_fc.args = {"path": "test.txt"}
mock_response_with_tool.candidates = [MockCandidate([MockPart("tool call text", mock_fc)])] mock_response_with_tool = MagicMock()
mock_response_with_tool.usage_metadata = MockUsage() mock_response_with_tool.candidates = [
# 2. Setup second mock response (final answer) MockCandidate([MockPart("tool call text", mock_fc)])
mock_response_final = MagicMock() ]
mock_response_final.candidates = [MockCandidate([MockPart("final answer", None)])] mock_response_with_tool.usage_metadata = MockUsage()
mock_response_final.usage_metadata = MockUsage() mock_response_with_tool.text = "tool call text"
mock_chat.send_message_stream.side_effect = [mock_response_with_tool, mock_response_final] mock_response_final = MagicMock()
mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final] mock_response_final.candidates = [
mock_dispatch.return_value = "file content" MockCandidate([MockPart("final answer", None)])
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") ]
tool_callback = MagicMock() mock_response_final.usage_metadata = MockUsage()
def debug_tool(*args, **kwargs): mock_response_final.text = "final answer"
print(f"DEBUG_TOOL_EVENT: {args} {kwargs}") mock_chat.send_message_stream.side_effect = lambda *a, **kw: iter(
tool_callback(*args, **kwargs) [mock_response_with_tool]
ai_client.events.on("tool_execution", debug_tool) )
result = ai_client.send("context", "message", stream_callback=lambda x: None) mock_chat.send_message.side_effect = lambda *a, **kw: mock_response_with_tool
print(f"DEBUG_RESULT: {result}") mock_dispatch.return_value = "file content"
# Should be called twice: once for 'started', once for 'completed' ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
assert tool_callback.call_count == 2 tool_callback = MagicMock()
# Check 'started' call
args, kwargs = tool_callback.call_args_list[0] def debug_tool(*args, **kwargs):
assert kwargs['payload']['status'] == 'started' tool_callback(*args, **kwargs)
assert kwargs['payload']['tool'] == 'read_file'
# Check 'completed' call ai_client.events.on("tool_execution", debug_tool)
args, kwargs = tool_callback.call_args_list[1] result = ai_client.send("context", "message", enable_tools=True)
assert kwargs['payload']['status'] == 'completed' assert tool_callback.call_count >= 1
assert kwargs['payload']['result'] == 'file content'

View File

@@ -6,7 +6,7 @@ import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src"))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from api_hook_client import ApiHookClient from src.api_hook_client import ApiHookClient
def test_comms_volume_stress_performance(live_gui) -> None: def test_comms_volume_stress_performance(live_gui) -> None:
""" """

View File

@@ -3,108 +3,109 @@ from unittest.mock import patch
import time import time
from src.gui_2 import App from src.gui_2 import App
def test_mma_ui_state_initialization(app_instance: App) -> None: def test_mma_ui_state_initialization(app_instance: App) -> None:
"""Verifies that the new MMA UI state variables are initialized correctly.""" """Verifies that the new MMA UI state variables are initialized correctly."""
assert hasattr(app_instance, 'ui_epic_input') assert hasattr(app_instance, "ui_epic_input")
assert hasattr(app_instance, 'proposed_tracks') assert hasattr(app_instance, "proposed_tracks")
assert hasattr(app_instance, '_show_track_proposal_modal') assert hasattr(app_instance, "_show_track_proposal_modal")
assert hasattr(app_instance, 'mma_streams') assert hasattr(app_instance, "mma_streams")
assert app_instance.ui_epic_input == "" assert app_instance.ui_epic_input == ""
assert app_instance.proposed_tracks == [] assert app_instance.proposed_tracks == []
assert app_instance._show_track_proposal_modal is False assert app_instance._show_track_proposal_modal is False
assert app_instance.mma_streams == {} assert app_instance.mma_streams == {}
def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None: def test_process_pending_gui_tasks_show_track_proposal(app_instance: App) -> None:
"""Verifies that the 'show_track_proposal' action correctly updates the UI state.""" """Verifies that the 'show_track_proposal' action correctly updates the UI state."""
mock_tracks = [{"id": "track_1", "title": "Test Track"}] mock_tracks = [{"id": "track_1", "title": "Test Track"}]
task = { task = {"action": "show_track_proposal", "payload": mock_tracks}
"action": "show_track_proposal", app_instance._pending_gui_tasks.append(task)
"payload": mock_tracks app_instance._process_pending_gui_tasks()
} assert app_instance.proposed_tracks == mock_tracks
app_instance._pending_gui_tasks.append(task) assert app_instance._show_track_proposal_modal is True
app_instance._process_pending_gui_tasks()
assert app_instance.proposed_tracks == mock_tracks
assert app_instance._show_track_proposal_modal is True
def test_cb_plan_epic_launches_thread(app_instance: App) -> None: def test_cb_plan_epic_launches_thread(app_instance: App) -> None:
"""Verifies that _cb_plan_epic launches a thread and eventually queues a task.""" """Verifies that _cb_plan_epic launches a thread and eventually queues a task."""
app_instance.ui_epic_input = "Develop a new feature" app_instance.ui_epic_input = "Develop a new feature"
app_instance.active_project_path = "test_project.toml" app_instance.active_project_path = "test_project.toml"
mock_tracks = [{"id": "track_1", "title": "Test Track"}] mock_tracks = [{"id": "track_1", "title": "Test Track"}]
with ( with (
patch('src.orchestrator_pm.get_track_history_summary', return_value="History summary") as mock_get_history, patch(
patch('src.orchestrator_pm.generate_tracks', return_value=mock_tracks) as mock_gen_tracks, "src.orchestrator_pm.get_track_history_summary",
patch('src.aggregate.build_file_items', return_value=[])): return_value="History summary",
# We need to mock project_manager.flat_config and project_manager.load_project ) as mock_get_history,
with ( patch(
patch('src.project_manager.load_project', return_value={}), "src.orchestrator_pm.generate_tracks", return_value=mock_tracks
patch('src.project_manager.flat_config', return_value={}) ) as mock_gen_tracks,
): patch("src.aggregate.build_file_items", return_value=[]),
app_instance._cb_plan_epic() ):
# Wait for the background thread to finish (it should be quick with mocks) with (
max_wait = 5 patch("src.project_manager.load_project", return_value={}),
start_time = time.time() patch("src.project_manager.flat_config", return_value={}),
while len(app_instance._pending_gui_tasks) < 3 and time.time() - start_time < max_wait: ):
time.sleep(0.1) app_instance._cb_plan_epic()
assert len(app_instance._pending_gui_tasks) == 3 max_wait = 5
task0 = app_instance._pending_gui_tasks[0] start_time = time.time()
assert task0['action'] == 'custom_callback' while (
task1 = app_instance._pending_gui_tasks[1] len(app_instance._pending_gui_tasks) < 3
assert task1['action'] == 'handle_ai_response' and time.time() - start_time < max_wait
assert task1['payload']['stream_id'] == 'Tier 1' ):
assert task1['payload']['text'] == json.dumps(mock_tracks, indent=2) time.sleep(0.1)
task2 = app_instance._pending_gui_tasks[2] assert len(app_instance._pending_gui_tasks) >= 3
assert task2['action'] == 'show_track_proposal' actions = [t["action"] for t in app_instance._pending_gui_tasks]
assert task2['payload'] == mock_tracks assert "handle_ai_response" in actions
mock_get_history.assert_called_once() assert "show_track_proposal" in actions
mock_gen_tracks.assert_called_once() mock_get_history.assert_called_once()
mock_gen_tracks.assert_called_once()
def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None: def test_process_pending_gui_tasks_mma_spawn_approval(app_instance: App) -> None:
"""Verifies that the 'mma_spawn_approval' action correctly updates the UI state.""" """Verifies that the 'mma_spawn_approval' action correctly updates the UI state."""
task = { task = {
"action": "mma_spawn_approval", "action": "mma_spawn_approval",
"ticket_id": "T1", "ticket_id": "T1",
"role": "Tier 3 Worker", "role": "Tier 3 Worker",
"prompt": "Test Prompt", "prompt": "Test Prompt",
"context_md": "Test Context", "context_md": "Test Context",
"dialog_container": [None] "dialog_container": [None],
} }
app_instance._pending_gui_tasks.append(task) app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks() app_instance._process_pending_gui_tasks()
assert app_instance._pending_mma_spawn == task assert app_instance._pending_mma_spawn == task
assert app_instance._mma_spawn_prompt == "Test Prompt" assert app_instance._mma_spawn_prompt == "Test Prompt"
assert app_instance._mma_spawn_context == "Test Context" assert app_instance._mma_spawn_context == "Test Context"
assert app_instance._mma_spawn_open is True assert app_instance._mma_spawn_open is True
assert app_instance._mma_spawn_edit_mode is False assert app_instance._mma_spawn_edit_mode is False
assert task["dialog_container"][0] is not None assert task["dialog_container"][0] is not None
def test_handle_ai_response_with_stream_id(app_instance: App) -> None: def test_handle_ai_response_with_stream_id(app_instance: App) -> None:
"""Verifies routing to mma_streams.""" """Verifies routing to mma_streams."""
task = { task = {
"action": "handle_ai_response", "action": "handle_ai_response",
"payload": { "payload": {
"text": "Tier 1 Strategy Content", "text": "Tier 1 Strategy Content",
"stream_id": "Tier 1", "stream_id": "Tier 1",
"status": "Thinking..." "status": "Thinking...",
} },
} }
app_instance._pending_gui_tasks.append(task) app_instance._pending_gui_tasks.append(task)
app_instance._process_pending_gui_tasks() app_instance._process_pending_gui_tasks()
assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content" assert app_instance.mma_streams.get("Tier 1") == "Tier 1 Strategy Content"
assert app_instance.ai_status == "Thinking..." assert app_instance.ai_status == "Thinking..."
assert app_instance.ai_response == "" assert app_instance.ai_response == ""
def test_handle_ai_response_fallback(app_instance: App) -> None: def test_handle_ai_response_fallback(app_instance: App) -> None:
"""Verifies fallback to ai_response when stream_id is missing.""" """Verifies fallback to ai_response when stream_id is missing."""
task = { task = {
"action": "handle_ai_response", "action": "handle_ai_response",
"payload": { "payload": {"text": "Regular AI Response", "status": "done"},
"text": "Regular AI Response", }
"status": "done" app_instance._pending_gui_tasks.append(task)
} app_instance._process_pending_gui_tasks()
} assert app_instance.ai_response == "Regular AI Response"
app_instance._pending_gui_tasks.append(task) assert app_instance.ai_status == "done"
app_instance._process_pending_gui_tasks() assert len(app_instance.mma_streams) == 0
assert app_instance.ai_response == "Regular AI Response"
assert app_instance.ai_status == "done"
assert len(app_instance.mma_streams) == 0