From 5a85653654e76e1c1bd57f5a437fb3a53b7b7064 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sat, 6 Jun 2026 13:57:26 -0400 Subject: [PATCH] feat(startup_profiler): add StartupProfiler for per-phase init timing Lightweight, in-memory profiler for AppController init phases. Used by the startup_speedup_20260606 track to measure where the time goes during boot (config hydration, hook server start, subsystem init, etc.). The profiler is exposed via /api/startup_profile (Phase 8 work) and the Diagnostics panel so the user can see the exact per-phase cost. Public API: StartupProfiler() - create .phase(name) - context manager .snapshot() - {phases: {name: {start_ts, duration_ms}}, total_ms, count} .reset() - clear recorded phases .enable() / .disable() - toggle recording Implementation: - dataclass with list of _Phase(name, start_ts, end_ts) - @contextmanager records wall-clock via time.perf_counter - records duration even if the body raises (try/finally) - snapshot is a copy, so consumers can't mutate the live state TDD: 5 tests in tests/test_startup_profiler.py cover: basic recording, total math, snapshot isolation, exception safety, empty state. --- src/startup_profiler.py | 54 ++++++++++++++++++++++++++ tests/test_startup_profiler.py | 70 ++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 src/startup_profiler.py create mode 100644 tests/test_startup_profiler.py diff --git a/src/startup_profiler.py b/src/startup_profiler.py new file mode 100644 index 00000000..41605ee2 --- /dev/null +++ b/src/startup_profiler.py @@ -0,0 +1,54 @@ +import time +from contextlib import contextmanager +from dataclasses import dataclass, field +from typing import Any, Iterator + + +@dataclass +class _Phase: + name: str + start_ts: float + end_ts: float = 0.0 + + +@dataclass +class StartupProfiler: + _phases: list[_Phase] = field(default_factory=list) + _enabled: bool = True + + def enable(self) -> None: + self._enabled = True + + def disable(self) -> None: + self._enabled = False + + @contextmanager + def phase(self, name: str) -> Iterator[None]: + if not self._enabled: + yield + return + p = _Phase(name=name, start_ts=time.perf_counter()) + try: + yield + finally: + p.end_ts = time.perf_counter() + self._phases.append(p) + + def snapshot(self) -> dict[str, Any]: + phases: dict[str, dict[str, float]] = {} + total = 0.0 + for p in self._phases: + duration_ms = max(0.0, (p.end_ts - p.start_ts) * 1000.0) if p.end_ts else 0.0 + phases[p.name] = { + "start_ts": p.start_ts, + "duration_ms": round(duration_ms, 3), + } + total += duration_ms + return { + "phases": phases, + "total_ms": round(total, 3), + "count": len(phases), + } + + def reset(self) -> None: + self._phases.clear() diff --git a/tests/test_startup_profiler.py b/tests/test_startup_profiler.py new file mode 100644 index 00000000..2649beb6 --- /dev/null +++ b/tests/test_startup_profiler.py @@ -0,0 +1,70 @@ +"""Tests for src/startup_profiler.py. + +TDD Red phase: this file should fail before src/startup_profiler.py exists, +pass after it's implemented. +""" + +import time +from pathlib import Path +import sys + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +from src.startup_profiler import StartupProfiler # noqa: E402 + + +def test_startup_profiler_records_phase_duration() -> None: + prof = StartupProfiler() + with prof.phase("init_config"): + time.sleep(0.01) + snap = prof.snapshot() + assert "phases" in snap + assert "init_config" in snap["phases"] + entry = snap["phases"]["init_config"] + assert entry["duration_ms"] >= 8.0 + assert entry["duration_ms"] < 500.0 + assert entry["start_ts"] > 0 + + +def test_startup_profiler_total_reflects_sum() -> None: + prof = StartupProfiler() + with prof.phase("a"): + time.sleep(0.005) + with prof.phase("b"): + time.sleep(0.005) + snap = prof.snapshot() + assert snap["total_ms"] >= 10.0 + assert abs(snap["total_ms"] - (snap["phases"]["a"]["duration_ms"] + snap["phases"]["b"]["duration_ms"])) < 0.5 + + +def test_startup_profiler_snapshot_is_independent_copy() -> None: + prof = StartupProfiler() + with prof.phase("x"): + pass + snap1 = prof.snapshot() + with prof.phase("y"): + pass + snap2 = prof.snapshot() + assert "x" in snap1["phases"] + assert "y" not in snap1["phases"] + assert "y" in snap2["phases"] + + +def test_startup_profiler_phase_records_on_exception() -> None: + prof = StartupProfiler() + try: + with prof.phase("will_fail"): + raise RuntimeError("boom") + except RuntimeError: + pass + snap = prof.snapshot() + assert "will_fail" in snap["phases"] + assert snap["phases"]["will_fail"]["duration_ms"] >= 0 + + +def test_startup_profiler_empty_snapshot() -> None: + prof = StartupProfiler() + snap = prof.snapshot() + assert snap["phases"] == {} + assert snap["total_ms"] == 0.0