import asyncio
import json
from unittest.mock import MagicMock, patch

import pytest

from models import Ticket, Track, WorkerContext
import ai_client
import multi_agent_conductor  # noqa: F401 - only referenced via patch("multi_agent_conductor...") targets

# These tests define the expected interface for multi_agent_conductor.py
# which will be implemented in the next phase of TDD.


def _make_ticket(**overrides: object) -> Ticket:
    """Build the default ``T1`` todo ticket used by most tests.

    Keyword arguments override or extend the default fields
    (e.g. ``step_mode=True`` or ``depends_on=["T1"]``).
    """
    fields = {
        "id": "T1",
        "description": "Task 1",
        "status": "todo",
        "assigned_to": "worker1",
    }
    fields.update(overrides)
    return Ticket(**fields)


def _make_context() -> WorkerContext:
    """Build the worker context paired with the default ``T1`` ticket."""
    return WorkerContext(ticket_id="T1", model_name="test-model", messages=[])


def _patch_send(monkeypatch: pytest.MonkeyPatch, **mock_kwargs: object) -> MagicMock:
    """Replace ``ai_client.send`` with a ``MagicMock`` and return the mock.

    ``mock_kwargs`` are forwarded to ``MagicMock`` (e.g. ``return_value=...``).
    """
    mock_send = MagicMock(**mock_kwargs)
    monkeypatch.setattr(ai_client, "send", mock_send)
    return mock_send


def _complete_ticket(ticket, context, *args, **kwargs) -> str:
    """Stand-in for ``run_worker_lifecycle``: mark the ticket complete.

    Completing the ticket lets dependent tickets become executable.
    """
    ticket.mark_complete()
    return "Success"


def test_conductor_engine_initialization() -> None:
    """Test that ConductorEngine can be initialized with a Track."""
    from multi_agent_conductor import ConductorEngine

    track = Track(id="test_track", description="Test Track")
    engine = ConductorEngine(track=track, auto_queue=True)
    assert engine.track == track


@pytest.mark.asyncio
async def test_conductor_engine_run_executes_tickets_in_order(
    monkeypatch: pytest.MonkeyPatch, vlogger
) -> None:
    """Test that run iterates through executable tickets and calls the worker lifecycle."""
    from multi_agent_conductor import ConductorEngine

    ticket1 = _make_ticket()
    ticket2 = _make_ticket(
        id="T2", description="Task 2", assigned_to="worker2", depends_on=["T1"]
    )
    track = Track(id="track1", description="Track 1", tickets=[ticket1, ticket2])
    engine = ConductorEngine(track=track, auto_queue=True)

    vlogger.log_state("Ticket Count", 0, len(track.tickets))
    vlogger.log_state("T1 Status", "todo", "todo")
    vlogger.log_state("T2 Status", "todo", "todo")

    # Mock ai_client.send using monkeypatch
    _patch_send(monkeypatch)

    # We mock run_worker_lifecycle as it is expected to be in the same module
    with patch("multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
        # Mocked lifecycle marks the ticket complete so dependencies can be resolved
        mock_lifecycle.side_effect = _complete_ticket
        await engine.run()

        vlogger.log_state("T1 Status Final", "todo", ticket1.status)
        vlogger.log_state("T2 Status Final", "todo", ticket2.status)

        # Track.get_executable_tickets() should be called repeatedly until all are done.
        # T1 should run first, then T2.
        assert mock_lifecycle.call_count == 2
        assert ticket1.status == "completed"
        assert ticket2.status == "completed"

        # Verify sequence: T1 before T2
        calls = mock_lifecycle.call_args_list
        assert calls[0][0][0].id == "T1"
        assert calls[1][0][0].id == "T2"

    vlogger.finalize("Verify dependency execution order", "PASS", "T1 executed before T2")


@pytest.mark.asyncio
async def test_run_worker_lifecycle_calls_ai_client_send(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that run_worker_lifecycle triggers the AI client and updates ticket status on success."""
    from multi_agent_conductor import run_worker_lifecycle

    ticket = _make_ticket()
    context = _make_context()
    mock_send = _patch_send(
        monkeypatch, return_value="Task complete. I have updated the file."
    )

    result = run_worker_lifecycle(ticket, context)

    assert result == "Task complete. I have updated the file."
    assert ticket.status == "completed"
    mock_send.assert_called_once()

    # Check that the description was passed to send();
    # user_message is passed as a keyword argument.
    _, kwargs = mock_send.call_args
    assert ticket.description in kwargs["user_message"]


@pytest.mark.asyncio
async def test_run_worker_lifecycle_context_injection(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that run_worker_lifecycle can take a context_files list and injects AST views into the prompt."""
    from multi_agent_conductor import run_worker_lifecycle

    ticket = _make_ticket()
    context = _make_context()
    context_files = ["primary.py", "secondary.py"]
    mock_send = _patch_send(monkeypatch, return_value="Success")

    # Content served by the patched open(), keyed by the path passed to it.
    file_contents = {
        "primary.py": "def primary(): pass",
        "secondary.py": "def secondary(): pass",
    }

    def fake_open(file, *args, **kwargs):
        """Return a context-manager mock whose read() yields the canned content."""
        mock_file = MagicMock()
        mock_file.read.return_value = file_contents.get(file, "")
        mock_file.__enter__.return_value = mock_file
        return mock_file

    # We mock ASTParser which is expected to be imported in multi_agent_conductor
    with patch("multi_agent_conductor.ASTParser") as mock_ast_parser_class, \
            patch("builtins.open", new_callable=MagicMock) as mock_open:
        mock_open.side_effect = fake_open

        # Setup ASTParser mock
        mock_ast_parser = mock_ast_parser_class.return_value
        mock_ast_parser.get_curated_view.return_value = "CURATED VIEW"
        mock_ast_parser.get_skeleton.return_value = "SKELETON VIEW"

        run_worker_lifecycle(ticket, context, context_files=context_files)

        # Verify ASTParser calls:
        # First file (primary) should get curated view, others (secondary) get skeleton
        mock_ast_parser.get_curated_view.assert_called_once_with("def primary(): pass")
        mock_ast_parser.get_skeleton.assert_called_once_with("def secondary(): pass")

        # Verify user_message contains the views
        _, kwargs = mock_send.call_args
        user_message = kwargs["user_message"]
        assert "CURATED VIEW" in user_message
        assert "SKELETON VIEW" in user_message
        assert "primary.py" in user_message
        assert "secondary.py" in user_message


@pytest.mark.asyncio
async def test_run_worker_lifecycle_handles_blocked_response(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that run_worker_lifecycle marks the ticket as blocked if the AI indicates it cannot proceed."""
    from multi_agent_conductor import run_worker_lifecycle

    ticket = _make_ticket()
    context = _make_context()

    # Simulate a response indicating a block
    _patch_send(
        monkeypatch,
        return_value="I am BLOCKED because I don't have enough information.",
    )

    run_worker_lifecycle(ticket, context)

    assert ticket.status == "blocked"
    assert "BLOCKED" in ticket.blocked_reason


@pytest.mark.asyncio
async def test_run_worker_lifecycle_step_mode_confirmation(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that run_worker_lifecycle passes confirm_execution to ai_client.send when step_mode is True.

    Verify that if confirm_execution is called (simulated by mocking
    ai_client.send to call its callback), the flow works as expected.
    """
    from multi_agent_conductor import run_worker_lifecycle

    ticket = _make_ticket(step_mode=True)
    context = _make_context()
    mock_send = _patch_send(monkeypatch)

    def fake_send(md_content, user_message, **kwargs):
        """Invoke the pre-tool callback, if one was supplied, then succeed."""
        callback = kwargs.get("pre_tool_callback")
        if callback:
            # Simulate calling it with some payload
            callback('{"tool": "read_file", "args": {"path": "test.txt"}}')
        return "Success"

    # Important: confirm_spawn is called first if event_queue is present!
    with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("multi_agent_conductor.confirm_execution") as mock_confirm:
        mock_spawn.return_value = (True, "mock prompt", "mock context")
        mock_confirm.return_value = True
        mock_send.side_effect = fake_send

        mock_event_queue = MagicMock()
        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

        # Verify confirm_spawn was called because event_queue was present
        mock_spawn.assert_called_once()
        # Verify confirm_execution was called
        mock_confirm.assert_called_once()
        assert ticket.status == "completed"


@pytest.mark.asyncio
async def test_run_worker_lifecycle_step_mode_rejection(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Verify the confirmation callback is handed to ai_client.send in step mode.

    If confirm_execution returns False, the logic (in ai_client, which we
    simulate here) would prevent execution. In run_worker_lifecycle, we just
    check that the callback is passed.
    """
    from multi_agent_conductor import run_worker_lifecycle

    ticket = _make_ticket(step_mode=True)
    context = _make_context()
    mock_send = _patch_send(
        monkeypatch, return_value="Task failed because tool execution was rejected."
    )

    with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("multi_agent_conductor.confirm_execution") as mock_confirm:
        mock_spawn.return_value = (True, "mock prompt", "mock context")
        mock_confirm.return_value = False

        mock_event_queue = MagicMock()
        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

        # Verify the callback was passed to send
        _, kwargs = mock_send.call_args
        assert kwargs["pre_tool_callback"] is not None


@pytest.mark.asyncio
async def test_conductor_engine_dynamic_parsing_and_execution(
    monkeypatch: pytest.MonkeyPatch, vlogger
) -> None:
    """Test that parse_json_tickets correctly populates the track and run executes them in dependency order."""
    from multi_agent_conductor import ConductorEngine

    track = Track(id="dynamic_track", description="Dynamic Track")
    engine = ConductorEngine(track=track, auto_queue=True)

    tickets_json = json.dumps([
        {
            "id": "T1",
            "description": "Initial task",
            "status": "todo",
            "assigned_to": "worker1",
            "depends_on": [],
        },
        {
            "id": "T2",
            "description": "Dependent task",
            "status": "todo",
            "assigned_to": "worker2",
            "depends_on": ["T1"],
        },
        {
            "id": "T3",
            "description": "Another initial task",
            "status": "todo",
            "assigned_to": "worker3",
            "depends_on": [],
        },
    ])
    engine.parse_json_tickets(tickets_json)

    vlogger.log_state("Parsed Ticket Count", 0, len(engine.track.tickets))
    assert len(engine.track.tickets) == 3
    assert engine.track.tickets[0].id == "T1"
    assert engine.track.tickets[1].id == "T2"
    assert engine.track.tickets[2].id == "T3"

    # Mock ai_client.send using monkeypatch
    _patch_send(monkeypatch)

    # Mock run_worker_lifecycle to mark tickets as complete
    with patch("multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
        mock_lifecycle.side_effect = _complete_ticket
        await engine.run()

        assert mock_lifecycle.call_count == 3

        # Verify dependency order: T1 must be called before T2
        executed_ids = [call[0][0].id for call in mock_lifecycle.call_args_list]
        t1_idx = executed_ids.index("T1")
        t2_idx = executed_ids.index("T2")
        vlogger.log_state("T1 Sequence Index", "N/A", t1_idx)
        vlogger.log_state("T2 Sequence Index", "N/A", t2_idx)
        assert t1_idx < t2_idx

        # T3 can be anywhere relative to T1 and T2, but T1 < T2 is mandatory
        assert "T3" in executed_ids

    vlogger.finalize(
        "Dynamic track parsing and dependency execution",
        "PASS",
        "Dependency chain T1 -> T2 honored.",
    )


def test_run_worker_lifecycle_pushes_response_via_queue(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that run_worker_lifecycle pushes a 'response' event via _queue_put.

    The event must carry the correct stream_id when event_queue and loop
    are provided.
    """
    from multi_agent_conductor import run_worker_lifecycle

    ticket = _make_ticket()
    context = _make_context()
    mock_event_queue = MagicMock()
    mock_loop = MagicMock(spec=asyncio.AbstractEventLoop)

    _patch_send(monkeypatch, return_value="Task complete.")
    monkeypatch.setattr(ai_client, "reset_session", MagicMock())

    with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("multi_agent_conductor._queue_put") as mock_queue_put:
        mock_spawn.return_value = (True, "prompt", "context")

        run_worker_lifecycle(ticket, context, event_queue=mock_event_queue, loop=mock_loop)

        mock_queue_put.assert_called_once()
        # Positional args of the _queue_put call: index 2 is the event name,
        # index 3 the payload dict.
        call_args = mock_queue_put.call_args[0]
        payload = call_args[3]
        assert call_args[2] == "response"
        assert payload["stream_id"] == "Tier 3 (Worker): T1"
        assert payload["text"] == "Task complete."
        assert payload["status"] == "done"
        assert ticket.status == "completed"


def test_run_worker_lifecycle_token_usage_from_comms_log(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that run_worker_lifecycle reads token usage from the comms log.

    engine.tier_usage['Tier 3'] must be updated with the real input/output
    token counts found in the log.
    """
    from multi_agent_conductor import ConductorEngine, run_worker_lifecycle

    ticket = _make_ticket()
    context = _make_context()
    fake_comms = [
        {"direction": "OUT", "kind": "request", "payload": {"message": "hello"}},
        {
            "direction": "IN",
            "kind": "response",
            "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}},
        },
    ]

    _patch_send(monkeypatch, return_value="Done.")
    monkeypatch.setattr(ai_client, "reset_session", MagicMock())
    monkeypatch.setattr(
        ai_client,
        "get_comms_log",
        MagicMock(side_effect=[
            [],          # baseline call (before send)
            fake_comms,  # after-send call
        ]),
    )

    track = Track(id="test_track", description="Test")
    engine = ConductorEngine(track=track, auto_queue=True)

    with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
            patch("multi_agent_conductor._queue_put"):
        mock_spawn.return_value = (True, "prompt", "ctx")

        run_worker_lifecycle(
            ticket, context, event_queue=MagicMock(), loop=MagicMock(), engine=engine
        )

        assert engine.tier_usage["Tier 3"]["input"] == 120
        assert engine.tier_usage["Tier 3"]["output"] == 45