From 579ee8394f92f27799f25b2c3103afec8f0de729 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sat, 28 Feb 2026 19:11:23 -0500 Subject: [PATCH] refactor(tests): Add strict type hints to second batch of test files --- tests/test_api_events.py | 173 ++++++++++++------------ tests/test_auto_whitelist.py | 11 +- tests/test_conductor_tech_lead.py | 188 +++++++++++++-------------- tests/test_extended_sims.py | 9 +- tests/test_gemini_cli_adapter.py | 171 ++++++++++++------------ tests/test_gui2_parity.py | 7 +- tests/test_layout_reorganization.py | 9 +- tests/test_orchestrator_pm.py | 121 ++++++++--------- tests/test_project_manager_tracks.py | 10 +- tests/test_tiered_context.py | 10 +- 10 files changed, 358 insertions(+), 351 deletions(-) diff --git a/tests/test_api_events.py b/tests/test_api_events.py index 22d7494..ce54913 100644 --- a/tests/test_api_events.py +++ b/tests/test_api_events.py @@ -1,106 +1,107 @@ import pytest +from typing import Any from unittest.mock import MagicMock, patch import ai_client class MockUsage: - def __init__(self) -> None: - self.prompt_token_count = 10 - self.candidates_token_count = 5 - self.total_token_count = 15 - self.cached_content_token_count = 0 + def __init__(self) -> None: + self.prompt_token_count = 10 + self.candidates_token_count = 5 + self.total_token_count = 15 + self.cached_content_token_count = 0 class MockPart: - def __init__(self, text, function_call): - self.text = text - self.function_call = function_call + def __init__(self, text: Any, function_call: Any) -> None: + self.text = text + self.function_call = function_call class MockContent: - def __init__(self, parts): - self.parts = parts + def __init__(self, parts: Any) -> None: + self.parts = parts class MockCandidate: - def __init__(self, parts): - self.content = MockContent(parts) - self.finish_reason = MagicMock() - self.finish_reason.name = "STOP" + def __init__(self, parts: Any) -> None: + self.content = MockContent(parts) + self.finish_reason = MagicMock() + self.finish_reason.name = "STOP" -def test_ai_client_event_emitter_exists(): -# This should fail initially because 'events' won't exist on ai_client - assert hasattr(ai_client, 'events') +def test_ai_client_event_emitter_exists() -> None: + # This should fail initially because 'events' won't exist on ai_client + assert hasattr(ai_client, 'events') def test_event_emission() -> None: - callback = MagicMock() - ai_client.events.on("test_event", callback) - ai_client.events.emit("test_event", payload={"data": 123}) - callback.assert_called_once_with(payload={"data": 123}) + callback = MagicMock() + ai_client.events.on("test_event", callback) + ai_client.events.emit("test_event", payload={"data": 123}) + callback.assert_called_once_with(payload={"data": 123}) def test_send_emits_events() -> None: - with patch("ai_client._send_gemini") as mock_send_gemini, \ - patch("ai_client._send_anthropic") as mock_send_anthropic: - mock_send_gemini.return_value = "gemini response" - start_callback = MagicMock() - response_callback = MagicMock() - ai_client.events.on("request_start", start_callback) - ai_client.events.on("response_received", response_callback) - ai_client.set_provider("gemini", "gemini-2.5-flash-lite") - ai_client.send("context", "message") - # We mocked _send_gemini so it doesn't emit events inside. - # But wait, ai_client.send itself emits request_start and response_received? - # Actually, ai_client.send delegates to _send_gemini. - # Let's mock _gemini_client instead to let _send_gemini run and emit events. - pass + with patch("ai_client._send_gemini") as mock_send_gemini, \ + patch("ai_client._send_anthropic") as mock_send_anthropic: + mock_send_gemini.return_value = "gemini response" + start_callback = MagicMock() + response_callback = MagicMock() + ai_client.events.on("request_start", start_callback) + ai_client.events.on("response_received", response_callback) + ai_client.set_provider("gemini", "gemini-2.5-flash-lite") + ai_client.send("context", "message") + # We mocked _send_gemini so it doesn't emit events inside. + # But wait, ai_client.send itself emits request_start and response_received? + # Actually, ai_client.send delegates to _send_gemini. + # Let's mock _gemini_client instead to let _send_gemini run and emit events. + pass def test_send_emits_events_proper() -> None: - with patch("ai_client._ensure_gemini_client"), \ - patch("ai_client._gemini_client") as mock_client: - mock_chat = MagicMock() - mock_client.chats.create.return_value = mock_chat - mock_response = MagicMock() - mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])] - mock_response.usage_metadata = MockUsage() - mock_chat.send_message.return_value = mock_response - start_callback = MagicMock() - response_callback = MagicMock() - ai_client.events.on("request_start", start_callback) - ai_client.events.on("response_received", response_callback) - ai_client.set_provider("gemini", "gemini-2.5-flash-lite") - ai_client.send("context", "message") - assert start_callback.called - assert response_callback.called - args, kwargs = start_callback.call_args - assert kwargs['payload']['provider'] == 'gemini' + with patch("ai_client._ensure_gemini_client"), \ + patch("ai_client._gemini_client") as mock_client: + mock_chat = MagicMock() + mock_client.chats.create.return_value = mock_chat + mock_response = MagicMock() + mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])] + mock_response.usage_metadata = MockUsage() + mock_chat.send_message.return_value = mock_response + start_callback = MagicMock() + response_callback = MagicMock() + ai_client.events.on("request_start", start_callback) + ai_client.events.on("response_received", response_callback) + ai_client.set_provider("gemini", "gemini-2.5-flash-lite") + ai_client.send("context", "message") + assert start_callback.called + assert response_callback.called + args, kwargs = start_callback.call_args + assert kwargs['payload']['provider'] == 'gemini' def test_send_emits_tool_events() -> None: - import mcp_client - with patch("ai_client._ensure_gemini_client"), \ - patch("ai_client._gemini_client") as mock_client, \ - patch("mcp_client.dispatch") as mock_dispatch: - mock_chat = MagicMock() - mock_client.chats.create.return_value = mock_chat - # 1. Setup mock response with a tool call - mock_fc = MagicMock() - mock_fc.name = "read_file" - mock_fc.args = {"path": "test.txt"} - mock_response_with_tool = MagicMock() - mock_response_with_tool.candidates = [MockCandidate([MockPart("tool call text", mock_fc)])] - mock_response_with_tool.usage_metadata = MockUsage() - # 2. Setup second mock response (final answer) - mock_response_final = MagicMock() - mock_response_final.candidates = [MockCandidate([MockPart("final answer", None)])] - mock_response_final.usage_metadata = MockUsage() - mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final] - mock_dispatch.return_value = "file content" - ai_client.set_provider("gemini", "gemini-2.5-flash-lite") - tool_callback = MagicMock() - ai_client.events.on("tool_execution", tool_callback) - ai_client.send("context", "message") - # Should be called twice: once for 'started', once for 'completed' - assert tool_callback.call_count == 2 - # Check 'started' call - args, kwargs = tool_callback.call_args_list[0] - assert kwargs['payload']['status'] == 'started' - assert kwargs['payload']['tool'] == 'read_file' - # Check 'completed' call - args, kwargs = tool_callback.call_args_list[1] - assert kwargs['payload']['status'] == 'completed' - assert kwargs['payload']['result'] == 'file content' + import mcp_client + with patch("ai_client._ensure_gemini_client"), \ + patch("ai_client._gemini_client") as mock_client, \ + patch("mcp_client.dispatch") as mock_dispatch: + mock_chat = MagicMock() + mock_client.chats.create.return_value = mock_chat + # 1. Setup mock response with a tool call + mock_fc = MagicMock() + mock_fc.name = "read_file" + mock_fc.args = {"path": "test.txt"} + mock_response_with_tool = MagicMock() + mock_response_with_tool.candidates = [MockCandidate([MockPart("tool call text", mock_fc)])] + mock_response_with_tool.usage_metadata = MockUsage() + # 2. Setup second mock response (final answer) + mock_response_final = MagicMock() + mock_response_final.candidates = [MockCandidate([MockPart("final answer", None)])] + mock_response_final.usage_metadata = MockUsage() + mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final] + mock_dispatch.return_value = "file content" + ai_client.set_provider("gemini", "gemini-2.5-flash-lite") + tool_callback = MagicMock() + ai_client.events.on("tool_execution", tool_callback) + ai_client.send("context", "message") + # Should be called twice: once for 'started', once for 'completed' + assert tool_callback.call_count == 2 + # Check 'started' call + args, kwargs = tool_callback.call_args_list[0] + assert kwargs['payload']['status'] == 'started' + assert kwargs['payload']['tool'] == 'read_file' + # Check 'completed' call + args, kwargs = tool_callback.call_args_list[1] + assert kwargs['payload']['status'] == 'completed' + assert kwargs['payload']['result'] == 'file content' diff --git a/tests/test_auto_whitelist.py b/tests/test_auto_whitelist.py index 7e24b5a..35efcab 100644 --- a/tests/test_auto_whitelist.py +++ b/tests/test_auto_whitelist.py @@ -1,17 +1,18 @@ import os import pytest +from typing import Any from datetime import datetime from log_registry import LogRegistry @pytest.fixture -def registry_setup(tmp_path): +def registry_setup(tmp_path: Any) -> Any: registry_path = tmp_path / "log_registry.toml" logs_dir = tmp_path / "logs" logs_dir.mkdir() registry = LogRegistry(str(registry_path)) return registry, logs_dir -def test_auto_whitelist_keywords(registry_setup): +def test_auto_whitelist_keywords(registry_setup: Any) -> None: registry, logs_dir = registry_setup session_id = "test_kw" session_dir = logs_dir / session_id @@ -24,7 +25,7 @@ def test_auto_whitelist_keywords(registry_setup): assert registry.is_session_whitelisted(session_id) assert "ERROR" in registry.data[session_id]["metadata"]["reason"] -def test_auto_whitelist_message_count(registry_setup): +def test_auto_whitelist_message_count(registry_setup: Any) -> None: registry, logs_dir = registry_setup session_id = "test_msg_count" session_dir = logs_dir / session_id @@ -37,7 +38,7 @@ def test_auto_whitelist_message_count(registry_setup): assert registry.is_session_whitelisted(session_id) assert registry.data[session_id]["metadata"]["message_count"] == 15 -def test_auto_whitelist_large_size(registry_setup): +def test_auto_whitelist_large_size(registry_setup: Any) -> None: registry, logs_dir = registry_setup session_id = "test_large" session_dir = logs_dir / session_id @@ -50,7 +51,7 @@ def test_auto_whitelist_large_size(registry_setup): assert registry.is_session_whitelisted(session_id) assert "Large session size" in registry.data[session_id]["metadata"]["reason"] -def test_no_auto_whitelist_insignificant(registry_setup): +def test_no_auto_whitelist_insignificant(registry_setup: Any) -> None: registry, logs_dir = registry_setup session_id = "test_insignificant" session_dir = logs_dir / session_id diff --git a/tests/test_conductor_tech_lead.py b/tests/test_conductor_tech_lead.py index ec3c539..ba6c0c5 100644 --- a/tests/test_conductor_tech_lead.py +++ b/tests/test_conductor_tech_lead.py @@ -1,109 +1,109 @@ import unittest +from typing import Any from unittest.mock import patch, MagicMock import json import conductor_tech_lead class TestConductorTechLead(unittest.TestCase): - @patch('ai_client.send') - @patch('ai_client.set_provider') - @patch('ai_client.reset_session') - def test_generate_tickets_success(self, mock_reset_session, mock_set_provider, mock_send): - # Setup mock response - mock_tickets = [ - { - "id": "ticket_1", - "type": "Ticket", - "goal": "Test goal", - "target_file": "test.py", - "depends_on": [], - "context_requirements": [] - } - ] - mock_send.return_value = "```json\n" + json.dumps(mock_tickets) + "\n```" - track_brief = "Test track brief" - module_skeletons = "Test skeletons" - # Call the function - tickets = conductor_tech_lead.generate_tickets(track_brief, module_skeletons) - # Verify set_provider was called - mock_set_provider.assert_called_with('gemini', 'gemini-2.5-flash-lite') - mock_reset_session.assert_called_once() - # Verify send was called - mock_send.assert_called_once() - args, kwargs = mock_send.call_args - self.assertEqual(kwargs['md_content'], "") - self.assertIn(track_brief, kwargs['user_message']) - self.assertIn(module_skeletons, kwargs['user_message']) - # Verify tickets were parsed correctly - self.assertEqual(tickets, mock_tickets) + @patch('ai_client.send') + @patch('ai_client.set_provider') + @patch('ai_client.reset_session') + def test_generate_tickets_success(self, mock_reset_session: Any, mock_set_provider: Any, mock_send: Any) -> None: + mock_tickets = [ + { + "id": "ticket_1", + "type": "Ticket", + "goal": "Test goal", + "target_file": "test.py", + "depends_on": [], + "context_requirements": [] + } + ] + mock_send.return_value = "```json\n" + json.dumps(mock_tickets) + "\n```" + track_brief = "Test track brief" + module_skeletons = "Test skeletons" + # Call the function + tickets = conductor_tech_lead.generate_tickets(track_brief, module_skeletons) + # Verify set_provider was called + mock_set_provider.assert_called_with('gemini', 'gemini-2.5-flash-lite') + mock_reset_session.assert_called_once() + # Verify send was called + mock_send.assert_called_once() + args, kwargs = mock_send.call_args + self.assertEqual(kwargs['md_content'], "") + self.assertIn(track_brief, kwargs['user_message']) + self.assertIn(module_skeletons, kwargs['user_message']) + # Verify tickets were parsed correctly + self.assertEqual(tickets, mock_tickets) - @patch('ai_client.send') - @patch('ai_client.set_provider') - @patch('ai_client.reset_session') - def test_generate_tickets_parse_error(self, mock_reset_session, mock_set_provider, mock_send): - # Setup mock invalid response - mock_send.return_value = "Invalid JSON" - # Call the function - tickets = conductor_tech_lead.generate_tickets("brief", "skeletons") - # Verify it returns an empty list on parse error - self.assertEqual(tickets, []) + @patch('ai_client.send') + @patch('ai_client.set_provider') + @patch('ai_client.reset_session') + def test_generate_tickets_parse_error(self, mock_reset_session: Any, mock_set_provider: Any, mock_send: Any) -> None: + # Setup mock invalid response + mock_send.return_value = "Invalid JSON" + # Call the function + tickets = conductor_tech_lead.generate_tickets("brief", "skeletons") + # Verify it returns an empty list on parse error + self.assertEqual(tickets, []) class TestTopologicalSort(unittest.TestCase): - def test_topological_sort_empty(self) -> None: - tickets = [] - sorted_tickets = conductor_tech_lead.topological_sort(tickets) - self.assertEqual(sorted_tickets, []) + def test_topological_sort_empty(self) -> None: + tickets = [] + sorted_tickets = conductor_tech_lead.topological_sort(tickets) + self.assertEqual(sorted_tickets, []) - def test_topological_sort_linear(self) -> None: - tickets = [ - {"id": "t2", "depends_on": ["t1"]}, - {"id": "t1", "depends_on": []}, - {"id": "t3", "depends_on": ["t2"]}, - ] - sorted_tickets = conductor_tech_lead.topological_sort(tickets) - ids = [t["id"] for t in sorted_tickets] - self.assertEqual(ids, ["t1", "t2", "t3"]) + def test_topological_sort_linear(self) -> None: + tickets = [ + {"id": "t2", "depends_on": ["t1"]}, + {"id": "t1", "depends_on": []}, + {"id": "t3", "depends_on": ["t2"]}, + ] + sorted_tickets = conductor_tech_lead.topological_sort(tickets) + ids = [t["id"] for t in sorted_tickets] + self.assertEqual(ids, ["t1", "t2", "t3"]) - def test_topological_sort_complex(self): - # t1 - # | \ - # t2 t3 - # | / - # t4 - tickets = [ - {"id": "t4", "depends_on": ["t2", "t3"]}, - {"id": "t3", "depends_on": ["t1"]}, - {"id": "t2", "depends_on": ["t1"]}, - {"id": "t1", "depends_on": []}, - ] - sorted_tickets = conductor_tech_lead.topological_sort(tickets) - ids = [t["id"] for t in sorted_tickets] - # Possible valid orders: [t1, t2, t3, t4] or [t1, t3, t2, t4] - self.assertEqual(ids[0], "t1") - self.assertEqual(ids[-1], "t4") - self.assertSetEqual(set(ids[1:3]), {"t2", "t3"}) + def test_topological_sort_complex(self) -> None: + # t1 + # | \ + # t2 t3 + # | / + # t4 + tickets = [ + {"id": "t4", "depends_on": ["t2", "t3"]}, + {"id": "t3", "depends_on": ["t1"]}, + {"id": "t2", "depends_on": ["t1"]}, + {"id": "t1", "depends_on": []}, + ] + sorted_tickets = conductor_tech_lead.topological_sort(tickets) + ids = [t["id"] for t in sorted_tickets] + # Possible valid orders: [t1, t2, t3, t4] or [t1, t3, t2, t4] + self.assertEqual(ids[0], "t1") + self.assertEqual(ids[-1], "t4") + self.assertSetEqual(set(ids[1:3]), {"t2", "t3"}) - def test_topological_sort_cycle(self) -> None: - tickets = [ - {"id": "t1", "depends_on": ["t2"]}, - {"id": "t2", "depends_on": ["t1"]}, - ] - with self.assertRaises(ValueError) as cm: - conductor_tech_lead.topological_sort(tickets) - self.assertIn("Circular dependency detected", str(cm.exception)) + def test_topological_sort_cycle(self) -> None: + tickets = [ + {"id": "t1", "depends_on": ["t2"]}, + {"id": "t2", "depends_on": ["t1"]}, + ] + with self.assertRaises(ValueError) as cm: + conductor_tech_lead.topological_sort(tickets) + self.assertIn("Circular dependency detected", str(cm.exception)) - def test_topological_sort_missing_dependency(self): - # If a ticket depends on something not in the list, we should probably handle it or let it fail. - # Usually in our context, we only care about dependencies within the same track. - tickets = [ - {"id": "t1", "depends_on": ["missing"]}, - ] - # For now, let's assume it should raise an error if a dependency is missing within the set we are sorting, - # OR it should just treat it as "ready" if it's external? - # Actually, let's just test that it doesn't crash if it's not a cycle. - # But if 'missing' is not in tickets, it will never be satisfied. - # Let's say it raises ValueError for missing internal dependencies. - with self.assertRaises(ValueError): - conductor_tech_lead.topological_sort(tickets) + def test_topological_sort_missing_dependency(self) -> None: + # If a ticket depends on something not in the list, we should probably handle it or let it fail. + # Usually in our context, we only care about dependencies within the same track. + tickets = [ + {"id": "t1", "depends_on": ["missing"]}, + ] + # For now, let's assume it should raise an error if a dependency is missing within the set we are sorting, + # OR it should just treat it as "ready" if it's external? + # Actually, let's just test that it doesn't crash if it's not a cycle. + # But if 'missing' is not in tickets, it will never be satisfied. + # Let's say it raises ValueError for missing internal dependencies. + with self.assertRaises(ValueError): + conductor_tech_lead.topological_sort(tickets) if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/test_extended_sims.py b/tests/test_extended_sims.py index 9bbe2c9..f5ef1ce 100644 --- a/tests/test_extended_sims.py +++ b/tests/test_extended_sims.py @@ -1,4 +1,5 @@ import pytest +from typing import Any import time import sys import os @@ -13,7 +14,7 @@ from simulation.sim_tools import ToolsSimulation from simulation.sim_execution import ExecutionSimulation @pytest.mark.integration -def test_context_sim_live(live_gui): +def test_context_sim_live(live_gui: Any) -> None: """Run the Context & Chat simulation against a live GUI.""" client = ApiHookClient() assert client.wait_for_server(timeout=10) @@ -23,7 +24,7 @@ def test_context_sim_live(live_gui): sim.teardown() @pytest.mark.integration -def test_ai_settings_sim_live(live_gui): +def test_ai_settings_sim_live(live_gui: Any) -> None: """Run the AI Settings simulation against a live GUI.""" client = ApiHookClient() assert client.wait_for_server(timeout=10) @@ -33,7 +34,7 @@ def test_ai_settings_sim_live(live_gui): sim.teardown() @pytest.mark.integration -def test_tools_sim_live(live_gui): +def test_tools_sim_live(live_gui: Any) -> None: """Run the Tools & Search simulation against a live GUI.""" client = ApiHookClient() assert client.wait_for_server(timeout=10) @@ -43,7 +44,7 @@ def test_tools_sim_live(live_gui): sim.teardown() @pytest.mark.integration -def test_execution_sim_live(live_gui): +def test_execution_sim_live(live_gui: Any) -> None: """Run the Execution & Modals simulation against a live GUI.""" client = ApiHookClient() assert client.wait_for_server(timeout=10) diff --git a/tests/test_gemini_cli_adapter.py b/tests/test_gemini_cli_adapter.py index 49a4aa9..363bc4b 100644 --- a/tests/test_gemini_cli_adapter.py +++ b/tests/test_gemini_cli_adapter.py @@ -1,4 +1,5 @@ import unittest +from typing import Any from unittest.mock import patch, MagicMock import json import subprocess @@ -12,105 +13,105 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from gemini_cli_adapter import GeminiCliAdapter class TestGeminiCliAdapter(unittest.TestCase): - def setUp(self) -> None: - self.adapter = GeminiCliAdapter(binary_path="gemini") + def setUp(self) -> None: + self.adapter = GeminiCliAdapter(binary_path="gemini") - @patch('subprocess.Popen') - def test_send_starts_subprocess_with_correct_args(self, mock_popen): - """ + @patch('subprocess.Popen') + def test_send_starts_subprocess_with_correct_args(self, mock_popen: Any) -> None: + """ Verify that send(message) correctly starts the subprocess with --output-format stream-json and the provided message via stdin using communicate. """ - # Setup mock process with a minimal valid JSONL termination - process_mock = MagicMock() - stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n" - process_mock.communicate.return_value = (stdout_content, "") - process_mock.poll.return_value = 0 - process_mock.wait.return_value = 0 - mock_popen.return_value = process_mock - message = "Hello Gemini CLI" - self.adapter.send(message) - # Verify subprocess.Popen call - mock_popen.assert_called_once() - args, kwargs = mock_popen.call_args - cmd = args[0] - # Check mandatory CLI components - self.assertIn("gemini", cmd) - self.assertIn("--output-format", cmd) - self.assertIn("stream-json", cmd) - # Message should NOT be in cmd now - self.assertNotIn(message, cmd) - # Verify message was sent via communicate - process_mock.communicate.assert_called_once_with(input=message) - # Check process configuration - self.assertEqual(kwargs.get('stdout'), subprocess.PIPE) - self.assertEqual(kwargs.get('stdin'), subprocess.PIPE) - self.assertEqual(kwargs.get('text'), True) + # Setup mock process with a minimal valid JSONL termination + process_mock = MagicMock() + stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n" + process_mock.communicate.return_value = (stdout_content, "") + process_mock.poll.return_value = 0 + process_mock.wait.return_value = 0 + mock_popen.return_value = process_mock + message = "Hello Gemini CLI" + self.adapter.send(message) + # Verify subprocess.Popen call + mock_popen.assert_called_once() + args, kwargs = mock_popen.call_args + cmd = args[0] + # Check mandatory CLI components + self.assertIn("gemini", cmd) + self.assertIn("--output-format", cmd) + self.assertIn("stream-json", cmd) + # Message should NOT be in cmd now + self.assertNotIn(message, cmd) + # Verify message was sent via communicate + process_mock.communicate.assert_called_once_with(input=message) + # Check process configuration + self.assertEqual(kwargs.get('stdout'), subprocess.PIPE) + self.assertEqual(kwargs.get('stdin'), subprocess.PIPE) + self.assertEqual(kwargs.get('text'), True) - @patch('subprocess.Popen') - def test_send_parses_jsonl_output(self, mock_popen): - """ + @patch('subprocess.Popen') + def test_send_parses_jsonl_output(self, mock_popen: Any) -> None: + """ Verify that it correctly parses multiple JSONL 'message' events and returns the combined text. """ - jsonl_output = [ - json.dumps({"type": "message", "role": "model", "text": "The quick brown "}), - json.dumps({"type": "message", "role": "model", "text": "fox jumps."}), - json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}}) - ] - stdout_content = "\n".join(jsonl_output) + "\n" - process_mock = MagicMock() - process_mock.communicate.return_value = (stdout_content, "") - process_mock.poll.return_value = 0 - process_mock.wait.return_value = 0 - mock_popen.return_value = process_mock - result = self.adapter.send("test message") - self.assertEqual(result["text"], "The quick brown fox jumps.") - self.assertEqual(result["tool_calls"], []) + jsonl_output = [ + json.dumps({"type": "message", "role": "model", "text": "The quick brown "}), + json.dumps({"type": "message", "role": "model", "text": "fox jumps."}), + json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}}) + ] + stdout_content = "\n".join(jsonl_output) + "\n" + process_mock = MagicMock() + process_mock.communicate.return_value = (stdout_content, "") + process_mock.poll.return_value = 0 + process_mock.wait.return_value = 0 + mock_popen.return_value = process_mock + result = self.adapter.send("test message") + self.assertEqual(result["text"], "The quick brown fox jumps.") + self.assertEqual(result["tool_calls"], []) - @patch('subprocess.Popen') - def test_send_handles_tool_use_events(self, mock_popen): - """ + @patch('subprocess.Popen') + def test_send_handles_tool_use_events(self, mock_popen: Any) -> None: + """ Verify that it correctly handles 'tool_use' events in the stream by continuing to read until the final 'result' event. """ - jsonl_output = [ - json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}), - json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}), - json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}), - json.dumps({"type": "result", "usage": {}}) - ] - stdout_content = "\n".join(jsonl_output) + "\n" - process_mock = MagicMock() - process_mock.communicate.return_value = (stdout_content, "") - process_mock.poll.return_value = 0 - process_mock.wait.return_value = 0 - mock_popen.return_value = process_mock - result = self.adapter.send("read test.txt") - # Result should contain the combined text from all 'message' events - self.assertEqual(result["text"], "Calling tool...\nFile read successfully.") - self.assertEqual(len(result["tool_calls"]), 1) - self.assertEqual(result["tool_calls"][0]["name"], "read_file") + jsonl_output = [ + json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}), + json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}), + json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}), + json.dumps({"type": "result", "usage": {}}) + ] + stdout_content = "\n".join(jsonl_output) + "\n" + process_mock = MagicMock() + process_mock.communicate.return_value = (stdout_content, "") + process_mock.poll.return_value = 0 + process_mock.wait.return_value = 0 + mock_popen.return_value = process_mock + result = self.adapter.send("read test.txt") + # Result should contain the combined text from all 'message' events + self.assertEqual(result["text"], "Calling tool...\nFile read successfully.") + self.assertEqual(len(result["tool_calls"]), 1) + self.assertEqual(result["tool_calls"][0]["name"], "read_file") - @patch('subprocess.Popen') - def test_send_captures_usage_metadata(self, mock_popen): - """ + @patch('subprocess.Popen') + def test_send_captures_usage_metadata(self, mock_popen: Any) -> None: + """ Verify that usage data is extracted from the 'result' event. """ - usage_data = {"total_tokens": 42} - jsonl_output = [ - json.dumps({"type": "message", "text": "Finalizing"}), - json.dumps({"type": "result", "usage": usage_data}) - ] - stdout_content = "\n".join(jsonl_output) + "\n" - process_mock = MagicMock() - process_mock.communicate.return_value = (stdout_content, "") - process_mock.poll.return_value = 0 - process_mock.wait.return_value = 0 - mock_popen.return_value = process_mock - self.adapter.send("usage test") - # Verify the usage was captured in the adapter instance - self.assertEqual(self.adapter.last_usage, usage_data) + usage_data = {"total_tokens": 42} + jsonl_output = [ + json.dumps({"type": "message", "text": "Finalizing"}), + json.dumps({"type": "result", "usage": usage_data}) + ] + stdout_content = "\n".join(jsonl_output) + "\n" + process_mock = MagicMock() + process_mock.communicate.return_value = (stdout_content, "") + process_mock.poll.return_value = 0 + process_mock.wait.return_value = 0 + mock_popen.return_value = process_mock + self.adapter.send("usage test") + # Verify the usage was captured in the adapter instance + self.assertEqual(self.adapter.last_usage, usage_data) if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/test_gui2_parity.py b/tests/test_gui2_parity.py index ac11375..a8a6fdf 100644 --- a/tests/test_gui2_parity.py +++ b/tests/test_gui2_parity.py @@ -1,4 +1,5 @@ import pytest +from typing import Any import time import json import os @@ -22,7 +23,7 @@ def cleanup_callback_file() -> None: if TEST_CALLBACK_FILE.exists(): TEST_CALLBACK_FILE.unlink() -def test_gui2_set_value_hook_works(live_gui): +def test_gui2_set_value_hook_works(live_gui: Any) -> None: """ Tests that the 'set_value' GUI hook is correctly implemented. """ @@ -37,7 +38,7 @@ def test_gui2_set_value_hook_works(live_gui): current_value = client.get_value('ai_input') assert current_value == test_value -def test_gui2_click_hook_works(live_gui): +def test_gui2_click_hook_works(live_gui: Any) -> None: """ Tests that the 'click' GUI hook for the 'Reset' button is implemented. """ @@ -54,7 +55,7 @@ def test_gui2_click_hook_works(live_gui): # Verify it was reset assert client.get_value('ai_input') == "" -def test_gui2_custom_callback_hook_works(live_gui): +def test_gui2_custom_callback_hook_works(live_gui: Any) -> None: """ Tests that the 'custom_callback' GUI hook is correctly implemented. """ diff --git a/tests/test_layout_reorganization.py b/tests/test_layout_reorganization.py index ac901c8..6c0cc64 100644 --- a/tests/test_layout_reorganization.py +++ b/tests/test_layout_reorganization.py @@ -1,4 +1,5 @@ import pytest +from typing import Any import sys import os import importlib.util @@ -40,7 +41,7 @@ def test_new_hubs_defined_in_window_info() -> None: assert l == label or label in l, f"Label mismatch for {tag}: expected {label}, found {l}" assert found, f"Expected window label {label} not found in window_info" -def test_old_windows_removed_from_window_info(app_instance_simple): +def test_old_windows_removed_from_window_info(app_instance_simple: Any) -> None: """ Verifies that the old fragmented windows are removed from window_info. """ @@ -54,14 +55,14 @@ def test_old_windows_removed_from_window_info(app_instance_simple): assert tag not in app_instance_simple.window_info.values(), f"Old window tag {tag} should have been removed from window_info" @pytest.fixture -def app_instance_simple(): +def app_instance_simple() -> Any: from unittest.mock import patch from gui_legacy import App with patch('gui_legacy.load_config', return_value={}): app = App() return app -def test_hub_windows_have_correct_flags(app_instance_simple): +def test_hub_windows_have_correct_flags(app_instance_simple: Any) -> None: """ Verifies that the new Hub windows have appropriate flags for a professional workspace. (e.g., no_collapse should be True for main hubs). @@ -80,7 +81,7 @@ def test_hub_windows_have_correct_flags(app_instance_simple): # but we can check if it's been configured if we mock dpg.window or check it manually dpg.destroy_context() -def test_indicators_exist(app_instance_simple): +def test_indicators_exist(app_instance_simple: Any) -> None: """ Verifies that the new thinking and live indicators exist in the UI. """ diff --git a/tests/test_orchestrator_pm.py b/tests/test_orchestrator_pm.py index 55032df..511860a 100644 --- a/tests/test_orchestrator_pm.py +++ b/tests/test_orchestrator_pm.py @@ -1,4 +1,5 @@ import unittest +from typing import Any from unittest.mock import patch, MagicMock import json import orchestrator_pm @@ -6,67 +7,67 @@ import mma_prompts class TestOrchestratorPM(unittest.TestCase): - @patch('summarize.build_summary_markdown') - @patch('ai_client.send') - def test_generate_tracks_success(self, mock_send, mock_summarize): - # Setup mocks - mock_summarize.return_value = "REPO_MAP_CONTENT" - mock_response_data = [ - { - "id": "track_1", - "type": "Track", - "module": "test_module", - "persona": "Tech Lead", - "severity": "Medium", - "goal": "Test goal", - "acceptance_criteria": ["criteria 1"] - } - ] - mock_send.return_value = json.dumps(mock_response_data) - user_request = "Implement unit tests" - project_config = {"files": {"paths": ["src"]}} - file_items = [{"path": "src/main.py", "content": "print('hello')"}] - # Execute - result = orchestrator_pm.generate_tracks(user_request, project_config, file_items) - # Verify summarize call - mock_summarize.assert_called_once_with(file_items) - # Verify ai_client.send call - expected_system_prompt = mma_prompts.PROMPTS['tier1_epic_init'] - mock_send.assert_called_once() - args, kwargs = mock_send.call_args - self.assertEqual(kwargs['md_content'], "") - # Cannot check system_prompt via mock_send kwargs anymore as it's set globally - # But we can verify user_message was passed - self.assertIn(user_request, kwargs['user_message']) - self.assertIn("REPO_MAP_CONTENT", kwargs['user_message']) - # Verify result - self.assertEqual(result[0]['id'], mock_response_data[0]['id']) + @patch('summarize.build_summary_markdown') + @patch('ai_client.send') + def test_generate_tracks_success(self, mock_send: Any, mock_summarize: Any) -> None: + # Setup mocks + mock_summarize.return_value = "REPO_MAP_CONTENT" + mock_response_data = [ + { + "id": "track_1", + "type": "Track", + "module": "test_module", + "persona": "Tech Lead", + "severity": "Medium", + "goal": "Test goal", + "acceptance_criteria": ["criteria 1"] + } + ] + mock_send.return_value = json.dumps(mock_response_data) + user_request = "Implement unit tests" + project_config = {"files": {"paths": ["src"]}} + file_items = [{"path": "src/main.py", "content": "print('hello')"}] + # Execute + result = orchestrator_pm.generate_tracks(user_request, project_config, file_items) + # Verify summarize call + mock_summarize.assert_called_once_with(file_items) + # Verify ai_client.send call + expected_system_prompt = mma_prompts.PROMPTS['tier1_epic_init'] + mock_send.assert_called_once() + args, kwargs = mock_send.call_args + self.assertEqual(kwargs['md_content'], "") + # Cannot check system_prompt via mock_send kwargs anymore as it's set globally + # But we can verify user_message was passed + self.assertIn(user_request, kwargs['user_message']) + self.assertIn("REPO_MAP_CONTENT", kwargs['user_message']) + # Verify result + self.assertEqual(result[0]['id'], mock_response_data[0]['id']) - @patch('summarize.build_summary_markdown') - @patch('ai_client.send') - def test_generate_tracks_markdown_wrapped(self, mock_send, mock_summarize): - mock_summarize.return_value = "REPO_MAP" - mock_response_data = [{"id": "track_1"}] - expected_result = [{"id": "track_1", "title": "Untitled Track"}] - # Wrapped in ```json ... ``` - mock_send.return_value = f"Here is the plan:\n```json\n{json.dumps(mock_response_data)}\n```\nHope this helps." - result = orchestrator_pm.generate_tracks("req", {}, []) - self.assertEqual(result, expected_result) - # Wrapped in ``` ... ``` - mock_send.return_value = f"```\n{json.dumps(mock_response_data)}\n```" - result = orchestrator_pm.generate_tracks("req", {}, []) - self.assertEqual(result, expected_result) + @patch('summarize.build_summary_markdown') + @patch('ai_client.send') + def test_generate_tracks_markdown_wrapped(self, mock_send: Any, mock_summarize: Any) -> None: + mock_summarize.return_value = "REPO_MAP" + mock_response_data = [{"id": "track_1"}] + expected_result = [{"id": "track_1", "title": "Untitled Track"}] + # Wrapped in ```json ... ``` + mock_send.return_value = f"Here is the plan:\n```json\n{json.dumps(mock_response_data)}\n```\nHope this helps." + result = orchestrator_pm.generate_tracks("req", {}, []) + self.assertEqual(result, expected_result) + # Wrapped in ``` ... ``` + mock_send.return_value = f"```\n{json.dumps(mock_response_data)}\n```" + result = orchestrator_pm.generate_tracks("req", {}, []) + self.assertEqual(result, expected_result) - @patch('summarize.build_summary_markdown') - @patch('ai_client.send') - def test_generate_tracks_malformed_json(self, mock_send, mock_summarize): - mock_summarize.return_value = "REPO_MAP" - mock_send.return_value = "NOT A JSON" - # Should return empty list and print error (we can mock print if we want to be thorough) - with patch('builtins.print') as mock_print: - result = orchestrator_pm.generate_tracks("req", {}, []) - self.assertEqual(result, []) - mock_print.assert_any_call("Error parsing Tier 1 response: Expecting value: line 1 column 1 (char 0)") + @patch('summarize.build_summary_markdown') + @patch('ai_client.send') + def test_generate_tracks_malformed_json(self, mock_send: Any, mock_summarize: Any) -> None: + mock_summarize.return_value = "REPO_MAP" + mock_send.return_value = "NOT A JSON" + # Should return empty list and print error (we can mock print if we want to be thorough) + with patch('builtins.print') as mock_print: + result = orchestrator_pm.generate_tracks("req", {}, []) + self.assertEqual(result, []) + mock_print.assert_any_call("Error parsing Tier 1 response: Expecting value: line 1 column 1 (char 0)") if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/test_project_manager_tracks.py b/tests/test_project_manager_tracks.py index dd6dfe8..9cbcefc 100644 --- a/tests/test_project_manager_tracks.py +++ b/tests/test_project_manager_tracks.py @@ -1,15 +1,15 @@ import pytest +from typing import Any import json from pathlib import Path from project_manager import get_all_tracks, save_track_state from models import TrackState, Metadata, Ticket from datetime import datetime -def test_get_all_tracks_empty(tmp_path): -# conductor/tracks directory doesn't exist +def test_get_all_tracks_empty(tmp_path: Any) -> None: assert get_all_tracks(tmp_path) == [] -def test_get_all_tracks_with_state(tmp_path): +def test_get_all_tracks_with_state(tmp_path: Any) -> None: tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir.mkdir(parents=True) track_id = "test_track_1" @@ -34,7 +34,7 @@ def test_get_all_tracks_with_state(tmp_path): assert track["total"] == 2 assert track["progress"] == 0.5 -def test_get_all_tracks_with_metadata_json(tmp_path): +def test_get_all_tracks_with_metadata_json(tmp_path: Any) -> None: tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir.mkdir(parents=True) track_id = "test_track_2" @@ -66,7 +66,7 @@ def test_get_all_tracks_with_metadata_json(tmp_path): assert track["total"] == 3 assert pytest.approx(track["progress"]) == 0.333333 -def test_get_all_tracks_malformed(tmp_path): +def test_get_all_tracks_malformed(tmp_path: Any) -> None: tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir.mkdir(parents=True) track_id = "malformed_track" diff --git a/tests/test_tiered_context.py b/tests/test_tiered_context.py index 8d1a347..e49a5c7 100644 --- a/tests/test_tiered_context.py +++ b/tests/test_tiered_context.py @@ -1,9 +1,9 @@ import pytest +from typing import Any from pathlib import Path from aggregate import build_tier1_context, build_tier2_context, build_tier3_context -def test_build_tier1_context_exists(): -# This should fail if the function is not defined +def test_build_tier1_context_exists() -> None: file_items = [ {"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False}, {"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False} @@ -22,7 +22,7 @@ def test_build_tier2_context_exists() -> None: result = build_tier2_context(file_items, Path("."), [], history) assert "Other content" in result -def test_build_tier3_context_ast_skeleton(monkeypatch): +def test_build_tier3_context_ast_skeleton(monkeypatch: Any) -> None: from unittest.mock import MagicMock import aggregate import file_cache @@ -59,7 +59,7 @@ def test_build_tier3_context_exists() -> None: assert "other.py" in result assert "AST Skeleton" in result -def test_build_file_items_with_tiers(tmp_path): +def test_build_file_items_with_tiers(tmp_path: Any) -> None: from aggregate import build_file_items # Create some dummy files file1 = tmp_path / "file1.txt" @@ -80,7 +80,7 @@ def test_build_file_items_with_tiers(tmp_path): assert item2["content"] == "content2" assert item2["tier"] == 3 -def test_build_files_section_with_dicts(tmp_path): +def test_build_files_section_with_dicts(tmp_path: Any) -> None: from aggregate import build_files_section file1 = tmp_path / "file1.txt" file1.write_text("content1")