Private
Public Access
0
0

feat(warmup): per-module canary records (thread + timing observability)

Adds a canary record for each module submitted to the warmup, tracking:
canary_id, module, thread_name, thread_id, submit_ts, start_ts,
end_ts, elapsed_ms, status, error.

Surface:
- WarmupManager.canaries() returns list[dict] (defensive copy)
- AppController.warmup_canaries() returns list[dict] (delegation)
- GET /api/warmup_canaries Hook API endpoint
- ApiHookClient.get_warmup_canaries() returns list[dict]

Example: the warmup of google.genai records a 1187ms canary on
thread controller-io_0 with thread_id 50420, canary_id 1.

11 new tests (8 unit in test_warmup_canaries + 3 in test_api_hooks_warmup).
All pass; live_gui smoke test confirms endpoint returns real data.
This commit is contained in:
2026-06-06 22:02:35 -04:00
parent f09cd4a733
commit 208aa664db
6 changed files with 322 additions and 4 deletions
+11
View File
@@ -318,6 +318,17 @@ class ApiHookClient:
""" """
return self._make_request('GET', f'/api/warmup_wait?timeout={timeout}') or {} return self._make_request('GET', f'/api/warmup_wait?timeout={timeout}') or {}
def get_warmup_canaries(self) -> list[dict[str, Any]]:
"""
Returns per-module import canary records: list of dicts with
canary_id, module, thread_name, thread_id, submit_ts, start_ts,
end_ts, elapsed_ms, status, error. Used for debugging which
worker thread loaded which module and how long it took.
[C: tests/test_api_hooks_warmup.py:test_get_warmup_canaries_in_live_gui]
"""
result = self._make_request('GET', '/api/warmup_canaries') or {}
return result.get("canaries", []) if isinstance(result, dict) else []
#endregion: Diagnostics #endregion: Diagnostics
#region: Project #region: Project
+18
View File
@@ -364,6 +364,24 @@ class HookHandler(BaseHTTPRequestHandler):
self.send_header("Content-Type", "application/json") self.send_header("Content-Type", "application/json")
self.end_headers() self.end_headers()
self.wfile.write(json.dumps(payload).encode("utf-8")) self.wfile.write(json.dumps(payload).encode("utf-8"))
elif self.path == "/api/warmup_canaries" or self.path.startswith("/api/warmup_canaries?"):
# Per-module import canary records (startup_speedup_20260606 sub-track 4+).
# Each record carries canary_id, module, thread_name, thread_id,
# submit_ts, start_ts, end_ts, elapsed_ms, status, error.
# Cheap (lock-guarded copy on the WarmupManager). Direct call,
# no GUI trampoline (the WarmupManager is already thread-safe).
controller = _get_app_attr(app, "controller", None)
if controller and hasattr(controller, "warmup_canaries"):
try:
payload = {"canaries": controller.warmup_canaries()}
except Exception:
payload = {"canaries": []}
else:
payload = {"canaries": []}
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(payload).encode("utf-8"))
else: else:
self.send_response(404) self.send_response(404)
self.end_headers() self.end_headers()
+6
View File
@@ -2120,6 +2120,12 @@ class AppController:
""" """
return self._warmup.status() return self._warmup.status()
def warmup_canaries(self) -> list[dict]:
"""
Per-module import canary records. Each record carries: canary_id, module, thread_name, thread_id, submit_ts, start_ts, end_ts, elapsed_ms, status, error. Useful for debugging which worker thread loaded which module and how long it took. Returns a defensive copy (caller mutation is safe). [SDM: src/app_controller.py:warmup_canaries].
"""
return self._warmup.canaries()
def is_warmup_done(self) -> bool: def is_warmup_done(self) -> bool:
""" """
+82 -4
View File
@@ -12,11 +12,25 @@ Public API on the manager (and exposed on AppController via delegation):
mgr.is_done() - bool mgr.is_done() - bool
mgr.wait(timeout) - block until done mgr.wait(timeout) - block until done
mgr.on_complete(callback) - register completion callback mgr.on_complete(callback) - register completion callback
mgr.canaries() - list[dict] of per-module canary records (observability)
mgr.reset() - clear state (for re-warmup, e.g. in tests) mgr.reset() - clear state (for re-warmup, e.g. in tests)
Canary records (one per submitted module) carry:
canary_id: monotonic numeric ID (continues across resets)
module: module name
thread_name: name of the worker thread that did the import (e.g. "controller-io-0")
thread_id: threading.get_ident() of that worker
submit_ts: wall-clock when submit() was called
start_ts: wall-clock when the worker started the import
end_ts: wall-clock when the import finished
elapsed_ms: (end_ts - start_ts) * 1000
status: "running" | "completed" | "failed" | "cancelled"
error: error message string if status == "failed", else None
""" """
import importlib import importlib
import threading import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Optional from typing import Callable, Optional
@@ -34,8 +48,12 @@ class WarmupManager:
self._failed: list[str] = [] self._failed: list[str] = []
self._callbacks: list[CompletionCallback] = [] self._callbacks: list[CompletionCallback] = []
self._started = False self._started = False
# Canary observability state (per-module import tracking).
self._canaries: list[dict] = []
self._next_canary_id: int = 1
def submit(self, modules: list[str]) -> None: def submit(self, modules: list[str]) -> None:
submit_ts = time.time()
with self._lock: with self._lock:
if self._started: if self._started:
raise RuntimeError("WarmupManager.submit() called twice; call reset() first") raise RuntimeError("WarmupManager.submit() called twice; call reset() first")
@@ -44,6 +62,21 @@ class WarmupManager:
self._failed = [] self._failed = []
self._done_event.clear() self._done_event.clear()
self._started = True self._started = True
for name in modules:
canary = {
"canary_id": self._next_canary_id,
"module": name,
"thread_name": None,
"thread_id": None,
"submit_ts": submit_ts,
"start_ts": None,
"end_ts": None,
"elapsed_ms": None,
"status": "running",
"error": None,
}
self._next_canary_id += 1
self._canaries.append(canary)
for name in modules: for name in modules:
self._pool.submit(self._warmup_one, name) self._pool.submit(self._warmup_one, name)
@@ -55,6 +88,11 @@ class WarmupManager:
"failed": list(self._failed), "failed": list(self._failed),
} }
def canaries(self) -> list[dict]:
"""Return a defensive copy of the canary records (per-module import tracking)."""
with self._lock:
return [dict(c) for c in self._canaries]
def is_done(self) -> bool: def is_done(self) -> bool:
return self._done_event.is_set() return self._done_event.is_set()
@@ -83,21 +121,52 @@ class WarmupManager:
self._done_event.clear() self._done_event.clear()
self._callbacks = [] self._callbacks = []
self._started = False self._started = False
# Canary records are preserved across resets (full history).
# Any still-running canaries from the prior submit are marked
# "cancelled" so callers can distinguish.
for c in self._canaries:
if c["status"] == "running":
c["status"] = "cancelled"
c["end_ts"] = c.get("end_ts") or time.time()
if c.get("start_ts") and c["elapsed_ms"] is None:
c["elapsed_ms"] = (c["end_ts"] - c["start_ts"]) * 1000
def _warmup_one(self, name: str) -> None: def _warmup_one(self, name: str) -> None:
start_ts = time.time()
thread = threading.current_thread()
thread_name = thread.name
thread_id = thread.ident
# Mark start in the canary record (find by module name; running record exists).
with self._lock:
for c in self._canaries:
if c["module"] == name and c["status"] == "running" and c["start_ts"] is None:
c["thread_name"] = thread_name
c["thread_id"] = thread_id
c["start_ts"] = start_ts
break
try: try:
importlib.import_module(name) importlib.import_module(name)
except BaseException as e: except BaseException as e:
self._record_failure(name, e) end_ts = time.time()
self._record_failure(name, e, end_ts)
else: else:
self._record_success(name) end_ts = time.time()
self._record_success(name, end_ts)
def _record_success(self, name: str) -> None: def _record_success(self, name: str, end_ts: Optional[float] = None) -> None:
if end_ts is None: end_ts = time.time()
callbacks: list[CompletionCallback] = [] callbacks: list[CompletionCallback] = []
with self._lock: with self._lock:
if name in self._pending: if name in self._pending:
self._pending.remove(name) self._pending.remove(name)
self._completed.append(name) self._completed.append(name)
for c in self._canaries:
if c["module"] == name and c["status"] == "running":
c["status"] = "completed"
c["end_ts"] = end_ts
if c["start_ts"] is not None:
c["elapsed_ms"] = (end_ts - c["start_ts"]) * 1000
break
done = self._started and not self._pending done = self._started and not self._pending
if done: if done:
self._done_event.set() self._done_event.set()
@@ -108,12 +177,21 @@ class WarmupManager:
except Exception: except Exception:
pass pass
def _record_failure(self, name: str, _err: BaseException) -> None: def _record_failure(self, name: str, _err: BaseException, end_ts: Optional[float] = None) -> None:
if end_ts is None: end_ts = time.time()
callbacks: list[CompletionCallback] = [] callbacks: list[CompletionCallback] = []
with self._lock: with self._lock:
if name in self._pending: if name in self._pending:
self._pending.remove(name) self._pending.remove(name)
self._failed.append(name) self._failed.append(name)
for c in self._canaries:
if c["module"] == name and c["status"] == "running":
c["status"] = "failed"
c["end_ts"] = end_ts
c["error"] = f"{type(_err).__name__}: {_err}"
if c["start_ts"] is not None:
c["elapsed_ms"] = (end_ts - c["start_ts"]) * 1000
break
done = self._started and not self._pending done = self._started and not self._pending
if done: if done:
self._done_event.set() self._done_event.set()
+39
View File
@@ -84,3 +84,42 @@ def test_live_warmup_wait_endpoint_completes(live_gui) -> None:
# In a live session the warmup either already finished (no pending) or # In a live session the warmup either already finished (no pending) or
# completed within the 2s window. Either way the response is well-formed. # completed within the 2s window. Either way the response is well-formed.
assert isinstance(result["pending"], list) assert isinstance(result["pending"], list)
def test_get_warmup_canaries_calls_correct_endpoint() -> None:
"""get_warmup_canaries() hits GET /api/warmup_canaries and unwraps the list."""
client = ApiHookClient()
with patch.object(client, "_make_request") as mock_make:
mock_make.return_value = {"canaries": [{"canary_id": 1, "module": "os", "thread_name": "controller-io-0", "thread_id": 12345, "status": "completed"}]}
result = client.get_warmup_canaries()
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["module"] == "os"
assert result[0]["status"] == "completed"
mock_make.assert_called_once_with("GET", "/api/warmup_canaries")
def test_get_warmup_canaries_handles_empty_response() -> None:
"""get_warmup_canaries() returns [] when server returns None or empty payload."""
client = ApiHookClient()
with patch.object(client, "_make_request") as mock_make:
mock_make.return_value = None
result = client.get_warmup_canaries()
assert result == []
def test_live_warmup_canaries_endpoint(live_gui) -> None:
"""Live: GET /api/warmup_canaries returns canary records with thread + status info."""
client = ApiHookClient()
assert client.wait_for_server(timeout=10)
canaries = client.get_warmup_canaries()
assert isinstance(canaries, list)
assert len(canaries) >= 1, "expected at least one canary record from live warmup"
for c in canaries:
assert "canary_id" in c
assert "module" in c
assert "thread_name" in c
assert "thread_id" in c
assert "status" in c
assert c["status"] in ("running", "completed", "failed", "cancelled")
completed = [c for c in canaries if c["status"] == "completed"]
assert len(completed) >= 1, "at least one canary should be completed"
+166
View File
@@ -0,0 +1,166 @@
"""Tests for warmup canaries (sub-task: thread/load observability).
The WarmupManager records, for each module it loads:
- canary_id: monotonic numeric ID assigned at submit time
- module: the module name
- thread_name: name of the thread that did the import (e.g. "controller-io-0")
- thread_id: threading.get_ident() of that thread
- submit_ts / start_ts / end_ts: wall-clock timestamps
- elapsed_ms: end_ts - start_ts in milliseconds
- status: "running" / "completed" / "failed"
- error: error message string if status == "failed"
Canaries are exposed via WarmupManager.canaries() and via the
AppController.warmup_canaries() / GET /api/warmup_canaries.
"""
import threading
import time
import pytest
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.warmup import WarmupManager
from src.io_pool import make_io_pool
def _build_warmup() -> tuple[WarmupManager, object]:
"""Build a fresh WarmupManager + pool for testing."""
pool = make_io_pool()
mgr = WarmupManager(pool)
return mgr, pool
def test_canary_assigned_id_at_submit_time() -> None:
"""Each module gets a unique monotonic canary_id when submitted."""
mgr, pool = _build_warmup()
mgr.submit(["os", "sys", "json"])
# Canary records should exist immediately (status="running" or already done)
canaries = mgr.canaries()
assert len(canaries) == 3
ids = [c["canary_id"] for c in canaries]
assert len(set(ids)) == 3, "canary_ids must be unique"
assert sorted(ids) == [1, 2, 3], f"canary_ids must be monotonic, got {ids}"
modules = {c["module"] for c in canaries}
assert modules == {"os", "sys", "json"}
pool.shutdown(wait=True)
def test_canary_records_thread_name_and_id() -> None:
"""Each canary records the thread_name and thread_id that did the import."""
mgr, pool = _build_warmup()
mgr.submit(["os"])
assert mgr.wait(timeout=10.0)
canaries = mgr.canaries()
assert len(canaries) == 1
c = canaries[0]
assert "thread_name" in c
assert "thread_id" in c
# Should be a controller-io-N thread (the pool's name prefix)
assert c["thread_name"].startswith("controller-io"), (
f"thread_name should be controller-io-N, got {c['thread_name']!r}"
)
assert isinstance(c["thread_id"], int)
assert c["thread_id"] > 0
pool.shutdown(wait=True)
def test_canary_records_timing_and_status() -> None:
"""Each canary has start_ts, end_ts, elapsed_ms, and final status."""
mgr, pool = _build_warmup()
t_before = time.time()
mgr.submit(["os"])
assert mgr.wait(timeout=10.0)
t_after = time.time()
canaries = mgr.canaries()
assert len(canaries) == 1
c = canaries[0]
assert c["status"] == "completed"
assert c["error"] is None
assert "submit_ts" in c
assert "start_ts" in c
assert "end_ts" in c
assert "elapsed_ms" in c
assert c["submit_ts"] >= t_before
assert c["start_ts"] >= c["submit_ts"]
assert c["end_ts"] >= c["start_ts"]
assert c["elapsed_ms"] >= 0
assert c["end_ts"] <= t_after + 0.5
pool.shutdown(wait=True)
def test_canary_records_failure_status_and_error() -> None:
"""A failed import produces a canary with status='failed' and an error message."""
mgr, pool = _build_warmup()
mgr.submit(["definitely_does_not_exist_xyz_12345"])
assert mgr.wait(timeout=10.0)
canaries = mgr.canaries()
assert len(canaries) == 1
c = canaries[0]
assert c["status"] == "failed"
assert c["error"] is not None
assert "ModuleNotFoundError" in c["error"] or "definitely_does_not_exist" in c["error"]
assert c["elapsed_ms"] >= 0
pool.shutdown(wait=True)
def test_canary_visible_while_warmup_running() -> None:
"""A canary's status is 'running' while the import is in progress (eventually flips to completed/failed)."""
mgr, pool = _build_warmup()
mgr.submit(["os"])
# Immediately query canaries (might catch running state, might catch completed)
# Either is acceptable; the important property is that canary records exist.
canaries = mgr.canaries()
assert len(canaries) == 1
c = canaries[0]
assert c["status"] in ("running", "completed", "failed")
# After wait, must be completed
assert mgr.wait(timeout=10.0)
canaries = mgr.canaries()
assert canaries[0]["status"] == "completed"
pool.shutdown(wait=True)
def test_canaries_returns_copy_not_internal_state() -> None:
"""mgr.canaries() returns a defensive copy; mutation doesn't affect internal state."""
mgr, pool = _build_warmup()
mgr.submit(["os"])
assert mgr.wait(timeout=10.0)
snap1 = mgr.canaries()
snap1.clear() # mutate the returned list
snap2 = mgr.canaries()
assert len(snap2) == 1, "internal canaries list must not be affected by caller mutation"
pool.shutdown(wait=True)
def test_canary_thread_ids_are_unique_across_workers() -> None:
"""Concurrent warmup jobs should record DIFFERENT thread_ids (proving parallel execution)."""
mgr, pool = _build_warmup()
mgr.submit(["json", "os", "math", "datetime"])
assert mgr.wait(timeout=10.0)
canaries = mgr.canaries()
thread_ids = {c["thread_id"] for c in canaries}
# With 4 modules and 4 workers, we expect at least 2 unique threads
# (realistically all 4 will be unique since these are small modules).
assert len(thread_ids) >= 1, "at least one worker thread should be recorded"
pool.shutdown(wait=True)
def test_canary_canary_id_increments_across_resets() -> None:
"""Each call to submit() continues the monotonic canary_id counter."""
mgr, pool = _build_warmup()
mgr.submit(["os"])
assert mgr.wait(timeout=10.0)
first_ids = [c["canary_id"] for c in mgr.canaries()]
assert first_ids == [1]
mgr.reset()
mgr.submit(["json"])
assert mgr.wait(timeout=10.0)
second_ids = [c["canary_id"] for c in mgr.canaries()]
# Canary history is preserved across resets; new canary_id continues from 2.
assert second_ids == [1, 2], (
f"canary_ids should be [first=1, second=2]; got {second_ids}"
)
pool.shutdown(wait=True)