273 lines
8.8 KiB
Python
273 lines
8.8 KiB
Python
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import xdist as _xdist
|
|
_HAS_XDIST = True
|
|
except ImportError:
|
|
_HAS_XDIST = False
|
|
|
|
_SCRIPT_DIR = Path(__file__).resolve().parent
|
|
_PROJECT_ROOT = _SCRIPT_DIR.parent
|
|
sys.path.insert(0, str(_PROJECT_ROOT / "tests"))
|
|
|
|
_USE_COLOR = sys.stdout.isatty() or os.environ.get("FORCE_COLOR") == "1"
|
|
if _USE_COLOR and os.name == "nt":
|
|
try:
|
|
import ctypes
|
|
kernel32 = ctypes.windll.kernel32
|
|
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
|
|
except Exception:
|
|
_USE_COLOR = False
|
|
|
|
class _C:
|
|
RESET = "\033[0m"
|
|
BOLD = "\033[1m"
|
|
DIM = "\033[2m"
|
|
RED = "\033[31m"
|
|
GREEN = "\033[32m"
|
|
YELLOW = "\033[33m"
|
|
BLUE = "\033[34m"
|
|
MAGENTA = "\033[35m"
|
|
CYAN = "\033[36m"
|
|
BOLD_GREEN = "\033[1;32m"
|
|
BOLD_RED = "\033[1;31m"
|
|
BOLD_YELLOW = "\033[1;33m"
|
|
BOLD_CYAN = "\033[1;36m"
|
|
|
|
def _c(text: str, color: str) -> str:
|
|
if not _USE_COLOR:
|
|
return text
|
|
return f"{color}{text}{_C.RESET}"
|
|
|
|
from categorizer import categorize_all
|
|
from batcher import plan, Batch
|
|
|
|
def _parse_tiers(s: str) -> set[str]:
|
|
return {t.strip() for t in s.split(",") if t.strip()}
|
|
|
|
def _durations_path(tests_dir: Path) -> Path:
|
|
return tests_dir / ".test_durations.json"
|
|
|
|
def _load_durations(p: Path) -> dict[str, float]:
|
|
if not p.exists():
|
|
return {}
|
|
try:
|
|
with p.open("r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
return {}
|
|
|
|
def _save_durations(p: Path, durations: dict[str, float]) -> None:
|
|
tmp = p.with_suffix(".json.tmp")
|
|
with tmp.open("w", encoding="utf-8") as f:
|
|
json.dump(durations, f, indent=2, sort_keys=True)
|
|
tmp.replace(p)
|
|
|
|
def _parse_durations_from_pytest_output(stdout: str) -> dict[str, float]:
|
|
out: dict[str, float] = {}
|
|
for line in stdout.splitlines():
|
|
line = line.strip()
|
|
if "::" not in line or " " not in line:
|
|
continue
|
|
parts = line.rsplit(None, 1)
|
|
if len(parts) != 2:
|
|
continue
|
|
nodeid, time_str = parts
|
|
try:
|
|
out[nodeid] = float(time_str.rstrip("s"))
|
|
except ValueError:
|
|
continue
|
|
return out
|
|
|
|
_NOISE_PREFIXES: tuple[str, ...] = (
|
|
"[LogPruner]",
|
|
"[startup]",
|
|
"created: ",
|
|
"Error during log pruning",
|
|
"=========",
|
|
)
|
|
|
|
_NOISE_SUBSTRINGS: tuple[str, ...] = (
|
|
"[WinError",
|
|
"File must be opened in binary mode",
|
|
)
|
|
|
|
def _format_pytest_line(line: str) -> str | None:
|
|
stripped = line.rstrip()
|
|
if not stripped:
|
|
return None
|
|
for prefix in _NOISE_PREFIXES:
|
|
if stripped.startswith(prefix):
|
|
return None
|
|
for sub in _NOISE_SUBSTRINGS:
|
|
if sub in stripped:
|
|
return None
|
|
if stripped.startswith(("tests/", "tests\\")) and "::" in stripped and len(stripped.split()) == 1:
|
|
return None
|
|
if " PASSED " in stripped and "[gw" in stripped:
|
|
return _c(stripped, _C.GREEN)
|
|
if " FAILED " in stripped and "[gw" in stripped:
|
|
return _c(stripped, _C.BOLD_RED)
|
|
if " ERROR " in stripped and "[gw" in stripped:
|
|
return _c(stripped, _C.BOLD_RED)
|
|
if stripped.startswith(("tests/", "tests\\")) and "::" in stripped:
|
|
if " PASSED" in stripped:
|
|
return _c(stripped, _C.GREEN)
|
|
if " FAILED" in stripped:
|
|
return _c(stripped, _C.BOLD_RED)
|
|
if " ERROR" in stripped:
|
|
return _c(stripped, _C.BOLD_RED)
|
|
if stripped.startswith(("PASSED", "FAILED", "ERROR")) and "::" in stripped:
|
|
status = stripped.split()[0]
|
|
rest = stripped[len(status):]
|
|
if status == "PASSED":
|
|
return _c(f"{status}{rest}", _C.GREEN)
|
|
return _c(f"{status}{rest}", _C.BOLD_RED)
|
|
if stripped.startswith(("passed", "failed", "error")) and " in " in stripped and stripped.endswith("s"):
|
|
return _c(stripped, _C.BOLD)
|
|
return stripped
|
|
|
|
def _run_batch(b: Batch, durations: dict[str, float]) -> tuple[int, float, dict[str, float]]:
|
|
if b.skip_reason:
|
|
return 0, 0.0, {}
|
|
args = list(b.pytest_args)
|
|
if not _HAS_XDIST:
|
|
args = [a for a in args if a not in {"-n", "auto"}]
|
|
cmd = ["uv", "run", "pytest", "-v", "--durations=3"] + args + [str(f) for f in b.files]
|
|
print(_c(f"\n>>> Running {b.label} ({len(b.files)} files)", _C.BOLD_CYAN))
|
|
t0 = time.monotonic()
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)
|
|
captured: list[str] = []
|
|
assert proc.stdout is not None
|
|
for line in proc.stdout:
|
|
captured.append(line)
|
|
formatted = _format_pytest_line(line)
|
|
if formatted is None:
|
|
continue
|
|
print(formatted)
|
|
proc.wait()
|
|
elapsed = time.monotonic() - t0
|
|
new_durs = _parse_durations_from_pytest_output("".join(captured))
|
|
if proc.returncode == 0:
|
|
print(_c(f"<<< {b.label} PASS in {elapsed:.1f}s", _C.BOLD_GREEN))
|
|
else:
|
|
print(_c(f"<<< {b.label} FAIL (exit {proc.returncode}) in {elapsed:.1f}s", _C.BOLD_RED))
|
|
return proc.returncode, elapsed, new_durs
|
|
|
|
def _print_summary(results: list[tuple[Batch, int, float]]) -> int:
|
|
print()
|
|
rows: list[tuple[str, str, str, int, float, int]] = []
|
|
worst = 0
|
|
total_files = 0
|
|
total_time = 0.0
|
|
passed_count = 0
|
|
failed_count = 0
|
|
skipped_count = 0
|
|
for b, code, elapsed in results:
|
|
n = len(b.files)
|
|
total_files += n
|
|
total_time += elapsed
|
|
if b.skip_reason:
|
|
status_text = "SKIPPED"
|
|
skipped_count += 1
|
|
elif code == 0:
|
|
status_text = "PASS"
|
|
passed_count += 1
|
|
else:
|
|
status_text = "FAIL"
|
|
failed_count += 1
|
|
worst = max(worst, code)
|
|
rows.append((b.tier, b.label, status_text, n, elapsed, code))
|
|
tier_w = max(len("TIER"), max(len(r[0]) for r in rows))
|
|
label_w = max(len("BATCH LABEL"), max(len(r[1]) for r in rows))
|
|
status_w = max(len("STATUS"), max(len(r[2]) for r in rows))
|
|
files_w = max(len("FILES"), max(len(str(r[3])) for r in rows))
|
|
time_w = max(len("TIME"), max(len(f"{r[4]:.1f}s") for r in rows))
|
|
header = f" {'TIER':{tier_w}s} │ {'BATCH LABEL':{label_w}s} │ {'STATUS':{status_w}s} │ {'FILES':>{files_w}s} │ {'TIME':>{time_w}s} "
|
|
sep = "─" * len(header)
|
|
print(_c(sep, _C.DIM))
|
|
print(_c(header, _C.BOLD))
|
|
print(_c(sep, _C.DIM))
|
|
for tier, label, status_text, n, elapsed, _code in rows:
|
|
if status_text == "PASS":
|
|
status = _c(status_text, _C.BOLD_GREEN)
|
|
elif status_text == "FAIL":
|
|
status = _c(status_text, _C.BOLD_RED)
|
|
else:
|
|
status = _c(status_text, _C.BOLD_YELLOW)
|
|
tier_colored = _c(f" {tier:<{tier_w}s}", _C.CYAN)
|
|
print(f"{tier_colored} │ {label:<{label_w}s} │ {status} │ {n:>{files_w}d} │ {elapsed:>{time_w - 1}.1f}s")
|
|
print(_c(sep, _C.DIM))
|
|
if failed_count:
|
|
overall_text = f"{failed_count} FAILED"
|
|
overall = _c(overall_text, _C.BOLD_RED)
|
|
elif passed_count:
|
|
overall_text = f"ALL {passed_count} PASS"
|
|
overall = _c(overall_text, _C.BOLD_GREEN)
|
|
else:
|
|
overall_text = "NO BATCHES RUN"
|
|
overall = _c(overall_text, _C.BOLD_YELLOW)
|
|
total_label = _c(f" {'TOTAL':<{tier_w}s}", _C.BOLD)
|
|
print(f"{total_label} │ {'':<{label_w}s} │ {overall} │ {total_files:>{files_w}d} │ {total_time:>{time_w - 1}.1f}s")
|
|
print(_c(sep, _C.DIM))
|
|
return worst
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("--tests-dir", default=str(_PROJECT_ROOT / "tests"))
|
|
p.add_argument("--registry", default=str(_PROJECT_ROOT / "tests" / "test_categories.toml"))
|
|
p.add_argument("--tiers", default="1,2,3,H")
|
|
p.add_argument("--include-opt-in", action="store_true")
|
|
p.add_argument("--no-xdist", action="store_true")
|
|
p.add_argument("--plan", action="store_true")
|
|
p.add_argument("--audit", action="store_true")
|
|
p.add_argument("--strict", action="store_true")
|
|
p.add_argument("--durations", action="store_true", help="Record per-test durations to .test_durations.json")
|
|
p.add_argument("--no-color", action="store_true", help="Disable ANSI color output")
|
|
options = p.parse_args()
|
|
if options.no_color:
|
|
global _USE_COLOR
|
|
_USE_COLOR = False
|
|
tiers = _parse_tiers(options.tiers)
|
|
tests_dir = Path(options.tests_dir) if Path(options.tests_dir).is_absolute() else (_PROJECT_ROOT / options.tests_dir)
|
|
durations_path = _durations_path(tests_dir)
|
|
durations = _load_durations(durations_path)
|
|
records = categorize_all(tests_dir, Path(options.registry))
|
|
if options.audit:
|
|
auto = [r for r in records if r.source == "auto"]
|
|
print(f"Auto-inferred (unclassified) records: {len(auto)}")
|
|
for r in auto:
|
|
print(f" {r.filename}: fc={r.fixture_class.value}, subs={r.subsystems}, bg={r.batch_group}")
|
|
if options.strict:
|
|
bad = [r for r in auto if len(r.subsystems) > 1]
|
|
if bad:
|
|
print(f"STRICT: {len(bad)} auto-inferred files have multiple subsystems:")
|
|
for r in bad:
|
|
print(f" {r.filename}: subs={r.subsystems}")
|
|
return 1
|
|
return 0
|
|
batches = plan(records, tiers=tiers, include_opt_in=options.include_opt_in, xdist=not options.no_xdist)
|
|
if options.plan:
|
|
for b in batches:
|
|
status = "SKIP" if b.skip_reason else "RUN"
|
|
print(f"[{status}] {b.label}: {len(b.files)} files, est {b.estimated_seconds:.1f}s")
|
|
return 0
|
|
results: list[tuple[Batch, int, float]] = []
|
|
merged_durations = dict(durations)
|
|
for b in batches:
|
|
code, elapsed, new_durs = _run_batch(b, merged_durations)
|
|
results.append((b, code, elapsed))
|
|
merged_durations.update(new_durs)
|
|
if options.durations:
|
|
_save_durations(durations_path, merged_durations)
|
|
return _print_summary(results)
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|