def get_warmup_canaries(self) -> list[dict[str, Any]]:
    """
    Fetch the per-module import canary records via GET /api/warmup_canaries.

    Each record is a dict with canary_id, module, thread_name, thread_id,
    submit_ts, start_ts, end_ts, elapsed_ms, status, error. Used for
    debugging which worker thread loaded which module and how long it took.

    Returns:
        The list stored under the response's "canaries" key, or [] when the
        request yields a falsy result, the payload is not a dict, or the
        "canaries" value is missing or not a list.
    [C: tests/test_api_hooks_warmup.py:test_live_warmup_canaries_endpoint]
    """
    result = self._make_request('GET', '/api/warmup_canaries') or {}
    if not isinstance(result, dict):
        return []
    canaries = result.get("canaries")
    # A present-but-null (or otherwise non-list) "canaries" value must not
    # leak through the declared list return type.
    return canaries if isinstance(canaries, list) else []
def warmup_canaries(self) -> list[dict]:
    """
    Return the warmup manager's per-module import canary records.

    Each record carries: canary_id, module, thread_name, thread_id,
    submit_ts, start_ts, end_ts, elapsed_ms, status, error. Useful for
    debugging which worker thread loaded which module and how long it
    took. Pure delegation to self._warmup.canaries(), which hands back
    copies of the records rather than the manager's own dicts.
    [SDM: src/app_controller.py:warmup_canaries].
    """
    manager = self._warmup
    return manager.canaries()
def canaries(self) -> list[dict]:
    """Snapshot the canary records (per-module import tracking).

    The snapshot is taken while holding the manager lock. Both the list and
    each record dict are fresh copies, so callers may mutate the result
    without touching internal state.
    """
    with self._lock:
        snapshot = [record.copy() for record in self._canaries]
    return snapshot
+ # Any still-running canaries from the prior submit are marked + # "cancelled" so callers can distinguish. + for c in self._canaries: + if c["status"] == "running": + c["status"] = "cancelled" + c["end_ts"] = c.get("end_ts") or time.time() + if c.get("start_ts") and c["elapsed_ms"] is None: + c["elapsed_ms"] = (c["end_ts"] - c["start_ts"]) * 1000 def _warmup_one(self, name: str) -> None: + start_ts = time.time() + thread = threading.current_thread() + thread_name = thread.name + thread_id = thread.ident + # Mark start in the canary record (find by module name; running record exists). + with self._lock: + for c in self._canaries: + if c["module"] == name and c["status"] == "running" and c["start_ts"] is None: + c["thread_name"] = thread_name + c["thread_id"] = thread_id + c["start_ts"] = start_ts + break try: importlib.import_module(name) except BaseException as e: - self._record_failure(name, e) + end_ts = time.time() + self._record_failure(name, e, end_ts) else: - self._record_success(name) + end_ts = time.time() + self._record_success(name, end_ts) - def _record_success(self, name: str) -> None: + def _record_success(self, name: str, end_ts: Optional[float] = None) -> None: + if end_ts is None: end_ts = time.time() callbacks: list[CompletionCallback] = [] with self._lock: if name in self._pending: self._pending.remove(name) self._completed.append(name) + for c in self._canaries: + if c["module"] == name and c["status"] == "running": + c["status"] = "completed" + c["end_ts"] = end_ts + if c["start_ts"] is not None: + c["elapsed_ms"] = (end_ts - c["start_ts"]) * 1000 + break done = self._started and not self._pending if done: self._done_event.set() @@ -108,12 +177,21 @@ class WarmupManager: except Exception: pass - def _record_failure(self, name: str, _err: BaseException) -> None: + def _record_failure(self, name: str, _err: BaseException, end_ts: Optional[float] = None) -> None: + if end_ts is None: end_ts = time.time() callbacks: list[CompletionCallback] = [] 
def test_get_warmup_canaries_handles_empty_response() -> None:
    """get_warmup_canaries() returns [] when the server returns None or an empty payload."""
    client = ApiHookClient()
    with patch.object(client, "_make_request") as mock_make:
        # Exercise both cases the docstring promises: no response at all
        # (None) and a dict payload without a "canaries" key ({}).
        for payload in (None, {}):
            mock_make.return_value = payload
            result = client.get_warmup_canaries()
            assert result == [], f"expected [] for payload={payload!r}, got {result!r}"
- status: "running" / "completed" / "failed" / "cancelled"
def test_canary_assigned_id_at_submit_time() -> None:
    """Each module gets a unique, increasing canary_id in submit order."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["os", "sys", "json"])
        # Canary records exist as soon as submit() returns, whatever their
        # status (still "running" or already finished).
        canaries = mgr.canaries()
        assert len(canaries) == 3
        ids = [c["canary_id"] for c in canaries]
        assert len(set(ids)) == 3, "canary_ids must be unique"
        # Compare the unsorted list: sorting first would hide ids that were
        # handed out in the wrong order.
        assert ids == [1, 2, 3], f"canary_ids must be monotonic, got {ids}"
        assert [c["module"] for c in canaries] == ["os", "sys", "json"]
    finally:
        # Release the worker threads even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canary_records_failure_status_and_error() -> None:
    """An import that cannot succeed yields a 'failed' canary carrying an error message."""
    manager, executor = _build_warmup()
    manager.submit(["definitely_does_not_exist_xyz_12345"])
    finished = manager.wait(timeout=10.0)
    assert finished
    records = manager.canaries()
    assert len(records) == 1
    record = records[0]
    assert record["status"] == "failed"
    message = record["error"]
    assert message is not None
    # Either the exception type or the missing module's name must show up.
    assert any(
        token in message
        for token in ("ModuleNotFoundError", "definitely_does_not_exist")
    )
    assert record["elapsed_ms"] >= 0
    executor.shutdown(wait=True)
def test_canary_thread_ids_are_unique_across_workers() -> None:
    """Every canary records a real worker thread_id after a multi-module warmup.

    The number of distinct threads is only bounded (1..len(modules)), not
    required to exceed one: these stdlib modules import almost instantly, so
    the pool may reuse a single worker for several of them (assumes the pool
    reuses idle workers the way ThreadPoolExecutor does -- TODO confirm
    against make_io_pool).
    """
    modules = ["json", "os", "math", "datetime"]
    mgr, pool = _build_warmup()
    try:
        mgr.submit(modules)
        assert mgr.wait(timeout=10.0)
        canaries = mgr.canaries()
        assert len(canaries) == len(modules)
        thread_ids = {c["thread_id"] for c in canaries}
        # A set holding only None would satisfy a bare size check, so verify
        # that each recorded id is an actual thread ident.
        assert all(isinstance(tid, int) for tid in thread_ids), (
            f"every canary must record a worker thread_id, got {thread_ids}"
        )
        assert 1 <= len(thread_ids) <= len(modules)
    finally:
        # Release the worker threads even when an assertion above fails.
        pool.shutdown(wait=True)