efe0637a92
Heuristic E: narrow + structured error carrier (per TIER1_REVIEW_phase9_dilemma_20260620):
- except (NarrowType): return ErrorInfo(...) -> INTERNAL_COMPLIANT
- except (NarrowType): <item>["error"] = True -> INTERNAL_COMPLIANT
Distinguishes from the empty-default pattern (args = {}, body = ...) which
is explicitly NOT a drain per error_handling.md:528-531.
Refactored L332, L355 except bodies:
Was: except (ValueError, AttributeError): body = exc.response.text
Now: except (ValueError, AttributeError) as e: return ErrorInfo(...)
The function still returns ErrorInfo either way. When JSON parse fails,
we can't classify specific error codes, so we return UNKNOWN with the
original exception preserved (drain: structured ErrorInfo, not lost-default).
Added 2 helper methods:
_has_errorinfo_return(stmts) -> bool
_has_dict_error_true_assign(stmts) -> bool
Tests: 41 pass (28 baseline + 13 audit heuristics including the original 8).
Audit: ai_client UNCLEAR 6 -> 4 (L332+L355 now BOUNDARY_CONVERSION).
Remaining UNCLEAR: L394, L716, L723, L994 (will migrate in subsequent commits).
1589 lines
68 KiB
Python
1589 lines
68 KiB
Python
#!/usr/bin/env python3
|
|
"""Audit try/except/finally/raise usage against the data-oriented error
|
|
handling convention.
|
|
|
|
This audit is INFORMATIONAL by default (exits 0) so developers can run it
|
|
freely to see the current state. Pass `--strict` (or its alias `--ci`)
|
|
to enable CI-gate mode (exits 1 on any violation). The 4-script
|
|
enforcement set (see docs/AGENTS.md "Convention Enforcement") uses
|
|
`--strict` mode for pre-commit hooks and CI.
|
|
|
|
The convention (see conductor/code_styleguides/error_handling.md) requires:
|
|
|
|
- SDK-boundary exceptions are caught and converted to ErrorInfo.
|
|
- Internal code uses Result[T] (data + errors list), not Optional[T] + try/except.
|
|
- except Exception is a code smell (broad catch without conversion).
|
|
- `raise` is reserved for programmer errors (assert/raise for impossible states).
|
|
- `try/finally` is the canonical cleanup pattern (like `goto defer`).
|
|
- `raise` in __init__ is OK for "this constructor needs X" (programmer error).
|
|
- FastAPI `raise HTTPException` in _api_* handlers is the FastAPI-idiomatic
|
|
boundary; it's how the framework signals HTTP errors.
|
|
|
|
The 3 fully-refactored files (mcp_client.py, ai_client.py, rag_engine.py) are
|
|
the CONVENTION BASELINE. Everything outside them is the migration target.
|
|
|
|
The script classifies every exception-handling site into one of:
|
|
|
|
Category Convention status
|
|
---------------------------- -----------------------------------------
|
|
BOUNDARY_SDK Compliant (wraps third-party SDK or is in
|
|
a *_result function returning Result)
|
|
BOUNDARY_IO Compliant (wraps stdlib I/O that can raise)
|
|
BOUNDARY_CONVERSION Compliant (catches + converts to ErrorInfo)
|
|
BOUNDARY_FASTAPI Compliant (FastAPI HTTPException raise in
|
|
_api_* handler; framework-idiomatic)
|
|
INTERNAL_SILENT_SWALLOW Violation (except ...: pass or just logs)
|
|
INTERNAL_BROAD_CATCH Violation (except Exception without conversion)
|
|
INTERNAL_OPTIONAL_RETURN Violation (try/except + return None/Optional)
|
|
INTERNAL_RETHROW Suspicious (try/except + raise; refactorable)
|
|
INTERNAL_PROGRAMMER_RAISE Compliant (raise for impossible state in
|
|
__init__/assert/precondition; not a violation)
|
|
INTERNAL_COMPLIANT Compliant (try/finally cleanup, Result-returning recovery, or another recognized compliant internal pattern)
|
|
UNCLEAR Manual review needed
|
|
|
|
For each VIOLATION or SUSPICIOUS site, the script prints a 1-line hint at what
|
|
the fix could look like (e.g., "return Result(data=NIL_T, errors=[...])").
|
|
|
|
Usage:
|
|
uv run python scripts/audit_exception_handling.py # human report
|
|
uv run python scripts/audit_exception_handling.py --json # JSON output
|
|
uv run python scripts/audit_exception_handling.py --src src # source dir
|
|
uv run python scripts/audit_exception_handling.py --top 20 # top N files
|
|
uv run python scripts/audit_exception_handling.py --verbose # every site
|
|
uv run python scripts/audit_exception_handling.py --strict # CI gate (exit 1 on violation)
|
|
uv run python scripts/audit_exception_handling.py --ci # alias for --strict
|
|
uv run python scripts/audit_exception_handling.py --summary # per-file summary table
|
|
uv run python scripts/audit_exception_handling.py --by-size # group by migration effort
|
|
|
|
Pre-commit / CI use (the convention's CI gate):
|
|
uv run python scripts/audit_exception_handling.py --strict
|
|
# Exits 1 on any violation. Use in pre-commit hooks and CI to enforce
|
|
# the data-oriented error handling convention. Part of the 4-script
|
|
# enforcement set (see docs/AGENTS.md "Convention Enforcement").
|
|
|
|
Output modes (mutually exclusive; --json / --summary / --by-size override
|
|
the default human-readable report):
|
|
--summary: per-file table sorted by V+S descending. Use this for
|
|
"which files have the most violations" planning questions.
|
|
--by-size: groups files into small/medium/large/baseline buckets.
|
|
Use this for "how many migration tracks do I need" planning.
|
|
(default): top-N files with per-site breakdown and 1-line hints.
|
|
|
|
Exit codes:
|
|
0 - audit ran in informational mode (default; no violations fail the script)
|
|
1 - usage error, or --strict/--ci mode with violations found
|
|
2 - source directory not found
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import ast
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import Counter
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
|
|
# The 3 files that were fully refactored to the convention by the
# data_oriented_error_handling_20260606 track. Sites in these files are the
# BASELINE; sites outside them are the MIGRATION TARGET.
#
# Entries are repo-relative paths written with forward slashes.
# ExceptionVisitor.__init__ converts backslashes in the audited filename to
# forward slashes and then tests exact membership, so a filename only counts
# as baseline when it is spelled exactly like one of these entries.
REFACTORED_BASELINE_FILES: frozenset[str] = frozenset({
    "src/mcp_client.py",
    "src/ai_client.py",
    "src/rag_engine.py",
})
|
|
|
|
# Third-party SDKs the convention recognizes as boundary callers.
#
# ExceptionVisitor._is_third_party_call compares every dotted prefix of a
# call target's source text (e.g. "google", "google.genai") against this set,
# and ExceptionVisitor._classify_except compares the first dotted component
# of the caught exception type against it. Matching is purely textual, so a
# module imported under an alias is not recognized.
# NOTE(review): "tomllib" is part of the standard library on Python 3.11+;
# it is listed here alongside its third-party backport "tomli".
THIRD_PARTY_SDK_MODULES: frozenset[str] = frozenset({
    "anthropic",
    "anthropic.types",
    "google",
    "google.generativeai",
    "google.genai",
    "google.api_core",
    "google.protobuf",
    "google.auth",
    "openai",
    "openai.types",
    "groq",
    "groq.types",
    "mistralai",
    "cohere",
    "chromadb",
    "sentence_transformers",
    "huggingface_hub",
    "transformers",
    "torch",
    "requests",
    "urllib3",
    "httpx",
    "aiohttp",
    "websockets",
    "fastapi",
    "uvicorn",
    "starlette",
    "psutil",
    "pydantic",
    "PIL",
    "cv2",
    "numpy",
    "tomli",
    "tomllib",
    "imgui_bundle",
    "dearpygui",
    "dearpygui.dearpygui",
})
|
|
|
|
# Stdlib exceptions that almost always indicate a legitimate boundary wrap.
#
# ExceptionVisitor._classify_except tests the unparsed source text of the
# except clause's type for exact membership in this set, so the spelling in
# the audited code must match an entry exactly (e.g. `json.JSONDecodeError`
# matches, a bare `JSONDecodeError` imported by name does not), and a tuple
# such as `(OSError, ValueError)` never matches.
STDLIB_IO_EXCEPTIONS: frozenset[str] = frozenset({
    "OSError",
    "IOError",
    "FileNotFoundError",
    "FileExistsError",
    "PermissionError",
    "IsADirectoryError",
    "NotADirectoryError",
    "TimeoutError",
    "ConnectionError",
    "ConnectionRefusedError",
    "ConnectionResetError",
    "ConnectionAbortedError",
    "BrokenPipeError",
    "socket.timeout",
    "ssl.SSLError",
    "json.JSONDecodeError",
    "csv.Error",
    "sqlite3.Error",
    "sqlite3.IntegrityError",
    "sqlite3.OperationalError",
    "zipfile.BadZipFile",
    "xml.etree.ElementTree.ParseError",
    "subprocess.CalledProcessError",
    "subprocess.TimeoutExpired",
})
|
|
|
|
# Third-party exception types commonly caught at the boundary.
#
# Entries are fully qualified dotted names. ExceptionVisitor._classify_except
# tests the unparsed source text of the except clause's type for exact
# membership, so only handlers that spell the exception this way match here
# (that method also accepts other spellings through looser substring checks).
THIRD_PARTY_EXCEPTIONS: frozenset[str] = frozenset({
    "anthropic.APIError",
    "anthropic.APIConnectionError",
    "anthropic.RateLimitError",
    "anthropic.AuthenticationError",
    "anthropic.BadRequestError",
    "anthropic.NotFoundError",
    "anthropic.PermissionDeniedError",
    "anthropic.UnprocessableEntityError",
    "google.api_core.exceptions.GoogleAPIError",
    "google.api_core.exceptions.ResourceExhausted",
    "google.api_core.exceptions.PermissionDenied",
    "google.api_core.exceptions.NotFound",
    "google.api_core.exceptions.InvalidArgument",
    "google.api_core.exceptions.DeadlineExceeded",
    "google.api_core.exceptions.ServiceUnavailable",
    "google.api_core.exceptions.Aborted",
    "openai.OpenAIError",
    "openai.APIError",
    "openai.APIConnectionError",
    "openai.RateLimitError",
    "openai.AuthenticationError",
    "openai.BadRequestError",
    "openai.NotFoundError",
    "openai.PermissionDeniedError",
    "requests.RequestException",
    "requests.ConnectionError",
    "requests.Timeout",
    "requests.HTTPError",
    "requests.exceptions.SSLError",
    "httpx.HTTPError",
    "httpx.RequestError",
    "httpx.TimeoutException",
    "chromadb.errors.ChromaError",
    "pydantic.ValidationError",
})
|
|
|
|
# FastAPI boundary exception - idiomatic in _api_* handlers.
# Both the qualified and the bare (imported-by-name) spelling are listed.
# NOTE(review): presumably matched against the unparsed text of a raised
# exception when classifying `raise` sites -- confirm at the use site.
FASTAPI_EXCEPTIONS: frozenset[str] = frozenset({
    "fastapi.HTTPException",
    "HTTPException",
})
|
|
|
|
# Programmer-error exceptions that are OK to raise (per the styleguide's
# "When to Use This Convention" section: "Constructors (__init__) that fail
# with programmer errors (use assert or raise for these)").
# All entries are bare builtin exception names.
# NOTE(review): presumably consulted when classifying `raise` statements as
# INTERNAL_PROGRAMMER_RAISE -- confirm at the use site.
PROGRAMMER_ERROR_EXCEPTIONS: frozenset[str] = frozenset({
    "AssertionError",
    "ValueError",
    "KeyError",
    "IndexError",
    "TypeError",
    "AttributeError",
    "NameError",
    "RuntimeError",
    "NotImplementedError",
})
|
|
|
|
# Lazy-loader method names: the canonical naming convention for proxy
# classes that defer a heavy import until first attribute access or call
# (e.g. _LazyModule._resolve, _load, _get, _try_load). The audit
# recognizes these as the canonical context for the sentinel-fallback
# pattern (Phase 12.1 result_migration_gui_2_20260619): when the import
# or attribute access fails, the except body falls back to a documented
# sentinel class instance with an `available: bool = False` flag (or
# similar) so the UI can detect the stub and offer an alternative
# path. This is the canonical graceful-degradation pattern per
# error_handling.md:625-690 (Re-Raise Patterns).
# NOTE(review): presumably compared against the enclosing function's name as
# an exact string -- confirm at the use site.
LAZY_LOADER_METHOD_NAMES: frozenset[str] = frozenset({
    "_resolve",
    "_load",
    "_get",
    "_try_load",
})
|
|
|
|
# Categories that are considered violations.
# FileReport.violation_count counts findings whose category is in this set.
# "INTERNAL_RETHROW" (suspicious) and "UNCLEAR" are deliberately absent; they
# are tallied separately by FileReport.suspicious_count / unclear_count.
VIOLATION_CATEGORIES: frozenset[str] = frozenset({
    "INTERNAL_SILENT_SWALLOW",
    "INTERNAL_BROAD_CATCH",
    "INTERNAL_OPTIONAL_RETURN",
})
|
|
|
|
# Categories that are considered compliant (canonical).
# FileReport.compliant_count counts findings whose category is in this set.
COMPLIANT_CATEGORIES: frozenset[str] = frozenset({
    "BOUNDARY_SDK",
    "BOUNDARY_IO",
    "BOUNDARY_CONVERSION",
    "BOUNDARY_FASTAPI",
    "INTERNAL_PROGRAMMER_RAISE",
    "INTERNAL_COMPLIANT",
})
|
|
|
|
|
|
@dataclass(frozen=True)
class Finding:
    """One classified exception-handling site.

    Instances are immutable (frozen dataclass). FileReport aggregates them
    and derives its counters from `category` and `in_refactored_baseline`.
    """

    # Path of the audited file this site belongs to.
    filename: str
    # Presumably the source line number of the site -- TODO confirm where
    # Finding is constructed.
    line: int
    # Presumably the kind of node that was classified (e.g. except/raise/
    # finally) -- TODO confirm where Finding is constructed.
    kind: str
    # Presumably the enclosing function name or "<module>" -- TODO confirm.
    context: str
    # Presumably a short excerpt of the source at the site -- TODO confirm.
    snippet: str
    # Category label; FileReport compares it against VIOLATION_CATEGORIES,
    # COMPLIANT_CATEGORIES, "UNCLEAR" and "INTERNAL_RETHROW".
    category: str
    # Human-readable hint accompanying the category.
    hint: str
    # True when the site lives in one of the refactored baseline files.
    in_refactored_baseline: bool
|
|
|
|
|
|
@dataclass
class FileReport:
    """Audit result for a single file: its findings plus a file-level error flag.

    The counting properties are derived on every access from `findings`;
    nothing is cached.
    """

    filename: str
    findings: list[Finding] = field(default_factory=list)
    has_error: bool = False
    error_message: str = ""

    @property
    def violation_count(self) -> int:
        """Number of findings whose category is in VIOLATION_CATEGORIES."""
        return len([item for item in self.findings if item.category in VIOLATION_CATEGORIES])

    @property
    def compliant_count(self) -> int:
        """Number of findings whose category is in COMPLIANT_CATEGORIES."""
        return len([item for item in self.findings if item.category in COMPLIANT_CATEGORIES])

    @property
    def unclear_count(self) -> int:
        """Number of findings categorized as "UNCLEAR" (manual review)."""
        return len([item for item in self.findings if item.category == "UNCLEAR"])

    @property
    def suspicious_count(self) -> int:
        """Number of findings categorized as "INTERNAL_RETHROW" (suspicious)."""
        return len([item for item in self.findings if item.category == "INTERNAL_RETHROW"])

    @property
    def is_refactored_baseline(self) -> bool:
        """True when at least one finding is flagged as in the refactored baseline.

        A report with no findings therefore always yields False.
        """
        for item in self.findings:
            if item.in_refactored_baseline:
                return True
        return False
|
|
|
|
|
|
class ExceptionVisitor(ast.NodeVisitor):
|
|
"""Walks the AST and classifies every try/except/finally/raise node."""
|
|
|
|
def __init__(self, filename: str) -> None:
    """Set up the per-file state for one audit pass.

    Args:
        filename: Path of the file being audited. It is stored unchanged;
            a copy with backslashes turned into forward slashes is used
            only for the baseline membership test.
    """
    self.filename = filename
    self.report = FileReport(filename=filename)
    # Stack of enclosing function nodes; the current one is the last element.
    self._func_stack: list[ast.FunctionDef | ast.AsyncFunctionDef] = []
    self._try_stack: list[ast.Try | ast.TryStar] = []
    # Baseline check: compare slash-normalized spellings on both sides.
    baseline_paths = {path.replace("\\", "/") for path in REFACTORED_BASELINE_FILES}
    normalized = filename.replace("\\", "/")
    self._in_baseline = normalized in baseline_paths
|
|
|
|
def _current_func_name(self) -> str:
|
|
if not self._func_stack:
|
|
return "<module>"
|
|
return self._func_stack[-1].name
|
|
|
|
def _current_func_node(self) -> ast.FunctionDef | ast.AsyncFunctionDef | None:
|
|
return self._func_stack[-1] if self._func_stack else None
|
|
|
|
def _is_third_party_call(self, body: list[ast.stmt]) -> bool:
    """Report whether `body` contains a call into a known third-party SDK.

    Every call expression anywhere inside `body` is inspected. The call
    target is rendered back to source text and split on "."; the call
    counts when any leading dotted prefix of that text (the first segment
    alone, the first two joined, ...) is in THIRD_PARTY_SDK_MODULES.

    Args:
        body: Statements to scan (e.g. the body of a `try`).

    Returns:
        True on the first matching call, False if none matches.
    """
    wrapper = ast.Module(body=body, type_ignores=[])
    for candidate in ast.walk(wrapper):
        if not isinstance(candidate, ast.Call):
            continue
        segments = ast.unparse(candidate.func).split(".")
        prefixes = (".".join(segments[:end]) for end in range(1, len(segments) + 1))
        if any(prefix in THIRD_PARTY_SDK_MODULES for prefix in prefixes):
            return True
    return False
|
|
|
|
def _is_fastapi_handler(self) -> bool:
|
|
"""Is the current function a FastAPI _api_* handler?"""
|
|
name = self._current_func_name()
|
|
return name.startswith("_api_") or name.startswith("api_")
|
|
|
|
def _enclosing_returns_result(self) -> bool:
|
|
"""Does any enclosing function return a Result-like type?"""
|
|
for func in self._func_stack:
|
|
if func.returns is None:
|
|
continue
|
|
ret_str = ast.unparse(func.returns)
|
|
if "Result[" in ret_str or ret_str == "Result":
|
|
return True
|
|
return False
|
|
|
|
def _except_body_drains_via_http_exception_or_result(self, handler: ast.ExceptHandler) -> bool:
|
|
"""Phase 7 FR5: does the except body actually drain errors via
|
|
`raise HTTPException(...)` or `return Result(...)`?
|
|
|
|
This is the canonical BOUNDARY_FASTAPI pattern: a `_api_*` handler
|
|
must raise HTTPException (so the framework converts to HTTP response)
|
|
or return a Result (propagated to a caller that raises HTTPException).
|
|
|
|
Per error_handling.md:534, BOUNDARY_FASTAPI only applies to actual
|
|
HTTPException raises. Without this check, the heuristic over-applied
|
|
to logging-only except bodies (e.g. `_api_generate` L242 and L256
|
|
pre-Phase-7)."""
|
|
for node in ast.walk(ast.Module(body=handler.body, type_ignores=[])):
|
|
# 1. raise HTTPException(...)
|
|
if isinstance(node, ast.Raise) and node.exc is not None:
|
|
exc = node.exc
|
|
if isinstance(exc, ast.Call) and isinstance(exc.func, ast.Name):
|
|
if exc.func.id == "HTTPException":
|
|
return True
|
|
if isinstance(exc, ast.Call) and isinstance(exc.func, ast.Attribute):
|
|
if exc.func.attr == "HTTPException":
|
|
return True
|
|
# 2. return Result(...)
|
|
if isinstance(node, ast.Return) and node.value is not None:
|
|
if isinstance(node.value, ast.Call):
|
|
func = node.value.func
|
|
if isinstance(func, ast.Name) and func.id == "Result":
|
|
return True
|
|
if isinstance(func, ast.Attribute) and func.attr == "Result":
|
|
return True
|
|
return False
|
|
|
|
def _except_body_has_logging(self, body: list) -> bool:
|
|
"""Phase 7 FR5: does the except body contain logging (debug/log/warn/error)
|
|
or print/sys.stderr.write calls?
|
|
|
|
Used to distinguish INTERNAL_SILENT_SWALLOW (logging-only, violation)
|
|
from INTERNAL_COMPLIANT (try/finally cleanup or empty body)."""
|
|
for node in ast.walk(ast.Module(body=body, type_ignores=[])):
|
|
if isinstance(node, ast.Call):
|
|
func = node.func
|
|
func_str = ast.unparse(func)
|
|
# logging.getLogger(...).debug/log/info/warn/error or just print
|
|
if ".debug(" in func_str or ".info(" in func_str or ".warning(" in func_str or ".error(" in func_str:
|
|
return True
|
|
if ".log(" in func_str:
|
|
return True
|
|
if func_str == "print" or "sys.stderr.write" in func_str:
|
|
return True
|
|
return False
|
|
|
|
def _classify_except(self, handler: ast.ExceptHandler, try_node: ast.Try) -> tuple[str, str]:
    """Classify one `except` handler against the error-handling convention.

    The rules below are evaluated top to bottom and the first one that
    matches decides the result, so their order is significant.

    Args:
        handler: The `except` clause being classified.
        try_node: The `try` statement that owns `handler`; its body is
            scanned for third-party SDK calls.

    Returns:
        A `(category, hint)` pair: the category label and a human-readable
        explanation / suggested fix.
    """
    exc_type = handler.type
    # A bare `except:` is reported under the name "Exception", so `exc_name`
    # is never the empty string.
    exc_name = ast.unparse(exc_type) if exc_type is not None else "Exception"
    body = handler.body
    # First dotted component of the caught type's text ("" for bare except).
    handler_module = ast.unparse(exc_type).split(".")[0] if exc_type else ""

    # Empty body or pass = silent swallow
    is_silent = (
        len(body) == 0
        or all(isinstance(s, ast.Pass) for s in body)
    )

    # Re-raise detection: a bare `raise` anywhere inside the handler body.
    re_raises = any(
        isinstance(s, ast.Raise) and s.exc is None
        for s in ast.walk(ast.Module(body=body, type_ignores=[]))
    )

    # ErrorInfo creation: any call whose target text contains "ErrorInfo",
    # anywhere inside the handler body.
    creates_errorinfo = any(
        isinstance(s, ast.Call) and "ErrorInfo" in ast.unparse(s.func)
        for s in ast.walk(ast.Module(body=body, type_ignores=[]))
    )

    # Returns None: only the handler's top-level statements are inspected
    # (nested `return None` inside an `if` is not seen).
    returns_none = any(
        isinstance(s, ast.Return) and (s.value is None or ast.unparse(s.value) == "None")
        for s in body
    )

    # Enclosing function returns Optional[T]? Only the innermost function's
    # return annotation is considered.
    enclosing_func = self._current_func_node()
    returns_optional = False
    if enclosing_func is not None and enclosing_func.returns is not None:
        ret_str = ast.unparse(enclosing_func.returns)
        if "Optional" in ret_str or " | None" in ret_str:
            returns_optional = True

    is_third_party = self._is_third_party_call(try_node.body)
    is_in_result_func = self._enclosing_returns_result()

    # ----- Classification logic -----

    # 0. Heuristic A: Result-returning recovery — the canonical data-oriented pattern.
    # If the except body returns `Result(data=..., errors=[ErrorInfo(...)])`,
    # the function is following the convention. Classify as INTERNAL_COMPLIANT
    # BEFORE the BOUNDARY_CONVERSION check (which also fires for ErrorInfo creation).
    # NOTE(review): `_returns_result` is defined outside the section reviewed
    # here; its exact matching rules were not checked.
    if self._returns_result(body):
        return (
            "INTERNAL_COMPLIANT",
            "Compliant: `try: ...; except: return Result(data=..., errors=[...])` is the canonical Result-recovery pattern. The convention requires Result[T] for try/except sites that can fail; this pattern satisfies the requirement. The function-name-not-ending-in-`_result` is a smell (rename to `xxx_result`); the pattern itself is compliant. (per result_migration_small_files_20260617 Phase 11.2, Heuristic A)",
        )

    # 1. ErrorInfo conversion = canonical boundary pattern
    # NOTE(review): the hint mentions a Result-returning function, but this
    # branch does not consult `is_in_result_func`; any ErrorInfo call in the
    # handler body is enough.
    if creates_errorinfo:
        return (
            "BOUNDARY_CONVERSION",
            "Compliant: catch + ErrorInfo conversion in a Result-returning function. This is the canonical SDK boundary pattern (per styleguide 'Catch SDK exceptions at the boundary only').",
        )

    # 2. FastAPI _api_* handler with broad catch (per app_controller pattern)
    # Phase 7 FR5: tightened to require the except body to actually raise
    # HTTPException or return a Result. Without this check, ALL nested
    # try/except inside `_api_*` handlers were classified BOUNDARY_FASTAPI
    # even when the body only logged to stderr (the very pattern Phase 6
    # was supposed to eliminate per error_handling.md:530 "logging is NOT a drain").
    # The `""` member of the tuple can never match (see `exc_name` above).
    if self._is_fastapi_handler() and exc_name in ("Exception", "BaseException", ""):
        if self._except_body_drains_via_http_exception_or_result(handler):
            return (
                "BOUNDARY_FASTAPI",
                "Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.",
            )
        # Re-classify: the `_api_*` name heuristic does NOT justify
        # classifying logging-only or Result-returning as BOUNDARY_FASTAPI.
        # The user's principle (error_handling.md:530) requires a real drain.
        if is_silent or self._except_body_has_logging(body):
            return (
                "INTERNAL_SILENT_SWALLOW",
                f"Strict-violation (Phase 7 FR5): _api_* handler's except body only "
                f"logs/prints (no HTTPException raise, no Result return). Per "
                f"error_handling.md:530 'logging is NOT a drain'. Migrate to "
                f"Result[T] propagation with a real drain point.",
            )
        # NOTE(review): this branch looks unreachable -- rule 0 above already
        # returned whenever `self._returns_result(body)` was true (assuming
        # that helper is deterministic and side-effect free).
        if self._returns_result(body):
            return (
                "INTERNAL_COMPLIANT",
                "Compliant: _api_* handler's except body returns Result[data=..., errors=[...]] (Phase 6+ canonical pattern).",
            )
        # Default to internal_silent_swallow (logging-only fallback) for
        # safety; the heuristic tightened check already excluded the
        # logging-only case via the `_except_body_has_logging` branch above.
        return (
            "INTERNAL_SILENT_SWALLOW",
            "Strict-violation (Phase 7 FR5): _api_* handler's except body does not "
            "raise HTTPException or return Result. Per error_handling.md:530, "
            "logging is NOT a drain. Migrate to Result[T] propagation.",
        )

    # 3. Inside a *_result function with broad catch (likely SDK boundary)
    if is_in_result_func and exc_name in ("Exception", "BaseException", ""):
        if is_third_party:
            return (
                "BOUNDARY_SDK",
                f"Compliant: broad `except {exc_name or 'Exception'}` in a *_result function that calls a third-party SDK. Consider narrowing the exception type or converting to ErrorInfo for a cleaner Result contract.",
            )
        return (
            "INTERNAL_BROAD_CATCH",
            f"Violation: `except {exc_name or 'Exception'}` in a Result-returning function without ErrorInfo conversion. Narrow the exception type, or convert to ErrorInfo in a Result (this is the canonical pattern in the 3 refactored files).",
        )

    # 4. Third-party SDK call
    # The substring tests are loose: any caught type whose text contains
    # "Error" or "Exception" (including builtins and tuples of them)
    # qualifies once the try body calls a third-party SDK.
    if is_third_party and (exc_name in THIRD_PARTY_EXCEPTIONS or "Error" in exc_name or "Exception" in exc_name or handler_module in THIRD_PARTY_SDK_MODULES):
        return (
            "BOUNDARY_SDK",
            f"Compliant: third-party exception {exc_name} caught at the SDK boundary.",
        )

    # 5. Stdlib I/O exception
    # Because rule 4 already claims every name containing "Error", this rule
    # is only reached for names without it (e.g. "socket.timeout").
    if is_third_party and exc_name in STDLIB_IO_EXCEPTIONS:
        return (
            "BOUNDARY_IO",
            f"Compliant: stdlib I/O exception {exc_name} caught at a third-party call site.",
        )

    # 6. Re-raise
    if re_raises:
        if is_third_party:
            return (
                "BOUNDARY_SDK",
                f"Compliant: re-raise after {exc_name} preserves the SDK boundary; consider ErrorInfo conversion for a Result-based API.",
            )
        return (
            "INTERNAL_RETHROW",
            "Suspicious: re-raising without conversion is a control-flow smell. Consider whether the caller should handle this via a Result instead.",
        )

    # 7. Silent swallow
    if is_silent:
        return (
            "INTERNAL_SILENT_SWALLOW",
            "Violation: silent swallow (`except ...: pass`) hides failures. Either let it propagate, return Result(data=NIL_T, errors=[...]), or document the intentional swallow with a comment-free `assert` for the precondition.",
        )

    # 8. Broad catch (Exception/BaseException)
    # The `exc_name == ""` alternative can never be true (see `exc_name` above).
    if exc_name in ("Exception", "BaseException") or exc_name == "":
        return (
            "INTERNAL_BROAD_CATCH",
            f"Violation: broad `except {exc_name or 'Exception'}` catches more than intended. Narrow the exception type, or convert to ErrorInfo in a Result.",
        )

    # 9. try/except + return None in Optional[T] function
    if returns_none and returns_optional:
        return (
            "INTERNAL_OPTIONAL_RETURN",
            f"Violation: `except {exc_name}: return None` in a function that returns Optional[T] violates the convention. Replace with `Result[T]` and return `Result(data=NIL_T, errors=[ErrorInfo(kind=..., message=...)])`.",
        )

    # 10. Stdlib I/O exception in our own code
    if exc_name in STDLIB_IO_EXCEPTIONS and not is_third_party:
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: stdlib I/O exception {exc_name} caught in our own code is acceptable (per convention, file/network errors are converted to ErrorInfo).",
        )

    # 11-17. Heuristics added by result_migration_review_pass_20260617
    # These cover the 7 most common compliant patterns the review pass found.
    # Each heuristic inspects the try body + except body together.
    compliant = self._try_compliant_pattern(try_node, handler, exc_name)
    if compliant is not None:
        return compliant

    # Nothing matched: leave the site for manual review.
    return (
        "UNCLEAR",
        f"Manual review: catches {exc_name}; not obviously boundary or violation. Check whether the except site is converting to ErrorInfo (good) or hiding the error (bad).",
    )
|
|
|
|
def _has_call_with_attr(self, stmts: list[ast.stmt], attr_name: str) -> bool:
|
|
"""True if any statement contains a call to `.attr_name(...)` (e.g. list.index, dict.get)."""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr == attr_name:
|
|
return True
|
|
return False
|
|
|
|
def _has_keyword_true_call(self, stmts: list[ast.stmt], attr_name: str, kw_name: str) -> bool:
|
|
"""True if any statement contains a call `.attr_name(..., kw_name=True)`."""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr == attr_name:
|
|
for kw in node.keywords:
|
|
if kw.arg == kw_name and isinstance(kw.value, ast.Constant) and kw.value.value is True:
|
|
return True
|
|
return False
|
|
|
|
def _has_print_call(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement is an `Expr(Call(Name('print'), ...))`."""
|
|
for s in stmts:
|
|
if isinstance(s, ast.Expr) and isinstance(s.value, ast.Call):
|
|
f = s.value.func
|
|
if isinstance(f, ast.Name) and f.id == "print":
|
|
return True
|
|
return False
|
|
|
|
def _has_import_stmt(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement is an `Import` or `ImportFrom`."""
|
|
for s in stmts:
|
|
if isinstance(s, (ast.Import, ast.ImportFrom)):
|
|
return True
|
|
return False
|
|
|
|
def _try_compliant_pattern(self, try_node: ast.Try, handler: ast.ExceptHandler, exc_name: str) -> tuple[str, str] | None:
    """Match one try/except site against an ordered list of known shapes.

    The checks run top to bottom and the first match wins, so their order
    is significant. Most matches yield "INTERNAL_COMPLIANT"; the
    narrow-except-plus-log check yields "INTERNAL_SILENT_SWALLOW".

    Args:
        try_node: the `try` statement; only its `body` is inspected here.
        handler: the `except` clause being classified; only its `body` is
            inspected here.
        exc_name: textual form of the caught exception type(s). It is split
            on commas after removing parentheses to build `exc_set`.

    Returns:
        `(category, hint)` for the first matching shape, or None when no
        shape matches.
    """
    try_body = try_node.body
    except_body = handler.body
    # Set of caught exception names as written (dotted names are kept whole);
    # empty fragments are dropped, so "" is never a member.
    exc_set = {e.strip() for e in exc_name.replace("(", "").replace(")", "").split(",") if e.strip()}

    # 11. list.index(x) with ValueError fallback to default index
    if exc_set & {"ValueError"} and self._has_call_with_attr(try_body, "index") and len(except_body) > 0:
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: list.index(x); except ({', '.join(sorted(exc_set))}): ...` is the canonical combo-box fallback pattern (per result_migration_review_pass_20260617).",
        )

    # 12. dict[x] or get_capabilities(...) with KeyError fallback to default
    # The try body is not inspected beyond being non-empty.
    if exc_set == {"KeyError"} and len(except_body) > 0 and len(try_body) > 0:
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: <lookup>; except KeyError: ...` is the canonical lookup-miss-with-default pattern (per result_migration_review_pass_20260617).",
        )

    # 13. datetime.fromisoformat(s) with ValueError: None
    if exc_set == {"ValueError"} and self._has_call_with_attr(try_body, "fromisoformat"):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: datetime.fromisoformat(s); except ValueError: ...` is the canonical lenient-deserialization pattern (per result_migration_review_pass_20260617).",
        )

    # 14. Path.resolve(strict=True) with (OSError, ValueError) fallback
    if exc_set == {"OSError", "ValueError"} and self._has_keyword_true_call(try_body, "resolve", "strict"):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: Path(p).resolve(strict=True); except (OSError, ValueError): ...` is the canonical graceful-path-resolution pattern (per result_migration_review_pass_20260617).",
        )

    # 15. Path.relative_to with ValueError: pass / return False
    if exc_set == {"ValueError"} and self._has_call_with_attr(try_body, "relative_to"):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: rp.relative_to(base); except ValueError: ...` is the canonical subpath-check pattern (per result_migration_review_pass_20260617).",
        )

    # 16. asyncio.get_running_loop() with RuntimeError: asyncio.run(...)
    if exc_set == {"RuntimeError"} and self._has_call_with_attr(try_body, "get_running_loop") and self._has_call_with_attr(except_body, "run"):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: get_running_loop(); except RuntimeError: asyncio.run(...)` is the canonical sync/async bridge pattern (per result_migration_review_pass_20260617).",
        )

    # 17. import with (ImportError, ModuleNotFoundError, AttributeError) + fallback stub
    if exc_set & {"ImportError", "ModuleNotFoundError", "AttributeError"} and self._has_import_stmt(try_body) and len(except_body) > 0:
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: import ...; except ({', '.join(sorted(exc_set))}): <stub>` is the canonical graceful-degradation pattern (per result_migration_review_pass_20260617).",
        )

    # 18. JSON parse with (json.JSONDecodeError, KeyError) and print() for CLI-style input
    # Substring test on the raw text, so both `JSONDecodeError` and
    # `json.JSONDecodeError` spellings match.
    if "JSONDecodeError" in exc_name and self._has_call_with_attr(try_body, "loads") and self._has_print_call(except_body):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: json.loads(...); except json.JSONDecodeError: print(...)` is the canonical CLI-style JSON input parser pattern (per result_migration_review_pass_20260617).",
        )
    # NOTE(review): this branch looks unreachable. Heuristic 12 above already
    # returns for exc_set == {"KeyError"} whenever both bodies are non-empty,
    # and both helper calls here need non-empty bodies to be true.
    if exc_set == {"KeyError"} and self._has_call_with_attr(try_body, "loads") and self._has_print_call(except_body):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: json.loads(...); except KeyError: print(...)` is the canonical CLI-style JSON input parser pattern (per result_migration_review_pass_20260617).",
        )

    # Heuristic #19 REMOVED in Phase 12.1: narrow except + log (sys.stderr.write / logging.*)
    # was classified as INTERNAL_COMPLIANT, but per error_handling.md Broad-Except Distinction
    # table and the user's principle (2026-06-17) "logging is NOT a drain", a catch+log
    # site is INTERNAL_SILENT_SWALLOW (a violation). Result[T] must propagate to a true
    # drain point. See conductor/tracks/result_migration_small_files_20260617/plan.md §12.1.

    # D. Drain-point patterns (per error_handling.md "Drain Points" section, Phase 12.3)
    # A drain point is a place where Result[T] propagation TERMINATES visibly to the
    # user or via intentional app action. Log-only / silent-fallback sites are NOT drain
    # points; they are INTERNAL_SILENT_SWALLOW (a violation). Drain-point checks MUST run
    # BEFORE the narrow+log reclassification below because a site may contain BOTH a log
    # call AND a drain point (e.g., sys.stderr.write + sys.exit).
    if len(except_body) > 0:
        # D.1 HTTP error response (BaseHTTPRequestHandler subclass)
        if self._has_send_response_call(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: drain point (HTTP error response). `try: ...; except ({', '.join(sorted(exc_set))}): self.send_response(...)` terminates Result[T] propagation with a visible HTTP error response (per error_handling.md Drain Points §Pattern 1, Phase 12.3).",
            )
        # D.2 GUI error display (imgui.open_popup / imgui.text call)
        if self._has_imgui_error_display(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: drain point (GUI error display). `try: ...; except ({', '.join(sorted(exc_set))}): imgui.open_popup(...)` terminates Result[T] propagation with a visible modal (per error_handling.md Drain Points §Pattern 2, Phase 12.3).",
            )
        # D.2b WebSocket error response (websocket.send)
        if self._has_websocket_send(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: drain point (WebSocket error response). `try: ...; except ({', '.join(sorted(exc_set))}): await websocket.send(...)` terminates Result[T] propagation with a visible client error message (per error_handling.md Drain Points §Pattern 2 extension, Phase 12.3).",
            )
        # D.3 Intentional app termination (sys.exit)
        if self._has_sys_exit_call(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: drain point (intentional app termination). `try: ...; except ({', '.join(sorted(exc_set))}): sys.exit(...)` terminates Result[T] propagation via process termination (per error_handling.md Drain Points §Pattern 3, Phase 12.3).",
            )
        # D.4 Telemetry emission (telemetry.emit_*)
        if self._has_telemetry_emit_call(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: drain point (telemetry emission). `try: ...; except ({', '.join(sorted(exc_set))}): telemetry.emit_*(...)` terminates Result[T] propagation by sending to monitoring (per error_handling.md Drain Points §Pattern 4, Phase 12.3).",
            )
        # D.5 Bounded retry (for attempt in range(N): ...; return None)
        # NOTE(review): `_has_bounded_retry` as defined in this class inspects
        # the enclosing function rather than the statements passed to it.
        if self._has_bounded_retry(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: drain point (bounded retry). `try: ...; except ({', '.join(sorted(exc_set))}): for attempt in range(N): ...; return None` terminates Result[T] propagation via bounded retry followed by visible failure (per error_handling.md Drain Points §Pattern 5, Phase 12.3).",
            )

    # Explicit reclassification (Phase 12.1): narrow except + log
    # (sys.stderr.write / logging.*) WITHOUT a drain point is INTERNAL_SILENT_SWALLOW (a violation).
    # This runs AFTER drain-point checks because a site may contain BOTH a log call
    # AND a drain point (e.g., sys.stderr.write + sys.exit); the drain point wins.
    # NOTE(review): "" can never be in exc_set (empty names are filtered out when
    # it is built), so that member has no effect; an empty exc_set passes this test.
    if len(except_body) > 0 and self._has_log_call(except_body) and not exc_set & {"Exception", "BaseException", ""}:
        return (
            "INTERNAL_SILENT_SWALLOW",
            f"Violation: narrow except + log (sys.stderr.write / logging.*) only. Per error_handling.md and the user's principle (2026-06-17): 'logging is NOT a drain'. The error context is lost. Use Result[T] propagation to a true drain point. (per result_migration_small_files_20260617 Phase 12.1)",
        )

    # 20. ImGui scope cleanup guard (narrow except + imgui.end_* call)
    if exc_set & {"TypeError", "AttributeError", "RuntimeError"} and self._has_imgui_end_call(except_body):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: ...; except ({', '.join(sorted(exc_set))}): imgui.end_*()` is the canonical ImGui scope cleanup guard (per result_migration_review_pass_20260617).",
        )

    # 21. MCP tool boundary (broad except Exception + return string in str-returning function)
    # The return annotation must unparse to exactly "str".
    enclosing_func = self._current_func_node()
    if enclosing_func is not None and enclosing_func.returns is not None and ast.unparse(enclosing_func.returns) == "str" and exc_set & {"Exception", "BaseException"} and self._has_string_return(except_body):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: ...; except Exception: return <string>` in a `-> str` tool function is the canonical MCP tool boundary pattern (per result_migration_review_pass_20260617).",
        )

    # A. Result-returning recovery (canonical Result pattern) — Phase 11.2
    if len(except_body) > 0 and self._returns_result(except_body):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: `try: ...; except ({', '.join(sorted(exc_set))}): return Result(data=..., errors=[...])` is the canonical Result-recovery pattern. The function-name-not-ending-in-`_result` is a smell (rename to `xxx_result`); the pattern itself is the data-oriented convention. (per result_migration_small_files_20260617 Phase 11.2)",
        )

    # B. Lazy-loading sentinel fallback — Phase 12.1 (result_migration_gui_2_20260619)
    # Per error_handling.md:625-690 (Re-Raise Patterns) and the lazy-loading
    # pattern guidance, when a module is loaded lazily (e.g. numpy, tkinter
    # at first attribute access) and the import or attribute access fails,
    # falling back to a documented sentinel class instance with an
    # `available: bool = False` flag is the canonical graceful-degradation
    # pattern. The sentinel is NOT a silent swallow: the UI can detect the
    # stub via the `available` flag and offer an alternative code path
    # (e.g. ImGui file dialog when tkinter.filedialog is unavailable).
    # This is analogous to the nil-sentinel dataclass (Pattern 1 in
    # error_handling.md). The function-name heuristic (`_resolve`/`_load`/
    # `_get`/`_try_load`) is the standard lazy-loader naming convention.
    # The except body must NOT re-raise; the recovery is via assignment
    # to `self.<attr>` (directly or via a nested try/except).
    # Only a bare `raise` (no exception expression) anywhere under the except
    # body counts as a re-raise here; the throwaway Module wrapper just lets
    # ast.walk traverse a list of statements.
    except_body_re_raises = any(
        isinstance(s, ast.Raise) and s.exc is None
        for s in ast.walk(ast.Module(body=except_body, type_ignores=[]))
    )
    if (
        self._current_func_name() in LAZY_LOADER_METHOD_NAMES
        and not except_body_re_raises
        and exc_set & {"AttributeError", "ImportError", "ModuleNotFoundError"}
        and self._has_self_attr_assign(except_body)
    ):
        return (
            "INTERNAL_COMPLIANT",
            f"Compliant: lazy-loading sentinel fallback. `try: ...; except ({', '.join(sorted(exc_set))}): self.<attr> = <sentinel>()` in `{self._current_func_name()}` is the canonical graceful-degradation pattern. The sentinel class exposes an `available: bool = False` flag (or similar) so the UI can detect the stub and offer an alternative path. Per error_handling.md:625-690 and Phase 12.1 result_migration_gui_2_20260619.",
        )

    # E. Narrow + structured error carrier (Phase 9 redo, 2026-06-20, Tier 1 directive)
    # Per the TIER1_REVIEW: distinguishes "return ErrorInfo(...)" or
    # "err_item["error"] = True" (structured error carriers = COMPLIANT) from
    # "args = {}" or "body = exc.response.text" (empty defaults = sliming).
    # The empty-default pattern is explicitly NOT a drain per the styleguide
    # (error_handling.md:528-531): "the original error context is lost; the
    # caller cannot distinguish success from failure".
    #
    # This heuristic recognizes ONLY narrow except bodies (not Exception or
    # BaseException). Broad catches with structured carriers are still
    # violations (use BOUNDARY_CONVERSION via _returns_result or ErrorInfo).
    if exc_set and not exc_set & {"Exception", "BaseException", ""}:
        if self._has_errorinfo_return(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: narrow except + structured error carrier. `try: ...; except ({', '.join(sorted(exc_set))}): return ErrorInfo(...)` is a true drain: the structured ErrorInfo carries the original exception via `original=e` and is returned to the caller. Per error_handling.md:462-540 and TIER1_REVIEW_phase9_dilemma_20260620.",
            )
        if self._has_dict_error_true_assign(except_body):
            return (
                "INTERNAL_COMPLIANT",
                f"Compliant: narrow except + structured error carrier (in-band flag). `try: ...; except ({', '.join(sorted(exc_set))}): <item>[\"error\"] = True` is a true drain: the dict's `error` flag is the structured carrier (the caller checks the flag). Per error_handling.md:462-540 and TIER1_REVIEW_phase9_dilemma_20260620. NOTE: this heuristic does NOT verify the caller reads the flag — that is a Tier-2 per-site decision documented in the track notes.",
            )

    # No known shape matched; the caller decides the fallback classification.
    return None
|
|
|
|
def _has_string_return(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement is a `return <f-string or string constant>`."""
|
|
for s in stmts:
|
|
if isinstance(s, ast.Return) and s.value is not None:
|
|
if isinstance(s.value, ast.Constant) and isinstance(s.value.value, str):
|
|
return True
|
|
if isinstance(s.value, ast.JoinedStr):
|
|
return True
|
|
return False
|
|
|
|
def _has_errorinfo_return(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement is a `return ErrorInfo(...)` call (structured error carrier).
|
|
|
|
Used by Heuristic E (narrow + structured error carrier) to recognize the
|
|
pattern where the except body directly returns a structured ErrorInfo. This
|
|
is a true drain: the structured error is the function's contract, not a
|
|
lost-default fallback. (per result_migration_baseline_cleanup_20260620 Phase 9 redo)
|
|
|
|
Distinguishes from `_returns_result` (Heuristic A): that checks for
|
|
`return Result(...)` (full data + side-channel errors). `_has_errorinfo_return`
|
|
checks for `return ErrorInfo(...)` (legacy function that returns the
|
|
structured error directly).
|
|
"""
|
|
for s in stmts:
|
|
if not isinstance(s, ast.Return) or s.value is None:
|
|
continue
|
|
if not isinstance(s.value, ast.Call):
|
|
continue
|
|
f = s.value.func
|
|
if isinstance(f, ast.Name) and f.id == "ErrorInfo":
|
|
return True
|
|
return False
|
|
|
|
def _has_dict_error_true_assign(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement assigns `True` to a dict subscript whose key is "error".
|
|
|
|
Detects the `err_item["error"] = True` in-band error flag pattern.
|
|
Used by Heuristic E (narrow + structured error carrier) when the caller
|
|
reads the flag downstream. The audit does NOT verify caller reads the
|
|
flag — that is a Tier-2 per-site decision documented in the track notes.
|
|
|
|
Per the styleguide (error_handling.md:528-531) the empty-default pattern
|
|
is NOT a drain. This heuristic explicitly does NOT match `args = {}` or
|
|
`body = ""` (assignment to a bare variable without a dict subscript key
|
|
of "error"). The distinction matters: `args = {}` is sliming (Tier 1
|
|
2026-06-20 directive); `err_item["error"] = True` is a structured carrier.
|
|
"""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Assign) and len(node.targets) == 1:
|
|
target = node.targets[0]
|
|
if isinstance(target, ast.Subscript):
|
|
slc = target.slice
|
|
if isinstance(slc, ast.Constant) and slc.value == "error":
|
|
# Verify the value is `True`
|
|
if isinstance(node.value, ast.Constant) and node.value.value is True:
|
|
return True
|
|
return False
|
|
|
|
def _has_simple_return(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if the body contains a `return <value>` statement (any value type)."""
|
|
for s in stmts:
|
|
if isinstance(s, ast.Return) and s.value is not None:
|
|
return True
|
|
return False
|
|
|
|
def _returns_result(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if the body returns a `Result(...)` call (canonical Result-recovery pattern).
|
|
|
|
Detects `return Result(data=..., errors=[...])` — the canonical
|
|
data-oriented error handling pattern. Matches any call to `Result(...)`
|
|
with at least a `data=` keyword argument. The pattern is compliant
|
|
when used in a try/except: it satisfies the convention that every
|
|
try/except site that can fail must return `Result[T]` with structured
|
|
`ErrorInfo`. The function-name-not-ending-in-`_result` is a smell
|
|
(the function should be renamed to `xxx_result`), but the pattern
|
|
itself is compliant (heuristic A from Phase 11.2).
|
|
"""
|
|
for s in stmts:
|
|
if not isinstance(s, ast.Return) or s.value is None:
|
|
continue
|
|
if not isinstance(s.value, ast.Call):
|
|
continue
|
|
f = s.value.func
|
|
if isinstance(f, ast.Name) and f.id == "Result":
|
|
return True
|
|
if isinstance(f, ast.Attribute) and f.attr == "Result":
|
|
return True
|
|
return False
|
|
|
|
def _uses_exception_inline(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if the body uses `e`/`exc` in a non-pass way (Name reference)."""
|
|
for s in stmts:
|
|
if isinstance(s, ast.Pass):
|
|
continue
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Name) and node.id in ("e", "exc"):
|
|
return True
|
|
if isinstance(node, ast.Attribute):
|
|
base = node.value
|
|
while isinstance(base, ast.Attribute):
|
|
base = base.value
|
|
if isinstance(base, ast.Name) and base.id in ("e", "exc"):
|
|
return True
|
|
if isinstance(node, ast.FormattedValue):
|
|
val = node.value
|
|
while isinstance(val, ast.Attribute):
|
|
val = val.value
|
|
if isinstance(val, ast.Name) and val.id in ("e", "exc"):
|
|
return True
|
|
return False
|
|
|
|
def _has_assign_fallback(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if the body contains `var = <value>` (an assignment, not a return)."""
|
|
for s in stmts:
|
|
if isinstance(s, ast.Assign):
|
|
return True
|
|
return False
|
|
|
|
def _uses_traceback(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if the body uses `traceback.format_exc()` or `traceback.print_exc()`."""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute):
|
|
if isinstance(f.value, ast.Name) and f.value.id == "traceback":
|
|
if f.attr in ("format_exc", "print_exc", "format_exception", "print_exception"):
|
|
return True
|
|
return False
|
|
|
|
def _has_log_call(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement is a log call (sys.stderr.write, logging.*, print)."""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute) and f.attr in ("write", "error", "warning", "info", "debug", "exception"):
|
|
return True
|
|
if isinstance(f, ast.Name) and f.id == "print":
|
|
return True
|
|
return False
|
|
|
|
def _has_send_response_call(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement calls self.send_response(...). Drain point D.1 (HTTP error response)."""
|
|
for stmt in stmts:
|
|
for node in ast.walk(stmt):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute) and isinstance(f.attr, str) and f.attr == "send_response":
|
|
return True
|
|
return False
|
|
|
|
def _has_imgui_error_display(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement opens an ImGui popup (drain point D.2 — GUI error display)."""
|
|
for stmt in stmts:
|
|
for node in ast.walk(stmt):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute) and isinstance(f.attr, str):
|
|
if f.attr in ("open_popup", "popup", "modal"):
|
|
return True
|
|
return False
|
|
|
|
def _has_websocket_send(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement calls websocket.send(...) or self.websocket.send(...). Drain point D.2b."""
|
|
for stmt in stmts:
|
|
for node in ast.walk(stmt):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute) and isinstance(f.attr, str) and f.attr == "send":
|
|
return True
|
|
return False
|
|
|
|
def _has_sys_exit_call(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement calls sys.exit(...). Drain point D.3 (intentional app termination)."""
|
|
for stmt in stmts:
|
|
for node in ast.walk(stmt):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute) and isinstance(f.value, ast.Name) and f.value.id == "sys" and f.attr == "exit":
|
|
return True
|
|
return False
|
|
|
|
def _has_telemetry_emit_call(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement calls telemetry.emit_*(...). Drain point D.4 (telemetry emission)."""
|
|
for stmt in stmts:
|
|
for node in ast.walk(stmt):
|
|
if isinstance(node, ast.Call):
|
|
f = node.func
|
|
if isinstance(f, ast.Attribute) and isinstance(f.attr, str) and f.attr.startswith("emit_"):
|
|
if isinstance(f.value, ast.Name) and f.value.id in ("telemetry", "metrics", "monitor"):
|
|
return True
|
|
return False
|
|
|
|
def _has_bounded_retry(self, stmts: list[ast.stmt]) -> bool:
    """True if a bounded retry is present in the enclosing function: `for attempt in range(N): try: ...; except: ...; return None`. Drain point D.5.

    The bounded-retry pattern requires the SURROUNDING CONTEXT (not just the
    except body): the enclosing function (or block) must contain
    `for ... in range(N):` containing this try/except, AND a `return None`
    AFTER the for loop. The exception handler body's only job is to log/sleep;
    the real termination is the for-loop's exhaustion + the trailing return None.

    What the code below actually checks is looser than that description:

    - `stmts` is accepted for signature parity with the other helpers but is
      not read; the whole enclosing function is scanned instead.
    - Any `try` nested under a `for ... in range(...)` loop satisfies the
      loop condition; it is not tied to the handler being classified.
    - A bare `return` or `return None` counts once such a loop has been
      yielded by `ast.walk`. NOTE(review): "after" therefore means later in
      `ast.walk` order, which the standard library documents as
      unspecified, not necessarily later in the source text.

    Returns False when there is no enclosing function.
    """
    enclosing_func = self._current_func_node()
    if enclosing_func is None:
        return False
    has_for_range_with_try = False
    has_return_none_after = False
    for_loop_seen = False
    for node in ast.walk(enclosing_func):
        if isinstance(node, ast.For):
            # Only loops whose iterable is a direct call to the bare name `range`.
            if isinstance(node.iter, ast.Call) and isinstance(node.iter.func, ast.Name) and node.iter.func.id == "range":
                for_loop_seen = True
                for child in ast.walk(node):
                    if isinstance(child, ast.Try):
                        has_for_range_with_try = True
                        break
        elif for_loop_seen and isinstance(node, ast.Return):
            # Both `return` and `return None` are treated as the visible failure.
            if node.value is None:
                has_return_none_after = True
            elif isinstance(node.value, ast.Constant) and node.value.value is None:
                has_return_none_after = True
    return has_for_range_with_try and has_return_none_after
|
|
|
|
def _has_self_attr_assign(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement (recursively) assigns to a `self.<attr>` attribute.
|
|
|
|
Used by the lazy-loading sentinel fallback heuristic (Phase 12.1) to
|
|
detect the canonical graceful-degradation pattern: the except body
|
|
falls back to a sentinel class instance via `self._cached = _Stub()`
|
|
either directly OR via a nested try/except (e.g., an outer try that
|
|
catches AttributeError and a nested try that ultimately falls back
|
|
to the stub). The recursive walk handles both cases:
|
|
|
|
- Direct: `try: getattr(...); except AttributeError: self._cached = _Stub()`
|
|
- Nested: `try: getattr(...); except AttributeError: try: importlib...; except: self._cached = _Stub()`
|
|
|
|
Per the styleguide (error_handling.md:625-690), this is the canonical
|
|
graceful-degradation pattern for lazy-loading modules that may not
|
|
be present on every Python install. The sentinel's `available: bool = False`
|
|
flag (or similar) lets the UI detect the stub and offer an alternative
|
|
path (e.g., ImGui file dialog when tkinter.filedialog is unavailable).
|
|
"""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Assign):
|
|
for target in node.targets:
|
|
if (
|
|
isinstance(target, ast.Attribute)
|
|
and isinstance(target.value, ast.Name)
|
|
and target.value.id == "self"
|
|
):
|
|
return True
|
|
return False
|
|
|
|
def _has_imgui_end_call(self, stmts: list[ast.stmt]) -> bool:
|
|
"""True if any statement is a call to an imgui.end_* function."""
|
|
for s in stmts:
|
|
for node in ast.walk(s):
|
|
if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr.startswith("end_"):
|
|
return True
|
|
return False
|
|
|
|
def _enclosing_if_is_none_guard(self) -> bool:
|
|
"""True if the current raise is inside an `if <var> is None:` block (validation pattern)."""
|
|
# The _func_stack holds the function context; we don't track the if-stack.
|
|
# Walk the AST of the current function and check if the raise is inside
|
|
# an `if <var> is None:` block.
|
|
enclosing_func = self._current_func_node()
|
|
if enclosing_func is None:
|
|
return False
|
|
for node in ast.walk(enclosing_func):
|
|
if node is enclosing_func:
|
|
continue
|
|
if isinstance(node, ast.If):
|
|
test = node.test
|
|
if isinstance(test, ast.Compare) and isinstance(test.ops[0], ast.Is) and any(isinstance(c, ast.Constant) and c.value is None for c in test.comparators):
|
|
for child in ast.walk(node):
|
|
if isinstance(child, ast.Raise) and child is not node:
|
|
return True
|
|
return False
|
|
|
|
def _function_body_is_just_this_raise(self, node: ast.Raise) -> bool:
|
|
"""True if the function body is just this raise (abstract method pattern)."""
|
|
enclosing_func = self._current_func_node()
|
|
if enclosing_func is None:
|
|
return False
|
|
body = enclosing_func.body
|
|
if len(body) != 1:
|
|
return False
|
|
return body[0] is node
|
|
|
|
def _extract_raise_name(self, node: ast.expr) -> str:
|
|
"""Extract the exception class name from a raise expression.
|
|
|
|
For `raise HTTPException(...)` this returns 'HTTPException' (just the name).
|
|
For `raise ValueError('msg')` this returns 'ValueError'.
|
|
For `raise self.errors[0]` this returns the full expression (won't match).
|
|
"""
|
|
if isinstance(node, ast.Call):
|
|
return ast.unparse(node.func)
|
|
if isinstance(node, ast.Name):
|
|
return node.id
|
|
if isinstance(node, ast.Attribute):
|
|
return ast.unparse(node)
|
|
return ast.unparse(node)
|
|
|
|
def _classify_raise(self, node: ast.Raise) -> tuple[str, str]:
    """Classify one `raise` statement and explain the verdict.

    The checks run in order and the first match wins. Every path returns a
    `(category, hint)` pair; the categories produced here are
    "INTERNAL_RETHROW", "BOUNDARY_FASTAPI" and "INTERNAL_PROGRAMMER_RAISE".
    Anything that matches no specific rule falls through to a final
    "INTERNAL_RETHROW" asking for manual review.

    Args:
        node: the `raise` statement being visited.

    Returns:
        `(category, hint)` where `hint` is a human-readable explanation.
    """
    # exc_str is the unparsed statement text; exc_name is the (possibly dotted)
    # exception name, or "" for a bare `raise`.
    exc_str = ast.unparse(node) if node.exc else "raise"
    exc_name = self._extract_raise_name(node.exc) if node.exc else ""

    # Bare re-raise
    if node.exc is None:
        return (
            "INTERNAL_RETHROW",
            "Suspicious: re-raising without conversion. Consider propagating via Result instead.",
        )

    # FastAPI HTTPException in an _api_* handler
    # exc_short drops any module qualifier, e.g. `fastapi.HTTPException` -> `HTTPException`.
    exc_short = exc_name.split(".")[-1]
    if exc_short in {"HTTPException"} and self._is_fastapi_handler():
        return (
            "BOUNDARY_FASTAPI",
            "Compliant: FastAPI HTTPException in _api_* handler. This is the framework-idiomatic way to signal HTTP errors; FastAPI converts it to a JSON response at the framework level.",
        )

    # Raising ErrorInfo
    # Substring match on the full name, so any name containing "ErrorInfo" is caught.
    if "ErrorInfo" in exc_name:
        return (
            "INTERNAL_RETHROW",
            "Violation: raising ErrorInfo as an exception defeats the data-oriented pattern. Return Result(data=NIL_T, errors=[ErrorInfo(...)]) instead.",
        )

    # Programmer error (in __init__ or as assert)
    if exc_short in PROGRAMMER_ERROR_EXCEPTIONS:
        func_name = self._current_func_name()
        if func_name == "__init__":
            return (
                "INTERNAL_PROGRAMMER_RAISE",
                f"Compliant: `{exc_short}` in `__init__` is the canonical constructor-precondition pattern (per styleguide 'When to Use This Convention': constructors that fail with programmer errors use assert/raise).",
            )
        if exc_short in {"AssertionError", "ValueError"} or "assert " in exc_str:
            return (
                "INTERNAL_PROGRAMMER_RAISE",
                f"Compliant: `{exc_short}` for an impossible state / precondition check. The styleguide reserves `raise` for programmer errors.",
            )
        # Heuristic added by result_migration_review_pass_20260617:
        # NotImplementedError as the entire function body = abstract method pattern.
        if exc_short == "NotImplementedError" and self._function_body_is_just_this_raise(node):
            return (
                "INTERNAL_PROGRAMMER_RAISE",
                f"Compliant: `raise NotImplementedError()` as the entire function body is the canonical abstract-method pattern (per result_migration_review_pass_20260617).",
            )

    # Heuristic added by result_migration_review_pass_20260617:
    # `if <var> is None: raise ImportError(...)` = validation raise (precondition check).
    if exc_short in {"ImportError", "RuntimeError", "ValueError", "KeyError"} and self._enclosing_if_is_none_guard():
        return (
            "INTERNAL_PROGRAMMER_RAISE",
            f"Compliant: `raise {exc_short}` inside `if <var> is None:` is the canonical validation/precondition-check pattern (per result_migration_review_pass_20260617).",
        )

    # Heuristic added by result_migration_gui_2_20260619 (Phase 11):
    # Bare `raise AttributeError(...)` or `raise NameError(...)` in a dunder
    # method (__getattr__/__getattribute__/__setattr__/__delattr__) is the
    # canonical Python dunder-method programmer-error pattern. Per the
    # styleguide "Re-Raise Patterns" (error_handling.md lines 625-690), bare
    # raises are reserved for programmer errors / impossible states /
    # canonical dunder method behaviors. The Python data-model contract for
    # these dunders explicitly raises AttributeError when an attribute does
    # not exist or is not settable.
    if exc_short in {"AttributeError", "NameError"} and self._current_func_name() in {"__getattr__", "__getattribute__", "__setattr__", "__delattr__"}:
        return (
            "INTERNAL_PROGRAMMER_RAISE",
            f"Compliant: `raise {exc_short}` in `{self._current_func_name()}` is the canonical dunder-method programmer-error pattern (per styleguide 'Re-Raise Patterns' and Phase 11 result_migration_gui_2_20260619).",
        )

    # Fallback: nothing matched, so ask for a manual review of this raise.
    return (
        "INTERNAL_RETHROW",
        f"Review: `raise {exc_name}` in internal code. Confirm this is a programmer error (assertion) and not a runtime failure (which should be a Result).",
    )
|
|
|
|
def _snippet(self, node: ast.AST) -> str:
|
|
return ast.unparse(node).replace("\n", " ").strip()[:120]
|
|
|
|
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
    """Visit a function body with `node` recorded as the innermost enclosing function.

    The node is pushed onto `_func_stack` for the duration of the traversal
    and removed again even if visiting a child raises.
    """
    self._func_stack.append(node)
    try:
        self.generic_visit(node)
    finally:
        # Drop the entry pushed above so the stack mirrors lexical nesting.
        del self._func_stack[-1]
|
|
|
|
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
    """Visit an `async def` body with `node` recorded as the innermost enclosing function.

    The node is pushed onto `_func_stack` for the duration of the traversal
    and removed again even if visiting a child raises.
    """
    self._func_stack.append(node)
    try:
        self.generic_visit(node)
    finally:
        # Drop the entry pushed above so the stack mirrors lexical nesting.
        del self._func_stack[-1]
|
|
|
|
def _add_finding(self, kind: str, line: int, snippet: str, category: str, hint: str) -> None:
    """Append one `Finding` for the current file to `self.report.findings`.

    The finding's `context` is the current function name and
    `in_refactored_baseline` is copied from `self._in_baseline`; the
    remaining fields come straight from the arguments.
    """
    finding = Finding(
        filename=self.filename,
        line=line,
        kind=kind,
        context=self._current_func_name(),
        snippet=snippet,
        category=category,
        hint=hint,
        in_refactored_baseline=self._in_baseline,
    )
    self.report.findings.append(finding)
|
|
|
|
def visit_Try(self, node: ast.Try) -> None:
    """Record findings for a `try` statement and visit its parts in source order.

    A try with a `finally` but no handlers is recorded as a compliant
    cleanup finding. Each handler is classified and recorded as an EXCEPT
    finding. The try body, handler bodies, `else` and `finally` blocks are
    then visited explicitly, with `node` on `_try_stack` throughout.
    """
    self._try_stack.append(node)
    try:
        cleanup_only = bool(node.finalbody) and not node.handlers
        if cleanup_only:
            # bare try/finally (no except) = canonical cleanup pattern
            self._add_finding(
                "TRY",
                node.lineno,
                self._snippet(node),
                "INTERNAL_COMPLIANT",
                "Compliant: bare try/finally is the canonical cleanup pattern (analog of `goto defer`).",
            )

        for stmt in node.body:
            self.visit(stmt)

        for clause in node.handlers:
            category, hint = self._classify_except(clause, node)
            self._add_finding("EXCEPT", clause.lineno, self._snippet(clause), category, hint)
            for stmt in clause.body:
                self.visit(stmt)

        for stmt in node.orelse:
            self.visit(stmt)

        for stmt in node.finalbody:
            self.visit(stmt)
    finally:
        self._try_stack.pop()
|
|
|
|
def visit_TryStar(self, node: ast.TryStar) -> None:
|
|
self.visit_Try(node) # type: ignore[arg-type]
|
|
|
|
def visit_Raise(self, node: ast.Raise) -> None:
    """Classify a ``raise`` statement, record it, then visit its children."""
    verdict, advice = self._classify_raise(node)
    rendered = self._snippet(node)
    self._add_finding("RAISE", node.lineno, rendered, verdict, advice)
    self.generic_visit(node)
|
|
|
|
|
|
def audit_file(filepath: Path) -> FileReport:
    """Audit a single Python file and return its report.

    Read and parse failures are returned as data, not raised: the returned
    report has ``has_error`` set and ``error_message`` describing the cause.

    Args:
        filepath: Path of the file to read (as UTF-8) and parse.

    Returns:
        The visitor's report for a file that parsed, otherwise an error report.
    """

    def _failed(message: str) -> FileReport:
        # Build the error-carrying report used by both failure paths.
        report = FileReport(filename=str(filepath))
        report.has_error = True
        report.error_message = message
        return report

    try:
        source = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        return _failed(f"could not read: {e}")
    try:
        tree = ast.parse(source, filename=str(filepath))
    except (SyntaxError, ValueError) as e:
        # Interpreters before Python 3.12 reject source containing NUL bytes
        # with ValueError instead of SyntaxError; catching both keeps one
        # malformed file from aborting the whole audit run.
        return _failed(f"syntax error: {e}")
    visitor = ExceptionVisitor(str(filepath))
    visitor.visit(tree)
    return visitor.report
|
|
|
|
|
|
def find_python_files(root: Path, exclude_artifacts: bool = True) -> list[Path]:
    """Return the sorted ``*.py`` paths found recursively under ``root``.

    Paths with a ``__pycache__`` component are always skipped; paths with an
    ``artifacts`` component are skipped unless ``exclude_artifacts`` is false.

    Raises:
        FileNotFoundError: If ``root`` does not exist.
    """
    if not root.exists():
        raise FileNotFoundError(f"Source directory not found: {root}")
    skipped_parts = {"__pycache__"}
    if exclude_artifacts:
        skipped_parts.add("artifacts")
    return sorted(
        candidate
        for candidate in root.rglob("*.py")
        if skipped_parts.isdisjoint(candidate.parts)
    )
|
|
|
|
|
|
def render_human(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str:
|
|
lines: list[str] = []
|
|
total_findings = sum(len(r.findings) for r in reports)
|
|
total_violations = sum(r.violation_count for r in reports)
|
|
total_compliant = sum(r.compliant_count for r in reports)
|
|
total_unclear = sum(r.unclear_count for r in reports)
|
|
total_suspicious = sum(r.suspicious_count for r in reports)
|
|
try_count = sum(1 for r in reports for f in r.findings if f.kind == "TRY")
|
|
except_count = sum(1 for r in reports for f in r.findings if f.kind == "EXCEPT")
|
|
finally_count = sum(1 for r in reports for f in r.findings if f.kind == "FINALLY")
|
|
raise_count = sum(1 for r in reports for f in r.findings if f.kind == "RAISE")
|
|
|
|
# Separate baseline vs migration target
|
|
baseline_findings = [f for r in reports for f in r.findings if f.in_refactored_baseline]
|
|
migration_findings = [f for r in reports for f in r.findings if not f.in_refactored_baseline]
|
|
baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES)
|
|
migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES)
|
|
|
|
lines.append("=== Exception Handling Audit (Data-Oriented Convention) ===\n")
|
|
lines.append(f"Files scanned: {files_scanned}")
|
|
lines.append(f"Files with findings: {len(reports)}")
|
|
lines.append(f"Total sites: {total_findings}")
|
|
lines.append(f" try: {try_count}")
|
|
lines.append(f" except: {except_count}")
|
|
lines.append(f" raise: {raise_count}")
|
|
lines.append("")
|
|
lines.append(f"Compliant sites: {total_compliant}")
|
|
lines.append(f"Suspicious sites: {total_suspicious}")
|
|
lines.append(f"Violation sites: {total_violations}")
|
|
lines.append(f"Unclear (review): {total_unclear}")
|
|
lines.append("")
|
|
lines.append("--- Baseline (refactored files: mcp_client, ai_client, rag_engine) ---")
|
|
lines.append(f" Sites: {len(baseline_findings)}, violations: {baseline_violations}")
|
|
lines.append("--- Migration target (all other src/ files) ---")
|
|
lines.append(f" Sites: {len(migration_findings)}, violations: {migration_violations}")
|
|
lines.append("")
|
|
|
|
cat_counts = Counter(f.category for r in reports for f in r.findings)
|
|
lines.append("By category:")
|
|
for cat, n in cat_counts.most_common():
|
|
mark = ""
|
|
if cat in VIOLATION_CATEGORIES:
|
|
mark = " (VIOLATION)"
|
|
elif cat == "INTERNAL_RETHROW":
|
|
mark = " (suspicious)"
|
|
elif cat in COMPLIANT_CATEGORIES:
|
|
mark = " (compliant)"
|
|
elif cat == "UNCLEAR":
|
|
mark = " (review)"
|
|
lines.append(f" {cat:30s} {n:4d}{mark}")
|
|
lines.append("")
|
|
|
|
lines.append(f"--- Top {top} files by violation count (migration target only) ---")
|
|
ranked = sorted(
|
|
[r for r in reports if not r.is_refactored_baseline],
|
|
key=lambda r: (-r.violation_count, -len(r.findings), r.filename),
|
|
)[:top]
|
|
for r in ranked:
|
|
if r.violation_count == 0 and r.unclear_count == 0 and r.suspicious_count == 0:
|
|
continue
|
|
lines.append(f"\n{r.filename} (V={r.violation_count}, S={r.suspicious_count}, ?={r.unclear_count}, C={r.compliant_count}, total={len(r.findings)})")
|
|
if verbose:
|
|
for f in r.findings:
|
|
if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW"):
|
|
lines.append(f" L{f.line:4d} [{f.kind:7s}] {f.category:28s} in {f.context}")
|
|
lines.append(f" {f.snippet[:100]}")
|
|
lines.append(f" hint: {f.hint}")
|
|
else:
|
|
by_cat = Counter(f.category for f in r.findings if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW"))
|
|
for cat, n in by_cat.most_common():
|
|
lines.append(f" {cat:30s} {n}")
|
|
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def render_json(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str:
    """Render the audit as a JSON document (2-space indent).

    Totals cover every report. The ``files`` list is ordered by violation
    count, then suspicious count (both descending), then filename; it is cut
    to ``top`` entries unless ``verbose`` is set. Verbose findings also carry
    ``snippet`` and ``hint``.
    """

    def _finding_entry(finding) -> dict:
        entry = {
            "line": finding.line,
            "kind": finding.kind,
            "context": finding.context,
            "category": finding.category,
        }
        if verbose:
            entry["snippet"] = finding.snippet
            entry["hint"] = finding.hint
        return entry

    def _file_entry(report) -> dict:
        return {
            "filename": report.filename,
            "in_refactored_baseline": report.is_refactored_baseline,
            "violation_count": report.violation_count,
            "compliant_count": report.compliant_count,
            "suspicious_count": report.suspicious_count,
            "unclear_count": report.unclear_count,
            "has_error": report.has_error,
            "error_message": report.error_message,
            "findings": [_finding_entry(f) for f in report.findings],
        }

    every_finding = [f for r in reports for f in r.findings]
    baseline_findings = [f for f in every_finding if f.in_refactored_baseline]
    migration_findings = [f for f in every_finding if not f.in_refactored_baseline]
    baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES)
    migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES)

    violating_categories = Counter(
        f.category for f in every_finding if f.category in VIOLATION_CATEGORIES
    )
    ordered_reports = sorted(
        reports,
        key=lambda r: (-r.violation_count, -r.suspicious_count, r.filename),
    )
    limit = len(reports) if verbose else top

    output = {
        "refactored_baseline_files": sorted(REFACTORED_BASELINE_FILES),
        "files_scanned": files_scanned,
        "files_with_findings": len(reports),
        "total_sites": len(every_finding),
        "by_kind": dict(Counter(f.kind for f in every_finding)),
        "compliant_sites": sum(r.compliant_count for r in reports),
        "suspicious_sites": sum(r.suspicious_count for r in reports),
        "violation_sites": sum(r.violation_count for r in reports),
        "unclear_sites": sum(r.unclear_count for r in reports),
        "by_category": dict(Counter(f.category for f in every_finding).most_common()),
        "violations_by_category": dict(violating_categories.most_common()),
        "baseline": {
            "file_count": len(list(REFACTORED_BASELINE_FILES)),
            "sites": len(baseline_findings),
            "violations": baseline_violations,
        },
        "migration_target": {
            "sites": len(migration_findings),
            "violations": migration_violations,
        },
        "files": [_file_entry(r) for r in ordered_reports[:limit]],
    }
    return json.dumps(output, indent=2)
|
|
|
|
|
|
def render_summary(reports: list[FileReport], files_scanned: int) -> str:
|
|
"""Per-file summary table. Used for planning migration tracks.
|
|
|
|
Columns: file, total, V (violations), S (suspicious), ? (unclear), C (compliant).
|
|
Sorted by V+S descending so the highest-impact files are at the top.
|
|
"""
|
|
lines: list[str] = []
|
|
lines.append("=== Exception Handling Audit: Per-File Summary ===\n")
|
|
lines.append(f"Files scanned: {files_scanned}")
|
|
lines.append(f"Files with findings: {len(reports)}\n")
|
|
lines.append(f"{'file':<38} {'total':>6} {'V':>5} {'S':>5} {'?':>4} {'C':>5} baseline?")
|
|
lines.append("-" * 90)
|
|
for f in sorted(reports, key=lambda r: -(r.violation_count + r.suspicious_count)):
|
|
total = f.violation_count + f.suspicious_count + f.unclear_count + f.compliant_count
|
|
if total == 0:
|
|
continue
|
|
name = f.filename.replace("src/", "").replace("\\", "/")
|
|
base = "*BASELINE*" if f.is_refactored_baseline else ""
|
|
lines.append(f"{name:<38} {total:>6} {f.violation_count:>5} {f.suspicious_count:>5} {f.unclear_count:>4} {f.compliant_count:>5} {base}")
|
|
lines.append("-" * 90)
|
|
total_v = sum(r.violation_count for r in reports)
|
|
total_s = sum(r.suspicious_count for r in reports)
|
|
total_u = sum(r.unclear_count for r in reports)
|
|
total_c = sum(r.compliant_count for r in reports)
|
|
lines.append(f"{'TOTAL':<38} {total_v + total_s + total_u + total_c:>6} {total_v:>5} {total_s:>5} {total_u:>4} {total_c:>5}")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def render_by_size(reports: list[FileReport], files_scanned: int) -> str:
|
|
"""Group files by violation+suspicious count bucket for migration planning.
|
|
|
|
Buckets: small (<=5), medium (6-15), large (>=16). Plus the 3 refactored
|
|
baseline files as a separate bucket (the convention reference; remaining
|
|
gaps should be closed to make them pure compliant).
|
|
"""
|
|
lines: list[str] = []
|
|
lines.append("=== Exception Handling Audit: Files Grouped by Migration Effort ===\n")
|
|
lines.append(f"Files scanned: {files_scanned}")
|
|
lines.append(f"Files with findings: {len(reports)}\n")
|
|
|
|
baseline = [r for r in reports if r.is_refactored_baseline]
|
|
large = [r for r in reports if not r.is_refactored_baseline and r.violation_count + r.suspicious_count >= 16]
|
|
medium = [r for r in reports if not r.is_refactored_baseline and 6 <= r.violation_count + r.suspicious_count <= 15]
|
|
small = [r for r in reports if not r.is_refactored_baseline and r.violation_count + r.suspicious_count <= 5]
|
|
|
|
def _bucket(name: str, files: list[FileReport], note: str) -> None:
|
|
if not files:
|
|
return
|
|
v = sum(r.violation_count for r in files)
|
|
s = sum(r.suspicious_count for r in files)
|
|
u = sum(r.unclear_count for r in files)
|
|
c = sum(r.compliant_count for r in files)
|
|
total = v + s + u + c
|
|
lines.append(f"--- {name} ({len(files)} files, V+S={v+s}, V={v}, S={s}, ?={u}, C={c}, total={total}) ---")
|
|
if note:
|
|
lines.append(f" {note}")
|
|
for r in sorted(files, key=lambda x: -(x.violation_count + x.suspicious_count)):
|
|
name = r.filename.replace("src/", "").replace("\\", "/")
|
|
lines.append(f" {name:<36} V={r.violation_count:>3} S={r.suspicious_count:>2} ?={r.unclear_count:>2} C={r.compliant_count:>3} total={len(r.findings)}")
|
|
lines.append("")
|
|
|
|
_bucket(
|
|
"LARGE (>=16 V+S; dedicated track per file)",
|
|
large,
|
|
"Each file is too big for a batched track. 1 track per file; 2-3 days Tier 2 each.",
|
|
)
|
|
_bucket(
|
|
"MEDIUM (6-15 V+S; can group 2-3 files per track)",
|
|
medium,
|
|
"Each file is independent; can be batched in 1 track per group. 0.5-1 day Tier 2 each.",
|
|
)
|
|
_bucket(
|
|
"SMALL (<=5 V+S; batched in one 'small files' track)",
|
|
small,
|
|
"Each file is small enough for a single batched track. 0.5-1 day Tier 2 for the whole batch.",
|
|
)
|
|
_bucket(
|
|
"BASELINE (3 refactored files; the convention reference)",
|
|
baseline,
|
|
"These files ARE the convention. Remaining violations are gaps to close (deferred work from the parent track).",
|
|
)
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point.

    Parses arguments, audits the selected files, prints the requested report
    (JSON, per-file summary, size buckets, or the default human report), and
    returns the process exit code.

    Returns:
        1 when the source directory is missing, or when ``--strict``/``--ci``
        is set and violations were found (baseline files count only with
        ``--include-baseline``); otherwise 0. The strict gate applies to every
        output mode, including ``--summary`` and ``--by-size``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--src", default="src", help="Source directory to audit (default: src)")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report")
    parser.add_argument("--top", type=int, default=200, help="Show top N files by violation count (default: 200)")
    parser.add_argument("--verbose", action="store_true", help="Show every site inline (default: top N summary)")
    parser.add_argument("--include-tests", action="store_true", help="Also scan tests/ and scripts/")
    parser.add_argument("--strict", action="store_true", help="Exit 1 if any violations are found (for CI use; the convention's CI gate)")
    parser.add_argument("--ci", dest="strict", action="store_true", help="Alias for --strict (clearer name for CI scripts; e.g., pre-commit hooks)")
    parser.add_argument("--include-baseline", action="store_true", help="Include the 3 refactored files in the violation count (default: exclude)")
    parser.add_argument("--summary", action="store_true", help="Per-file summary table (for migration planning)")
    parser.add_argument("--by-size", action="store_true", help="Group files by migration effort bucket (small/medium/large/baseline)")
    parser.add_argument("--exclude", action="append", default=[], help="Additional path components to exclude (can repeat)")
    args = parser.parse_args()

    src = Path(args.src)
    try:
        files = find_python_files(src)
    except FileNotFoundError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1

    if args.include_tests:
        for extra in ("tests", "scripts"):
            p = Path(extra)
            if p.exists():
                files.extend(find_python_files(p))

    if args.exclude:
        files = [f for f in files if not any(ex in f.parts for ex in args.exclude)]

    reports: list[FileReport] = [audit_file(f) for f in files]
    reports = [r for r in reports if r.findings or r.has_error]

    # Compute the gate once so every output mode honours --strict the same way.
    if args.include_baseline:
        total_violations = sum(r.violation_count for r in reports)
    else:
        total_violations = sum(r.violation_count for r in reports if not r.is_refactored_baseline)
    strict_failure = bool(args.strict and total_violations > 0)

    if args.json:
        # JSON mode prints no strict notice, only the exit code changes.
        print(render_json(reports, len(files), args.top, args.verbose))
        return 1 if strict_failure else 0

    if args.summary:
        print(render_summary(reports, len(files)))
    elif args.by_size:
        print(render_by_size(reports, len(files)))
    else:
        print(render_human(reports, len(files), args.top, args.verbose))

    if strict_failure:
        print(f"\nSTRICT MODE: {total_violations} violation(s) found; exiting 1.", file=sys.stderr)
        return 1
    return 0
|
|
|
|
|
|
# Script entry point: run the CLI and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|