#!/usr/bin/env python3 """Audit try/except/finally/raise usage against the data-oriented error handling convention. This audit is INFORMATIONAL by default (exits 0) so developers can run it freely to see the current state. Pass `--strict` (or its alias `--ci`) to enable CI-gate mode (exits 1 on any violation). The 4-script enforcement set (see docs/AGENTS.md "Convention Enforcement") uses `--strict` mode for pre-commit hooks and CI. The convention (see conductor/code_styleguides/error_handling.md) requires: - SDK-boundary exceptions are caught and converted to ErrorInfo. - Internal code uses Result[T] (data + errors list), not Optional[T] + try/except. - except Exception is a code smell (broad catch without conversion). - `raise` is reserved for programmer errors (assert/raise for impossible states). - `try/finally` is the canonical cleanup pattern (like `goto defer`). - `raise` in __init__ is OK for "this constructor needs X" (programmer error). - FastAPI `raise HTTPException` in _api_* handlers is the FastAPI-idiomatic boundary; it's how the framework signals HTTP errors. The 3 fully-refactored files (mcp_client.py, ai_client.py, rag_engine.py) are the CONVENTION BASELINE. Everything outside them is the migration target. The script classifies every exception-handling site into one of: Category Convention status ---------------------------- ----------------------------------------- BOUNDARY_SDK Compliant (wraps third-party SDK or is in a *_result function returning Result) BOUNDARY_IO Compliant (wraps stdlib I/O that can raise) BOUNDARY_CONVERSION Compliant (catches + converts to ErrorInfo) BOUNDARY_FASTAPI Compliant (FastAPI HTTPException raise in _api_* handler; framework-idiomatic) INTERNAL_SILENT_SWALLOW Violation (except ...: pass or just logs) INTERNAL_BROAD_CATCH Violation (except Exception without conversion) INTERNAL_OPTIONAL_RETURN Violation (try/except + return None/Optional) INTERNAL_RETHROW Suspicious (try/except + raise; refactorable) INTERNAL_PROGRAMMER_RAISE Compliant (raise for impossible state in __init__/assert/precondition; not a violation) INTERNAL_COMPLIANT Compliant (try/finally cleanup pattern) UNCLEAR Manual review needed For each VIOLATION or SUSPICIOUS site, the script prints a 1-line hint at what the fix could look like (e.g., "return Result(data=NIL_T, errors=[...])"). Usage: uv run python scripts/audit_exception_handling.py # human report uv run python scripts/audit_exception_handling.py --json # JSON output uv run python scripts/audit_exception_handling.py --src src # source dir uv run python scripts/audit_exception_handling.py --top 20 # top N files uv run python scripts/audit_exception_handling.py --verbose # every site uv run python scripts/audit_exception_handling.py --strict # CI gate (exit 1 on violation) uv run python scripts/audit_exception_handling.py --ci # alias for --strict uv run python scripts/audit_exception_handling.py --summary # per-file summary table uv run python scripts/audit_exception_handling.py --by-size # group by migration effort Pre-commit / CI use (the convention's CI gate): uv run python scripts/audit_exception_handling.py --strict # Exits 1 on any violation. Use in pre-commit hooks and CI to enforce # the data-oriented error handling convention. Part of the 4-script # enforcement set (see docs/AGENTS.md "Convention Enforcement"). Output modes (mutually exclusive; --json / --summary / --by-size override the default human-readable report): --summary: per-file table sorted by V+S descending. Use this for "which files have the most violations" planning questions. --by-size: groups files into small/medium/large/baseline buckets. Use this for "how many migration tracks do I need" planning. (default): top-N files with per-site breakdown and 1-line hints. Exit codes: 0 - audit ran in informational mode (default; no violations fail the script) 1 - usage error, or --strict/--ci mode with violations found 2 - source directory not found """ from __future__ import annotations import argparse import ast import json import re import sys from collections import Counter from dataclasses import dataclass, field from pathlib import Path # The 3 files that were fully refactored to the convention by the # data_oriented_error_handling_20260606 track. Sites in these files are the # BASELINE; sites outside them are the MIGRATION TARGET. REFACTORED_BASELINE_FILES: frozenset[str] = frozenset({ "src/mcp_client.py", "src/ai_client.py", "src/rag_engine.py", }) # Third-party SDKs the convention recognizes as boundary callers. THIRD_PARTY_SDK_MODULES: frozenset[str] = frozenset({ "anthropic", "anthropic.types", "google", "google.generativeai", "google.genai", "google.api_core", "google.protobuf", "google.auth", "openai", "openai.types", "groq", "groq.types", "mistralai", "cohere", "chromadb", "sentence_transformers", "huggingface_hub", "transformers", "torch", "requests", "urllib3", "httpx", "aiohttp", "websockets", "fastapi", "uvicorn", "starlette", "psutil", "pydantic", "PIL", "cv2", "numpy", "tomli", "tomllib", "imgui_bundle", "dearpygui", "dearpygui.dearpygui", }) # Stdlib exceptions that almost always indicate a legitimate boundary wrap. STDLIB_IO_EXCEPTIONS: frozenset[str] = frozenset({ "OSError", "IOError", "FileNotFoundError", "FileExistsError", "PermissionError", "IsADirectoryError", "NotADirectoryError", "TimeoutError", "ConnectionError", "ConnectionRefusedError", "ConnectionResetError", "ConnectionAbortedError", "BrokenPipeError", "socket.timeout", "ssl.SSLError", "json.JSONDecodeError", "csv.Error", "sqlite3.Error", "sqlite3.IntegrityError", "sqlite3.OperationalError", "zipfile.BadZipFile", "xml.etree.ElementTree.ParseError", "subprocess.CalledProcessError", "subprocess.TimeoutExpired", }) # Third-party exception types commonly caught at the boundary. THIRD_PARTY_EXCEPTIONS: frozenset[str] = frozenset({ "anthropic.APIError", "anthropic.APIConnectionError", "anthropic.RateLimitError", "anthropic.AuthenticationError", "anthropic.BadRequestError", "anthropic.NotFoundError", "anthropic.PermissionDeniedError", "anthropic.UnprocessableEntityError", "google.api_core.exceptions.GoogleAPIError", "google.api_core.exceptions.ResourceExhausted", "google.api_core.exceptions.PermissionDenied", "google.api_core.exceptions.NotFound", "google.api_core.exceptions.InvalidArgument", "google.api_core.exceptions.DeadlineExceeded", "google.api_core.exceptions.ServiceUnavailable", "google.api_core.exceptions.Aborted", "openai.OpenAIError", "openai.APIError", "openai.APIConnectionError", "openai.RateLimitError", "openai.AuthenticationError", "openai.BadRequestError", "openai.NotFoundError", "openai.PermissionDeniedError", "requests.RequestException", "requests.ConnectionError", "requests.Timeout", "requests.HTTPError", "requests.exceptions.SSLError", "httpx.HTTPError", "httpx.RequestError", "httpx.TimeoutException", "chromadb.errors.ChromaError", "pydantic.ValidationError", }) # FastAPI boundary exception - idiomatic in _api_* handlers. FASTAPI_EXCEPTIONS: frozenset[str] = frozenset({ "fastapi.HTTPException", "HTTPException", }) # Programmer-error exceptions that are OK to raise (per the styleguide's # "When to Use This Convention" section: "Constructors (__init__) that fail # with programmer errors (use assert or raise for these)"). PROGRAMMER_ERROR_EXCEPTIONS: frozenset[str] = frozenset({ "AssertionError", "ValueError", "KeyError", "IndexError", "TypeError", "AttributeError", "NameError", "RuntimeError", "NotImplementedError", }) # Categories that are considered violations VIOLATION_CATEGORIES: frozenset[str] = frozenset({ "INTERNAL_SILENT_SWALLOW", "INTERNAL_BROAD_CATCH", "INTERNAL_OPTIONAL_RETURN", }) # Categories that are considered compliant (canonical) COMPLIANT_CATEGORIES: frozenset[str] = frozenset({ "BOUNDARY_SDK", "BOUNDARY_IO", "BOUNDARY_CONVERSION", "BOUNDARY_FASTAPI", "INTERNAL_PROGRAMMER_RAISE", "INTERNAL_COMPLIANT", }) @dataclass(frozen=True) class Finding: filename: str line: int kind: str context: str snippet: str category: str hint: str in_refactored_baseline: bool @dataclass class FileReport: filename: str findings: list[Finding] = field(default_factory=list) has_error: bool = False error_message: str = "" @property def violation_count(self) -> int: return sum(1 for f in self.findings if f.category in VIOLATION_CATEGORIES) @property def compliant_count(self) -> int: return sum(1 for f in self.findings if f.category in COMPLIANT_CATEGORIES) @property def unclear_count(self) -> int: return sum(1 for f in self.findings if f.category == "UNCLEAR") @property def suspicious_count(self) -> int: return sum(1 for f in self.findings if f.category == "INTERNAL_RETHROW") @property def is_refactored_baseline(self) -> bool: return any(f.in_refactored_baseline for f in self.findings) class ExceptionVisitor(ast.NodeVisitor): """Walks the AST and classifies every try/except/finally/raise node.""" def __init__(self, filename: str) -> None: self.filename = filename self.report = FileReport(filename=filename) self._func_stack: list[ast.FunctionDef | ast.AsyncFunctionDef] = [] self._try_stack: list[ast.Try | ast.TryStar] = [] # Normalize the filename for the baseline check rel = filename.replace("\\", "/") self._in_baseline = rel in {f.replace("\\", "/") for f in REFACTORED_BASELINE_FILES} def _current_func_name(self) -> str: if not self._func_stack: return "" return self._func_stack[-1].name def _current_func_node(self) -> ast.FunctionDef | ast.AsyncFunctionDef | None: return self._func_stack[-1] if self._func_stack else None def _is_third_party_call(self, body: list[ast.stmt]) -> bool: """Does this body make a call into a known third-party SDK?""" for node in ast.walk(ast.Module(body=body, type_ignores=[])): if isinstance(node, ast.Call): func_str = ast.unparse(node.func) top = func_str.split(".")[0] if top in THIRD_PARTY_SDK_MODULES: return True parts = func_str.split(".") for i in range(1, len(parts) + 1): prefix = ".".join(parts[:i]) if prefix in THIRD_PARTY_SDK_MODULES: return True return False def _is_fastapi_handler(self) -> bool: """Is the current function a FastAPI _api_* handler?""" name = self._current_func_name() return name.startswith("_api_") or name.startswith("api_") def _enclosing_returns_result(self) -> bool: """Does any enclosing function return a Result-like type?""" for func in self._func_stack: if func.returns is None: continue ret_str = ast.unparse(func.returns) if "Result[" in ret_str or ret_str == "Result": return True return False def _classify_except(self, handler: ast.ExceptHandler, try_node: ast.Try) -> tuple[str, str]: exc_type = handler.type exc_name = ast.unparse(exc_type) if exc_type is not None else "Exception" body = handler.body handler_module = ast.unparse(exc_type).split(".")[0] if exc_type else "" # Empty body or pass = silent swallow is_silent = ( len(body) == 0 or all(isinstance(s, ast.Pass) for s in body) ) # Re-raise detection re_raises = any( isinstance(s, ast.Raise) and s.exc is None for s in ast.walk(ast.Module(body=body, type_ignores=[])) ) # ErrorInfo creation creates_errorinfo = any( isinstance(s, ast.Call) and "ErrorInfo" in ast.unparse(s.func) for s in ast.walk(ast.Module(body=body, type_ignores=[])) ) # Returns None returns_none = any( isinstance(s, ast.Return) and (s.value is None or ast.unparse(s.value) == "None") for s in body ) # Enclosing function returns Optional[T]? enclosing_func = self._current_func_node() returns_optional = False if enclosing_func is not None and enclosing_func.returns is not None: ret_str = ast.unparse(enclosing_func.returns) if "Optional" in ret_str or " | None" in ret_str: returns_optional = True is_third_party = self._is_third_party_call(try_node.body) is_in_result_func = self._enclosing_returns_result() # ----- Classification logic ----- # 0. Heuristic A: Result-returning recovery — the canonical data-oriented pattern. # If the except body returns `Result(data=..., errors=[ErrorInfo(...)])`, # the function is following the convention. Classify as INTERNAL_COMPLIANT # BEFORE the BOUNDARY_CONVERSION check (which also fires for ErrorInfo creation). if self._returns_result(body): return ( "INTERNAL_COMPLIANT", "Compliant: `try: ...; except: return Result(data=..., errors=[...])` is the canonical Result-recovery pattern. The convention requires Result[T] for try/except sites that can fail; this pattern satisfies the requirement. The function-name-not-ending-in-`_result` is a smell (rename to `xxx_result`); the pattern itself is compliant. (per result_migration_small_files_20260617 Phase 11.2, Heuristic A)", ) # 1. ErrorInfo conversion = canonical boundary pattern if creates_errorinfo: return ( "BOUNDARY_CONVERSION", "Compliant: catch + ErrorInfo conversion in a Result-returning function. This is the canonical SDK boundary pattern (per styleguide 'Catch SDK exceptions at the boundary only').", ) # 2. FastAPI _api_* handler with broad catch (per app_controller pattern) if self._is_fastapi_handler() and exc_name in ("Exception", "BaseException", ""): return ( "BOUNDARY_FASTAPI", "Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.", ) # 3. Inside a *_result function with broad catch (likely SDK boundary) if is_in_result_func and exc_name in ("Exception", "BaseException", ""): if is_third_party: return ( "BOUNDARY_SDK", f"Compliant: broad `except {exc_name or 'Exception'}` in a *_result function that calls a third-party SDK. Consider narrowing the exception type or converting to ErrorInfo for a cleaner Result contract.", ) return ( "INTERNAL_BROAD_CATCH", f"Violation: `except {exc_name or 'Exception'}` in a Result-returning function without ErrorInfo conversion. Narrow the exception type, or convert to ErrorInfo in a Result (this is the canonical pattern in the 3 refactored files).", ) # 4. Third-party SDK call if is_third_party and (exc_name in THIRD_PARTY_EXCEPTIONS or "Error" in exc_name or "Exception" in exc_name or handler_module in THIRD_PARTY_SDK_MODULES): return ( "BOUNDARY_SDK", f"Compliant: third-party exception {exc_name} caught at the SDK boundary.", ) # 5. Stdlib I/O exception if is_third_party and exc_name in STDLIB_IO_EXCEPTIONS: return ( "BOUNDARY_IO", f"Compliant: stdlib I/O exception {exc_name} caught at a third-party call site.", ) # 6. Re-raise if re_raises: if is_third_party: return ( "BOUNDARY_SDK", f"Compliant: re-raise after {exc_name} preserves the SDK boundary; consider ErrorInfo conversion for a Result-based API.", ) return ( "INTERNAL_RETHROW", "Suspicious: re-raising without conversion is a control-flow smell. Consider whether the caller should handle this via a Result instead.", ) # 7. Silent swallow if is_silent: return ( "INTERNAL_SILENT_SWALLOW", "Violation: silent swallow (`except ...: pass`) hides failures. Either let it propagate, return Result(data=NIL_T, errors=[...]), or document the intentional swallow with a comment-free `assert` for the precondition.", ) # 8. Broad catch (Exception/BaseException) if exc_name in ("Exception", "BaseException") or exc_name == "": return ( "INTERNAL_BROAD_CATCH", f"Violation: broad `except {exc_name or 'Exception'}` catches more than intended. Narrow the exception type, or convert to ErrorInfo in a Result.", ) # 9. try/except + return None in Optional[T] function if returns_none and returns_optional: return ( "INTERNAL_OPTIONAL_RETURN", f"Violation: `except {exc_name}: return None` in a function that returns Optional[T] violates the convention. Replace with `Result[T]` and return `Result(data=NIL_T, errors=[ErrorInfo(kind=..., message=...)])`.", ) # 10. Stdlib I/O exception in our own code if exc_name in STDLIB_IO_EXCEPTIONS and not is_third_party: return ( "INTERNAL_COMPLIANT", f"Compliant: stdlib I/O exception {exc_name} caught in our own code is acceptable (per convention, file/network errors are converted to ErrorInfo).", ) # 11-17. Heuristics added by result_migration_review_pass_20260617 # These cover the 7 most common compliant patterns the review pass found. # Each heuristic inspects the try body + except body together. compliant = self._try_compliant_pattern(try_node, handler, exc_name) if compliant is not None: return compliant return ( "UNCLEAR", f"Manual review: catches {exc_name}; not obviously boundary or violation. Check whether the except site is converting to ErrorInfo (good) or hiding the error (bad).", ) def _has_call_with_attr(self, stmts: list[ast.stmt], attr_name: str) -> bool: """True if any statement contains a call to `.attr_name(...)` (e.g. list.index, dict.get).""" for s in stmts: for node in ast.walk(s): if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr == attr_name: return True return False def _has_keyword_true_call(self, stmts: list[ast.stmt], attr_name: str, kw_name: str) -> bool: """True if any statement contains a call `.attr_name(..., kw_name=True)`.""" for s in stmts: for node in ast.walk(s): if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr == attr_name: for kw in node.keywords: if kw.arg == kw_name and isinstance(kw.value, ast.Constant) and kw.value.value is True: return True return False def _has_print_call(self, stmts: list[ast.stmt]) -> bool: """True if any statement is an `Expr(Call(Name('print'), ...))`.""" for s in stmts: if isinstance(s, ast.Expr) and isinstance(s.value, ast.Call): f = s.value.func if isinstance(f, ast.Name) and f.id == "print": return True return False def _has_import_stmt(self, stmts: list[ast.stmt]) -> bool: """True if any statement is an `Import` or `ImportFrom`.""" for s in stmts: if isinstance(s, (ast.Import, ast.ImportFrom)): return True return False def _try_compliant_pattern(self, try_node: ast.Try, handler: ast.ExceptHandler, exc_name: str) -> tuple[str, str] | None: """Detect one of the 7 common compliant patterns found by the review pass. Returns (category, hint) if the pattern is compliant, else None. """ try_body = try_node.body except_body = handler.body exc_set = {e.strip() for e in exc_name.replace("(", "").replace(")", "").split(",") if e.strip()} # 11. list.index(x) with ValueError fallback to default index if exc_set & {"ValueError"} and self._has_call_with_attr(try_body, "index") and len(except_body) > 0: return ( "INTERNAL_COMPLIANT", f"Compliant: `try: list.index(x); except ({', '.join(sorted(exc_set))}): ...` is the canonical combo-box fallback pattern (per result_migration_review_pass_20260617).", ) # 12. dict[x] or get_capabilities(...) with KeyError fallback to default if exc_set == {"KeyError"} and len(except_body) > 0 and len(try_body) > 0: return ( "INTERNAL_COMPLIANT", f"Compliant: `try: ; except KeyError: ...` is the canonical lookup-miss-with-default pattern (per result_migration_review_pass_20260617).", ) # 13. datetime.fromisoformat(s) with ValueError: None if exc_set == {"ValueError"} and self._has_call_with_attr(try_body, "fromisoformat"): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: datetime.fromisoformat(s); except ValueError: ...` is the canonical lenient-deserialization pattern (per result_migration_review_pass_20260617).", ) # 14. Path.resolve(strict=True) with (OSError, ValueError) fallback if exc_set == {"OSError", "ValueError"} and self._has_keyword_true_call(try_body, "resolve", "strict"): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: Path(p).resolve(strict=True); except (OSError, ValueError): ...` is the canonical graceful-path-resolution pattern (per result_migration_review_pass_20260617).", ) # 15. Path.relative_to with ValueError: pass / return False if exc_set == {"ValueError"} and self._has_call_with_attr(try_body, "relative_to"): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: rp.relative_to(base); except ValueError: ...` is the canonical subpath-check pattern (per result_migration_review_pass_20260617).", ) # 16. asyncio.get_running_loop() with RuntimeError: asyncio.run(...) if exc_set == {"RuntimeError"} and self._has_call_with_attr(try_body, "get_running_loop") and self._has_call_with_attr(except_body, "run"): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: get_running_loop(); except RuntimeError: asyncio.run(...)` is the canonical sync/async bridge pattern (per result_migration_review_pass_20260617).", ) # 17. import with (ImportError, ModuleNotFoundError, AttributeError) + fallback stub if exc_set & {"ImportError", "ModuleNotFoundError", "AttributeError"} and self._has_import_stmt(try_body) and len(except_body) > 0: return ( "INTERNAL_COMPLIANT", f"Compliant: `try: import ...; except ({', '.join(sorted(exc_set))}): ` is the canonical graceful-degradation pattern (per result_migration_review_pass_20260617).", ) # 18. JSON parse with (json.JSONDecodeError, KeyError) and print() for CLI-style input if "JSONDecodeError" in exc_name and self._has_call_with_attr(try_body, "loads") and self._has_print_call(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: json.loads(...); except json.JSONDecodeError: print(...)` is the canonical CLI-style JSON input parser pattern (per result_migration_review_pass_20260617).", ) if exc_set == {"KeyError"} and self._has_call_with_attr(try_body, "loads") and self._has_print_call(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: json.loads(...); except KeyError: print(...)` is the canonical CLI-style JSON input parser pattern (per result_migration_review_pass_20260617).", ) # Heuristic #19 REMOVED in Phase 12.1: narrow except + log (sys.stderr.write / logging.*) # was classified as INTERNAL_COMPLIANT, but per error_handling.md Broad-Except Distinction # table and the user's principle (2026-06-17) "logging is NOT a drain", a catch+log # site is INTERNAL_SILENT_SWALLOW (a violation). Result[T] must propagate to a true # drain point. See conductor/tracks/result_migration_small_files_20260617/plan.md §12.1. # D. Drain-point patterns (per error_handling.md "Drain Points" section, Phase 12.3) # A drain point is a place where Result[T] propagation TERMINATES visibly to the # user or via intentional app action. Log-only / silent-fallback sites are NOT drain # points; they are INTERNAL_SILENT_SWALLOW (a violation). Drain-point checks MUST run # BEFORE the narrow+log reclassification below because a site may contain BOTH a log # call AND a drain point (e.g., sys.stderr.write + sys.exit). if len(except_body) > 0: # D.1 HTTP error response (BaseHTTPRequestHandler subclass) if self._has_send_response_call(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: drain point (HTTP error response). `try: ...; except ({', '.join(sorted(exc_set))}): self.send_response(...)` terminates Result[T] propagation with a visible HTTP error response (per error_handling.md Drain Points §Pattern 1, Phase 12.3).", ) # D.2 GUI error display (imgui.open_popup / imgui.text call) if self._has_imgui_error_display(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: drain point (GUI error display). `try: ...; except ({', '.join(sorted(exc_set))}): imgui.open_popup(...)` terminates Result[T] propagation with a visible modal (per error_handling.md Drain Points §Pattern 2, Phase 12.3).", ) # D.2b WebSocket error response (websocket.send) if self._has_websocket_send(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: drain point (WebSocket error response). `try: ...; except ({', '.join(sorted(exc_set))}): await websocket.send(...)` terminates Result[T] propagation with a visible client error message (per error_handling.md Drain Points §Pattern 2 extension, Phase 12.3).", ) # D.3 Intentional app termination (sys.exit) if self._has_sys_exit_call(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: drain point (intentional app termination). `try: ...; except ({', '.join(sorted(exc_set))}): sys.exit(...)` terminates Result[T] propagation via process termination (per error_handling.md Drain Points §Pattern 3, Phase 12.3).", ) # D.4 Telemetry emission (telemetry.emit_*) if self._has_telemetry_emit_call(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: drain point (telemetry emission). `try: ...; except ({', '.join(sorted(exc_set))}): telemetry.emit_*(...)` terminates Result[T] propagation by sending to monitoring (per error_handling.md Drain Points §Pattern 4, Phase 12.3).", ) # D.5 Bounded retry (for attempt in range(N): ...; return None) if self._has_bounded_retry(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: drain point (bounded retry). `try: ...; except ({', '.join(sorted(exc_set))}): for attempt in range(N): ...; return None` terminates Result[T] propagation via bounded retry followed by visible failure (per error_handling.md Drain Points §Pattern 5, Phase 12.3).", ) # Explicit reclassification (Phase 12.1): narrow except + log # (sys.stderr.write / logging.*) WITHOUT a drain point is INTERNAL_SILENT_SWALLOW (a violation). # This runs AFTER drain-point checks because a site may contain BOTH a log call # AND a drain point (e.g., sys.stderr.write + sys.exit); the drain point wins. if len(except_body) > 0 and self._has_log_call(except_body) and not exc_set & {"Exception", "BaseException", ""}: return ( "INTERNAL_SILENT_SWALLOW", f"Violation: narrow except + log (sys.stderr.write / logging.*) only. Per error_handling.md and the user's principle (2026-06-17): 'logging is NOT a drain'. The error context is lost. Use Result[T] propagation to a true drain point. (per result_migration_small_files_20260617 Phase 12.1)", ) # 20. ImGui scope cleanup guard (narrow except + imgui.end_* call) if exc_set & {"TypeError", "AttributeError", "RuntimeError"} and self._has_imgui_end_call(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: ...; except ({', '.join(sorted(exc_set))}): imgui.end_*()` is the canonical ImGui scope cleanup guard (per result_migration_review_pass_20260617).", ) # 21. MCP tool boundary (broad except Exception + return string in str-returning function) enclosing_func = self._current_func_node() if enclosing_func is not None and enclosing_func.returns is not None and ast.unparse(enclosing_func.returns) == "str" and exc_set & {"Exception", "BaseException"} and self._has_string_return(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: ...; except Exception: return ` in a `-> str` tool function is the canonical MCP tool boundary pattern (per result_migration_review_pass_20260617).", ) # A. Result-returning recovery (canonical Result pattern) — Phase 11.2 if len(except_body) > 0 and self._returns_result(except_body): return ( "INTERNAL_COMPLIANT", f"Compliant: `try: ...; except ({', '.join(sorted(exc_set))}): return Result(data=..., errors=[...])` is the canonical Result-recovery pattern. The function-name-not-ending-in-`_result` is a smell (rename to `xxx_result`); the pattern itself is the data-oriented convention. (per result_migration_small_files_20260617 Phase 11.2)", ) return None def _has_string_return(self, stmts: list[ast.stmt]) -> bool: """True if any statement is a `return `.""" for s in stmts: if isinstance(s, ast.Return) and s.value is not None: if isinstance(s.value, ast.Constant) and isinstance(s.value.value, str): return True if isinstance(s.value, ast.JoinedStr): return True return False def _has_simple_return(self, stmts: list[ast.stmt]) -> bool: """True if the body contains a `return ` statement (any value type).""" for s in stmts: if isinstance(s, ast.Return) and s.value is not None: return True return False def _returns_result(self, stmts: list[ast.stmt]) -> bool: """True if the body returns a `Result(...)` call (canonical Result-recovery pattern). Detects `return Result(data=..., errors=[...])` — the canonical data-oriented error handling pattern. Matches any call to `Result(...)` with at least a `data=` keyword argument. The pattern is compliant when used in a try/except: it satisfies the convention that every try/except site that can fail must return `Result[T]` with structured `ErrorInfo`. The function-name-not-ending-in-`_result` is a smell (the function should be renamed to `xxx_result`), but the pattern itself is compliant (heuristic A from Phase 11.2). """ for s in stmts: if not isinstance(s, ast.Return) or s.value is None: continue if not isinstance(s.value, ast.Call): continue f = s.value.func if isinstance(f, ast.Name) and f.id == "Result": return True if isinstance(f, ast.Attribute) and f.attr == "Result": return True return False def _uses_exception_inline(self, stmts: list[ast.stmt]) -> bool: """True if the body uses `e`/`exc` in a non-pass way (Name reference).""" for s in stmts: if isinstance(s, ast.Pass): continue for node in ast.walk(s): if isinstance(node, ast.Name) and node.id in ("e", "exc"): return True if isinstance(node, ast.Attribute): base = node.value while isinstance(base, ast.Attribute): base = base.value if isinstance(base, ast.Name) and base.id in ("e", "exc"): return True if isinstance(node, ast.FormattedValue): val = node.value while isinstance(val, ast.Attribute): val = val.value if isinstance(val, ast.Name) and val.id in ("e", "exc"): return True return False def _has_assign_fallback(self, stmts: list[ast.stmt]) -> bool: """True if the body contains `var = ` (an assignment, not a return).""" for s in stmts: if isinstance(s, ast.Assign): return True return False def _uses_traceback(self, stmts: list[ast.stmt]) -> bool: """True if the body uses `traceback.format_exc()` or `traceback.print_exc()`.""" for s in stmts: for node in ast.walk(s): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute): if isinstance(f.value, ast.Name) and f.value.id == "traceback": if f.attr in ("format_exc", "print_exc", "format_exception", "print_exception"): return True return False def _has_log_call(self, stmts: list[ast.stmt]) -> bool: """True if any statement is a log call (sys.stderr.write, logging.*, print).""" for s in stmts: for node in ast.walk(s): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute) and f.attr in ("write", "error", "warning", "info", "debug", "exception"): return True if isinstance(f, ast.Name) and f.id == "print": return True return False def _has_send_response_call(self, stmts: list[ast.stmt]) -> bool: """True if any statement calls self.send_response(...). Drain point D.1 (HTTP error response).""" for stmt in stmts: for node in ast.walk(stmt): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute) and isinstance(f.attr, str) and f.attr == "send_response": return True return False def _has_imgui_error_display(self, stmts: list[ast.stmt]) -> bool: """True if any statement opens an ImGui popup (drain point D.2 — GUI error display).""" for stmt in stmts: for node in ast.walk(stmt): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute) and isinstance(f.attr, str): if f.attr in ("open_popup", "popup", "modal"): return True return False def _has_websocket_send(self, stmts: list[ast.stmt]) -> bool: """True if any statement calls websocket.send(...) or self.websocket.send(...). Drain point D.2b.""" for stmt in stmts: for node in ast.walk(stmt): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute) and isinstance(f.attr, str) and f.attr == "send": return True return False def _has_sys_exit_call(self, stmts: list[ast.stmt]) -> bool: """True if any statement calls sys.exit(...). Drain point D.3 (intentional app termination).""" for stmt in stmts: for node in ast.walk(stmt): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute) and isinstance(f.value, ast.Name) and f.value.id == "sys" and f.attr == "exit": return True return False def _has_telemetry_emit_call(self, stmts: list[ast.stmt]) -> bool: """True if any statement calls telemetry.emit_*(...). Drain point D.4 (telemetry emission).""" for stmt in stmts: for node in ast.walk(stmt): if isinstance(node, ast.Call): f = node.func if isinstance(f, ast.Attribute) and isinstance(f.attr, str) and f.attr.startswith("emit_"): if isinstance(f.value, ast.Name) and f.value.id in ("telemetry", "metrics", "monitor"): return True return False def _has_bounded_retry(self, stmts: list[ast.stmt]) -> bool: """True if a bounded retry is present in the enclosing function: `for attempt in range(N): try: ...; except: ...; return None`. Drain point D.5. The bounded-retry pattern requires the SURROUNDING CONTEXT (not just the except body): the enclosing function (or block) must contain `for ... in range(N):` containing this try/except, AND a `return None` AFTER the for loop. The exception handler body's only job is to log/sleep; the real termination is the for-loop's exhaustion + the trailing return None. """ enclosing_func = self._current_func_node() if enclosing_func is None: return False has_for_range_with_try = False has_return_none_after = False for_loop_seen = False for node in ast.walk(enclosing_func): if isinstance(node, ast.For): if isinstance(node.iter, ast.Call) and isinstance(node.iter.func, ast.Name) and node.iter.func.id == "range": for_loop_seen = True for child in ast.walk(node): if isinstance(child, ast.Try): has_for_range_with_try = True break elif for_loop_seen and isinstance(node, ast.Return): if node.value is None: has_return_none_after = True elif isinstance(node.value, ast.Constant) and node.value.value is None: has_return_none_after = True return has_for_range_with_try and has_return_none_after def _has_imgui_end_call(self, stmts: list[ast.stmt]) -> bool: """True if any statement is a call to an imgui.end_* function.""" for s in stmts: for node in ast.walk(s): if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr.startswith("end_"): return True return False def _enclosing_if_is_none_guard(self) -> bool: """True if the current raise is inside an `if is None:` block (validation pattern).""" # The _func_stack holds the function context; we don't track the if-stack. # Walk the AST of the current function and check if the raise is inside # an `if is None:` block. enclosing_func = self._current_func_node() if enclosing_func is None: return False for node in ast.walk(enclosing_func): if node is enclosing_func: continue if isinstance(node, ast.If): test = node.test if isinstance(test, ast.Compare) and isinstance(test.ops[0], ast.Is) and any(isinstance(c, ast.Constant) and c.value is None for c in test.comparators): for child in ast.walk(node): if isinstance(child, ast.Raise) and child is not node: return True return False def _function_body_is_just_this_raise(self, node: ast.Raise) -> bool: """True if the function body is just this raise (abstract method pattern).""" enclosing_func = self._current_func_node() if enclosing_func is None: return False body = enclosing_func.body if len(body) != 1: return False return body[0] is node def _extract_raise_name(self, node: ast.expr) -> str: """Extract the exception class name from a raise expression. For `raise HTTPException(...)` this returns 'HTTPException' (just the name). For `raise ValueError('msg')` this returns 'ValueError'. For `raise self.errors[0]` this returns the full expression (won't match). """ if isinstance(node, ast.Call): return ast.unparse(node.func) if isinstance(node, ast.Name): return node.id if isinstance(node, ast.Attribute): return ast.unparse(node) return ast.unparse(node) def _classify_raise(self, node: ast.Raise) -> tuple[str, str]: exc_str = ast.unparse(node) if node.exc else "raise" exc_name = self._extract_raise_name(node.exc) if node.exc else "" # Bare re-raise if node.exc is None: return ( "INTERNAL_RETHROW", "Suspicious: re-raising without conversion. Consider propagating via Result instead.", ) # FastAPI HTTPException in an _api_* handler exc_short = exc_name.split(".")[-1] if exc_short in {"HTTPException"} and self._is_fastapi_handler(): return ( "BOUNDARY_FASTAPI", "Compliant: FastAPI HTTPException in _api_* handler. This is the framework-idiomatic way to signal HTTP errors; FastAPI converts it to a JSON response at the framework level.", ) # Raising ErrorInfo if "ErrorInfo" in exc_name: return ( "INTERNAL_RETHROW", "Violation: raising ErrorInfo as an exception defeats the data-oriented pattern. Return Result(data=NIL_T, errors=[ErrorInfo(...)]) instead.", ) # Programmer error (in __init__ or as assert) if exc_short in PROGRAMMER_ERROR_EXCEPTIONS: func_name = self._current_func_name() if func_name == "__init__": return ( "INTERNAL_PROGRAMMER_RAISE", f"Compliant: `{exc_short}` in `__init__` is the canonical constructor-precondition pattern (per styleguide 'When to Use This Convention': constructors that fail with programmer errors use assert/raise).", ) if exc_short in {"AssertionError", "ValueError"} or "assert " in exc_str: return ( "INTERNAL_PROGRAMMER_RAISE", f"Compliant: `{exc_short}` for an impossible state / precondition check. The styleguide reserves `raise` for programmer errors.", ) # Heuristic added by result_migration_review_pass_20260617: # NotImplementedError as the entire function body = abstract method pattern. if exc_short == "NotImplementedError" and self._function_body_is_just_this_raise(node): return ( "INTERNAL_PROGRAMMER_RAISE", f"Compliant: `raise NotImplementedError()` as the entire function body is the canonical abstract-method pattern (per result_migration_review_pass_20260617).", ) # Heuristic added by result_migration_review_pass_20260617: # `if is None: raise ImportError(...)` = validation raise (precondition check). if exc_short in {"ImportError", "RuntimeError", "ValueError", "KeyError"} and self._enclosing_if_is_none_guard(): return ( "INTERNAL_PROGRAMMER_RAISE", f"Compliant: `raise {exc_short}` inside `if is None:` is the canonical validation/precondition-check pattern (per result_migration_review_pass_20260617).", ) return ( "INTERNAL_RETHROW", f"Review: `raise {exc_name}` in internal code. Confirm this is a programmer error (assertion) and not a runtime failure (which should be a Result).", ) def _snippet(self, node: ast.AST) -> str: return ast.unparse(node).replace("\n", " ").strip()[:120] def visit_FunctionDef(self, node: ast.FunctionDef) -> None: self._func_stack.append(node) try: self.generic_visit(node) finally: self._func_stack.pop() def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: self._func_stack.append(node) try: self.generic_visit(node) finally: self._func_stack.pop() def _add_finding(self, kind: str, line: int, snippet: str, category: str, hint: str) -> None: self.report.findings.append(Finding( filename=self.filename, line=line, kind=kind, context=self._current_func_name(), snippet=snippet, category=category, hint=hint, in_refactored_baseline=self._in_baseline, )) def visit_Try(self, node: ast.Try) -> None: self._try_stack.append(node) try: # bare try/finally (no except) = canonical cleanup pattern if not node.handlers and node.finalbody: self._add_finding( "TRY", node.lineno, self._snippet(node), "INTERNAL_COMPLIANT", "Compliant: bare try/finally is the canonical cleanup pattern (analog of `goto defer`).", ) for child in node.body: self.visit(child) for handler in node.handlers: category, hint = self._classify_except(handler, node) self._add_finding("EXCEPT", handler.lineno, self._snippet(handler), category, hint) for child in handler.body: self.visit(child) for child in node.orelse: self.visit(child) for child in node.finalbody: self.visit(child) finally: self._try_stack.pop() def visit_TryStar(self, node: ast.TryStar) -> None: self.visit_Try(node) # type: ignore[arg-type] def visit_Raise(self, node: ast.Raise) -> None: category, hint = self._classify_raise(node) self._add_finding("RAISE", node.lineno, self._snippet(node), category, hint) self.generic_visit(node) def audit_file(filepath: Path) -> FileReport: try: source = filepath.read_text(encoding="utf-8") except (OSError, UnicodeDecodeError) as e: report = FileReport(filename=str(filepath)) report.has_error = True report.error_message = f"could not read: {e}" return report try: tree = ast.parse(source, filename=str(filepath)) except SyntaxError as e: report = FileReport(filename=str(filepath)) report.has_error = True report.error_message = f"syntax error: {e}" return report visitor = ExceptionVisitor(str(filepath)) visitor.visit(tree) return visitor.report def find_python_files(root: Path, exclude_artifacts: bool = True) -> list[Path]: if not root.exists(): raise FileNotFoundError(f"Source directory not found: {root}") files = sorted(p for p in root.rglob("*.py") if "__pycache__" not in p.parts) if exclude_artifacts: files = [p for p in files if "artifacts" not in p.parts] return files def render_human(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str: lines: list[str] = [] total_findings = sum(len(r.findings) for r in reports) total_violations = sum(r.violation_count for r in reports) total_compliant = sum(r.compliant_count for r in reports) total_unclear = sum(r.unclear_count for r in reports) total_suspicious = sum(r.suspicious_count for r in reports) try_count = sum(1 for r in reports for f in r.findings if f.kind == "TRY") except_count = sum(1 for r in reports for f in r.findings if f.kind == "EXCEPT") finally_count = sum(1 for r in reports for f in r.findings if f.kind == "FINALLY") raise_count = sum(1 for r in reports for f in r.findings if f.kind == "RAISE") # Separate baseline vs migration target baseline_findings = [f for r in reports for f in r.findings if f.in_refactored_baseline] migration_findings = [f for r in reports for f in r.findings if not f.in_refactored_baseline] baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES) migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES) lines.append("=== Exception Handling Audit (Data-Oriented Convention) ===\n") lines.append(f"Files scanned: {files_scanned}") lines.append(f"Files with findings: {len(reports)}") lines.append(f"Total sites: {total_findings}") lines.append(f" try: {try_count}") lines.append(f" except: {except_count}") lines.append(f" raise: {raise_count}") lines.append("") lines.append(f"Compliant sites: {total_compliant}") lines.append(f"Suspicious sites: {total_suspicious}") lines.append(f"Violation sites: {total_violations}") lines.append(f"Unclear (review): {total_unclear}") lines.append("") lines.append("--- Baseline (refactored files: mcp_client, ai_client, rag_engine) ---") lines.append(f" Sites: {len(baseline_findings)}, violations: {baseline_violations}") lines.append("--- Migration target (all other src/ files) ---") lines.append(f" Sites: {len(migration_findings)}, violations: {migration_violations}") lines.append("") cat_counts = Counter(f.category for r in reports for f in r.findings) lines.append("By category:") for cat, n in cat_counts.most_common(): mark = "" if cat in VIOLATION_CATEGORIES: mark = " (VIOLATION)" elif cat == "INTERNAL_RETHROW": mark = " (suspicious)" elif cat in COMPLIANT_CATEGORIES: mark = " (compliant)" elif cat == "UNCLEAR": mark = " (review)" lines.append(f" {cat:30s} {n:4d}{mark}") lines.append("") lines.append(f"--- Top {top} files by violation count (migration target only) ---") ranked = sorted( [r for r in reports if not r.is_refactored_baseline], key=lambda r: (-r.violation_count, -len(r.findings), r.filename), )[:top] for r in ranked: if r.violation_count == 0 and r.unclear_count == 0 and r.suspicious_count == 0: continue lines.append(f"\n{r.filename} (V={r.violation_count}, S={r.suspicious_count}, ?={r.unclear_count}, C={r.compliant_count}, total={len(r.findings)})") if verbose: for f in r.findings: if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW"): lines.append(f" L{f.line:4d} [{f.kind:7s}] {f.category:28s} in {f.context}") lines.append(f" {f.snippet[:100]}") lines.append(f" hint: {f.hint}") else: by_cat = Counter(f.category for f in r.findings if f.category in VIOLATION_CATEGORIES or f.category in ("UNCLEAR", "INTERNAL_RETHROW")) for cat, n in by_cat.most_common(): lines.append(f" {cat:30s} {n}") return "\n".join(lines) + "\n" def render_json(reports: list[FileReport], files_scanned: int, top: int, verbose: bool) -> str: total_findings = sum(len(r.findings) for r in reports) total_violations = sum(r.violation_count for r in reports) total_compliant = sum(r.compliant_count for r in reports) total_unclear = sum(r.unclear_count for r in reports) total_suspicious = sum(r.suspicious_count for r in reports) baseline_findings = [f for r in reports for f in r.findings if f.in_refactored_baseline] migration_findings = [f for r in reports for f in r.findings if not f.in_refactored_baseline] baseline_violations = sum(1 for f in baseline_findings if f.category in VIOLATION_CATEGORIES) migration_violations = sum(1 for f in migration_findings if f.category in VIOLATION_CATEGORIES) output = { "refactored_baseline_files": sorted(REFACTORED_BASELINE_FILES), "files_scanned": files_scanned, "files_with_findings": len(reports), "total_sites": total_findings, "by_kind": dict(Counter(f.kind for r in reports for f in r.findings)), "compliant_sites": total_compliant, "suspicious_sites": total_suspicious, "violation_sites": total_violations, "unclear_sites": total_unclear, "by_category": dict(Counter(f.category for r in reports for f in r.findings).most_common()), "violations_by_category": dict(Counter( f.category for r in reports for f in r.findings if f.category in VIOLATION_CATEGORIES ).most_common()), "baseline": { "file_count": len([f for f in REFACTORED_BASELINE_FILES]), "sites": len(baseline_findings), "violations": baseline_violations, }, "migration_target": { "sites": len(migration_findings), "violations": migration_violations, }, "files": [ { "filename": r.filename, "in_refactored_baseline": r.is_refactored_baseline, "violation_count": r.violation_count, "compliant_count": r.compliant_count, "suspicious_count": r.suspicious_count, "unclear_count": r.unclear_count, "has_error": r.has_error, "error_message": r.error_message, "findings": [ { "line": f.line, "kind": f.kind, "context": f.context, "category": f.category, "snippet": f.snippet, "hint": f.hint, } for f in r.findings ] if verbose else [ { "line": f.line, "kind": f.kind, "context": f.context, "category": f.category, } for f in r.findings ], } for r in sorted(reports, key=lambda r: (-r.violation_count, -r.suspicious_count, r.filename))[:top if not verbose else len(reports)] ], } return json.dumps(output, indent=2) def render_summary(reports: list[FileReport], files_scanned: int) -> str: """Per-file summary table. Used for planning migration tracks. Columns: file, total, V (violations), S (suspicious), ? (unclear), C (compliant). Sorted by V+S descending so the highest-impact files are at the top. """ lines: list[str] = [] lines.append("=== Exception Handling Audit: Per-File Summary ===\n") lines.append(f"Files scanned: {files_scanned}") lines.append(f"Files with findings: {len(reports)}\n") lines.append(f"{'file':<38} {'total':>6} {'V':>5} {'S':>5} {'?':>4} {'C':>5} baseline?") lines.append("-" * 90) for f in sorted(reports, key=lambda r: -(r.violation_count + r.suspicious_count)): total = f.violation_count + f.suspicious_count + f.unclear_count + f.compliant_count if total == 0: continue name = f.filename.replace("src/", "").replace("\\", "/") base = "*BASELINE*" if f.is_refactored_baseline else "" lines.append(f"{name:<38} {total:>6} {f.violation_count:>5} {f.suspicious_count:>5} {f.unclear_count:>4} {f.compliant_count:>5} {base}") lines.append("-" * 90) total_v = sum(r.violation_count for r in reports) total_s = sum(r.suspicious_count for r in reports) total_u = sum(r.unclear_count for r in reports) total_c = sum(r.compliant_count for r in reports) lines.append(f"{'TOTAL':<38} {total_v + total_s + total_u + total_c:>6} {total_v:>5} {total_s:>5} {total_u:>4} {total_c:>5}") return "\n".join(lines) + "\n" def render_by_size(reports: list[FileReport], files_scanned: int) -> str: """Group files by violation+suspicious count bucket for migration planning. Buckets: small (<=5), medium (6-15), large (>=16). Plus the 3 refactored baseline files as a separate bucket (the convention reference; remaining gaps should be closed to make them pure compliant). """ lines: list[str] = [] lines.append("=== Exception Handling Audit: Files Grouped by Migration Effort ===\n") lines.append(f"Files scanned: {files_scanned}") lines.append(f"Files with findings: {len(reports)}\n") baseline = [r for r in reports if r.is_refactored_baseline] large = [r for r in reports if not r.is_refactored_baseline and r.violation_count + r.suspicious_count >= 16] medium = [r for r in reports if not r.is_refactored_baseline and 6 <= r.violation_count + r.suspicious_count <= 15] small = [r for r in reports if not r.is_refactored_baseline and r.violation_count + r.suspicious_count <= 5] def _bucket(name: str, files: list[FileReport], note: str) -> None: if not files: return v = sum(r.violation_count for r in files) s = sum(r.suspicious_count for r in files) u = sum(r.unclear_count for r in files) c = sum(r.compliant_count for r in files) total = v + s + u + c lines.append(f"--- {name} ({len(files)} files, V+S={v+s}, V={v}, S={s}, ?={u}, C={c}, total={total}) ---") if note: lines.append(f" {note}") for r in sorted(files, key=lambda x: -(x.violation_count + x.suspicious_count)): name = r.filename.replace("src/", "").replace("\\", "/") lines.append(f" {name:<36} V={r.violation_count:>3} S={r.suspicious_count:>2} ?={r.unclear_count:>2} C={r.compliant_count:>3} total={len(r.findings)}") lines.append("") _bucket( "LARGE (>=16 V+S; dedicated track per file)", large, "Each file is too big for a batched track. 1 track per file; 2-3 days Tier 2 each.", ) _bucket( "MEDIUM (6-15 V+S; can group 2-3 files per track)", medium, "Each file is independent; can be batched in 1 track per group. 0.5-1 day Tier 2 each.", ) _bucket( "SMALL (<=5 V+S; batched in one 'small files' track)", small, "Each file is small enough for a single batched track. 0.5-1 day Tier 2 for the whole batch.", ) _bucket( "BASELINE (3 refactored files; the convention reference)", baseline, "These files ARE the convention. Remaining violations are gaps to close (deferred work from the parent track).", ) return "\n".join(lines) + "\n" def main() -> int: parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--src", default="src", help="Source directory to audit (default: src)") parser.add_argument("--json", action="store_true", help="Output JSON instead of human-readable report") parser.add_argument("--top", type=int, default=200, help="Show top N files by violation count (default: 200)") parser.add_argument("--verbose", action="store_true", help="Show every site inline (default: top N summary)") parser.add_argument("--include-tests", action="store_true", help="Also scan tests/ and scripts/") parser.add_argument("--strict", action="store_true", help="Exit 1 if any violations are found (for CI use; the convention's CI gate)") parser.add_argument("--ci", dest="strict", action="store_true", help="Alias for --strict (clearer name for CI scripts; e.g., pre-commit hooks)") parser.add_argument("--include-baseline", action="store_true", help="Include the 3 refactored files in the violation count (default: exclude)") parser.add_argument("--summary", action="store_true", help="Per-file summary table (for migration planning)") parser.add_argument("--by-size", action="store_true", help="Group files by migration effort bucket (small/medium/large/baseline)") parser.add_argument("--exclude", action="append", default=[], help="Additional path components to exclude (can repeat)") args = parser.parse_args() src = Path(args.src) try: files = find_python_files(src) except FileNotFoundError as e: print(f"ERROR: {e}", file=sys.stderr) return 1 if args.include_tests: for extra in ("tests", "scripts"): p = Path(extra) if p.exists(): files.extend(find_python_files(p)) if args.exclude: files = [f for f in files if not any(ex in f.parts for ex in args.exclude)] reports: list[FileReport] = [audit_file(f) for f in files] reports = [r for r in reports if r.findings or r.has_error] if args.json: print(render_json(reports, len(files), args.top, args.verbose)) if args.include_baseline: total_violations = sum(r.violation_count for r in reports) else: total_violations = sum(r.violation_count for r in reports if not r.is_refactored_baseline) return 1 if (args.strict and total_violations > 0) else 0 if args.summary: print(render_summary(reports, len(files))) return 0 if args.by_size: print(render_by_size(reports, len(files))) return 0 print(render_human(reports, len(files), args.top, args.verbose)) if args.include_baseline: total_violations = sum(r.violation_count for r in reports) else: total_violations = sum(r.violation_count for r in reports if not r.is_refactored_baseline) if args.strict and total_violations > 0: print(f"\nSTRICT MODE: {total_violations} violation(s) found; exiting 1.", file=sys.stderr) return 1 return 0 if __name__ == "__main__": sys.exit(main())