import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import asyncio
import json
import multi_agent_conductor
from multi_agent_conductor import ConductorEngine, run_worker_lifecycle
from models import Ticket, Track, WorkerContext


def test_worker_streaming_intermediate():
    """Tool events seen during a worker run are forwarded as "response" items.

    ``ai_client.send`` is replaced by a fake that pushes one ``tool_call`` and
    one ``tool_result`` entry through ``ai_client.comms_log_callback`` before
    returning ``"DONE"``.  The test then inspects what was handed to
    ``multi_agent_conductor._queue_put`` and expects a ``[TOOL CALL]`` and a
    ``[TOOL RESULT]`` text among the "response" payloads.
    """
    ticket = Ticket(id="T-001", description="Test", status="todo", assigned_to="worker")
    context = WorkerContext(ticket_id="T-001", model_name="test-model", messages=[])
    event_queue = MagicMock()
    event_queue.put = AsyncMock()
    loop = MagicMock()

    def fake_send(*args, **kwargs):
        # Imported lazily so the callback is read at call time, i.e. after
        # the code under test has had the chance to install it.
        import ai_client

        cb = ai_client.comms_log_callback
        if cb:
            cb({"kind": "tool_call", "payload": {"name": "test_tool", "script": "echo hello"}})
            cb({"kind": "tool_result", "payload": {"name": "test_tool", "output": "hello"}})
        return "DONE"

    with (
        patch("ai_client.send", side_effect=fake_send),
        patch("multi_agent_conductor._queue_put") as queue_put_mock,
        patch("multi_agent_conductor.confirm_spawn", return_value=(True, "p", "c")),
        patch("ai_client.reset_session"),
        patch("ai_client.set_provider"),
        patch("ai_client.get_provider"),
        patch("ai_client.get_comms_log", return_value=[]),
    ):
        run_worker_lifecycle(ticket, context, event_queue=event_queue, loop=loop)

        # Positional slot 2 of each recorded call is compared against the
        # event name; slot 3 is collected as the payload.
        responses = []
        for recorded in queue_put_mock.call_args_list:
            if recorded.args[2] == "response":
                responses.append(recorded.args[3])

        assert any("[TOOL CALL]" in r.get("text", "") for r in responses)
        assert any("[TOOL RESULT]" in r.get("text", "") for r in responses)


def test_per_tier_model_persistence():
    """Choosing a model for a tier is recorded in the project's ``mma`` section.

    ``gui_2.App`` is built with the imgui modules and its project, config,
    theme, AI client, monitoring, hook-server and session-logging
    collaborators mocked out.  The test then replays the assignments made by
    the 'Tier Model Config' UI and checks the value stored under
    ``project["mma"]["tier_models"]``.
    """
    # Mock UI frameworks before importing gui_2
    mock_imgui = MagicMock()
    fake_modules = {
        "imgui_bundle": MagicMock(),
        "imgui_bundle.imgui": mock_imgui,
        "imgui_bundle.hello_imgui": MagicMock(),
        "imgui_bundle.immapp": MagicMock(),
    }
    # Everything below stays inside the patch.dict context: sys.modules is
    # restored on exit, so the stubbed imgui modules only exist in here.
    with patch.dict("sys.modules", fake_modules):
        from gui_2 import App

        with (
            patch("gui_2.project_manager.load_project", return_value={}),
            patch("gui_2.project_manager.migrate_from_legacy_config", return_value={}),
            patch("gui_2.project_manager.save_project"),
            patch("gui_2.save_config"),
            patch("gui_2.theme.load_from_config"),
            patch("gui_2.ai_client.set_provider"),
            patch("gui_2.ai_client.list_models", return_value=["gpt-4", "claude-3"]),
            patch("gui_2.PerformanceMonitor"),
            patch("gui_2.api_hooks.HookServer"),
            patch("gui_2.session_logger.open_session"),
        ):
            app = App()
            app.available_models = ["gpt-4", "claude-3"]

            tier = "Tier 3"
            model = "claude-3"

            # Simulate 'Tier Model Config' UI logic
            app.mma_tier_usage[tier]["model"] = model
            mma_section = app.project.setdefault("mma", {})
            tier_models = mma_section.setdefault("tier_models", {})
            tier_models[tier] = model

            assert app.project["mma"]["tier_models"][tier] == model


@pytest.mark.asyncio
async def test_retry_escalation():
    """A ticket that comes back blocked is retried: count bumped, status reset.

    ``run_worker_lifecycle`` is replaced by a stub that marks the ticket
    ``"blocked"``.  After one pass of ``ConductorEngine.run`` with auto-queue
    enabled, the test expects ``retry_count == 1`` and status ``"todo"``.
    """
    ticket = Ticket(id="T-001", description="Test", status="todo", assigned_to="worker")
    track = Track(id="TR-001", description="Track", tickets=[ticket])
    event_queue = MagicMock()
    event_queue.put = AsyncMock()

    engine = ConductorEngine(track, event_queue=event_queue)
    engine.engine.auto_queue = True

    def block_ticket(t, *args, **kwargs):
        t.status = "blocked"
        return "BLOCKED"

    # First tick returns ticket, second tick returns empty list to stop loop
    tick_results = [[ticket], []]

    with patch("multi_agent_conductor.run_worker_lifecycle", side_effect=block_ticket):
        with patch.object(engine.engine, "tick", side_effect=tick_results):
            await engine.run()

            assert ticket.retry_count == 1
            assert ticket.status == "todo"