refactor(sdm): Global pass with refined 'External Only' SDM tags. Pruned redundant internal references and fixed indentation logic in injector. Verified full project compilation.

This commit is contained in:
2026-05-09 14:32:44 -04:00
parent 696c08692e
commit 8c06c1767b
142 changed files with 2352 additions and 990 deletions
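The SDM tags added throughout these diffs follow the pattern [C: <test_file>:<test_case>, ...], placed inside a function's docstring to list the external tests that cover it. For illustration only, a minimal sketch of the kind of indentation-aware tag injector this commit describes fixing might look like the following; the inject_sdm_tag name, the single-line 'def' assumption, and the docstring handling are assumptions for this sketch, not the project's actual injector code:

import re

def inject_sdm_tag(lines: list[str], def_idx: int, tag: str) -> list[str]:
    """Insert an SDM coverage tag into the docstring of the function whose
    'def' line sits at def_idx. Hypothetical sketch, not the real injector."""
    base_indent = re.match(r"[ \t]*", lines[def_idx]).group(0)
    body_indent = base_indent + "    "  # docstring sits one level deeper than 'def'
    i = def_idx + 1
    stripped = lines[i].strip() if i < len(lines) else ""
    if stripped.startswith('"""') and stripped.endswith('"""') and len(stripped) > 6:
        # One-line docstring: expand it into a block so the tag gets its own line.
        text = stripped[3:-3].strip()
        block = [body_indent + '"""', body_indent + text,
                 body_indent + tag, body_indent + '"""']
        return lines[:i] + block + lines[i + 1:]
    if stripped == '"""':
        # Multi-line docstring: insert the tag just before the closing quotes.
        j = i + 1
        while j < len(lines) and lines[j].strip() != '"""':
            j += 1
        return lines[:j] + [body_indent + tag] + lines[j:]
    # No docstring at all: create one that holds only the tag.
    block = [body_indent + '"""', body_indent + tag, body_indent + '"""']
    return lines[:i] + block + lines[i:]

The three docstring shapes visible in the hunks below match this behavior: one-line docstrings expanded to hold a tag (wait_for_value, _poll_mma_status), multi-line docstrings tagged before the closing quotes (the app_instance fixture), and tag-only docstrings created for previously undocumented helpers (worker, _make_app).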
+22 -10
@@ -28,6 +28,9 @@ class VerificationLogger:
self.logs_dir.mkdir(parents=True, exist_ok=True)
def log_state(self, field: str, before: Any, after: Any) -> None:
"""
[C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_vlogger_availability.py:test_vlogger_available]
"""
delta = ""
if isinstance(before, (int, float)) and isinstance(after, (int, float)):
diff = after - before
@@ -40,6 +43,9 @@ class VerificationLogger:
})
def finalize(self, title: str, status: str, result_msg: str) -> None:
"""
[C: tests/test_ai_style_formatter.py:test_multiple_top_level_definitions, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_tier4_interceptor.py:test_end_to_end_tier4_integration, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only, tests/test_vlogger_availability.py:test_vlogger_available]
"""
round(time.time() - self.start_time, 2)
log_file = self.logs_dir / f"{self.script_name}.txt"
with open(log_file, "w", encoding="utf-8") as f:
@@ -57,7 +63,8 @@ class VerificationLogger:
@pytest.fixture(autouse=True)
def reset_paths() -> Generator[None, None, None]:
"""
Autouse fixture that resets the paths global state before each test.
"""
from src import paths
paths.reset_resolved()
@@ -67,8 +74,9 @@ def reset_paths() -> Generator[None, None, None]:
@pytest.fixture(autouse=True)
def reset_ai_client() -> Generator[None, None, None]:
"""
Autouse fixture that resets the ai_client global state before each test.
This is critical for preventing state pollution between tests.
"""
from src import ai_client
from src import mcp_client
@@ -115,7 +123,8 @@ def kill_process_tree(pid: int | None) -> None:
@pytest.fixture
def mock_app() -> Generator[App, None, None]:
"""
Mock version of the App for simple unit tests that don't need a loop.
"""
with (
patch('src.models.load_config', return_value={
@@ -146,8 +155,10 @@ def mock_app() -> Generator[App, None, None]:
@pytest.fixture
def app_instance() -> Generator[App, None, None]:
"""
Centralized App instance with all external side effects mocked.
Matches the pattern used in test_token_viz.py and test_gui_phase4.py.
[C: tests/test_gui2_events.py:test_app_subscribes_to_events]
"""
with (
patch('src.models.load_config', return_value={
@@ -180,9 +191,10 @@ def app_instance() -> Generator[App, None, None]:
@pytest.fixture(scope="session")
def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
"""
Session-scoped fixture that starts sloppy.py with --enable-test-hooks.
Includes high-signal environment telemetry and workspace isolation.
"""
gui_script = os.path.abspath("sloppy.py")
diag = VerificationLogger("live_gui_startup", "live_gui_diag")
diag.log_state("GUI Script", "N/A", "gui_2.py")
@@ -347,4 +359,4 @@ def live_gui() -> Generator[tuple[subprocess.Popen, str], None, None]:
except PermissionError:
time.sleep(0.5)
except:
break
+5 -2
@@ -9,7 +9,10 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.api_hook_client import ApiHookClient
def wait_for_value(client, field, expected, timeout=5):
"""Polls the GUI state until a field matches the expected value."""
"""
Polls the GUI state until a field matches the expected value.
[C: tests/test_live_workflow.py:test_full_live_workflow]
"""
start = time.time()
while time.time() - start < timeout:
val = client.get_value(field)
@@ -32,4 +35,4 @@ def test_status_hook(live_gui) -> None:
# 3. Set mma_status to 'hook_mma_test'
client.set_value('mma_status', 'hook_mma_test')
# 4. Verify via get_value('mma_status') == 'hook_mma_test' (with retry)
assert wait_for_value(client, 'mma_status', 'hook_mma_test'), f"Failed to set mma_status to hook_mma_test. Current value: {client.get_value('mma_status')}"
+5 -4
@@ -2,9 +2,10 @@ from src import ai_client
def test_list_models_gemini_cli() -> None:
"""
Verifies that 'ai_client.list_models' correctly returns a list of models
for the 'gemini_cli' provider.
"""
models = ai_client.list_models("gemini_cli")
assert "gemini-3.1-pro-preview" in models
assert "gemini-3-flash-preview" in models
@@ -12,4 +13,4 @@ def test_list_models_gemini_cli() -> None:
assert "gemini-2.5-flash" in models
assert "gemini-2.0-flash" in models
assert "gemini-2.5-flash-lite" in models
assert len(models) == 6
+1 -1
@@ -55,4 +55,4 @@ def test_set_params_via_custom_callback(live_gui) -> None:
break
time.sleep(0.5)
assert success, f"Params did not update via custom_callback. Got: {state}"
+1 -1
@@ -81,4 +81,4 @@ def test_get_node_status() -> None:
}
status = client.get_node_status("T1")
assert status["status"] == "todo"
mock_make.assert_any_call('GET', '/api/mma/node/T1')
+5 -3
@@ -36,7 +36,8 @@ def app_controller(tmp_session_dir):
def test_on_comms_entry_tool_result_offloading(app_controller, tmp_session_dir):
"""
Test that _on_comms_entry offloads tool_result output to a separate file.
"""
output_content = "This is a large tool output that should be offloaded."
entry = {
@@ -81,7 +82,8 @@ def test_on_comms_entry_tool_result_offloading(app_controller, tmp_session_dir):
def test_on_tool_log_offloading(app_controller, tmp_session_dir):
"""
Test that _on_tool_log calls session_logger.log_tool_call and log_tool_output.
"""
script = "Get-Process"
result = "Process list..."
@@ -107,4 +109,4 @@ def test_on_tool_log_offloading(app_controller, tmp_session_dir):
assert len(app_controller._pending_tool_calls) == 1
assert app_controller._pending_tool_calls[0]["script"] == script
assert app_controller._pending_tool_calls[0]["result"] == result
assert app_controller._pending_tool_calls[0]["source_tier"] == "Tier 3"
+1 -1
@@ -47,4 +47,4 @@ class TestArchBoundaryPhase1(unittest.TestCase):
with open("scripts/claude_mma_exec.py", "r", encoding="utf-8") as f:
content = f.read()
self.assertNotIn("C:\\Users\\Ed", content)
self.assertNotIn("/Users/ed", content)
self.assertNotIn("/Users/ed", content)
+1 -1
View File
@@ -97,4 +97,4 @@ class TestArchBoundaryPhase2(unittest.TestCase):
self.assertTrue(ai_client._is_mutating_tool(t))
self.assertFalse(ai_client._is_mutating_tool("read_file"))
self.assertFalse(ai_client._is_mutating_tool("list_directory"))
+1 -1
@@ -88,4 +88,4 @@ class TestArchBoundaryPhase3(unittest.TestCase):
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
-1
@@ -324,4 +324,3 @@ public:
assert 'int y = 2;' in updated
assert 'int x = 1;' not in updated
assert 'class MyClass {' in updated
+7 -5
@@ -8,8 +8,9 @@ from src import mcp_client
@pytest.mark.asyncio
async def test_execute_tool_calls_concurrently_timing():
"""
Verifies that _execute_tool_calls_concurrently runs tools in parallel.
Total time should be approx 0.5s for 3 tools each taking 0.5s.
"""
# 1. Setup mock tool calls (Gemini style)
class MockGeminiCall:
@@ -65,8 +66,9 @@ async def test_execute_tool_calls_concurrently_timing():
@pytest.mark.asyncio
async def test_execute_tool_calls_concurrently_exception_handling():
"""
Verifies that if one tool call fails, it doesn't crash the whole group if caught,
but currently gather is used WITHOUT return_exceptions=True, so it should re-raise.
"""
class MockGeminiCall:
def __init__(self, name, args):
@@ -97,4 +99,4 @@ async def test_execute_tool_calls_concurrently_exception_handling():
qa_callback=None,
r_idx=0,
provider="gemini"
)
+5 -4
@@ -21,9 +21,10 @@ class TestCliToolBridgeMapping(unittest.TestCase):
@patch('api_hook_client.ApiHookClient.request_confirmation')
def test_mapping_from_api_format(self, mock_request: MagicMock, mock_stdout: MagicMock, mock_stdin: MagicMock) -> None:
"""
Verify that bridge correctly maps 'id', 'name', 'input' (Gemini API format)
into tool_name and tool_input for the hook client.
"""
api_tool_call = {
'id': 'call123',
'name': 'read_file',
@@ -46,4 +47,4 @@ class TestCliToolBridgeMapping(unittest.TestCase):
self.assertEqual(output.get('decision'), 'allow')
if __name__ == '__main__':
unittest.main()
+3 -2
@@ -6,7 +6,8 @@ import threading
def test_conductor_abort_event_populated():
"""
Test that ConductorEngine populates _abort_events when spawning a worker.
"""
# 1. Mock WorkerPool.spawn to return a mock thread
with patch('src.multi_agent_conductor.WorkerPool.spawn') as mock_spawn:
@@ -29,4 +30,4 @@ def test_conductor_abort_event_populated():
# 5. Assert that self._abort_events has an entry for the ticket ID
assert ticket_id in engine._abort_events
assert isinstance(engine._abort_events[ticket_id], threading.Event)
+8 -5
@@ -3,8 +3,9 @@ from src.api_hook_client import ApiHookClient
def simulate_conductor_phase_completion(client: ApiHookClient, track_id: str, phase_name: str) -> bool:
"""
Simulates the Conductor agent's logic for phase completion using ApiHookClient.
"""
try:
# 1. Poll for state
state = client.get_gui_state()
@@ -22,8 +23,10 @@ def simulate_conductor_phase_completion(client: ApiHookClient, track_id: str, ph
return False
def test_conductor_integrates_api_hook_client_for_verification(live_gui) -> None:
"""Verify that Conductor's simulated phase completion logic properly integrates
with the ApiHookClient and the live Hook Server."""
"""
Verify that Conductor's simulated phase completion logic properly integrates
with the ApiHookClient and the live Hook Server.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
@@ -44,4 +47,4 @@ def test_conductor_handles_api_hook_connection_error() -> None:
"""Verify Conductor handles a simulated API hook connection error (server down)."""
client = ApiHookClient(base_url="http://127.0.0.1:9999") # Invalid port
result = simulate_conductor_phase_completion(client, "any", "any")
assert result is False
+9 -4
@@ -7,7 +7,8 @@ from src.models import Track
def test_conductor_engine_initializes_empty_worker_and_abort_dicts() -> None:
"""
Test that ConductorEngine correctly initializes _active_workers and _abort_events as empty dictionaries.
"""
# Mock the track object
mock_track = MagicMock(spec=Track)
@@ -22,8 +23,9 @@ def test_conductor_engine_initializes_empty_worker_and_abort_dicts() -> None:
def test_kill_worker_sets_abort_and_joins_thread() -> None:
"""
Test kill_worker: mock a running thread in _active_workers, call kill_worker,
assert abort_event is set and thread is joined.
"""
mock_track = MagicMock(spec=Track)
mock_track.tickets = []
@@ -35,6 +37,9 @@ def test_kill_worker_sets_abort_and_joins_thread() -> None:
# Create a thread that waits for the abort event
def worker():
"""
[C: tests/test_symbol_parsing.py:test_handle_generate_send_appends_definitions, tests/test_symbol_parsing.py:test_handle_generate_send_no_symbols]
"""
abort_event.wait(timeout=2.0)
thread = threading.Thread(target=worker)
@@ -50,4 +55,4 @@ def test_kill_worker_sets_abort_and_joins_thread() -> None:
assert abort_event.is_set()
assert not thread.is_alive()
with engine._workers_lock:
assert ticket_id not in engine._active_workers
+36 -26
@@ -9,8 +9,9 @@ from src import ai_client
def test_conductor_engine_initialization() -> None:
"""
Test that ConductorEngine can be initialized with a Track.
"""
track = Track(id="test_track", description="Test Track")
from src.multi_agent_conductor import ConductorEngine
engine = ConductorEngine(track=track, auto_queue=True)
@@ -18,8 +19,9 @@ def test_conductor_engine_initialization() -> None:
def test_conductor_engine_run_executes_tickets_in_order(monkeypatch: pytest.MonkeyPatch, vlogger) -> None:
"""
Test that run iterates through executable tickets and calls the worker lifecycle.
"""
ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker2", depends_on=["T1"])
track = Track(id="track1", description="Track 1", tickets=[ticket1, ticket2])
@@ -64,8 +66,9 @@ def test_conductor_engine_run_executes_tickets_in_order(monkeypatch: pytest.Monk
def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle triggers the AI client and updates ticket status on success.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from src.multi_agent_conductor import run_worker_lifecycle
@@ -84,8 +87,9 @@ def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch: pytest.MonkeyPat
def test_run_worker_lifecycle_context_injection(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle can take a context_files list and injects AST views into the prompt.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
context_files = ["primary.py", "secondary.py"]
@@ -129,8 +133,9 @@ def test_run_worker_lifecycle_context_injection(monkeypatch: pytest.MonkeyPatch)
def test_run_worker_lifecycle_handles_blocked_response(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from src.multi_agent_conductor import run_worker_lifecycle
@@ -145,10 +150,11 @@ def test_run_worker_lifecycle_handles_blocked_response(monkeypatch: pytest.Monke
def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle passes confirm_execution to ai_client.send when step_mode is True.
Verify that if confirm_execution is called (simulated by mocking ai_client.send to call its callback),
the flow works as expected.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from src.multi_agent_conductor import run_worker_lifecycle
@@ -181,9 +187,10 @@ def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch: pytest.MonkeyP
def test_run_worker_lifecycle_step_mode_rejection(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Verify that if confirm_execution returns False, the logic (in ai_client, which we simulate here)
would prevent execution. In run_worker_lifecycle, we just check if it's passed.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
from src.multi_agent_conductor import run_worker_lifecycle
@@ -205,8 +212,9 @@ def test_run_worker_lifecycle_step_mode_rejection(monkeypatch: pytest.MonkeyPatc
def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytest.MonkeyPatch, vlogger) -> None:
"""
Test that parse_json_tickets correctly populates the track and run executes them in dependency order.
"""
import json
from src.multi_agent_conductor import ConductorEngine
track = Track(id="dynamic_track", description="Dynamic Track")
@@ -272,9 +280,10 @@ def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytest.Monk
def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle pushes a 'response' event with the correct stream_id
via _queue_put when event_queue is provided.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
mock_event_queue = MagicMock()
@@ -297,9 +306,10 @@ def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.Monk
def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Test that run_worker_lifecycle reads token usage from the comms log and
updates engine.tier_usage['Tier 3'] with real input/output token counts.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
fake_comms = [
@@ -320,4 +330,4 @@ def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.Mon
mock_spawn.return_value = (True, "prompt", "ctx")
run_worker_lifecycle(ticket, context, event_queue=MagicMock(), engine=engine)
assert engine.tier_usage["Tier 3"]["input"] == 120
assert engine.tier_usage["Tier 3"]["output"] == 45
+20 -11
@@ -9,7 +9,8 @@ from src.dag_engine import TrackDAG
def test_get_ready_tasks_linear():
"""
Verifies ready tasks detection in a simple linear dependency chain.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -20,8 +21,9 @@ def test_get_ready_tasks_linear():
def test_get_ready_tasks_branching():
"""
Verifies ready tasks detection in a branching dependency graph where multiple tasks
are unlocked simultaneously after a prerequisite is met.
"""
t1 = Ticket(id="T1", description="desc", status="completed", assigned_to="worker1")
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -35,7 +37,8 @@ def test_get_ready_tasks_branching():
def test_has_cycle_no_cycle():
"""
Validates that an acyclic graph is correctly identified as not having cycles.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -44,7 +47,8 @@ def test_has_cycle_no_cycle():
def test_has_cycle_direct_cycle():
"""
Validates that a direct cycle (A depends on B, B depends on A) is correctly detected.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1", depends_on=["T2"])
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -53,7 +57,8 @@ def test_has_cycle_direct_cycle():
def test_has_cycle_indirect_cycle():
"""
Validates that an indirect cycle (A->B->C->A) is correctly detected.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1", depends_on=["T3"])
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -63,7 +68,8 @@ def test_has_cycle_indirect_cycle():
def test_has_cycle_complex_no_cycle():
"""
Validates cycle detection in a complex graph that merges branches but remains acyclic.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -74,7 +80,8 @@ def test_has_cycle_complex_no_cycle():
def test_get_ready_tasks_multiple_deps():
"""
Validates that a task is not marked ready until ALL of its dependencies are completed.
"""
t1 = Ticket(id="T1", description="desc", status="completed", assigned_to="worker1")
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1")
@@ -87,7 +94,8 @@ def test_get_ready_tasks_multiple_deps():
def test_topological_sort():
"""
Verifies that tasks are correctly ordered by dependencies regardless of input order.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
@@ -98,10 +106,11 @@ def test_topological_sort():
def test_topological_sort_cycle():
"""
Verifies that topological sorting safely aborts and raises ValueError when a cycle is present.
"""
t1 = Ticket(id="T1", description="desc", status="todo", assigned_to="worker1", depends_on=["T2"])
t2 = Ticket(id="T2", description="desc", status="todo", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
with pytest.raises(ValueError, match="Dependency cycle detected"):
dag.topological_sort()
+18 -13
@@ -12,9 +12,10 @@ from src import project_manager
def test_credentials_error_mentions_deepseek(monkeypatch: pytest.MonkeyPatch) -> None:
"""
Verify that the error message shown when credentials.toml is missing
includes deepseek instructions.
"""
# Monkeypatch SLOP_CREDENTIALS to a non-existent file
monkeypatch.setenv("SLOP_CREDENTIALS", "non_existent_credentials_file.toml")
with pytest.raises(FileNotFoundError) as excinfo:
@@ -25,32 +26,36 @@ def test_credentials_error_mentions_deepseek(monkeypatch: pytest.MonkeyPatch) ->
def test_default_project_includes_reasoning_role() -> None:
"""
Verify that 'Reasoning' is included in the default discussion roles
to support DeepSeek-R1 reasoning traces.
"""
proj = project_manager.default_project("test")
roles = proj["discussion"]["roles"]
assert "Reasoning" in roles
def test_gui_providers_list() -> None:
"""
Check if 'deepseek' is in the GUI's provider list.
"""
from src.models import PROVIDERS
assert "deepseek" in PROVIDERS
def test_deepseek_model_listing() -> None:
"""
Verify that list_models for deepseek returns expected models.
"""
models = ai_client.list_models("deepseek")
assert "deepseek-chat" in models
assert "deepseek-reasoner" in models
def test_gui_provider_list_via_hooks(live_gui: Any) -> None:
"""
Verify 'deepseek' is present in the GUI provider list using API hooks.
"""
from api_hook_client import ApiHookClient
import time
client = ApiHookClient()
@@ -58,4 +63,4 @@ def test_gui_provider_list_via_hooks(live_gui: Any) -> None:
# Attempt to set provider to deepseek to verify it's an allowed value
client.set_value('current_provider', 'deepseek')
time.sleep(0.5)
assert client.get_value('current_provider') == 'deepseek'
+22 -15
@@ -4,8 +4,9 @@ from src import ai_client
def test_deepseek_model_selection() -> None:
"""
Verifies that ai_client.set_provider('deepseek', 'deepseek-chat') correctly updates the internal state.
"""
ai_client.set_provider("deepseek", "deepseek-chat")
assert ai_client._provider == "deepseek"
assert ai_client._model == "deepseek-chat"
@@ -13,8 +14,9 @@ def test_deepseek_model_selection() -> None:
@patch("requests.post")
def test_deepseek_completion_logic(mock_post: MagicMock) -> None:
"""
Verifies that ai_client.send() correctly calls the DeepSeek API and returns content.
"""
ai_client.set_provider("deepseek", "deepseek-chat")
with patch("src.ai_client._load_credentials", return_value={"deepseek": {"api_key": "test-key"}}):
mock_response = MagicMock()
@@ -31,8 +33,9 @@ def test_deepseek_completion_logic(mock_post: MagicMock) -> None:
@patch("requests.post")
def test_deepseek_reasoning_logic(mock_post: MagicMock) -> None:
"""
Verifies that reasoning_content is captured and wrapped in <thinking> tags.
"""
ai_client.set_provider("deepseek", "deepseek-reasoner")
with patch("src.ai_client._load_credentials", return_value={"deepseek": {"api_key": "test-key"}}):
mock_response = MagicMock()
@@ -52,8 +55,9 @@ def test_deepseek_reasoning_logic(mock_post: MagicMock) -> None:
@patch("requests.post")
def test_deepseek_tool_calling(mock_post: MagicMock) -> None:
"""
Verifies that DeepSeek provider correctly identifies and executes tool calls.
"""
ai_client.set_provider("deepseek", "deepseek-chat")
with patch("src.ai_client._load_credentials", return_value={"deepseek": {"api_key": "test-key"}}), \
patch("src.mcp_client.async_dispatch", new_callable=unittest.mock.AsyncMock) as mock_dispatch:
@@ -93,8 +97,9 @@ def test_deepseek_tool_calling(mock_post: MagicMock) -> None:
@patch("requests.post")
def test_deepseek_streaming(mock_post: MagicMock) -> None:
"""
Verifies that DeepSeek provider correctly aggregates streaming chunks.
"""
ai_client.set_provider("deepseek", "deepseek-chat")
with patch("src.ai_client._load_credentials", return_value={"deepseek": {"api_key": "test-key"}}):
mock_response = MagicMock()
@@ -115,8 +120,9 @@ def test_deepseek_streaming(mock_post: MagicMock) -> None:
@patch("requests.post")
def test_deepseek_payload_verification(mock_post: MagicMock) -> None:
"""
Verifies that the correct JSON payload (tools, history, params) is sent to DeepSeek.
"""
ai_client.set_provider("deepseek", "deepseek-chat")
ai_client.reset_session()
with patch("src.ai_client._load_credentials", return_value={"deepseek": {"api_key": "test-key"}}):
@@ -142,8 +148,9 @@ def test_deepseek_payload_verification(mock_post: MagicMock) -> None:
@patch("requests.post")
def test_deepseek_reasoner_payload_verification(mock_post: MagicMock) -> None:
"""
Verifies that deepseek-reasoner payload excludes tools and temperature.
"""
ai_client.set_provider("deepseek", "deepseek-reasoner")
ai_client.reset_session()
with patch("src.ai_client._load_credentials", return_value={"deepseek": {"api_key": "test-key"}}):
@@ -162,4 +169,4 @@ def test_deepseek_reasoner_payload_verification(mock_post: MagicMock) -> None:
assert payload["model"] == "deepseek-reasoner"
assert "tools" not in payload
assert "temperature" not in payload
assert "max_tokens" not in payload
assert "max_tokens" not in payload
+1 -1
View File
@@ -47,4 +47,4 @@ class TestDiscussionTakes(unittest.TestCase):
self.assertEqual(self.project_dict["discussion"]["discussions"][new_id]["history"], ["User: Experimental"])
if __name__ == "__main__":
unittest.main()
+1 -1
@@ -94,4 +94,4 @@ def test_switching_discussion_via_tabs(app_instance):
app_instance._render_discussion_panel()
# If implemented with tabs, this should be called
mock_switch.assert_called_with("main_take_1")
-1
@@ -70,4 +70,3 @@ def test_execution_sim_live(live_gui: Any) -> None:
sim.run()
time.sleep(2)
sim.teardown()
+1 -1
@@ -41,4 +41,4 @@ def test_file_item_from_dict_defaults():
assert item.path == "test.py"
assert item.auto_aggregate is True
assert item.force_full is False
assert item.injected_at is None
+5 -4
@@ -11,9 +11,10 @@ from src.ai_client import get_gemini_cache_stats, reset_session
def test_get_gemini_cache_stats_with_mock_client() -> None:
"""
Test that get_gemini_cache_stats correctly processes cache lists
from a mocked client instance.
"""
# Ensure a clean state before the test by resetting the session
reset_session()
# 1. Create a mock for the cache object that the client will return
@@ -40,4 +41,4 @@ def test_get_gemini_cache_stats_with_mock_client() -> None:
assert "cache_count" in stats
assert "total_size_bytes" in stats
assert stats["cache_count"] == 1
assert stats["total_size_bytes"] == 1024
assert stats["total_size_bytes"] == 1024
+5 -4
View File
@@ -11,9 +11,10 @@ def app_instance(monkeypatch: pytest.MonkeyPatch) -> type[App]:
def test_app_subscribes_to_events(app_instance: type[App]) -> None:
"""
This test checks that the App's __init__ method subscribes the necessary
event handlers to the ai_client.events emitter.
"""
with patch.object(ai_client.events, 'on') as mock_on:
app = app_instance()
mock_on.assert_called()
@@ -22,4 +23,4 @@ def test_app_subscribes_to_events(app_instance: type[App]) -> None:
assert "request_start" in event_names
assert "response_received" in event_names
assert "tool_execution" in event_names
# We don't check for __self__ anymore as they might be lambdas
+9 -7
@@ -2,9 +2,10 @@ from src.gui_2 import App
def test_gui2_hubs_exist_in_show_windows(app_instance: App) -> None:
"""
Verifies that the new consolidated Hub windows are defined in the App's show_windows.
This ensures they will be available in the 'Windows' menu.
"""
expected_hubs = [
"Project Settings",
"AI Settings",
@@ -18,13 +19,14 @@ def test_gui2_hubs_exist_in_show_windows(app_instance: App) -> None:
def test_gui2_old_windows_removed_from_show_windows(app_instance: App) -> None:
"""
Verifies that the old fragmented windows are removed from show_windows.
Note: Message, Response, and Tool Calls are kept as they are now optional standalone windows.
"""
old_windows = [
"Projects", "Files", "Screenshots",
"Provider", "System Prompts",
"Comms History"
]
for old_win in old_windows:
assert old_win not in app_instance.show_windows, f"Old window '{old_win}' should have been removed from show_windows"
+6 -5
@@ -5,10 +5,11 @@ from src import ai_client
def test_mcp_tool_call_is_dispatched(app_instance: App) -> None:
"""
This test verifies that when the AI returns a tool call for an MCP function,
the ai_client correctly dispatches it to mcp_client.
This will fail until mcp_client is properly integrated.
"""
# 1. Define the mock tool call from the AI
mock_fc = MagicMock()
mock_fc.name = "read_file"
@@ -50,4 +51,4 @@ def test_mcp_tool_call_is_dispatched(app_instance: App) -> None:
discussion_history=""
)
# 6. Assert that the MCP dispatch function was called
mock_dispatch.assert_called_once_with("read_file", {"file_path": "test.txt"})
+9 -7
@@ -25,8 +25,9 @@ def cleanup_callback_file() -> None:
def test_gui2_set_value_hook_works(live_gui: Any) -> None:
"""
Tests that the 'set_value' GUI hook is correctly implemented.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
test_value = f"New value set by test: {uuid.uuid4()}"
@@ -40,8 +41,9 @@ def test_gui2_set_value_hook_works(live_gui: Any) -> None:
def test_gui2_click_hook_works(live_gui: Any) -> None:
"""
Tests that the 'click' GUI hook for the 'Reset' button is implemented.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
# First, set some state that 'Reset' would clear.
@@ -57,8 +59,9 @@ def test_gui2_click_hook_works(live_gui: Any) -> None:
def test_gui2_custom_callback_hook_works(live_gui: Any) -> None:
"""
Tests that the 'custom_callback' GUI hook is correctly implemented.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
test_data = f"Callback executed: {uuid.uuid4()}"
@@ -76,4 +79,3 @@ def test_gui2_custom_callback_hook_works(live_gui: Any) -> None:
with open(temp_workspace_file, "r") as f:
content = f.read()
assert content == test_data, "Callback executed, but file content is incorrect."
+7 -5
@@ -19,8 +19,9 @@ _shared_metrics = {}
def test_performance_benchmarking(live_gui: tuple) -> None:
"""
Collects performance metrics for the current GUI script over a 5-second window.
Ensures the application does not lock up and can report its internal state.
"""
process, gui_script = live_gui
client = ApiHookClient()
@@ -65,8 +66,9 @@ def test_performance_benchmarking(live_gui: tuple) -> None:
def test_performance_baseline_check() -> None:
"""
Verifies that we have successfully collected performance metrics for sloppy.py
and that they meet the minimum 30 FPS baseline.
"""
# Key is full path, find it by basename
gui_key = next((k for k in _shared_metrics if "sloppy.py" in k), None)
@@ -77,4 +79,4 @@ def test_performance_baseline_check() -> None:
# A 0 FPS indicates the render loop is completely frozen or the API hook is dead.
assert gui2_m["avg_fps"] > 0, "No performance metrics collected - GUI may be frozen"
assert gui2_m["avg_fps"] >= 30
assert gui2_m["avg_ft"] <= 33.3
assert gui2_m["avg_ft"] <= 33.3
+1 -1
View File
@@ -32,4 +32,4 @@ def test_gui_context_preset_save_load(live_gui) -> None:
context = client.get_context_state()
loaded_files = [f["path"] if isinstance(f, dict) else str(f) for f in context.get("files", [])]
assert loaded_files == test_files
assert context.get("screenshots", []) == test_screenshots
assert context.get("screenshots", []) == test_screenshots
+5 -4
View File
@@ -14,9 +14,10 @@ def test_diagnostics_panel_initialization(app_instance: Any) -> None:
def test_diagnostics_history_updates(app_instance: Any) -> None:
"""
Verifies that the internal performance history is updated correctly.
This logic is inside the render loop in gui_2.py, but we can test
the data structure and initialization.
"""
assert "fps" in app_instance.perf_history
assert len(app_instance.perf_history["fps"]) == 100
+4 -1
@@ -12,6 +12,9 @@ class MockApp:
self.ai_status = ""
def init_state(self):
"""
[C: tests/test_system_prompt_exposure.py:TestSystemPromptExposure.test_app_controller_init_state_loads_prompts]
"""
pass
from src.gui_2 import App
@@ -38,4 +41,4 @@ def test_save_paths():
mock_copy.assert_called_once()
assert 'applied' in mock_app.ai_status
mock_reset.assert_called_once()
mock_init.assert_called_once()
+4 -3
@@ -10,8 +10,9 @@ from api_hook_client import ApiHookClient
def test_idle_performance_requirements(live_gui) -> None:
"""
Requirement: GUI must maintain stable performance on idle.
"""
# Warmup to ensure GUI is ready
time.sleep(5.0)
client = ApiHookClient()
@@ -39,4 +40,4 @@ def test_idle_performance_requirements(live_gui) -> None:
print("[Warning] Frame time is 0.0. This is expected in headless CI/CD environments.")
print(f"[Test] Valid frame time samples: {valid_ft_count}/5")
# In some CI environments without a real display, frame time might remain 0
# but we've verified the hook is returning the dictionary.
+9 -7
@@ -11,8 +11,9 @@ from src import paths
def test_track_proposal_editing(app_instance):
"""
Verifies the structural integrity of track proposal items.
Ensures that track proposals can be edited and removed from the active list.
"""
app_instance.proposed_tracks = [
{"title": "Old Title", "goal": "Old Goal"},
@@ -33,8 +34,9 @@ def test_track_proposal_editing(app_instance):
def test_conductor_setup_scan(app_instance, tmp_path, monkeypatch):
"""
Verifies that the conductor setup scan properly iterates through the conductor directory,
counts files and lines, and identifies active tracks.
"""
old_cwd = os.getcwd()
os.chdir(tmp_path)
@@ -60,8 +62,9 @@ def test_conductor_setup_scan(app_instance, tmp_path, monkeypatch):
def test_create_track(app_instance, tmp_path):
"""
Verifies that _cb_create_track properly creates the track folder
and populates the necessary boilerplate files (spec.md, plan.md, metadata.json).
"""
old_cwd = os.getcwd()
os.chdir(tmp_path)
@@ -89,4 +92,3 @@ def test_create_track(app_instance, tmp_path):
assert data['id'] == track_dir.name
finally:
os.chdir(old_cwd)
+3 -2
@@ -2,7 +2,8 @@ import time
def test_gui_startup_smoke(live_gui):
"""
Smoke test to ensure the GUI starts and remains running.
"""
proc, _ = live_gui
@@ -13,4 +14,4 @@ def test_gui_startup_smoke(live_gui):
time.sleep(2)
# Verify it's still running after 2 seconds
assert proc.poll() is None, "GUI process crashed within 2 seconds of startup"
+1 -1
@@ -53,4 +53,4 @@ def test_render_synthesis_panel(app_instance):
mock_imgui.input_text_multiline.assert_called_with("##synthesis_prompt", app_instance.ui_synthesis_prompt, ANY)
# 3. Assert imgui.button is called for 'Generate Synthesis'
mock_imgui.button.assert_any_call("Generate Synthesis")
+3 -2
@@ -4,7 +4,8 @@ from src.api_hook_client import ApiHookClient
def test_text_viewer_state_update(live_gui) -> None:
"""
Verifies that we can set text viewer state and it is reflected in GUI state.
"""
client = ApiHookClient()
label = "Test Viewer Label"
@@ -21,4 +22,4 @@ def test_text_viewer_state_update(live_gui) -> None:
assert state is not None
assert state.get('show_text_viewer') == True
assert state.get('text_viewer_title') == label
assert state.get('text_viewer_type') == text_type
+13 -10
@@ -16,9 +16,10 @@ from src.gui_2 import App
def test_telemetry_data_updates_correctly(app_instance: Any) -> None:
"""
Tests that the _refresh_api_metrics method correctly updates
the internal state for display by querying the ai_client.
Verifies the boundary between GUI state and API state.
"""
# 1. Set the provider to anthropic
app_instance._current_provider = "anthropic"
@@ -41,9 +42,10 @@ def test_telemetry_data_updates_correctly(app_instance: Any) -> None:
def test_performance_history_updates(app_instance: Any) -> None:
"""
Verify the data structure that feeds the sparkline.
This ensures that the rolling buffer for performance telemetry maintains
the correct size and default initialization to prevent GUI rendering crashes.
"""
# ANTI-SIMPLIFICATION: Verifying exactly 100 elements ensures the sparkline won't overflow
assert len(app_instance.perf_history["frame_time"]) == 100
@@ -51,9 +53,10 @@ def test_performance_history_updates(app_instance: Any) -> None:
def test_gui_updates_on_event(app_instance: App) -> None:
"""
Verifies that when an API event is received (e.g. from ai_client),
the _on_api_event handler correctly updates internal metrics and
queues the update to be processed by the GUI event loop.
"""
mock_stats = {"percentage": 50.0, "current": 500, "limit": 1000}
app_instance.last_md = "mock_md"
@@ -75,4 +78,4 @@ def test_gui_updates_on_event(app_instance: App) -> None:
app_instance._process_pending_gui_tasks()
# ANTI-SIMPLIFICATION: This assertion proves that the event pipeline
# successfully transmitted state from the background thread to the GUI state.
assert app_instance._token_stats["percentage"] == 50.0
+6 -5
@@ -5,10 +5,11 @@ from src.api_hook_client import ApiHookClient
@pytest.mark.asyncio
async def test_mma_track_lifecycle_simulation():
"""
This test simulates the sequence of API calls an external orchestrator
would make to manage an MMA track lifecycle via the Hook API.
It verifies that ApiHookClient correctly routes requests to the
corresponding endpoints in src/api_hooks.py.
"""
client = ApiHookClient("http://localhost:8999")
@@ -114,4 +115,4 @@ async def test_mma_track_lifecycle_simulation():
if __name__ == "__main__":
import asyncio
asyncio.run(test_mma_track_lifecycle_simulation())
+11 -9
@@ -9,11 +9,12 @@ from src import ai_client
@pytest.mark.asyncio
async def test_headless_verification_full_run(vlogger) -> None:
"""
1. Initialize a ConductorEngine with a Track containing multiple dependent Tickets.
2. Simulate a full execution run using engine.run().
3. Mock ai_client.send to simulate successful tool calls and final responses.
4. Specifically verify that 'Context Amnesia' is maintained.
"""
t1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
t2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="track_verify", description="Verification Track", tickets=[t1, t2])
@@ -47,9 +48,10 @@ async def test_headless_verification_full_run(vlogger) -> None:
@pytest.mark.asyncio
async def test_headless_verification_error_and_qa_interceptor(vlogger) -> None:
"""
5. Simulate a shell error and verify that the Tier 4 QA interceptor is triggered
and its summary is injected into the worker's history for the next retry.
"""
t1 = Ticket(id="T1", description="Task with error", status="todo", assigned_to="worker1")
track = Track(id="track_error", description="Error Track", tickets=[t1])
from src.events import SyncEventQueue
@@ -140,4 +142,4 @@ async def test_headless_verification_error_and_qa_interceptor(vlogger) -> None:
if "QA ANALYSIS:" in part_str and "FIX: Check if path exists." in part_str:
found_qa = True
assert found_qa, "QA Analysis was not injected into the next round"
vlogger.finalize("Tier 4 QA Injection", "PASS", "QA summary injected into next worker round.")
vlogger.finalize("Tier 4 QA Injection", "PASS", "QA summary injected into next worker round.")
+1 -1
View File
@@ -130,4 +130,4 @@ def test_get_history_bleed_stats_basic() -> None:
assert "current" in stats, "Stats dictionary should contain 'current' token usage"
assert 'limit' in stats, "Stats dictionary should contain 'limit'"
assert stats['limit'] == 500
assert isinstance(stats['current'], int) and stats['current'] >= 0
+1 -1
@@ -54,4 +54,4 @@ def test_live_hook_server_responses(live_gui) -> None:
# 4. Performance
# diagnostics are available via get_gui_diagnostics or get_gui_state
perf = client.get_gui_diagnostics() if hasattr(client, 'get_gui_diagnostics') else client.get_gui_state()
assert "fps" in perf or "thinking" in perf
assert "fps" in perf or "thinking" in perf
+10 -6
View File
@@ -11,8 +11,9 @@ from src.gui_2 import App
def test_new_hubs_defined_in_show_windows(mock_app: App) -> None:
"""
Verifies that the new consolidated Hub windows are defined in the App's show_windows.
This ensures they will be available in the 'Windows' menu.
"""
expected_hubs = [
"Project Settings",
@@ -25,7 +26,8 @@ def test_new_hubs_defined_in_show_windows(mock_app: App) -> None:
def test_old_windows_removed_from_gui2(app_instance_simple: Any) -> None:
"""
Verifies that the old fragmented windows are removed or renamed.
"""
old_tags = [
"win_projects", "win_files", "win_screenshots",
@@ -51,7 +53,8 @@ def app_instance_simple() -> Any:
def test_hub_windows_exist_in_gui2(app_instance_simple: Any) -> None:
"""
Verifies that the new Hub windows are present in the show_windows dictionary.
"""
hubs = ["Project Settings", "AI Settings", "Discussion Hub", "Operations Hub"]
for hub in hubs:
@@ -59,7 +62,8 @@ def test_hub_windows_exist_in_gui2(app_instance_simple: Any) -> None:
def test_indicators_logic_exists(app_instance_simple: Any) -> None:
"""
Verifies that the status indicators logic exists in the App.
"""
assert hasattr(app_instance_simple, 'ai_status')
assert hasattr(app_instance_simple, 'mma_status')
+10 -8
@@ -13,11 +13,12 @@ from src.api_hook_client import ApiHookClient
@pytest.mark.timeout(10)
def test_user_request_integration_flow(mock_app: App) -> None:
"""
Verifies that pushing a UserRequestEvent to the event_queue:
1. Triggers ai_client.send
2. Results in a 'response' event back to the queue
3. Eventually updates the UI state (ai_response, ai_status) after processing GUI tasks.
ANTI-SIMPLIFICATION: This verifies the full cross-thread boundary.
"""
app = mock_app
# Mock all ai_client methods called during _handle_request_event
@@ -74,8 +75,9 @@ def test_user_request_integration_flow(mock_app: App) -> None:
@pytest.mark.timeout(10)
def test_user_request_error_handling(mock_app: App) -> None:
"""
Verifies that if ai_client.send raises an exception, the UI is updated with the error state.
"""
Verifies that if ai_client.send raises an exception, the UI is updated with the error state.
"""
app = mock_app
with (
patch('src.ai_client.send', side_effect=Exception("API Failure")),
@@ -129,4 +131,4 @@ def test_api_gui_state_live(live_gui) -> None:
assert success, f"GUI state did not update. Got: {client.get_gui_state()}"
final_state = client.get_gui_state()
assert final_state['current_provider'] == 'anthropic'
assert final_state['current_model'] == 'claude-3-haiku-20240307'
+7 -5
@@ -15,7 +15,8 @@ from src.api_hook_client import ApiHookClient
def wait_for_value(client, field, expected, timeout=10):
"""
Helper to poll the GUI state until a field matches the expected value.
"""
start = time.time()
while time.time() - start < timeout:
@@ -29,9 +30,10 @@ def wait_for_value(client, field, expected, timeout=10):
@pytest.mark.integration
def test_full_live_workflow(live_gui) -> None:
"""
Integration test that drives the GUI through a full workflow.
ANTI-SIMPLIFICATION: Asserts exact AI behavior, thinking state tracking,
and response logging in discussion history.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
@@ -150,4 +152,4 @@ def test_full_live_workflow(live_gui) -> None:
entries = session.get('session', {}).get('entries', [])
print(f" New discussion history length: {len(entries)}")
assert len(entries) == 0
print("[TEST] Workflow completed successfully.")
print("[TEST] Workflow completed successfully.")
+1 -1
View File
@@ -151,4 +151,4 @@ class TestLogRegistry(unittest.TestCase):
self.assertIn(session_id_old_nw, all_found_session_ids)
self.assertIn(session_id_old_nw_incomplete, all_found_session_ids)
self.assertIn(session_id_recent_nw, all_found_session_ids)
self.assertNotIn(session_id_old_w, all_found_session_ids)
+1 -1
@@ -82,4 +82,4 @@ def test_append_tool_log_dict_keys(app_instance) -> None:
assert key in entry, f"key '{key}' missing from tool log entry: {entry}"
assert entry["script"] == "pwd"
assert entry["result"] == "/projects"
assert entry["source_tier"] is None
assert entry["source_tier"] is None
+1 -1
View File
@@ -86,4 +86,4 @@ def test_comms_log_filter_not_applied_for_prior_session(app_instance):
log_to_render = app.prior_session_entries if app.is_viewing_prior_session else list(app._comms_log)
if app.ui_focus_agent and not app.is_viewing_prior_session:
log_to_render = [e for e in log_to_render if e.get("source_tier") == app.ui_focus_agent]
assert len(log_to_render) == 2
+7 -1
@@ -5,6 +5,9 @@ from src.gui_2 import App
def _make_app(**kwargs):
"""
[C: tests/test_mma_dashboard_streams.py:TestMMADashboardStreams.test_tier1_renders_stream_content, tests/test_mma_dashboard_streams.py:TestMMADashboardStreams.test_tier3_renders_worker_subheaders]
"""
app = MagicMock()
app.mma_streams = kwargs.get("mma_streams", {})
app.mma_tier_usage = kwargs.get("mma_tier_usage", {
@@ -61,6 +64,9 @@ def _make_app(**kwargs):
return app
def _make_imgui_mock():
"""
[C: tests/test_mma_dashboard_streams.py:TestMMADashboardStreams.test_tier1_renders_stream_content, tests/test_mma_dashboard_streams.py:TestMMADashboardStreams.test_tier3_renders_worker_subheaders]
"""
m = MagicMock()
m.begin_table.return_value = False
m.begin_child.return_value = False
@@ -136,4 +142,4 @@ class TestMMAApprovalIndicators:
combined = _collect_text_colored_args(imgui_mock)
assert "APPROVAL PENDING" in combined, (
"text_colored not called with 'APPROVAL PENDING' when _pending_ask_dialog is True"
)
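`_collect_text_colored_args` is referenced but its body is not part of this diff; one plausible implementation, assuming it flattens every `imgui.text_colored` call into a single searchable string:

```python
from unittest.mock import MagicMock

def _collect_text_colored_args(imgui_mock: MagicMock) -> str:
    # Flatten all positional args from every text_colored call so tests can
    # substring-match rendered labels such as "APPROVAL PENDING".
    return " ".join(
        str(arg)
        for call in imgui_mock.text_colored.call_args_list
        for arg in call.args
    )
```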
+9 -5
View File
@@ -10,7 +10,10 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "s
from src import api_hook_client
def _poll_mma_status(client, timeout, condition, label):
"""Poll get_mma_status() until condition(status) is True or timeout."""
"""
Poll get_mma_status() until condition(status) is True or timeout.
[C: tests/test_mma_step_mode_sim.py:test_mma_step_mode_approval_flow]
"""
last_status = {}
for i in range(timeout):
status = client.get_mma_status() or {}
@@ -24,9 +27,10 @@ def _poll_mma_status(client, timeout, condition, label):
@pytest.mark.timeout(300)
def test_mma_concurrent_tracks_execution(live_gui) -> None:
"""
Stress test for concurrent MMA track execution.
Verifies that starting multiple tracks simultaneously doesn't cause crashes
and that workers from both tracks are processed.
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
@@ -132,4 +136,4 @@ def test_mma_concurrent_tracks_execution(live_gui) -> None:
assert status is not None
assert status.get('mma_status') in ['done', 'idle', 'running']
print("[SIM] Concurrent MMA tracks stress test PASSED.")
print("[SIM] Concurrent MMA tracks stress test PASSED.")
@@ -24,9 +24,10 @@ def _poll_mma_workers(client: api_hook_client.ApiHookClient, timeout: int, condi
@pytest.mark.timeout(600)
def test_mma_concurrent_tracks_stress(live_gui) -> None:
"""
Stress test: Start two tracks concurrently and verify they both progress
without crashing the GUI or losing state.
"""
Stress test: Start two tracks concurrently and verify they both progress
without crashing the GUI or losing state.
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
@@ -95,4 +96,4 @@ def test_mma_concurrent_tracks_stress(live_gui) -> None:
res = client.get_status()
assert res.get('status') == 'ok', "GUI crashed during concurrent execution"
print("[SIM] MMA Concurrent Tracks stress test PASSED.")
print("[SIM] MMA Concurrent Tracks stress test PASSED.")
+1 -1
View File
@@ -81,4 +81,4 @@ class TestMMADashboardStreams:
App._render_tier_stream_panel(app, "Tier 3", None)
text_args = " ".join(str(c) for c in imgui_mock.text.call_args_list)
assert "T-001" in text_args, "imgui.text not called with 'T-001' worker sub-header"
assert "T-002" in text_args, "imgui.text not called with 'T-002' worker sub-header"
assert "T-002" in text_args, "imgui.text not called with 'T-002' worker sub-header"
+36 -27
View File
@@ -2,9 +2,10 @@ from src.models import Ticket, Track, WorkerContext
def test_ticket_instantiation() -> None:
"""
Verifies that a Ticket can be instantiated with its required fields:
id, description, status, assigned_to.
"""
Verifies that a Ticket can be instantiated with its required fields:
id, description, status, assigned_to.
"""
ticket_id = "T1"
description = "Implement surgical code changes"
status = "todo"
@@ -23,8 +24,9 @@ def test_ticket_instantiation() -> None:
def test_ticket_with_dependencies() -> None:
"""
Verifies that a Ticket can store dependencies.
"""
Verifies that a Ticket can store dependencies.
"""
ticket = Ticket(
id="T2",
description="Write code",
@@ -36,9 +38,10 @@ def test_ticket_with_dependencies() -> None:
def test_track_instantiation() -> None:
"""
Verifies that a Track can be instantiated with its required fields:
id, description, and a list of Tickets.
"""
Verifies that a Track can be instantiated with its required fields:
id, description, and a list of Tickets.
"""
ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="a")
ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="b")
track_id = "TRACK-1"
@@ -57,16 +60,18 @@ def test_track_instantiation() -> None:
def test_track_can_handle_empty_tickets() -> None:
"""
Verifies that a Track can be instantiated with an empty list of tickets.
"""
Verifies that a Track can be instantiated with an empty list of tickets.
"""
track = Track(id="TRACK-2", description="Empty Track", tickets=[])
assert track.tickets == []
def test_worker_context_instantiation() -> None:
"""
Verifies that a WorkerContext can be instantiated with ticket_id,
model_name, and messages.
"""
Verifies that a WorkerContext can be instantiated with ticket_id,
model_name, and messages.
"""
ticket_id = "T1"
model_name = "gemini-2.0-flash-lite"
messages = [
@@ -84,26 +89,29 @@ def test_worker_context_instantiation() -> None:
def test_ticket_mark_blocked() -> None:
"""
Verifies that ticket.mark_blocked(reason) sets the status to 'blocked'.
Note: The reason field might need to be added to the Ticket class.
"""
Verifies that ticket.mark_blocked(reason) sets the status to 'blocked'.
Note: The reason field might need to be added to the Ticket class.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="a")
ticket.mark_blocked("Waiting for API key")
assert ticket.status == "blocked"
def test_ticket_mark_complete() -> None:
"""
Verifies that ticket.mark_complete() sets the status to 'completed'.
"""
Verifies that ticket.mark_complete() sets the status to 'completed'.
"""
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="a")
ticket.mark_complete()
assert ticket.status == "completed"
def test_track_get_executable_tickets() -> None:
"""
Verifies that track.get_executable_tickets() returns only 'todo' tickets
whose dependencies are all 'completed'.
"""
Verifies that track.get_executable_tickets() returns only 'todo' tickets
whose dependencies are all 'completed'.
"""
# T1: todo, no deps -> executable
t1 = Ticket(id="T1", description="T1", status="todo", assigned_to="a")
# T2: todo, deps [T1] -> not executable (T1 is todo)
@@ -125,11 +133,12 @@ def test_track_get_executable_tickets() -> None:
def test_track_get_executable_tickets_complex() -> None:
"""
Verifies executable tickets with complex dependency chains.
Chain: T1 (comp) -> T2 (todo) -> T3 (todo)
T4 (comp) -> T3
T5 (todo) -> T3
"""
Verifies executable tickets with complex dependency chains.
Chain: T1 (comp) -> T2 (todo) -> T3 (todo)
T4 (comp) -> T3
T5 (todo) -> T3
"""
t1 = Ticket(id="T1", description="T1", status="completed", assigned_to="a")
t2 = Ticket(id="T2", description="T2", status="todo", assigned_to="a", depends_on=["T1"])
t3 = Ticket(id="T3", description="T3", status="todo", assigned_to="a", depends_on=["T2", "T4", "T5"])
@@ -156,4 +165,4 @@ def test_track_get_executable_tickets_complex() -> None:
# Now T3 should be executable
executable = track.get_executable_tickets()
executable_ids = sorted([t.id for t in executable])
assert executable_ids == ["T3"]
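Taken together, these tests pin down the executable-ticket rule. A minimal sketch of the implied model, assuming the dataclass shape suggested by the constructor calls (the real `src.models` classes also carry `mark_blocked`/`mark_complete` and other fields):

```python
from dataclasses import dataclass, field

@dataclass
class Ticket:
    id: str
    description: str
    status: str
    assigned_to: str
    depends_on: list[str] = field(default_factory=list)

@dataclass
class Track:
    id: str
    description: str
    tickets: list[Ticket]

    def get_executable_tickets(self) -> list[Ticket]:
        # Executable = still 'todo' and every dependency is 'completed'.
        done = {t.id for t in self.tickets if t.status == "completed"}
        return [
            t for t in self.tickets
            if t.status == "todo" and all(d in done for d in t.depends_on)
        ]
```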
+1 -1
View File
@@ -48,4 +48,4 @@ def test_link_id_stability():
link_id = abs(hash(source_tid + "_" + target_tid))
assert link_id == abs(hash(source_tid + "_" + target_tid))
assert link_id != abs(hash(target_tid + "_" + source_tid))
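One caveat worth recording: `hash()` on strings is salted per interpreter process (`PYTHONHASHSEED`), so the stability asserted here holds within a run but not across runs. If cross-run stability ever matters, a deterministic digest is a drop-in alternative; this is a sketch, not what the code under test currently does:

```python
import zlib

def stable_link_id(source_tid: str, target_tid: str) -> int:
    # crc32 is deterministic across processes, unlike str hash(), and the
    # "->" separator keeps the id direction-sensitive like the original.
    return zlib.crc32(f"{source_tid}->{target_tid}".encode("utf-8"))
```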
+1 -1
View File
@@ -111,4 +111,4 @@ def test_handle_ai_response_fallback(app_instance: App) -> None:
app_instance._process_pending_gui_tasks()
assert app_instance.ai_response == "Regular AI Response"
assert app_instance.ai_status == "done"
assert len(app_instance.mma_streams) == 0
+4 -3
View File
@@ -23,8 +23,9 @@ def _poll_mma_status(client: api_hook_client.ApiHookClient, timeout: int, condit
@pytest.mark.timeout(300)
def test_mma_step_mode_approval_flow(live_gui) -> None:
"""
Verify that we can manually approve a ticket in Step Mode and it proceeds.
"""
Verify that we can manually approve a ticket in Step Mode and it proceeds.
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
@@ -66,4 +67,4 @@ def test_mma_step_mode_approval_flow(live_gui) -> None:
condition=lambda s: any(t['id'] == tid and t['status'] == 'in_progress' for t in s.get('active_tickets', [])))
assert ok, "Ticket did not move to in_progress after manual approval/mutation"
print("[SIM] MMA Step Mode approval flow test PASSED.")
print("[SIM] MMA Step Mode approval flow test PASSED.")
+8 -6
View File
@@ -12,9 +12,10 @@ from src import api_hook_client
@pytest.mark.timeout(120)
def test_patch_modal_appears_on_trigger(live_gui) -> None:
"""
Test that triggering a patch shows the modal in the GUI.
Uses live_gui fixture to start the GUI with test hooks enabled.
"""
Test that triggering a patch shows the modal in the GUI.
Uses live_gui fixture to start the GUI with test hooks enabled.
"""
proc, _ = live_gui
client = api_hook_client.ApiHookClient()
@@ -49,8 +50,9 @@ def test_patch_modal_appears_on_trigger(live_gui) -> None:
@pytest.mark.timeout(120)
def test_patch_apply_modal_workflow(live_gui) -> None:
"""
Test the full patch apply workflow: trigger -> apply -> verify modal closes.
"""
Test the full patch apply workflow: trigger -> apply -> verify modal closes.
"""
proc, _ = live_gui
client = api_hook_client.ApiHookClient()
@@ -77,4 +79,4 @@ def test_patch_apply_modal_workflow(live_gui) -> None:
time.sleep(1)
status = client.get_gui_state()
assert status.get("_show_patch_modal") == False, "Patch modal should close after hide"
assert status.get("_show_patch_modal") == False, "Patch modal should close after hide"
+1 -1
View File
@@ -125,4 +125,4 @@ system_prompt = "both_p"
pm.delete_preset("project1", scope="project")
presets = pm.load_all()
assert "project1" not in presets
assert "both" in presets # still in global
assert "both" in presets # still in global
+1 -1
View File
@@ -43,4 +43,4 @@ def test_api_hook_under_load(live_gui):
for res in results:
assert res is not None
assert res.get("status") == "ok"
assert res.get("status") == "ok"
+1 -1
View File
@@ -74,4 +74,4 @@ class TestPresetManager(unittest.TestCase):
self.assertEqual(manager.project_path, new_root / "project_presets.toml")
if __name__ == "__main__":
unittest.main()
+1 -1
View File
@@ -87,4 +87,4 @@ roles = ["User", "AI"]
self.assertIn("Context", proj["discussion"]["roles"])
if __name__ == "__main__":
unittest.main()
+1 -1
View File
@@ -5,4 +5,4 @@ def test_providers_moved_to_models():
"""Verify that PROVIDERS list is in models.py and removed from AppController."""
expected_providers = ['gemini', 'anthropic', 'gemini_cli', 'deepseek', 'minimax']
assert models.PROVIDERS == expected_providers
assert not hasattr(src.app_controller.AppController, 'PROVIDERS')
+1 -1
View File
@@ -12,4 +12,4 @@ def test_rag_panel_integration():
import inspect
source = inspect.getsource(App._gui_func)
assert "self._render_rag_panel()" in source
assert "imgui.collapsing_header(\"RAG Settings\")" in source
assert "imgui.collapsing_header(\"RAG Settings\")" in source
+3 -2
View File
@@ -24,7 +24,8 @@ def mock_project():
def test_rag_integration(mock_project):
"""
Integration test verifying the flow from AppController through RAGEngine to ai_client.
"""
# 1. Initializes a mock project and AppController.
# We patch several components to avoid side effects during initialization.
@@ -108,4 +109,4 @@ def test_rag_integration(mock_project):
assert "Source: test_file.py" in sent_user_message
# Verify that rag_engine.search was called with the original prompt
mock_rag_engine.search.assert_called_once_with("Tell me about the code.")
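The flow this test locks in, as a sketch: search is called with the untouched user prompt, and retrieved chunks are prepended with their sources. Field names here are assumptions modeled on the mocks, not the real `RAGEngine` API:

```python
def build_rag_prompt(rag_engine, prompt: str) -> str:
    chunks = rag_engine.search(prompt)  # must receive the ORIGINAL prompt
    context = "\n\n".join(
        f"Source: {c['source']}\n{c['text']}" for c in chunks
    )
    return f"{context}\n\n{prompt}" if context else prompt
```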
+4 -3
View File
@@ -9,8 +9,9 @@ from src.models import Ticket, WorkerContext
class TestRunWorkerLifecycleAbort(unittest.TestCase):
def test_run_worker_lifecycle_returns_early_on_abort(self):
"""
Test that run_worker_lifecycle returns early and marks ticket as 'killed'
if the abort event is set for the ticket.
"""
# Mock ai_client.send
with patch('src.ai_client.send') as mock_send:
@@ -37,4 +38,4 @@ class TestRunWorkerLifecycleAbort(unittest.TestCase):
mock_send.assert_not_called()
if __name__ == "__main__":
unittest.main()
+4 -3
View File
@@ -12,8 +12,9 @@ from src.gui_2 import App
@pytest.mark.integration
def test_selectable_label_stability(live_gui) -> None:
"""
Verifies that the application starts correctly with --enable-test-hooks
and that the selectable label infrastructure is present and stable.
"""
client = ApiHookClient()
assert client.wait_for_server(timeout=20), "Hook server failed to start"
@@ -44,4 +45,4 @@ def test_selectable_label_stability(live_gui) -> None:
# 5. Verify prior session indicator specifically via the gettable field
# prior_session_indicator is mapped to AppController.is_viewing_prior_session
prior_val = client.get_value("prior_session_indicator")
assert prior_val is False, "prior_session_indicator field should be False initially"
+4 -3
View File
@@ -15,8 +15,9 @@ from simulation.sim_ai_settings import AISettingsSimulation
def test_ai_settings_simulation_run() -> None:
"""
Verifies that AISettingsSimulation correctly cycles through models
to test the settings UI components.
"""
mock_client = MagicMock()
mock_client.wait_for_server.return_value = True
@@ -42,4 +43,4 @@ def test_ai_settings_simulation_run() -> None:
# Verify calls
# ANTI-SIMPLIFICATION: Assert that specific models were set during simulation
mock_client.set_value.assert_any_call("current_model", "gemini-2.0-flash")
mock_client.set_value.assert_any_call("current_model", "gemini-2.5-flash-lite")
+6 -4
View File
@@ -15,7 +15,8 @@ from simulation.sim_base import BaseSimulation
def test_base_simulation_init() -> None:
"""
Verifies that the BaseSimulation initializes the ApiHookClient correctly.
"""
with patch('simulation.sim_base.ApiHookClient') as mock_client_class:
mock_client = MagicMock()
@@ -27,8 +28,9 @@ def test_base_simulation_init() -> None:
def test_base_simulation_setup() -> None:
"""
Verifies that the setup routine correctly resets the GUI state
and initializes a clean temporary project for simulation.
"""
mock_client = MagicMock()
mock_client.wait_for_server.return_value = True
@@ -43,4 +45,4 @@ def test_base_simulation_setup() -> None:
mock_client.click.assert_any_call("btn_reset")
mock_sim.setup_new_project.assert_called()
from pathlib import Path
assert Path(sim.project_path).as_posix().endswith("tests/artifacts/temp_testsim.toml")
+4 -3
View File
@@ -15,8 +15,9 @@ from simulation.sim_context import ContextSimulation
def test_context_simulation_run() -> None:
"""
Verifies that the ContextSimulation runs the correct sequence of user actions:
discussion switching, context building (md_only), and history truncation.
"""
mock_client = MagicMock()
mock_client.wait_for_server.return_value = True
@@ -52,4 +53,4 @@ def test_context_simulation_run() -> None:
mock_client.post_project.assert_called()
mock_client.click.assert_called_with("btn_md_only")
mock_sim.run_discussion_turn.assert_called()
mock_sim.truncate_history.assert_called_with(1)
+4 -3
View File
@@ -15,8 +15,9 @@ from simulation.sim_execution import ExecutionSimulation
def test_execution_simulation_run() -> None:
"""
Verifies that ExecutionSimulation handles script confirmation modals.
Ensures that it waits for the modal and clicks the approve button.
"""
mock_client = MagicMock()
mock_client.wait_for_server.return_value = True
@@ -52,4 +53,4 @@ def test_execution_simulation_run() -> None:
# Verify calls
# ANTI-SIMPLIFICATION: Assert that the async discussion and the script approval button are triggered.
mock_sim.run_discussion_turn_async.assert_called()
mock_client.click.assert_called_with("btn_approve_script")
+4 -3
View File
@@ -15,8 +15,9 @@ from simulation.sim_tools import ToolsSimulation
def test_tools_simulation_run() -> None:
"""
Verifies that ToolsSimulation requests specific tool executions
and verifies they appear in the resulting session history.
"""
mock_client = MagicMock()
mock_client.wait_for_server.return_value = True
@@ -39,4 +40,4 @@ def test_tools_simulation_run() -> None:
# Verify calls
# ANTI-SIMPLIFICATION: Must assert the specific commands were tested
mock_sim.run_discussion_turn.assert_any_call("List the files in the current directory.")
mock_sim.run_discussion_turn.assert_any_call("Read the first 10 lines of aggregate.py.")
+1 -1
View File
@@ -28,4 +28,4 @@ def test_sync_event_queue_none_payload() -> None:
queue.put("no_payload")
name, payload = queue.get()
assert name == "no_payload"
assert payload is None
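The contract under test, sketched with stdlib `queue` (the real class lives in src and may differ beyond this surface): `put("no_payload")` round-trips as `("no_payload", None)`, exactly what the assertions check.

```python
import queue

class SyncEventQueue:
    """Events are normalized to (name, payload); payload defaults to None."""

    def __init__(self) -> None:
        self._q: queue.Queue = queue.Queue()

    def put(self, name: str, payload=None) -> None:
        self._q.put((name, payload))

    def get(self):
        return self._q.get()
```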
+1 -1
View File
@@ -32,4 +32,4 @@ def test_api_ask_client_error() -> None:
with patch.object(client, '_make_request') as mock_make:
mock_make.return_value = None
result = client.request_confirmation("run_powershell", {})
assert result is None
+9 -8
View File
@@ -11,13 +11,14 @@ from src import ai_client
def test_system_prompt_sim(live_gui):
"""
Simulation test for system prompt settings.
1. Wait for server.
2. Verify initial state.
3. Modify settings via API.
4. Verify updates.
5. Use 'Reset to Default' button via API.
6. Verify restoration to default text.
"""
_, gui_script = live_gui
client = ApiHookClient()
@@ -70,4 +71,4 @@ def test_system_prompt_sim(live_gui):
# Close it
client.set_value('show_base_prompt_diff_modal', False)
assert client.get_value('show_base_prompt_diff_modal') is False
+5 -4
View File
@@ -61,9 +61,10 @@ def test_run_powershell_optional_qa_callback() -> None:
assert "EXIT CODE: 1" in result
def test_end_to_end_tier4_integration(vlogger) -> None:
"""1. Start a task that triggers a tool failure.
2. Ensure Tier 4 QA analysis is run.
3. Verify the analysis is merged into the next turn's prompt.
"""
1. Start a task that triggers a tool failure.
2. Ensure Tier 4 QA analysis is run.
3. Verify the analysis is merged into the next turn's prompt.
"""
# Trigger a send that results in a tool failure
# (In reality, the tool loop handles this)
@@ -131,4 +132,4 @@ def test_gemini_provider_passes_qa_callback_to_run_script() -> None:
qa_callback=qa_callback
)
# Verify _run_script received the qa_callback and patch_callback
mock_run_script.assert_called_with("dir", ".", qa_callback, None)
mock_run_script.assert_called_with("dir", ".", qa_callback, None)
+1 -1
View File
@@ -93,4 +93,4 @@ def test_gemini_cache_fields_accessible() -> None:
def test_anthropic_history_lock_accessible() -> None:
"""_anthropic_history_lock must be accessible for cache hint rendering."""
assert hasattr(ai_client, "_anthropic_history_lock")
assert hasattr(ai_client, "_anthropic_history")
assert hasattr(ai_client, "_anthropic_history")
+1 -1
View File
@@ -34,4 +34,4 @@ def test_tool_preset_env_no_var(monkeypatch):
except Exception:
pass
mock_set_preset.assert_not_called()
+8 -7
View File
@@ -7,12 +7,13 @@ from src.project_manager import save_track_state, load_track_state
def test_track_state_persistence(tmp_path) -> None:
"""
Tests saving and loading a TrackState object to/from a TOML file.
1. Create a TrackState object with sample metadata, discussion, and tasks.
2. Call save_track_state('test_track', state, base_dir).
3. Verify that base_dir/conductor/tracks/test_track/state.toml exists.
4. Call load_track_state('test_track', base_dir) and verify it returns an identical TrackState object.
"""
Tests saving and loading a TrackState object to/from a TOML file.
1. Create a TrackState object with sample metadata, discussion, and tasks.
2. Call save_track_state('test_track', state, base_dir).
3. Verify that base_dir/conductor/tracks/test_track/state.toml exists.
4. Call load_track_state('test_track', base_dir) and verify it returns an identical TrackState object.
"""
base_dir = tmp_path
track_id = "test-track-999" # Metadata internal ID
track_folder_name = "test_track" # Folder name used in persistence
@@ -63,4 +64,4 @@ def test_track_state_persistence(tmp_path) -> None:
assert loaded_state.discussion[i]["content"] == original_state.discussion[i]["content"]
assert loaded_state.discussion[i]["ts"] == original_state.discussion[i]["ts"]
# Final check: deep equality of dataclasses
assert loaded_state == original_state
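The on-disk location is asserted explicitly, so a loader sketch can be grounded in it; the dict-level view below skips the dataclass rebuild the real `load_track_state` performs:

```python
from pathlib import Path
import tomllib  # stdlib TOML reader, Python 3.11+

def state_path(track_folder: str, base_dir: Path) -> Path:
    # Mirrors the asserted layout: <base_dir>/conductor/tracks/<name>/state.toml
    return Path(base_dir) / "conductor" / "tracks" / track_folder / "state.toml"

def load_track_state_dict(track_folder: str, base_dir: Path) -> dict:
    with open(state_path(track_folder, base_dir), "rb") as f:
        return tomllib.load(f)  # {"metadata": ..., "discussion": ..., "tasks": ...}
```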
+1 -1
View File
@@ -156,4 +156,4 @@ def test_track_state_to_dict_with_none() -> None:
assert track_dict["metadata"]["updated_at"] is None # This should be None as it's passed as None
assert track_dict["discussion"][0]["ts"] is None
assert track_dict["tasks"][0]["description"] == "Task None"
assert track_dict["tasks"][0]["assigned_to"] == "anon"
assert track_dict["tasks"][0]["assigned_to"] == "anon"
+5 -4
View File
@@ -3,9 +3,10 @@ from tree_sitter import Language, Parser
def test_tree_sitter_python_setup() -> None:
"""
Verifies that tree-sitter and tree-sitter-python are correctly installed
and can parse a simple Python function string.
"""
Verifies that tree-sitter and tree-sitter-python are correctly installed
and can parse a simple Python function string.
"""
# Initialize the Python language and parser
PY_LANGUAGE = Language(tspython.language())
parser = Parser(PY_LANGUAGE)
@@ -22,4 +23,4 @@ def test_tree_sitter_python_setup() -> None:
if child.type == "function_definition":
found_function = True
break
assert found_function, "Should have found a function_definition node"
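Pieced together from the visible fragments, the full setup being verified is roughly the following (the sample source string is mine):

```python
import tree_sitter_python as tspython
from tree_sitter import Language, Parser

PY_LANGUAGE = Language(tspython.language())
parser = Parser(PY_LANGUAGE)

tree = parser.parse(b"def greet(name):\n    return name\n")
# Top-level defs appear as function_definition children of the module node.
assert any(
    child.type == "function_definition"
    for child in tree.root_node.children
)
```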
+5 -4
View File
@@ -6,9 +6,10 @@ from src import api_hook_client
@pytest.mark.live
def test_visual_mma_components(live_gui):
"""
Refactored visual MMA verification using the live_gui fixture.
Ensures the MMA dashboard and tickets are correctly rendered.
"""
Refactored visual MMA verification using the live_gui fixture.
Ensures the MMA dashboard and tickets are correctly rendered.
"""
# live_gui is a tuple (process, script_name)
_, gui_script = live_gui
print(f"Testing visual MMA components on {gui_script}...")
@@ -85,4 +86,4 @@ def test_visual_mma_components(live_gui):
"action": "click",
"item": "btn_approve_mma_step"
})
time.sleep(0.5)
+6 -5
View File
@@ -55,10 +55,11 @@ def _poll(client: api_hook_client.ApiHookClient, timeout: int, condition, label:
@pytest.mark.timeout(300)
def test_mma_complete_lifecycle(live_gui) -> None:
"""
End-to-end MMA lifecycle using real Gemini API (gemini-2.5-flash-lite).
Incorporates frame-sync sleeps and explicit state-transition waits per
simulation_hardening_20260301 spec (Issues 2 & 3).
"""
End-to-end MMA lifecycle using real Gemini API (gemini-2.5-flash-lite).
Incorporates frame-sync sleeps and explicit state-transition waits per
simulation_hardening_20260301 spec (Issues 2 & 3).
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=15), "Hook server did not start"
@@ -196,4 +197,4 @@ def test_mma_complete_lifecycle(live_gui) -> None:
if not ok:
print("[SIM] WARNING: mma_tier_usage Tier 3 still zero after 30s — may not be wired to hook API yet")
print("[SIM] MMA complete lifecycle simulation PASSED.")
print("[SIM] MMA complete lifecycle simulation PASSED.")
+8 -7
View File
@@ -13,12 +13,13 @@ from src import api_hook_client
@pytest.mark.integration
def test_workspace_profiles_restoration(live_gui):
"""
Verifies that workspace profiles can save and restore UI state.
1. Sets a field (ui_separate_tier1) to True.
2. Saves a workspace profile.
3. Resets the field to False.
4. Loads the workspace profile.
5. Verifies the field is restored to True.
"""
client = api_hook_client.ApiHookClient()
assert client.wait_for_server(timeout=20), "Hook server did not start"
@@ -78,4 +79,4 @@ def test_workspace_profiles_restoration(live_gui):
print(f"Restored value: {restored_value}")
assert restored_value is True
print("Workspace profile restoration test PASSED.")
print("Workspace profile restoration test PASSED.")
+1 -1
View File
@@ -106,4 +106,4 @@ def test_mock_timeout(live_gui) -> None:
assert "timeout" in event["payload"]["text"].lower()
finally:
# Cleanup
client.push_event('custom_callback', {'callback': '_set_env_var', 'args': ['MOCK_MODE', 'success']})