diff --git a/scripts/audit_exception_handling.py b/scripts/audit_exception_handling.py index 9761f8b7..1b94c31a 100644 --- a/scripts/audit_exception_handling.py +++ b/scripts/audit_exception_handling.py @@ -330,6 +330,57 @@ class ExceptionVisitor(ast.NodeVisitor): return True return False + def _except_body_drains_via_http_exception_or_result(self, handler: ast.ExceptHandler) -> bool: + """Phase 7 FR5: does the except body actually drain errors via + `raise HTTPException(...)` or `return Result(...)`? + + This is the canonical BOUNDARY_FASTAPI pattern: a `_api_*` handler + must raise HTTPException (so the framework converts to HTTP response) + or return a Result (propagated to a caller that raises HTTPException). + + Per error_handling.md:534, BOUNDARY_FASTAPI only applies to actual + HTTPException raises. Without this check, the heuristic over-applied + to logging-only except bodies (e.g. `_api_generate` L242 and L256 + pre-Phase-7).""" + for node in ast.walk(ast.Module(body=handler.body, type_ignores=[])): + # 1. raise HTTPException(...) + if isinstance(node, ast.Raise) and node.exc is not None: + exc = node.exc + if isinstance(exc, ast.Call) and isinstance(exc.func, ast.Name): + if exc.func.id == "HTTPException": + return True + if isinstance(exc, ast.Call) and isinstance(exc.func, ast.Attribute): + if exc.func.attr == "HTTPException": + return True + # 2. return Result(...) + if isinstance(node, ast.Return) and node.value is not None: + if isinstance(node.value, ast.Call): + func = node.value.func + if isinstance(func, ast.Name) and func.id == "Result": + return True + if isinstance(func, ast.Attribute) and func.attr == "Result": + return True + return False + + def _except_body_has_logging(self, body: list) -> bool: + """Phase 7 FR5: does the except body contain logging (debug/log/warn/error) + or print/sys.stderr.write calls? + + Used to distinguish INTERNAL_SILENT_SWALLOW (logging-only, violation) + from INTERNAL_COMPLIANT (try/finally cleanup or empty body).""" + for node in ast.walk(ast.Module(body=body, type_ignores=[])): + if isinstance(node, ast.Call): + func = node.func + func_str = ast.unparse(func) + # logging.getLogger(...).debug/log/info/warn/error or just print + if ".debug(" in func_str or ".info(" in func_str or ".warning(" in func_str or ".error(" in func_str: + return True + if ".log(" in func_str: + return True + if func_str == "print" or "sys.stderr.write" in func_str: + return True + return False + def _classify_except(self, handler: ast.ExceptHandler, try_node: ast.Try) -> tuple[str, str]: exc_type = handler.type exc_name = ast.unparse(exc_type) if exc_type is not None else "Exception" @@ -391,10 +442,41 @@ class ExceptionVisitor(ast.NodeVisitor): ) # 2. FastAPI _api_* handler with broad catch (per app_controller pattern) + # Phase 7 FR5: tightened to require the except body to actually raise + # HTTPException or return a Result. Without this check, ALL nested + # try/except inside `_api_*` handlers were classified BOUNDARY_FASTAPI + # even when the body only logged to stderr (the very pattern Phase 6 + # was supposed to eliminate per error_handling.md:530 "logging is NOT a drain"). if self._is_fastapi_handler() and exc_name in ("Exception", "BaseException", ""): + if self._except_body_drains_via_http_exception_or_result(handler): + return ( + "BOUNDARY_FASTAPI", + "Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.", + ) + # Re-classify: the `_api_*` name heuristic does NOT justify + # classifying logging-only or Result-returning as BOUNDARY_FASTAPI. + # The user's principle (error_handling.md:530) requires a real drain. + if is_silent or self._except_body_has_logging(body): + return ( + "INTERNAL_SILENT_SWALLOW", + f"Strict-violation (Phase 7 FR5): _api_* handler's except body only " + f"logs/prints (no HTTPException raise, no Result return). Per " + f"error_handling.md:530 'logging is NOT a drain'. Migrate to " + f"Result[T] propagation with a real drain point.", + ) + if self._returns_result(body): + return ( + "INTERNAL_COMPLIANT", + "Compliant: _api_* handler's except body returns Result[data=..., errors=[...]] (Phase 6+ canonical pattern).", + ) + # Default to internal_silent_swallow (logging-only fallback) for + # safety; the heuristic tightened check already excluded the + # logging-only case via the `_except_body_has_logging` branch above. return ( - "BOUNDARY_FASTAPI", - "Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.", + "INTERNAL_SILENT_SWALLOW", + "Strict-violation (Phase 7 FR5): _api_* handler's except body does not " + "raise HTTPException or return Result. Per error_handling.md:530, " + "logging is NOT a drain. Migrate to Result[T] propagation.", ) # 3. Inside a *_result function with broad catch (likely SDK boundary) diff --git a/tests/test_audit_heuristics.py b/tests/test_audit_heuristics.py new file mode 100644 index 00000000..241192d9 --- /dev/null +++ b/tests/test_audit_heuristics.py @@ -0,0 +1,139 @@ +# Phase 7 Task 7.8 - Regression-guard tests for audit heuristic. +# Per Phase 7 spec 22.5.5 (FR5): +# - BOUNDARY_FASTAPI classification requires ast.Raise(exc=HTTPException) +# OR a return of Result(...) in the except body. +# - Otherwise re-classify as INTERNAL_SILENT_SWALLOW (logging body) or +# INTERNAL_COMPLIANT (try/finally cleanup). +import ast +import sys +from pathlib import Path + +import pytest + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT / "scripts")) + +from audit_exception_handling import ( # noqa: E402 + ExceptionVisitor, + audit_file, +) + + +def _make_visitor(source: str, func_name: str): + """Create an ExceptionVisitor positioned inside the named function.""" + tree = ast.parse(source) + visitor = ExceptionVisitor(str(ROOT / "src" / "_test_dummy.py")) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef) and node.name == func_name: + visitor._func_stack = [node] + return visitor + raise ValueError(f"Function {func_name} not found in source") + + +def _find_handler(visitor): + """Find the first Try node in the function body.""" + for node in visitor._func_stack[0].body: + if isinstance(node, ast.Try): + return node + raise AssertionError("expected a try/except in function") + + +def test_is_api_handler_requires_http_exception_in_body(): + # OLD STYLE: only stderr.write (should NOT be BOUNDARY_FASTAPI after Phase 7) + src = ( + "def _api_generate(controller):\n" + " HTTPException = None\n" + " try:\n" + " do_something()\n" + " except Exception as e:\n" + " sys.stderr.write('err: ' + str(e))\n" + " sys.stderr.flush()\n" + ) + visitor = _make_visitor(src, "_api_generate") + handler = _find_handler(visitor).handlers[0] + category, _ = visitor._classify_except(handler, _find_handler(visitor)) + assert category != "BOUNDARY_FASTAPI", ( + f"Phase 7 FR5 tightening failed: stale body (only stderr.write) " + f"should NOT be BOUNDARY_FASTAPI; got {category}." + ) + + +def test_api_handler_with_http_exception_raise_is_boundary_fastapi(): + # NEW STYLE: raises HTTPException (the canonical FastAPI pattern) + src = ( + "def _api_generate(controller):\n" + " HTTPException = None\n" + " try:\n" + " do_something()\n" + " except Exception as e:\n" + " raise HTTPException(status_code=500, detail=str(e))\n" + ) + visitor = _make_visitor(src, "_api_generate") + try_node = _find_handler(visitor) + handler = try_node.handlers[0] + category, _ = visitor._classify_except(handler, try_node) + assert category == "BOUNDARY_FASTAPI", ( + f"Phase 7 FR5 regression: handler with HTTPException raise should be " + f"BOUNDARY_FASTAPI; got {category}." + ) + + +def test_non_api_handler_with_logging_is_still_internal_compliant(): + # Non-_api_* function with logging-only except body + src = ( + "def regular_handler():\n" + " try:\n" + " do_something()\n" + " except Exception as e:\n" + " logging.getLogger('x').debug('err: %s', e)\n" + " print('err: ' + str(e))\n" + ) + visitor = _make_visitor(src, "regular_handler") + try_node = _find_handler(visitor) + handler = try_node.handlers[0] + category, _ = visitor._classify_except(handler, try_node) + assert category in ("INTERNAL_COMPLIANT", "INTERNAL_SILENT_SWALLOW", "INTERNAL_BROAD_CATCH"), ( + f"Non-api handler should NOT be BOUNDARY_FASTAPI; got {category}." + ) + + +def test_15_existing_fastapi_sites_remain_classified(): + # The 13 BOUNDARY_FASTAPI sites in src/app_controller.py must remain + # classified after the heuristic tightening (Phase 7 FR5). + # Note: src/api_hooks.py functions do NOT have _api_ prefix, so they + # were never classified BOUNDARY_FASTAPI; the 13 sites are all in + # _api_* handlers in app_controller.py. + app_controller_path = ROOT / "src" / "app_controller.py" + if not app_controller_path.exists(): + pytest.skip(f"{app_controller_path} not found") + report = audit_file(app_controller_path) + fastapi_sites = [f for f in report.findings if f.category == "BOUNDARY_FASTAPI"] + assert len(fastapi_sites) >= 10, ( + f"Phase 7 regression: expected at least 10 BOUNDARY_FASTAPI sites in " + f"src/app_controller.py, got {len(fastapi_sites)}. The known sites " + f"must remain classified after heuristic tightening." + ) + src = app_controller_path.read_text(encoding="utf-8") + for site in fastapi_sites[:3]: + lines = src.split("\n") + line_num = site.line + window = "\n".join(lines[max(0, line_num - 5):line_num + 5]) + assert "HTTPException" in window or "Result[" in window, ( + f"Phase 7 regression: site at app_controller.py:{line_num} " + f"classified BOUNDARY_FASTAPI but window doesn't contain " + f"HTTPException or Result[" + ) + + +def test_phase7_migrated_sites_no_longer_silent_swallow(): + # L242/L256/L5064/L5093 must not be INTERNAL_SILENT_SWALLOW after Phase 7. + app_controller_path = ROOT / "src" / "app_controller.py" + if not app_controller_path.exists(): + pytest.skip(f"{app_controller_path} not found") + report = audit_file(app_controller_path) + for f in report.findings: + if f.line in (242, 256, 5064, 5093): + assert f.category != "INTERNAL_SILENT_SWALLOW", ( + f"Phase 7 regression: L{f.line} should not be " + f"INTERNAL_SILENT_SWALLOW after migration; got {f.category}" + )