"""Third-party license + CVE + version-pin audit tool. Audits the project's dependencies (pyproject.toml + uv.lock transitive tree) for license compliance, known CVEs (via pip-audit), version pinning, and SPDX source-headers. See conductor/tracks/license_cve_audit_20260607/spec.md. Output: line-per-violation to stdout (parseable) + a markdown report under docs/reports/license_cve_audit//. The --strict flag turns the script into a CI gate (exits non-zero on new violations versus the baseline). """ from __future__ import annotations import json import re import subprocess import sys import tomllib from dataclasses import dataclass, field from importlib import metadata from pathlib import Path from typing import Literal ALLOW_LICENSES: frozenset[str] = frozenset({ "MIT", "MIT-0", "BSD", "BSD-2-Clause", "BSD-3-Clause", "0BSD", "Apache", "Apache-2.0", "Apache 2.0", "Apache-2.0 WITH LLVM-exception", "ISC", "ISC-License", "Unlicense", "Unlicense-2.0", "Zlib", "zlib-acknowledgement", "Python-2.0", "PSF-2.0", "PSF", "CNRI-Python", "LGPL", "LGPL-2.0", "LGPL-2.1", "LGPL-3.0", "LGPL-2.0-or-later", "LGPL-2.1-or-later", "LGPL-3.0-or-later", "MPL", "MPL-1.1", "MPL-2.0", "CC0", "CC0-1.0", "WTFPL", "Anti-996", "Anti-996-License", "Hippocratic", "Hippocratic-2.1", }) BLOCK_LICENSES: frozenset[str] = frozenset({ "GPL", "GPL-1.0", "GPL-2.0", "GPL-3.0", "GPL-2.0-or-later", "GPL-3.0-or-later", "AGPL", "AGPL-1.0", "AGPL-3.0", "AGPL-3.0-or-later", "SSPL", "SSPL-1.0", "Server Side Public License", "BUSL", "BUSL-1.1", "BSL", "BSL-1.1", "Commons-Clause", "Elastic", "Elastic-2.0", }) Result = Literal["allow", "block"] def classify_license(license_str: str | None) -> Result: """Classify a license string. Returns 'allow' or 'block'. Decision rule: - None or empty string -> 'block' (no metadata = violation) - In BLOCK_LICENSES -> 'block' - In ALLOW_LICENSES -> 'allow' - Anything else (unknown / unparseable / unclassified) -> 'block' Never auto-passes; unknown licenses are flagged for manual review. """ if not license_str: return "block" normalized = license_str.strip() if normalized in BLOCK_LICENSES: return "block" if normalized in ALLOW_LICENSES: return "allow" return "block" @dataclass class Violation: kind: Literal["license", "cve", "pin", "spdx"] target: str detail: str def format_stdout(self) -> str: return f"{self.kind.upper()}_VIOLATION target={self.target} detail={self.detail!r}" def check_pins(pyproject_path: Path) -> list[Violation]: """Parse pyproject.toml and flag any dep without a version specifier.""" with pyproject_path.open("rb") as f: data = tomllib.load(f) violations: list[Violation] = [] for dep in data.get("project", {}).get("dependencies", []): name = re.split(r"[<>=!~;\[ ]", dep, maxsplit=1)[0].strip() has_specifier = any(op in dep for op in ("<", ">", "=", "~", "!")) if not has_specifier: violations.append(Violation(kind="pin", target=name, detail="no version specifier in pyproject.toml")) return violations SPDX_PATTERN = re.compile(r"SPDX-License-Identifier:\s*(\S+)", re.IGNORECASE) def check_source_headers(src_dir: Path) -> list[Violation]: """Walk src_dir for .py files; flag any with a non-permissive SPDX.""" violations: list[Violation] = [] for py_file in src_dir.rglob("*.py"): try: text = py_file.read_text(encoding="utf-8", errors="replace") except OSError: continue head = "\n".join(text.splitlines()[:20]) m = SPDX_PATTERN.search(head) if m and classify_license(m.group(1)) == "block": violations.append(Violation( kind="spdx", target=str(py_file), detail=f"license={m.group(1)!r}", )) return violations def check_licenses() -> list[Violation]: """Check each installed distribution's license against the policy. Iterates importlib.metadata.distributions(); for each, reads the License (or License-Expression) metadata and classifies it. If classify_license returns 'block', the dep is a violation. """ violations: list[Violation] = [] for dist in metadata.distributions(): name = dist.metadata["Name"] license_str = dist.metadata.get("License") or dist.metadata.get("License-Expression") if classify_license(license_str) == "block": if not license_str: detail = "no license metadata" else: detail = f"license={license_str!r}" violations.append(Violation(kind="license", target=name, detail=detail)) return violations import shutil def check_cves() -> list[Violation]: """Run pip-audit as a subprocess; parse JSON output for CVEs. If pip-audit is not installed, this is a no-op (returns []). The script logs a warning so the user knows the CVE check was skipped. """ if shutil.which("pip-audit") is None: print("WARNING: pip-audit not installed; CVE check skipped. Install via 'uv tool install pip-audit'.", file=sys.stderr) return [] try: result = subprocess.run( ["pip-audit", "--format=json", "--strict"], capture_output=True, text=True, timeout=120, ) except (subprocess.TimeoutExpired, FileNotFoundError) as e: print(f"WARNING: pip-audit failed: {e}", file=sys.stderr) return [] if result.returncode != 0 and not result.stdout.strip(): print(f"WARNING: pip-audit returned non-zero with no output: {result.stderr}", file=sys.stderr) return [] try: data = json.loads(result.stdout) except json.JSONDecodeError: return [] violations: list[Violation] = [] for dep in data.get("dependencies", []): name = dep.get("name", "") for vuln in dep.get("vulns", []): cve_id = vuln.get("id", "") fix = ", ".join(vuln.get("fix_versions", []) or [""]) severity = vuln.get("severity", "unknown") violations.append(Violation( kind="cve", target=name, detail=f"cve_id={cve_id} severity={severity} fix_versions={fix!r}", )) return violations def main() -> int: import argparse parser = argparse.ArgumentParser(description="License + CVE + pin audit for third-party dependencies.") parser.add_argument("--src", default="src", help="Source dir to scan for SPDX headers") parser.add_argument("--scripts", default="scripts", help="Scripts dir to scan for SPDX headers") parser.add_argument("--pyproject", default="pyproject.toml", help="Path to pyproject.toml") parser.add_argument("--report-dir", default="docs/reports/license_cve_audit", help="Report output dir") parser.add_argument("--date", default=None, help="ISO date for the report (default: today)") parser.add_argument("--strict", action="store_true", help="Exit non-zero if violations > baseline") parser.add_argument("--dump-baseline", action="store_true", help="Write current violations as the new baseline") parser.add_argument("--report-name", default="initial", help="Report filename (default: 'initial'; use 'final' for post-cleanup)") args = parser.parse_args() violations: list[Violation] = [] violations.extend(check_licenses()) violations.extend(check_cves()) violations.extend(check_pins(Path(args.pyproject))) src_dir = Path(args.src) if src_dir.exists(): violations.extend(check_source_headers(src_dir)) scripts_dir = Path(args.scripts) if scripts_dir.exists(): violations.extend(check_source_headers(scripts_dir)) for v in violations: print(v.format_stdout()) from datetime import date date_str = args.date or date.today().isoformat() report_dir = Path(args.report_dir) / date_str report_dir.mkdir(parents=True, exist_ok=True) report_path = report_dir / f"{args.report_name}.md" _write_report(violations, report_path, args) if args.strict: baseline_path = Path(__file__).parent / "audit_license_cve.baseline.json" if baseline_path.exists(): baseline = json.loads(baseline_path.read_text(encoding="utf-8")) baseline_n = len(baseline.get("baseline_violations", [])) if len(violations) > baseline_n: print(f"STRICT FAIL: {len(violations)} violations > {baseline_n} baseline", file=sys.stderr) return 1 if args.dump_baseline: baseline_path = Path(__file__).parent / "audit_license_cve.baseline.json" baseline_path.parent.mkdir(parents=True, exist_ok=True) baseline_path.write_text(json.dumps({ "schema_version": 1, "baseline_violations": [v.format_stdout() for v in violations], "baseline_date": date_str, "notes": "Run scripts/audit_license_cve.py --dump-baseline to regenerate.", }, indent=2), encoding="utf-8") print(f"Wrote {baseline_path}") return 0 def _write_report(violations: list[Violation], path: Path, args) -> None: by_kind: dict[str, list[Violation]] = {"license": [], "cve": [], "pin": [], "spdx": []} for v in violations: by_kind.setdefault(v.kind, []).append(v) lines: list[str] = [ f"# License & CVE Audit - {args.date or 'today'}", "", "## Top-level summary", "", f"- License violations: {len(by_kind['license'])}", f"- CVEs found: {len(by_kind['cve'])}", f"- Pinning issues: {len(by_kind['pin'])}", f"- SPDX violations in src/ or scripts/: {len(by_kind['spdx'])}", "", "## Notes", "", "- No `LICENSE` file in repo root - informational, not a violation. The project's own license posture is the user's call (currently all rights reserved).", "- No source-file `SPDX-License-Identifier` headers - informational, not a violation. The project's own copyright headers are the user's call.", "- If pip-audit is not installed, the CVE check is skipped. Install via `uv tool install pip-audit` to enable.", "", "## Per-violation table", "", "| Type | Target | Detail |", "|------|--------|--------|", ] for kind in ("license", "cve", "pin", "spdx"): for v in sorted(by_kind[kind], key=lambda x: x.target): lines.append(f"| {v.kind} | `{v.target}` | {v.detail} |") path.write_text("\n".join(lines) + "\n", encoding="utf-8") print(f"Wrote {path}") if __name__ == "__main__": sys.exit(main())