"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController).""" import threading import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path import sys ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from src.io_pool import make_io_pool, IO_POOL_MAX_WORKERS # noqa: E402 def test_make_io_pool_returns_thread_pool_executor() -> None: pool = make_io_pool() assert isinstance(pool, ThreadPoolExecutor) pool.shutdown(wait=False) def test_make_io_pool_has_four_workers() -> None: pool = make_io_pool() assert pool._max_workers == IO_POOL_MAX_WORKERS == 4 pool.shutdown(wait=False) def test_make_io_pool_workers_named_controller_io() -> None: pool = make_io_pool() def capture() -> str: return threading.current_thread().name fut = pool.submit(capture) name = fut.result(timeout=5) assert name.startswith("controller-io"), f"got {name!r}" pool.shutdown(wait=False) def test_make_io_pool_runs_jobs_in_parallel() -> None: pool = make_io_pool() barrier = threading.Barrier(4) results: list[float] = [] def wait_at_barrier() -> float: t0 = time.perf_counter() barrier.wait(timeout=5) return time.perf_counter() - t0 futs = [pool.submit(wait_at_barrier) for _ in range(4)] durations = [f.result(timeout=5) for f in futs] assert all(d < 0.5 for d in durations), f"jobs did not run in parallel: {durations}" pool.shutdown(wait=False)