"""Plan, audit, and run the test suite in batches via ``uv run pytest``."""
import argparse
import json
import re
import subprocess
import sys
import time
from pathlib import Path

# The categorizer/batcher helpers are imported from the repository's tests
# directory, so it has to be on sys.path before the imports below.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "tests"))

from categorizer import categorize_all
from batcher import plan, Batch

# A line of pytest's "slowest durations" report: "<secs>s <phase> <nodeid>".
# Node ids of parametrized tests may contain spaces, hence the greedy tail.
_PYTEST_DURATION_RE = re.compile(
    r"^(\d+(?:\.\d+)?)s\s+(?:setup|call|teardown)\s+(\S.*)$"
)
# Older "<nodeid> <secs>[s]" shape this script accepted before; kept as a
# fallback, but restricted to exactly two tokens so that summary lines such as
# "FAILED a.py::t - assert 1 == 2" are no longer mistaken for a timing.
_LEGACY_DURATION_RE = re.compile(r"^(\S*::\S*)\s+(\d+(?:\.\d+)?)s?$")


def _parse_tiers(s: str) -> set[str]:
    """Split a comma-separated tier list into a set, dropping blank entries."""
    return {t.strip() for t in s.split(",") if t.strip()}


def _durations_path(tests_dir: Path) -> Path:
    """Return the location of the durations cache inside *tests_dir*."""
    return tests_dir / ".test_durations.json"


def _load_durations(p: Path) -> dict[str, float]:
    """Load the durations cache from *p*.

    Best-effort: a missing, unreadable, undecodable, or malformed file yields
    an empty mapping instead of an error. Entries whose value is not a number
    are dropped so callers can rely on ``dict[str, float]``.
    """
    try:
        with p.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    if not isinstance(data, dict):
        return {}
    return {
        key: float(value)
        for key, value in data.items()
        # bool is a subclass of int; it is not a meaningful duration.
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }


def _save_durations(p: Path, durations: dict[str, float]) -> None:
    """Write *durations* to *p* as JSON via a temp file and rename.

    Raises:
        OSError: if the temp file cannot be written or renamed.
    """
    tmp = p.with_name(p.name + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(durations, f, indent=2, sort_keys=True)
        tmp.replace(p)
    finally:
        # After a successful replace the temp file is already gone; after a
        # failure this removes the partial file instead of leaving it behind.
        tmp.unlink(missing_ok=True)


def _parse_durations_from_pytest_output(stdout: str) -> dict[str, float]:
    """Extract per-test durations from captured pytest output.

    Lines of the ``--durations`` report (``0.12s call path::test``) are
    recognised and the setup/call/teardown phases of one node id are summed.
    A bare ``nodeid 1.23s`` line is also accepted for compatibility with the
    previous parser. Everything else is ignored.

    Returns:
        Mapping of pytest node id to seconds.
    """
    out: dict[str, float] = {}
    for raw in stdout.splitlines():
        line = raw.strip()
        if "::" not in line:
            continue
        match = _PYTEST_DURATION_RE.match(line)
        if match:
            nodeid = match.group(2)
            out[nodeid] = out.get(nodeid, 0.0) + float(match.group(1))
            continue
        match = _LEGACY_DURATION_RE.match(line)
        if match:
            out[match.group(1)] = float(match.group(2))
    return out


def _run_batch(b: Batch, durations: dict[str, float]) -> tuple[int, float, dict[str, float]]:
    """Run one batch through ``uv run pytest`` and report the outcome.

    Args:
        b: The batch to execute; a batch with a ``skip_reason`` is not run.
        durations: Previously known durations. Currently unused; kept so the
            call signature stays stable.

    Returns:
        ``(exit_code, elapsed_seconds, parsed_durations)``. A skipped batch
        returns ``(0, 0.0, {})``.
    """
    if b.skip_reason:
        return 0, 0.0, {}
    cmd = [
        "uv", "run", "pytest", "-v", "--durations=0",
        *b.pytest_args,
        *(str(f) for f in b.files),
    ]
    print(f"\n>>> Running {b.label} ({len(b.files)} files)")
    started = time.monotonic()
    proc = subprocess.run(cmd, capture_output=True, text=True)
    elapsed = time.monotonic() - started
    new_durs = _parse_durations_from_pytest_output(proc.stdout)
    if proc.returncode == 0:
        print(f"<<< {b.label} PASS in {elapsed:.1f}s")
    else:
        # Only the tail of the captured output is shown to keep logs readable.
        print(proc.stdout[-2000:])
        print(f"<<< {b.label} FAIL (exit {proc.returncode}) in {elapsed:.1f}s")
        print(proc.stderr[-1000:])
    return proc.returncode, elapsed, new_durs


def _print_summary(results: list[tuple[Batch, int, float]]) -> int:
    """Print one line per batch and return the process exit code.

    Returns:
        0 when every executed batch exited with 0, otherwise the largest
        failing exit code. A negative code (subprocess reports these when the
        child was terminated by a signal) counts as 1 so it can never be
        masked by the initial 0.
    """
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    worst = 0
    for b, code, elapsed in results:
        if b.skip_reason:
            status = "SKIPPED"
        elif code == 0:
            status = "PASS"
        else:
            status = "FAIL"
            worst = max(worst, code if code > 0 else 1)
        n = len(b.files)
        print(f"[{b.tier}] {b.label:40s} {status:8s} {n} files {elapsed:6.1f}s")
    return worst


def _print_audit(records, strict: bool) -> int:
    """List auto-inferred records; with *strict*, return 1 if any spans several subsystems."""
    auto = [r for r in records if r.source == "auto"]
    print(f"Auto-inferred (unclassified) records: {len(auto)}")
    for r in auto:
        print(f" {r.filename}: fc={r.fixture_class.value}, subs={r.subsystems}, bg={r.batch_group}")
    if strict:
        bad = [r for r in auto if len(r.subsystems) > 1]
        if bad:
            print(f"STRICT: {len(bad)} auto-inferred files have multiple subsystems:")
            for r in bad:
                print(f" {r.filename}: subs={r.subsystems}")
            return 1
    return 0


def _print_plan(batches) -> int:
    """Print the planned batches without running anything; always returns 0."""
    for b in batches:
        status = "SKIP" if b.skip_reason else "RUN"
        print(f"[{status}] {b.label}: {len(b.files)} files, est {b.estimated_seconds:.1f}s")
    return 0


def main() -> int:
    """Parse the command line, then audit, print the plan, or run the batches.

    Returns:
        The process exit code.
    """
    p = argparse.ArgumentParser()
    # NOTE(review): the diff this block was reconstructed from hides the
    # argument definitions that belong here (between the parser construction
    # and --plan). The code below reads options.tests_dir, options.registry,
    # options.tiers, options.include_opt_in and options.no_xdist, so those
    # definitions must be carried over unchanged from the existing file.
    p.add_argument("--plan", action="store_true")
    p.add_argument("--audit", action="store_true")
    p.add_argument("--strict", action="store_true")
    p.add_argument("--durations", action="store_true", help="Record per-test durations to .test_durations.json")
    options = p.parse_args()

    tiers = _parse_tiers(options.tiers)
    tests_dir = Path(options.tests_dir)
    records = categorize_all(tests_dir, Path(options.registry))
    if options.audit:
        return _print_audit(records, strict=options.strict)

    batches = plan(records, tiers=tiers, include_opt_in=options.include_opt_in, xdist=not options.no_xdist)
    if options.plan:
        return _print_plan(batches)

    # The durations cache is only needed when batches are actually executed.
    durations_path = _durations_path(tests_dir)
    merged_durations = dict(_load_durations(durations_path))
    results: list[tuple[Batch, int, float]] = []
    for b in batches:
        code, elapsed, new_durs = _run_batch(b, merged_durations)
        results.append((b, code, elapsed))
        merged_durations.update(new_durs)
    if options.durations:
        _save_durations(durations_path, merged_durations)
    return _print_summary(results)


if __name__ == "__main__":
    sys.exit(main())