feat(perf): Add performance tests and high-precision timing

This commit is contained in:
2026-05-06 14:59:34 -04:00
parent 3f592afa16
commit d0aff71430
3 changed files with 147 additions and 4 deletions
+4 -4
View File
@@ -154,7 +154,7 @@ class PerformanceMonitor:
return self._history_sums[key] / len(h) return self._history_sums[key] / len(h)
def start_frame(self) -> None: def start_frame(self) -> None:
now = time.time() now = time.perf_counter()
with self._lock: with self._lock:
if self._last_frame_start_time > 0: if self._last_frame_start_time > 0:
dt = now - self._last_frame_start_time dt = now - self._last_frame_start_time
@@ -167,7 +167,7 @@ class PerformanceMonitor:
def end_frame(self) -> None: def end_frame(self) -> None:
if self._start_time is None: if self._start_time is None:
return return
now = time.time() now = time.perf_counter()
elapsed = now - self._start_time elapsed = now - self._start_time
frame_time_ms = elapsed * 1000 frame_time_ms = elapsed * 1000
@@ -194,13 +194,13 @@ class PerformanceMonitor:
def start_component(self, name: str) -> None: def start_component(self, name: str) -> None:
if not self.enabled: return if not self.enabled: return
now = time.time() now = time.perf_counter()
with self._lock: with self._lock:
self._component_starts[name] = now self._component_starts[name] = now
def end_component(self, name: str) -> None: def end_component(self, name: str) -> None:
if not self.enabled: return if not self.enabled: return
now = time.time() now = time.perf_counter()
with self._lock: with self._lock:
start = self._component_starts.pop(name, None) start = self._component_starts.pop(name, None)
if start is not None: if start is not None:
+68
View File
@@ -0,0 +1,68 @@
import time
from pathlib import Path
from src.aggregate import build_tier3_context
from src.performance_monitor import get_monitor
def test_build_tier3_context_scaling():
perf = get_monitor()
perf.enabled = True
# 1. Create a large number of mock file items (e.g., 500)
file_items = []
for i in range(500):
path = Path(f"src/file_{i}.py")
file_items.append({
"path": path,
"entry": str(path),
"content": f"def func_{i}():\n \"\"\"Docstring for {i}\"\"\"\n pass\n",
"auto_aggregate": True,
"tier": 0
})
# 2. Create a large number of focus files (e.g., 100)
focus_files = [str(Path(f"src/file_{i}.py")) for i in range(100)]
# 3. Measure the time taken by build_tier3_context
with perf.scope("test_build_tier3_context_scaling"):
start_time = time.perf_counter()
# screenshot_base_dir, screenshots, history are empty Path/lists for this test
result = build_tier3_context(file_items, Path("assets"), [], [], focus_files)
end_time = time.perf_counter()
duration_ms = (end_time - start_time) * 1000
print(f"build_tier3_context took {duration_ms:.2f} ms for 500 items and 100 focus files")
# 4. Assert that the function correctly identifies focus files
for i in range(100):
# Focus files should have full content
entry = str(Path(f"src/file_{i}.py"))
expected_header = f"### `{entry}`"
assert expected_header in result
assert f"def func_{i}():" in result
assert "pass" in result
# Check non-focus files (should be skeletonized)
for i in range(100, 110): # Just check a few
entry = str(Path(f"src/file_{i}.py"))
# Non-focus files may have (AST Skeleton) in header if they are .py
expected_header = f"### `{entry}` (AST Skeleton)"
assert expected_header in result
assert f"def func_{i}():" in result
assert f"\"\"\"Docstring for {i}\"\"\"" in result
# The skeleton should strip the body 'pass'
assert "pass" not in result.split(expected_header)[1].split("###")[0]
# 5. Use the PerformanceMonitor to record the time under a 'test_build_tier3_context_scaling' component
metrics = perf.get_metrics()
assert "time_test_build_tier3_context_scaling_ms" in metrics
print(f"Recorded metric: {metrics['time_test_build_tier3_context_scaling_ms']:.2f} ms")
if __name__ == "__main__":
try:
test_build_tier3_context_scaling()
print("SUCCESS")
except Exception as e:
import traceback
traceback.print_exc()
print(f"FAILED: {e}")
exit(1)
+75
View File
@@ -0,0 +1,75 @@
import pytest
from src.models import Ticket
from src.dag_engine import TrackDAG
from src.performance_monitor import get_monitor
def test_dag_performance():
perf = get_monitor()
perf.enabled = True
num_tickets = 500
tickets = []
# Create a complex DAG: linear dependencies + branching
for i in range(num_tickets):
deps = []
if i > 0:
# Linear dependency
deps.append(f"ticket_{i-1}")
if i > 10:
# Branching dependency (binary tree-ish)
deps.append(f"ticket_{i//2}")
if i > 50:
# More cross-links
deps.append(f"ticket_{i-10}")
tickets.append(Ticket(
id=f"ticket_{i}",
description=f"Ticket number {i}",
depends_on=deps,
status="todo"
))
dag = TrackDAG(tickets[::-1])
# 1. Measure has_cycle()
cycle_found = dag.has_cycle()
assert cycle_found is False
# 2. Measure topological_sort()
sorted_ids = dag.topological_sort()
assert len(sorted_ids) == num_tickets
# 3. Measure cascade_blocks()
tickets[0].status = "blocked"
dag.cascade_blocks()
# Verify all are blocked
for t in tickets:
assert t.status == "blocked", f"Ticket {t.id} failed to cascade block"
# 4. Verify metrics were recorded
metrics = perf.get_metrics()
assert "time_dag_has_cycle_ms" in metrics
assert "time_dag_topological_sort_ms" in metrics
assert "time_dag_cascade_blocks_ms" in metrics
# Output results for visibility when running with -s
print(f"\n[PERF] DAG Performance (n={num_tickets}):")
print(f" has_cycle: {metrics['time_dag_has_cycle_ms']:.4f} ms")
print(f" topological_sort: {metrics['time_dag_topological_sort_ms']:.4f} ms")
print(f" cascade_blocks: {metrics['time_dag_cascade_blocks_ms']:.4f} ms")
def test_dag_edge_cases():
# Test cycle detection correctness
t1 = Ticket(id="a", description="a", depends_on=["b"])
t2 = Ticket(id="b", description="b", depends_on=["c"])
t3 = Ticket(id="c", description="c", depends_on=["a"])
dag = TrackDAG([t1, t2, t3])
assert dag.has_cycle() is True
with pytest.raises(ValueError, match="Dependency cycle detected"):
dag.topological_sort()
# Test empty DAG
dag_empty = TrackDAG([])
assert dag_empty.has_cycle() is False
assert dag_empty.topological_sort() == []