test(mma): Add test verifying run_worker_lifecycle pushes response via _queue_put
Task 1.4 of mma_pipeline_fix_20260301: asserts stream_id='Tier 3 (Worker): T1', event_name='response', text and status fields correct.
This commit is contained in:
@@ -267,3 +267,29 @@ async def test_conductor_engine_dynamic_parsing_and_execution(monkeypatch: pytes
|
||||
assert "T3" in calls
|
||||
vlogger.finalize("Dynamic track parsing and dependency execution", "PASS", "Dependency chain T1 -> T2 honored.")
|
||||
|
||||
def test_run_worker_lifecycle_pushes_response_via_queue(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
"""
|
||||
Test that run_worker_lifecycle pushes a 'response' event with the correct stream_id
|
||||
via _queue_put when event_queue and loop are provided.
|
||||
"""
|
||||
import asyncio
|
||||
ticket = Ticket(id="T1", description="Task 1", status="todo", assigned_to="worker1")
|
||||
context = WorkerContext(ticket_id="T1", model_name="test-model", messages=[])
|
||||
mock_event_queue = MagicMock()
|
||||
mock_loop = MagicMock(spec=asyncio.AbstractEventLoop)
|
||||
mock_send = MagicMock(return_value="Task complete.")
|
||||
monkeypatch.setattr(ai_client, 'send', mock_send)
|
||||
monkeypatch.setattr(ai_client, 'reset_session', MagicMock())
|
||||
from multi_agent_conductor import run_worker_lifecycle
|
||||
with patch("multi_agent_conductor.confirm_spawn") as mock_spawn, \
|
||||
patch("multi_agent_conductor._queue_put") as mock_queue_put:
|
||||
mock_spawn.return_value = (True, "prompt", "context")
|
||||
run_worker_lifecycle(ticket, context, event_queue=mock_event_queue, loop=mock_loop)
|
||||
mock_queue_put.assert_called_once()
|
||||
call_args = mock_queue_put.call_args[0]
|
||||
assert call_args[2] == "response"
|
||||
assert call_args[3]["stream_id"] == "Tier 3 (Worker): T1"
|
||||
assert call_args[3]["text"] == "Task complete."
|
||||
assert call_args[3]["status"] == "done"
|
||||
assert ticket.status == "completed"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user