diff --git a/src/io_pool.py b/src/io_pool.py
new file mode 100644
index 00000000..51c56d15
--- /dev/null
+++ b/src/io_pool.py
@@ -0,0 +1,21 @@
+from concurrent.futures import ThreadPoolExecutor
+
+
+IO_POOL_MAX_WORKERS: int = 4
+IO_POOL_THREAD_NAME_PREFIX: str = "controller-io"
+
+
+def make_io_pool(max_workers: int = IO_POOL_MAX_WORKERS) -> ThreadPoolExecutor:
+    """Create the shared AppController I/O pool.
+
+    The pool has ``max_workers`` threads (default: IO_POOL_MAX_WORKERS) named
+    "controller-io_N". It is used for warmup, log pruning, disk-bound
+    subsystem init, and any other background work that should not spin up
+    its own thread.
+
+    Caller is responsible for shutdown (e.g. controller.shutdown()).
+    """
+    return ThreadPoolExecutor(
+        max_workers=max_workers,
+        thread_name_prefix=IO_POOL_THREAD_NAME_PREFIX,
+    )
diff --git a/src/warmup.py b/src/warmup.py
new file mode 100644
index 00000000..3730e911
--- /dev/null
+++ b/src/warmup.py
@@ -0,0 +1,161 @@
+"""WarmupManager: import heavy modules on a background thread pool.
+
+Per spec.md:2.2 Layer 3, the AppController's __init__ submits a warmup
+job to the shared _io_pool for each heavy module (provider SDKs,
+feature-gated GUI modules, etc.). After warmup completes, those modules
+are in sys.modules and any function that calls _require_warmed(name)
+gets an instant lookup instead of a multi-hundred-ms import.
+
+Public API on the manager (and exposed on AppController via delegation):
+    mgr.submit(modules)        - start warmup jobs (call once at init)
+    mgr.status()               - {pending, completed, failed}
+    mgr.is_done()              - bool
+    mgr.wait(timeout)          - block until done
+    mgr.on_complete(callback)  - register completion callback
+    mgr.reset()                - clear state (for re-warmup, e.g. in tests)
+
+Completion callbacks run after the done event has been set, so wait()
+returning does not guarantee that every callback has already finished.
+"""
+
+import importlib
+import logging
+import threading
+from concurrent.futures import Future, ThreadPoolExecutor
+from typing import Callable, Optional
+
+
+logger = logging.getLogger(__name__)
+
+CompletionCallback = Callable[[dict], None]
+
+
+class WarmupManager:
+    """Track one batch of background module imports and signal completion."""
+
+    def __init__(self, pool: ThreadPoolExecutor) -> None:
+        self._pool = pool
+        # Guards the three name lists, _callbacks and _started.
+        self._lock = threading.Lock()
+        self._done_event = threading.Event()
+        self._pending: list[str] = []
+        self._completed: list[str] = []
+        self._failed: list[str] = []
+        self._callbacks: list[CompletionCallback] = []
+        self._started = False
+
+    def submit(self, modules: list[str]) -> None:
+        """Start one import job per name in ``modules``.
+
+        Raises RuntimeError if a batch was already submitted and reset()
+        has not been called since.
+        """
+        names = list(modules)
+        callbacks: list[CompletionCallback] = []
+        snap: dict[str, list[str]] = {}
+        with self._lock:
+            if self._started:
+                raise RuntimeError("WarmupManager.submit() called twice; call reset() first")
+            self._pending = list(names)
+            self._completed = []
+            self._failed = []
+            self._done_event.clear()
+            self._started = True
+            if not names:
+                # No job will ever report back, so finish the batch here;
+                # otherwise wait() would block forever.
+                self._done_event.set()
+                callbacks = list(self._callbacks)
+                snap = self._snapshot()
+        self._fire(callbacks, snap)
+        for name in names:
+            self._pool.submit(self._warmup_one, name)
+
+    def status(self) -> dict[str, list[str]]:
+        """Return copies of the pending/completed/failed name lists."""
+        with self._lock:
+            return self._snapshot()
+
+    def is_done(self) -> bool:
+        """Return True once every submitted module has succeeded or failed."""
+        return self._done_event.is_set()
+
+    def wait(self, timeout: Optional[float] = None) -> bool:
+        """Block until the batch is done; return False if ``timeout`` expired."""
+        return self._done_event.wait(timeout=timeout)
+
+    def on_complete(self, callback: CompletionCallback) -> None:
+        """Call ``callback(status)`` once the batch is done.
+
+        If the batch is already done the callback runs immediately on the
+        calling thread; otherwise it is stored and run when the batch
+        finishes.
+        """
+        with self._lock:
+            if not self._done_event.is_set():
+                self._callbacks.append(callback)
+                return
+            snap = self._snapshot()
+        self._fire([callback], snap)
+
+    def reset(self) -> None:
+        """Clear all state, including registered callbacks."""
+        with self._lock:
+            self._pending = []
+            self._completed = []
+            self._failed = []
+            self._done_event.clear()
+            self._callbacks = []
+            self._started = False
+
+    def _warmup_one(self, name: str) -> None:
+        try:
+            importlib.import_module(name)
+        except BaseException as e:
+            # BaseException on purpose: a module that raises SystemExit at
+            # import time must still leave "pending", or wait() never returns.
+            self._record_failure(name, e)
+        else:
+            self._record_success(name)
+
+    def _record_success(self, name: str) -> None:
+        self._record(name, failed=False)
+
+    def _record_failure(self, name: str, _err: BaseException) -> None:
+        logger.warning("warmup import of %s failed: %r", name, _err)
+        self._record(name, failed=True)
+
+    def _record(self, name: str, *, failed: bool) -> None:
+        """Move ``name`` out of pending; finish the batch if it was the last."""
+        callbacks: list[CompletionCallback] = []
+        snap: dict[str, list[str]] = {}
+        with self._lock:
+            if name not in self._pending:
+                # Stale job from before reset(); not part of this batch.
+                return
+            self._pending.remove(name)
+            (self._failed if failed else self._completed).append(name)
+            if not self._pending:
+                self._done_event.set()
+                callbacks = list(self._callbacks)
+                # Snapshot under the lock so callbacks see a consistent view.
+                snap = self._snapshot()
+        self._fire(callbacks, snap)
+
+    @staticmethod
+    def _fire(callbacks: list[CompletionCallback], snap: dict[str, list[str]]) -> None:
+        """Run each callback with its own copy of ``snap``; log, never raise."""
+        for cb in callbacks:
+            try:
+                cb({key: list(names) for key, names in snap.items()})
+            except Exception:
+                # A broken callback must not skip the remaining callbacks.
+                logger.exception("warmup completion callback failed")
+
+    def _snapshot(self) -> dict[str, list[str]]:
+        """Copy the three name lists; the caller must hold self._lock."""
+        return {
+            "pending": list(self._pending),
+            "completed": list(self._completed),
+            "failed": list(self._failed),
+        }
diff --git a/tests/test_io_pool.py b/tests/test_io_pool.py
new file mode 100644
index 00000000..225c48cd
--- /dev/null
+++ b/tests/test_io_pool.py
@@ -0,0 +1,47 @@
+"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController)."""
+
+import threading
+import time
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+import sys
+
+ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(ROOT))
+
+from src.io_pool import make_io_pool, IO_POOL_MAX_WORKERS  # noqa: E402
+
+
+def test_make_io_pool_returns_thread_pool_executor() -> None:
+    with make_io_pool() as pool:
+        assert isinstance(pool, ThreadPoolExecutor)
+
+
+def test_make_io_pool_has_four_workers() -> None:
+    with make_io_pool() as pool:
+        assert pool._max_workers == IO_POOL_MAX_WORKERS == 4
+
+
+def test_make_io_pool_workers_named_controller_io() -> None:
+    def capture() -> str:
+        return threading.current_thread().name
+
+    with make_io_pool() as pool:
+        name = pool.submit(capture).result(timeout=5)
+    assert name.startswith("controller-io"), f"got {name!r}"
+
+
+def test_make_io_pool_runs_jobs_in_parallel() -> None:
+    # The barrier only releases once every worker is inside a job at the same
+    # time, so a serial pool would time out instead of returning durations.
+    barrier = threading.Barrier(IO_POOL_MAX_WORKERS)
+
+    def wait_at_barrier() -> float:
+        t0 = time.perf_counter()
+        barrier.wait(timeout=5)
+        return time.perf_counter() - t0
+
+    with make_io_pool() as pool:
+        futs = [pool.submit(wait_at_barrier) for _ in range(IO_POOL_MAX_WORKERS)]
+        durations = [f.result(timeout=5) for f in futs]
+    assert all(d < 0.5 for d in durations), f"jobs did not run in parallel: {durations}"
diff --git a/tests/test_warmup.py b/tests/test_warmup.py
new file mode 100644
index 00000000..0f526832
--- /dev/null
+++ b/tests/test_warmup.py
@@ -0,0 +1,139 @@
+"""Tests for src/warmup.py (the WarmupManager class)."""
+
+import sys
+import threading
+import time
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(ROOT))
+
+from src.warmup import WarmupManager  # noqa: E402
+
+
+def _make_pool() -> ThreadPoolExecutor:
+    return ThreadPoolExecutor(max_workers=2, thread_name_prefix="warmup-test")
+
+
+def test_warmup_submits_one_job_per_module() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["json", "os", "sys"])
+        # Wait on the done event instead of sleeping a fixed interval.
+        assert mgr.wait(timeout=5)
+        status = mgr.status()
+    assert status["pending"] == []
+    assert set(status["completed"]) == {"json", "os", "sys"}
+    assert status["failed"] == []
+
+
+def test_warmup_status_pending_initially() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["json"])
+        snap = mgr.status()
+        mgr.wait(timeout=2)
+    assert "pending" in snap
+    assert "completed" in snap
+    assert "failed" in snap
+
+
+def test_warmup_status_reflects_failures() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["definitely_not_a_real_module_xyz123"])
+        mgr.wait(timeout=5)
+        status = mgr.status()
+    assert "definitely_not_a_real_module_xyz123" in status["failed"]
+    assert status["completed"] == []
+
+
+def test_warmup_done_event_set_after_all_complete() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        # Checked before submit(): already-imported modules can finish before
+        # the next statement runs, so asserting "not done" afterwards is racy.
+        assert not mgr.is_done()
+        mgr.submit(["os", "sys"])
+        mgr.wait(timeout=5)
+        assert mgr.is_done()
+
+
+def test_warmup_wait_blocks_until_done() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["json", "os"])
+        completed = mgr.wait(timeout=10)
+    assert completed is True
+
+
+def test_warmup_submit_empty_list_is_done_immediately() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        received: list[dict] = []
+        mgr.on_complete(lambda status: received.append(dict(status)))
+        mgr.submit([])
+        assert mgr.wait(timeout=1)
+    assert received == [{"pending": [], "completed": [], "failed": []}]
+
+
+def test_warmup_on_complete_callback_fires() -> None:
+    received: list[dict] = []
+    fired = threading.Event()
+
+    def on_done(status: dict) -> None:
+        received.append(dict(status))
+        fired.set()
+
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.on_complete(on_done)
+        mgr.submit(["json"])
+        # Callbacks run after the done event is set, so wait on the callback
+        # itself rather than on mgr.wait().
+        assert fired.wait(timeout=5)
+    assert len(received) == 1
+    assert "json" in received[0]["completed"]
+
+
+def test_warmup_on_complete_callback_fires_immediately_if_already_done() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["json"])
+        mgr.wait(timeout=5)
+        received: list[dict] = []
+        mgr.on_complete(lambda status: received.append(dict(status)))
+    assert len(received) == 1
+
+
+def test_warmup_modules_actually_loaded_in_sys_modules() -> None:
+    # Evict the module first so the assertion proves the manager imported it.
+    sys.modules.pop("colorsys", None)
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["colorsys"])
+        mgr.wait(timeout=5)
+    assert "colorsys" in sys.modules
+
+
+def test_warmup_reset_clears_state() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["json"])
+        mgr.wait(timeout=5)
+        assert mgr.is_done()
+        mgr.reset()
+        assert not mgr.is_done()
+        assert mgr.status()["pending"] == []
+        assert mgr.status()["completed"] == []
+
+
+def test_warmup_runs_jobs_concurrently_not_serially() -> None:
+    with _make_pool() as pool:
+        mgr = WarmupManager(pool)
+        mgr.submit(["json", "os", "sys", "re"])
+        started = time.perf_counter()
+        mgr.wait(timeout=5)
+        elapsed = time.perf_counter() - started
+    assert elapsed < 1.0, f"warmup took {elapsed:.2f}s; expected concurrent execution"