chore(tests): Move meta-infrastructure tests to conductor/tests/ for permanent isolation

This commit is contained in:
2026-02-27 19:01:12 -05:00
parent cca9ef9307
commit 41ae3df75d
27 changed files with 221 additions and 31 deletions

View File

@@ -0,0 +1,25 @@
import subprocess
import sys
import os
def run_diag(role, prompt):
print(f"--- Running Diag for {role} ---")
cmd = [sys.executable, "scripts/mma_exec.py", "--role", role, prompt]
try:
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
print("STDOUT:")
print(result.stdout)
print("STDERR:")
print(result.stderr)
return result.stdout
except Exception as e:
print(f"FAILED: {e}")
return str(e)
if __name__ == "__main__":
# Test 1: Simple read
print("TEST 1: read_file")
run_diag("tier3-worker", "Read the file 'pyproject.toml' and tell me the version of the project. ONLY the version string.")
print("\nTEST 2: run_shell_command")
run_diag("tier3-worker", "Use run_shell_command to execute 'echo HELLO_SUBAGENT' and return the output. ONLY the output.")

View File

@@ -0,0 +1,41 @@
import pytest
from unittest.mock import MagicMock, patch
import ai_client
def test_ai_client_send_gemini_cli():
"""
Verifies that 'ai_client.send' correctly interacts with 'GeminiCliAdapter'
when the 'gemini_cli' provider is specified.
"""
test_message = "Hello, this is a test prompt for the CLI adapter."
test_response = "This is a dummy response from the Gemini CLI."
# Set provider to gemini_cli
ai_client.set_provider("gemini_cli", "gemini-2.5-flash-lite")
# 1. Mock 'ai_client.GeminiCliAdapter' (which we will add)
with patch('ai_client.GeminiCliAdapter') as MockAdapterClass:
mock_adapter_instance = MockAdapterClass.return_value
mock_adapter_instance.send.return_value = {"text": test_response, "tool_calls": []}
mock_adapter_instance.last_usage = {"total_tokens": 100}
mock_adapter_instance.last_latency = 0.5
mock_adapter_instance.session_id = "test-session"
# Verify that 'events' are emitted correctly
with patch.object(ai_client.events, 'emit') as mock_emit:
response = ai_client.send(
md_content="<context></context>",
user_message=test_message,
base_dir="."
)
# Check that the adapter's send method was called.
mock_adapter_instance.send.assert_called()
# Verify that the expected lifecycle events were emitted.
emitted_event_names = [call.args[0] for call in mock_emit.call_args_list]
assert 'request_start' in emitted_event_names
assert 'response_received' in emitted_event_names
# Verify that the combined text returned by the adapter is returned by 'ai_client.send'.
assert response == test_response

View File

@@ -0,0 +1,75 @@
import unittest
from unittest.mock import patch, MagicMock
import io
import json
import sys
import os
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# Import after path fix
from scripts.cli_tool_bridge import main
class TestCliToolBridge(unittest.TestCase):
def setUp(self):
os.environ['GEMINI_CLI_HOOK_CONTEXT'] = 'manual_slop'
self.tool_call = {
'tool_name': 'read_file',
'tool_input': {'path': 'test.txt'}
}
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_allow_decision(self, mock_request, mock_stdout, mock_stdin):
# 1. Mock stdin with a JSON string tool call
mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0)
# 2. Mock ApiHookClient to return approved
mock_request.return_value = {'approved': True}
# Run main
main()
# 3. Capture stdout and assert allow
output = json.loads(mock_stdout.getvalue().strip())
self.assertEqual(output.get('decision'), 'allow')
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_deny_decision(self, mock_request, mock_stdout, mock_stdin):
# Mock stdin
mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0)
# 4. Mock ApiHookClient to return denied
mock_request.return_value = {'approved': False}
main()
# Assert deny
output = json.loads(mock_stdout.getvalue().strip())
self.assertEqual(output.get('decision'), 'deny')
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_unreachable_hook_server(self, mock_request, mock_stdout, mock_stdin):
# Mock stdin
mock_stdin.write(json.dumps(self.tool_call))
mock_stdin.seek(0)
# 5. Test case where hook server is unreachable (exception)
mock_request.side_effect = Exception("Connection refused")
main()
# Assert deny on error
output = json.loads(mock_stdout.getvalue().strip())
self.assertEqual(output.get('decision'), 'deny')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,53 @@
import unittest
from unittest.mock import patch, MagicMock
import io
import json
import sys
import os
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# Import after path fix
from scripts.cli_tool_bridge import main
class TestCliToolBridgeMapping(unittest.TestCase):
def setUp(self):
os.environ['GEMINI_CLI_HOOK_CONTEXT'] = 'manual_slop'
@patch('sys.stdin', new_callable=io.StringIO)
@patch('sys.stdout', new_callable=io.StringIO)
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_mapping_from_api_format(self, mock_request, mock_stdout, mock_stdin):
"""
Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format)
into tool_name and tool_input for the hook client.
"""
api_tool_call = {
'id': 'call123',
'name': 'read_file',
'input': {'path': 'test.txt'}
}
# 1. Mock stdin with the API format JSON
mock_stdin.write(json.dumps(api_tool_call))
mock_stdin.seek(0)
# 2. Mock ApiHookClient to return approved
mock_request.return_value = {'approved': True}
# Run main
main()
# 3. Verify that request_confirmation was called with mapped values
# If it's not mapped, it will likely be called with None or fail
mock_request.assert_called_once_with('read_file', {'path': 'test.txt'})
# 4. Capture stdout and assert allow
output_str = mock_stdout.getvalue().strip()
self.assertTrue(output_str, "Stdout should not be empty")
output = json.loads(output_str)
self.assertEqual(output.get('decision'), 'allow')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,130 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import subprocess
import io
import sys
import os
# Ensure the project root is in sys.path to resolve imports correctly
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapter(unittest.TestCase):
def setUp(self):
self.adapter = GeminiCliAdapter(binary_path="gemini")
@patch('subprocess.Popen')
def test_send_starts_subprocess_with_correct_args(self, mock_popen):
"""
Verify that send(message) correctly starts the subprocess with
--output-format stream-json and the provided message via stdin using communicate.
"""
# Setup mock process with a minimal valid JSONL termination
process_mock = MagicMock()
stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
message = "Hello Gemini CLI"
self.adapter.send(message)
# Verify subprocess.Popen call
mock_popen.assert_called_once()
args, kwargs = mock_popen.call_args
cmd = args[0]
# Check mandatory CLI components
self.assertIn("gemini", cmd)
self.assertIn("--output-format", cmd)
self.assertIn("stream-json", cmd)
# Message should NOT be in cmd now
self.assertNotIn(message, cmd)
# Verify message was sent via communicate
process_mock.communicate.assert_called_once_with(input=message)
# Check process configuration
self.assertEqual(kwargs.get('stdout'), subprocess.PIPE)
self.assertEqual(kwargs.get('stdin'), subprocess.PIPE)
self.assertEqual(kwargs.get('text'), True)
@patch('subprocess.Popen')
def test_send_parses_jsonl_output(self, mock_popen):
"""
Verify that it correctly parses multiple JSONL 'message' events
and returns the combined text.
"""
jsonl_output = [
json.dumps({"type": "message", "role": "model", "text": "The quick brown "}),
json.dumps({"type": "message", "role": "model", "text": "fox jumps."}),
json.dumps({"type": "result", "usage": {"prompt_tokens": 5, "candidates_tokens": 5}})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("test message")
self.assertEqual(result["text"], "The quick brown fox jumps.")
self.assertEqual(result["tool_calls"], [])
@patch('subprocess.Popen')
def test_send_handles_tool_use_events(self, mock_popen):
"""
Verify that it correctly handles 'tool_use' events in the stream
by continuing to read until the final 'result' event.
"""
jsonl_output = [
json.dumps({"type": "message", "role": "assistant", "text": "Calling tool..."}),
json.dumps({"type": "tool_use", "name": "read_file", "args": {"path": "test.txt"}}),
json.dumps({"type": "message", "role": "assistant", "text": "\nFile read successfully."}),
json.dumps({"type": "result", "usage": {}})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
result = self.adapter.send("read test.txt")
# Result should contain the combined text from all 'message' events
self.assertEqual(result["text"], "Calling tool...\nFile read successfully.")
self.assertEqual(len(result["tool_calls"]), 1)
self.assertEqual(result["tool_calls"][0]["name"], "read_file")
@patch('subprocess.Popen')
def test_send_captures_usage_metadata(self, mock_popen):
"""
Verify that usage data is extracted from the 'result' event.
"""
usage_data = {"total_tokens": 42}
jsonl_output = [
json.dumps({"type": "message", "text": "Finalizing"}),
json.dumps({"type": "result", "usage": usage_data})
]
stdout_content = "\n".join(jsonl_output) + "\n"
process_mock = MagicMock()
process_mock.communicate.return_value = (stdout_content, "")
process_mock.poll.return_value = 0
process_mock.wait.return_value = 0
mock_popen.return_value = process_mock
self.adapter.send("usage test")
# Verify the usage was captured in the adapter instance
self.assertEqual(self.adapter.last_usage, usage_data)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,176 @@
import unittest
from unittest.mock import patch, MagicMock, ANY
import json
import subprocess
import io
import sys
import os
# Ensure the project root is in sys.path to resolve imports correctly
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if project_root not in sys.path:
sys.path.append(project_root)
# Import the class to be tested
from gemini_cli_adapter import GeminiCliAdapter
class TestGeminiCliAdapterParity(unittest.TestCase):
def setUp(self):
"""Set up a fresh adapter instance and reset session state for each test."""
# Patch session_logger to prevent file operations during tests
self.session_logger_patcher = patch('gemini_cli_adapter.session_logger')
self.mock_session_logger = self.session_logger_patcher.start()
self.adapter = GeminiCliAdapter(binary_path="gemini")
self.adapter.session_id = None
self.adapter.last_usage = None
self.adapter.last_latency = 0.0
def tearDown(self):
self.session_logger_patcher.stop()
@patch('subprocess.Popen')
def test_count_tokens_uses_estimation(self, mock_popen):
"""
Test that count_tokens uses character-based estimation.
"""
contents_to_count = ["This is the first line.", "This is the second line."]
expected_chars = len("\n".join(contents_to_count))
expected_tokens = expected_chars // 4
token_count = self.adapter.count_tokens(contents=contents_to_count)
self.assertEqual(token_count, expected_tokens)
# Verify that NO subprocess was started for counting
mock_popen.assert_not_called()
@patch('subprocess.Popen')
def test_send_with_safety_settings_no_flags_added(self, mock_popen):
"""
Test that the send method does NOT add --safety flags when safety_settings are provided,
as this functionality is no longer supported via CLI flags.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "User's prompt here."
safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_ONLY_HIGH"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
]
self.adapter.send(message=message_content, safety_settings=safety_settings)
args, kwargs = mock_popen.call_args
command = args[0]
# Verify that no --safety flags were added to the command
self.assertNotIn("--safety", command)
# Verify that the message was passed correctly via stdin
process_mock.communicate.assert_called_once_with(input=message_content)
@patch('subprocess.Popen')
def test_send_without_safety_settings_no_flags(self, mock_popen):
"""
Test that when safety_settings is None or an empty list, no --safety flags are added.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "Another prompt."
self.adapter.send(message=message_content, safety_settings=None)
args_none, _ = mock_popen.call_args
self.assertNotIn("--safety", args_none[0])
mock_popen.reset_mock()
self.adapter.send(message=message_content, safety_settings=[])
args_empty, _ = mock_popen.call_args
self.assertNotIn("--safety", args_empty[0])
@patch('subprocess.Popen')
def test_send_with_system_instruction_prepended_to_stdin(self, mock_popen):
"""
Test that the send method prepends the system instruction to the prompt
sent via stdin, and does NOT add a --system flag to the command.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "User's prompt here."
system_instruction_text = "Some instruction"
expected_input = f"{system_instruction_text}\n\n{message_content}"
self.adapter.send(message=message_content, system_instruction=system_instruction_text)
args, kwargs = mock_popen.call_args
command = args[0]
# Verify that the system instruction was prepended to the input sent to communicate
process_mock.communicate.assert_called_once_with(input=expected_input)
# Verify that no --system flag was added to the command
self.assertNotIn("--system", command)
@patch('subprocess.Popen')
def test_send_with_model_parameter(self, mock_popen):
"""
Test that the send method correctly adds the -m <model> flag when a model is specified.
"""
process_mock = MagicMock()
mock_stdout_content = json.dumps({"type": "result", "usage": {}}) + "\n"
process_mock.communicate.return_value = (mock_stdout_content, "")
process_mock.returncode = 0
mock_popen.return_value = process_mock
message_content = "User's prompt here."
model_name = "gemini-1.5-flash"
expected_command_part = f'-m "{model_name}"'
self.adapter.send(message=message_content, model=model_name)
args, kwargs = mock_popen.call_args
command = args[0]
# Verify that the -m <model> flag was added to the command
self.assertIn(expected_command_part, command)
# Verify that the message was passed correctly via stdin
process_mock.communicate.assert_called_once_with(input=message_content)
@patch('subprocess.Popen')
def test_send_kills_process_on_communicate_exception(self, mock_popen):
"""
Test that if subprocess.Popen().communicate() raises an exception,
GeminiCliAdapter.send() kills the process and re-raises the exception.
"""
mock_process = MagicMock()
mock_popen.return_value = mock_process
# Define an exception to simulate
simulated_exception = RuntimeError("Simulated communicate error")
mock_process.communicate.side_effect = simulated_exception
message_content = "User message"
# Assert that the exception is raised and process is killed
with self.assertRaises(RuntimeError) as cm:
self.adapter.send(message=message_content)
# Verify that the process's kill method was called
mock_process.kill.assert_called_once()
# Verify that the correct exception was re-raised
self.assertIs(cm.exception, simulated_exception)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,170 @@
import pytest
import time
import os
import sys
import requests
import json
from api_hook_client import ApiHookClient
def test_gemini_cli_context_bleed_prevention(live_gui):
"""
Test that the GeminiCliAdapter correctly filters out echoed 'user' messages
and only shows assistant content in the GUI history.
"""
client = ApiHookClient("http://127.0.0.1:8999")
client.click("btn_reset")
client.set_value("auto_add_history", True)
# Create a specialized mock for context bleed
bleed_mock = os.path.abspath("tests/mock_context_bleed.py")
with open(bleed_mock, "w") as f:
f.write('''import sys, json
print(json.dumps({"type": "init", "session_id": "bleed-test"}), flush=True)
print(json.dumps({"type": "message", "role": "user", "content": "I am echoing you"}), flush=True)
print(json.dumps({"type": "message", "role": "assistant", "content": "Actual AI Response"}), flush=True)
print(json.dumps({"type": "result", "stats": {"total_tokens": 10}}), flush=True)
''')
cli_cmd = f'"{sys.executable}" "{bleed_mock}"'
client.set_value("current_provider", "gemini_cli")
client.set_value("gcli_path", cli_cmd)
client.set_value("ai_input", "Test context bleed")
client.click("btn_gen_send")
# Wait for completion
time.sleep(3)
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
# Verify: We expect exactly one AI entry, and it must NOT contain the echoed user message
ai_entries = [e for e in entries if e.get("role") == "AI"]
assert len(ai_entries) == 1
assert ai_entries[0].get("content") == "Actual AI Response"
assert "echoing you" not in ai_entries[0].get("content")
os.remove(bleed_mock)
def test_gemini_cli_parameter_resilience(live_gui):
"""
Test that mcp_client correctly handles 'file_path' and 'dir_path' aliases
sent by the AI instead of 'path'.
"""
client = ApiHookClient("http://127.0.0.1:8999")
client.click("btn_reset")
client.set_value("auto_add_history", True)
client.select_list_item("proj_files", "manual_slop")
# Create a mock that uses dir_path for list_directory
alias_mock = os.path.abspath("tests/mock_alias_tool.py")
bridge_path = os.path.abspath("scripts/cli_tool_bridge.py")
# Avoid backslashes in f-string expression part
if sys.platform == "win32":
bridge_path_str = bridge_path.replace("\\", "/")
else:
bridge_path_str = bridge_path
with open(alias_tool_content := "tests/mock_alias_tool.py", "w") as f:
f.write(f'''import sys, json, os, subprocess
prompt = sys.stdin.read()
if '"role": "tool"' in prompt:
print(json.dumps({{"type": "message", "role": "assistant", "content": "Tool worked!"}}), flush=True)
print(json.dumps({{"type": "result", "stats": {{"total_tokens": 20}}}}), flush=True)
else:
# We must call the bridge to trigger the GUI approval!
tool_call = {{"name": "list_directory", "input": {{"dir_path": "."}}}}
bridge_cmd = [sys.executable, "{bridge_path_str}"]
proc = subprocess.Popen(bridge_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate(input=json.dumps(tool_call))
# Even if bridge says allow, we emit the tool_use to the adapter
print(json.dumps({{"type": "message", "role": "assistant", "content": "I will list the directory."}}), flush=True)
print(json.dumps({{
"type": "tool_use",
"name": "list_directory",
"id": "alias_call",
"args": {{"dir_path": "."}}
}}), flush=True)
print(json.dumps({{"type": "result", "stats": {{"total_tokens": 10}}}}), flush=True)
''')
cli_cmd = f'"{sys.executable}" "{alias_mock}"'
client.set_value("current_provider", "gemini_cli")
client.set_value("gcli_path", cli_cmd)
client.set_value("ai_input", "Test parameter aliases")
client.click("btn_gen_send")
# Handle approval
timeout = 15
start_time = time.time()
approved = False
while time.time() - start_time < timeout:
for ev in client.get_events():
if ev.get("type") == "ask_received":
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
approved = True
if approved: break
time.sleep(0.5)
assert approved, "Tool approval event never received"
# Verify tool result in history
time.sleep(2)
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
# Check for "Tool worked!" which implies the tool execution was successful
found = any("Tool worked!" in e.get("content", "") for e in entries)
assert found, "Tool result indicating success not found in history"
os.remove(alias_mock)
def test_gemini_cli_loop_termination(live_gui):
"""
Test that multi-round tool calling correctly terminates and preserves
payload (session context) between rounds.
"""
client = ApiHookClient("http://127.0.0.1:8999")
client.click("btn_reset")
client.set_value("auto_add_history", True)
client.select_list_item("proj_files", "manual_slop")
# This uses the existing mock_gemini_cli.py which is already designed for 2 rounds
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
cli_cmd = f'"{sys.executable}" "{mock_script}"'
client.set_value("current_provider", "gemini_cli")
client.set_value("gcli_path", cli_cmd)
client.set_value("ai_input", "Perform multi-round tool test")
client.click("btn_gen_send")
# Handle approvals (mock does one tool call)
timeout = 20
start_time = time.time()
approved = False
while time.time() - start_time < timeout:
for ev in client.get_events():
if ev.get("type") == "ask_received":
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": ev.get("request_id"), "response": {"approved": True}})
approved = True
if approved: break
time.sleep(0.5)
# Wait for the second round and final answer
found_final = False
start_time = time.time()
while time.time() - start_time < 15:
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
for e in entries:
if "processed the tool results" in e.get("content", ""):
found_final = True
break
if found_final: break
time.sleep(1)
assert found_final, "Final message after multi-round tool loop not found"

View File

@@ -0,0 +1,141 @@
import pytest
import time
import os
import sys
import requests
from api_hook_client import ApiHookClient
def test_gemini_cli_full_integration(live_gui):
"""
Integration test for the Gemini CLI provider and tool bridge.
Handles 'ask_received' events from the bridge and any other approval requests.
"""
client = ApiHookClient("http://127.0.0.1:8999")
# 0. Reset session and enable history
client.click("btn_reset")
client.set_value("auto_add_history", True)
# Switch to manual_slop project explicitly
client.select_list_item("proj_files", "manual_slop")
# 1. Setup paths and configure the GUI
# Use the real gemini CLI if available, otherwise use mock
# For CI/testing we prefer mock
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
cli_cmd = f'"{sys.executable}" "{mock_script}"'
print(f"[TEST] Setting current_provider to gemini_cli")
client.set_value("current_provider", "gemini_cli")
print(f"[TEST] Setting gcli_path to {cli_cmd}")
client.set_value("gcli_path", cli_cmd)
# Verify settings
assert client.get_value("current_provider") == "gemini_cli"
# Clear events
client.get_events()
# 2. Trigger a message in the GUI
print("[TEST] Sending user message...")
client.set_value("ai_input", "Please read test.txt")
client.click("btn_gen_send")
# 3. Monitor for approval events
print("[TEST] Waiting for approval events...")
timeout = 45
start_time = time.time()
approved_count = 0
while time.time() - start_time < timeout:
events = client.get_events()
if events:
for ev in events:
etype = ev.get("type")
eid = ev.get("request_id") or ev.get("action_id")
print(f"[TEST] Received event: {etype} (ID: {eid})")
if etype in ["ask_received", "glob_approval_required", "script_confirmation_required"]:
print(f"[TEST] Approving {etype} {eid}")
if etype == "script_confirmation_required":
resp = requests.post(f"http://127.0.0.1:8999/api/confirm/{eid}", json={"approved": True})
else:
resp = requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": eid, "response": {"approved": True}})
assert resp.status_code == 200
approved_count += 1
# Check if we got a final response in history
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
found_final = False
for entry in entries:
content = entry.get("content", "")
if "Hello from mock!" in content or "processed the tool results" in content:
print(f"[TEST] Success! Found final message in history.")
found_final = True
break
if found_final:
break
time.sleep(1.0)
assert approved_count > 0, "No approval events were processed"
assert found_final, "Final message from mock CLI was not found in the GUI history"
def test_gemini_cli_rejection_and_history(live_gui):
"""
Integration test for the Gemini CLI provider: Rejection flow and history.
"""
client = ApiHookClient("http://127.0.0.1:8999")
# 0. Reset session
client.click("btn_reset")
client.set_value("auto_add_history", True)
client.select_list_item("proj_files", "manual_slop")
mock_script = os.path.abspath("tests/mock_gemini_cli.py")
cli_cmd = f'"{sys.executable}" "{mock_script}"'
client.set_value("current_provider", "gemini_cli")
client.set_value("gcli_path", cli_cmd)
# 2. Trigger a message
print("[TEST] Sending user message (to be denied)...")
client.set_value("ai_input", "Deny me")
client.click("btn_gen_send")
# 3. Wait for event and reject
timeout = 20
start_time = time.time()
denied = False
while time.time() - start_time < timeout:
for ev in client.get_events():
etype = ev.get("type")
eid = ev.get("request_id")
print(f"[TEST] Received event: {etype}")
if etype == "ask_received":
print(f"[TEST] Denying request {eid}")
requests.post("http://127.0.0.1:8999/api/ask/respond",
json={"request_id": eid, "response": {"approved": False}})
denied = True
break
if denied: break
time.sleep(0.5)
assert denied, "No ask_received event to deny"
# 4. Verify rejection in history
print("[TEST] Waiting for rejection in history...")
rejection_found = False
start_time = time.time()
while time.time() - start_time < 20:
session = client.get_session()
entries = session.get("session", {}).get("entries", [])
for entry in entries:
if "Tool execution was denied" in entry.get("content", ""):
rejection_found = True
break
if rejection_found: break
time.sleep(1.0)
assert rejection_found, "Rejection message not found in history"

View File

@@ -0,0 +1,52 @@
import pytest
from unittest.mock import patch, MagicMock
import sys
import os
# Add project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import ai_client
@pytest.fixture(autouse=True)
def setup_ai_client():
ai_client.reset_session()
ai_client.set_provider("gemini_cli", "gemini-2.5-flash")
ai_client.confirm_and_run_callback = lambda script, base_dir: "Mocked execution"
ai_client.comms_log_callback = lambda entry: None
ai_client.tool_log_callback = lambda script, result: None
yield
@patch('ai_client.GeminiCliAdapter')
@patch('ai_client._get_combined_system_prompt')
def test_send_invokes_adapter_send(mock_prompt, mock_adapter_class):
mock_prompt.return_value = "Mocked Prompt"
mock_instance = mock_adapter_class.return_value
mock_instance.send.return_value = {"text": "Done", "tool_calls": []}
mock_instance.last_usage = {"input_tokens": 10}
mock_instance.last_latency = 0.1
mock_instance.session_id = None
ai_client.send("context", "message", discussion_history="hist")
expected_payload = "[DISCUSSION HISTORY]\n\nhist\n\n---\n\nmessage"
assert mock_instance.send.called
args, kwargs = mock_instance.send.call_args
assert args[0] == expected_payload
assert kwargs['system_instruction'] == "Mocked Prompt\n\n<context>\ncontext\n</context>"
@patch('ai_client.GeminiCliAdapter')
def test_get_history_bleed_stats(mock_adapter_class):
mock_instance = mock_adapter_class.return_value
mock_instance.send.return_value = {"text": "txt", "tool_calls": []}
mock_instance.last_usage = {"input_tokens": 1500}
mock_instance.last_latency = 0.5
mock_instance.session_id = "sess"
# Initialize by sending a message
ai_client.send("context", "msg")
stats = ai_client.get_history_bleed_stats()
assert stats["provider"] == "gemini_cli"
assert stats["current"] == 1500

View File

@@ -0,0 +1,50 @@
import pytest
import os
import sys
from unittest.mock import MagicMock, patch
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# Import the necessary functions from ai_client, including the reset helper
from ai_client import get_gemini_cache_stats, reset_session
def test_get_gemini_cache_stats_with_mock_client():
"""
Test that get_gemini_cache_stats correctly processes cache lists
from a mocked client instance.
"""
# Ensure a clean state before the test by resetting the session
reset_session()
# 1. Create a mock for the cache object that the client will return
mock_cache = MagicMock()
mock_cache.name = "cachedContents/test-cache"
mock_cache.display_name = "Test Cache"
mock_cache.model = "models/gemini-1.5-pro-001"
mock_cache.size_bytes = 1024
# 2. Create a mock for the client instance
mock_client_instance = MagicMock()
# Configure its `caches.list` method to return our mock cache
mock_client_instance.caches.list.return_value = [mock_cache]
# 3. Patch the Client constructor to return our mock instance
# This intercepts the `_ensure_gemini_client` call inside the function
with patch('google.genai.Client', return_value=mock_client_instance) as mock_client_constructor:
# 4. Call the function under test
stats = get_gemini_cache_stats()
# 5. Assert that the function behaved as expected
# It should have constructed the client
mock_client_constructor.assert_called_once()
# It should have called the `list` method on the `caches` attribute
mock_client_instance.caches.list.assert_called_once()
# The returned stats dictionary should be correct
assert "cache_count" in stats
assert "total_size_bytes" in stats
assert stats["cache_count"] == 1
assert stats["total_size_bytes"] == 1024

View File

@@ -0,0 +1,57 @@
import subprocess
import pytest
import os
def run_ps_script(role, prompt):
"""Helper to run the run_subagent.ps1 script."""
# Using -File is safer and handles arguments better
cmd = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass",
"-File", "./scripts/run_subagent.ps1",
"-Role", role,
"-Prompt", prompt
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout:
print(f"\n[Sub-Agent {role} Output]:\n{result.stdout}")
if result.stderr:
print(f"\n[Sub-Agent {role} Error]:\n{result.stderr}")
return result
def test_subagent_script_qa_live():
"""Verify that the QA role works and returns a compressed fix."""
prompt = "Traceback (most recent call last): File 'test.py', line 1, in <module> 1/0 ZeroDivisionError: division by zero"
result = run_ps_script("QA", prompt)
assert result.returncode == 0
# Expected output should mention the fix for division by zero
assert "zero" in result.stdout.lower()
# It should be short (QA agents compress)
assert len(result.stdout.split()) < 40
def test_subagent_script_worker_live():
"""Verify that the Worker role works and returns code."""
prompt = "Write a python function that returns 'hello world'"
result = run_ps_script("Worker", prompt)
assert result.returncode == 0
assert "def" in result.stdout.lower()
assert "hello" in result.stdout.lower()
def test_subagent_script_utility_live():
"""Verify that the Utility role works."""
prompt = "Tell me 'True' if 1+1=2, otherwise 'False'"
result = run_ps_script("Utility", prompt)
assert result.returncode == 0
assert "true" in result.stdout.lower()
def test_subagent_isolation_live():
"""Verify that the sub-agent is stateless and does not see the parent's conversation context."""
# This prompt asks the sub-agent about a 'secret' mentioned only here, not in its prompt.
prompt = "What is the secret code I just told you? If I didn't tell you, say 'UNKNOWN'."
result = run_ps_script("Utility", prompt)
assert result.returncode == 0
# A stateless agent should not know any previous context.
assert "unknown" in result.stdout.lower()

View File

@@ -0,0 +1,151 @@
import pytest
import os
from unittest.mock import patch, MagicMock
from scripts.mma_exec import create_parser, get_role_documents, execute_agent, get_model_for_role, get_dependencies
def test_parser_role_choices():
"""Test that the parser accepts valid roles and the prompt argument."""
parser = create_parser()
valid_roles = ['tier1', 'tier2', 'tier3', 'tier4']
test_prompt = "Analyze the codebase for bottlenecks."
for role in valid_roles:
args = parser.parse_args(['--role', role, test_prompt])
assert args.role == role
assert args.prompt == test_prompt
def test_parser_invalid_role():
"""Test that the parser rejects roles outside the specified choices."""
parser = create_parser()
with pytest.raises(SystemExit):
parser.parse_args(['--role', 'tier5', 'Some prompt'])
def test_parser_prompt_optional():
"""Test that the prompt argument is optional if role is provided (or handled in main)."""
parser = create_parser()
# Prompt is now optional (nargs='?')
args = parser.parse_args(['--role', 'tier3'])
assert args.role == 'tier3'
assert args.prompt is None
def test_parser_help():
"""Test that the help flag works without raising errors (exits with 0)."""
parser = create_parser()
with pytest.raises(SystemExit) as excinfo:
parser.parse_args(['--help'])
assert excinfo.value.code == 0
def test_get_role_documents():
"""Test that get_role_documents returns the correct documentation paths for each tier."""
assert get_role_documents('tier1') == ['conductor/product.md', 'conductor/product-guidelines.md']
assert get_role_documents('tier2') == ['conductor/tech-stack.md', 'conductor/workflow.md']
assert get_role_documents('tier3') == ['conductor/workflow.md']
assert get_role_documents('tier4') == []
def test_get_model_for_role():
"""Test that get_model_for_role returns the correct model for each role."""
assert get_model_for_role('tier1-orchestrator') == 'gemini-3.1-pro-preview'
assert get_model_for_role('tier2-tech-lead') == 'gemini-2.5-flash-lite'
assert get_model_for_role('tier3-worker') == 'gemini-2.5-flash-lite'
assert get_model_for_role('tier4-qa') == 'gemini-2.5-flash-lite'
def test_execute_agent():
"""
Test that execute_agent calls subprocess.run with powershell and the correct gemini CLI arguments
including the model specified for the role.
"""
role = "tier3-worker"
prompt = "Write a unit test."
docs = ["file1.py", "docs/spec.md"]
expected_model = "gemini-2.5-flash-lite"
mock_stdout = "Mocked AI Response"
with patch("subprocess.run") as mock_run:
mock_process = MagicMock()
mock_process.stdout = mock_stdout
mock_process.returncode = 0
mock_run.return_value = mock_process
result = execute_agent(role, prompt, docs)
mock_run.assert_called_once()
args, kwargs = mock_run.call_args
cmd_list = args[0]
assert cmd_list[0] == "powershell.exe"
assert "-Command" in cmd_list
ps_cmd = cmd_list[cmd_list.index("-Command") + 1]
assert "gemini" in ps_cmd
assert f"--model {expected_model}" in ps_cmd
# Verify input contains the prompt and system directive
input_text = kwargs.get("input")
assert "STRICT SYSTEM DIRECTIVE" in input_text
assert "TASK: Write a unit test." in input_text
assert kwargs.get("capture_output") is True
assert kwargs.get("text") is True
assert result == mock_stdout
def test_get_dependencies(tmp_path):
content = (
"import os\n"
"import sys\n"
"import file_cache\n"
"from mcp_client import something\n"
)
filepath = tmp_path / "mock_script.py"
filepath.write_text(content)
dependencies = get_dependencies(str(filepath))
assert dependencies == ['os', 'sys', 'file_cache', 'mcp_client']
import re
def test_execute_agent_logging(tmp_path):
log_file = tmp_path / "mma_delegation.log"
# mma_exec now uses logs/agents/ for individual logs and logs/mma_delegation.log for master
# We will patch LOG_FILE to point to our temp location
with patch("scripts.mma_exec.LOG_FILE", str(log_file)), \
patch("subprocess.run") as mock_run:
mock_process = MagicMock()
mock_process.stdout = ""
mock_process.returncode = 0
mock_run.return_value = mock_process
test_role = "tier1"
test_prompt = "Plan the next phase"
execute_agent(test_role, test_prompt, [])
assert log_file.exists()
log_content = log_file.read_text()
assert test_role in log_content
assert test_prompt in log_content # Master log should now have the summary prompt
assert re.search(r"\d{4}-\d{2}-\d{2}", log_content)
def test_execute_agent_tier3_injection(tmp_path):
main_content = "import dependency\n\ndef run():\n dependency.do_work()\n"
main_file = tmp_path / "main.py"
main_file.write_text(main_content)
dep_content = "def do_work():\n pass\n\ndef other_func():\n print('hello')\n"
dep_file = tmp_path / "dependency.py"
dep_file.write_text(dep_content)
# We need to ensure generate_skeleton is mockable or working
old_cwd = os.getcwd()
os.chdir(tmp_path)
try:
with patch("subprocess.run") as mock_run:
mock_process = MagicMock()
mock_process.stdout = "OK"
mock_process.returncode = 0
mock_run.return_value = mock_process
execute_agent('tier3-worker', 'Modify main.py', ['main.py'])
assert mock_run.called
input_text = mock_run.call_args[1].get("input")
assert "DEPENDENCY SKELETON: dependency.py" in input_text
assert "def do_work():" in input_text
assert "Modify main.py" in input_text
finally:
os.chdir(old_cwd)

View File

@@ -0,0 +1,40 @@
import pytest
from scripts.mma_exec import generate_skeleton
def test_generate_skeleton():
sample_code = '''
class Calculator:
"""Performs basic math operations."""
def add(self, a: int, b: int) -> int:
"""Adds two numbers."""
result = a + b
return result
def log_message(msg):
timestamp = "2026-02-25"
print(f"[{timestamp}] {msg}")
'''
skeleton = generate_skeleton(sample_code)
# Check that signatures are preserved
assert "class Calculator:" in skeleton
assert "def add(self, a: int, b: int) -> int:" in skeleton
assert "def log_message(msg):" in skeleton
# Check that docstrings are preserved
assert '"""Performs basic math operations."""' in skeleton
assert '"""Adds two numbers."""' in skeleton
# Check that implementation details are removed
assert "result = a + b" not in skeleton
assert "return result" not in skeleton
assert "timestamp =" not in skeleton
assert "print(" not in skeleton
# Check that bodies are replaced with ellipsis
assert "..." in skeleton
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -0,0 +1,96 @@
import subprocess
import time
import sys
import os
import unittest
# Calculate project root
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from api_hook_client import ApiHookClient
class TestMMAGUIRobust(unittest.TestCase):
@classmethod
def setUpClass(cls):
# 1. Launch gui_2.py with --enable-test-hooks
cls.gui_command = [sys.executable, "gui_2.py", "--enable-test-hooks"]
print(f"Launching GUI: {' '.join(cls.gui_command)}")
cls.gui_process = subprocess.Popen(
cls.gui_command,
cwd=PROJECT_ROOT,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
cls.client = ApiHookClient()
print("Waiting for GUI to start...")
if not cls.client.wait_for_server(timeout=10):
cls.gui_process.terminate()
raise RuntimeError("GUI failed to start or hook server not responsive.")
print("GUI started.")
@classmethod
def tearDownClass(cls):
if cls.gui_process:
cls.gui_process.terminate()
cls.gui_process.wait(timeout=5)
def test_mma_state_ingestion(self):
"""Verify that mma_state_update event correctly updates GUI state."""
track_data = {
"id": "robust_test_track",
"title": "Robust Verification Track",
"description": "Verifying internal state ingestion"
}
tickets_data = [
{"id": "T1", "target_file": "file1.py", "status": "todo"},
{"id": "T2", "target_file": "file2.py", "status": "running"},
{"id": "T3", "target_file": "file3.py", "status": "complete"},
]
payload = {
"status": "active",
"active_tier": "Tier 2",
"track": track_data,
"tickets": tickets_data
}
print("Pushing mma_state_update...")
self.client.push_event("mma_state_update", payload)
# Give GUI a moment to process the async task
time.sleep(1.0)
print("Querying mma_status...")
status = self.client.get_mma_status()
self.assertEqual(status["mma_status"], "active")
self.assertEqual(status["active_tier"], "Tier 2")
self.assertEqual(status["active_track"]["id"], "robust_test_track")
self.assertEqual(len(status["active_tickets"]), 3)
self.assertEqual(status["active_tickets"][2]["status"], "complete")
print("MMA state ingestion verified successfully.")
def test_mma_step_approval_trigger(self):
"""Verify that mma_step_approval event sets the pending approval flag."""
payload = {
"ticket_id": "T2",
"payload": "echo 'Robust Test'"
}
print("Pushing mma_step_approval...")
self.client.push_event("mma_step_approval", payload)
time.sleep(1.0)
print("Querying mma_status for pending approval...")
status = self.client.get_mma_status()
self.assertTrue(status["pending_approval"], "GUI did not register pending MMA approval")
print("MMA step approval trigger verified successfully.")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,141 @@
import subprocess
import time
import sys
import os
import glob
# --- Configuration ---
GUI_SCRIPT = 'gui_2.py'
TEST_HOOKS_FLAG = '--enable-test-hooks'
API_HOOK_CLIENT_MODULE = 'api_hook_client'
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# Ensure project root is in sys.path to import modules like api_hook_client
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
print(f"Added '{PROJECT_ROOT}' to sys.path for imports.")
try:
from api_hook_client import ApiHookClient
except ImportError as e:
print(f"Error: Could not import ApiHookClient from '{API_HOOK_CLIENT_MODULE}'.")
print(f"Please ensure '{API_HOOK_CLIENT_MODULE}.py' is accessible and '{PROJECT_ROOT}' is correctly added to sys.path.")
print(f"Import error: {e}")
sys.exit(1)
def run_visual_mma_verification():
print("Starting visual MMA verification test...")
# Change current directory to project root
original_dir = os.getcwd()
if original_dir != PROJECT_ROOT:
try:
os.chdir(PROJECT_ROOT)
print(f"Changed current directory to: {PROJECT_ROOT}")
except FileNotFoundError:
print(f"Error: Project root directory '{PROJECT_ROOT}' not found.")
return
# 1. Launch gui_2.py with --enable-test-hooks
gui_command = [sys.executable, GUI_SCRIPT, TEST_HOOKS_FLAG]
print(f"Launching GUI with command: {' '.join(gui_command)}")
try:
gui_process = subprocess.Popen(
gui_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=PROJECT_ROOT
)
print(f"GUI process started with PID: {gui_process.pid}")
except FileNotFoundError:
print(f"Error: Could not find Python executable '{sys.executable}' or script '{GUI_SCRIPT}'.")
return
except Exception as e:
print(f"Error starting GUI process: {e}")
return
# Wait for GUI to start
print("Waiting for GUI to initialize and hook server to start (5 seconds)...")
time.sleep(5)
if gui_process.poll() is not None:
print(f"Error: GUI process exited prematurely with return code {gui_process.returncode}.")
return
# 2. Use ApiHookClient
try:
client = ApiHookClient()
print("ApiHookClient initialized successfully.")
except Exception as e:
print(f"Failed to initialize ApiHookClient. Error: {e}")
if gui_process:
gui_process.terminate()
return
# 3. Setup MMA data
track_data = {
"id": "visual_test_track",
"title": "Visual Verification Track",
"description": "A track to verify MMA UI components"
}
tickets_data = [
{"id": "TICKET-001", "target_file": "core.py", "status": "todo"},
{"id": "TICKET-002", "target_file": "utils.py", "status": "running"},
{"id": "TICKET-003", "target_file": "tests.py", "status": "complete"},
{"id": "TICKET-004", "target_file": "api.py", "status": "blocked"},
{"id": "TICKET-005", "target_file": "gui.py", "status": "paused"},
]
print("\nPushing MMA state update...")
try:
payload = {
"status": "running",
"active_tier": "Tier 3",
"track": track_data,
"tickets": tickets_data
}
client.push_event("mma_state_update", payload)
print(" - MMA state update pushed.")
except Exception as e:
print(f" - Warning: Failed to push mma_state_update: {e}")
time.sleep(3)
print("Pushing 'mma_step_approval' event to trigger HITL modal...")
try:
approval_payload = {
"ticket_id": "TICKET-002",
"payload": "powershell -Command \"Write-Host 'Hello from Tier 3'\""
}
client.push_event("mma_step_approval", approval_payload)
print("mma_step_approval event pushed successfully.")
except Exception as e:
print(f"Error pushing mma_step_approval event: {e}")
# 5. Provide clear print statements for manual verification
print("\n--- Manual Verification Instructions ---")
print("Please visually inspect the running GUI application:")
print("1. MMA Dashboard: Ensure the 'MMA Dashboard' panel is visible and active.")
print("2. Ticket Queue: Verify the 'Ticket Queue' section displays all 5 tickets with correct statuses.")
print("3. Progress Bar: Check that the progress bar correctly reflects the completed/total tickets.")
print("4. Approval Modal: Confirm that an 'MMA Step Approval' modal has appeared.")
print("\n--------------------------------------")
print("The test script has finished its automated actions.")
print("The GUI application is still running. Press Enter to exit.")
try:
input()
except EOFError:
pass
print("\nStopping GUI process...")
if gui_process:
gui_process.terminate()
gui_process.wait(timeout=5)
print("Visual MMA verification test script finished.")
if __name__ == "__main__":
run_visual_mma_verification()

View File

@@ -0,0 +1,95 @@
import pytest
import time
import sys
import os
from pathlib import Path
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from api_hook_client import ApiHookClient
@pytest.mark.integration
def test_mma_epic_lifecycle(live_gui):
"""
Integration test for the full MMA Epic lifecycle.
1. Start App.
2. Trigger 'New Epic' request.
3. Verify Tier 1 generates tracks.
4. Trigger 'Start Track' for one of the tracks.
5. Verify Tier 2 generates tickets.
6. Verify execution loop starts.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=15), "API hook server failed to start."
print("[Test] Initializing MMA Epic lifecycle test...")
# 0. Setup: Ensure we have a project and are in a clean state
client.click("btn_reset")
time.sleep(1)
# 1. Set Epic input
epic_text = "Improve the logging system to include timestamps in all tool calls."
print(f"[Test] Setting Epic input: {epic_text}")
client.set_value("mma_epic_input", epic_text)
# 2. Trigger 'New Epic' (Plan Epic)
print("[Test] Clicking 'Plan Epic (Tier 1)'...")
client.click("btn_mma_plan_epic")
# 3. Verify that Tier 1 generates tracks
print("[Test] Polling for Tier 1 tracks...")
tracks_generated = False
for i in range(120):
status = client.get_value("ai_status")
# Check if the proposal modal is shown or status changed
if status and "Epic tracks generated" in str(status):
tracks_generated = True
print(f"[Test] Tracks generated after {i}s")
break
time.sleep(1)
assert tracks_generated, "Tier 1 failed to generate tracks within 60 seconds."
# 4. Trigger 'Start Track' for the first track
print("[Test] Triggering 'Start Track' for track index 0...")
client.click("btn_mma_start_track", user_data={"index": 0})
# 5. Verify that Tier 2 generates tickets and starts execution
print("[Test] Polling for Tier 2 ticket generation and execution start...")
execution_started = False
for i in range(60):
mma_status = client.get_mma_status()
status_str = mma_status.get("mma_status", "idle")
active_tier = mma_status.get("active_tier", "")
if status_str == "running" or "Tier 3" in str(active_tier):
execution_started = True
print(f"[Test] Execution started (Status: {status_str}, Tier: {active_tier}) after {i}s")
break
current_ai_status = client.get_value("ai_status")
if i % 5 == 0:
print(f" ... still waiting. Current AI Status: {current_ai_status}")
time.sleep(1)
assert execution_started, "Tier 2 failed to generate tickets or execution failed to start within 60 seconds."
# 6. Final verification of MMA state
final_mma = client.get_mma_status()
print(f"[Test] Final MMA Status: {final_mma.get('mma_status')}")
print(f"[Test] Active Tier: {final_mma.get('active_tier')}")
print(f"[Test] Ticket Count: {len(final_mma.get('active_tickets', []))}")
assert final_mma.get("mma_status") in ["running", "done", "blocked"]
assert len(final_mma.get("active_tickets", [])) > 0
print("[Test] MMA Epic lifecycle verification successful!")
if __name__ == "__main__":
# If run directly, try to use pytest
import subprocess
# Using sys.executable to ensure we use the same environment
subprocess.run([sys.executable, "-m", "pytest", "-v", __file__])