From 1354679e3310fac7809a2d65db886a4a108dc622 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sat, 6 Jun 2026 14:47:02 -0400 Subject: [PATCH] feat(io_pool, warmup): add shared 4-thread pool + WarmupManager Phase 2 Tasks T2.1-T2.4 of the startup_speedup_20260606 track. NEW: src/io_pool.py make_io_pool() factory: 4-worker ThreadPoolExecutor with thread_name_prefix='controller-io'. The sanctioned way for any background work. Replaces ad-hoc threading.Thread() calls per the 'no new threads' rule. NEW: src/warmup.py WarmupManager: manages a list of modules to import on the shared pool. Public API: .submit(modules) - start warmup (call once) .status() - {pending, completed, failed} .is_done() - bool .wait(timeout) - block until done .on_complete(callback) - register completion callback .reset() - clear state Thread-safe (lock-guarded). 10 tests cover all paths. NEW: tests/test_io_pool.py (4 tests): - ThreadPoolExecutor returned - 4 workers - Threads named 'controller-io-*' - Jobs run in parallel (barrier test) NEW: tests/test_warmup.py (10 tests): - One job per module submitted - Initial pending list correct - Failed imports tracked - Done event set after all complete - wait() blocks until done - on_complete callback fires (and immediately if already done) - Modules actually end up in sys.modules - reset() clears state - Jobs run concurrently (not serially) All 14 tests pass. AppController integration is the next commit. 
"""Factory for the shared AppController background I/O thread pool."""

from concurrent.futures import ThreadPoolExecutor


# Default size of the shared pool.
IO_POOL_MAX_WORKERS: int = 4
# Worker threads are named "<prefix>_N" by ThreadPoolExecutor.
IO_POOL_THREAD_NAME_PREFIX: str = "controller-io"


def make_io_pool(max_workers: int = IO_POOL_MAX_WORKERS) -> ThreadPoolExecutor:
    """Build the shared AppController I/O pool.

    The pool is the single place background work should go (warmup, log
    pruning, disk-bound subsystem init, ...) instead of each feature
    starting its own thread.

    Args:
        max_workers: Number of worker threads; defaults to
            ``IO_POOL_MAX_WORKERS``.

    Returns:
        A new ``ThreadPoolExecutor`` whose thread names start with
        ``IO_POOL_THREAD_NAME_PREFIX``. The caller owns the pool and is
        responsible for shutting it down (e.g. in controller.shutdown()).
    """
    pool = ThreadPoolExecutor(max_workers, IO_POOL_THREAD_NAME_PREFIX)
    return pool
"""WarmupManager: import heavy modules on a background thread pool.

Per spec.md:2.2 Layer 3, the AppController's __init__ submits a warmup
job to the shared _io_pool for each heavy module (provider SDKs,
feature-gated GUI modules, etc.). After warmup completes, those modules
are in sys.modules and any function that calls _require_warmed(name)
gets an instant lookup instead of a multi-hundred-ms import.

Public API on the manager (and exposed on AppController via delegation):
    mgr.submit(modules)        - start warmup jobs (call once at init)
    mgr.status()               - {pending, completed, failed}
    mgr.is_done()              - bool
    mgr.wait(timeout)          - block until done
    mgr.on_complete(callback)  - register completion callback
    mgr.reset()                - clear state (for re-warmup, e.g. in tests)
"""

import importlib
import logging
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Optional


CompletionCallback = Callable[[dict], None]

_logger = logging.getLogger(__name__)


class WarmupManager:
    """Import a batch of modules on a thread pool and track their progress.

    All bookkeeping is guarded by one lock. Completion callbacks are always
    invoked *without* the lock held, so a callback may call ``status()`` or
    ``on_complete()`` safely. The done event is set only after the callbacks
    registered before completion have run; therefore a callback must not call
    ``wait()`` without a timeout (it would wait for itself).
    """

    def __init__(self, pool: ThreadPoolExecutor) -> None:
        """Create an idle manager that will schedule its jobs on *pool*."""
        self._pool = pool
        self._lock = threading.Lock()
        # Set after the completion callbacks have been delivered.
        self._done_event = threading.Event()
        self._pending: list[str] = []
        self._completed: list[str] = []
        self._failed: list[str] = []
        self._callbacks: list[CompletionCallback] = []
        self._started = False
        # True once the last pending module of the current run was recorded.
        self._finished = False
        # Bumped by reset(); lets jobs from an earlier run be ignored.
        self._generation = 0

    def submit(self, modules: list[str]) -> None:
        """Start importing *modules*, one pool job per name.

        An empty list completes immediately (callbacks fire, ``wait()``
        returns) instead of leaving the manager pending forever.

        Raises:
            RuntimeError: if a warmup was already submitted and ``reset()``
                has not been called since, or if the pool refuses new work
                (e.g. it was shut down). In the latter case every module
                that could not be scheduled is recorded as failed first, so
                waiters are still released.
        """
        # Materialise once: *modules* may be a one-shot iterable.
        names = list(modules)
        with self._lock:
            if self._started:
                raise RuntimeError("WarmupManager.submit() called twice; call reset() first")
            self._pending = list(names)
            self._completed = []
            self._failed = []
            self._done_event.clear()
            self._finished = False
            self._started = True
            generation = self._generation
            outcome = self._finish_if_drained()
        if outcome is not None:
            # Nothing to import: report completion right away.
            self._announce(generation, *outcome)
            return
        for index, name in enumerate(names):
            try:
                self._pool.submit(self._warmup_one, name, generation)
            except RuntimeError:
                # The pool will never run these; don't leave them pending.
                for unscheduled in names[index:]:
                    self._record(unscheduled, generation, succeeded=False)
                raise

    def status(self) -> dict[str, list[str]]:
        """Return a copy of the pending / completed / failed module lists."""
        with self._lock:
            return self._snapshot()

    def is_done(self) -> bool:
        """Return True once the current warmup finished and callbacks ran."""
        return self._done_event.is_set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        """Block until the warmup is done or *timeout* seconds elapse.

        Returns:
            True if the warmup finished, False on timeout.
        """
        return self._done_event.wait(timeout=timeout)

    def on_complete(self, callback: CompletionCallback) -> None:
        """Register *callback* to receive the final status dict.

        If the warmup already finished, the callback is invoked immediately
        in the calling thread; otherwise it runs on the thread that records
        the last module. Exceptions raised by the callback are logged and
        suppressed.
        """
        with self._lock:
            if not self._finished:
                self._callbacks.append(callback)
                return
            snapshot = self._snapshot()
        self._invoke(callback, snapshot)

    def reset(self) -> None:
        """Clear all state and callbacks so ``submit()`` may be called again.

        Jobs still in flight from the previous run are ignored when they
        eventually finish.
        """
        with self._lock:
            self._generation += 1
            self._pending = []
            self._completed = []
            self._failed = []
            self._done_event.clear()
            self._callbacks = []
            self._started = False
            self._finished = False

    def _warmup_one(self, name: str, generation: int) -> None:
        """Pool job: import *name* and record the outcome."""
        try:
            importlib.import_module(name)
        except BaseException as err:
            # BaseException on purpose: even a module that calls sys.exit()
            # while importing must be accounted for, or waiters would hang.
            _logger.debug("warmup import of %r failed: %r", name, err)
            self._record(name, generation, succeeded=False)
        else:
            self._record(name, generation, succeeded=True)

    def _record(self, name: str, generation: int, succeeded: bool) -> None:
        """Move *name* out of pending; announce completion if it was last."""
        with self._lock:
            if generation != self._generation or name not in self._pending:
                # Stale job from before reset(), or an unknown name.
                return
            self._pending.remove(name)
            bucket = self._completed if succeeded else self._failed
            bucket.append(name)
            outcome = self._finish_if_drained()
        if outcome is not None:
            self._announce(generation, *outcome)

    def _finish_if_drained(
        self,
    ) -> Optional[tuple[list[CompletionCallback], dict[str, list[str]]]]:
        """Mark the run finished if nothing is pending. Caller holds the lock.

        Returns:
            ``(callbacks, snapshot)`` exactly once per run, at the moment the
            run becomes finished; ``None`` otherwise.
        """
        if not self._started or self._pending or self._finished:
            return None
        self._finished = True
        return list(self._callbacks), self._snapshot()

    def _announce(
        self,
        generation: int,
        callbacks: list[CompletionCallback],
        snapshot: dict[str, list[str]],
    ) -> None:
        """Run *callbacks*, then release waiters. Must be called unlocked."""
        try:
            for callback in callbacks:
                self._invoke(callback, snapshot)
        finally:
            with self._lock:
                # Skip if reset() happened while the callbacks were running.
                if generation == self._generation:
                    self._done_event.set()

    @staticmethod
    def _invoke(callback: CompletionCallback, snapshot: dict[str, list[str]]) -> None:
        """Call *callback* with its own copy of *snapshot*; log any error."""
        private_copy = {key: list(names) for key, names in snapshot.items()}
        try:
            callback(private_copy)
        except Exception:
            _logger.exception("WarmupManager completion callback raised")

    def _snapshot(self) -> dict[str, list[str]]:
        """Copy the three status lists. Caller must hold the lock."""
        return {
            "pending": list(self._pending),
            "completed": list(self._completed),
            "failed": list(self._failed),
        }
"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController)."""

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import sys

ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

from src.io_pool import make_io_pool, IO_POOL_MAX_WORKERS  # noqa: E402


def test_make_io_pool_returns_thread_pool_executor() -> None:
    """The factory hands back a plain ThreadPoolExecutor."""
    # `with` shuts the pool down even when the assertion fails.
    with make_io_pool() as pool:
        assert isinstance(pool, ThreadPoolExecutor)


def test_make_io_pool_has_four_workers() -> None:
    """The default pool size is the documented constant, i.e. 4."""
    with make_io_pool() as pool:
        # ThreadPoolExecutor exposes no public accessor for its size.
        assert pool._max_workers == IO_POOL_MAX_WORKERS == 4


def test_make_io_pool_workers_named_controller_io() -> None:
    """Worker threads carry the 'controller-io' name prefix."""

    def capture() -> str:
        return threading.current_thread().name

    with make_io_pool() as pool:
        name = pool.submit(capture).result(timeout=5)
    assert name.startswith("controller-io"), f"got {name!r}"


def test_make_io_pool_runs_jobs_in_parallel() -> None:
    """All workers can run at once: each job blocks until every job arrived."""
    # The barrier only opens when IO_POOL_MAX_WORKERS jobs wait on it at the
    # same time; serial execution makes barrier.wait() time out and raise.
    barrier = threading.Barrier(IO_POOL_MAX_WORKERS)

    def wait_at_barrier() -> float:
        t0 = time.perf_counter()
        barrier.wait(timeout=5)
        return time.perf_counter() - t0

    with make_io_pool() as pool:
        futs = [pool.submit(wait_at_barrier) for _ in range(IO_POOL_MAX_WORKERS)]
        durations = [f.result(timeout=5) for f in futs]
    assert all(d < 0.5 for d in durations), f"jobs did not run in parallel: {durations}"
"""Tests for src/warmup.py (the WarmupManager class)."""

import sys
import threading
import time
import types
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

from src.warmup import WarmupManager  # noqa: E402


_POOL_WORKERS = 2
_MISSING_MODULE = "definitely_not_a_real_module_xyz123"


def _make_pool() -> ThreadPoolExecutor:
    """Return a small pool; use it as a context manager so it is shut down."""
    return ThreadPoolExecutor(max_workers=_POOL_WORKERS, thread_name_prefix="warmup-test")


def _occupy_workers(pool: ThreadPoolExecutor) -> threading.Event:
    """Queue one blocking job per worker and return the event releasing them.

    The executor's work queue is FIFO, so anything submitted afterwards
    cannot start until the returned event is set (or 5 s pass). This makes
    "still pending" assertions deterministic instead of racing the workers.
    """
    gate = threading.Event()
    for _ in range(_POOL_WORKERS):
        pool.submit(gate.wait, 5)
    return gate


def test_warmup_submits_one_job_per_module() -> None:
    """Every submitted module ends up completed, none pending or failed."""
    with _make_pool() as pool:
        mgr = WarmupManager(pool)
        mgr.submit(["json", "os", "sys"])
        # Wait on the manager instead of sleeping a fixed amount of time.
        assert mgr.wait(timeout=5) is True
        status = mgr.status()
    assert len(status["pending"]) == 0
    assert set(status["completed"]) == {"json", "os", "sys"}
    assert status["failed"] == []


def test_warmup_status_pending_initially() -> None:
    """Before any job ran, the module is listed as pending."""
    with _make_pool() as pool:
        gate = _occupy_workers(pool)
        mgr = WarmupManager(pool)
        mgr.submit(["json"])
        snap = mgr.status()
        gate.set()
        assert mgr.wait(timeout=5) is True
    assert snap["pending"] == ["json"]
    assert snap["completed"] == []
    assert snap["failed"] == []


def test_warmup_status_reflects_failures() -> None:
    """A module that cannot be imported is reported under 'failed'."""
    with _make_pool() as pool:
        mgr = WarmupManager(pool)
        mgr.submit([_MISSING_MODULE])
        assert mgr.wait(timeout=5) is True
        status = mgr.status()
    assert _MISSING_MODULE in status["failed"]
    assert status["completed"] == []


def test_warmup_done_event_set_after_all_complete() -> None:
    """is_done() flips from False to True once all jobs have finished."""
    with _make_pool() as pool:
        gate = _occupy_workers(pool)
        mgr = WarmupManager(pool)
        mgr.submit(["os", "sys"])
        # Workers are still blocked, so the warmup cannot have finished yet.
        assert not mgr.is_done()
        gate.set()
        assert mgr.wait(timeout=5) is True
        assert mgr.is_done()


def test_warmup_wait_blocks_until_done() -> None:
    """wait() times out while jobs are queued and succeeds afterwards."""
    with _make_pool() as pool:
        gate = _occupy_workers(pool)
        mgr = WarmupManager(pool)
        mgr.submit(["json", "os"])
        assert mgr.wait(timeout=0.05) is False
        gate.set()
        assert mgr.wait(timeout=10) is True


def test_warmup_on_complete_callback_fires() -> None:
    """A callback registered before submit() receives the final status once."""
    received: list[dict] = []
    fired = threading.Event()

    def on_done(status: dict) -> None:
        received.append(dict(status))
        fired.set()

    with _make_pool() as pool:
        mgr = WarmupManager(pool)
        mgr.on_complete(on_done)
        mgr.submit(["json"])
        # Synchronise on the callback itself rather than assuming it has
        # already run by the time wait() returns.
        assert fired.wait(timeout=5) is True
        assert mgr.wait(timeout=5) is True
    assert len(received) == 1
    assert "json" in received[0]["completed"]


def test_warmup_on_complete_callback_fires_immediately_if_already_done() -> None:
    """Registering after completion invokes the callback synchronously."""
    with _make_pool() as pool:
        mgr = WarmupManager(pool)
        mgr.submit(["json"])
        assert mgr.wait(timeout=5) is True
        received: list[dict] = []
        mgr.on_complete(lambda status: received.append(dict(status)))
        assert len(received) == 1


def test_warmup_modules_actually_loaded_in_sys_modules() -> None:
    """Warmup really imports: a module absent from sys.modules shows up."""
    # Evict a small stdlib module first; otherwise the assertion would pass
    # even if the manager imported nothing.
    sys.modules.pop("colorsys", None)
    with _make_pool() as pool:
        mgr = WarmupManager(pool)
        mgr.submit(["colorsys", "json"])
        assert mgr.wait(timeout=5) is True
    assert "colorsys" in sys.modules
    assert "json" in sys.modules


def test_warmup_reset_clears_state() -> None:
    """reset() returns the manager to its idle, empty state."""
    with _make_pool() as pool:
        mgr = WarmupManager(pool)
        mgr.submit(["json"])
        assert mgr.wait(timeout=5) is True
        assert mgr.is_done()
        mgr.reset()
        assert not mgr.is_done()
        assert mgr.status()["pending"] == []
        assert mgr.status()["completed"] == []


def test_warmup_runs_jobs_concurrently_not_serially(tmp_path, monkeypatch) -> None:
    """Two imports overlap in time: each one blocks until the other started.

    Each throw-away module waits on a shared barrier while it is being
    imported. The barrier opens only if both imports are in progress at the
    same time; with serial execution the wait times out, the import raises
    and the module is reported as failed.
    """
    gate_module = types.ModuleType("warmup_test_gate")
    gate_module.barrier = threading.Barrier(_POOL_WORKERS)
    monkeypatch.setitem(sys.modules, "warmup_test_gate", gate_module)

    names = [f"warmup_test_rendezvous_{i}" for i in range(_POOL_WORKERS)]
    for name in names:
        (tmp_path / f"{name}.py").write_text(
            "import warmup_test_gate\nwarmup_test_gate.barrier.wait(timeout=5)\n",
            encoding="utf-8",
        )
    monkeypatch.syspath_prepend(str(tmp_path))

    try:
        with _make_pool() as pool:
            mgr = WarmupManager(pool)
            started = time.perf_counter()
            mgr.submit(names)
            assert mgr.wait(timeout=10) is True
            elapsed = time.perf_counter() - started
            status = mgr.status()
    finally:
        # Do not leak the throw-away modules into other tests.
        for name in names:
            sys.modules.pop(name, None)
    assert status["failed"] == []
    assert set(status["completed"]) == set(names)
    assert elapsed < 5.0, f"warmup took {elapsed:.2f}s; expected concurrent execution"