#!/usr/bin/env python3
"""Audit src/ for residual `Any`-typed and `dict[str, Any]` annotations.

The complementary audit to `audit_weak_types.py`. Where the weak-types audit
tracks "weak STRUCT patterns" (dict, list of dict, tuple), this audit tracks
ALL remaining `Any` usages - including bare `Any`, `Optional[Any]`,
`list[Any]`, etc. It also counts literal `dict[str, Any]` annotations
NOT aliased to `Metadata`/`CommsLogEntry`/`FileItem`/etc.

This audit is the CI gate for the `any_type_componentization_20260621` track:
the post-track baseline documents the count AFTER the 89 fat-struct sites are
promoted to `dataclass(frozen=True)`.

Usage:
    python scripts/audit_dataclass_coverage.py               # human-readable report
    python scripts/audit_dataclass_coverage.py --json        # JSON output for tooling
    python scripts/audit_dataclass_coverage.py --src src     # override source dir
    python scripts/audit_dataclass_coverage.py --top 15      # show top N files
    python scripts/audit_dataclass_coverage.py --strict      # CI gate; exit 1 on regression
    python scripts/audit_dataclass_coverage.py --baseline X  # custom baseline file

Exit codes:
    0 - audit ran; in --strict mode, current count <= baseline
    1 - missing source dir / unusable baseline OR --strict mode regression
    2 - invalid command-line arguments (raised by argparse)
"""
from __future__ import annotations

import argparse
import ast
import json
import re
import sys
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path

# (regex, category) pairs matched against the unparsed annotation text.
ANY_PATTERNS: list[tuple[str, str]] = [
    (r"\bAny\b", "any"),
]

# Patterns are tried in order and the first match wins.
# NOTE(review): because `List[Dict[` precedes `Optional[List[Dict[`, the
# "optional_list_of_dict" entries can never be the first match. The order is
# left untouched so category labels stay stable; confirm whether that is
# intended before reordering.
WEAK_STRUCT_PATTERNS: list[tuple[str, str]] = [
    (r"Dict\[str,\s*Any\]", "dict_str_any"),
    (r"dict\[str,\s*Any\]", "dict_str_any"),
    (r"List\[Dict\[", "list_of_dict"),
    (r"list\[dict\[", "list_of_dict"),
    (r"Optional\[List\[Dict\[", "optional_list_of_dict"),
    (r"Optional\[list\[dict\[", "optional_list_of_dict"),
    (r"Optional\[Dict\[", "optional_dict"),
    (r"Optional\[dict\[", "optional_dict"),
]

PROMOTED_SITE_MODULES: set[str] = {
    "src/mcp_tool_specs.py",
    "src/openai_schemas.py",
    "src/provider_state.py",
}

# Files where dataclass promotion already happened inline (Phase 4 + Phase 5).
# Any usages INSIDE these files are the new typed shapes; do NOT double-count.
INLINE_PROMOTED_SITE_MODULES: set[str] = {
    "src/log_registry.py",
    "src/api_hooks.py",
}


@dataclass(frozen=True)
class Finding:
    """One flagged annotation: where it is, what it says, how it was classified."""

    filename: str
    line: int
    context: str
    type_str: str
    category: str
    severity: str


@dataclass
class FileReport:
    """All findings collected for a single source file."""

    filename: str
    weak: list[Finding] = field(default_factory=list)
    positive: list[tuple[int, str, str]] = field(default_factory=list)

    @property
    def weak_count(self) -> int:
        """Number of weak findings recorded for this file."""
        return len(self.weak)


def _is_promoted_site(filename: str) -> bool:
    """Return True when *filename* is listed in either promoted-module set.

    Windows path separators are normalised to forward slashes before the
    lookup; the comparison is otherwise an exact string match.
    """
    norm = filename.replace("\\", "/")
    return norm in PROMOTED_SITE_MODULES or norm in INLINE_PROMOTED_SITE_MODULES


class CoverageVisitor(ast.NodeVisitor):
    """AST visitor that records weak annotations into a `FileReport`.

    Audited sites are function parameters (positional-only, regular,
    keyword-only, `*args`, `**kwargs`), return annotations, and annotated
    assignments. Both `def` and `async def` are covered.
    """

    def __init__(self, filename: str, source: str) -> None:
        self.filename = filename
        self.source = source
        self.report = FileReport(filename=filename)
        self._func_stack: list[ast.FunctionDef | ast.AsyncFunctionDef] = []
        self._class_stack: list[ast.ClassDef] = []

    def _record(self, line: int, context: str, type_str: str, category: str, severity: str) -> None:
        """Append one finding for this visitor's file."""
        self.report.weak.append(Finding(
            filename=self.filename,
            line=line,
            context=context,
            type_str=type_str,
            category=category,
            severity=severity,
        ))

    def _check_type(self, type_node: ast.AST | None, line: int, context: str) -> None:
        """Classify one annotation node and record any findings.

        At most one weak-struct finding (severity "high") and at most one
        `Any` finding (severity "medium") are recorded per annotation, so an
        annotation such as `dict[str, Any]` yields both. `Any` findings are
        skipped for promoted-site files; weak-struct findings are not.
        """
        if type_node is None:
            return
        type_str = ast.unparse(type_node).replace("\n", " ").strip()
        promoted = _is_promoted_site(self.filename)

        for pattern, category in WEAK_STRUCT_PATTERNS:
            if re.search(pattern, type_str):
                self._record(line, context, type_str, category, "high")
                break

        for pattern, category in ANY_PATTERNS:
            if re.search(pattern, type_str):
                if not promoted:
                    self._record(line, context, type_str, category, "medium")
                break

    def _audit_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None:
        """Check a function's signature annotations, then visit its body."""
        self._func_stack.append(node)
        try:
            spec = node.args
            # posonlyargs are the parameters declared before a `/` marker.
            for arg in spec.posonlyargs + spec.args + spec.kwonlyargs:
                self._check_type(arg.annotation, arg.lineno, f"{node.name}({arg.arg})")
            if spec.vararg and spec.vararg.annotation:
                self._check_type(spec.vararg.annotation, spec.vararg.lineno, f"{node.name}(*{spec.vararg.arg})")
            if spec.kwarg and spec.kwarg.annotation:
                self._check_type(spec.kwarg.annotation, spec.kwarg.lineno, f"{node.name}(**{spec.kwarg.arg})")
            self._check_type(
                node.returns,
                node.returns.lineno if node.returns else node.lineno,
                f"{node.name} -> ...",
            )
            for stmt in node.body:
                self.visit(stmt)
        finally:
            self._func_stack.pop()

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Audit a synchronous function definition."""
        self._audit_function(node)

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Audit an `async def`; without this hook its signature would be skipped."""
        self._audit_function(node)

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Visit the statements of a class body (bases and decorators are not visited)."""
        self._class_stack.append(node)
        try:
            for stmt in node.body:
                self.visit(stmt)
        finally:
            self._class_stack.pop()

    def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
        """Audit an annotated assignment such as `x: T = value`."""
        target = ast.unparse(node.target)
        self._check_type(node.annotation, node.lineno, f"{target}: ...")
        self.generic_visit(node)


def audit_file(filepath: Path) -> FileReport:
    """Parse *filepath* and return its report.

    Unreadable files and files with syntax errors print a warning to stderr
    and produce an empty report instead of raising.
    """
    try:
        source = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        print(f"WARN: could not read {filepath}: {e}", file=sys.stderr)
        return FileReport(filename=str(filepath))
    try:
        tree = ast.parse(source, filename=str(filepath))
    except SyntaxError as e:
        print(f"WARN: syntax error in {filepath}: {e}", file=sys.stderr)
        return FileReport(filename=str(filepath))
    visitor = CoverageVisitor(str(filepath), source)
    visitor.visit(tree)
    return visitor.report


def find_python_files(root: Path) -> list[Path]:
    """Return the sorted `*.py` files under *root*.

    Paths with an `artifacts` or `__pycache__` component are excluded, and
    directories that merely end in `.py` are skipped.

    Raises:
        FileNotFoundError: if *root* does not exist.
    """
    if not root.exists():
        raise FileNotFoundError(f"Source directory not found: {root}")
    return sorted(
        p for p in root.rglob("*.py")
        if "artifacts" not in p.parts and "__pycache__" not in p.parts and p.is_file()
    )


def _run_strict_gate(baseline_path: Path, current_count: int) -> int:
    """Compare *current_count* with the baseline file; return the exit code."""
    if not baseline_path.exists():
        print(f"ERROR: baseline file not found: {baseline_path}", file=sys.stderr)
        return 1
    try:
        with baseline_path.open("r", encoding="utf-8") as f:
            baseline_data = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"ERROR: could not read baseline {baseline_path}: {e}", file=sys.stderr)
        return 1

    # A missing key keeps the historical default of 0; a wrong shape is an error.
    baseline_count = baseline_data.get("total_weak", 0) if isinstance(baseline_data, dict) else None
    if isinstance(baseline_count, bool) or not isinstance(baseline_count, (int, float)):
        print(f"ERROR: could not read baseline {baseline_path}: 'total_weak' must be a number", file=sys.stderr)
        return 1

    if current_count > baseline_count:
        print(f"STRICT: {current_count} weak sites found, baseline is {baseline_count} (regression of {current_count - baseline_count})", file=sys.stderr)
        return 1
    print(f"STRICT OK: {current_count} weak sites <= baseline {baseline_count}")
    return 0


def _build_json_output(src: Path, files: list[Path], reports: list[FileReport], total_weak: int) -> dict:
    """Assemble the JSON-serialisable report structure."""
    return {
        "src_dir": str(src),
        "files_scanned": len(files),
        "files_with_findings": len(reports),
        "total_weak": total_weak,
        "by_category": dict(Counter(f.category for r in reports for f in r.weak).most_common()),
        "by_file": [
            {
                "filename": r.filename,
                "weak_count": r.weak_count,
                "findings": [
                    {
                        "line": f.line,
                        "context": f.context,
                        "type_str": f.type_str,
                        "category": f.category,
                        "severity": f.severity,
                    }
                    for f in r.weak
                ],
            }
            for r in sorted(reports, key=lambda r: -r.weak_count)
        ],
    }


def _print_report(src: Path, files: list[Path], reports: list[FileReport], total_weak: int, top_n: int) -> None:
    """Print the human-readable report to stdout."""
    print(f"=== Dataclass Coverage Audit: {src} ===\n")
    print(f"Files scanned: {len(files)}")
    print(f"Files with findings: {len(reports)}")
    print(f"Total weak findings: {total_weak}\n")

    cat_counts = Counter(f.category for r in reports for f in r.weak)
    print("By category:")
    for cat, n in cat_counts.most_common():
        print(f" {cat:30s} {n:4d}")

    print(f"\n--- Top {top_n} files by weak count ---")
    denominator = max(total_weak, 1)  # avoid dividing by zero when nothing was found
    for r in sorted(reports, key=lambda r: -r.weak_count)[:top_n]:
        pct = (r.weak_count / denominator) * 100
        print(f"\n{r.filename} ({r.weak_count} findings, {pct:.1f}% of total)")
        by_cat = Counter(f.category for f in r.weak)
        for cat, n in by_cat.most_common():
            print(f" {cat:30s} {n}")


def main(argv: list[str] | None = None) -> int:
    """Run the audit and return the process exit code.

    Args:
        argv: Command-line arguments without the program name. When None,
            argparse reads them from `sys.argv`.

    Returns:
        0 on success (including a passing --strict gate), 1 when the source
        directory or baseline is unusable or the --strict gate regresses.
        --strict takes precedence over --json.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--src", default="src", help="Source directory to audit (default: src)")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report")
    parser.add_argument("--top", type=int, default=15, help="Show top N files by weak count (default: 15)")
    parser.add_argument("--strict", action="store_true", help="CI mode; exits 1 if current count exceeds baseline")
    parser.add_argument("--baseline", default="scripts/audit_dataclass_coverage.baseline.json", help="Baseline file for --strict mode")
    args = parser.parse_args(argv)

    src = Path(args.src)
    try:
        files = find_python_files(src)
    except FileNotFoundError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1

    reports: list[FileReport] = [r for r in (audit_file(f) for f in files) if r.weak_count > 0]
    total_weak = sum(r.weak_count for r in reports)

    if args.strict:
        return _run_strict_gate(Path(args.baseline), total_weak)

    if args.json:
        print(json.dumps(_build_json_output(src, files, reports, total_weak), indent=2))
        return 0

    # A negative N would make the slice drop files from the end; treat it as 0.
    _print_report(src, files, reports, total_weak, max(args.top, 0))
    return 0


if __name__ == "__main__":
    sys.exit(main())