feat(io_pool, warmup): add shared 4-thread pool + WarmupManager
Phase 2 Tasks T2.1-T2.4 of the startup_speedup_20260606 track.
NEW: src/io_pool.py
make_io_pool() factory: 4-worker ThreadPoolExecutor with
thread_name_prefix='controller-io'. The sanctioned way for any
background work. Replaces ad-hoc threading.Thread() calls per
the 'no new threads' rule.
NEW: src/warmup.py
WarmupManager: manages a list of modules to import on the shared
pool. Public API:
.submit(modules) - start warmup (call once)
.status() - {pending, completed, failed}
.is_done() - bool
.wait(timeout) - block until done
.on_complete(callback) - register completion callback
.reset() - clear state
Thread-safe (lock-guarded). 10 tests cover all paths.
NEW: tests/test_io_pool.py (4 tests):
- ThreadPoolExecutor returned
- 4 workers
- Threads named 'controller-io-*'
- Jobs run in parallel (barrier test)
NEW: tests/test_warmup.py (10 tests):
- One job per module submitted
- Initial pending list correct
- Failed imports tracked
- Done event set after all complete
- wait() blocks until done
- on_complete callback fires (and immediately if already done)
- Modules actually end up in sys.modules
- reset() clears state
- Jobs run concurrently (not serially)
All 14 tests pass. AppController integration is the next commit.
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
|
||||
IO_POOL_MAX_WORKERS: int = 4
|
||||
IO_POOL_THREAD_NAME_PREFIX: str = "controller-io"
|
||||
|
||||
|
||||
def make_io_pool(max_workers: int = IO_POOL_MAX_WORKERS) -> ThreadPoolExecutor:
|
||||
"""Create the shared AppController I/O pool.
|
||||
|
||||
4 worker threads, named "controller-io-N". Used for warmup, log pruning,
|
||||
disk-bound subsystem init, and any other background work that should
|
||||
not spin up its own thread.
|
||||
|
||||
Caller is responsible for shutdown (e.g. controller.shutdown()).
|
||||
"""
|
||||
return ThreadPoolExecutor(
|
||||
max_workers=max_workers,
|
||||
thread_name_prefix=IO_POOL_THREAD_NAME_PREFIX,
|
||||
)
|
||||
+132
@@ -0,0 +1,132 @@
|
||||
"""WarmupManager: import heavy modules on a background thread pool.
|
||||
|
||||
Per spec.md:2.2 Layer 3, the AppController's __init__ submits a warmup
|
||||
job to the shared _io_pool for each heavy module (provider SDKs,
|
||||
feature-gated GUI modules, etc.). After warmup completes, those modules
|
||||
are in sys.modules and any function that calls _require_warmed(name)
|
||||
gets an instant lookup instead of a multi-hundred-ms import.
|
||||
|
||||
Public API on the manager (and exposed on AppController via delegation):
|
||||
mgr.submit(modules) - start warmup jobs (call once at init)
|
||||
mgr.status() - {pending, completed, failed}
|
||||
mgr.is_done() - bool
|
||||
mgr.wait(timeout) - block until done
|
||||
mgr.on_complete(callback) - register completion callback
|
||||
mgr.reset() - clear state (for re-warmup, e.g. in tests)
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import threading
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from typing import Callable, Optional
|
||||
|
||||
|
||||
CompletionCallback = Callable[[dict], None]
|
||||
|
||||
|
||||
class WarmupManager:
|
||||
def __init__(self, pool: ThreadPoolExecutor) -> None:
|
||||
self._pool = pool
|
||||
self._lock = threading.Lock()
|
||||
self._done_event = threading.Event()
|
||||
self._pending: list[str] = []
|
||||
self._completed: list[str] = []
|
||||
self._failed: list[str] = []
|
||||
self._callbacks: list[CompletionCallback] = []
|
||||
self._started = False
|
||||
|
||||
def submit(self, modules: list[str]) -> None:
|
||||
with self._lock:
|
||||
if self._started:
|
||||
raise RuntimeError("WarmupManager.submit() called twice; call reset() first")
|
||||
self._pending = list(modules)
|
||||
self._completed = []
|
||||
self._failed = []
|
||||
self._done_event.clear()
|
||||
self._started = True
|
||||
for name in modules:
|
||||
self._pool.submit(self._warmup_one, name)
|
||||
|
||||
def status(self) -> dict[str, list[str]]:
|
||||
with self._lock:
|
||||
return {
|
||||
"pending": list(self._pending),
|
||||
"completed": list(self._completed),
|
||||
"failed": list(self._failed),
|
||||
}
|
||||
|
||||
def is_done(self) -> bool:
|
||||
return self._done_event.is_set()
|
||||
|
||||
def wait(self, timeout: Optional[float] = None) -> bool:
|
||||
return self._done_event.wait(timeout=timeout)
|
||||
|
||||
def on_complete(self, callback: CompletionCallback) -> None:
|
||||
fire_now = False
|
||||
with self._lock:
|
||||
if self._done_event.is_set():
|
||||
fire_now = True
|
||||
snap = self._snapshot()
|
||||
else:
|
||||
self._callbacks.append(callback)
|
||||
if fire_now:
|
||||
try:
|
||||
callback(snap)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
with self._lock:
|
||||
self._pending = []
|
||||
self._completed = []
|
||||
self._failed = []
|
||||
self._done_event.clear()
|
||||
self._callbacks = []
|
||||
self._started = False
|
||||
|
||||
def _warmup_one(self, name: str) -> None:
|
||||
try:
|
||||
importlib.import_module(name)
|
||||
except BaseException as e:
|
||||
self._record_failure(name, e)
|
||||
else:
|
||||
self._record_success(name)
|
||||
|
||||
def _record_success(self, name: str) -> None:
|
||||
callbacks: list[CompletionCallback] = []
|
||||
with self._lock:
|
||||
if name in self._pending:
|
||||
self._pending.remove(name)
|
||||
self._completed.append(name)
|
||||
done = self._started and not self._pending
|
||||
if done:
|
||||
self._done_event.set()
|
||||
callbacks = list(self._callbacks)
|
||||
for cb in callbacks:
|
||||
try:
|
||||
cb(self._snapshot())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _record_failure(self, name: str, _err: BaseException) -> None:
|
||||
callbacks: list[CompletionCallback] = []
|
||||
with self._lock:
|
||||
if name in self._pending:
|
||||
self._pending.remove(name)
|
||||
self._failed.append(name)
|
||||
done = self._started and not self._pending
|
||||
if done:
|
||||
self._done_event.set()
|
||||
callbacks = list(self._callbacks)
|
||||
for cb in callbacks:
|
||||
try:
|
||||
cb(self._snapshot())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _snapshot(self) -> dict[str, list[str]]:
|
||||
return {
|
||||
"pending": list(self._pending),
|
||||
"completed": list(self._completed),
|
||||
"failed": list(self._failed),
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
"""Tests for src/io_pool.py (the shared 4-thread job pool on AppController)."""
|
||||
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from src.io_pool import make_io_pool, IO_POOL_MAX_WORKERS # noqa: E402
|
||||
|
||||
|
||||
def test_make_io_pool_returns_thread_pool_executor() -> None:
|
||||
pool = make_io_pool()
|
||||
assert isinstance(pool, ThreadPoolExecutor)
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_make_io_pool_has_four_workers() -> None:
|
||||
pool = make_io_pool()
|
||||
assert pool._max_workers == IO_POOL_MAX_WORKERS == 4
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_make_io_pool_workers_named_controller_io() -> None:
|
||||
pool = make_io_pool()
|
||||
|
||||
def capture() -> str:
|
||||
return threading.current_thread().name
|
||||
|
||||
fut = pool.submit(capture)
|
||||
name = fut.result(timeout=5)
|
||||
assert name.startswith("controller-io"), f"got {name!r}"
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_make_io_pool_runs_jobs_in_parallel() -> None:
|
||||
pool = make_io_pool()
|
||||
barrier = threading.Barrier(4)
|
||||
results: list[float] = []
|
||||
|
||||
def wait_at_barrier() -> float:
|
||||
t0 = time.perf_counter()
|
||||
barrier.wait(timeout=5)
|
||||
return time.perf_counter() - t0
|
||||
|
||||
futs = [pool.submit(wait_at_barrier) for _ in range(4)]
|
||||
durations = [f.result(timeout=5) for f in futs]
|
||||
assert all(d < 0.5 for d in durations), f"jobs did not run in parallel: {durations}"
|
||||
pool.shutdown(wait=False)
|
||||
@@ -0,0 +1,129 @@
|
||||
"""Tests for src/warmup.py (the WarmupManager class)."""
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from src.warmup import WarmupManager # noqa: E402
|
||||
|
||||
|
||||
def _make_pool() -> ThreadPoolExecutor:
|
||||
return ThreadPoolExecutor(max_workers=2, thread_name_prefix="warmup-test")
|
||||
|
||||
|
||||
def test_warmup_submits_one_job_per_module() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json", "os", "sys"])
|
||||
time.sleep(0.5)
|
||||
status = mgr.status()
|
||||
assert len(status["pending"]) == 0
|
||||
assert set(status["completed"]) == {"json", "os", "sys"}
|
||||
assert status["failed"] == []
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_status_pending_initially() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json"])
|
||||
snap = mgr.status()
|
||||
assert "pending" in snap
|
||||
assert "completed" in snap
|
||||
assert "failed" in snap
|
||||
pool.shutdown(wait=False)
|
||||
mgr.wait(timeout=2)
|
||||
|
||||
|
||||
def test_warmup_status_reflects_failures() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["definitely_not_a_real_module_xyz123"])
|
||||
mgr.wait(timeout=5)
|
||||
status = mgr.status()
|
||||
assert "definitely_not_a_real_module_xyz123" in status["failed"]
|
||||
assert status["completed"] == []
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_done_event_set_after_all_complete() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["os", "sys"])
|
||||
assert not mgr.is_done()
|
||||
mgr.wait(timeout=5)
|
||||
assert mgr.is_done()
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_wait_blocks_until_done() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json", "os"])
|
||||
completed = mgr.wait(timeout=10)
|
||||
assert completed is True
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_on_complete_callback_fires() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
received: list[dict] = []
|
||||
mgr.on_complete(lambda status: received.append(dict(status)))
|
||||
mgr.submit(["json"])
|
||||
mgr.wait(timeout=5)
|
||||
assert len(received) == 1
|
||||
assert "json" in received[0]["completed"]
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_on_complete_callback_fires_immediately_if_already_done() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json"])
|
||||
mgr.wait(timeout=5)
|
||||
received: list[dict] = []
|
||||
mgr.on_complete(lambda status: received.append(dict(status)))
|
||||
assert len(received) == 1
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_modules_actually_loaded_in_sys_modules() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json", "os"])
|
||||
mgr.wait(timeout=5)
|
||||
import json as _json
|
||||
import os as _os
|
||||
assert _json in sys.modules.values()
|
||||
assert _os in sys.modules.values()
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_reset_clears_state() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json"])
|
||||
mgr.wait(timeout=5)
|
||||
assert mgr.is_done()
|
||||
mgr.reset()
|
||||
assert not mgr.is_done()
|
||||
assert mgr.status()["pending"] == []
|
||||
assert mgr.status()["completed"] == []
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
|
||||
def test_warmup_runs_jobs_concurrently_not_serially() -> None:
|
||||
pool = _make_pool()
|
||||
mgr = WarmupManager(pool)
|
||||
mgr.submit(["json", "os", "sys", "re"])
|
||||
started = time.perf_counter()
|
||||
mgr.wait(timeout=5)
|
||||
elapsed = time.perf_counter() - started
|
||||
assert elapsed < 1.0, f"warmup took {elapsed:.2f}s; expected concurrent execution"
|
||||
pool.shutdown(wait=False)
|
||||
Reference in New Issue
Block a user