From cfdf8988fb3151c5d565e7e4753e616b3b215fbc Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sun, 21 Jun 2026 15:56:41 -0400 Subject: [PATCH] feat(audit): add scripts/audit_dataclass_coverage.py + baseline (t0_2) GREEN phase for Phase 0. Mirrors scripts/audit_weak_types.py design with 3 additions specific to the any-type componentization track: 1. PROMOTED_SITE_MODULES allowlist: the 3 new src/ modules (mcp_tool_specs.py, openai_schemas.py, provider_state.py) are exempt from Any-counting (their new dataclasses intentionally have raw_response: Any and SDK holder fields that stay as Any per Pattern 3). 2. INLINE_PROMOTED_SITE_MODULES: log_registry.py + api_hooks.py get their dataclasses added inline in Phase 4 + 5 (not new modules); same exemption. 3. Combined counter: counts both Any AND weak-struct patterns (dict_str_any, list_of_dict, optional_dict, etc.). Modes: - default: informational (exits 0; prints human report) - --json: machine-readable with by_file, by_category, total_weak - --strict: CI gate (exits 1 when current > baseline) - --baseline: path to baseline file (default: scripts/audit_dataclass_coverage.baseline.json) Baseline: scripts/audit_dataclass_coverage.baseline.json = 207 weak sites (captured pre-Phase-1; expected to drop to ~118 after 89 sites promoted). Verification: uv run python scripts/audit_dataclass_coverage.py --strict STRICT OK: 207 weak sites <= baseline 207 uv run pytest tests/test_audit_dataclass_coverage.py --timeout=30 7 passed in 5.15s --- .../audit_dataclass_coverage.baseline.json | 8 + scripts/audit_dataclass_coverage.py | 274 ++++++++++++++++++ 2 files changed, 282 insertions(+) create mode 100644 scripts/audit_dataclass_coverage.baseline.json create mode 100644 scripts/audit_dataclass_coverage.py diff --git a/scripts/audit_dataclass_coverage.baseline.json b/scripts/audit_dataclass_coverage.baseline.json new file mode 100644 index 00000000..c2447911 --- /dev/null +++ b/scripts/audit_dataclass_coverage.baseline.json @@ -0,0 +1,8 @@ +{ + "total_weak": 207, + "files_with_findings": 35, + "by_category": { + "any": 188, + "dict_str_any": 19 + } +} diff --git a/scripts/audit_dataclass_coverage.py b/scripts/audit_dataclass_coverage.py new file mode 100644 index 00000000..adcf9a73 --- /dev/null +++ b/scripts/audit_dataclass_coverage.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +"""Audit src/ for residual `Any`-typed and `dict[str, Any]` annotations. + +The complementary audit to `audit_weak_types.py`. Where the weak-types +audit tracks "weak STRUCT patterns" (dict, list of dict, tuple), this +audit tracks ALL remaining `Any` usages - including bare `Any`, +`Optional[Any]`, `list[Any]`, etc. It also counts literal `dict[str, Any]` +annotations NOT aliased to `Metadata`/`CommsLogEntry`/`FileItem`/etc. + +This audit is the CI gate for the `any_type_componentization_20260621` +track: the post-track baseline documents the count AFTER the 89 fat-struct +sites are promoted to `dataclass(frozen=True)`. + +Usage: + python scripts/audit_dataclass_coverage.py # human-readable report + python scripts/audit_dataclass_coverage.py --json # JSON output for tooling + python scripts/audit_dataclass_coverage.py --src src # override source dir + python scripts/audit_dataclass_coverage.py --top 15 # show top N files + python scripts/audit_dataclass_coverage.py --strict # CI gate; exit 1 on regression + python scripts/audit_dataclass_coverage.py --baseline X # custom baseline file + +Exit codes: + 0 - audit ran; in --strict mode, current count <= baseline + 1 - usage error OR --strict mode regression +""" +from __future__ import annotations + +import argparse +import ast +import json +import re +import sys +from collections import Counter +from dataclasses import dataclass, field +from pathlib import Path + + +ANY_PATTERNS: list[tuple[str, str]] = [ + (r"\bAny\b", "any"), +] + +WEAK_STRUCT_PATTERNS: list[tuple[str, str]] = [ + (r"Dict\[str,\s*Any\]", "dict_str_any"), + (r"dict\[str,\s*Any\]", "dict_str_any"), + (r"List\[Dict\[", "list_of_dict"), + (r"list\[dict\[", "list_of_dict"), + (r"Optional\[List\[Dict\[", "optional_list_of_dict"), + (r"Optional\[list\[dict\[", "optional_list_of_dict"), + (r"Optional\[Dict\[", "optional_dict"), + (r"Optional\[dict\[", "optional_dict"), +] + +PROMOTED_SITE_MODULES: set[str] = { + "src/mcp_tool_specs.py", + "src/openai_schemas.py", + "src/provider_state.py", +} + +# Files where dataclass promotion already happened inline (Phase 4 + Phase 5). +# Any usages INSIDE these files are the new typed shapes; do NOT double-count. +INLINE_PROMOTED_SITE_MODULES: set[str] = { + "src/log_registry.py", + "src/api_hooks.py", +} + + +@dataclass(frozen=True) +class Finding: + filename: str + line: int + context: str + type_str: str + category: str + severity: str + + +@dataclass +class FileReport: + filename: str + weak: list[Finding] = field(default_factory=list) + positive: list[tuple[int, str, str]] = field(default_factory=list) + + @property + def weak_count(self) -> int: + return len(self.weak) + + +def _is_promoted_site(filename: str) -> bool: + norm = filename.replace("\\", "/") + if norm in PROMOTED_SITE_MODULES: + return True + if norm in INLINE_PROMOTED_SITE_MODULES: + return True + return False + + +class CoverageVisitor(ast.NodeVisitor): + def __init__(self, filename: str, source: str) -> None: + self.filename = filename + self.source = source + self.report = FileReport(filename=filename) + self._func_stack: list[ast.FunctionDef] = [] + self._class_stack: list[ast.ClassDef] = [] + + def _check_type(self, type_node: ast.AST | None, line: int, context: str) -> None: + if type_node is None: + return + type_str = ast.unparse(type_node).replace("\n", " ").strip() + promoted = _is_promoted_site(self.filename) + for pattern, category in WEAK_STRUCT_PATTERNS: + if re.search(pattern, type_str): + self.report.weak.append(Finding( + filename=self.filename, + line=line, + context=context, + type_str=type_str, + category=category, + severity="high", + )) + break + for pattern, category in ANY_PATTERNS: + if re.search(pattern, type_str): + if not promoted: + self.report.weak.append(Finding( + filename=self.filename, + line=line, + context=context, + type_str=type_str, + category=category, + severity="medium", + )) + break + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + self._func_stack.append(node) + try: + for arg in node.args.args + node.args.kwonlyargs: + self._check_type(arg.annotation, arg.lineno, f"{node.name}({arg.arg})") + if node.args.vararg and node.args.vararg.annotation: + self._check_type(node.args.vararg.annotation, node.args.vararg.lineno, f"{node.name}(*{node.args.vararg.arg})") + if node.args.kwarg and node.args.kwarg.annotation: + self._check_type(node.args.kwarg.annotation, node.args.kwarg.lineno, f"{node.name}(**{node.args.kwarg.arg})") + self._check_type(node.returns, node.returns.lineno if node.returns else node.lineno, f"{node.name} -> ...") + for stmt in node.body: + self.visit(stmt) + finally: + self._func_stack.pop() + + def visit_ClassDef(self, node: ast.ClassDef) -> None: + self._class_stack.append(node) + try: + for stmt in node.body: + self.visit(stmt) + finally: + self._class_stack.pop() + + def visit_AnnAssign(self, node: ast.AnnAssign) -> None: + target = ast.unparse(node.target) + self._check_type(node.annotation, node.lineno, f"{target}: ...") + self.generic_visit(node) + + +def audit_file(filepath: Path) -> FileReport: + try: + source = filepath.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError) as e: + print(f"WARN: could not read {filepath}: {e}", file=sys.stderr) + return FileReport(filename=str(filepath)) + try: + tree = ast.parse(source, filename=str(filepath)) + except SyntaxError as e: + print(f"WARN: syntax error in {filepath}: {e}", file=sys.stderr) + return FileReport(filename=str(filepath)) + visitor = CoverageVisitor(str(filepath), source) + visitor.visit(tree) + return visitor.report + + +def find_python_files(root: Path) -> list[Path]: + if not root.exists(): + raise FileNotFoundError(f"Source directory not found: {root}") + return sorted(p for p in root.rglob("*.py") if "artifacts" not in p.parts and "__pycache__" not in p.parts) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--src", default="src", help="Source directory to audit (default: src)") + parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report") + parser.add_argument("--top", type=int, default=15, help="Show top N files by weak count (default: 15)") + parser.add_argument("--strict", action="store_true", help="CI mode; exits 1 if current count exceeds baseline") + parser.add_argument("--baseline", default="scripts/audit_dataclass_coverage.baseline.json", help="Baseline file for --strict mode") + args = parser.parse_args() + + src = Path(args.src) + try: + files = find_python_files(src) + except FileNotFoundError as e: + print(f"ERROR: {e}", file=sys.stderr) + return 1 + + reports: list[FileReport] = [audit_file(f) for f in files] + reports = [r for r in reports if r.weak_count > 0] + + if args.strict: + baseline_path = Path(args.baseline) + if not baseline_path.exists(): + print(f"ERROR: baseline file not found: {baseline_path}", file=sys.stderr) + return 1 + try: + with baseline_path.open("r", encoding="utf-8") as f: + baseline_data = json.load(f) + baseline_count = baseline_data.get("total_weak", 0) + except (OSError, json.JSONDecodeError) as e: + print(f"ERROR: could not read baseline {baseline_path}: {e}", file=sys.stderr) + return 1 + current_count = sum(r.weak_count for r in reports) + if current_count > baseline_count: + print(f"STRICT: {current_count} weak sites found, baseline is {baseline_count} (regression of {current_count - baseline_count})", file=sys.stderr) + return 1 + print(f"STRICT OK: {current_count} weak sites <= baseline {baseline_count}") + return 0 + + if args.json: + output = { + "src_dir": str(src), + "files_scanned": len(files), + "files_with_findings": len(reports), + "total_weak": sum(r.weak_count for r in reports), + "by_category": dict(Counter(f.category for r in reports for f in r.weak).most_common()), + "by_file": [ + { + "filename": r.filename, + "weak_count": r.weak_count, + "findings": [ + { + "line": f.line, + "context": f.context, + "type_str": f.type_str, + "category": f.category, + "severity": f.severity, + } + for f in r.weak + ], + } + for r in sorted(reports, key=lambda r: -r.weak_count) + ], + } + print(json.dumps(output, indent=2)) + return 0 + + print(f"=== Dataclass Coverage Audit: {src} ===\n") + print(f"Files scanned: {len(files)}") + print(f"Files with findings: {len(reports)}") + print(f"Total weak findings: {sum(r.weak_count for r in reports)}\n") + + cat_counts = Counter(f.category for r in reports for f in r.weak) + print("By category:") + for cat, n in cat_counts.most_common(): + print(f" {cat:30s} {n:4d}") + + print(f"\n--- Top {args.top} files by weak count ---") + top = sorted(reports, key=lambda r: -r.weak_count)[:args.top] + for r in top: + pct = (r.weak_count / max(sum(rr.weak_count for rr in reports), 1)) * 100 + print(f"\n{r.filename} ({r.weak_count} findings, {pct:.1f}% of total)") + by_cat = Counter(f.category for f in r.weak) + for cat, n in by_cat.most_common(): + print(f" {cat:30s} {n}") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file