#!/usr/bin/env python3
"""Audit try/except/finally/raise usage against the data-oriented error
handling convention.

This is an INFORMATIONAL audit, not a CI gate. It produces a report that the
user (or a follow-up track) uses to decide whether a refactor is worth it.

The convention (see conductor/code_styleguides/error_handling.md) requires:

  - SDK-boundary exceptions are caught and converted to ErrorInfo.
  - Internal code uses Result[T] (data + errors list), not Optional[T] +
    try/except.
  - except Exception is a code smell (broad catch without conversion).
  - `raise` is reserved for programmer errors (assert/raise for impossible states).
  - `try/finally` is the canonical cleanup pattern (like `goto defer`).
  - `raise` in __init__ is OK for "this constructor needs X" (programmer error).
  - FastAPI `raise HTTPException` in _api_* handlers is the FastAPI-idiomatic
    boundary; it's how the framework signals HTTP errors.

The 3 fully-refactored files (mcp_client.py, ai_client.py, rag_engine.py) are
the CONVENTION BASELINE. Everything outside them is the migration target.

The script classifies every exception-handling site into one of:

  Category                      Convention status
  ----------------------------  -----------------------------------------
  BOUNDARY_SDK                  Compliant (wraps third-party SDK or is in
                                a *_result function returning Result)
  BOUNDARY_IO                   Compliant (wraps stdlib I/O that can raise)
  BOUNDARY_CONVERSION           Compliant (catches + converts to ErrorInfo)
  BOUNDARY_FASTAPI              Compliant (FastAPI HTTPException raise in
                                _api_* handler; framework-idiomatic)
  INTERNAL_SILENT_SWALLOW       Violation (except ...: pass or just logs)
  INTERNAL_BROAD_CATCH          Violation (except Exception without conversion)
  INTERNAL_OPTIONAL_RETURN      Violation (try/except + return None/Optional)
  INTERNAL_RETHROW              Suspicious (try/except + raise; refactorable)
  INTERNAL_PROGRAMMER_RAISE     Compliant (raise for impossible state in
                                __init__/assert/precondition; not a violation)
  INTERNAL_COMPLIANT            Compliant (try/finally cleanup pattern)
  UNCLEAR                       Manual review needed

For each VIOLATION or SUSPICIOUS site, the script prints a 1-line hint at what
the fix could look like (e.g., "return Result(data=NIL_T, errors=[...])").

Usage:
    uv run python scripts/audit_exception_handling.py              # human report
    uv run python scripts/audit_exception_handling.py --json       # JSON output
    uv run python scripts/audit_exception_handling.py --src src
    uv run python scripts/audit_exception_handling.py --top 20
    uv run python scripts/audit_exception_handling.py --verbose
    uv run python scripts/audit_exception_handling.py --strict

Exit codes:
    0 - audit ran (informational mode; findings don't fail the script)
    1 - usage error, or --strict mode with violations found
"""
from __future__ import annotations
import argparse
import ast
import json
import re
import sys
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path


# The 3 files that were fully refactored to the convention by the
# data_oriented_error_handling_20260606 track. Sites in these files are the
# BASELINE; sites outside them are the MIGRATION TARGET.
REFACTORED_BASELINE_FILES: frozenset[str] = frozenset({
    "src/mcp_client.py",
    "src/ai_client.py",
    "src/rag_engine.py",
})

# Third-party SDKs the convention recognizes as boundary callers.
THIRD_PARTY_SDK_MODULES: frozenset[str] = frozenset({
    "anthropic",
    "anthropic.types",
    "google",
    "google.generativeai",
    "google.genai",
    "google.api_core",
    "google.protobuf",
    "google.auth",
    "openai",
    "openai.types",
    "groq",
    "groq.types",
    "mistralai",
    "cohere",
    "chromadb",
    "sentence_transformers",
    "huggingface_hub",
    "transformers",
    "torch",
    "requests",
    "urllib3",
    "httpx",
    "aiohttp",
    "websockets",
    "fastapi",
    "uvicorn",
    "starlette",
    "psutil",
    "pydantic",
    "PIL",
    "cv2",
    "numpy",
    "tomli",
    "tomllib",
    "imgui_bundle",
    "dearpygui",
    "dearpygui.dearpygui",
})

# Stdlib exceptions that almost always indicate a legitimate boundary wrap.
STDLIB_IO_EXCEPTIONS: frozenset[str] = frozenset({
    "OSError",
    "IOError",
    "FileNotFoundError",
    "FileExistsError",
    "PermissionError",
    "IsADirectoryError",
    "NotADirectoryError",
    "TimeoutError",
    "ConnectionError",
    "ConnectionRefusedError",
    "ConnectionResetError",
    "ConnectionAbortedError",
    "BrokenPipeError",
    "socket.timeout",
    "ssl.SSLError",
    "json.JSONDecodeError",
    "csv.Error",
    "sqlite3.Error",
    "sqlite3.IntegrityError",
    "sqlite3.OperationalError",
    "zipfile.BadZipFile",
    "xml.etree.ElementTree.ParseError",
    "subprocess.CalledProcessError",
    "subprocess.TimeoutExpired",
})

# Third-party exception types commonly caught at the boundary.
THIRD_PARTY_EXCEPTIONS: frozenset[str] = frozenset({
    "anthropic.APIError",
    "anthropic.APIConnectionError",
    "anthropic.RateLimitError",
    "anthropic.AuthenticationError",
    "anthropic.BadRequestError",
    "anthropic.NotFoundError",
    "anthropic.PermissionDeniedError",
    "anthropic.UnprocessableEntityError",
    "google.api_core.exceptions.GoogleAPIError",
    "google.api_core.exceptions.ResourceExhausted",
    "google.api_core.exceptions.PermissionDenied",
    "google.api_core.exceptions.NotFound",
    "google.api_core.exceptions.InvalidArgument",
    "google.api_core.exceptions.DeadlineExceeded",
    "google.api_core.exceptions.ServiceUnavailable",
    "google.api_core.exceptions.Aborted",
    "openai.OpenAIError",
    "openai.APIError",
    "openai.APIConnectionError",
    "openai.RateLimitError",
    "openai.AuthenticationError",
    "openai.BadRequestError",
    "openai.NotFoundError",
    "openai.PermissionDeniedError",
    "requests.RequestException",
    "requests.ConnectionError",
    "requests.Timeout",
    "requests.HTTPError",
    "requests.exceptions.SSLError",
    "httpx.HTTPError",
    "httpx.RequestError",
    "httpx.TimeoutException",
    "chromadb.errors.ChromaError",
    "pydantic.ValidationError",
})

# FastAPI boundary exception - idiomatic in _api_* handlers.
FASTAPI_EXCEPTIONS: frozenset[str] = frozenset({
    "fastapi.HTTPException",
    "HTTPException",
})

# Programmer-error exceptions that are OK to raise (per the styleguide's
# "When to Use This Convention" section: "Constructors (__init__) that fail
# with programmer errors (use assert or raise for these)").
PROGRAMMER_ERROR_EXCEPTIONS: frozenset[str] = frozenset({
    "AssertionError",
    "ValueError",
    "KeyError",
    "IndexError",
    "TypeError",
    "AttributeError",
    "NameError",
    "RuntimeError",
    "NotImplementedError",
})

# Categories that are considered violations
VIOLATION_CATEGORIES: frozenset[str] = frozenset({
    "INTERNAL_SILENT_SWALLOW",
    "INTERNAL_BROAD_CATCH",
    "INTERNAL_OPTIONAL_RETURN",
})

# Categories that are considered compliant (canonical)
COMPLIANT_CATEGORIES: frozenset[str] = frozenset({
    "BOUNDARY_SDK",
    "BOUNDARY_IO",
    "BOUNDARY_CONVERSION",
    "BOUNDARY_FASTAPI",
    "INTERNAL_PROGRAMMER_RAISE",
    "INTERNAL_COMPLIANT",
})

# Exception names that count as a "broad" catch.
_BROAD_EXCEPTION_NAMES: frozenset[str] = frozenset({"Exception", "BaseException"})

# Non-violation categories that still deserve a human look in the reports.
_REVIEW_CATEGORIES: frozenset[str] = frozenset({"UNCLEAR", "INTERNAL_RETHROW"})


def _is_baseline_path(filename: str) -> bool:
    """Return True when *filename* names one of REFACTORED_BASELINE_FILES.

    Matching is done on the normalized (forward-slash) path suffix so that
    absolute or prefixed paths such as ``/repo/src/ai_client.py`` are still
    recognized, not only the exact relative string ``src/ai_client.py``.
    """
    rel = filename.replace("\\", "/")
    return any(
        rel == base or rel.endswith("/" + base)
        for base in REFACTORED_BASELINE_FILES
    )


def _needs_review(category: str) -> bool:
    """Return True for categories listed in the per-file detail sections."""
    return category in VIOLATION_CATEGORIES or category in _REVIEW_CATEGORIES


@dataclass(frozen=True)
class Finding:
    """One classified exception-handling site (a try, except or raise)."""

    filename: str
    line: int
    kind: str  # "TRY", "EXCEPT" or "RAISE"
    context: str  # name of the innermost enclosing function, "" at module level
    snippet: str
    category: str
    hint: str
    in_refactored_baseline: bool


@dataclass
class FileReport:
    """All findings for a single file, plus read/parse error state."""

    filename: str
    findings: list[Finding] = field(default_factory=list)
    has_error: bool = False
    error_message: str = ""

    @property
    def violation_count(self) -> int:
        """Number of findings whose category is in VIOLATION_CATEGORIES."""
        return sum(1 for f in self.findings if f.category in VIOLATION_CATEGORIES)

    @property
    def compliant_count(self) -> int:
        """Number of findings whose category is in COMPLIANT_CATEGORIES."""
        return sum(1 for f in self.findings if f.category in COMPLIANT_CATEGORIES)

    @property
    def unclear_count(self) -> int:
        """Number of findings categorized as UNCLEAR."""
        return sum(1 for f in self.findings if f.category == "UNCLEAR")

    @property
    def suspicious_count(self) -> int:
        """Number of findings categorized as INTERNAL_RETHROW."""
        return sum(1 for f in self.findings if f.category == "INTERNAL_RETHROW")

    @property
    def is_refactored_baseline(self) -> bool:
        """True when any finding in this file is flagged as baseline."""
        return any(f.in_refactored_baseline for f in self.findings)


class ExceptionVisitor(ast.NodeVisitor):
    """Walks the AST and classifies every try/except/finally/raise node."""

    def __init__(self, filename: str) -> None:
        self.filename = filename
        self.report = FileReport(filename=filename)
        self._func_stack: list[ast.FunctionDef | ast.AsyncFunctionDef] = []
        self._try_stack: list[ast.Try | ast.TryStar] = []
        self._in_baseline = _is_baseline_path(filename)

    def _current_func_name(self) -> str:
        """Name of the innermost enclosing function, or "" at module level."""
        if not self._func_stack:
            return ""
        return self._func_stack[-1].name

    def _current_func_node(self) -> ast.FunctionDef | ast.AsyncFunctionDef | None:
        """Innermost enclosing function node, or None at module level."""
        return self._func_stack[-1] if self._func_stack else None

    def _is_third_party_call(self, body: list[ast.stmt]) -> bool:
        """Does this body make a call into a known third-party SDK?

        A call qualifies when any dotted prefix of the callee expression
        (``a``, ``a.b``, ``a.b.c``, ...) is in THIRD_PARTY_SDK_MODULES.
        """
        for node in ast.walk(ast.Module(body=body, type_ignores=[])):
            if not isinstance(node, ast.Call):
                continue
            parts = ast.unparse(node.func).split(".")
            for i in range(1, len(parts) + 1):
                if ".".join(parts[:i]) in THIRD_PARTY_SDK_MODULES:
                    return True
        return False

    def _is_fastapi_handler(self) -> bool:
        """Is the current function named like a FastAPI handler (_api_* / api_*)?"""
        name = self._current_func_name()
        return name.startswith(("_api_", "api_"))

    def _enclosing_returns_result(self) -> bool:
        """Does any enclosing function return a Result-like type?"""
        for func in self._func_stack:
            if func.returns is None:
                continue
            ret_str = ast.unparse(func.returns)
            if "Result[" in ret_str or ret_str == "Result":
                return True
        return False

    @staticmethod
    def _handler_exception_names(exc_type: ast.expr | None) -> list[str]:
        """Return the individual exception names an except clause catches.

        A bare ``except:`` is reported as ``["Exception"]`` (matching how the
        rest of the classifier names it); ``except (A, B):`` yields one entry
        per tuple element so each can be matched against the known-name sets.
        """
        if exc_type is None:
            return ["Exception"]
        if isinstance(exc_type, ast.Tuple):
            return [ast.unparse(elt) for elt in exc_type.elts]
        return [ast.unparse(exc_type)]

    def _classify_except(self, handler: ast.ExceptHandler, try_node: ast.Try) -> tuple[str, str]:
        """Classify one except handler; return ``(category, hint)``.

        Rules are evaluated in order and the first match wins.
        """
        exc_type = handler.type
        # Display form used in hints; a tuple handler renders as "(A, B)".
        exc_name = ast.unparse(exc_type) if exc_type is not None else "Exception"
        exc_names = self._handler_exception_names(exc_type)
        body = handler.body
        body_nodes = list(ast.walk(ast.Module(body=body, type_ignores=[])))

        is_broad = any(n in _BROAD_EXCEPTION_NAMES for n in exc_names)
        # Guard against `except ():`, where all() over no names would be True.
        all_stdlib_io = bool(exc_names) and all(n in STDLIB_IO_EXCEPTIONS for n in exc_names)

        # Empty body or pass = silent swallow
        is_silent = len(body) == 0 or all(isinstance(s, ast.Pass) for s in body)

        # Re-raise detection (bare `raise` anywhere inside the handler)
        re_raises = any(isinstance(s, ast.Raise) and s.exc is None for s in body_nodes)

        # ErrorInfo creation
        creates_errorinfo = any(
            isinstance(s, ast.Call) and "ErrorInfo" in ast.unparse(s.func)
            for s in body_nodes
        )

        # Returns None (only direct statements of the handler are inspected)
        returns_none = any(
            isinstance(s, ast.Return) and (s.value is None or ast.unparse(s.value) == "None")
            for s in body
        )

        # Enclosing function returns Optional[T]?
        enclosing_func = self._current_func_node()
        returns_optional = False
        if enclosing_func is not None and enclosing_func.returns is not None:
            ret_str = ast.unparse(enclosing_func.returns)
            if "Optional" in ret_str or " | None" in ret_str:
                returns_optional = True

        is_third_party = self._is_third_party_call(try_node.body)
        is_in_result_func = self._enclosing_returns_result()

        # ----- Classification logic -----

        # 1. ErrorInfo conversion = canonical boundary pattern
        if creates_errorinfo:
            return (
                "BOUNDARY_CONVERSION",
                "Compliant: catch + ErrorInfo conversion in a Result-returning function. This is the canonical SDK boundary pattern (per styleguide 'Catch SDK exceptions at the boundary only').",
            )

        # 2. FastAPI _api_* handler with broad catch (per app_controller pattern)
        if self._is_fastapi_handler() and is_broad:
            return (
                "BOUNDARY_FASTAPI",
                "Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.",
            )

        # 3. Inside a *_result function with broad catch (likely SDK boundary)
        if is_in_result_func and is_broad:
            if is_third_party:
                return (
                    "BOUNDARY_SDK",
                    f"Compliant: broad `except {exc_name}` in a *_result function that calls a third-party SDK. Consider narrowing the exception type or converting to ErrorInfo for a cleaner Result contract.",
                )
            return (
                "INTERNAL_BROAD_CATCH",
                f"Violation: `except {exc_name}` in a Result-returning function without ErrorInfo conversion. Narrow the exception type, or convert to ErrorInfo in a Result (this is the canonical pattern in the 3 refactored files).",
            )

        # 4. Stdlib I/O exception at a third-party call site. This must run
        #    before the generic third-party rule below: that rule matches any
        #    name containing "Error" and would otherwise shadow this one.
        if is_third_party and all_stdlib_io:
            return (
                "BOUNDARY_IO",
                f"Compliant: stdlib I/O exception {exc_name} caught at a third-party call site.",
            )

        # 5. Third-party SDK call
        looks_like_sdk_exception = (
            any(n in THIRD_PARTY_EXCEPTIONS for n in exc_names)
            or "Error" in exc_name
            or "Exception" in exc_name
            or any(n.split(".")[0] in THIRD_PARTY_SDK_MODULES for n in exc_names)
        )
        if is_third_party and looks_like_sdk_exception:
            return (
                "BOUNDARY_SDK",
                f"Compliant: third-party exception {exc_name} caught at the SDK boundary.",
            )

        # 6. Re-raise
        if re_raises:
            if is_third_party:
                return (
                    "BOUNDARY_SDK",
                    f"Compliant: re-raise after {exc_name} preserves the SDK boundary; consider ErrorInfo conversion for a Result-based API.",
                )
            return (
                "INTERNAL_RETHROW",
                "Suspicious: re-raising without conversion is a control-flow smell. Consider whether the caller should handle this via a Result instead.",
            )

        # 7. Silent swallow
        if is_silent:
            return (
                "INTERNAL_SILENT_SWALLOW",
                "Violation: silent swallow (`except ...: pass`) hides failures. Either let it propagate, return Result(data=NIL_T, errors=[...]), or document the intentional swallow with a comment-free `assert` for the precondition.",
            )

        # 8. Broad catch (Exception/BaseException)
        if is_broad:
            return (
                "INTERNAL_BROAD_CATCH",
                f"Violation: broad `except {exc_name}` catches more than intended. Narrow the exception type, or convert to ErrorInfo in a Result.",
            )

        # 9. try/except + return None in Optional[T] function
        if returns_none and returns_optional:
            return (
                "INTERNAL_OPTIONAL_RETURN",
                f"Violation: `except {exc_name}: return None` in a function that returns Optional[T] violates the convention. Replace with `Result[T]` and return `Result(data=NIL_T, errors=[ErrorInfo(kind=..., message=...)])`.",
            )

        # 10. Stdlib I/O exception in our own code
        if all_stdlib_io and not is_third_party:
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: stdlib I/O exception {exc_name} caught in our own code is acceptable (per convention, file/network errors are converted to ErrorInfo).",
            )

        return (
            "UNCLEAR",
            f"Manual review: catches {exc_name}; not obviously boundary or violation. Check whether the except site is converting to ErrorInfo (good) or hiding the error (bad).",
        )

    def _extract_raise_name(self, node: ast.expr) -> str:
        """Extract the exception class name from a raise expression.

        For `raise HTTPException(...)` this returns 'HTTPException' (just the name).
        For `raise ValueError('msg')` this returns 'ValueError'.
        For `raise self.errors[0]` this returns the full expression (won't match).
        """
        if isinstance(node, ast.Call):
            return ast.unparse(node.func)
        return ast.unparse(node)

    def _classify_raise(self, node: ast.Raise) -> tuple[str, str]:
        """Classify one raise statement; return ``(category, hint)``."""
        # Bare re-raise
        if node.exc is None:
            return (
                "INTERNAL_RETHROW",
                "Suspicious: re-raising without conversion. Consider propagating via Result instead.",
            )

        exc_str = ast.unparse(node)
        exc_name = self._extract_raise_name(node.exc)

        # FastAPI HTTPException in an _api_* handler
        exc_short = exc_name.split(".")[-1]
        if exc_short in {"HTTPException"} and self._is_fastapi_handler():
            return (
                "BOUNDARY_FASTAPI",
                "Compliant: FastAPI HTTPException in _api_* handler. This is the framework-idiomatic way to signal HTTP errors; FastAPI converts it to a JSON response at the framework level.",
            )

        # Raising ErrorInfo
        if "ErrorInfo" in exc_name:
            return (
                "INTERNAL_RETHROW",
                "Violation: raising ErrorInfo as an exception defeats the data-oriented pattern. Return Result(data=NIL_T, errors=[ErrorInfo(...)]) instead.",
            )

        # Programmer error (in __init__ or as assert)
        if exc_short in PROGRAMMER_ERROR_EXCEPTIONS:
            func_name = self._current_func_name()
            if func_name == "__init__":
                return (
                    "INTERNAL_PROGRAMMER_RAISE",
                    f"Compliant: `{exc_short}` in `__init__` is the canonical constructor-precondition pattern (per styleguide 'When to Use This Convention': constructors that fail with programmer errors use assert/raise).",
                )
            # Heuristic kept from the original: the unparsed statement text
            # mentioning "assert " also counts as an assertion-style raise.
            if exc_short in {"AssertionError", "ValueError"} or "assert " in exc_str:
                return (
                    "INTERNAL_PROGRAMMER_RAISE",
                    f"Compliant: `{exc_short}` for an impossible state / precondition check. The styleguide reserves `raise` for programmer errors.",
                )

        return (
            "INTERNAL_RETHROW",
            f"Review: `raise {exc_name}` in internal code. Confirm this is a programmer error (assertion) and not a runtime failure (which should be a Result).",
        )

    def _snippet(self, node: ast.AST) -> str:
        """Single-line source rendering of *node*, truncated to 120 chars."""
        return ast.unparse(node).replace("\n", " ").strip()[:120]

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Track the function on the context stack while visiting its body."""
        self._func_stack.append(node)
        try:
            self.generic_visit(node)
        finally:
            self._func_stack.pop()

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Track the async function on the context stack while visiting its body."""
        self._func_stack.append(node)
        try:
            self.generic_visit(node)
        finally:
            self._func_stack.pop()

    def _add_finding(self, kind: str, line: int, snippet: str, category: str, hint: str) -> None:
        """Append a Finding for the current file and function context."""
        self.report.findings.append(Finding(
            filename=self.filename,
            line=line,
            kind=kind,
            context=self._current_func_name(),
            snippet=snippet,
            category=category,
            hint=hint,
            in_refactored_baseline=self._in_baseline,
        ))

    def visit_Try(self, node: ast.Try) -> None:
        """Record findings for a try statement and recurse into all its parts.

        The try body itself is visited too, so `raise` statements and nested
        try blocks inside it are audited like everything else.
        """
        self._try_stack.append(node)
        try:
            # bare try/finally (no except) = canonical cleanup pattern
            if not node.handlers and node.finalbody:
                self._add_finding(
                    "TRY",
                    node.lineno,
                    self._snippet(node),
                    "INTERNAL_COMPLIANT",
                    "Compliant: bare try/finally is the canonical cleanup pattern (analog of `goto defer`).",
                )
            for child in node.body:
                self.visit(child)
            for handler in node.handlers:
                category, hint = self._classify_except(handler, node)
                self._add_finding("EXCEPT", handler.lineno, self._snippet(handler), category, hint)
                for child in handler.body:
                    self.visit(child)
            for child in node.orelse:
                self.visit(child)
            for child in node.finalbody:
                self.visit(child)
        finally:
            self._try_stack.pop()

    def visit_TryStar(self, node: ast.TryStar) -> None:
        """Handle `try ... except*` exactly like a regular try statement."""
        self.visit_Try(node)  # type: ignore[arg-type]

    def visit_Raise(self, node: ast.Raise) -> None:
        """Record a finding for a raise statement, then visit its children."""
        category, hint = self._classify_raise(node)
        self._add_finding("RAISE", node.lineno, self._snippet(node), category, hint)
        self.generic_visit(node)


def audit_file(filepath: Path) -> FileReport:
    """Parse *filepath* and return its FileReport.

    Read and syntax errors do not raise; they are reported through
    ``has_error`` / ``error_message`` on an otherwise empty report.
    """
    try:
        source = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        report = FileReport(filename=str(filepath))
        report.has_error = True
        report.error_message = f"could not read: {e}"
        return report
    try:
        tree = ast.parse(source, filename=str(filepath))
    except SyntaxError as e:
        report = FileReport(filename=str(filepath))
        report.has_error = True
        report.error_message = f"syntax error: {e}"
        return report
    visitor = ExceptionVisitor(str(filepath))
    visitor.visit(tree)
    return visitor.report


def find_python_files(root: Path, exclude_artifacts: bool = True) -> list[Path]:
    """Return the sorted ``*.py`` files under *root*.

    ``__pycache__`` directories are always skipped; paths containing an
    ``artifacts`` component are skipped unless *exclude_artifacts* is False.

    Raises:
        FileNotFoundError: if *root* does not exist.
    """
    if not root.exists():
        raise FileNotFoundError(f"Source directory not found: {root}")
    files = sorted(p for p in root.rglob("*.py") if "__pycache__" not in p.parts)
    if exclude_artifacts:
        files = [p for p in files if "artifacts" not in p.parts]
    return files


def render_human(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str:
    """Render the human-readable report.

    Args:
        reports: per-file reports to summarize.
        files_scanned: total number of files that were audited.
        top: maximum number of migration-target files to list in detail.
        verbose: list every violation/suspicious/unclear site with its hint
            instead of per-category counts.
    """
    lines: list[str] = []
    all_findings = [f for r in reports for f in r.findings]
    kind_counts = Counter(f.kind for f in all_findings)
    total_violations = sum(r.violation_count for r in reports)
    total_compliant = sum(r.compliant_count for r in reports)
    total_unclear = sum(r.unclear_count for r in reports)
    total_suspicious = sum(r.suspicious_count for r in reports)

    # Separate baseline vs migration target
    baseline_findings = [f for f in all_findings if f.in_refactored_baseline]
    migration_findings = [f for f in all_findings if not f.in_refactored_baseline]
    baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES)
    migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES)

    lines.append("=== Exception Handling Audit (Data-Oriented Convention) ===\n")
    lines.append(f"Files scanned: {files_scanned}")
    lines.append(f"Files with findings: {len(reports)}")
    lines.append(f"Total sites: {len(all_findings)}")
    lines.append(f" try: {kind_counts['TRY']}")
    lines.append(f" except: {kind_counts['EXCEPT']}")
    lines.append(f" raise: {kind_counts['RAISE']}")
    lines.append("")
    lines.append(f"Compliant sites: {total_compliant}")
    lines.append(f"Suspicious sites: {total_suspicious}")
    lines.append(f"Violation sites: {total_violations}")
    lines.append(f"Unclear (review): {total_unclear}")
    lines.append("")
    lines.append("--- Baseline (refactored files: mcp_client, ai_client, rag_engine) ---")
    lines.append(f" Sites: {len(baseline_findings)}, violations: {baseline_violations}")
    lines.append("--- Migration target (all other src/ files) ---")
    lines.append(f" Sites: {len(migration_findings)}, violations: {migration_violations}")
    lines.append("")

    cat_counts = Counter(f.category for f in all_findings)
    lines.append("By category:")
    for cat, n in cat_counts.most_common():
        mark = ""
        if cat in VIOLATION_CATEGORIES:
            mark = " (VIOLATION)"
        elif cat == "INTERNAL_RETHROW":
            mark = " (suspicious)"
        elif cat in COMPLIANT_CATEGORIES:
            mark = " (compliant)"
        elif cat == "UNCLEAR":
            mark = " (review)"
        lines.append(f" {cat:30s} {n:4d}{mark}")
    lines.append("")

    lines.append(f"--- Top {top} files by violation count (migration target only) ---")
    # Drop files with nothing to report BEFORE taking the top-N slice, so
    # clean files with many compliant sites cannot use up the N slots.
    reportable = [
        r for r in reports
        if not r.is_refactored_baseline
        and (r.violation_count or r.unclear_count or r.suspicious_count)
    ]
    ranked = sorted(
        reportable,
        key=lambda r: (-r.violation_count, -len(r.findings), r.filename),
    )[:top]
    for r in ranked:
        lines.append(f"\n{r.filename} (V={r.violation_count}, S={r.suspicious_count}, ?={r.unclear_count}, C={r.compliant_count}, total={len(r.findings)})")
        flagged = [f for f in r.findings if _needs_review(f.category)]
        if verbose:
            for f in flagged:
                lines.append(f" L{f.line:4d} [{f.kind:7s}] {f.category:28s} in {f.context}")
                lines.append(f" {f.snippet[:100]}")
                lines.append(f" hint: {f.hint}")
        else:
            by_cat = Counter(f.category for f in flagged)
            for cat, n in by_cat.most_common():
                lines.append(f" {cat:30s} {n}")

    return "\n".join(lines) + "\n"


def render_json(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str:
    """Render the report as an indented JSON document.

    In verbose mode every file and every finding (with snippet and hint) is
    emitted; otherwise only the top *top* files and only their
    violation/suspicious/unclear findings are included.
    """
    all_findings = [f for r in reports for f in r.findings]
    total_violations = sum(r.violation_count for r in reports)
    total_compliant = sum(r.compliant_count for r in reports)
    total_unclear = sum(r.unclear_count for r in reports)
    total_suspicious = sum(r.suspicious_count for r in reports)
    baseline_findings = [f for f in all_findings if f.in_refactored_baseline]
    migration_findings = [f for f in all_findings if not f.in_refactored_baseline]
    baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES)
    migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES)

    def _finding_entry(f: Finding) -> dict:
        # Snippet and hint are only included in verbose output.
        entry = {
            "line": f.line,
            "kind": f.kind,
            "context": f.context,
            "category": f.category,
        }
        if verbose:
            entry["snippet"] = f.snippet
            entry["hint"] = f.hint
        return entry

    ordered = sorted(reports, key=lambda r: (-r.violation_count, -r.suspicious_count, r.filename))
    selected = ordered if verbose else ordered[:top]

    output = {
        "refactored_baseline_files": sorted(REFACTORED_BASELINE_FILES),
        "files_scanned": files_scanned,
        "files_with_findings": len(reports),
        "total_sites": len(all_findings),
        "by_kind": dict(Counter(f.kind for f in all_findings)),
        "compliant_sites": total_compliant,
        "suspicious_sites": total_suspicious,
        "violation_sites": total_violations,
        "unclear_sites": total_unclear,
        "by_category": dict(Counter(f.category for f in all_findings).most_common()),
        "violations_by_category": dict(Counter(
            f.category for f in all_findings if f.category in VIOLATION_CATEGORIES
        ).most_common()),
        "baseline": {
            "file_count": len(REFACTORED_BASELINE_FILES),
            "sites": len(baseline_findings),
            "violations": baseline_violations,
        },
        "migration_target": {
            "sites": len(migration_findings),
            "violations": migration_violations,
        },
        "files": [
            {
                "filename": r.filename,
                "in_refactored_baseline": r.is_refactored_baseline,
                "violation_count": r.violation_count,
                "compliant_count": r.compliant_count,
                "suspicious_count": r.suspicious_count,
                "unclear_count": r.unclear_count,
                "has_error": r.has_error,
                "error_message": r.error_message,
                "findings": [
                    _finding_entry(f)
                    for f in r.findings
                    if verbose or _needs_review(f.category)
                ],
            }
            for r in selected
        ],
    }
    return json.dumps(output, indent=2)


def _count_violations(reports: list[FileReport], include_baseline: bool) -> int:
    """Total violations, optionally ignoring the refactored baseline files."""
    return sum(
        r.violation_count
        for r in reports
        if include_baseline or not r.is_refactored_baseline
    )


def main() -> int:
    """Command-line entry point; returns the process exit code."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--src", default="src", help="Source directory to audit (default: src)")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report")
    parser.add_argument("--top", type=int, default=15, help="Show top N files by violation count (default: 15)")
    parser.add_argument("--verbose", action="store_true", help="Show every site inline (default: top N summary)")
    parser.add_argument("--include-tests", action="store_true", help="Also scan tests/ and scripts/")
    parser.add_argument("--strict", action="store_true", help="Exit 1 if any violations are found (for CI use)")
    parser.add_argument("--include-baseline", action="store_true", help="Include the 3 refactored files in the violation count (default: exclude)")
    parser.add_argument("--exclude", action="append", default=[], help="Additional path components to exclude (can repeat)")
    args = parser.parse_args()

    # A negative N would silently slice from the END of the ranked lists.
    if args.top < 0:
        print("ERROR: --top must be non-negative", file=sys.stderr)
        return 1

    src = Path(args.src)
    try:
        files = find_python_files(src)
    except FileNotFoundError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1

    if args.include_tests:
        for extra in ("tests", "scripts"):
            p = Path(extra)
            if p.exists():
                files.extend(find_python_files(p))

    if args.exclude:
        files = [f for f in files if not any(ex in f.parts for ex in args.exclude)]

    reports: list[FileReport] = [audit_file(f) for f in files]
    reports = [r for r in reports if r.findings or r.has_error]
    total_violations = _count_violations(reports, args.include_baseline)

    if args.json:
        print(render_json(reports, len(files), args.top, args.verbose))
        return 1 if (args.strict and total_violations > 0) else 0

    print(render_human(reports, len(files), args.top, args.verbose))

    if args.strict and total_violations > 0:
        print(f"\nSTRICT MODE: {total_violations} violation(s) found; exiting 1.", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())