feat(run_tests_batched): full CLI with --tiers, --durations, actual pytest execution
This commit is contained in:
+104
-27
@@ -1,34 +1,84 @@
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "tests"))
|
||||
|
||||
from categorizer import categorize_all
|
||||
from batcher import plan
|
||||
from batcher import plan, Batch
|
||||
|
||||
def _print_plan(records, options) -> int:
|
||||
batches = plan(records, include_opt_in=options.include_opt_in, xdist=not options.no_xdist)
|
||||
for b in batches:
|
||||
status = "SKIP" if b.skip_reason else "RUN"
|
||||
print(f"[{status}] {b.label}: {len(b.files)} files, est {b.estimated_seconds:.1f}s, args={b.pytest_args}")
|
||||
def _parse_tiers(s: str) -> set[str]:
|
||||
return {t.strip() for t in s.split(",") if t.strip()}
|
||||
|
||||
def _durations_path(tests_dir: Path) -> Path:
|
||||
return tests_dir / ".test_durations.json"
|
||||
|
||||
def _load_durations(p: Path) -> dict[str, float]:
|
||||
if not p.exists():
|
||||
return {}
|
||||
try:
|
||||
with p.open("r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return {}
|
||||
|
||||
def _save_durations(p: Path, durations: dict[str, float]) -> None:
|
||||
tmp = p.with_suffix(".json.tmp")
|
||||
with tmp.open("w", encoding="utf-8") as f:
|
||||
json.dump(durations, f, indent=2, sort_keys=True)
|
||||
tmp.replace(p)
|
||||
|
||||
def _parse_durations_from_pytest_output(stdout: str) -> dict[str, float]:
|
||||
out: dict[str, float] = {}
|
||||
for line in stdout.splitlines():
|
||||
line = line.strip()
|
||||
if "::" not in line or " " not in line:
|
||||
continue
|
||||
parts = line.rsplit(None, 1)
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
nodeid, time_str = parts
|
||||
try:
|
||||
out[nodeid] = float(time_str.rstrip("s"))
|
||||
except ValueError:
|
||||
continue
|
||||
return out
|
||||
|
||||
def _run_batch(b: Batch, durations: dict[str, float]) -> tuple[int, float, dict[str, float]]:
|
||||
if b.skip_reason:
|
||||
return 0, 0.0, {}
|
||||
cmd = ["uv", "run", "pytest", "-v", "--durations=0"] + b.pytest_args + [str(f) for f in b.files]
|
||||
print(f"\n>>> Running {b.label} ({len(b.files)} files)")
|
||||
t0 = time.monotonic()
|
||||
proc = subprocess.run(cmd, capture_output=True, text=True)
|
||||
elapsed = time.monotonic() - t0
|
||||
new_durs = _parse_durations_from_pytest_output(proc.stdout)
|
||||
tail = proc.stdout[-2000:] if proc.returncode != 0 else f"<<< {b.label} PASS in {elapsed:.1f}s"
|
||||
print(tail)
|
||||
if proc.returncode != 0:
|
||||
print(f"<<< {b.label} FAIL (exit {proc.returncode}) in {elapsed:.1f}s")
|
||||
print(proc.stderr[-1000:])
|
||||
return proc.returncode, elapsed, new_durs
|
||||
|
||||
def _print_summary(results: list[tuple[Batch, int, float]]) -> int:
|
||||
print("\n" + "=" * 60)
|
||||
print("SUMMARY")
|
||||
print("=" * 60)
|
||||
worst = 0
|
||||
for b, code, elapsed in results:
|
||||
if b.skip_reason:
|
||||
print(f" reason: {b.skip_reason}")
|
||||
return 0
|
||||
|
||||
def _print_audit(records, strict: bool) -> int:
|
||||
auto = [r for r in records if r.source == "auto"]
|
||||
print(f"Auto-inferred (unclassified) records: {len(auto)}")
|
||||
for r in auto:
|
||||
print(f" {r.filename}: fc={r.fixture_class.value}, subs={r.subsystems}, bg={r.batch_group}")
|
||||
if strict:
|
||||
bad = [r for r in auto if len(r.subsystems) > 1]
|
||||
if bad:
|
||||
print(f"STRICT: {len(bad)} auto-inferred files have multiple subsystems (probably cross-cutting):")
|
||||
for r in bad:
|
||||
print(f" {r.filename}: subs={r.subsystems}")
|
||||
return 1
|
||||
return 0
|
||||
status = "SKIPPED"
|
||||
elif code == 0:
|
||||
status = "PASS"
|
||||
else:
|
||||
status = "FAIL"
|
||||
worst = max(worst, code)
|
||||
n = len(b.files)
|
||||
print(f"[{b.tier}] {b.label:40s} {status:8s} {n} files {elapsed:6.1f}s")
|
||||
return worst
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser()
|
||||
@@ -40,14 +90,41 @@ def main() -> int:
|
||||
p.add_argument("--plan", action="store_true")
|
||||
p.add_argument("--audit", action="store_true")
|
||||
p.add_argument("--strict", action="store_true")
|
||||
p.add_argument("--durations", action="store_true", help="Record per-test durations to .test_durations.json")
|
||||
options = p.parse_args()
|
||||
records = categorize_all(Path(options.tests_dir), Path(options.registry))
|
||||
tiers = _parse_tiers(options.tiers)
|
||||
tests_dir = Path(options.tests_dir)
|
||||
durations_path = _durations_path(tests_dir)
|
||||
durations = _load_durations(durations_path)
|
||||
records = categorize_all(tests_dir, Path(options.registry))
|
||||
if options.audit:
|
||||
return _print_audit(records, strict=options.strict)
|
||||
auto = [r for r in records if r.source == "auto"]
|
||||
print(f"Auto-inferred (unclassified) records: {len(auto)}")
|
||||
for r in auto:
|
||||
print(f" {r.filename}: fc={r.fixture_class.value}, subs={r.subsystems}, bg={r.batch_group}")
|
||||
if options.strict:
|
||||
bad = [r for r in auto if len(r.subsystems) > 1]
|
||||
if bad:
|
||||
print(f"STRICT: {len(bad)} auto-inferred files have multiple subsystems:")
|
||||
for r in bad:
|
||||
print(f" {r.filename}: subs={r.subsystems}")
|
||||
return 1
|
||||
return 0
|
||||
batches = plan(records, tiers=tiers, include_opt_in=options.include_opt_in, xdist=not options.no_xdist)
|
||||
if options.plan:
|
||||
return _print_plan(records, options)
|
||||
print("Phase 1 stub: no actual test execution yet. Use --plan or --audit.")
|
||||
return 0
|
||||
for b in batches:
|
||||
status = "SKIP" if b.skip_reason else "RUN"
|
||||
print(f"[{status}] {b.label}: {len(b.files)} files, est {b.estimated_seconds:.1f}s")
|
||||
return 0
|
||||
results: list[tuple[Batch, int, float]] = []
|
||||
merged_durations = dict(durations)
|
||||
for b in batches:
|
||||
code, elapsed, new_durs = _run_batch(b, merged_durations)
|
||||
results.append((b, code, elapsed))
|
||||
merged_durations.update(new_durs)
|
||||
if options.durations:
|
||||
_save_durations(durations_path, merged_durations)
|
||||
return _print_summary(results)
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
||||
Reference in New Issue
Block a user