Private
Public Access
0
0
Files
manual_slop/tests/test_warmup_canaries.py
T
ed 152605f5dc feat(warmup): log canaries to stderr by default (with main-thread violation warning)
Per module: prints a one-line summary to stderr when the import
completes or fails:
  [warmup 1] google.genai on controller-io_0 (id=18636): 1218.6ms
  [warmup 2] anthropic on controller-io_1 (id=5500): 1148.3ms
  [warmup 3] openai on controller-io_2 (id=34376): 1144.2ms
  ...

When the entire warmup completes, prints an aggregate:
  [warmup done] 9 modules: 9 completed (sum of per-module elapsed: 3591.7ms)

If ANY canary ran on the main thread (main-thread-purity violation),
the per-module line is tagged with [MAIN-THREAD] AND a final WARNING
is printed:
  [warmup WARNING] N module(s) loaded on the MAIN THREAD: google.genai

Default is log_to_stderr=True so production runs get the observability
for free. Tests opt out via WarmupManager(pool, log_to_stderr=False)
in the _build_warmup helper.

5 new tests (4 stderr logging + 1 quiet). All 13 canary tests pass.

Use case: 'did my heavy import run on the GUI thread when it shouldn't
have?' is now answered by grepping stderr for [warmup ...] [MAIN-THREAD]
lines. No hook server required.
2026-06-06 22:15:24 -04:00

242 lines
8.5 KiB
Python

"""Tests for warmup canaries (sub-task: thread/load observability).
The WarmupManager records, for each module it loads:
- canary_id: monotonic numeric ID assigned at submit time
- module: the module name
- thread_name: name of the thread that did the import (e.g. "controller-io_0")
- thread_id: threading.get_ident() of that thread
- submit_ts / start_ts / end_ts: wall-clock timestamps
- elapsed_ms: end_ts - start_ts in milliseconds
- status: "running" / "completed" / "failed"
- error: error message string if status == "failed"
Canaries are exposed via WarmupManager.canaries() and via the
AppController.warmup_canaries() / GET /api/warmup_canaries.
"""
import threading
import time
import pytest
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from src.warmup import WarmupManager
from src.io_pool import make_io_pool
def _build_warmup() -> tuple[WarmupManager, object]:
    """Return a ``(manager, pool)`` pair for a test, with stderr logging off.

    The pool is handed back next to the manager so the calling test can
    shut it down once it is finished.
    """
    io_pool = make_io_pool()
    return WarmupManager(io_pool, log_to_stderr=False), io_pool
def test_canary_assigned_id_at_submit_time() -> None:
    """Each module gets a unique monotonic canary_id when submitted."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["os", "sys", "json"])
        # Deliberately no wait(): the records must exist right after submit(),
        # whether they are still "running" or already finished.
        canaries = mgr.canaries()
        assert len(canaries) == 3
        ids = [c["canary_id"] for c in canaries]
        assert len(set(ids)) == 3, "canary_ids must be unique"
        assert sorted(ids) == [1, 2, 3], f"canary_ids must be monotonic, got {ids}"
        modules = {c["module"] for c in canaries}
        assert modules == {"os", "sys", "json"}
    finally:
        # Shut the pool down even when an assertion above fails, so a failing
        # test does not leave its worker threads behind.
        pool.shutdown(wait=True)
def test_canary_records_thread_name_and_id() -> None:
    """Each canary records the thread_name and thread_id that did the import."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["os"])
        assert mgr.wait(timeout=10.0)
        canaries = mgr.canaries()
        assert len(canaries) == 1
        c = canaries[0]
        assert "thread_name" in c
        assert "thread_id" in c
        # Only the "controller-io" prefix is checked; the separator and index
        # that follow it are not part of this test's contract.
        assert c["thread_name"].startswith("controller-io"), (
            f"thread_name should start with 'controller-io', got {c['thread_name']!r}"
        )
        assert isinstance(c["thread_id"], int)
        assert c["thread_id"] > 0
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canary_records_timing_and_status() -> None:
    """Each canary has start_ts, end_ts, elapsed_ms, and final status."""
    mgr, pool = _build_warmup()
    try:
        # Bracket the whole submit/wait cycle with wall-clock readings so the
        # recorded timestamps can be bounded from both sides.
        t_before = time.time()
        mgr.submit(["os"])
        assert mgr.wait(timeout=10.0)
        t_after = time.time()
        canaries = mgr.canaries()
        assert len(canaries) == 1
        c = canaries[0]
        assert c["status"] == "completed"
        assert c["error"] is None
        for key in ("submit_ts", "start_ts", "end_ts", "elapsed_ms"):
            assert key in c, f"canary record is missing {key!r}"
        # Timestamps must be ordered: before <= submit <= start <= end.
        assert c["submit_ts"] >= t_before
        assert c["start_ts"] >= c["submit_ts"]
        assert c["end_ts"] >= c["start_ts"]
        assert c["elapsed_ms"] >= 0
        # Half a second of slack on the upper bound.
        assert c["end_ts"] <= t_after + 0.5
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canary_records_failure_status_and_error() -> None:
    """A failed import produces a canary with status='failed' and an error message."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["definitely_does_not_exist_xyz_12345"])
        # wait() is expected to report success even though the import failed.
        assert mgr.wait(timeout=10.0)
        canaries = mgr.canaries()
        assert len(canaries) == 1
        c = canaries[0]
        assert c["status"] == "failed"
        assert c["error"] is not None
        # Accept either the exception type name or the module name in the text.
        assert "ModuleNotFoundError" in c["error"] or "definitely_does_not_exist" in c["error"]
        assert c["elapsed_ms"] >= 0
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canary_visible_while_warmup_running() -> None:
    """A canary record exists right after submit and ends up 'completed' after wait()."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["os"])
        # Query immediately: this may observe "running" or an already-finished
        # state. Either is acceptable; what matters is that the record exists.
        canaries = mgr.canaries()
        assert len(canaries) == 1
        c = canaries[0]
        assert c["status"] in ("running", "completed", "failed")
        # After wait, must be completed
        assert mgr.wait(timeout=10.0)
        canaries = mgr.canaries()
        assert canaries[0]["status"] == "completed"
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canaries_returns_copy_not_internal_state() -> None:
    """mgr.canaries() returns a defensive copy; mutation doesn't affect internal state."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["os"])
        assert mgr.wait(timeout=10.0)
        snap1 = mgr.canaries()
        snap1.clear()  # mutate the returned list
        snap2 = mgr.canaries()
        assert len(snap2) == 1, "internal canaries list must not be affected by caller mutation"
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canary_thread_ids_are_unique_across_workers() -> None:
    """Every canary from a multi-module warmup records a worker thread_id.

    Despite the test name, only a lower bound of one distinct thread_id is
    asserted: nothing in this test forces the four imports to overlap, so
    requiring several distinct threads could fail spuriously.
    """
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["json", "os", "math", "datetime"])
        assert mgr.wait(timeout=10.0)
        canaries = mgr.canaries()
        assert len(canaries) == 4
        thread_ids = {c["thread_id"] for c in canaries}
        assert len(thread_ids) >= 1, "at least one worker thread should be recorded"
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_canary_canary_id_increments_across_resets() -> None:
    """The canary_id counter and the canary history both survive reset()."""
    mgr, pool = _build_warmup()
    try:
        mgr.submit(["os"])
        assert mgr.wait(timeout=10.0)
        first_ids = [c["canary_id"] for c in mgr.canaries()]
        assert first_ids == [1]
        mgr.reset()
        mgr.submit(["json"])
        assert mgr.wait(timeout=10.0)
        second_ids = [c["canary_id"] for c in mgr.canaries()]
        # Canary history is preserved across resets; new canary_id continues from 2.
        assert second_ids == [1, 2], (
            f"canary_ids should be [first=1, second=2]; got {second_ids}"
        )
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_warmup_logs_to_stderr_on_completion(capsys: pytest.CaptureFixture) -> None:
    """Successful canaries print a one-line summary to stderr."""
    pool = make_io_pool()
    try:
        mgr = WarmupManager(pool, log_to_stderr=True)
        mgr.submit(["os", "json"])
        assert mgr.wait(timeout=10.0)
        captured = capsys.readouterr()
        # Each completed module should have a log line
        assert "[warmup" in captured.err
        # Module names are matched with surrounding spaces so that they are
        # found as separate words rather than as fragments of other text.
        assert " os " in captured.err
        assert " json " in captured.err
        # Format: "[warmup N] module on thread (id=IDENT): ELAPSEDms"
        assert "controller-io" in captured.err
        assert "ms" in captured.err
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_warmup_can_be_quiet(capsys: pytest.CaptureFixture) -> None:
    """log_to_stderr=False suppresses the warmup log lines."""
    pool = make_io_pool()
    try:
        mgr = WarmupManager(pool, log_to_stderr=False)
        mgr.submit(["os", "json"])
        assert mgr.wait(timeout=10.0)
        captured = capsys.readouterr()
        # Match on the "[warmup" prefix: real lines read "[warmup 1] ..." or
        # "[warmup done] ...", so the literal "[warmup]" would never occur and
        # checking for it could not detect leaked output.
        assert "[warmup" not in captured.err
        # But the structured canary records still exist
        canaries = mgr.canaries()
        assert len(canaries) == 2
        assert all(c["status"] == "completed" for c in canaries)
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_warmup_logs_total_time_at_completion(capsys: pytest.CaptureFixture) -> None:
    """A summary line is printed when the entire warmup completes."""
    pool = make_io_pool()
    try:
        mgr = WarmupManager(pool, log_to_stderr=True)
        mgr.submit(["os", "json"])
        assert mgr.wait(timeout=10.0)
        captured = capsys.readouterr()
        err_lines = [line for line in captured.err.splitlines() if line.strip()]
        assert len(err_lines) >= 3  # 2 per-module + 1 summary
        # The last non-blank line is taken to be the summary. Only the word
        # "warmup" is checked, so the exact summary wording stays free to change.
        summary_line = err_lines[-1]
        assert "warmup" in summary_line.lower()
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_warmup_logs_failure_to_stderr(capsys: pytest.CaptureFixture) -> None:
    """A failed import prints a FAILED log line to stderr."""
    pool = make_io_pool()
    try:
        mgr = WarmupManager(pool, log_to_stderr=True)
        mgr.submit(["definitely_does_not_exist_xyz_12345"])
        assert mgr.wait(timeout=10.0)
        captured = capsys.readouterr()
        # The output must carry both the FAILED marker and the module name.
        assert "FAILED" in captured.err
        assert "definitely_does_not_exist_xyz_12345" in captured.err
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)
def test_warmup_log_line_includes_thread_id(capsys: pytest.CaptureFixture) -> None:
    """The log line includes the thread_id (matching the canary record)."""
    pool = make_io_pool()
    try:
        mgr = WarmupManager(pool, log_to_stderr=True)
        mgr.submit(["os"])
        assert mgr.wait(timeout=10.0)
        canaries = mgr.canaries()
        captured = capsys.readouterr()
        # The thread_id from the canary should appear in the log
        thread_id = str(canaries[0]["thread_id"])
        assert thread_id in captured.err, f"expected thread_id {thread_id} in stderr: {captured.err!r}"
    finally:
        # Shut the pool down even when an assertion above fails.
        pool.shutdown(wait=True)