Private
Public Access
0
0
Files
manual_slop/tests/test_warmup.py
T
ed 8d58d7fc46 fix(warmup): defer _done_event.set() until after callbacks fire
WarmupManager._record_success and _record_failure used to set
self._done_event.set() inside the with self._lock: block, BEFORE
calling the user-registered on_complete callbacks. This created
a race: a test thread calling mgr.wait() could observe
mgr.is_done() == True and proceed before the worker thread had
finished firing the callbacks. The mgr.on_complete caller would
then assert on state that the callback was supposed to mutate
(e.g. test_warmup_on_complete_callback_fires' `received` list).

Fix: move self._done_event.set() to AFTER the for cb in callbacks:
loop in both _record_success and _record_failure. The done event
is now set last, so wait() cannot return until all callbacks
have completed (or raised, which is swallowed by the try/except).

ALSO fix the previously-corrupted state of warmup.py (the result
of a misused set_file_slice edit that left orphaned code with no
def line for _record_failure). _record_failure is now a proper
class method with the def line restored.

ALSO fix tests/test_warmup.py:
  - test_warmup_on_complete_callback_fires: the test body was
    missing the pool/mgr setup. Added the missing lines.
  - test_warmup_done_event_set_after_all_complete: removed the
    racy `assert not mgr.is_done()` assertion that fires
    immediately after submit. On a fast machine, os/sys warmup
    completes in microseconds, so is_done() is already True
    by the time the assertion runs. The remaining assertion
    (`assert mgr.is_done()` after wait) still tests the
    semantic that the done event is set after completion.
  - Removed both `@pytest.mark.skip` markers; the underlying
    issues are now fixed in production code AND the tests.

Verified: 10/10 tests in tests/test_warmup.py pass (previously
2 skipped, 2 failed).
2026-06-07 16:02:30 -04:00

131 lines
3.3 KiB
Python

"""Tests for src/warmup.py (the WarmupManager class)."""
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import pytest
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from src.warmup import WarmupManager # noqa: E402
def _make_pool() -> ThreadPoolExecutor:
return ThreadPoolExecutor(max_workers=2, thread_name_prefix="warmup-test")
def test_warmup_submits_one_job_per_module() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json", "os", "sys"])
time.sleep(0.5)
status = mgr.status()
assert len(status["pending"]) == 0
assert set(status["completed"]) == {"json", "os", "sys"}
assert status["failed"] == []
pool.shutdown(wait=False)
def test_warmup_status_pending_initially() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json"])
snap = mgr.status()
assert "pending" in snap
assert "completed" in snap
assert "failed" in snap
pool.shutdown(wait=False)
mgr.wait(timeout=2)
def test_warmup_status_reflects_failures() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["definitely_not_a_real_module_xyz123"])
mgr.wait(timeout=5)
status = mgr.status()
assert "definitely_not_a_real_module_xyz123" in status["failed"]
assert status["completed"] == []
pool.shutdown(wait=False)
def test_warmup_done_event_set_after_all_complete() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["os", "sys"])
mgr.wait(timeout=5)
assert mgr.is_done()
pool.shutdown(wait=False)
def test_warmup_wait_blocks_until_done() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json", "os"])
completed = mgr.wait(timeout=10)
assert completed is True
pool.shutdown(wait=False)
def test_warmup_on_complete_callback_fires() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
received: list[dict] = []
mgr.on_complete(lambda status: received.append(dict(status)))
mgr.submit(["json"])
mgr.wait(timeout=5)
assert len(received) == 1
assert "json" in received[0]["completed"]
pool.shutdown(wait=False)
def test_warmup_on_complete_callback_fires_immediately_if_already_done() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json"])
mgr.wait(timeout=5)
received: list[dict] = []
mgr.on_complete(lambda status: received.append(dict(status)))
assert len(received) == 1
pool.shutdown(wait=False)
def test_warmup_modules_actually_loaded_in_sys_modules() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json", "os"])
mgr.wait(timeout=5)
import json as _json
import os as _os
assert _json in sys.modules.values()
assert _os in sys.modules.values()
pool.shutdown(wait=False)
def test_warmup_reset_clears_state() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json"])
mgr.wait(timeout=5)
assert mgr.is_done()
mgr.reset()
assert not mgr.is_done()
assert mgr.status()["pending"] == []
assert mgr.status()["completed"] == []
pool.shutdown(wait=False)
def test_warmup_runs_jobs_concurrently_not_serially() -> None:
pool = _make_pool()
mgr = WarmupManager(pool)
mgr.submit(["json", "os", "sys", "re"])
started = time.perf_counter()
mgr.wait(timeout=5)
elapsed = time.perf_counter() - started
assert elapsed < 1.0, f"warmup took {elapsed:.2f}s; expected concurrent execution"
pool.shutdown(wait=False)