Private
Public Access
0
0

feat(audit): add scripts/audit_dataclass_coverage.py + baseline (t0_2)

GREEN phase for Phase 0. Mirrors scripts/audit_weak_types.py design with
3 additions specific to the any-type componentization track:

1. PROMOTED_SITE_MODULES allowlist: the 3 new src/ modules
   (mcp_tool_specs.py, openai_schemas.py, provider_state.py) are exempt
   from Any-counting (their new dataclasses intentionally have raw_response: Any
   and SDK holder fields that stay as Any per Pattern 3).
2. INLINE_PROMOTED_SITE_MODULES: log_registry.py + api_hooks.py get their
   dataclasses added inline in Phase 4 + 5 (not new modules); same exemption.
3. Combined counter: counts both Any AND weak-struct patterns
   (dict_str_any, list_of_dict, optional_dict, etc.).

Modes:
- default: informational (exits 0; prints human report)
- --json: machine-readable with by_file, by_category, total_weak
- --strict: CI gate (exits 1 when current > baseline)
- --baseline: path to baseline file (default: scripts/audit_dataclass_coverage.baseline.json)

Baseline: scripts/audit_dataclass_coverage.baseline.json = 207 weak sites
(captured pre-Phase-1; expected to drop to ~118 after 89 sites promoted).

Verification:
  uv run python scripts/audit_dataclass_coverage.py --strict
    STRICT OK: 207 weak sites <= baseline 207
  uv run pytest tests/test_audit_dataclass_coverage.py --timeout=30
    7 passed in 5.15s
This commit is contained in:
2026-06-21 15:56:41 -04:00
parent 647ad3d49d
commit cfdf8988fb
2 changed files with 282 additions and 0 deletions
@@ -0,0 +1,8 @@
{
"total_weak": 207,
"files_with_findings": 35,
"by_category": {
"any": 188,
"dict_str_any": 19
}
}
+274
View File
@@ -0,0 +1,274 @@
#!/usr/bin/env python3
"""Audit src/ for residual `Any`-typed and `dict[str, Any]` annotations.
The complementary audit to `audit_weak_types.py`. Where the weak-types
audit tracks "weak STRUCT patterns" (dict, list of dict, tuple), this
audit tracks ALL remaining `Any` usages - including bare `Any`,
`Optional[Any]`, `list[Any]`, etc. It also counts literal `dict[str, Any]`
annotations NOT aliased to `Metadata`/`CommsLogEntry`/`FileItem`/etc.
This audit is the CI gate for the `any_type_componentization_20260621`
track: the post-track baseline documents the count AFTER the 89 fat-struct
sites are promoted to `dataclass(frozen=True)`.
Usage:
python scripts/audit_dataclass_coverage.py # human-readable report
python scripts/audit_dataclass_coverage.py --json # JSON output for tooling
python scripts/audit_dataclass_coverage.py --src src # override source dir
python scripts/audit_dataclass_coverage.py --top 15 # show top N files
python scripts/audit_dataclass_coverage.py --strict # CI gate; exit 1 on regression
python scripts/audit_dataclass_coverage.py --baseline X # custom baseline file
Exit codes:
0 - audit ran; in --strict mode, current count <= baseline
1 - usage error OR --strict mode regression
"""
from __future__ import annotations
import argparse
import ast
import json
import re
import sys
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
ANY_PATTERNS: list[tuple[str, str]] = [
(r"\bAny\b", "any"),
]
WEAK_STRUCT_PATTERNS: list[tuple[str, str]] = [
(r"Dict\[str,\s*Any\]", "dict_str_any"),
(r"dict\[str,\s*Any\]", "dict_str_any"),
(r"List\[Dict\[", "list_of_dict"),
(r"list\[dict\[", "list_of_dict"),
(r"Optional\[List\[Dict\[", "optional_list_of_dict"),
(r"Optional\[list\[dict\[", "optional_list_of_dict"),
(r"Optional\[Dict\[", "optional_dict"),
(r"Optional\[dict\[", "optional_dict"),
]
PROMOTED_SITE_MODULES: set[str] = {
"src/mcp_tool_specs.py",
"src/openai_schemas.py",
"src/provider_state.py",
}
# Files where dataclass promotion already happened inline (Phase 4 + Phase 5).
# Any usages INSIDE these files are the new typed shapes; do NOT double-count.
INLINE_PROMOTED_SITE_MODULES: set[str] = {
"src/log_registry.py",
"src/api_hooks.py",
}
@dataclass(frozen=True)
class Finding:
filename: str
line: int
context: str
type_str: str
category: str
severity: str
@dataclass
class FileReport:
filename: str
weak: list[Finding] = field(default_factory=list)
positive: list[tuple[int, str, str]] = field(default_factory=list)
@property
def weak_count(self) -> int:
return len(self.weak)
def _is_promoted_site(filename: str) -> bool:
norm = filename.replace("\\", "/")
if norm in PROMOTED_SITE_MODULES:
return True
if norm in INLINE_PROMOTED_SITE_MODULES:
return True
return False
class CoverageVisitor(ast.NodeVisitor):
def __init__(self, filename: str, source: str) -> None:
self.filename = filename
self.source = source
self.report = FileReport(filename=filename)
self._func_stack: list[ast.FunctionDef] = []
self._class_stack: list[ast.ClassDef] = []
def _check_type(self, type_node: ast.AST | None, line: int, context: str) -> None:
if type_node is None:
return
type_str = ast.unparse(type_node).replace("\n", " ").strip()
promoted = _is_promoted_site(self.filename)
for pattern, category in WEAK_STRUCT_PATTERNS:
if re.search(pattern, type_str):
self.report.weak.append(Finding(
filename=self.filename,
line=line,
context=context,
type_str=type_str,
category=category,
severity="high",
))
break
for pattern, category in ANY_PATTERNS:
if re.search(pattern, type_str):
if not promoted:
self.report.weak.append(Finding(
filename=self.filename,
line=line,
context=context,
type_str=type_str,
category=category,
severity="medium",
))
break
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self._func_stack.append(node)
try:
for arg in node.args.args + node.args.kwonlyargs:
self._check_type(arg.annotation, arg.lineno, f"{node.name}({arg.arg})")
if node.args.vararg and node.args.vararg.annotation:
self._check_type(node.args.vararg.annotation, node.args.vararg.lineno, f"{node.name}(*{node.args.vararg.arg})")
if node.args.kwarg and node.args.kwarg.annotation:
self._check_type(node.args.kwarg.annotation, node.args.kwarg.lineno, f"{node.name}(**{node.args.kwarg.arg})")
self._check_type(node.returns, node.returns.lineno if node.returns else node.lineno, f"{node.name} -> ...")
for stmt in node.body:
self.visit(stmt)
finally:
self._func_stack.pop()
def visit_ClassDef(self, node: ast.ClassDef) -> None:
self._class_stack.append(node)
try:
for stmt in node.body:
self.visit(stmt)
finally:
self._class_stack.pop()
def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
target = ast.unparse(node.target)
self._check_type(node.annotation, node.lineno, f"{target}: ...")
self.generic_visit(node)
def audit_file(filepath: Path) -> FileReport:
try:
source = filepath.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError) as e:
print(f"WARN: could not read {filepath}: {e}", file=sys.stderr)
return FileReport(filename=str(filepath))
try:
tree = ast.parse(source, filename=str(filepath))
except SyntaxError as e:
print(f"WARN: syntax error in {filepath}: {e}", file=sys.stderr)
return FileReport(filename=str(filepath))
visitor = CoverageVisitor(str(filepath), source)
visitor.visit(tree)
return visitor.report
def find_python_files(root: Path) -> list[Path]:
if not root.exists():
raise FileNotFoundError(f"Source directory not found: {root}")
return sorted(p for p in root.rglob("*.py") if "artifacts" not in p.parts and "__pycache__" not in p.parts)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--src", default="src", help="Source directory to audit (default: src)")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report")
parser.add_argument("--top", type=int, default=15, help="Show top N files by weak count (default: 15)")
parser.add_argument("--strict", action="store_true", help="CI mode; exits 1 if current count exceeds baseline")
parser.add_argument("--baseline", default="scripts/audit_dataclass_coverage.baseline.json", help="Baseline file for --strict mode")
args = parser.parse_args()
src = Path(args.src)
try:
files = find_python_files(src)
except FileNotFoundError as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
reports: list[FileReport] = [audit_file(f) for f in files]
reports = [r for r in reports if r.weak_count > 0]
if args.strict:
baseline_path = Path(args.baseline)
if not baseline_path.exists():
print(f"ERROR: baseline file not found: {baseline_path}", file=sys.stderr)
return 1
try:
with baseline_path.open("r", encoding="utf-8") as f:
baseline_data = json.load(f)
baseline_count = baseline_data.get("total_weak", 0)
except (OSError, json.JSONDecodeError) as e:
print(f"ERROR: could not read baseline {baseline_path}: {e}", file=sys.stderr)
return 1
current_count = sum(r.weak_count for r in reports)
if current_count > baseline_count:
print(f"STRICT: {current_count} weak sites found, baseline is {baseline_count} (regression of {current_count - baseline_count})", file=sys.stderr)
return 1
print(f"STRICT OK: {current_count} weak sites <= baseline {baseline_count}")
return 0
if args.json:
output = {
"src_dir": str(src),
"files_scanned": len(files),
"files_with_findings": len(reports),
"total_weak": sum(r.weak_count for r in reports),
"by_category": dict(Counter(f.category for r in reports for f in r.weak).most_common()),
"by_file": [
{
"filename": r.filename,
"weak_count": r.weak_count,
"findings": [
{
"line": f.line,
"context": f.context,
"type_str": f.type_str,
"category": f.category,
"severity": f.severity,
}
for f in r.weak
],
}
for r in sorted(reports, key=lambda r: -r.weak_count)
],
}
print(json.dumps(output, indent=2))
return 0
print(f"=== Dataclass Coverage Audit: {src} ===\n")
print(f"Files scanned: {len(files)}")
print(f"Files with findings: {len(reports)}")
print(f"Total weak findings: {sum(r.weak_count for r in reports)}\n")
cat_counts = Counter(f.category for r in reports for f in r.weak)
print("By category:")
for cat, n in cat_counts.most_common():
print(f" {cat:30s} {n:4d}")
print(f"\n--- Top {args.top} files by weak count ---")
top = sorted(reports, key=lambda r: -r.weak_count)[:args.top]
for r in top:
pct = (r.weak_count / max(sum(rr.weak_count for rr in reports), 1)) * 100
print(f"\n{r.filename} ({r.weak_count} findings, {pct:.1f}% of total)")
by_cat = Counter(f.category for f in r.weak)
for cat, n in by_cat.most_common():
print(f" {cat:30s} {n}")
return 0
if __name__ == "__main__":
sys.exit(main())