import threading import time import sys import pytest from unittest.mock import MagicMock from src.multi_agent_conductor import WorkerPool def test_worker_pool_limit(): max_workers = 2 pool = WorkerPool(max_workers=max_workers) def slow_task(event): event.set() time.sleep(0.5) event1 = threading.Event() event2 = threading.Event() event3 = threading.Event() # Spawn 2 tasks t1 = pool.spawn("t1", slow_task, (event1,)) t2 = pool.spawn("t2", slow_task, (event2,)) assert t1 is not None assert t2 is not None assert pool.get_active_count() == 2 assert pool.is_full() is True # Try to spawn a 3rd task t3 = pool.spawn("t3", slow_task, (event3,)) assert t3 is None assert pool.get_active_count() == 2 # Wait for tasks to finish event1.wait() event2.wait() pool.join_all() assert pool.get_active_count() == 0 assert pool.is_full() is False def test_worker_pool_tracking(): pool = WorkerPool(max_workers=4) def task(ticket_id): time.sleep(0.1) pool.spawn("ticket_1", task, ("ticket_1",)) pool.spawn("ticket_2", task, ("ticket_2",)) assert "ticket_1" in pool._active assert "ticket_2" in pool._active pool.join_all() assert len(pool._active) == 0 def test_worker_pool_completion_cleanup(): pool = WorkerPool(max_workers=4) def fast_task(): pass pool.spawn("t1", fast_task, ()) time.sleep(0.2) # Give it time to finish and run finally block assert pool.get_active_count() == 0 assert "t1" not in pool._active from unittest.mock import patch from src.models import Track, Ticket from src.multi_agent_conductor import ConductorEngine @patch('src.multi_agent_conductor.run_worker_lifecycle') @patch('src.models.load_config') def test_conductor_engine_pool_integration(mock_load_config, mock_lifecycle): # Mock config to set max_workers=2 mock_load_config.return_value = {"mma": {"max_workers": 2}} # Create 4 independent tickets tickets = [ Ticket(id=f"t{i}", description=f"task {i}", status="todo") for i in range(4) ] track = Track(id="test_track", description="test", tickets=tickets) # Set up engine with auto_queue engine = ConductorEngine(track, auto_queue=True) sys.stderr.write(f"[TEST] engine.pool.max_workers = {engine.pool.max_workers}\n") assert engine.pool.max_workers == 2 # Slow down lifecycle to capture parallel state def slow_lifecycle(ticket, *args, **kwargs): # Set to in_progress immediately to simulate the status change # (The engine usually does this, but we want to be sure) time.sleep(0.5) ticket.status = "completed" mock_lifecycle.side_effect = slow_lifecycle # Run exactly 1 tick engine.run(max_ticks=1) # Verify only 2 were marked in_progress/spawned # Because we only ran for one tick, and there were 4 ready tasks, # it should have tried to spawn as many as possible (limit 2). in_progress = [tk for tk in tickets if tk.status == "in_progress"] # Also count those that already finished if the sleep was too short completed = [tk for tk in tickets if tk.status == "completed"] sys.stderr.write(f"[TEST] in_progress={len(in_progress)} completed={len(completed)}\n") assert len(in_progress) + len(completed) == 2 assert engine.pool.get_active_count() <= 2 # Cleanup: wait for mock threads to finish or join_all engine.pool.join_all()