Files
manual_slop/tests/test_conductor_engine.py
Ed_ 3eefdfd29d fix(mma): Replace token stats stub with real comms log extraction in run_worker_lifecycle
Task 2.1 of mma_pipeline_fix_20260301: capture comms baseline before send(), then sum input_tokens/output_tokens from IN/response entries to populate engine.tier_usage['Tier 3'].
2026-03-01 13:22:15 -05:00

324 lines
14 KiB
Python

import pytest
from unittest.mock import MagicMock, patch
from models import Ticket, Track, WorkerContext
import ai_client
import multi_agent_conductor
# These tests define the expected interface for multi_agent_conductor.py
# which will be implemented in the next phase of TDD.
def test_conductor_engine_initialization() -> None:
    """
    Check that a ConductorEngine constructed from a Track exposes that Track as `.track`.
    """
    from multi_agent_conductor import ConductorEngine

    expected_track = Track(id="test_track", description="Test Track")
    conductor = ConductorEngine(track=expected_track, auto_queue=True)
    assert conductor.track == expected_track
@pytest.mark.asyncio
async def test_conductor_engine_run_executes_tickets_in_order(monkeypatch: pytest.MonkeyPatch, vlogger) -> None:
    """
    Check that ConductorEngine.run hands every ticket to the worker lifecycle,
    running a dependency (T1) before the ticket that depends on it (T2).
    """
    from multi_agent_conductor import ConductorEngine

    first = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    second = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker2", depends_on=["T1"])
    track = Track(id="track1", description="Track 1", tickets=[first, second])
    engine = ConductorEngine(track=track, auto_queue=True)
    initial_state = (
        ("Ticket Count", 0, 2),
        ("T1 Status", "todo", "todo"),
        ("T2 Status", "todo", "todo"),
    )
    for label, before, after in initial_state:
        vlogger.log_state(label, before, after)
    # Replace ai_client.send with a mock for the duration of the test
    monkeypatch.setattr(ai_client, 'send', MagicMock())

    def finish_ticket(ticket, context, *args, **kwargs):
        # Completing the ticket here is what lets its dependents become executable
        ticket.mark_complete()
        return "Success"

    # run_worker_lifecycle is patched where the engine looks it up: its own module
    with patch("multi_agent_conductor.run_worker_lifecycle", side_effect=finish_ticket) as mock_lifecycle:
        await engine.run()
        vlogger.log_state("T1 Status Final", "todo", first.status)
        vlogger.log_state("T2 Status Final", "todo", second.status)
        # One lifecycle call per ticket, and both tickets end up completed
        assert mock_lifecycle.call_count == 2
        assert first.status == "completed"
        assert second.status == "completed"
        # The first positional argument of each call is the ticket; T1 must precede T2
        executed_ids = [call[0][0].id for call in mock_lifecycle.call_args_list]
        assert executed_ids[0] == "T1"
        assert executed_ids[1] == "T2"
        vlogger.finalize("Verify dependency execution order", "PASS", "T1 executed before T2")
@pytest.mark.asyncio
async def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that run_worker_lifecycle calls ai_client.send exactly once, returns its
    reply, and leaves the ticket in the "completed" status.
    """
    from multi_agent_conductor import run_worker_lifecycle

    # Stand-in for the real AI client call
    fake_send = MagicMock(return_value="Task complete. I have updated the file.")
    monkeypatch.setattr(ai_client, 'send', fake_send)
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])

    result = run_worker_lifecycle(ticket, context)

    assert result == "Task complete. I have updated the file."
    assert ticket.status == "completed"
    fake_send.assert_called_once()
    # The prompt is passed as the user_message keyword and must contain the description
    sent_kwargs = fake_send.call_args[1]
    assert ticket.description in sent_kwargs["user_message"]
@pytest.mark.asyncio
async def test_run_worker_lifecycle_context_injection(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that run_worker_lifecycle accepts a context_files list and injects AST views
    of those files into the prompt.

    The first file is expected to be rendered through get_curated_view and the second
    through get_skeleton; both views and both file names must appear in user_message.
    """
    from multi_agent_conductor import run_worker_lifecycle

    sources = {
        "primary.py": "def primary(): pass",
        "secondary.py": "def secondary(): pass",
    }

    def fake_open(file, *args, **kwargs):
        # Hand back a handle whose read() yields the canned source for this path;
        # __enter__ returns the handle itself so `with open(...) as f` also works.
        handle = MagicMock()
        handle.read.return_value = sources.get(file, "")
        handle.__enter__.return_value = handle
        return handle

    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    context_files = ["primary.py", "secondary.py"]
    fake_send = MagicMock(return_value="Success")
    monkeypatch.setattr(ai_client, 'send', fake_send)
    # ASTParser is patched in multi_agent_conductor, where it is expected to be imported
    with patch("multi_agent_conductor.ASTParser") as parser_cls, \
            patch("builtins.open", new_callable=MagicMock) as opened:
        opened.side_effect = fake_open
        parser = parser_cls.return_value
        parser.get_curated_view.return_value = "CURATED VIEW"
        parser.get_skeleton.return_value = "SKELETON VIEW"
        run_worker_lifecycle(ticket, context, context_files=context_files)
        # Primary file -> curated view, secondary file -> skeleton
        parser.get_curated_view.assert_called_once_with("def primary(): pass")
        parser.get_skeleton.assert_called_once_with("def secondary(): pass")
        # Both rendered views and both file names must be present in the prompt
        prompt = fake_send.call_args[1]["user_message"]
        for fragment in ("CURATED VIEW", "SKELETON VIEW", "primary.py", "secondary.py"):
            assert fragment in prompt
@pytest.mark.asyncio
async def test_run_worker_lifecycle_handles_blocked_response(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that a reply saying the AI is BLOCKED leaves the ticket in the "blocked"
    status with a blocked_reason that mentions it.
    """
    from multi_agent_conductor import run_worker_lifecycle

    # The mocked client answers with a message indicating it cannot proceed
    blocked_send = MagicMock(return_value="I am BLOCKED because I don't have enough information.")
    monkeypatch.setattr(ai_client, 'send', blocked_send)
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])

    run_worker_lifecycle(ticket, context)

    assert ticket.status == "blocked"
    assert "BLOCKED" in ticket.blocked_reason
@pytest.mark.asyncio
async def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check the step-mode approval path of run_worker_lifecycle.

    ai_client.send is replaced by a fake that invokes the pre_tool_callback it receives,
    simulating a tool call. The test then expects confirm_spawn and confirm_execution to
    have been called once each and the ticket to be completed.
    """
    from multi_agent_conductor import run_worker_lifecycle

    def fake_send_impl(md_content, user_message, **kwargs):
        hook = kwargs.get("pre_tool_callback")
        if hook:
            # Pretend the model requested a tool call with this payload
            hook('{"tool": "read_file", "args": {"path": "test.txt"}}')
        return "Success"

    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    monkeypatch.setattr(ai_client, 'send', MagicMock(side_effect=fake_send_impl))
    # confirm_spawn is consulted first whenever an event_queue is supplied, so it is mocked too
    with patch("multi_agent_conductor.confirm_spawn", return_value=(True, "mock prompt", "mock context")) as mock_spawn, \
            patch("multi_agent_conductor.confirm_execution", return_value=True) as mock_confirm:
        queue = MagicMock()
        run_worker_lifecycle(ticket, context, event_queue=queue)
        # An event_queue was passed, so the spawn confirmation must have run
        mock_spawn.assert_called_once()
        # The simulated tool call must have gone through confirm_execution
        mock_confirm.assert_called_once()
        assert ticket.status == "completed"
@pytest.mark.asyncio
async def test_run_worker_lifecycle_step_mode_rejection(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that, in step mode, run_worker_lifecycle passes a pre_tool_callback to ai_client.send.

    confirm_execution is mocked to return False, but acting on a rejection is the job of
    ai_client (mocked here), so this test only checks that the callback is handed over.
    """
    from multi_agent_conductor import run_worker_lifecycle

    fake_send = MagicMock(return_value="Task failed because tool execution was rejected.")
    monkeypatch.setattr(ai_client, 'send', fake_send)
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    with patch("multi_agent_conductor.confirm_spawn", return_value=(True, "mock prompt", "mock context")), \
            patch("multi_agent_conductor.confirm_execution", return_value=False):
        queue = MagicMock()
        run_worker_lifecycle(ticket, context, event_queue=queue)
        # The callback must have been supplied as a keyword argument to send()
        sent_kwargs = fake_send.call_args[1]
        assert sent_kwargs["pre_tool_callback"] is not None
@pytest.mark.asyncio
async def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytest.MonkeyPatch, vlogger) -> None:
    """
    Check that parse_json_tickets fills the track from a JSON document and that run
    then executes the parsed tickets with T1 ahead of its dependent T2.
    """
    import json
    from multi_agent_conductor import ConductorEngine

    ticket_specs = [
        {
            "id": "T1",
            "description": "Initial task",
            "status": "todo",
            "assigned_to": "worker1",
            "depends_on": [],
        },
        {
            "id": "T2",
            "description": "Dependent task",
            "status": "todo",
            "assigned_to": "worker2",
            "depends_on": ["T1"],
        },
        {
            "id": "T3",
            "description": "Another initial task",
            "status": "todo",
            "assigned_to": "worker3",
            "depends_on": [],
        },
    ]
    engine = ConductorEngine(track=Track(id="dynamic_track", description="Dynamic Track"), auto_queue=True)
    engine.parse_json_tickets(json.dumps(ticket_specs))
    parsed = engine.track.tickets
    vlogger.log_state("Parsed Ticket Count", 0, len(parsed))
    # Tickets must be stored in the order they appear in the JSON
    assert len(parsed) == 3
    assert parsed[0].id == "T1"
    assert parsed[1].id == "T2"
    assert parsed[2].id == "T3"
    # Replace ai_client.send with a mock for the duration of the test
    monkeypatch.setattr(ai_client, 'send', MagicMock())

    def finish_ticket(ticket, context, *args, **kwargs):
        # Completing the ticket lets its dependents become executable
        ticket.mark_complete()
        return "Success"

    with patch("multi_agent_conductor.run_worker_lifecycle", side_effect=finish_ticket) as mock_lifecycle:
        await engine.run()
        assert mock_lifecycle.call_count == 3
        # First positional argument of each lifecycle call is the ticket
        run_order = [call[0][0].id for call in mock_lifecycle.call_args_list]
        t1_pos = run_order.index("T1")
        t2_pos = run_order.index("T2")
        vlogger.log_state("T1 Sequence Index", "N/A", t1_pos)
        vlogger.log_state("T2 Sequence Index", "N/A", t2_pos)
        # Only the T1 -> T2 ordering is mandatory; T3 is independent and may run anywhere
        assert t1_pos < t2_pos
        assert "T3" in run_order
        vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.")
def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that, given an event_queue and a loop, run_worker_lifecycle calls _queue_put
    exactly once with a 'response' event carrying the worker's stream_id, text and status.
    """
    import asyncio
    from multi_agent_conductor import run_worker_lifecycle

    monkeypatch.setattr(ai_client, 'send', MagicMock(return_value="Task complete."))
    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    queue = MagicMock()
    # spec= restricts the mock to the AbstractEventLoop attribute surface
    loop = MagicMock(spec=asyncio.AbstractEventLoop)
    with patch("multi_agent_conductor.confirm_spawn", return_value=(True, "prompt", "context")), \
            patch("multi_agent_conductor._queue_put") as mock_queue_put:
        run_worker_lifecycle(ticket, context, event_queue=queue, loop=loop)
        mock_queue_put.assert_called_once()
        # Positional slots 2 and 3 of the _queue_put call hold the event name and its payload
        positional = mock_queue_put.call_args[0]
        event_name = positional[2]
        payload = positional[3]
        assert event_name == "response"
        assert payload["stream_id"] == "Tier 3 (Worker): T1"
        assert payload["text"] == "Task complete."
        assert payload["status"] == "done"
        assert ticket.status == "completed"
def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that run_worker_lifecycle takes token usage from the comms log and records
    the input/output token counts under engine.tier_usage['Tier 3'].
    """
    from multi_agent_conductor import run_worker_lifecycle, ConductorEngine

    comms_after_send = [
        {"direction": "OUT", "kind": "request", "payload": {"message": "hello"}},
        {"direction": "IN", "kind": "response", "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}},
    ]
    # get_comms_log is scripted for two calls: an empty log for the baseline taken
    # before send(), then the populated log for the read after send().
    comms_log = MagicMock(side_effect=[[], comms_after_send])
    monkeypatch.setattr(ai_client, 'send', MagicMock(return_value="Done."))
    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
    monkeypatch.setattr(ai_client, 'get_comms_log', comms_log)
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    engine = ConductorEngine(track=Track(id="test_track", description="Test"), auto_queue=True)
    with patch("multi_agent_conductor.confirm_spawn", return_value=(True, "prompt", "ctx")), \
            patch("multi_agent_conductor._queue_put"):
        run_worker_lifecycle(ticket, context, event_queue=MagicMock(), loop=MagicMock(), engine=engine)
        tier3_usage = engine.tier_usage["Tier 3"]
        assert tier3_usage["input"] == 120
        assert tier3_usage["output"] == 45