"""Audit: enforce the local-imports + _PREFIX aliasing ban in src/*.py. Per `conductor/code_styleguides/python.md` §17.9 (added 2026-06-27): - §17.9a: local imports inside function bodies are BANNED (except in `try/except ImportError` blocks for optional dependencies, AND in files whitelisted for vendor-SDK warmup or hot-reload re-imports per `scripts/audit_imports_whitelist.toml`). - §17.9b: `import X as _X` aliasing-for-naming-convenience is BANNED. - §17.9c: repeated `.from_dict()` calls in the same expression are BANNED. This script AST-scans src/*.py for the above patterns and exits 1 in --strict mode on any violation. The local-imports check is the strict violation; _PREFIX aliasing is strict; repeated .from_dict() is INFO only (detection is heuristic; relies on Tier 2 review for confirmation). Usage: uv run python scripts/audit_imports.py uv run python scripts/audit_imports.py --strict uv run python scripts/audit_imports.py --json uv run python scripts/audit_imports.py --show-whitelist """ from __future__ import annotations import argparse import ast import json import sys from pathlib import Path try: import tomllib except ImportError: import tomli as tomllib DEFAULT_SCAN_ROOT: str = "src" DEFAULT_EXCLUDE_DIRS: tuple[str, ...] = ("__pycache__",) DEFAULT_WHITELIST_PATH: str = "scripts/audit_imports_whitelist.toml" def _is_within_optional_import_try(node: ast.stmt) -> bool: """Return True if `node` is an Import/ImportFrom inside a `try` whose except handler is `except ImportError` (the canonical "optional dependency" pattern). The check is structural: the Import statement must be a direct child of a Try whose handlers are all ImportError. """ # Walk up: check the statement's parents via a heuristic (we don't have # parent links in stdlib AST). The common pattern is: # try: # from foo import bar # <-- node # except ImportError: # bar = None # So `node` is in Try.body[0..n], and Try.handlers are all ImportError. # Caller must pass us the Try node directly; this helper checks the Try. return False # Conservative: caller does the structural check via _parent_map def _build_parent_map(tree: ast.AST) -> dict[int, ast.AST]: """Build a map id(node) -> parent node so we can check context.""" parents: dict[int, ast.AST] = {} for node in ast.walk(tree): for child in ast.iter_child_nodes(node): parents[id(child)] = node return parents def _is_optional_import_try_node(try_node: ast.Try, parents: dict[int, ast.AST]) -> bool: """Return True if the Try is an optional-import guard (all except handlers catch ImportError).""" if not try_node.handlers: return False for handler in try_node.handlers: if not isinstance(handler, ast.ExceptHandler): return False if handler.type is None: # bare except: too broad, not an optional-import guard return False # The exception type can be Name('ImportError') or Attribute(value=Name('ImportError')) t = handler.type if isinstance(t, ast.Name) and t.id == "ImportError": continue if isinstance(t, ast.Attribute) and t.attr == "ImportError": continue return False return True def _enclosing_function_name(node: ast.AST, parents: dict[int, ast.AST]) -> str | None: """Walk up the parent chain to find the nearest enclosing FunctionDef or AsyncFunctionDef. Returns the function name (or None if at module level). Used to enrich LOCAL_IMPORT output with the enclosing function context.""" current: ast.AST | None = node while current is not None: parent = parents.get(id(current)) if parent is None: return None if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)): return parent.name current = parent return None def _is_local_import(node: ast.stmt, parents: dict[int, ast.AST]) -> bool: """Return True if `node` is an Import/ImportFrom nested inside a function body (NOT a module-level import, NOT inside an optional-import try guard). EXCEPTION 1 (per §17.9a): imports inside `try/except ImportError:` blocks are allowed (the canonical "optional dependency" pattern). EXCEPTION 2 (per §17.9a whitelist): files whitelisted in `scripts/audit_imports_whitelist.toml` (vendor SDK warmup, hot-reload re-imports) are filtered out at the audit_file() call site — this function is unaware of the whitelist.""" # First, check the IMMEDIATE parent: if it's a Try-optional block, allow. immediate_parent = parents.get(id(node)) if isinstance(immediate_parent, ast.Try) and _is_optional_import_try_node(immediate_parent, parents): return False # Otherwise, walk up looking for any FunctionDef ancestor. current: ast.AST | None = node while current is not None: parent = parents.get(id(current)) if parent is None: return False if isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)): return True current = parent return False def _is_prefix_aliasing(target: ast.alias) -> bool: """Return True if the alias name starts with a single underscore (per §17.9b: `import X as _X` is BANNED).""" # ast.alias has `asname` (the alias after `as`); if None, no aliasing. # Banned: asname starts with `_`. # Allowed: `import X` (no `as`), `import X as real_name` (not starting with `_`). if target.asname is None: return False return target.asname.startswith("_") def _count_from_dict_in_expr(node: ast.expr) -> int: """Count `.from_dict(...)` attribute calls in `node` (heuristic; may under/overcount with chained method calls but catches the common pattern).""" count = 0 for sub in ast.walk(node): if isinstance(sub, ast.Call): func = sub.func if isinstance(func, ast.Attribute) and func.attr == "from_dict": count += 1 return count def load_whitelist(whitelist_path: Path) -> dict[str, dict]: """Load the warmed-import whitelist from a TOML file. Returns a dict keyed by repo-relative file path (forward-slash normalized) -> metadata ({"reason": str, "scope": "file"}). Missing file returns empty dict.""" if not whitelist_path.exists(): return {} try: with open(whitelist_path, "rb") as f: data = tomllib.load(f) except (OSError, tomllib.TOMLDecodeError) as e: print(f"WARN: could not load whitelist {whitelist_path}: {e}", file=sys.stderr) return {} return data.get("whitelist", {}) def _is_file_whitelisted(filepath: Path, whitelist: dict[str, dict], repo_root: Path) -> tuple[bool, str | None]: """Check whether `filepath` is covered by the whitelist. Returns (is_whitelisted, reason). Uses forward-slash normalization for cross-OS matching.""" try: rel = filepath.resolve().relative_to(repo_root.resolve()).as_posix() except ValueError: return False, None entry = whitelist.get(rel) if entry is None: return False, None return True, entry.get("reason", "(no reason given)") def audit_file(filepath: Path, whitelist: dict[str, dict] | None = None, repo_root: Path | None = None) -> list[dict]: """Audit one file: scan for local imports, _PREFIX aliasing, and repeated .from_dict() in the same expression. If `whitelist` is provided and the file is whitelisted (warmed imports or hot-reload re-imports), LOCAL_IMPORT findings are filtered out and replaced with a single WHITELIST annotation entry (so the user knows the script saw them but is not flagging them). """ if not filepath.exists(): return [{"file": str(filepath), "line": 0, "kind": "MISSING_FILE", "note": "file not found"}] try: source = filepath.read_text(encoding="utf-8") except (OSError, UnicodeDecodeError) as e: return [{"file": str(filepath), "line": 0, "kind": "READ_ERROR", "note": str(e)}] try: tree = ast.parse(source) except SyntaxError as e: return [{"file": str(filepath), "line": e.lineno or 0, "kind": "SYNTAX_ERROR", "note": str(e)}] parents = _build_parent_map(tree) findings: list[dict] = [] whitelisted = False whitelist_reason: str | None = None if whitelist and repo_root: whitelisted, whitelist_reason = _is_file_whitelisted(filepath, whitelist, repo_root) # 1. Local imports (§17.9a) + _PREFIX aliasing (§17.9b) for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: if _is_prefix_aliasing(alias): findings.append({ "file": str(filepath), "line": alias.lineno, "kind": "PREFIX_ALIAS", "note": f"`import {alias.name} as {alias.asname}` banned (§17.9b); use the real name", }) if _is_local_import(node, parents): func_name = _enclosing_function_name(node, parents) location = f"inside {func_name}()" if func_name else "inside anonymous fn" findings.append({ "file": str(filepath), "line": node.lineno, "kind": "LOCAL_IMPORT", "note": f"`import {node.names[0].name}` {location} banned (§17.9a); move to module top", }) elif isinstance(node, ast.ImportFrom): module = node.module or "" for alias in node.names: if _is_prefix_aliasing(alias): findings.append({ "file": str(filepath), "line": alias.lineno, "kind": "PREFIX_ALIAS", "note": f"`from {module} import {alias.name} as {alias.asname}` banned (§17.9b); use the real name", }) if _is_local_import(node, parents): func_name = _enclosing_function_name(node, parents) location = f"inside {func_name}()" if func_name else "inside anonymous fn" findings.append({ "file": str(filepath), "line": node.lineno, "kind": "LOCAL_IMPORT", "note": f"`from {module} import ...` {location} banned (§17.9a); move to module top", }) elif isinstance(node, ast.Call): # 2. Repeated .from_dict() in the same expression (§17.9c; INFO only) fd_count = _count_from_dict_in_expr(node) if fd_count > 1: findings.append({ "file": str(filepath), "line": node.lineno, "kind": "REPEATED_FROM_DICT", "note": f"expression contains {fd_count} .from_dict() calls (§17.9c INFO); cache in a local var", }) if whitelisted: # Filter LOCAL_IMPORT findings and add a single WHITELIST annotation local_count = sum(1 for f in findings if f["kind"] == "LOCAL_IMPORT") findings = [f for f in findings if f["kind"] != "LOCAL_IMPORT"] if local_count > 0: findings.insert(0, { "file": str(filepath), "line": 0, "kind": "WHITELISTED", "note": f"{local_count} LOCAL_IMPORT findings suppressed by whitelist: {whitelist_reason}", }) return findings def _iter_python_files(scan_root: str) -> list[Path]: root = Path(scan_root) if not root.is_dir(): return [] files: list[Path] = [] for p in root.rglob("*.py"): if any(part in DEFAULT_EXCLUDE_DIRS for part in p.parts): continue files.append(p) return sorted(files) def main() -> int: parser = argparse.ArgumentParser(description="Audit src/*.py for local imports + _PREFIX aliasing.") parser.add_argument("--strict", action="store_true", help="Exit 1 on any LOCAL_IMPORT or PREFIX_ALIAS (REPEATED_FROM_DICT is info-only)") parser.add_argument("--json", action="store_true", help="Output JSON") parser.add_argument("--root", default=DEFAULT_SCAN_ROOT, help=f"Root directory to scan (default: {DEFAULT_SCAN_ROOT})") parser.add_argument("--whitelist", default=DEFAULT_WHITELIST_PATH, help=f"Path to whitelist TOML (default: {DEFAULT_WHITELIST_PATH})") parser.add_argument("--no-whitelist", action="store_true", help="Disable whitelist filtering (audit ALL files)") parser.add_argument("--show-whitelist", action="store_true", help="Print the loaded whitelist and exit") args = parser.parse_args() repo_root = Path.cwd() whitelist: dict[str, dict] = {} if not args.no_whitelist: whitelist = load_whitelist(repo_root / args.whitelist) if args.show_whitelist: print(f"Loaded {len(whitelist)} whitelisted files from {args.whitelist}:") for path, entry in sorted(whitelist.items()): print(f" - {path}") print(f" reason: {entry.get('reason', '(no reason given)')}") return 0 files = _iter_python_files(args.root) all_findings: list[dict] = [] for filepath in files: findings = audit_file(filepath, whitelist=whitelist, repo_root=repo_root) all_findings.extend(findings) if args.json: out = { "scan_root": args.root, "files_scanned": len(files), "files_with_findings": len({f["file"] for f in all_findings}), "total_findings": len(all_findings), "whitelisted_files": len(whitelist), "by_kind": { "LOCAL_IMPORT": sum(1 for f in all_findings if f["kind"] == "LOCAL_IMPORT"), "PREFIX_ALIAS": sum(1 for f in all_findings if f["kind"] == "PREFIX_ALIAS"), "REPEATED_FROM_DICT": sum(1 for f in all_findings if f["kind"] == "REPEATED_FROM_DICT"), "WHITELISTED": sum(1 for f in all_findings if f["kind"] == "WHITELISTED"), }, "findings": all_findings, } print(json.dumps(out, indent=2)) return 0 strict_findings = [f for f in all_findings if f["kind"] in ("LOCAL_IMPORT", "PREFIX_ALIAS")] info_findings = [f for f in all_findings if f["kind"] == "REPEATED_FROM_DICT"] whitelist_findings = [f for f in all_findings if f["kind"] == "WHITELISTED"] print(f"Imports audit ({args.root}/): {len(all_findings)} total findings") print(f" - {len(strict_findings)} strict (LOCAL_IMPORT + PREFIX_ALIAS)") print(f" - {len(info_findings)} info (REPEATED_FROM_DICT)") print(f" - {len(whitelist_findings)} whitelist annotations ({len(whitelist)} files whitelisted)") for f in strict_findings: print(f" STRICT: {f['file']}:{f['line']} [{f['kind']}] {f['note']}") for f in info_findings: print(f" INFO: {f['file']}:{f['line']} [{f['kind']}] {f['note']}") for f in whitelist_findings: print(f" WL: {f['file']} [{f['kind']}] {f['note']}") if args.strict and strict_findings: print(f"STRICT: {len(strict_findings)} violations") return 1 return 0 if __name__ == "__main__": sys.exit(main())