"""Run the project's tests in planned batches and print a summary table.

Test files are categorized with ``categorizer.categorize_all``, grouped into
batches with ``batcher.plan``, and each batch is executed through
``uv run pytest``. The child output is filtered and colorized line by line,
and a per-batch summary table is printed at the end.
"""

import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path

try:
    import xdist as _xdist  # noqa: F401 - imported only to detect availability

    _HAS_XDIST = True
except ImportError:
    _HAS_XDIST = False

_SCRIPT_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _SCRIPT_DIR.parent
# Make modules under <project>/tests importable; the categorizer/batcher
# imports further down rely on this (presumably they live in that directory).
sys.path.insert(0, str(_PROJECT_ROOT / "tests"))

_USE_COLOR = sys.stdout.isatty() or os.environ.get("FORCE_COLOR") == "1"

if _USE_COLOR and os.name == "nt":
    # Best effort: switch the Windows console (handle -11 = stdout) into a mode
    # that interprets ANSI escape sequences; fall back to plain text on failure.
    try:
        import ctypes

        kernel32 = ctypes.windll.kernel32
        kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
    except Exception:
        _USE_COLOR = False


class _C:
    """ANSI escape sequences used for colored terminal output."""

    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"
    BOLD_GREEN = "\033[1;32m"
    BOLD_RED = "\033[1;31m"
    BOLD_YELLOW = "\033[1;33m"
    BOLD_CYAN = "\033[1;36m"


def _c(text: str, color: str) -> str:
    """Wrap *text* in *color* and a reset code, or return it as-is without color."""
    if not _USE_COLOR:
        return text
    return f"{color}{text}{_C.RESET}"


from categorizer import categorize_all  # noqa: E402 - needs the sys.path tweak above
from batcher import plan, Batch  # noqa: E402


def _parse_tiers(s: str) -> set[str]:
    """Split a comma-separated tier list into a set of non-empty, stripped names."""
    return {t.strip() for t in s.split(",") if t.strip()}


def _durations_path(tests_dir: Path) -> Path:
    """Return the path of the durations cache file inside *tests_dir*."""
    return tests_dir / ".test_durations.json"


def _load_durations(p: Path) -> dict[str, float]:
    """Load the durations cache from *p*.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON/text, or when its top-level value is not a JSON object.
    """
    if not p.exists():
        return {}
    try:
        with p.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers copy/update the result as a dict; reject any other JSON root.
    return data if isinstance(data, dict) else {}


def _save_durations(p: Path, durations: dict[str, float]) -> None:
    """Write *durations* to *p* via a temporary sibling file and a rename."""
    tmp = p.with_suffix(".json.tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(durations, f, indent=2, sort_keys=True)
    tmp.replace(p)


# Phase names pytest prints in its "slowest durations" report.
_DURATION_PHASES = frozenset({"setup", "call", "teardown"})

# Prefixes of pytest short-test-summary lines; these contain "::" but are not
# duration records (e.g. "FAILED a.py::t - assert 1 == 2").
_SUMMARY_STATUS_PREFIXES: tuple[str, ...] = (
    "PASSED ",
    "FAILED ",
    "ERROR ",
    "SKIPPED ",
    "XFAIL ",
    "XPASS ",
)


def _parse_seconds(token: str) -> float | None:
    """Parse ``"1.23s"`` or ``"1.23"`` into a float; return None if not numeric."""
    try:
        return float(token.rstrip("s"))
    except ValueError:
        return None


def _parse_durations_from_pytest_output(stdout: str) -> dict[str, float]:
    """Extract per-test durations from captured pytest output.

    Two line shapes are recognized:

    * pytest's ``--durations`` report, ``"<secs>s <phase> <nodeid>"`` where
      phase is setup/call/teardown; phases of the same test are summed.
    * the legacy ``"<nodeid> <secs>[s]"`` shape.

    Lines without ``::`` and short-test-summary lines are ignored.

    Returns:
        Mapping of node id to seconds.
    """
    out: dict[str, float] = {}
    for raw in stdout.splitlines():
        line = raw.strip()
        if "::" not in line or " " not in line:
            continue

        # pytest format: "0.12s call     tests/test_x.py::test_a[param id]"
        head = line.split(None, 2)
        if len(head) == 3 and head[1] in _DURATION_PHASES:
            seconds = _parse_seconds(head[0])
            if seconds is not None:
                nodeid = head[2]
                out[nodeid] = out.get(nodeid, 0.0) + seconds
                continue

        # Summary lines can end in a number ("... assert 1 == 2") and would
        # otherwise be mistaken for the legacy format below.
        if line.startswith(_SUMMARY_STATUS_PREFIXES):
            continue

        parts = line.rsplit(None, 1)
        if len(parts) != 2:
            continue
        nodeid, time_str = parts
        seconds = _parse_seconds(time_str)
        if seconds is not None and "::" in nodeid:
            out[nodeid] = seconds
    return out


# Output lines starting with any of these are dropped from the live display.
_NOISE_PREFIXES: tuple[str, ...] = (
    "[LogPruner]",
    "[startup]",
    "created: ",
    "=========",
)

# Output lines containing any of these are dropped from the live display.
_NOISE_SUBSTRINGS: tuple[str, ...] = (
    "[WinError",
)


def _format_pytest_line(line: str) -> str | None:
    """Filter and colorize one line of pytest output.

    Returns:
        None when the line is blank or matches a noise pattern; otherwise the
        right-stripped line, colorized when it looks like a test result.
    """
    stripped = line.rstrip()
    if not stripped:
        return None
    if stripped.startswith(_NOISE_PREFIXES):
        return None
    if any(sub in stripped for sub in _NOISE_SUBSTRINGS):
        return None

    # Result lines carrying a "[gw" worker tag.
    if "[gw" in stripped:
        if " PASSED " in stripped:
            return _c(stripped, _C.GREEN)
        if " FAILED " in stripped or " ERROR " in stripped:
            return _c(stripped, _C.BOLD_RED)

    # Lines that lead with the status word followed by a node id.
    if stripped.startswith(("PASSED", "FAILED", "ERROR")) and "::" in stripped:
        status = stripped.split()[0]
        color = _C.GREEN if status == "PASSED" else _C.BOLD_RED
        return _c(stripped, color)

    if (
        stripped.startswith(("passed", "failed", "error"))
        and " in " in stripped
        and stripped.endswith("s")
    ):
        return _c(stripped, _C.BOLD)
    return stripped


def _strip_xdist_args(args: list[str]) -> list[str]:
    """Return *args* without pytest-xdist options (``-n``/``--numprocesses``/``--dist``).

    Handles the separated (``-n auto``), attached (``-n4``) and ``=`` forms, so
    no orphaned value is left behind and unrelated ``auto`` tokens are kept.
    """
    kept: list[str] = []
    skip_value = False
    for arg in args:
        if skip_value:
            # This token is the value of the xdist option just dropped.
            skip_value = False
            continue
        if arg in ("-n", "--numprocesses", "--dist"):
            skip_value = True
            continue
        if arg.startswith(("--numprocesses=", "--dist=")):
            continue
        if arg.startswith("-n") and not arg.startswith("--"):
            value = arg[2:]
            if value.isdigit() or value in ("auto", "logical"):
                continue
        kept.append(arg)
    return kept


def _run_batch(b: Batch, durations: dict[str, float]) -> tuple[int, float, dict[str, float]]:
    """Run one batch through ``uv run pytest`` and stream its filtered output.

    Args:
        b: The batch to run; a batch with a ``skip_reason`` is not executed.
        durations: Known durations so far (currently unused by this function).

    Returns:
        ``(returncode, elapsed_seconds, parsed_durations)``; ``(0, 0.0, {})``
        for a skipped batch.
    """
    if b.skip_reason:
        return 0, 0.0, {}

    args = list(b.pytest_args)
    if not _HAS_XDIST:
        args = _strip_xdist_args(args)
    cmd = ["uv", "run", "pytest", "-v", "--durations=3"] + args + [str(f) for f in b.files]

    print(_c(f"\n>>> Running {b.label} ({len(b.files)} files)", _C.BOLD_CYAN))
    t0 = time.monotonic()
    captured: list[str] = []
    # The context manager closes the pipe and waits for the child even when
    # the loop is interrupted; errors="replace" keeps undecodable bytes in the
    # child's output from crashing the runner.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        errors="replace",
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            captured.append(line)
            formatted = _format_pytest_line(line)
            if formatted is not None:
                print(formatted)
    elapsed = time.monotonic() - t0

    new_durs = _parse_durations_from_pytest_output("".join(captured))
    if proc.returncode == 0:
        print(_c(f"<<< {b.label} PASS in {elapsed:.1f}s", _C.BOLD_GREEN))
    else:
        print(_c(f"<<< {b.label} FAIL (exit {proc.returncode}) in {elapsed:.1f}s", _C.BOLD_RED))
    return proc.returncode, elapsed, new_durs


_STATUS_COLORS: dict[str, str] = {
    "PASS": _C.BOLD_GREEN,
    "FAIL": _C.BOLD_RED,
    "SKIPPED": _C.BOLD_YELLOW,
}


def _print_summary(results: list[tuple[Batch, int, float]]) -> int:
    """Print the per-batch summary table and return the process exit code.

    Args:
        results: ``(batch, returncode, elapsed_seconds)`` for every batch.

    Returns:
        0 when no batch failed; otherwise the largest positive return code,
        or at least 1 when a batch ended with a negative code (child killed
        by a signal).
    """
    print()
    rows: list[tuple[str, str, str, int, float]] = []
    worst = 0
    total_files = 0
    total_time = 0.0
    passed_count = 0
    failed_count = 0

    for b, code, elapsed in results:
        n = len(b.files)
        total_files += n
        total_time += elapsed
        if b.skip_reason:
            status_text = "SKIPPED"
        elif code == 0:
            status_text = "PASS"
            passed_count += 1
        else:
            status_text = "FAIL"
            failed_count += 1
            # A negative code must not be swallowed by max(0, code).
            worst = max(worst, code if code > 0 else 1)
        rows.append((b.tier, b.label, status_text, n, elapsed))

    if failed_count:
        overall_text, overall_color = f"{failed_count} FAILED", _C.BOLD_RED
    elif passed_count:
        overall_text, overall_color = f"ALL {passed_count} PASS", _C.BOLD_GREEN
    else:
        overall_text, overall_color = "NO BATCHES RUN", _C.BOLD_YELLOW

    # Widths account for the header, the TOTAL row and every data row; the
    # fixed leading arguments keep max() valid when there are no rows.
    tier_w = max(len("TIER"), len("TOTAL"), *(len(r[0]) for r in rows))
    label_w = max(len("BATCH LABEL"), 0, *(len(r[1]) for r in rows))
    status_w = max(len("STATUS"), len(overall_text), *(len(r[2]) for r in rows))
    files_w = max(len("FILES"), len(str(total_files)), *(len(str(r[3])) for r in rows))
    time_w = max(len("TIME"), len(f"{total_time:.1f}s"), *(len(f"{r[4]:.1f}s") for r in rows))

    header = (
        f" {'TIER':{tier_w}s} │ {'BATCH LABEL':{label_w}s} │ {'STATUS':{status_w}s}"
        f" │ {'FILES':>{files_w}s} │ {'TIME':>{time_w}s} "
    )
    sep = "─" * len(header)
    print(_c(sep, _C.DIM))
    print(_c(header, _C.BOLD))
    print(_c(sep, _C.DIM))

    for tier, label, status_text, n, elapsed in rows:
        # Pad before coloring so the escape codes do not disturb alignment.
        status = _c(f"{status_text:<{status_w}s}", _STATUS_COLORS[status_text])
        tier_colored = _c(f" {tier:<{tier_w}s}", _C.CYAN)
        print(
            f"{tier_colored} │ {label:<{label_w}s} │ {status}"
            f" │ {n:>{files_w}d} │ {elapsed:>{time_w - 1}.1f}s"
        )
    print(_c(sep, _C.DIM))

    overall = _c(f"{overall_text:<{status_w}s}", overall_color)
    total_label = _c(f" {'TOTAL':<{tier_w}s}", _C.BOLD)
    print(
        f"{total_label} │ {'':<{label_w}s} │ {overall}"
        f" │ {total_files:>{files_w}d} │ {total_time:>{time_w - 1}.1f}s"
    )
    print(_c(sep, _C.DIM))
    return worst


def main() -> int:
    """Parse the command line, then audit, plan, or run the test batches.

    Returns:
        Process exit code: 0 on success; 1 when ``--audit --strict`` finds
        auto-inferred files with multiple subsystems; otherwise the value
        from ``_print_summary``.
    """
    global _USE_COLOR

    p = argparse.ArgumentParser()
    p.add_argument("--tests-dir", default=str(_PROJECT_ROOT / "tests"))
    p.add_argument("--registry", default=str(_PROJECT_ROOT / "tests" / "test_categories.toml"))
    p.add_argument("--tiers", default="1,2,3,H")
    p.add_argument("--include-opt-in", action="store_true")
    p.add_argument("--no-xdist", action="store_true")
    p.add_argument("--plan", action="store_true")
    p.add_argument("--audit", action="store_true")
    p.add_argument("--strict", action="store_true")
    p.add_argument("--durations", action="store_true", help="Record per-test durations to .test_durations.json")
    p.add_argument("--no-color", action="store_true", help="Disable ANSI color output")
    options = p.parse_args()

    if options.no_color:
        _USE_COLOR = False

    tiers = _parse_tiers(options.tiers)
    # Relative --tests-dir values are resolved against the project root.
    tests_dir = Path(options.tests_dir)
    if not tests_dir.is_absolute():
        tests_dir = _PROJECT_ROOT / options.tests_dir
    durations_path = _durations_path(tests_dir)
    durations = _load_durations(durations_path)
    records = categorize_all(tests_dir, Path(options.registry))

    if options.audit:
        # Audit mode: report records whose classification was auto-inferred.
        auto = [r for r in records if r.source == "auto"]
        print(f"Auto-inferred (unclassified) records: {len(auto)}")
        for r in auto:
            print(f"  {r.filename}: fc={r.fixture_class.value}, subs={r.subsystems}, bg={r.batch_group}")
        if options.strict:
            bad = [r for r in auto if len(r.subsystems) > 1]
            if bad:
                print(f"STRICT: {len(bad)} auto-inferred files have multiple subsystems:")
                for r in bad:
                    print(f"  {r.filename}: subs={r.subsystems}")
                return 1
        return 0

    batches = plan(records, tiers=tiers, include_opt_in=options.include_opt_in, xdist=not options.no_xdist)

    if options.plan:
        # Plan mode: list the batches without executing anything.
        for b in batches:
            status = "SKIP" if b.skip_reason else "RUN"
            print(f"[{status}] {b.label}: {len(b.files)} files, est {b.estimated_seconds:.1f}s")
        return 0

    results: list[tuple[Batch, int, float]] = []
    merged_durations = dict(durations)
    for b in batches:
        code, elapsed, new_durs = _run_batch(b, merged_durations)
        results.append((b, code, elapsed))
        merged_durations.update(new_durs)

    if options.durations:
        _save_durations(durations_path, merged_durations)
    return _print_summary(results)


if __name__ == "__main__":
    sys.exit(main())