Files
manual_slop/tests/test_conductor_engine_v2.py

324 lines
14 KiB
Python

"""
ANTI-SIMPLIFICATION: These tests verify the core multi-agent execution engine, including dependency graph resolution, worker lifecycle, and context injection.
They MUST NOT be simplified, and their assertions on exact call counts and dependency ordering are critical for preventing regressions in the orchestrator.
"""
import pytest
from unittest.mock import MagicMock, patch
from src.models import Ticket, Track, WorkerContext
from src import ai_client
def test_conductor_engine_initialization() -> None:
    """
    Verify that a ConductorEngine constructed from a Track exposes that Track as `engine.track`.
    """
    source_track = Track(id="test_track", description="Test Track")
    # The engine is imported inside the test, as every test in this file does.
    from src.multi_agent_conductor import ConductorEngine

    conductor = ConductorEngine(track=source_track, auto_queue=True)
    assert conductor.track == source_track
def test_conductor_engine_run_executes_tickets_in_order(monkeypatch: pytest.MonkeyPatch, vlogger) -> None:
    """
    Test that run iterates through executable tickets and calls the worker lifecycle.

    T2 declares a dependency on T1, so the mocked lifecycle must be invoked
    exactly twice: first for T1, then for T2. Both tickets must end up completed.

    Args:
        monkeypatch: pytest fixture used to replace ai_client.send with a mock.
        vlogger: fixture defined outside this file; only log_state() and
            finalize() are used here.
    """
    ticket1 = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    ticket2 = Ticket(id="T2", description="Task 2", status="todo", assigned_to="worker2", depends_on=["T1"])
    track = Track(id="track1", description="Track 1", tickets=[ticket1, ticket2])
    from src.multi_agent_conductor import ConductorEngine
    engine = ConductorEngine(track=track, auto_queue=True)
    vlogger.log_state("Ticket Count", 0, 2)
    vlogger.log_state("T1 Status", "todo", "todo")
    vlogger.log_state("T2 Status", "todo", "todo")
    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)
    # We mock run_worker_lifecycle as it is expected to be in the same module
    with patch("src.multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
        import threading
        import time

        # Every background worker started by the mock is tracked here so it can
        # be joined before the assertions run; otherwise a worker could outlive
        # engine.run() and race the status checks (or leak into the next test).
        worker_threads = []

        # Mocking lifecycle to mark ticket as complete so dependencies can be resolved
        def side_effect(ticket, context, *args, **kwargs):
            def do_work():
                # Complete the ticket asynchronously, after a short delay.
                time.sleep(0.1)
                ticket.mark_complete()
            worker = threading.Thread(target=do_work)
            # Start before registering: join() on an unstarted thread raises RuntimeError.
            worker.start()
            worker_threads.append(worker)
            return "Success"
        mock_lifecycle.side_effect = side_effect
        engine.run(max_ticks=20)
        # Iterate over a snapshot in case the list is still being appended to.
        for worker in list(worker_threads):
            worker.join(timeout=5)
        vlogger.log_state("T1 Status Final", "todo", ticket1.status)
        vlogger.log_state("T2 Status Final", "todo", ticket2.status)
        # Track.get_executable_tickets() should be called repeatedly until all are done
        # T1 should run first, then T2.
        assert mock_lifecycle.call_count == 2
        assert ticket1.status == "completed"
        assert ticket2.status == "completed"
        # Verify sequence: T1 before T2
        calls = mock_lifecycle.call_args_list
        assert calls[0][0][0].id == "T1"
        assert calls[1][0][0].id == "T2"
        vlogger.finalize("Verify dependency execution order", "PASS", "T1 executed before T2")
def test_run_worker_lifecycle_calls_ai_client_send(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that run_worker_lifecycle calls ai_client.send exactly once, returns
    its reply, and leaves the ticket in the "completed" state.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle

    # Swap the real AI client call for a mock that returns a canned reply.
    canned_reply = "Task complete. I have updated the file."
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)
    mock_send.return_value = canned_reply

    outcome = run_worker_lifecycle(ticket, context)

    assert outcome == canned_reply
    assert ticket.status == "completed"
    mock_send.assert_called_once()
    # The ticket description must appear in the prompt, which is passed to
    # send() as the `user_message` keyword argument.
    _, send_kwargs = mock_send.call_args
    assert ticket.description in send_kwargs["user_message"]
def test_run_worker_lifecycle_context_injection(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that run_worker_lifecycle accepts a context_files list and places
    AST-derived views of those files into the prompt sent to the AI client.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    context_files = ["primary.py", "secondary.py"]
    from src.multi_agent_conductor import run_worker_lifecycle
    # Replace ai_client.send so no real request is made.
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    # Fake file system: each path maps to the text a read() should yield.
    sources_by_path = {
        "primary.py": "def primary(): pass",
        "secondary.py": "def secondary(): pass",
    }

    def fake_open(file, *args, **kwargs):
        # Return a handle usable as a context manager; unknown paths read as "".
        handle = MagicMock()
        handle.read.return_value = sources_by_path.get(file, "")
        handle.__enter__.return_value = handle
        return handle

    # ASTParser is patched where multi_agent_conductor looks it up.
    with patch("src.multi_agent_conductor.ASTParser") as mock_ast_parser_class:
        with patch("builtins.open", new_callable=MagicMock) as mock_open:
            mock_open.side_effect = fake_open
            # Every ASTParser() instance is this one mock with fixed view strings.
            parser = mock_ast_parser_class.return_value
            parser.get_curated_view.return_value = "CURATED VIEW"
            parser.get_skeleton.return_value = "SKELETON VIEW"
            mock_send.return_value = "Success"

            run_worker_lifecycle(ticket, context, context_files=context_files)

            # The first file (primary) gets the curated view; the other gets a skeleton.
            parser.get_curated_view.assert_called_once_with("def primary(): pass", path="primary.py")
            parser.get_skeleton.assert_called_once_with("def secondary(): pass", path="secondary.py")
            # Both views and both file names must appear in the prompt.
            _, send_kwargs = mock_send.call_args
            prompt = send_kwargs["user_message"]
            assert "CURATED VIEW" in prompt
            assert "SKELETON VIEW" in prompt
            assert "primary.py" in prompt
            assert "secondary.py" in prompt
def test_run_worker_lifecycle_handles_blocked_response(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that a reply containing "BLOCKED" leaves the ticket in the "blocked"
    state with a blocked_reason that mentions it.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle

    # The mocked AI client reports that it cannot proceed.
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)
    mock_send.return_value = "I am BLOCKED because I don't have enough information."

    run_worker_lifecycle(ticket, context)

    assert ticket.status == "blocked"
    assert "BLOCKED" in ticket.blocked_reason
def test_run_worker_lifecycle_step_mode_confirmation(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check the step-mode approval path of run_worker_lifecycle.

    The mocked ai_client.send invokes the `pre_tool_callback` it receives, which
    simulates the client asking for approval before running a tool. The test
    then expects confirm_spawn and confirm_execution to have been called once
    each, and the ticket to be completed.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle
    # Replace ai_client.send so no real request is made.
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    def send_and_trigger_callback(md_content, user_message, **kwargs):
        # Behave like a client that wants to run a tool: call the approval hook
        # (if one was supplied) with a sample tool payload, then reply.
        approval_hook = kwargs.get("pre_tool_callback")
        if approval_hook:
            approval_hook('{"tool": "read_file", "args": {"path": "test.txt"}}')
        return "Success"

    # Important: confirm_spawn is called first if event_queue is present!
    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn:
        with patch("src.multi_agent_conductor.confirm_execution") as mock_confirm:
            mock_spawn.return_value = (True, "mock prompt", "mock context")
            mock_confirm.return_value = True
            mock_send.side_effect = send_and_trigger_callback
            mock_event_queue = MagicMock()

            run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

            # confirm_spawn runs because an event_queue was supplied.
            mock_spawn.assert_called_once()
            # confirm_execution runs because the fake send() fired the callback.
            mock_confirm.assert_called_once()
            assert ticket.status == "completed"
def test_run_worker_lifecycle_step_mode_rejection(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that, in step mode, run_worker_lifecycle hands a pre_tool_callback to ai_client.send.

    confirm_execution is mocked to return False, but acting on a rejection is
    the job of ai_client (mocked out here), so this test only asserts that a
    non-None callback is passed to send().
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1", step_mode=True)
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    from src.multi_agent_conductor import run_worker_lifecycle
    # Replace ai_client.send so no real request is made.
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)

    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn:
        with patch("src.multi_agent_conductor.confirm_execution") as mock_confirm:
            mock_spawn.return_value = (True, "mock prompt", "mock context")
            mock_confirm.return_value = False
            mock_send.return_value = "Task failed because tool execution was rejected."
            mock_event_queue = MagicMock()

            run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

            # The approval hook must have been forwarded to send().
            _, send_kwargs = mock_send.call_args
            assert send_kwargs["pre_tool_callback"] is not None
def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytest.MonkeyPatch, vlogger) -> None:
    """
    Test that parse_json_tickets correctly populates the track and run executes them in dependency order.

    Three tickets are parsed from JSON: T1 and T3 have no dependencies and T2
    depends on T1. The mocked lifecycle must be called three times, with T1
    dispatched before T2; T3 may be dispatched at any point.

    Args:
        monkeypatch: pytest fixture used to replace ai_client.send with a mock.
        vlogger: fixture defined outside this file; only log_state() and
            finalize() are used here.
    """
    import json
    from src.multi_agent_conductor import ConductorEngine
    track = Track(id="dynamic_track", description="Dynamic Track")
    engine = ConductorEngine(track=track, auto_queue=True)
    tickets_json = json.dumps([
        {
            "id": "T1",
            "description": "Initial task",
            "status": "todo",
            "assigned_to": "worker1",
            "depends_on": []
        },
        {
            "id": "T2",
            "description": "Dependent task",
            "status": "todo",
            "assigned_to": "worker2",
            "depends_on": ["T1"]
        },
        {
            "id": "T3",
            "description": "Another initial task",
            "status": "todo",
            "assigned_to": "worker3",
            "depends_on": []
        }
    ])
    engine.parse_json_tickets(tickets_json)
    vlogger.log_state("Parsed Ticket Count", 0, len(engine.track.tickets))
    # Parsing must preserve both the number and the order of the tickets.
    assert len(engine.track.tickets) == 3
    assert engine.track.tickets[0].id == "T1"
    assert engine.track.tickets[1].id == "T2"
    assert engine.track.tickets[2].id == "T3"
    # Mock ai_client.send using monkeypatch
    mock_send = MagicMock()
    monkeypatch.setattr(ai_client, 'send', mock_send)
    # Mock run_worker_lifecycle to mark tickets as complete
    with patch("src.multi_agent_conductor.run_worker_lifecycle") as mock_lifecycle:
        import threading
        import time

        # Every background worker started by the mock is tracked here so it can
        # be joined before the assertions run; otherwise a worker could outlive
        # engine.run() and leak into the next test.
        worker_threads = []

        def side_effect(ticket, context, *args, **kwargs):
            def do_work():
                # Complete the ticket asynchronously, after a short delay.
                time.sleep(0.1)
                ticket.mark_complete()
            worker = threading.Thread(target=do_work)
            # Start before registering: join() on an unstarted thread raises RuntimeError.
            worker.start()
            worker_threads.append(worker)
            return "Success"
        mock_lifecycle.side_effect = side_effect
        engine.run(max_ticks=20)
        # Iterate over a snapshot in case the list is still being appended to.
        for worker in list(worker_threads):
            worker.join(timeout=5)
        assert mock_lifecycle.call_count == 3
        # Verify dependency order: T1 must be called before T2
        calls = [call[0][0].id for call in mock_lifecycle.call_args_list]
        t1_idx = calls.index("T1")
        t2_idx = calls.index("T2")
        vlogger.log_state("T1 Sequence Index", "N/A", t1_idx)
        vlogger.log_state("T2 Sequence Index", "N/A", t2_idx)
        assert t1_idx < t2_idx
        # T3 can be anywhere relative to T1 and T2, but T1 < T2 is mandatory
        assert "T3" in calls
        vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.")
def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that, when an event_queue is supplied, run_worker_lifecycle emits a
    'response' event through _queue_put carrying the worker's stream_id, the
    reply text and a "done" status.
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    mock_event_queue = MagicMock()
    # Stub out the AI client: a canned reply and a no-op session reset.
    mock_send = MagicMock(return_value="Task complete.")
    monkeypatch.setattr(ai_client, 'send', mock_send)
    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
    from src.multi_agent_conductor import run_worker_lifecycle

    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn:
        with patch("src.multi_agent_conductor._queue_put") as mock_queue_put:
            mock_spawn.return_value = (True, "prompt", "context")

            run_worker_lifecycle(ticket, context, event_queue=mock_event_queue)

            # ANTI-SIMPLIFICATION: Ensure exactly one 'response' event is put in the queue to avoid duplication loops.
            # NOTE(review): the assertion below only requires at least one call and then
            # inspects the first one; it does not enforce "exactly one" — confirm the
            # intended strictness before tightening or relaxing it.
            assert mock_queue_put.call_count >= 1
            # Positional arguments of the first _queue_put call: index 1 is the
            # event name and index 2 is the payload dict.
            first_call_args = mock_queue_put.call_args_list[0][0]
            assert first_call_args[1] == "response"
            payload = first_call_args[2]
            assert payload["stream_id"] == "Tier 3 (Worker): T1"
            assert payload["text"] == "Task complete."
            assert payload["status"] == "done"
            assert ticket.status == "completed"
def test_run_worker_lifecycle_token_usage_from_comms_log(monkeypatch: pytest.MonkeyPatch) -> None:
    """
    Check that run_worker_lifecycle takes token usage from the comms log and
    records the input/output counts under engine.tier_usage['Tier 3'].
    """
    ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
    context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
    # Comms log as it should look after the send: one outgoing request and one
    # incoming response that carries the usage figures asserted below.
    fake_comms = [
        {"direction": "OUT", "kind": "request", "payload": {"message": "hello"}},
        {"direction": "IN", "kind": "response", "payload": {"usage": {"input_tokens": 120, "output_tokens": 45}}},
    ]
    # get_comms_log yields an empty log on its first call and the populated log
    # on its second; a third call would exhaust the side_effect sequence.
    comms_log_sequence = [
        [],          # baseline call (before send)
        fake_comms,  # after-send call
    ]
    monkeypatch.setattr(ai_client, 'send', MagicMock(return_value="Done."))
    monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
    monkeypatch.setattr(ai_client, 'get_comms_log', MagicMock(side_effect=comms_log_sequence))
    from src.multi_agent_conductor import run_worker_lifecycle, ConductorEngine
    track = Track(id="test_track", description="Test")
    engine = ConductorEngine(track=track, auto_queue=True)

    with patch("src.multi_agent_conductor.confirm_spawn") as mock_spawn:
        with patch("src.multi_agent_conductor._queue_put"):
            mock_spawn.return_value = (True, "prompt", "ctx")
            run_worker_lifecycle(ticket, context, event_queue=MagicMock(), engine=engine)

    tier3_usage = engine.tier_usage["Tier 3"]
    assert tier3_usage["input"] == 120
    assert tier3_usage["output"] == 45