770c2fdb32
Implements the 7th audit script referenced in python.md §17.8. Scans src/*.py for local imports (§17.9a), _PREFIX aliasing (§17.9b), and repeated .from_dict() in the same expression (§17.9c, info-only). Three changes in this commit: 1. scripts/audit_imports.py: AST-based scanner; exits 1 in --strict on LOCAL_IMPORT or PREFIX_ALIAS. Whitelist-aware via scripts/audit_imports_whitelist.toml (load with --show-whitelist; disable with --no-whitelist). 2. scripts/audit_imports_whitelist.toml: 21 files whitelisted with per-file reason (vendor SDK warmup, hot-reload re-imports, circular-dep avoidance). Suppresses 187 LOCAL_IMPORT sites; 0 strict violations remain. 3. conductor/code_styleguides/python.md: updated §17.8 (4th audit entry) and §17.9a (3 documented exceptions + whitelist mechanism). Tests: tests/test_audit_imports.py (7 tests, all passing).
345 lines
13 KiB
Python
345 lines
13 KiB
Python
"""Audit: enforce the local-imports + _PREFIX aliasing ban in src/*.py.
|
|
|
|
Per `conductor/code_styleguides/python.md` §17.9 (added 2026-06-27):
|
|
- §17.9a: local imports inside function bodies are BANNED (except in
|
|
`try/except ImportError` blocks for optional dependencies, AND in
|
|
files whitelisted for vendor-SDK warmup or hot-reload re-imports per
|
|
`scripts/audit_imports_whitelist.toml`).
|
|
- §17.9b: `import X as _X` aliasing-for-naming-convenience is BANNED.
|
|
- §17.9c: repeated `.from_dict()` calls in the same expression are BANNED.
|
|
|
|
This script AST-scans src/*.py for the above patterns and exits 1 in
|
|
--strict mode on any violation. The local-imports check is the strict
|
|
violation; _PREFIX aliasing is strict; repeated .from_dict() is INFO only
|
|
(detection is heuristic; relies on Tier 2 review for confirmation).
|
|
|
|
Usage:
|
|
uv run python scripts/audit_imports.py
|
|
uv run python scripts/audit_imports.py --strict
|
|
uv run python scripts/audit_imports.py --json
|
|
uv run python scripts/audit_imports.py --show-whitelist
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import ast
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import tomllib
|
|
except ImportError:
|
|
import tomli as tomllib
|
|
|
|
|
|
DEFAULT_SCAN_ROOT: str = "src"
|
|
DEFAULT_EXCLUDE_DIRS: tuple[str, ...] = ("__pycache__",)
|
|
DEFAULT_WHITELIST_PATH: str = "scripts/audit_imports_whitelist.toml"
|
|
|
|
|
|
def _is_within_optional_import_try(node: ast.stmt) -> bool:
|
|
"""Return True if `node` is an Import/ImportFrom inside a `try` whose
|
|
except handler is `except ImportError` (the canonical "optional
|
|
dependency" pattern). The check is structural: the Import statement
|
|
must be a direct child of a Try whose handlers are all ImportError.
|
|
"""
|
|
# Walk up: check the statement's parents via a heuristic (we don't have
|
|
# parent links in stdlib AST). The common pattern is:
|
|
# try:
|
|
# from foo import bar # <-- node
|
|
# except ImportError:
|
|
# bar = None
|
|
# So `node` is in Try.body[0..n], and Try.handlers are all ImportError.
|
|
# Caller must pass us the Try node directly; this helper checks the Try.
|
|
return False # Conservative: caller does the structural check via _parent_map
|
|
|
|
|
|
def _build_parent_map(tree: ast.AST) -> dict[int, ast.AST]:
|
|
"""Build a map id(node) -> parent node so we can check context."""
|
|
parents: dict[int, ast.AST] = {}
|
|
for node in ast.walk(tree):
|
|
for child in ast.iter_child_nodes(node):
|
|
parents[id(child)] = node
|
|
return parents
|
|
|
|
|
|
def _is_optional_import_try_node(try_node: ast.Try, parents: dict[int, ast.AST]) -> bool:
|
|
"""Return True if the Try is an optional-import guard (all except
|
|
handlers catch ImportError)."""
|
|
if not try_node.handlers:
|
|
return False
|
|
for handler in try_node.handlers:
|
|
if not isinstance(handler, ast.ExceptHandler):
|
|
return False
|
|
if handler.type is None:
|
|
# bare except: too broad, not an optional-import guard
|
|
return False
|
|
# The exception type can be Name('ImportError') or Attribute(value=Name('ImportError'))
|
|
t = handler.type
|
|
if isinstance(t, ast.Name) and t.id == "ImportError":
|
|
continue
|
|
if isinstance(t, ast.Attribute) and t.attr == "ImportError":
|
|
continue
|
|
return False
|
|
return True
|
|
|
|
|
|
def _enclosing_function_name(node: ast.AST, parents: dict[int, ast.AST]) -> str | None:
|
|
"""Walk up the parent chain to find the nearest enclosing FunctionDef
|
|
or AsyncFunctionDef. Returns the function name (or None if at module level).
|
|
Used to enrich LOCAL_IMPORT output with the enclosing function context."""
|
|
current: ast.AST | None = node
|
|
while current is not None:
|
|
parent = parents.get(id(current))
|
|
if parent is None:
|
|
return None
|
|
if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
|
return parent.name
|
|
current = parent
|
|
return None
|
|
|
|
|
|
def _is_local_import(node: ast.stmt, parents: dict[int, ast.AST]) -> bool:
|
|
"""Return True if `node` is an Import/ImportFrom nested inside a
|
|
function body (NOT a module-level import, NOT inside an optional-import
|
|
try guard).
|
|
|
|
EXCEPTION 1 (per §17.9a): imports inside `try/except ImportError:` blocks
|
|
are allowed (the canonical "optional dependency" pattern).
|
|
|
|
EXCEPTION 2 (per §17.9a whitelist): files whitelisted in
|
|
`scripts/audit_imports_whitelist.toml` (vendor SDK warmup, hot-reload
|
|
re-imports) are filtered out at the audit_file() call site — this function
|
|
is unaware of the whitelist."""
|
|
# First, check the IMMEDIATE parent: if it's a Try-optional block, allow.
|
|
immediate_parent = parents.get(id(node))
|
|
if isinstance(immediate_parent, ast.Try) and _is_optional_import_try_node(immediate_parent, parents):
|
|
return False
|
|
# Otherwise, walk up looking for any FunctionDef ancestor.
|
|
current: ast.AST | None = node
|
|
while current is not None:
|
|
parent = parents.get(id(current))
|
|
if parent is None:
|
|
return False
|
|
if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
|
return True
|
|
current = parent
|
|
return False
|
|
|
|
|
|
def _is_prefix_aliasing(target: ast.alias) -> bool:
|
|
"""Return True if the alias name starts with a single underscore
|
|
(per §17.9b: `import X as _X` is BANNED)."""
|
|
# ast.alias has `asname` (the alias after `as`); if None, no aliasing.
|
|
# Banned: asname starts with `_`.
|
|
# Allowed: `import X` (no `as`), `import X as real_name` (not starting with `_`).
|
|
if target.asname is None:
|
|
return False
|
|
return target.asname.startswith("_")
|
|
|
|
|
|
def _count_from_dict_in_expr(node: ast.expr) -> int:
|
|
"""Count `.from_dict(...)` attribute calls in `node` (heuristic;
|
|
may under/overcount with chained method calls but catches the common
|
|
pattern)."""
|
|
count = 0
|
|
for sub in ast.walk(node):
|
|
if isinstance(sub, ast.Call):
|
|
func = sub.func
|
|
if isinstance(func, ast.Attribute) and func.attr == "from_dict":
|
|
count += 1
|
|
return count
|
|
|
|
|
|
def load_whitelist(whitelist_path: Path) -> dict[str, dict]:
|
|
"""Load the warmed-import whitelist from a TOML file. Returns a dict
|
|
keyed by repo-relative file path (forward-slash normalized) -> metadata
|
|
({"reason": str, "scope": "file"}). Missing file returns empty dict."""
|
|
if not whitelist_path.exists():
|
|
return {}
|
|
try:
|
|
with open(whitelist_path, "rb") as f:
|
|
data = tomllib.load(f)
|
|
except (OSError, tomllib.TOMLDecodeError) as e:
|
|
print(f"WARN: could not load whitelist {whitelist_path}: {e}", file=sys.stderr)
|
|
return {}
|
|
return data.get("whitelist", {})
|
|
|
|
|
|
def _is_file_whitelisted(filepath: Path, whitelist: dict[str, dict], repo_root: Path) -> tuple[bool, str | None]:
|
|
"""Check whether `filepath` is covered by the whitelist. Returns
|
|
(is_whitelisted, reason). Uses forward-slash normalization for cross-OS
|
|
matching."""
|
|
try:
|
|
rel = filepath.resolve().relative_to(repo_root.resolve()).as_posix()
|
|
except ValueError:
|
|
return False, None
|
|
entry = whitelist.get(rel)
|
|
if entry is None:
|
|
return False, None
|
|
return True, entry.get("reason", "(no reason given)")
|
|
|
|
|
|
def audit_file(filepath: Path, whitelist: dict[str, dict] | None = None, repo_root: Path | None = None) -> list[dict]:
|
|
"""Audit one file: scan for local imports, _PREFIX aliasing, and
|
|
repeated .from_dict() in the same expression.
|
|
|
|
If `whitelist` is provided and the file is whitelisted (warmed imports
|
|
or hot-reload re-imports), LOCAL_IMPORT findings are filtered out and
|
|
replaced with a single WHITELIST annotation entry (so the user knows
|
|
the script saw them but is not flagging them).
|
|
"""
|
|
if not filepath.exists():
|
|
return [{"file": str(filepath), "line": 0, "kind": "MISSING_FILE", "note": "file not found"}]
|
|
try:
|
|
source = filepath.read_text(encoding="utf-8")
|
|
except (OSError, UnicodeDecodeError) as e:
|
|
return [{"file": str(filepath), "line": 0, "kind": "READ_ERROR", "note": str(e)}]
|
|
try:
|
|
tree = ast.parse(source)
|
|
except SyntaxError as e:
|
|
return [{"file": str(filepath), "line": e.lineno or 0, "kind": "SYNTAX_ERROR", "note": str(e)}]
|
|
parents = _build_parent_map(tree)
|
|
findings: list[dict] = []
|
|
whitelisted = False
|
|
whitelist_reason: str | None = None
|
|
if whitelist and repo_root:
|
|
whitelisted, whitelist_reason = _is_file_whitelisted(filepath, whitelist, repo_root)
|
|
# 1. Local imports (§17.9a) + _PREFIX aliasing (§17.9b)
|
|
for node in ast.walk(tree):
|
|
if isinstance(node, ast.Import):
|
|
for alias in node.names:
|
|
if _is_prefix_aliasing(alias):
|
|
findings.append({
|
|
"file": str(filepath),
|
|
"line": alias.lineno,
|
|
"kind": "PREFIX_ALIAS",
|
|
"note": f"`import {alias.name} as {alias.asname}` banned (§17.9b); use the real name",
|
|
})
|
|
if _is_local_import(node, parents):
|
|
func_name = _enclosing_function_name(node, parents)
|
|
location = f"inside {func_name}()" if func_name else "inside anonymous fn"
|
|
findings.append({
|
|
"file": str(filepath),
|
|
"line": node.lineno,
|
|
"kind": "LOCAL_IMPORT",
|
|
"note": f"`import {node.names[0].name}` {location} banned (§17.9a); move to module top",
|
|
})
|
|
elif isinstance(node, ast.ImportFrom):
|
|
module = node.module or ""
|
|
for alias in node.names:
|
|
if _is_prefix_aliasing(alias):
|
|
findings.append({
|
|
"file": str(filepath),
|
|
"line": alias.lineno,
|
|
"kind": "PREFIX_ALIAS",
|
|
"note": f"`from {module} import {alias.name} as {alias.asname}` banned (§17.9b); use the real name",
|
|
})
|
|
if _is_local_import(node, parents):
|
|
func_name = _enclosing_function_name(node, parents)
|
|
location = f"inside {func_name}()" if func_name else "inside anonymous fn"
|
|
findings.append({
|
|
"file": str(filepath),
|
|
"line": node.lineno,
|
|
"kind": "LOCAL_IMPORT",
|
|
"note": f"`from {module} import ...` {location} banned (§17.9a); move to module top",
|
|
})
|
|
elif isinstance(node, ast.Call):
|
|
# 2. Repeated .from_dict() in the same expression (§17.9c; INFO only)
|
|
fd_count = _count_from_dict_in_expr(node)
|
|
if fd_count > 1:
|
|
findings.append({
|
|
"file": str(filepath),
|
|
"line": node.lineno,
|
|
"kind": "REPEATED_FROM_DICT",
|
|
"note": f"expression contains {fd_count} .from_dict() calls (§17.9c INFO); cache in a local var",
|
|
})
|
|
if whitelisted:
|
|
# Filter LOCAL_IMPORT findings and add a single WHITELIST annotation
|
|
local_count = sum(1 for f in findings if f["kind"] == "LOCAL_IMPORT")
|
|
findings = [f for f in findings if f["kind"] != "LOCAL_IMPORT"]
|
|
if local_count > 0:
|
|
findings.insert(0, {
|
|
"file": str(filepath),
|
|
"line": 0,
|
|
"kind": "WHITELISTED",
|
|
"note": f"{local_count} LOCAL_IMPORT findings suppressed by whitelist: {whitelist_reason}",
|
|
})
|
|
return findings
|
|
|
|
|
|
def _iter_python_files(scan_root: str) -> list[Path]:
|
|
root = Path(scan_root)
|
|
if not root.is_dir():
|
|
return []
|
|
files: list[Path] = []
|
|
for p in root.rglob("*.py"):
|
|
if any(part in DEFAULT_EXCLUDE_DIRS for part in p.parts):
|
|
continue
|
|
files.append(p)
|
|
return sorted(files)
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Audit src/*.py for local imports + _PREFIX aliasing.")
|
|
parser.add_argument("--strict", action="store_true", help="Exit 1 on any LOCAL_IMPORT or PREFIX_ALIAS (REPEATED_FROM_DICT is info-only)")
|
|
parser.add_argument("--json", action="store_true", help="Output JSON")
|
|
parser.add_argument("--root", default=DEFAULT_SCAN_ROOT, help=f"Root directory to scan (default: {DEFAULT_SCAN_ROOT})")
|
|
parser.add_argument("--whitelist", default=DEFAULT_WHITELIST_PATH, help=f"Path to whitelist TOML (default: {DEFAULT_WHITELIST_PATH})")
|
|
parser.add_argument("--no-whitelist", action="store_true", help="Disable whitelist filtering (audit ALL files)")
|
|
parser.add_argument("--show-whitelist", action="store_true", help="Print the loaded whitelist and exit")
|
|
args = parser.parse_args()
|
|
repo_root = Path.cwd()
|
|
whitelist: dict[str, dict] = {}
|
|
if not args.no_whitelist:
|
|
whitelist = load_whitelist(repo_root / args.whitelist)
|
|
if args.show_whitelist:
|
|
print(f"Loaded {len(whitelist)} whitelisted files from {args.whitelist}:")
|
|
for path, entry in sorted(whitelist.items()):
|
|
print(f" - {path}")
|
|
print(f" reason: {entry.get('reason', '(no reason given)')}")
|
|
return 0
|
|
files = _iter_python_files(args.root)
|
|
all_findings: list[dict] = []
|
|
for filepath in files:
|
|
findings = audit_file(filepath, whitelist=whitelist, repo_root=repo_root)
|
|
all_findings.extend(findings)
|
|
if args.json:
|
|
out = {
|
|
"scan_root": args.root,
|
|
"files_scanned": len(files),
|
|
"files_with_findings": len({f["file"] for f in all_findings}),
|
|
"total_findings": len(all_findings),
|
|
"whitelisted_files": len(whitelist),
|
|
"by_kind": {
|
|
"LOCAL_IMPORT": sum(1 for f in all_findings if f["kind"] == "LOCAL_IMPORT"),
|
|
"PREFIX_ALIAS": sum(1 for f in all_findings if f["kind"] == "PREFIX_ALIAS"),
|
|
"REPEATED_FROM_DICT": sum(1 for f in all_findings if f["kind"] == "REPEATED_FROM_DICT"),
|
|
"WHITELISTED": sum(1 for f in all_findings if f["kind"] == "WHITELISTED"),
|
|
},
|
|
"findings": all_findings,
|
|
}
|
|
print(json.dumps(out, indent=2))
|
|
return 0
|
|
strict_findings = [f for f in all_findings if f["kind"] in ("LOCAL_IMPORT", "PREFIX_ALIAS")]
|
|
info_findings = [f for f in all_findings if f["kind"] == "REPEATED_FROM_DICT"]
|
|
whitelist_findings = [f for f in all_findings if f["kind"] == "WHITELISTED"]
|
|
print(f"Imports audit ({args.root}/): {len(all_findings)} total findings")
|
|
print(f" - {len(strict_findings)} strict (LOCAL_IMPORT + PREFIX_ALIAS)")
|
|
print(f" - {len(info_findings)} info (REPEATED_FROM_DICT)")
|
|
print(f" - {len(whitelist_findings)} whitelist annotations ({len(whitelist)} files whitelisted)")
|
|
for f in strict_findings:
|
|
print(f" STRICT: {f['file']}:{f['line']} [{f['kind']}] {f['note']}")
|
|
for f in info_findings:
|
|
print(f" INFO: {f['file']}:{f['line']} [{f['kind']}] {f['note']}")
|
|
for f in whitelist_findings:
|
|
print(f" WL: {f['file']} [{f['kind']}] {f['note']}")
|
|
if args.strict and strict_findings:
|
|
print(f"STRICT: {len(strict_findings)} violations")
|
|
return 1
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|