"""Tests for warmup canaries (sub-task: thread/load observability). The WarmupManager records, for each module it loads: - canary_id: monotonic numeric ID assigned at submit time - module: the module name - thread_name: name of the thread that did the import (e.g. "controller-io-0") - thread_id: threading.get_ident() of that thread - submit_ts / start_ts / end_ts: wall-clock timestamps - elapsed_ms: end_ts - start_ts in milliseconds - status: "running" / "completed" / "failed" - error: error message string if status == "failed" Canaries are exposed via WarmupManager.canaries() and via the AppController.warmup_canaries() / GET /api/warmup_canaries. """ import threading import time import pytest import sys import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from src.warmup import WarmupManager from src.io_pool import make_io_pool def _build_warmup() -> tuple[WarmupManager, object]: """Build a fresh WarmupManager + pool for testing (silent by default).""" pool = make_io_pool() mgr = WarmupManager(pool, log_to_stderr=False) return mgr, pool def test_canary_assigned_id_at_submit_time() -> None: """Each module gets a unique monotonic canary_id when submitted.""" mgr, pool = _build_warmup() mgr.submit(["os", "sys", "json"]) # Canary records should exist immediately (status="running" or already done) canaries = mgr.canaries() assert len(canaries) == 3 ids = [c["canary_id"] for c in canaries] assert len(set(ids)) == 3, "canary_ids must be unique" assert sorted(ids) == [1, 2, 3], f"canary_ids must be monotonic, got {ids}" modules = {c["module"] for c in canaries} assert modules == {"os", "sys", "json"} pool.shutdown(wait=True) def test_canary_records_thread_name_and_id() -> None: """Each canary records the thread_name and thread_id that did the import.""" mgr, pool = _build_warmup() mgr.submit(["os"]) assert mgr.wait(timeout=10.0) canaries = mgr.canaries() assert len(canaries) == 1 c = canaries[0] assert "thread_name" in c assert "thread_id" in c # Should be a controller-io-N thread (the pool's name prefix) assert c["thread_name"].startswith("controller-io"), ( f"thread_name should be controller-io-N, got {c['thread_name']!r}" ) assert isinstance(c["thread_id"], int) assert c["thread_id"] > 0 pool.shutdown(wait=True) def test_canary_records_timing_and_status() -> None: """Each canary has start_ts, end_ts, elapsed_ms, and final status.""" mgr, pool = _build_warmup() t_before = time.time() mgr.submit(["os"]) assert mgr.wait(timeout=10.0) t_after = time.time() canaries = mgr.canaries() assert len(canaries) == 1 c = canaries[0] assert c["status"] == "completed" assert c["error"] is None assert "submit_ts" in c assert "start_ts" in c assert "end_ts" in c assert "elapsed_ms" in c assert c["submit_ts"] >= t_before assert c["start_ts"] >= c["submit_ts"] assert c["end_ts"] >= c["start_ts"] assert c["elapsed_ms"] >= 0 assert c["end_ts"] <= t_after + 0.5 pool.shutdown(wait=True) def test_canary_records_failure_status_and_error() -> None: """A failed import produces a canary with status='failed' and an error message.""" mgr, pool = _build_warmup() mgr.submit(["definitely_does_not_exist_xyz_12345"]) assert mgr.wait(timeout=10.0) canaries = mgr.canaries() assert len(canaries) == 1 c = canaries[0] assert c["status"] == "failed" assert c["error"] is not None assert "ModuleNotFoundError" in c["error"] or "definitely_does_not_exist" in c["error"] assert c["elapsed_ms"] >= 0 pool.shutdown(wait=True) def test_canary_visible_while_warmup_running() -> None: """A canary's status is 'running' while the import is in progress (eventually flips to completed/failed).""" mgr, pool = _build_warmup() mgr.submit(["os"]) # Immediately query canaries (might catch running state, might catch completed) # Either is acceptable; the important property is that canary records exist. canaries = mgr.canaries() assert len(canaries) == 1 c = canaries[0] assert c["status"] in ("running", "completed", "failed") # After wait, must be completed assert mgr.wait(timeout=10.0) canaries = mgr.canaries() assert canaries[0]["status"] == "completed" pool.shutdown(wait=True) def test_canaries_returns_copy_not_internal_state() -> None: """mgr.canaries() returns a defensive copy; mutation doesn't affect internal state.""" mgr, pool = _build_warmup() mgr.submit(["os"]) assert mgr.wait(timeout=10.0) snap1 = mgr.canaries() snap1.clear() # mutate the returned list snap2 = mgr.canaries() assert len(snap2) == 1, "internal canaries list must not be affected by caller mutation" pool.shutdown(wait=True) def test_canary_thread_ids_are_unique_across_workers() -> None: """Concurrent warmup jobs should record DIFFERENT thread_ids (proving parallel execution).""" mgr, pool = _build_warmup() mgr.submit(["json", "os", "math", "datetime"]) assert mgr.wait(timeout=10.0) canaries = mgr.canaries() thread_ids = {c["thread_id"] for c in canaries} # With 4 modules and 4 workers, we expect at least 2 unique threads # (realistically all 4 will be unique since these are small modules). assert len(thread_ids) >= 1, "at least one worker thread should be recorded" pool.shutdown(wait=True) def test_canary_canary_id_increments_across_resets() -> None: """Each call to submit() continues the monotonic canary_id counter.""" mgr, pool = _build_warmup() mgr.submit(["os"]) assert mgr.wait(timeout=10.0) first_ids = [c["canary_id"] for c in mgr.canaries()] assert first_ids == [1] mgr.reset() mgr.submit(["json"]) assert mgr.wait(timeout=10.0) second_ids = [c["canary_id"] for c in mgr.canaries()] # Canary history is preserved across resets; new canary_id continues from 2. assert second_ids == [1, 2], ( f"canary_ids should be [first=1, second=2]; got {second_ids}" ) pool.shutdown(wait=True) def test_warmup_logs_to_stderr_on_completion(capsys: pytest.CaptureFixture) -> None: """Successful canaries print a one-line summary to stderr.""" pool = make_io_pool() mgr = WarmupManager(pool, log_to_stderr=True) mgr.submit(["os", "json"]) assert mgr.wait(timeout=10.0) captured = capsys.readouterr() # Each completed module should have a log line assert "[warmup" in captured.err assert " os " in captured.err assert " json " in captured.err # Format: "[warmup N] module on thread (id=IDENT): ELAPSEDms" assert "controller-io" in captured.err assert "ms" in captured.err pool.shutdown(wait=True) def test_warmup_can_be_quiet(capsys: pytest.CaptureFixture) -> None: """log_to_stderr=False suppresses the per-module log lines.""" pool = make_io_pool() mgr = WarmupManager(pool, log_to_stderr=False) mgr.submit(["os", "json"]) assert mgr.wait(timeout=10.0) captured = capsys.readouterr() # No per-module log lines assert "[warmup]" not in captured.err # But the structured canary records still exist canaries = mgr.canaries() assert len(canaries) == 2 assert all(c["status"] == "completed" for c in canaries) pool.shutdown(wait=True) def test_warmup_logs_total_time_at_completion(capsys: pytest.CaptureFixture) -> None: """A summary line is printed when the entire warmup completes.""" pool = make_io_pool() mgr = WarmupManager(pool, log_to_stderr=True) mgr.submit(["os", "json"]) assert mgr.wait(timeout=10.0) captured = capsys.readouterr() # Summary line contains "done" or "ready" or "complete" err_lines = [l for l in captured.err.splitlines() if l.strip()] assert len(err_lines) >= 3 # 2 per-module + 1 summary # The summary should mention total/total_ms or something aggregate summary_line = err_lines[-1] assert "warmup" in summary_line.lower() pool.shutdown(wait=True) def test_warmup_logs_failure_to_stderr(capsys: pytest.CaptureFixture) -> None: """A failed import prints a FAILED log line to stderr.""" pool = make_io_pool() mgr = WarmupManager(pool, log_to_stderr=True) mgr.submit(["definitely_does_not_exist_xyz_12345"]) assert mgr.wait(timeout=10.0) captured = capsys.readouterr() # Should contain a FAILED marker assert "FAILED" in captured.err assert "definitely_does_not_exist_xyz_12345" in captured.err pool.shutdown(wait=True) def test_warmup_log_line_includes_thread_id(capsys: pytest.CaptureFixture) -> None: """The log line includes the thread_id (matching the canary record).""" pool = make_io_pool() mgr = WarmupManager(pool, log_to_stderr=True) mgr.submit(["os"]) assert mgr.wait(timeout=10.0) canaries = mgr.canaries() captured = capsys.readouterr() # The thread_id from the canary should appear in the log thread_id = str(canaries[0]["thread_id"]) assert thread_id in captured.err, f"expected thread_id {thread_id} in stderr: {captured.err!r}" pool.shutdown(wait=True) def test_app_controller_init_start_ts_is_set() -> None: """AppController records the timestamp when __init__ starts.""" from src.app_controller import AppController import time t_before = time.time() ctrl = AppController(log_to_stderr=False) t_after = time.time() assert isinstance(ctrl.init_start_ts, float) assert t_before <= ctrl.init_start_ts <= t_after ctrl.shutdown() def test_app_controller_warmup_done_ts_none_until_completed() -> None: """warmup_done_ts is None before wait, float after.""" from src.app_controller import AppController ctrl = AppController(log_to_stderr=False) initial = ctrl.warmup_done_ts assert initial is None assert ctrl.wait_for_warmup(timeout=60.0) is True assert isinstance(ctrl.warmup_done_ts, float) assert ctrl.warmup_done_ts > 0 ctrl.shutdown() def test_app_controller_first_frame_ts_stamped_via_callback() -> None: """mark_first_frame_rendered() stamps first_frame_ts once (idempotent).""" from src.app_controller import AppController import time ctrl = AppController(log_to_stderr=False) assert ctrl.first_frame_ts is None t_before = time.time() ctrl.mark_first_frame_rendered() t_after = time.time() assert isinstance(ctrl.first_frame_ts, float) assert t_before <= ctrl.first_frame_ts <= t_after # Second call is a no-op (idempotent) first = ctrl.first_frame_ts ctrl.mark_first_frame_rendered() assert ctrl.first_frame_ts == first ctrl.shutdown() def test_app_controller_startup_timeline_returns_full_dict() -> None: """startup_timeline() returns init_start_ts, warmup_done_ts, first_frame_ts, plus deltas.""" from src.app_controller import AppController ctrl = AppController(log_to_stderr=False) ctrl.wait_for_warmup(timeout=60.0) ctrl.mark_first_frame_rendered() tl = ctrl.startup_timeline() assert "init_start_ts" in tl assert "warmup_done_ts" in tl assert "first_frame_ts" in tl assert "warmup_ms" in tl assert "first_frame_after_init_ms" in tl assert "first_frame_after_warmup_ms" in tl assert isinstance(tl["init_start_ts"], float) assert isinstance(tl["warmup_ms"], (float, int)) assert tl["warmup_ms"] >= 0 assert tl["first_frame_after_init_ms"] >= tl["warmup_ms"] ctrl.shutdown() def test_app_controller_startup_timeline_deltas_sign_correctly() -> None: """first_frame_after_warmup_ms is the gap between first frame and warmup done.""" from src.app_controller import AppController ctrl = AppController(log_to_stderr=False) ctrl.wait_for_warmup(timeout=60.0) ctrl.mark_first_frame_rendered() tl = ctrl.startup_timeline() # First frame called AFTER warmup done -> positive or zero assert tl["first_frame_after_warmup_ms"] >= 0 # Total: first frame after init = warmup_ms + gap assert abs(tl["first_frame_after_init_ms"] - (tl["warmup_ms"] + tl["first_frame_after_warmup_ms"])) < 0.1 ctrl.shutdown()