"""
ANTI-SIMPLIFICATION: These tests verify the core multi-agent execution engine,
including dependency graph resolution, worker lifecycle, and context injection.
They MUST NOT be simplified, and their assertions on exact call counts and
dependency ordering are critical for preventing regressions in the orchestrator.
"""

import json
import threading
import time
from unittest.mock import MagicMock, patch

import pytest

from src.models import Ticket, Track, WorkerContext
from src import ai_client


@pytest.fixture
def deferred_worker_threads():
    """Collect background threads spawned by mocked workers; join them at teardown.

    Joining at teardown (rather than before the assertions) keeps each test
    asserting on exactly the state the engine left behind when ``run`` returned,
    while guaranteeing that no worker thread outlives its test and calls
    ``mark_complete`` on a ticket while some later test is running.
    """
    threads = []
    yield threads
    for worker in threads:
        worker.join(timeout=5.0)


def _make_deferred_completion_side_effect(threads: list, delay: float = 0.1):
    """Build a ``run_worker_lifecycle`` side effect that completes tickets asynchronously.

    The returned callable returns ``"Success"`` immediately and marks the ticket
    complete from a background thread after ``delay`` seconds. The engine therefore
    has to keep polling before dependent tickets become executable. Every spawned
    thread is appended to ``threads`` so the caller (see the
    ``deferred_worker_threads`` fixture) can join it.
    """
    def side_effect(ticket, context, *args, **kwargs):
        def do_work():
            time.sleep(delay)
            ticket.mark_complete()

        # daemon=True: a stuck worker must never keep the test process alive.
        worker = threading.Thread(target=do_work, daemon=True)
        threads.append(worker)
        worker.start()
        return "Success"

    return side_effect


def test_conductor_engine_initialization() -> None:
    """
    Test that ConductorEngine can be initialized with a Track.
    """
    track = Track(id="test_track", description="Test Track")
    from src.multi_agent_conductor import ConductorEngine

    engine = ConductorEngine(track=track, auto_queue=True)
    assert engine.track == track


def test_conductor_engine_run_executes_tickets_in_order(
    monkeypatch: pytest.MonkeyPatch, vlogger, deferred_worker_threads
) -> None:
    """
    Test that run iterates through executable tickets and calls the worker lifecycle.
    """
    ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker2", depends_on=["T1"])
    track = Track(id="track1", description="Track 1", tickets=[ticket1, ticket2])
    from src.multi_agent_conductor import ConductorEngine

    engine = ConductorEngine(track=track, auto_queue=True)

    vlogger.log_state("Ticket Count", 0, 2)
    vlogger.log_state("T1 Status", "todo", "todo")
    vlogger.log_state("T2 Status", "todo", "todo")

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    # We mock run_worker_lifecycle as it is expected to be in the same module
    with patch("src.multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
        # Mocking lifecycle to mark ticket as complete so dependencies can be resolved
        mock_lifecycle.side_effect = _make_deferred_completion_side_effect(deferred_worker_threads)
        engine.run(max_ticks=20)

    vlogger.log_state("T1 Status Final", "todo", ticket1.status)
    vlogger.log_state("T2 Status Final", "todo", ticket2.status)

    # Track.get_executable_tickets() should be called repeatedly until all are done
    # T1 should run first, then T2.
    assert mock_lifecycle.call_count == 2
    assert ticket1.status == "completed"
    assert ticket2.status == "completed"

    # Verify sequence: T1 before T2
    calls = mock_lifecycle.call_args_list
    assert calls[0][0][0].id == "T1"
    assert calls[1][0][0].id == "T2"
    vlogger.finalize("Verify dependency execution order", "PASS", "T1 executed before T2")


def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Test that run_worker_lifecycle triggers the AI client and updates ticket status on success.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)
    mock_send.return_value = "Task complete. I have updated the file."

    result = run_worker_lifecycle(ticket, context)

    assert result == "Task complete. I have updated the file."
    assert ticket.status == "completed"
    mock_send.assert_called_once()

    # Check if description was passed to send()
    _, kwargs = mock_send.call_args
    # user_message is passed as a keyword argument
    assert ticket.description in kwargs["user_message"]


def test_run_worker_lifecycle_context_injection(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Test that run_worker_lifecycle can take a context_files list and injects AST views into the prompt.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    context_files = ["primary.py", "secondary.py"]
    from src.multi_agent_conductor import run_worker_lifecycle

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    # We mock ASTParser which is expected to be imported in multi_agent_conductor
    with patch("src.multi_agent_conductor.ASTParser") as mock_ast_parser_class, \
            patch("builtins.open", new_callable=MagicMock) as mock_open:
        # Setup open mock to return different content for different files
        file_contents = {
            "primary.py": "def primary(): pass",
            "secondary.py": "def secondary(): pass",
        }

        def mock_open_side_effect(file, *args, **kwargs):
            content = file_contents.get(file, "")
            mock_file = MagicMock()
            mock_file.read.return_value = content
            mock_file.__enter__.return_value = mock_file
            return mock_file

        mock_open.side_effect = mock_open_side_effect

        # Setup ASTParser mock
        mock_ast_parser = mock_ast_parser_class.return_value
        mock_ast_parser.get_curated_view.return_value = "CURATED VIEW"
        mock_ast_parser.get_skeleton.return_value = "SKELETON VIEW"
        mock_send.return_value = "Success"

        run_worker_lifecycle(ticket, context, context_files=context_files)

        # Verify ASTParser calls:
        # First file (primary) should get curated view, others (secondary) get skeleton
        mock_ast_parser.get_curated_view.assert_called_once_with("def primary(): pass", path="primary.py")
        mock_ast_parser.get_skeleton.assert_called_once_with("def secondary(): pass", path="secondary.py")

        # Verify user_message contains the views
        _, kwargs = mock_send.call_args
        user_message = kwargs["user_message"]
        assert "CURATED VIEW" in user_message
        assert "SKELETON VIEW" in user_message
        assert "primary.py" in user_message
        assert "secondary.py" in user_message


def test_run_worker_lifecycle_handles_blocked_response(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    # Simulate a response indicating a block
    mock_send.return_value = "I am BLOCKED because I don't have enough information."

    run_worker_lifecycle(ticket, context)

    assert ticket.status == "blocked"
    assert "BLOCKED" in ticket.blocked_reason


def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Test that run_worker_lifecycle passes confirm_execution to ai_client.send when step_mode is True.
    Verify that if confirm_execution is called (simulated by mocking ai_client.send to call its callback),
    the flow works as expected.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    # Important: confirm_spawn is called first if event_queue is present!
    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("src.multi_agent_conductor.confirm_execution") as mock_confirm:
        mock_spawn.return_value = (True, "mock prompt", "mock context")
        mock_confirm.return_value = True

        def mock_send_side_effect(md_content, user_message, **kwargs):
            callback = kwargs.get("pre_tool_callback")
            if callback:
                # Simulate calling it with some payload
                callback('{"tool": "read_file", "args": {"path": "test.txt"}}')
            return "Success"

        mock_send.side_effect = mock_send_side_effect

        mock_event_queue = MagicMock()
        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

        # Verify confirm_spawn was called because event_queue was present
        mock_spawn.assert_called_once()
        # Verify confirm_execution was called
        mock_confirm.assert_called_once()
        assert ticket.status == "completed"


def test_run_worker_lifecycle_step_mode_rejection(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Verify that if confirm_execution returns False, the logic (in ai_client, which we simulate here)
    would prevent execution. In run_worker_lifecycle, we just check if it's passed.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("src.multi_agent_conductor.confirm_execution") as mock_confirm:
        mock_spawn.return_value = (True, "mock prompt", "mock context")
        mock_confirm.return_value = False
        mock_send.return_value = "Task failed because tool execution was rejected."

        mock_event_queue = MagicMock()
        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

        # Verify it was passed to send
        _, kwargs = mock_send.call_args
        assert kwargs["pre_tool_callback"] is not None


def test_conductor_engine_dynamic_parsing_and_execution(
    monkeypatch: pytest.MonkeyPatch, vlogger, deferred_worker_threads
) -> None:
    """
    Test that parse_json_tickets correctly populates the track and run executes them in dependency order.
    """
    from src.multi_agent_conductor import ConductorEngine

    track = Track(id="dynamic_track", description="Dynamic Track")
    engine = ConductorEngine(track=track, auto_queue=True)
    tickets_json = json.dumps([
        {
            "id": "T1",
            "description": "Initial task",
            "status": "todo",
            "assigned_to": "worker1",
            "depends_on": []
        },
        {
            "id": "T2",
            "description": "Dependent task",
            "status": "todo",
            "assigned_to": "worker2",
            "depends_on": ["T1"]
        },
        {
            "id": "T3",
            "description": "Another initial task",
            "status": "todo",
            "assigned_to": "worker3",
            "depends_on": []
        }
    ])
    engine.parse_json_tickets(tickets_json)

    vlogger.log_state("Parsed Ticket Count", 0, len(engine.track.tickets))
    assert len(engine.track.tickets) == 3
    assert engine.track.tickets[0].id == "T1"
    assert engine.track.tickets[1].id == "T2"
    assert engine.track.tickets[2].id == "T3"

    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    # Mock run_worker_lifecycle to mark tickets as complete
    with patch("src.multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
        mock_lifecycle.side_effect = _make_deferred_completion_side_effect(deferred_worker_threads)
        engine.run(max_ticks=20)

    assert mock_lifecycle.call_count == 3

    # Verify dependency order: T1 must be called before T2
    calls = [recorded[0][0].id for recorded in mock_lifecycle.call_args_list]
    t1_idx = calls.index("T1")
    t2_idx = calls.index("T2")

    vlogger.log_state("T1 Sequence Index", "N/A", t1_idx)
    vlogger.log_state("T2 Sequence Index", "N/A", t2_idx)

    assert t1_idx < t2_idx
    # T3 can be anywhere relative to T1 and T2, but T1 < T2 is mandatory
    assert "T3" in calls
    vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.")


def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Test that run_worker_lifecycle pushes a 'response' event with the correct stream_id
    via _queue_put when event_queue is provided.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    mock_event_queue = MagicMock()
    mock_send = MagicMock(return_value="Task complete.")
    monkeypatch.setattr(ai_client, 'send', mock_send)
    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
    from src.multi_agent_conductor import run_worker_lifecycle

    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("src.multi_agent_conductor._queue_put") as mock_queue_put:
        mock_spawn.return_value = (True, "prompt", "context")
        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

        # ANTI-SIMPLIFICATION: Ensure exactly one 'response' event is put in the queue to avoid duplication loops.
        assert mock_queue_put.call_count >= 1
        # Other event kinds may legitimately be queued; only 'response' must be unique.
        response_calls = [
            recorded for recorded in mock_queue_put.call_args_list
            if len(recorded[0]) > 1 and recorded[0][1] == "response"
        ]
        assert len(response_calls) == 1, (
            f"expected exactly one 'response' event, got {len(response_calls)}"
        )

        call_args = mock_queue_put.call_args_list[0][0]
        assert call_args[1] == "response"
        assert call_args[2]["stream_id"] == "Tier 3 (Worker): T1"
        assert call_args[2]["text"] == "Task complete."
        assert call_args[2]["status"] == "done"
        assert ticket.status == "completed"


def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Test that run_worker_lifecycle reads token usage from the comms log and
    updates engine.tier_usage['Tier 3'] with real input/output token counts.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    fake_comms = [
        {"direction": "OUT", "kind": "request", "payload": {"message": "hello"}},
        {"direction": "IN", "kind": "response", "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}},
    ]
    monkeypatch.setattr(ai_client, 'send', MagicMock(return_value="Done."))
    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
    monkeypatch.setattr(ai_client, 'get_comms_log', MagicMock(side_effect=[
        [],          # baseline call (before send)
        fake_comms,  # after-send call
    ]))
    from src.multi_agent_conductor import run_worker_lifecycle, ConductorEngine

    track = Track(id="test_track", description="Test")
    engine = ConductorEngine(track=track, auto_queue=True)

    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("src.multi_agent_conductor._queue_put"):
        mock_spawn.return_value = (True, "prompt", "ctx")
        run_worker_lifecycle(ticket, context, event_queue=MagicMock(), engine=engine)

    assert engine.tier_usage["Tier 3"]["input"] == 120
    assert engine.tier_usage["Tier 3"]["output"] == 45