refactor(tests): Add strict type hints to second batch of test files

This commit is contained in:
2026-02-28 19:11:23 -05:00
parent f0415a40aa
commit 579ee8394f
10 changed files with 358 additions and 351 deletions

View File

@@ -1,106 +1,107 @@
import pytest import pytest
from typing import Any
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import ai_client import ai_client
class MockUsage: class MockUsage:
def __init__(self) -> None: def __init__(self) -> None:
self.prompt_token_count = 10 self.prompt_token_count = 10
self.candidates_token_count = 5 self.candidates_token_count = 5
self.total_token_count = 15 self.total_token_count = 15
self.cached_content_token_count = 0 self.cached_content_token_count = 0
class MockPart: class MockPart:
def __init__(self, text, function_call): def __init__(self, text: Any, function_call: Any) -> None:
self.text = text self.text = text
self.function_call = function_call self.function_call = function_call
class MockContent: class MockContent:
def __init__(self, parts): def __init__(self, parts: Any) -> None:
self.parts = parts self.parts = parts
class MockCandidate: class MockCandidate:
def __init__(self, parts): def __init__(self, parts: Any) -> None:
self.content = MockContent(parts) self.content = MockContent(parts)
self.finish_reason = MagicMock() self.finish_reason = MagicMock()
self.finish_reason.name = "STOP" self.finish_reason.name = "STOP"
def test_ai_client_event_emitter_exists(): def test_ai_client_event_emitter_exists() -> None:
# This should fail initially because 'events' won't exist on ai_client # This should fail initially because 'events' won't exist on ai_client
assert hasattr(ai_client, 'events') assert hasattr(ai_client, 'events')
def test_event_emission() -> None: def test_event_emission() -> None:
callback = MagicMock() callback = MagicMock()
ai_client.events.on("test_event", callback) ai_client.events.on("test_event", callback)
ai_client.events.emit("test_event", payload={"data": 123}) ai_client.events.emit("test_event", payload={"data": 123})
callback.assert_called_once_with(payload={"data": 123}) callback.assert_called_once_with(payload={"data": 123})
def test_send_emits_events() -> None: def test_send_emits_events() -> None:
with patch("ai_client._send_gemini") as mock_send_gemini, \ with patch("ai_client._send_gemini") as mock_send_gemini, \
patch("ai_client._send_anthropic") as mock_send_anthropic: patch("ai_client._send_anthropic") as mock_send_anthropic:
mock_send_gemini.return_value = "gemini response" mock_send_gemini.return_value = "gemini response"
start_callback = MagicMock() start_callback = MagicMock()
response_callback = MagicMock() response_callback = MagicMock()
ai_client.events.on("request_start", start_callback) ai_client.events.on("request_start", start_callback)
ai_client.events.on("response_received", response_callback) ai_client.events.on("response_received", response_callback)
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client.send("context", "message") ai_client.send("context", "message")
# We mocked _send_gemini so it doesn't emit events inside. # We mocked _send_gemini so it doesn't emit events inside.
# But wait, ai_client.send itself emits request_start and response_received? # But wait, ai_client.send itself emits request_start and response_received?
# Actually, ai_client.send delegates to _send_gemini. # Actually, ai_client.send delegates to _send_gemini.
# Let's mock _gemini_client instead to let _send_gemini run and emit events. # Let's mock _gemini_client instead to let _send_gemini run and emit events.
pass pass
def test_send_emits_events_proper() -> None: def test_send_emits_events_proper() -> None:
with patch("ai_client._ensure_gemini_client"), \ with patch("ai_client._ensure_gemini_client"), \
patch("ai_client._gemini_client") as mock_client: patch("ai_client._gemini_client") as mock_client:
mock_chat = MagicMock() mock_chat = MagicMock()
mock_client.chats.create.return_value = mock_chat mock_client.chats.create.return_value = mock_chat
mock_response = MagicMock() mock_response = MagicMock()
mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])] mock_response.candidates = [MockCandidate([MockPart("gemini response", None)])]
mock_response.usage_metadata = MockUsage() mock_response.usage_metadata = MockUsage()
mock_chat.send_message.return_value = mock_response mock_chat.send_message.return_value = mock_response
start_callback = MagicMock() start_callback = MagicMock()
response_callback = MagicMock() response_callback = MagicMock()
ai_client.events.on("request_start", start_callback) ai_client.events.on("request_start", start_callback)
ai_client.events.on("response_received", response_callback) ai_client.events.on("response_received", response_callback)
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
ai_client.send("context", "message") ai_client.send("context", "message")
assert start_callback.called assert start_callback.called
assert response_callback.called assert response_callback.called
args, kwargs = start_callback.call_args args, kwargs = start_callback.call_args
assert kwargs['payload']['provider'] == 'gemini' assert kwargs['payload']['provider'] == 'gemini'
def test_send_emits_tool_events() -> None: def test_send_emits_tool_events() -> None:
import mcp_client import mcp_client
with patch("ai_client._ensure_gemini_client"), \ with patch("ai_client._ensure_gemini_client"), \
patch("ai_client._gemini_client") as mock_client, \ patch("ai_client._gemini_client") as mock_client, \
patch("mcp_client.dispatch") as mock_dispatch: patch("mcp_client.dispatch") as mock_dispatch:
mock_chat = MagicMock() mock_chat = MagicMock()
mock_client.chats.create.return_value = mock_chat mock_client.chats.create.return_value = mock_chat
# 1. Setup mock response with a tool call # 1. Setup mock response with a tool call
mock_fc = MagicMock() mock_fc = MagicMock()
mock_fc.name = "read_file" mock_fc.name = "read_file"
mock_fc.args = {"path": "test.txt"} mock_fc.args = {"path": "test.txt"}
mock_response_with_tool = MagicMock() mock_response_with_tool = MagicMock()
mock_response_with_tool.candidates = [MockCandidate([MockPart("tool call text", mock_fc)])] mock_response_with_tool.candidates = [MockCandidate([MockPart("tool call text", mock_fc)])]
mock_response_with_tool.usage_metadata = MockUsage() mock_response_with_tool.usage_metadata = MockUsage()
# 2. Setup second mock response (final answer) # 2. Setup second mock response (final answer)
mock_response_final = MagicMock() mock_response_final = MagicMock()
mock_response_final.candidates = [MockCandidate([MockPart("final answer", None)])] mock_response_final.candidates = [MockCandidate([MockPart("final answer", None)])]
mock_response_final.usage_metadata = MockUsage() mock_response_final.usage_metadata = MockUsage()
mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final] mock_chat.send_message.side_effect = [mock_response_with_tool, mock_response_final]
mock_dispatch.return_value = "file content" mock_dispatch.return_value = "file content"
ai_client.set_provider("gemini", "gemini-2.5-flash-lite") ai_client.set_provider("gemini", "gemini-2.5-flash-lite")
tool_callback = MagicMock() tool_callback = MagicMock()
ai_client.events.on("tool_execution", tool_callback) ai_client.events.on("tool_execution", tool_callback)
ai_client.send("context", "message") ai_client.send("context", "message")
# Should be called twice: once for 'started', once for 'completed' # Should be called twice: once for 'started', once for 'completed'
assert tool_callback.call_count == 2 assert tool_callback.call_count == 2
# Check 'started' call # Check 'started' call
args, kwargs = tool_callback.call_args_list[0] args, kwargs = tool_callback.call_args_list[0]
assert kwargs['payload']['status'] == 'started' assert kwargs['payload']['status'] == 'started'
assert kwargs['payload']['tool'] == 'read_file' assert kwargs['payload']['tool'] == 'read_file'
# Check 'completed' call # Check 'completed' call
args, kwargs = tool_callback.call_args_list[1] args, kwargs = tool_callback.call_args_list[1]
assert kwargs['payload']['status'] == 'completed' assert kwargs['payload']['status'] == 'completed'
assert kwargs['payload']['result'] == 'file content' assert kwargs['payload']['result'] == 'file content'

View File

@@ -1,17 +1,18 @@
import os import os
import pytest import pytest
from typing import Any
from datetime import datetime from datetime import datetime
from log_registry import LogRegistry from log_registry import LogRegistry
@pytest.fixture @pytest.fixture
def registry_setup(tmp_path): def registry_setup(tmp_path: Any) -> Any:
registry_path = tmp_path / "log_registry.toml" registry_path = tmp_path / "log_registry.toml"
logs_dir = tmp_path / "logs" logs_dir = tmp_path / "logs"
logs_dir.mkdir() logs_dir.mkdir()
registry = LogRegistry(str(registry_path)) registry = LogRegistry(str(registry_path))
return registry, logs_dir return registry, logs_dir
def test_auto_whitelist_keywords(registry_setup): def test_auto_whitelist_keywords(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_kw" session_id = "test_kw"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id
@@ -24,7 +25,7 @@ def test_auto_whitelist_keywords(registry_setup):
assert registry.is_session_whitelisted(session_id) assert registry.is_session_whitelisted(session_id)
assert "ERROR" in registry.data[session_id]["metadata"]["reason"] assert "ERROR" in registry.data[session_id]["metadata"]["reason"]
def test_auto_whitelist_message_count(registry_setup): def test_auto_whitelist_message_count(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_msg_count" session_id = "test_msg_count"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id
@@ -37,7 +38,7 @@ def test_auto_whitelist_message_count(registry_setup):
assert registry.is_session_whitelisted(session_id) assert registry.is_session_whitelisted(session_id)
assert registry.data[session_id]["metadata"]["message_count"] == 15 assert registry.data[session_id]["metadata"]["message_count"] == 15
def test_auto_whitelist_large_size(registry_setup): def test_auto_whitelist_large_size(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_large" session_id = "test_large"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id
@@ -50,7 +51,7 @@ def test_auto_whitelist_large_size(registry_setup):
assert registry.is_session_whitelisted(session_id) assert registry.is_session_whitelisted(session_id)
assert "Large session size" in registry.data[session_id]["metadata"]["reason"] assert "Large session size" in registry.data[session_id]["metadata"]["reason"]
def test_no_auto_whitelist_insignificant(registry_setup): def test_no_auto_whitelist_insignificant(registry_setup: Any) -> None:
registry, logs_dir = registry_setup registry, logs_dir = registry_setup
session_id = "test_insignificant" session_id = "test_insignificant"
session_dir = logs_dir / session_id session_dir = logs_dir / session_id

View File

@@ -1,109 +1,109 @@
import unittest import unittest
from typing import Any
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import json import json
import conductor_tech_lead import conductor_tech_lead
class TestConductorTechLead(unittest.TestCase): class TestConductorTechLead(unittest.TestCase):
@patch('ai_client.send') @patch('ai_client.send')
@patch('ai_client.set_provider') @patch('ai_client.set_provider')
@patch('ai_client.reset_session') @patch('ai_client.reset_session')
def test_generate_tickets_success(self, mock_reset_session, mock_set_provider, mock_send): def test_generate_tickets_success(self, mock_reset_session: Any, mock_set_provider: Any, mock_send: Any) -> None:
# Setup mock response mock_tickets = [
mock_tickets = [ {
{ "id": "ticket_1",
"id": "ticket_1", "type": "Ticket",
"type": "Ticket", "goal": "Test goal",
"goal": "Test goal", "target_file": "test.py",
"target_file": "test.py", "depends_on": [],
"depends_on": [], "context_requirements": []
"context_requirements": [] }
} ]
] mock_send.return_value = "```json\n" + json.dumps(mock_tickets) + "\n```"
mock_send.return_value = "```json\n" + json.dumps(mock_tickets) + "\n```" track_brief = "Test track brief"
track_brief = "Test track brief" module_skeletons = "Test skeletons"
module_skeletons = "Test skeletons" # Call the function
# Call the function tickets = conductor_tech_lead.generate_tickets(track_brief, module_skeletons)
tickets = conductor_tech_lead.generate_tickets(track_brief, module_skeletons) # Verify set_provider was called
# Verify set_provider was called mock_set_provider.assert_called_with('gemini', 'gemini-2.5-flash-lite')
mock_set_provider.assert_called_with('gemini', 'gemini-2.5-flash-lite') mock_reset_session.assert_called_once()
mock_reset_session.assert_called_once() # Verify send was called
# Verify send was called mock_send.assert_called_once()
mock_send.assert_called_once() args, kwargs = mock_send.call_args
args, kwargs = mock_send.call_args self.assertEqual(kwargs['md_content'], "")
self.assertEqual(kwargs['md_content'], "") self.assertIn(track_brief, kwargs['user_message'])
self.assertIn(track_brief, kwargs['user_message']) self.assertIn(module_skeletons, kwargs['user_message'])
self.assertIn(module_skeletons, kwargs['user_message']) # Verify tickets were parsed correctly
# Verify tickets were parsed correctly self.assertEqual(tickets, mock_tickets)
self.assertEqual(tickets, mock_tickets)
@patch('ai_client.send') @patch('ai_client.send')
@patch('ai_client.set_provider') @patch('ai_client.set_provider')
@patch('ai_client.reset_session') @patch('ai_client.reset_session')
def test_generate_tickets_parse_error(self, mock_reset_session, mock_set_provider, mock_send): def test_generate_tickets_parse_error(self, mock_reset_session: Any, mock_set_provider: Any, mock_send: Any) -> None:
# Setup mock invalid response # Setup mock invalid response
mock_send.return_value = "Invalid JSON" mock_send.return_value = "Invalid JSON"
# Call the function # Call the function
tickets = conductor_tech_lead.generate_tickets("brief", "skeletons") tickets = conductor_tech_lead.generate_tickets("brief", "skeletons")
# Verify it returns an empty list on parse error # Verify it returns an empty list on parse error
self.assertEqual(tickets, []) self.assertEqual(tickets, [])
class TestTopologicalSort(unittest.TestCase): class TestTopologicalSort(unittest.TestCase):
def test_topological_sort_empty(self) -> None: def test_topological_sort_empty(self) -> None:
tickets = [] tickets = []
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
self.assertEqual(sorted_tickets, []) self.assertEqual(sorted_tickets, [])
def test_topological_sort_linear(self) -> None: def test_topological_sort_linear(self) -> None:
tickets = [ tickets = [
{"id": "t2", "depends_on": ["t1"]}, {"id": "t2", "depends_on": ["t1"]},
{"id": "t1", "depends_on": []}, {"id": "t1", "depends_on": []},
{"id": "t3", "depends_on": ["t2"]}, {"id": "t3", "depends_on": ["t2"]},
] ]
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
ids = [t["id"] for t in sorted_tickets] ids = [t["id"] for t in sorted_tickets]
self.assertEqual(ids, ["t1", "t2", "t3"]) self.assertEqual(ids, ["t1", "t2", "t3"])
def test_topological_sort_complex(self): def test_topological_sort_complex(self) -> None:
# t1 # t1
# | \ # | \
# t2 t3 # t2 t3
# | / # | /
# t4 # t4
tickets = [ tickets = [
{"id": "t4", "depends_on": ["t2", "t3"]}, {"id": "t4", "depends_on": ["t2", "t3"]},
{"id": "t3", "depends_on": ["t1"]}, {"id": "t3", "depends_on": ["t1"]},
{"id": "t2", "depends_on": ["t1"]}, {"id": "t2", "depends_on": ["t1"]},
{"id": "t1", "depends_on": []}, {"id": "t1", "depends_on": []},
] ]
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
ids = [t["id"] for t in sorted_tickets] ids = [t["id"] for t in sorted_tickets]
# Possible valid orders: [t1, t2, t3, t4] or [t1, t3, t2, t4] # Possible valid orders: [t1, t2, t3, t4] or [t1, t3, t2, t4]
self.assertEqual(ids[0], "t1") self.assertEqual(ids[0], "t1")
self.assertEqual(ids[-1], "t4") self.assertEqual(ids[-1], "t4")
self.assertSetEqual(set(ids[1:3]), {"t2", "t3"}) self.assertSetEqual(set(ids[1:3]), {"t2", "t3"})
def test_topological_sort_cycle(self) -> None: def test_topological_sort_cycle(self) -> None:
tickets = [ tickets = [
{"id": "t1", "depends_on": ["t2"]}, {"id": "t1", "depends_on": ["t2"]},
{"id": "t2", "depends_on": ["t1"]}, {"id": "t2", "depends_on": ["t1"]},
] ]
with self.assertRaises(ValueError) as cm: with self.assertRaises(ValueError) as cm:
conductor_tech_lead.topological_sort(tickets) conductor_tech_lead.topological_sort(tickets)
self.assertIn("Circular dependency detected", str(cm.exception)) self.assertIn("Circular dependency detected", str(cm.exception))
def test_topological_sort_missing_dependency(self): def test_topological_sort_missing_dependency(self) -> None:
# If a ticket depends on something not in the list, we should probably handle it or let it fail. # If a ticket depends on something not in the list, we should probably handle it or let it fail.
# Usually in our context, we only care about dependencies within the same track. # Usually in our context, we only care about dependencies within the same track.
tickets = [ tickets = [
{"id": "t1", "depends_on": ["missing"]}, {"id": "t1", "depends_on": ["missing"]},
] ]
# For now, let's assume it should raise an error if a dependency is missing within the set we are sorting, # For now, let's assume it should raise an error if a dependency is missing within the set we are sorting,
# OR it should just treat it as "ready" if it's external? # OR it should just treat it as "ready" if it's external?
# Actually, let's just test that it doesn't crash if it's not a cycle. # Actually, let's just test that it doesn't crash if it's not a cycle.
# But if 'missing' is not in tickets, it will never be satisfied. # But if 'missing' is not in tickets, it will never be satisfied.
# Let's say it raises ValueError for missing internal dependencies. # Let's say it raises ValueError for missing internal dependencies.
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
conductor_tech_lead.topological_sort(tickets) conductor_tech_lead.topological_sort(tickets)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
import time import time
import sys import sys
import os import os
@@ -13,7 +14,7 @@ from simulation.sim_tools import ToolsSimulation
from simulation.sim_execution import ExecutionSimulation from simulation.sim_execution import ExecutionSimulation
@pytest.mark.integration @pytest.mark.integration
def test_context_sim_live(live_gui): def test_context_sim_live(live_gui: Any) -> None:
"""Run the Context & Chat simulation against a live GUI.""" """Run the Context & Chat simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
@@ -23,7 +24,7 @@ def test_context_sim_live(live_gui):
sim.teardown() sim.teardown()
@pytest.mark.integration @pytest.mark.integration
def test_ai_settings_sim_live(live_gui): def test_ai_settings_sim_live(live_gui: Any) -> None:
"""Run the AI Settings simulation against a live GUI.""" """Run the AI Settings simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
@@ -33,7 +34,7 @@ def test_ai_settings_sim_live(live_gui):
sim.teardown() sim.teardown()
@pytest.mark.integration @pytest.mark.integration
def test_tools_sim_live(live_gui): def test_tools_sim_live(live_gui: Any) -> None:
"""Run the Tools & Search simulation against a live GUI.""" """Run the Tools & Search simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)
@@ -43,7 +44,7 @@ def test_tools_sim_live(live_gui):
sim.teardown() sim.teardown()
@pytest.mark.integration @pytest.mark.integration
def test_execution_sim_live(live_gui): def test_execution_sim_live(live_gui: Any) -> None:
"""Run the Execution & Modals simulation against a live GUI.""" """Run the Execution & Modals simulation against a live GUI."""
client = ApiHookClient() client = ApiHookClient()
assert client.wait_for_server(timeout=10) assert client.wait_for_server(timeout=10)

View File

@@ -1,4 +1,5 @@
import unittest import unittest
from typing import Any
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import json import json
import subprocess import subprocess
@@ -12,105 +13,105 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from gemini_cli_adapter import GeminiCliAdapter from gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapter(unittest.TestCase): class TestGeminiCliAdapter(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
self.adapter = GeminiCliAdapter(binary_path="gemini") self.adapter = GeminiCliAdapter(binary_path="gemini")
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_starts_subprocess_with_correct_args(self, mock_popen): def test_send_starts_subprocess_with_correct_args(self, mock_popen: Any) -> None:
""" """
Verify that send(message) correctly starts the subprocess with Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message via stdin using communicate. --output-format stream-json and the provided message via stdin using communicate.
""" """
# Setup mock process with a minimal valid JSONL termination # Setup mock process with a minimal valid JSONL termination
process_mock = MagicMock() process_mock = MagicMock()
stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n" stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (stdout_content, "") process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0 process_mock.poll.return_value = 0
process_mock.wait.return_value = 0 process_mock.wait.return_value = 0
mock_popen.return_value = process_mock mock_popen.return_value = process_mock
message = "Hello Gemini CLI" message = "Hello Gemini CLI"
self.adapter.send(message) self.adapter.send(message)
# Verify subprocess.Popen call # Verify subprocess.Popen call
mock_popen.assert_called_once() mock_popen.assert_called_once()
args, kwargs = mock_popen.call_args args, kwargs = mock_popen.call_args
cmd = args[0] cmd = args[0]
# Check mandatory CLI components # Check mandatory CLI components
self.assertIn("gemini", cmd) self.assertIn("gemini", cmd)
self.assertIn("--output-format", cmd) self.assertIn("--output-format", cmd)
self.assertIn("stream-json", cmd) self.assertIn("stream-json", cmd)
# Message should NOT be in cmd now # Message should NOT be in cmd now
self.assertNotIn(message, cmd) self.assertNotIn(message, cmd)
# Verify message was sent via communicate # Verify message was sent via communicate
process_mock.communicate.assert_called_once_with(input=message) process_mock.communicate.assert_called_once_with(input=message)
# Check process configuration # Check process configuration
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE) self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
self.assertEqual(kwargs.get('stdin'), subprocess.PIPE) self.assertEqual(kwargs.get('stdin'), subprocess.PIPE)
self.assertEqual(kwargs.get('text'), True) self.assertEqual(kwargs.get('text'), True)
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_parses_jsonl_output(self, mock_popen): def test_send_parses_jsonl_output(self, mock_popen: Any) -> None:
""" """
Verify that it correctly parses multiple JSONL 'message' events Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text. and returns the combined text.
""" """
jsonl_output = [ jsonl_output = [
json.dumps({"type": "message", "role": "model", "text": "The quick brown "}), json.dumps({"type": "message", "role": "model", "text": "The quick brown "}),
json.dumps({"type": "message", "role": "model", "text": "fox jumps."}), json.dumps({"type": "message", "role": "model", "text": "fox jumps."}),
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}}) json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}})
] ]
stdout_content = "\n".join(jsonl_output) + "\n" stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock() process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "") process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0 process_mock.poll.return_value = 0
process_mock.wait.return_value = 0 process_mock.wait.return_value = 0
mock_popen.return_value = process_mock mock_popen.return_value = process_mock
result = self.adapter.send("test message") result = self.adapter.send("test message")
self.assertEqual(result["text"], "The quick brown fox jumps.") self.assertEqual(result["text"], "The quick brown fox jumps.")
self.assertEqual(result["tool_calls"], []) self.assertEqual(result["tool_calls"], [])
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_handles_tool_use_events(self, mock_popen): def test_send_handles_tool_use_events(self, mock_popen: Any) -> None:
""" """
Verify that it correctly handles 'tool_use' events in the stream Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event. by continuing to read until the final 'result' event.
""" """
jsonl_output = [ jsonl_output = [
json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}), json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}),
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}), json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}),
json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}), json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}),
json.dumps({"type": "result", "usage": {}}) json.dumps({"type": "result", "usage": {}})
] ]
stdout_content = "\n".join(jsonl_output) + "\n" stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock() process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "") process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0 process_mock.poll.return_value = 0
process_mock.wait.return_value = 0 process_mock.wait.return_value = 0
mock_popen.return_value = process_mock mock_popen.return_value = process_mock
result = self.adapter.send("read test.txt") result = self.adapter.send("read test.txt")
# Result should contain the combined text from all 'message' events # Result should contain the combined text from all 'message' events
self.assertEqual(result["text"], "Calling tool...\nFile read successfully.") self.assertEqual(result["text"], "Calling tool...\nFile read successfully.")
self.assertEqual(len(result["tool_calls"]), 1) self.assertEqual(len(result["tool_calls"]), 1)
self.assertEqual(result["tool_calls"][0]["name"], "read_file") self.assertEqual(result["tool_calls"][0]["name"], "read_file")
@patch('subprocess.Popen') @patch('subprocess.Popen')
def test_send_captures_usage_metadata(self, mock_popen): def test_send_captures_usage_metadata(self, mock_popen: Any) -> None:
""" """
Verify that usage data is extracted from the 'result' event. Verify that usage data is extracted from the 'result' event.
""" """
usage_data = {"total_tokens": 42} usage_data = {"total_tokens": 42}
jsonl_output = [ jsonl_output = [
json.dumps({"type": "message", "text": "Finalizing"}), json.dumps({"type": "message", "text": "Finalizing"}),
json.dumps({"type": "result", "usage": usage_data}) json.dumps({"type": "result", "usage": usage_data})
] ]
stdout_content = "\n".join(jsonl_output) + "\n" stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock() process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "") process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0 process_mock.poll.return_value = 0
process_mock.wait.return_value = 0 process_mock.wait.return_value = 0
mock_popen.return_value = process_mock mock_popen.return_value = process_mock
self.adapter.send("usage test") self.adapter.send("usage test")
# Verify the usage was captured in the adapter instance # Verify the usage was captured in the adapter instance
self.assertEqual(self.adapter.last_usage, usage_data) self.assertEqual(self.adapter.last_usage, usage_data)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
import time import time
import json import json
import os import os
@@ -22,7 +23,7 @@ def cleanup_callback_file() -> None:
if TEST_CALLBACK_FILE.exists(): if TEST_CALLBACK_FILE.exists():
TEST_CALLBACK_FILE.unlink() TEST_CALLBACK_FILE.unlink()
def test_gui2_set_value_hook_works(live_gui): def test_gui2_set_value_hook_works(live_gui: Any) -> None:
""" """
Tests that the 'set_value' GUI hook is correctly implemented. Tests that the 'set_value' GUI hook is correctly implemented.
""" """
@@ -37,7 +38,7 @@ def test_gui2_set_value_hook_works(live_gui):
current_value = client.get_value('ai_input') current_value = client.get_value('ai_input')
assert current_value == test_value assert current_value == test_value
def test_gui2_click_hook_works(live_gui): def test_gui2_click_hook_works(live_gui: Any) -> None:
""" """
Tests that the 'click' GUI hook for the 'Reset' button is implemented. Tests that the 'click' GUI hook for the 'Reset' button is implemented.
""" """
@@ -54,7 +55,7 @@ def test_gui2_click_hook_works(live_gui):
# Verify it was reset # Verify it was reset
assert client.get_value('ai_input') == "" assert client.get_value('ai_input') == ""
def test_gui2_custom_callback_hook_works(live_gui): def test_gui2_custom_callback_hook_works(live_gui: Any) -> None:
""" """
Tests that the 'custom_callback' GUI hook is correctly implemented. Tests that the 'custom_callback' GUI hook is correctly implemented.
""" """

View File

@@ -1,4 +1,5 @@
import pytest import pytest
from typing import Any
import sys import sys
import os import os
import importlib.util import importlib.util
@@ -40,7 +41,7 @@ def test_new_hubs_defined_in_window_info() -> None:
assert l == label or label in l, f"Label mismatch for {tag}: expected {label}, found {l}" assert l == label or label in l, f"Label mismatch for {tag}: expected {label}, found {l}"
assert found, f"Expected window label {label} not found in window_info" assert found, f"Expected window label {label} not found in window_info"
def test_old_windows_removed_from_window_info(app_instance_simple): def test_old_windows_removed_from_window_info(app_instance_simple: Any) -> None:
""" """
Verifies that the old fragmented windows are removed from window_info. Verifies that the old fragmented windows are removed from window_info.
""" """
@@ -54,14 +55,14 @@ def test_old_windows_removed_from_window_info(app_instance_simple):
assert tag not in app_instance_simple.window_info.values(), f"Old window tag {tag} should have been removed from window_info" assert tag not in app_instance_simple.window_info.values(), f"Old window tag {tag} should have been removed from window_info"
@pytest.fixture @pytest.fixture
def app_instance_simple(): def app_instance_simple() -> Any:
from unittest.mock import patch from unittest.mock import patch
from gui_legacy import App from gui_legacy import App
with patch('gui_legacy.load_config', return_value={}): with patch('gui_legacy.load_config', return_value={}):
app = App() app = App()
return app return app
def test_hub_windows_have_correct_flags(app_instance_simple): def test_hub_windows_have_correct_flags(app_instance_simple: Any) -> None:
""" """
Verifies that the new Hub windows have appropriate flags for a professional workspace. Verifies that the new Hub windows have appropriate flags for a professional workspace.
(e.g., no_collapse should be True for main hubs). (e.g., no_collapse should be True for main hubs).
@@ -80,7 +81,7 @@ def test_hub_windows_have_correct_flags(app_instance_simple):
# but we can check if it's been configured if we mock dpg.window or check it manually # but we can check if it's been configured if we mock dpg.window or check it manually
dpg.destroy_context() dpg.destroy_context()
def test_indicators_exist(app_instance_simple): def test_indicators_exist(app_instance_simple: Any) -> None:
""" """
Verifies that the new thinking and live indicators exist in the UI. Verifies that the new thinking and live indicators exist in the UI.
""" """

View File

@@ -1,4 +1,5 @@
import unittest import unittest
from typing import Any
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import json import json
import orchestrator_pm import orchestrator_pm
@@ -6,67 +7,67 @@ import mma_prompts
class TestOrchestratorPM(unittest.TestCase): class TestOrchestratorPM(unittest.TestCase):
@patch('summarize.build_summary_markdown') @patch('summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_success(self, mock_send, mock_summarize): def test_generate_tracks_success(self, mock_send: Any, mock_summarize: Any) -> None:
# Setup mocks # Setup mocks
mock_summarize.return_value = "REPO_MAP_CONTENT" mock_summarize.return_value = "REPO_MAP_CONTENT"
mock_response_data = [ mock_response_data = [
{ {
"id": "track_1", "id": "track_1",
"type": "Track", "type": "Track",
"module": "test_module", "module": "test_module",
"persona": "Tech Lead", "persona": "Tech Lead",
"severity": "Medium", "severity": "Medium",
"goal": "Test goal", "goal": "Test goal",
"acceptance_criteria": ["criteria 1"] "acceptance_criteria": ["criteria 1"]
} }
] ]
mock_send.return_value = json.dumps(mock_response_data) mock_send.return_value = json.dumps(mock_response_data)
user_request = "Implement unit tests" user_request = "Implement unit tests"
project_config = {"files": {"paths": ["src"]}} project_config = {"files": {"paths": ["src"]}}
file_items = [{"path": "src/main.py", "content": "print('hello')"}] file_items = [{"path": "src/main.py", "content": "print('hello')"}]
# Execute # Execute
result = orchestrator_pm.generate_tracks(user_request, project_config, file_items) result = orchestrator_pm.generate_tracks(user_request, project_config, file_items)
# Verify summarize call # Verify summarize call
mock_summarize.assert_called_once_with(file_items) mock_summarize.assert_called_once_with(file_items)
# Verify ai_client.send call # Verify ai_client.send call
expected_system_prompt = mma_prompts.PROMPTS['tier1_epic_init'] expected_system_prompt = mma_prompts.PROMPTS['tier1_epic_init']
mock_send.assert_called_once() mock_send.assert_called_once()
args, kwargs = mock_send.call_args args, kwargs = mock_send.call_args
self.assertEqual(kwargs['md_content'], "") self.assertEqual(kwargs['md_content'], "")
# Cannot check system_prompt via mock_send kwargs anymore as it's set globally # Cannot check system_prompt via mock_send kwargs anymore as it's set globally
# But we can verify user_message was passed # But we can verify user_message was passed
self.assertIn(user_request, kwargs['user_message']) self.assertIn(user_request, kwargs['user_message'])
self.assertIn("REPO_MAP_CONTENT", kwargs['user_message']) self.assertIn("REPO_MAP_CONTENT", kwargs['user_message'])
# Verify result # Verify result
self.assertEqual(result[0]['id'], mock_response_data[0]['id']) self.assertEqual(result[0]['id'], mock_response_data[0]['id'])
@patch('summarize.build_summary_markdown') @patch('summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_markdown_wrapped(self, mock_send, mock_summarize): def test_generate_tracks_markdown_wrapped(self, mock_send: Any, mock_summarize: Any) -> None:
mock_summarize.return_value = "REPO_MAP" mock_summarize.return_value = "REPO_MAP"
mock_response_data = [{"id": "track_1"}] mock_response_data = [{"id": "track_1"}]
expected_result = [{"id": "track_1", "title": "Untitled Track"}] expected_result = [{"id": "track_1", "title": "Untitled Track"}]
# Wrapped in ```json ... ``` # Wrapped in ```json ... ```
mock_send.return_value = f"Here is the plan:\n```json\n{json.dumps(mock_response_data)}\n```\nHope this helps." mock_send.return_value = f"Here is the plan:\n```json\n{json.dumps(mock_response_data)}\n```\nHope this helps."
result = orchestrator_pm.generate_tracks("req", {}, []) result = orchestrator_pm.generate_tracks("req", {}, [])
self.assertEqual(result, expected_result) self.assertEqual(result, expected_result)
# Wrapped in ``` ... ``` # Wrapped in ``` ... ```
mock_send.return_value = f"```\n{json.dumps(mock_response_data)}\n```" mock_send.return_value = f"```\n{json.dumps(mock_response_data)}\n```"
result = orchestrator_pm.generate_tracks("req", {}, []) result = orchestrator_pm.generate_tracks("req", {}, [])
self.assertEqual(result, expected_result) self.assertEqual(result, expected_result)
@patch('summarize.build_summary_markdown') @patch('summarize.build_summary_markdown')
@patch('ai_client.send') @patch('ai_client.send')
def test_generate_tracks_malformed_json(self, mock_send, mock_summarize): def test_generate_tracks_malformed_json(self, mock_send: Any, mock_summarize: Any) -> None:
mock_summarize.return_value = "REPO_MAP" mock_summarize.return_value = "REPO_MAP"
mock_send.return_value = "NOT A JSON" mock_send.return_value = "NOT A JSON"
# Should return empty list and print error (we can mock print if we want to be thorough) # Should return empty list and print error (we can mock print if we want to be thorough)
with patch('builtins.print') as mock_print: with patch('builtins.print') as mock_print:
result = orchestrator_pm.generate_tracks("req", {}, []) result = orchestrator_pm.generate_tracks("req", {}, [])
self.assertEqual(result, []) self.assertEqual(result, [])
mock_print.assert_any_call("Error parsing Tier 1 response: Expecting value: line 1 column 1 (char 0)") mock_print.assert_any_call("Error parsing Tier 1 response: Expecting value: line 1 column 1 (char 0)")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -1,15 +1,15 @@
import pytest import pytest
from typing import Any
import json import json
from pathlib import Path from pathlib import Path
from project_manager import get_all_tracks, save_track_state from project_manager import get_all_tracks, save_track_state
from models import TrackState, Metadata, Ticket from models import TrackState, Metadata, Ticket
from datetime import datetime from datetime import datetime
def test_get_all_tracks_empty(tmp_path): def test_get_all_tracks_empty(tmp_path: Any) -> None:
# conductor/tracks directory doesn't exist
assert get_all_tracks(tmp_path) == [] assert get_all_tracks(tmp_path) == []
def test_get_all_tracks_with_state(tmp_path): def test_get_all_tracks_with_state(tmp_path: Any) -> None:
tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir = tmp_path / "conductor" / "tracks"
tracks_dir.mkdir(parents=True) tracks_dir.mkdir(parents=True)
track_id = "test_track_1" track_id = "test_track_1"
@@ -34,7 +34,7 @@ def test_get_all_tracks_with_state(tmp_path):
assert track["total"] == 2 assert track["total"] == 2
assert track["progress"] == 0.5 assert track["progress"] == 0.5
def test_get_all_tracks_with_metadata_json(tmp_path): def test_get_all_tracks_with_metadata_json(tmp_path: Any) -> None:
tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir = tmp_path / "conductor" / "tracks"
tracks_dir.mkdir(parents=True) tracks_dir.mkdir(parents=True)
track_id = "test_track_2" track_id = "test_track_2"
@@ -66,7 +66,7 @@ def test_get_all_tracks_with_metadata_json(tmp_path):
assert track["total"] == 3 assert track["total"] == 3
assert pytest.approx(track["progress"]) == 0.333333 assert pytest.approx(track["progress"]) == 0.333333
def test_get_all_tracks_malformed(tmp_path): def test_get_all_tracks_malformed(tmp_path: Any) -> None:
tracks_dir = tmp_path / "conductor" / "tracks" tracks_dir = tmp_path / "conductor" / "tracks"
tracks_dir.mkdir(parents=True) tracks_dir.mkdir(parents=True)
track_id = "malformed_track" track_id = "malformed_track"

View File

@@ -1,9 +1,9 @@
import pytest import pytest
from typing import Any
from pathlib import Path from pathlib import Path
from aggregate import build_tier1_context, build_tier2_context, build_tier3_context from aggregate import build_tier1_context, build_tier2_context, build_tier3_context
def test_build_tier1_context_exists(): def test_build_tier1_context_exists() -> None:
# This should fail if the function is not defined
file_items = [ file_items = [
{"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False}, {"path": Path("conductor/product.md"), "entry": "conductor/product.md", "content": "Product content", "error": False},
{"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False} {"path": Path("other.py"), "entry": "other.py", "content": "Other content", "error": False}
@@ -22,7 +22,7 @@ def test_build_tier2_context_exists() -> None:
result = build_tier2_context(file_items, Path("."), [], history) result = build_tier2_context(file_items, Path("."), [], history)
assert "Other content" in result assert "Other content" in result
def test_build_tier3_context_ast_skeleton(monkeypatch): def test_build_tier3_context_ast_skeleton(monkeypatch: Any) -> None:
from unittest.mock import MagicMock from unittest.mock import MagicMock
import aggregate import aggregate
import file_cache import file_cache
@@ -59,7 +59,7 @@ def test_build_tier3_context_exists() -> None:
assert "other.py" in result assert "other.py" in result
assert "AST Skeleton" in result assert "AST Skeleton" in result
def test_build_file_items_with_tiers(tmp_path): def test_build_file_items_with_tiers(tmp_path: Any) -> None:
from aggregate import build_file_items from aggregate import build_file_items
# Create some dummy files # Create some dummy files
file1 = tmp_path / "file1.txt" file1 = tmp_path / "file1.txt"
@@ -80,7 +80,7 @@ def test_build_file_items_with_tiers(tmp_path):
assert item2["content"] == "content2" assert item2["content"] == "content2"
assert item2["tier"] == 3 assert item2["tier"] == 3
def test_build_files_section_with_dicts(tmp_path): def test_build_files_section_with_dicts(tmp_path: Any) -> None:
from aggregate import build_files_section from aggregate import build_files_section
file1 = tmp_path / "file1.txt" file1 = tmp_path / "file1.txt"
file1.write_text("content1") file1.write_text("content1")