Private
Public Access
0
0
Files
manual_slop/scripts/audit_exception_handling.py
T

793 lines
28 KiB
Python

#!/usr/bin/env python3
"""Audit try/except/finally/raise usage against the data-oriented error
handling convention.
This is an INFORMATIONAL audit, not a CI gate. It produces a report that the
user (or a follow-up track) uses to decide whether a refactor is worth it.
The convention (see conductor/code_styleguides/error_handling.md) requires:
- SDK-boundary exceptions are caught and converted to ErrorInfo.
- Internal code uses Result[T] (data + errors list), not Optional[T] + try/except.
- except Exception is a code smell (broad catch without conversion).
- `raise` is reserved for programmer errors (assert/raise for impossible states).
- `try/finally` is the canonical cleanup pattern (like `goto defer`).
- `raise` in __init__ is OK for "this constructor needs X" (programmer error).
- FastAPI `raise HTTPException` in _api_* handlers is the FastAPI-idiomatic
boundary; it's how the framework signals HTTP errors.
The 3 fully-refactored files (mcp_client.py, ai_client.py, rag_engine.py) are
the CONVENTION BASELINE. Everything outside them is the migration target.
The script classifies every exception-handling site into one of:
Category Convention status
---------------------------- -----------------------------------------
BOUNDARY_SDK Compliant (wraps third-party SDK or is in
a *_result function returning Result)
BOUNDARY_IO Compliant (wraps stdlib I/O that can raise)
BOUNDARY_CONVERSION Compliant (catches + converts to ErrorInfo)
BOUNDARY_FASTAPI Compliant (FastAPI HTTPException raise in
_api_* handler; framework-idiomatic)
INTERNAL_SILENT_SWALLOW Violation (except ...: pass or just logs)
INTERNAL_BROAD_CATCH Violation (except Exception without conversion)
INTERNAL_OPTIONAL_RETURN Violation (try/except + return None/Optional)
INTERNAL_RETHROW Suspicious (try/except + raise; refactorable)
INTERNAL_PROGRAMMER_RAISE Compliant (raise for impossible state in
__init__/assert/precondition; not a violation)
INTERNAL_COMPLIANT Compliant (try/finally cleanup pattern)
UNCLEAR Manual review needed
For each VIOLATION or SUSPICIOUS site, the script prints a 1-line hint at what
the fix could look like (e.g., "return Result(data=NIL_T, errors=[...])").
Usage:
uv run python scripts/audit_exception_handling.py # human report
uv run python python scripts/audit_exception_handling.py --json # JSON output
uv run python python scripts/audit_exception_handling.py --src src
uv run python python scripts/audit_exception_handling.py --top 20
uv run python python scripts/audit_exception_handling.py --verbose
uv run python python scripts/audit_exception_handling.py --strict
Exit codes:
0 - audit ran (informational mode; findings don't fail the script)
1 - usage error, or --strict mode with violations found
"""
from __future__ import annotations
import argparse
import ast
import json
import re
import sys
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
# The 3 files that were fully refactored to the convention by the
# data_oriented_error_handling_20260606 track. Sites in these files are the
# BASELINE; sites outside them are the MIGRATION TARGET.
REFACTORED_BASELINE_FILES: frozenset[str] = frozenset({
"src/mcp_client.py",
"src/ai_client.py",
"src/rag_engine.py",
})
# Third-party SDKs the convention recognizes as boundary callers.
THIRD_PARTY_SDK_MODULES: frozenset[str] = frozenset({
"anthropic",
"anthropic.types",
"google",
"google.generativeai",
"google.genai",
"google.api_core",
"google.protobuf",
"google.auth",
"openai",
"openai.types",
"groq",
"groq.types",
"mistralai",
"cohere",
"chromadb",
"sentence_transformers",
"huggingface_hub",
"transformers",
"torch",
"requests",
"urllib3",
"httpx",
"aiohttp",
"websockets",
"fastapi",
"uvicorn",
"starlette",
"psutil",
"pydantic",
"PIL",
"cv2",
"numpy",
"tomli",
"tomllib",
"imgui_bundle",
"dearpygui",
"dearpygui.dearpygui",
})
# Stdlib exceptions that almost always indicate a legitimate boundary wrap.
STDLIB_IO_EXCEPTIONS: frozenset[str] = frozenset({
"OSError",
"IOError",
"FileNotFoundError",
"FileExistsError",
"PermissionError",
"IsADirectoryError",
"NotADirectoryError",
"TimeoutError",
"ConnectionError",
"ConnectionRefusedError",
"ConnectionResetError",
"ConnectionAbortedError",
"BrokenPipeError",
"socket.timeout",
"ssl.SSLError",
"json.JSONDecodeError",
"csv.Error",
"sqlite3.Error",
"sqlite3.IntegrityError",
"sqlite3.OperationalError",
"zipfile.BadZipFile",
"xml.etree.ElementTree.ParseError",
"subprocess.CalledProcessError",
"subprocess.TimeoutExpired",
})
# Third-party exception types commonly caught at the boundary.
THIRD_PARTY_EXCEPTIONS: frozenset[str] = frozenset({
"anthropic.APIError",
"anthropic.APIConnectionError",
"anthropic.RateLimitError",
"anthropic.AuthenticationError",
"anthropic.BadRequestError",
"anthropic.NotFoundError",
"anthropic.PermissionDeniedError",
"anthropic.UnprocessableEntityError",
"google.api_core.exceptions.GoogleAPIError",
"google.api_core.exceptions.ResourceExhausted",
"google.api_core.exceptions.PermissionDenied",
"google.api_core.exceptions.NotFound",
"google.api_core.exceptions.InvalidArgument",
"google.api_core.exceptions.DeadlineExceeded",
"google.api_core.exceptions.ServiceUnavailable",
"google.api_core.exceptions.Aborted",
"openai.OpenAIError",
"openai.APIError",
"openai.APIConnectionError",
"openai.RateLimitError",
"openai.AuthenticationError",
"openai.BadRequestError",
"openai.NotFoundError",
"openai.PermissionDeniedError",
"requests.RequestException",
"requests.ConnectionError",
"requests.Timeout",
"requests.HTTPError",
"requests.exceptions.SSLError",
"httpx.HTTPError",
"httpx.RequestError",
"httpx.TimeoutException",
"chromadb.errors.ChromaError",
"pydantic.ValidationError",
})
# FastAPI boundary exception - idiomatic in _api_* handlers.
FASTAPI_EXCEPTIONS: frozenset[str] = frozenset({
"fastapi.HTTPException",
"HTTPException",
})
# Programmer-error exceptions that are OK to raise (per the styleguide's
# "When to Use This Convention" section: "Constructors (__init__) that fail
# with programmer errors (use assert or raise for these)").
PROGRAMMER_ERROR_EXCEPTIONS: frozenset[str] = frozenset({
"AssertionError",
"ValueError",
"KeyError",
"IndexError",
"TypeError",
"AttributeError",
"NameError",
"RuntimeError",
"NotImplementedError",
})
# Categories that are considered violations
VIOLATION_CATEGORIES: frozenset[str] = frozenset({
"INTERNAL_SILENT_SWALLOW",
"INTERNAL_BROAD_CATCH",
"INTERNAL_OPTIONAL_RETURN",
})
# Categories that are considered compliant (canonical)
COMPLIANT_CATEGORIES: frozenset[str] = frozenset({
"BOUNDARY_SDK",
"BOUNDARY_IO",
"BOUNDARY_CONVERSION",
"BOUNDARY_FASTAPI",
"INTERNAL_PROGRAMMER_RAISE",
"INTERNAL_COMPLIANT",
})
@dataclass(frozen=True)
class Finding:
filename: str
line: int
kind: str
context: str
snippet: str
category: str
hint: str
in_refactored_baseline: bool
@dataclass
class FileReport:
filename: str
findings: list[Finding] = field(default_factory=list)
has_error: bool = False
error_message: str = ""
@property
def violation_count(self) -> int:
return sum(1 for f in self.findings if f.category in VIOLATION_CATEGORIES)
@property
def compliant_count(self) -> int:
return sum(1 for f in self.findings if f.category in COMPLIANT_CATEGORIES)
@property
def unclear_count(self) -> int:
return sum(1 for f in self.findings if f.category == "UNCLEAR")
@property
def suspicious_count(self) -> int:
return sum(1 for f in self.findings if f.category == "INTERNAL_RETHROW")
@property
def is_refactored_baseline(self) -> bool:
return any(f.in_refactored_baseline for f in self.findings)
class ExceptionVisitor(ast.NodeVisitor):
"""Walks the AST and classifies every try/except/finally/raise node."""
def __init__(self, filename: str) -> None:
self.filename = filename
self.report = FileReport(filename=filename)
self._func_stack: list[ast.FunctionDef | ast.AsyncFunctionDef] = []
self._try_stack: list[ast.Try | ast.TryStar] = []
# Normalize the filename for the baseline check
rel = filename.replace("\\", "/")
self._in_baseline = rel in {f.replace("\\", "/") for f in REFACTORED_BASELINE_FILES}
def _current_func_name(self) -> str:
if not self._func_stack:
return "<module>"
return self._func_stack[-1].name
def _current_func_node(self) -> ast.FunctionDef | ast.AsyncFunctionDef | None:
return self._func_stack[-1] if self._func_stack else None
def _is_third_party_call(self, body: list[ast.stmt]) -> bool:
"""Does this body make a call into a known third-party SDK?"""
for node in ast.walk(ast.Module(body=body, type_ignores=[])):
if isinstance(node, ast.Call):
func_str = ast.unparse(node.func)
top = func_str.split(".")[0]
if top in THIRD_PARTY_SDK_MODULES:
return True
parts = func_str.split(".")
for i in range(1, len(parts) + 1):
prefix = ".".join(parts[:i])
if prefix in THIRD_PARTY_SDK_MODULES:
return True
return False
def _is_fastapi_handler(self) -> bool:
"""Is the current function a FastAPI _api_* handler?"""
name = self._current_func_name()
return name.startswith("_api_") or name.startswith("api_")
def _enclosing_returns_result(self) -> bool:
"""Does any enclosing function return a Result-like type?"""
for func in self._func_stack:
if func.returns is None:
continue
ret_str = ast.unparse(func.returns)
if "Result[" in ret_str or ret_str == "Result":
return True
return False
def _classify_except(self, handler: ast.ExceptHandler, try_node: ast.Try) -> tuple[str, str]:
exc_type = handler.type
exc_name = ast.unparse(exc_type) if exc_type is not None else "Exception"
body = handler.body
handler_module = ast.unparse(exc_type).split(".")[0] if exc_type else ""
# Empty body or pass = silent swallow
is_silent = (
len(body) == 0
or all(isinstance(s, ast.Pass) for s in body)
)
# Re-raise detection
re_raises = any(
isinstance(s, ast.Raise) and s.exc is None
for s in ast.walk(ast.Module(body=body, type_ignores=[]))
)
# ErrorInfo creation
creates_errorinfo = any(
isinstance(s, ast.Call) and "ErrorInfo" in ast.unparse(s.func)
for s in ast.walk(ast.Module(body=body, type_ignores=[]))
)
# Returns None
returns_none = any(
isinstance(s, ast.Return) and (s.value is None or ast.unparse(s.value) == "None")
for s in body
)
# Enclosing function returns Optional[T]?
enclosing_func = self._current_func_node()
returns_optional = False
if enclosing_func is not None and enclosing_func.returns is not None:
ret_str = ast.unparse(enclosing_func.returns)
if "Optional" in ret_str or " | None" in ret_str:
returns_optional = True
is_third_party = self._is_third_party_call(try_node.body)
is_in_result_func = self._enclosing_returns_result()
# ----- Classification logic -----
# 1. ErrorInfo conversion = canonical boundary pattern
if creates_errorinfo:
return (
"BOUNDARY_CONVERSION",
"Compliant: catch + ErrorInfo conversion in a Result-returning function. This is the canonical SDK boundary pattern (per styleguide 'Catch SDK exceptions at the boundary only').",
)
# 2. FastAPI _api_* handler with broad catch (per app_controller pattern)
if self._is_fastapi_handler() and exc_name in ("Exception", "BaseException", ""):
return (
"BOUNDARY_FASTAPI",
"Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.",
)
# 3. Inside a *_result function with broad catch (likely SDK boundary)
if is_in_result_func and exc_name in ("Exception", "BaseException", ""):
if is_third_party:
return (
"BOUNDARY_SDK",
f"Compliant: broad `except {exc_name or 'Exception'}` in a *_result function that calls a third-party SDK. Consider narrowing the exception type or converting to ErrorInfo for a cleaner Result contract.",
)
return (
"INTERNAL_BROAD_CATCH",
f"Violation: `except {exc_name or 'Exception'}` in a Result-returning function without ErrorInfo conversion. Narrow the exception type, or convert to ErrorInfo in a Result (this is the canonical pattern in the 3 refactored files).",
)
# 4. Third-party SDK call
if is_third_party and (exc_name in THIRD_PARTY_EXCEPTIONS or "Error" in exc_name or "Exception" in exc_name or handler_module in THIRD_PARTY_SDK_MODULES):
return (
"BOUNDARY_SDK",
f"Compliant: third-party exception {exc_name} caught at the SDK boundary.",
)
# 5. Stdlib I/O exception
if is_third_party and exc_name in STDLIB_IO_EXCEPTIONS:
return (
"BOUNDARY_IO",
f"Compliant: stdlib I/O exception {exc_name} caught at a third-party call site.",
)
# 6. Re-raise
if re_raises:
if is_third_party:
return (
"BOUNDARY_SDK",
f"Compliant: re-raise after {exc_name} preserves the SDK boundary; consider ErrorInfo conversion for a Result-based API.",
)
return (
"INTERNAL_RETHROW",
"Suspicious: re-raising without conversion is a control-flow smell. Consider whether the caller should handle this via a Result instead.",
)
# 7. Silent swallow
if is_silent:
return (
"INTERNAL_SILENT_SWALLOW",
"Violation: silent swallow (`except ...: pass`) hides failures. Either let it propagate, return Result(data=NIL_T, errors=[...]), or document the intentional swallow with a comment-free `assert` for the precondition.",
)
# 8. Broad catch (Exception/BaseException)
if exc_name in ("Exception", "BaseException") or exc_name == "":
return (
"INTERNAL_BROAD_CATCH",
f"Violation: broad `except {exc_name or 'Exception'}` catches more than intended. Narrow the exception type, or convert to ErrorInfo in a Result.",
)
# 9. try/except + return None in Optional[T] function
if returns_none and returns_optional:
return (
"INTERNAL_OPTIONAL_RETURN",
f"Violation: `except {exc_name}: return None` in a function that returns Optional[T] violates the convention. Replace with `Result[T]` and return `Result(data=NIL_T, errors=[ErrorInfo(kind=..., message=...)])`.",
)
# 10. Stdlib I/O exception in our own code
if exc_name in STDLIB_IO_EXCEPTIONS and not is_third_party:
return (
"INTERNAL_COMPLIANT",
f"Compliant: stdlib I/O exception {exc_name} caught in our own code is acceptable (per convention, file/network errors are converted to ErrorInfo).",
)
return (
"UNCLEAR",
f"Manual review: catches {exc_name}; not obviously boundary or violation. Check whether the except site is converting to ErrorInfo (good) or hiding the error (bad).",
)
def _extract_raise_name(self, node: ast.expr) -> str:
"""Extract the exception class name from a raise expression.
For `raise HTTPException(...)` this returns 'HTTPException' (just the name).
For `raise ValueError('msg')` this returns 'ValueError'.
For `raise self.errors[0]` this returns the full expression (won't match).
"""
if isinstance(node, ast.Call):
return ast.unparse(node.func)
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
return ast.unparse(node)
return ast.unparse(node)
def _classify_raise(self, node: ast.Raise) -> tuple[str, str]:
exc_str = ast.unparse(node) if node.exc else "raise"
exc_name = self._extract_raise_name(node.exc) if node.exc else ""
# Bare re-raise
if node.exc is None:
return (
"INTERNAL_RETHROW",
"Suspicious: re-raising without conversion. Consider propagating via Result instead.",
)
# FastAPI HTTPException in an _api_* handler
exc_short = exc_name.split(".")[-1]
if exc_short in {"HTTPException"} and self._is_fastapi_handler():
return (
"BOUNDARY_FASTAPI",
"Compliant: FastAPI HTTPException in _api_* handler. This is the framework-idiomatic way to signal HTTP errors; FastAPI converts it to a JSON response at the framework level.",
)
# Raising ErrorInfo
if "ErrorInfo" in exc_name:
return (
"INTERNAL_RETHROW",
"Violation: raising ErrorInfo as an exception defeats the data-oriented pattern. Return Result(data=NIL_T, errors=[ErrorInfo(...)]) instead.",
)
# Programmer error (in __init__ or as assert)
if exc_short in PROGRAMMER_ERROR_EXCEPTIONS:
func_name = self._current_func_name()
if func_name == "__init__":
return (
"INTERNAL_PROGRAMMER_RAISE",
f"Compliant: `{exc_short}` in `__init__` is the canonical constructor-precondition pattern (per styleguide 'When to Use This Convention': constructors that fail with programmer errors use assert/raise).",
)
if exc_short in {"AssertionError", "ValueError"} or "assert " in exc_str:
return (
"INTERNAL_PROGRAMMER_RAISE",
f"Compliant: `{exc_short}` for an impossible state / precondition check. The styleguide reserves `raise` for programmer errors.",
)
return (
"INTERNAL_RETHROW",
f"Review: `raise {exc_name}` in internal code. Confirm this is a programmer error (assertion) and not a runtime failure (which should be a Result).",
)
def _snippet(self, node: ast.AST) -> str:
return ast.unparse(node).replace("\n", " ").strip()[:120]
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self._func_stack.append(node)
try:
self.generic_visit(node)
finally:
self._func_stack.pop()
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self._func_stack.append(node)
try:
self.generic_visit(node)
finally:
self._func_stack.pop()
def _add_finding(self, kind: str, line: int, snippet: str, category: str, hint: str) -> None:
self.report.findings.append(Finding(
filename=self.filename,
line=line,
kind=kind,
context=self._current_func_name(),
snippet=snippet,
category=category,
hint=hint,
in_refactored_baseline=self._in_baseline,
))
def visit_Try(self, node: ast.Try) -> None:
self._try_stack.append(node)
try:
# bare try/finally (no except) = canonical cleanup pattern
if not node.handlers and node.finalbody:
self._add_finding(
"TRY",
node.lineno,
self._snippet(node),
"INTERNAL_COMPLIANT",
"Compliant: bare try/finally is the canonical cleanup pattern (analog of `goto defer`).",
)
for handler in node.handlers:
category, hint = self._classify_except(handler, node)
self._add_finding("EXCEPT", handler.lineno, self._snippet(handler), category, hint)
for child in handler.body if node.handlers else []:
self.visit(child)
for child in node.orelse:
self.visit(child)
for child in node.finalbody:
self.visit(child)
finally:
self._try_stack.pop()
def visit_TryStar(self, node: ast.TryStar) -> None:
self.visit_Try(node) # type: ignore[arg-type]
def visit_Raise(self, node: ast.Raise) -> None:
category, hint = self._classify_raise(node)
self._add_finding("RAISE", node.lineno, self._snippet(node), category, hint)
self.generic_visit(node)
def audit_file(filepath: Path) -> FileReport:
try:
source = filepath.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError) as e:
report = FileReport(filename=str(filepath))
report.has_error = True
report.error_message = f"could not read: {e}"
return report
try:
tree = ast.parse(source, filename=str(filepath))
except SyntaxError as e:
report = FileReport(filename=str(filepath))
report.has_error = True
report.error_message = f"syntax error: {e}"
return report
visitor = ExceptionVisitor(str(filepath))
visitor.visit(tree)
return visitor.report
def find_python_files(root: Path, exclude_artifacts: bool = True) -> list[Path]:
if not root.exists():
raise FileNotFoundError(f"Source directory not found: {root}")
files = sorted(p for p in root.rglob("*.py") if "__pycache__" not in p.parts)
if exclude_artifacts:
files = [p for p in files if "artifacts" not in p.parts]
return files
def render_human(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str:
lines: list[str] = []
total_findings = sum(len(r.findings) for r in reports)
total_violations = sum(r.violation_count for r in reports)
total_compliant = sum(r.compliant_count for r in reports)
total_unclear = sum(r.unclear_count for r in reports)
total_suspicious = sum(r.suspicious_count for r in reports)
try_count = sum(1 for r in reports for f in r.findings if f.kind == "TRY")
except_count = sum(1 for r in reports for f in r.findings if f.kind == "EXCEPT")
finally_count = sum(1 for r in reports for f in r.findings if f.kind == "FINALLY")
raise_count = sum(1 for r in reports for f in r.findings if f.kind == "RAISE")
# Separate baseline vs migration target
baseline_findings = [f for r in reports for f in r.findings if f.in_refactored_baseline]
migration_findings = [f for r in reports for f in r.findings if not f.in_refactored_baseline]
baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES)
migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES)
lines.append("=== Exception Handling Audit (Data-Oriented Convention) ===\n")
lines.append(f"Files scanned: {files_scanned}")
lines.append(f"Files with findings: {len(reports)}")
lines.append(f"Total sites: {total_findings}")
lines.append(f" try: {try_count}")
lines.append(f" except: {except_count}")
lines.append(f" raise: {raise_count}")
lines.append("")
lines.append(f"Compliant sites: {total_compliant}")
lines.append(f"Suspicious sites: {total_suspicious}")
lines.append(f"Violation sites: {total_violations}")
lines.append(f"Unclear (review): {total_unclear}")
lines.append("")
lines.append("--- Baseline (refactored files: mcp_client, ai_client, rag_engine) ---")
lines.append(f" Sites: {len(baseline_findings)}, violations: {baseline_violations}")
lines.append("--- Migration target (all other src/ files) ---")
lines.append(f" Sites: {len(migration_findings)}, violations: {migration_violations}")
lines.append("")
cat_counts = Counter(f.category for r in reports for f in r.findings)
lines.append("By category:")
for cat, n in cat_counts.most_common():
mark = ""
if cat in VIOLATION_CATEGORIES:
mark = " (VIOLATION)"
elif cat == "INTERNAL_RETHROW":
mark = " (suspicious)"
elif cat in COMPLIANT_CATEGORIES:
mark = " (compliant)"
elif cat == "UNCLEAR":
mark = " (review)"
lines.append(f" {cat:30s} {n:4d}{mark}")
lines.append("")
lines.append(f"--- Top {top} files by violation count (migration target only) ---")
ranked = sorted(
[r for r in reports if not r.is_refactored_baseline],
key=lambda r: (-r.violation_count, -len(r.findings), r.filename),
)[:top]
for r in ranked:
if r.violation_count == 0 and r.unclear_count == 0 and r.suspicious_count == 0:
continue
lines.append(f"\n{r.filename} (V={r.violation_count}, S={r.suspicious_count}, ?={r.unclear_count}, C={r.compliant_count}, total={len(r.findings)})")
if verbose:
for f in r.findings:
if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW"):
lines.append(f" L{f.line:4d} [{f.kind:7s}] {f.category:28s} in {f.context}")
lines.append(f" {f.snippet[:100]}")
lines.append(f" hint: {f.hint}")
else:
by_cat = Counter(f.category for f in r.findings if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW"))
for cat, n in by_cat.most_common():
lines.append(f" {cat:30s} {n}")
return "\n".join(lines) + "\n"
def render_json(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str:
total_findings = sum(len(r.findings) for r in reports)
total_violations = sum(r.violation_count for r in reports)
total_compliant = sum(r.compliant_count for r in reports)
total_unclear = sum(r.unclear_count for r in reports)
total_suspicious = sum(r.suspicious_count for r in reports)
baseline_findings = [f for r in reports for f in r.findings if f.in_refactored_baseline]
migration_findings = [f for r in reports for f in r.findings if not f.in_refactored_baseline]
baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES)
migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES)
output = {
"refactored_baseline_files": sorted(REFACTORED_BASELINE_FILES),
"files_scanned": files_scanned,
"files_with_findings": len(reports),
"total_sites": total_findings,
"by_kind": dict(Counter(f.kind for r in reports for f in r.findings)),
"compliant_sites": total_compliant,
"suspicious_sites": total_suspicious,
"violation_sites": total_violations,
"unclear_sites": total_unclear,
"by_category": dict(Counter(f.category for r in reports for f in r.findings).most_common()),
"violations_by_category": dict(Counter(
f.category for r in reports for f in r.findings if f.category in VIOLATION_CATEGORIES
).most_common()),
"baseline": {
"file_count": len([f for f in REFACTORED_BASELINE_FILES]),
"sites": len(baseline_findings),
"violations": baseline_violations,
},
"migration_target": {
"sites": len(migration_findings),
"violations": migration_violations,
},
"files": [
{
"filename": r.filename,
"in_refactored_baseline": r.is_refactored_baseline,
"violation_count": r.violation_count,
"compliant_count": r.compliant_count,
"suspicious_count": r.suspicious_count,
"unclear_count": r.unclear_count,
"has_error": r.has_error,
"error_message": r.error_message,
"findings": [
{
"line": f.line,
"kind": f.kind,
"context": f.context,
"category": f.category,
"snippet": f.snippet,
"hint": f.hint,
}
for f in r.findings
] if verbose else [
{
"line": f.line,
"kind": f.kind,
"context": f.context,
"category": f.category,
}
for f in r.findings
if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW")
],
}
for r in sorted(reports, key=lambda r: (-r.violation_count, -r.suspicious_count, r.filename))[:top if not verbose else len(reports)]
],
}
return json.dumps(output, indent=2)
def main() -> int:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--src", default="src", help="Source directory to audit (default: src)")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report")
parser.add_argument("--top", type=int, default=15, help="Show top N files by violation count (default: 15)")
parser.add_argument("--verbose", action="store_true", help="Show every site inline (default: top N summary)")
parser.add_argument("--include-tests", action="store_true", help="Also scan tests/ and scripts/")
parser.add_argument("--strict", action="store_true", help="Exit 1 if any violations are found (for CI use)")
parser.add_argument("--include-baseline", action="store_true", help="Include the 3 refactored files in the violation count (default: exclude)")
parser.add_argument("--exclude", action="append", default=[], help="Additional path components to exclude (can repeat)")
args = parser.parse_args()
src = Path(args.src)
try:
files = find_python_files(src)
except FileNotFoundError as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
if args.include_tests:
for extra in ("tests", "scripts"):
p = Path(extra)
if p.exists():
files.extend(find_python_files(p))
if args.exclude:
files = [f for f in files if not any(ex in f.parts for ex in args.exclude)]
reports: list[FileReport] = [audit_file(f) for f in files]
reports = [r for r in reports if r.findings or r.has_error]
if args.json:
print(render_json(reports, len(files), args.top, args.verbose))
if args.include_baseline:
total_violations = sum(r.violation_count for r in reports)
else:
total_violations = sum(r.violation_count for r in reports if not r.is_refactored_baseline)
return 1 if (args.strict and total_violations > 0) else 0
print(render_human(reports, len(files), args.top, args.verbose))
if args.include_baseline:
total_violations = sum(r.violation_count for r in reports)
else:
total_violations = sum(r.violation_count for r in reports if not r.is_refactored_baseline)
if args.strict and total_violations > 0:
print(f"\nSTRICT MODE: {total_violations} violation(s) found; exiting 1.", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())