"""Tests for src/warmup.py (the WarmupManager class)."""
import sys
import threading  # noqa: F401  (unused here; kept as a pre-existing file-level import)
import time
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from pathlib import Path

# Make the repository root importable so ``src.warmup`` resolves when the
# tests are run from any working directory.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

from src.warmup import WarmupManager  # noqa: E402


def _make_pool() -> ThreadPoolExecutor:
    """Return a small two-worker thread pool for a single test."""
    return ThreadPoolExecutor(max_workers=2, thread_name_prefix="warmup-test")


@contextmanager
def _manager() -> Iterator[WarmupManager]:
    """Yield a WarmupManager backed by a fresh pool.

    The pool is shut down (without blocking) when the ``with`` block exits,
    including when an assertion inside it fails, so a failing test does not
    leak worker threads into later tests.
    """
    pool = _make_pool()
    try:
        yield WarmupManager(pool)
    finally:
        pool.shutdown(wait=False)


def test_warmup_submits_one_job_per_module() -> None:
    """Every submitted module ends up in ``completed`` with nothing left over."""
    with _manager() as mgr:
        mgr.submit(["json", "os", "sys"])
        # Synchronise on the manager rather than sleeping for a fixed time.
        mgr.wait(timeout=5)

        status = mgr.status()
        assert len(status["pending"]) == 0
        assert set(status["completed"]) == {"json", "os", "sys"}
        assert status["failed"] == []


def test_warmup_status_pending_initially() -> None:
    """A status snapshot always exposes the three bookkeeping keys."""
    with _manager() as mgr:
        mgr.submit(["json"])

        snap = mgr.status()
        assert "pending" in snap
        assert "completed" in snap
        assert "failed" in snap

        # Drain the outstanding job before the pool is shut down.
        mgr.wait(timeout=2)


def test_warmup_status_reflects_failures() -> None:
    """A module name that is submitted but ends up failing is listed in ``failed``."""
    bogus = "definitely_not_a_real_module_xyz123"
    with _manager() as mgr:
        mgr.submit([bogus])
        mgr.wait(timeout=5)

        status = mgr.status()
        assert bogus in status["failed"]
        assert status["completed"] == []


def test_warmup_done_event_set_after_all_complete() -> None:
    """``is_done()`` becomes true once ``wait()`` has returned."""
    with _manager() as mgr:
        mgr.submit(["os", "sys"])
        # NOTE(review): this check looks racy -- presumably the jobs can
        # finish before it runs if the worker threads are fast; confirm
        # against WarmupManager's semantics before tightening or removing it.
        assert not mgr.is_done()

        mgr.wait(timeout=5)
        assert mgr.is_done()


def test_warmup_wait_blocks_until_done() -> None:
    """``wait()`` returns ``True`` when the jobs finish within the timeout."""
    with _manager() as mgr:
        mgr.submit(["json", "os"])
        completed = mgr.wait(timeout=10)
        assert completed is True


def test_warmup_on_complete_callback_fires() -> None:
    """A callback registered before submission is invoked exactly once."""
    with _manager() as mgr:
        received: list[dict] = []
        mgr.on_complete(lambda status: received.append(dict(status)))

        mgr.submit(["json"])
        mgr.wait(timeout=5)

        assert len(received) == 1
        assert "json" in received[0]["completed"]


def test_warmup_on_complete_callback_fires_immediately_if_already_done() -> None:
    """A callback registered after completion is invoked straight away."""
    with _manager() as mgr:
        mgr.submit(["json"])
        mgr.wait(timeout=5)

        received: list[dict] = []
        mgr.on_complete(lambda status: received.append(dict(status)))
        assert len(received) == 1


def test_warmup_modules_actually_loaded_in_sys_modules() -> None:
    """Warmed-up modules are registered in ``sys.modules``."""
    with _manager() as mgr:
        mgr.submit(["json", "os"])
        mgr.wait(timeout=5)

        # Look the names up directly instead of importing them here: an
        # import in the test itself would make the assertion pass regardless
        # of what the manager did.
        assert "json" in sys.modules
        assert "os" in sys.modules


def test_warmup_reset_clears_state() -> None:
    """``reset()`` clears the done flag and empties the status lists."""
    with _manager() as mgr:
        mgr.submit(["json"])
        mgr.wait(timeout=5)
        assert mgr.is_done()

        mgr.reset()
        assert not mgr.is_done()
        assert mgr.status()["pending"] == []
        assert mgr.status()["completed"] == []


def test_warmup_runs_jobs_concurrently_not_serially() -> None:
    """Waiting on four submitted modules finishes in under a second."""
    with _manager() as mgr:
        mgr.submit(["json", "os", "sys", "re"])

        started = time.perf_counter()
        mgr.wait(timeout=5)
        elapsed = time.perf_counter() - started

        assert elapsed < 1.0, f"warmup took {elapsed:.2f}s; expected concurrent execution"