Private
Public Access
0
0
Files
manual_slop/scripts/audit_imports.py
T
ed 770c2fdb32 feat(audit): add audit_imports.py + warmed-import whitelist for §17.9a
Implements the 7th audit script referenced in python.md §17.8. Scans
src/*.py for local imports (§17.9a), _PREFIX aliasing (§17.9b), and
repeated .from_dict() in the same expression (§17.9c, info-only).

Three changes in this commit:
1. scripts/audit_imports.py: AST-based scanner; exits 1 in --strict on
   LOCAL_IMPORT or PREFIX_ALIAS. Whitelist-aware via
   scripts/audit_imports_whitelist.toml (loaded by default; inspect with
   --show-whitelist; disable with --no-whitelist).
2. scripts/audit_imports_whitelist.toml: 21 files whitelisted with per-file
   reason (vendor SDK warmup, hot-reload re-imports, circular-dep avoidance).
   Suppresses 187 LOCAL_IMPORT sites; 0 strict violations remain.
3. conductor/code_styleguides/python.md: updated §17.8 (4th audit entry)
   and §17.9a (3 documented exceptions + whitelist mechanism).

Tests: tests/test_audit_imports.py (7 tests, all passing).
2026-06-26 09:24:10 -04:00

345 lines
13 KiB
Python

"""Audit: enforce the local-imports + _PREFIX aliasing ban in src/*.py.
Per `conductor/code_styleguides/python.md` §17.9 (added 2026-06-27):
- §17.9a: local imports inside function bodies are BANNED (except in
`try/except ImportError` blocks for optional dependencies, AND in
files whitelisted for vendor-SDK warmup or hot-reload re-imports per
`scripts/audit_imports_whitelist.toml`).
- §17.9b: `import X as _X` aliasing-for-naming-convenience is BANNED.
- §17.9c: repeated `.from_dict()` calls in the same expression are BANNED.
This script AST-scans src/*.py for the above patterns. In --strict mode it
exits 1 on any local-import (§17.9a) or _PREFIX-aliasing (§17.9b) finding;
repeated .from_dict() (§17.9c) is reported as INFO only and never fails the run
(detection is heuristic; relies on Tier 2 review for confirmation).
Usage:
uv run python scripts/audit_imports.py
uv run python scripts/audit_imports.py --strict
uv run python scripts/audit_imports.py --json
uv run python scripts/audit_imports.py --show-whitelist
"""
from __future__ import annotations
import argparse
import ast
import json
import sys
from pathlib import Path
try:
import tomllib
except ImportError:
import tomli as tomllib
# Directory scanned when --root is not given on the command line.
DEFAULT_SCAN_ROOT: str = "src"
# Path components that cause a file to be skipped during the scan.
DEFAULT_EXCLUDE_DIRS: tuple[str, ...] = ("__pycache__",)
# Default value of --whitelist; main() joins it onto the current working directory.
DEFAULT_WHITELIST_PATH: str = "scripts/audit_imports_whitelist.toml"
def _is_within_optional_import_try(node: ast.stmt) -> bool:
    """Placeholder for an optional-import check; always returns False.

    The intent was to report whether `node` (an Import/ImportFrom) sits
    inside a `try` whose handlers are all `except ImportError` (the canonical
    "optional dependency" pattern). That cannot be decided from `node` alone,
    so this helper performs no check. The working implementation is the
    parent-map based one in `_is_local_import` together with
    `_is_optional_import_try_node`. No caller of this function is visible in
    this file.
    """
    # Stdlib AST nodes carry no parent links, so the enclosing Try is not
    # reachable from `node`. The pattern of interest is:
    #   try:
    #       from foo import bar   # <-- node
    #   except ImportError:
    #       bar = None
    # i.e. `node` is in Try.body and every Try handler catches ImportError.
    return False  # Conservative: the structural check is done elsewhere via the parent map
def _build_parent_map(tree: ast.AST) -> dict[int, ast.AST]:
"""Build a map id(node) -> parent node so we can check context."""
parents: dict[int, ast.AST] = {}
for node in ast.walk(tree):
for child in ast.iter_child_nodes(node):
parents[id(child)] = node
return parents
def _is_optional_import_try_node(try_node: ast.Try, parents: dict[int, ast.AST]) -> bool:
"""Return True if the Try is an optional-import guard (all except
handlers catch ImportError)."""
if not try_node.handlers:
return False
for handler in try_node.handlers:
if not isinstance(handler, ast.ExceptHandler):
return False
if handler.type is None:
# bare except: too broad, not an optional-import guard
return False
# The exception type can be Name('ImportError') or Attribute(value=Name('ImportError'))
t = handler.type
if isinstance(t, ast.Name) and t.id == "ImportError":
continue
if isinstance(t, ast.Attribute) and t.attr == "ImportError":
continue
return False
return True
def _enclosing_function_name(node: ast.AST, parents: dict[int, ast.AST]) -> str | None:
"""Walk up the parent chain to find the nearest enclosing FunctionDef
or AsyncFunctionDef. Returns the function name (or None if at module level).
Used to enrich LOCAL_IMPORT output with the enclosing function context."""
current: ast.AST | None = node
while current is not None:
parent = parents.get(id(current))
if parent is None:
return None
if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)):
return parent.name
current = parent
return None
def _is_local_import(node: ast.stmt, parents: dict[int, ast.AST]) -> bool:
    """Return True if `node` is an Import/ImportFrom nested inside a function
    body and not covered by the optional-dependency exception.

    EXCEPTION 1 (per §17.9a): imports that belong to a `try/except ImportError:`
    block are allowed. This covers statements that are direct children of the
    Try (body / else / finally) AND statements directly inside one of its
    `except` handlers, so the usual fallback form is accepted as a whole:

        try:
            import tomllib
        except ImportError:
            import tomli as tomllib

    EXCEPTION 2 (per §17.9a whitelist): files whitelisted in
    `scripts/audit_imports_whitelist.toml` are filtered out at the
    audit_file() call site; this function is unaware of the whitelist.

    Args:
        node: the import statement to classify.
        parents: map of id(node) -> parent node for the tree containing `node`.

    Returns:
        True only when a FunctionDef/AsyncFunctionDef ancestor exists and
        EXCEPTION 1 does not apply; False for module-level imports.
    """
    parent = parents.get(id(node))
    # An import in an except-handler hangs off the ExceptHandler, not the Try,
    # so step up one more level to find the guard it belongs to.
    guard = parents.get(id(parent)) if isinstance(parent, ast.ExceptHandler) else parent
    if isinstance(guard, ast.Try) and _is_optional_import_try_node(guard, parents):
        return False
    # Otherwise the import is local iff some ancestor is a function definition.
    ancestor = parent
    while ancestor is not None:
        if isinstance(ancestor, (ast.FunctionDef, ast.AsyncFunctionDef)):
            return True
        ancestor = parents.get(id(ancestor))
    return False
def _is_prefix_aliasing(target: ast.alias) -> bool:
"""Return True if the alias name starts with a single underscore
(per §17.9b: `import X as _X` is BANNED)."""
# ast.alias has `asname` (the alias after `as`); if None, no aliasing.
# Banned: asname starts with `_`.
# Allowed: `import X` (no `as`), `import X as real_name` (not starting with `_`).
if target.asname is None:
return False
return target.asname.startswith("_")
def _count_from_dict_in_expr(node: ast.expr) -> int:
"""Count `.from_dict(...)` attribute calls in `node` (heuristic;
may under/overcount with chained method calls but catches the common
pattern)."""
count = 0
for sub in ast.walk(node):
if isinstance(sub, ast.Call):
func = sub.func
if isinstance(func, ast.Attribute) and func.attr == "from_dict":
count += 1
return count
def load_whitelist(whitelist_path: Path) -> dict[str, dict]:
"""Load the warmed-import whitelist from a TOML file. Returns a dict
keyed by repo-relative file path (forward-slash normalized) -> metadata
({"reason": str, "scope": "file"}). Missing file returns empty dict."""
if not whitelist_path.exists():
return {}
try:
with open(whitelist_path, "rb") as f:
data = tomllib.load(f)
except (OSError, tomllib.TOMLDecodeError) as e:
print(f"WARN: could not load whitelist {whitelist_path}: {e}", file=sys.stderr)
return {}
return data.get("whitelist", {})
def _is_file_whitelisted(filepath: Path, whitelist: dict[str, dict], repo_root: Path) -> tuple[bool, str | None]:
"""Check whether `filepath` is covered by the whitelist. Returns
(is_whitelisted, reason). Uses forward-slash normalization for cross-OS
matching."""
try:
rel = filepath.resolve().relative_to(repo_root.resolve()).as_posix()
except ValueError:
return False, None
entry = whitelist.get(rel)
if entry is None:
return False, None
return True, entry.get("reason", "(no reason given)")
def audit_file(filepath: Path, whitelist: dict[str, dict] | None = None, repo_root: Path | None = None) -> list[dict]:
    """Audit one file for local imports, _PREFIX aliasing, and repeated
    .from_dict() calls in the same expression.

    Args:
        filepath: the Python source file to scan.
        whitelist: optional mapping of repo-relative path -> metadata. It is
            only consulted when `repo_root` is also given.
        repo_root: directory that whitelist keys are relative to.

    Returns:
        A list of finding dicts with keys "file", "line", "kind", "note".
        Kinds: LOCAL_IMPORT, PREFIX_ALIAS, REPEATED_FROM_DICT, WHITELISTED,
        or a single MISSING_FILE / READ_ERROR / SYNTAX_ERROR entry when the
        file could not be analysed at all.

        For a whitelisted file (warmed imports or hot-reload re-imports),
        LOCAL_IMPORT findings are removed and replaced with one leading
        WHITELISTED annotation, so the user knows the script saw them but is
        not flagging them.
    """
    label = str(filepath)
    if not filepath.exists():
        return [{"file": label, "line": 0, "kind": "MISSING_FILE", "note": "file not found"}]
    try:
        source = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        return [{"file": label, "line": 0, "kind": "READ_ERROR", "note": str(e)}]
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError) as e:
        # Older interpreters raise ValueError (no lineno) for sources that
        # contain NUL bytes; report it instead of aborting the whole run.
        return [{"file": label, "line": getattr(e, "lineno", None) or 0, "kind": "SYNTAX_ERROR", "note": str(e)}]

    parents = _build_parent_map(tree)
    findings: list[dict] = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            # 1. Local imports (§17.9a) + _PREFIX aliasing (§17.9b)
            findings.extend(_import_findings(node, parents, label))
        elif isinstance(node, ast.Call):
            # 2. Repeated .from_dict() in the same expression (§17.9c; INFO only).
            # Every Call is measured over its whole subtree, so an outer call
            # that wraps a flagged inner call is reported as well.
            fd_count = _count_from_dict_in_expr(node)
            if fd_count > 1:
                findings.append({
                    "file": label,
                    "line": node.lineno,
                    "kind": "REPEATED_FROM_DICT",
                    "note": f"expression contains {fd_count} .from_dict() calls (§17.9c INFO); cache in a local var",
                })

    whitelisted = False
    whitelist_reason: str | None = None
    if whitelist and repo_root:
        whitelisted, whitelist_reason = _is_file_whitelisted(filepath, whitelist, repo_root)
    if whitelisted:
        # Drop LOCAL_IMPORT findings and leave a single WHITELISTED annotation.
        kept = [f for f in findings if f["kind"] != "LOCAL_IMPORT"]
        local_count = len(findings) - len(kept)
        if local_count > 0:
            kept.insert(0, {
                "file": label,
                "line": 0,
                "kind": "WHITELISTED",
                "note": f"{local_count} LOCAL_IMPORT findings suppressed by whitelist: {whitelist_reason}",
            })
        findings = kept
    return findings


def _import_findings(node: ast.Import | ast.ImportFrom, parents: dict[int, ast.AST], file_label: str) -> list[dict]:
    """Build the PREFIX_ALIAS and LOCAL_IMPORT findings for one import statement."""
    if isinstance(node, ast.ImportFrom):
        # Keep the leading dots of relative imports so the note reads
        # `from . import x` / `from ..pkg import x` rather than losing them.
        module = "." * (node.level or 0) + (node.module or "")
        alias_prefix = f"from {module} import "
        statement = f"`from {module} import ...`"
    else:
        alias_prefix = "import "
        statement = f"`import {node.names[0].name}`"

    found: list[dict] = []
    for alias in node.names:
        if _is_prefix_aliasing(alias):
            found.append({
                "file": file_label,
                # ast.alias only carries position info on Python 3.10+.
                "line": getattr(alias, "lineno", node.lineno),
                "kind": "PREFIX_ALIAS",
                "note": f"`{alias_prefix}{alias.name} as {alias.asname}` banned (§17.9b); use the real name",
            })
    if _is_local_import(node, parents):
        func_name = _enclosing_function_name(node, parents)
        location = f"inside {func_name}()" if func_name else "inside anonymous fn"
        found.append({
            "file": file_label,
            "line": node.lineno,
            "kind": "LOCAL_IMPORT",
            "note": f"{statement} {location} banned (§17.9a); move to module top",
        })
    return found
def _iter_python_files(scan_root: str) -> list[Path]:
root = Path(scan_root)
if not root.is_dir():
return []
files: list[Path] = []
for p in root.rglob("*.py"):
if any(part in DEFAULT_EXCLUDE_DIRS for part in p.parts):
continue
files.append(p)
return sorted(files)
def main() -> int:
    """Command-line entry point for the imports audit.

    Parses the CLI flags, loads the whitelist (unless --no-whitelist), audits
    every Python file under --root and prints either a text report or, with
    --json, a JSON document.

    Returns:
        The process exit status: 1 when --strict is set and at least one
        LOCAL_IMPORT or PREFIX_ALIAS finding remains (in text AND JSON mode),
        otherwise 0. --show-whitelist prints the whitelist and returns 0
        without scanning.
    """
    parser = argparse.ArgumentParser(description="Audit src/*.py for local imports + _PREFIX aliasing.")
    parser.add_argument("--strict", action="store_true", help="Exit 1 on any LOCAL_IMPORT or PREFIX_ALIAS (REPEATED_FROM_DICT is info-only)")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    parser.add_argument("--root", default=DEFAULT_SCAN_ROOT, help=f"Root directory to scan (default: {DEFAULT_SCAN_ROOT})")
    parser.add_argument("--whitelist", default=DEFAULT_WHITELIST_PATH, help=f"Path to whitelist TOML (default: {DEFAULT_WHITELIST_PATH})")
    parser.add_argument("--no-whitelist", action="store_true", help="Disable whitelist filtering (audit ALL files)")
    parser.add_argument("--show-whitelist", action="store_true", help="Print the loaded whitelist and exit")
    args = parser.parse_args()

    # Whitelist keys are repo-relative, so the script is expected to run from the repo root.
    repo_root = Path.cwd()
    whitelist: dict[str, dict] = {}
    if not args.no_whitelist:
        whitelist = load_whitelist(repo_root / args.whitelist)
    if args.show_whitelist:
        print(f"Loaded {len(whitelist)} whitelisted files from {args.whitelist}:")
        for path, entry in sorted(whitelist.items()):
            print(f" - {path}")
            print(f" reason: {entry.get('reason', '(no reason given)')}")
        return 0

    files = _iter_python_files(args.root)
    all_findings: list[dict] = []
    for filepath in files:
        all_findings.extend(audit_file(filepath, whitelist=whitelist, repo_root=repo_root))

    strict_findings = [f for f in all_findings if f["kind"] in ("LOCAL_IMPORT", "PREFIX_ALIAS")]
    # Decide the exit status once so --json and text mode cannot disagree.
    exit_code = 1 if args.strict and strict_findings else 0

    if args.json:
        out = {
            "scan_root": args.root,
            "files_scanned": len(files),
            "files_with_findings": len({f["file"] for f in all_findings}),
            "total_findings": len(all_findings),
            "whitelisted_files": len(whitelist),
            "by_kind": {
                "LOCAL_IMPORT": sum(1 for f in all_findings if f["kind"] == "LOCAL_IMPORT"),
                "PREFIX_ALIAS": sum(1 for f in all_findings if f["kind"] == "PREFIX_ALIAS"),
                "REPEATED_FROM_DICT": sum(1 for f in all_findings if f["kind"] == "REPEATED_FROM_DICT"),
                "WHITELISTED": sum(1 for f in all_findings if f["kind"] == "WHITELISTED"),
            },
            "findings": all_findings,
        }
        print(json.dumps(out, indent=2))
        return exit_code

    info_findings = [f for f in all_findings if f["kind"] == "REPEATED_FROM_DICT"]
    whitelist_findings = [f for f in all_findings if f["kind"] == "WHITELISTED"]
    # Files that could not be analysed at all; they are part of the total, so
    # list them rather than leaving the breakdown short.
    error_findings = [f for f in all_findings if f["kind"] in ("MISSING_FILE", "READ_ERROR", "SYNTAX_ERROR")]
    print(f"Imports audit ({args.root}/): {len(all_findings)} total findings")
    print(f" - {len(strict_findings)} strict (LOCAL_IMPORT + PREFIX_ALIAS)")
    print(f" - {len(info_findings)} info (REPEATED_FROM_DICT)")
    print(f" - {len(whitelist_findings)} whitelist annotations ({len(whitelist)} files whitelisted)")
    for f in strict_findings:
        print(f" STRICT: {f['file']}:{f['line']} [{f['kind']}] {f['note']}")
    for f in info_findings:
        print(f" INFO: {f['file']}:{f['line']} [{f['kind']}] {f['note']}")
    for f in whitelist_findings:
        print(f" WL: {f['file']} [{f['kind']}] {f['note']}")
    for f in error_findings:
        print(f" ERROR: {f['file']}:{f['line']} [{f['kind']}] {f['note']}")
    if exit_code:
        print(f"STRICT: {len(strict_findings)} violations")
    return exit_code
# Run as a script: hand main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())