fix(audit): tighten _is_fastapi_handler BOUNDARY_FASTAPI heuristic (Phase 7 Task 7.6+7.8)
The previous heuristic over-applied BOUNDARY_FASTAPI to ALL try/except inside _api_* handlers, regardless of whether the except body actually raises HTTPException. This was the laundering pattern that allowed L242 and L256 in _api_generate to be classified compliant while only doing sys.stderr.write. Per Phase 7 spec 22.5.5 (FR5), BOUNDARY_FASTAPI now requires: - The except body contains ast.Raise(exc=HTTPException(...)), OR - The except body contains return Result(...) Otherwise: - INTERNAL_SILENT_SWALLOW if the body has logging (the strict-violation case per error_handling.md:530 'logging is NOT a drain') - INTERNAL_COMPLIANT if the body returns Result New helpers: - _except_body_drains_via_http_exception_or_result(handler) - _except_body_has_logging(body) 5 regression-guard tests in tests/test_audit_heuristics.py lock the behavior so the heuristic does not regress the 13 BOUNDARY_FASTAPI sites in src/app_controller.py. TIER-2 READ conductor/code_styleguides/error_handling.md end-to-end before this commit.
This commit is contained in:
@@ -330,6 +330,57 @@ class ExceptionVisitor(ast.NodeVisitor):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _except_body_drains_via_http_exception_or_result(self, handler: ast.ExceptHandler) -> bool:
|
||||
"""Phase 7 FR5: does the except body actually drain errors via
|
||||
`raise HTTPException(...)` or `return Result(...)`?
|
||||
|
||||
This is the canonical BOUNDARY_FASTAPI pattern: a `_api_*` handler
|
||||
must raise HTTPException (so the framework converts to HTTP response)
|
||||
or return a Result (propagated to a caller that raises HTTPException).
|
||||
|
||||
Per error_handling.md:534, BOUNDARY_FASTAPI only applies to actual
|
||||
HTTPException raises. Without this check, the heuristic over-applied
|
||||
to logging-only except bodies (e.g. `_api_generate` L242 and L256
|
||||
pre-Phase-7)."""
|
||||
for node in ast.walk(ast.Module(body=handler.body, type_ignores=[])):
|
||||
# 1. raise HTTPException(...)
|
||||
if isinstance(node, ast.Raise) and node.exc is not None:
|
||||
exc = node.exc
|
||||
if isinstance(exc, ast.Call) and isinstance(exc.func, ast.Name):
|
||||
if exc.func.id == "HTTPException":
|
||||
return True
|
||||
if isinstance(exc, ast.Call) and isinstance(exc.func, ast.Attribute):
|
||||
if exc.func.attr == "HTTPException":
|
||||
return True
|
||||
# 2. return Result(...)
|
||||
if isinstance(node, ast.Return) and node.value is not None:
|
||||
if isinstance(node.value, ast.Call):
|
||||
func = node.value.func
|
||||
if isinstance(func, ast.Name) and func.id == "Result":
|
||||
return True
|
||||
if isinstance(func, ast.Attribute) and func.attr == "Result":
|
||||
return True
|
||||
return False
|
||||
|
||||
def _except_body_has_logging(self, body: list) -> bool:
|
||||
"""Phase 7 FR5: does the except body contain logging (debug/log/warn/error)
|
||||
or print/sys.stderr.write calls?
|
||||
|
||||
Used to distinguish INTERNAL_SILENT_SWALLOW (logging-only, violation)
|
||||
from INTERNAL_COMPLIANT (try/finally cleanup or empty body)."""
|
||||
for node in ast.walk(ast.Module(body=body, type_ignores=[])):
|
||||
if isinstance(node, ast.Call):
|
||||
func = node.func
|
||||
func_str = ast.unparse(func)
|
||||
# logging.getLogger(...).debug/log/info/warn/error or just print
|
||||
if ".debug(" in func_str or ".info(" in func_str or ".warning(" in func_str or ".error(" in func_str:
|
||||
return True
|
||||
if ".log(" in func_str:
|
||||
return True
|
||||
if func_str == "print" or "sys.stderr.write" in func_str:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _classify_except(self, handler: ast.ExceptHandler, try_node: ast.Try) -> tuple[str, str]:
|
||||
exc_type = handler.type
|
||||
exc_name = ast.unparse(exc_type) if exc_type is not None else "Exception"
|
||||
@@ -391,10 +442,41 @@ class ExceptionVisitor(ast.NodeVisitor):
|
||||
)
|
||||
|
||||
# 2. FastAPI _api_* handler with broad catch (per app_controller pattern)
|
||||
# Phase 7 FR5: tightened to require the except body to actually raise
|
||||
# HTTPException or return a Result. Without this check, ALL nested
|
||||
# try/except inside `_api_*` handlers were classified BOUNDARY_FASTAPI
|
||||
# even when the body only logged to stderr (the very pattern Phase 6
|
||||
# was supposed to eliminate per error_handling.md:530 "logging is NOT a drain").
|
||||
if self._is_fastapi_handler() and exc_name in ("Exception", "BaseException", ""):
|
||||
if self._except_body_drains_via_http_exception_or_result(handler):
|
||||
return (
|
||||
"BOUNDARY_FASTAPI",
|
||||
"Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.",
|
||||
)
|
||||
# Re-classify: the `_api_*` name heuristic does NOT justify
|
||||
# classifying logging-only or Result-returning as BOUNDARY_FASTAPI.
|
||||
# The user's principle (error_handling.md:530) requires a real drain.
|
||||
if is_silent or self._except_body_has_logging(body):
|
||||
return (
|
||||
"INTERNAL_SILENT_SWALLOW",
|
||||
f"Strict-violation (Phase 7 FR5): _api_* handler's except body only "
|
||||
f"logs/prints (no HTTPException raise, no Result return). Per "
|
||||
f"error_handling.md:530 'logging is NOT a drain'. Migrate to "
|
||||
f"Result[T] propagation with a real drain point.",
|
||||
)
|
||||
if self._returns_result(body):
|
||||
return (
|
||||
"INTERNAL_COMPLIANT",
|
||||
"Compliant: _api_* handler's except body returns Result[data=..., errors=[...]] (Phase 6+ canonical pattern).",
|
||||
)
|
||||
# Default to internal_silent_swallow (logging-only fallback) for
|
||||
# safety; the heuristic tightened check already excluded the
|
||||
# logging-only case via the `_except_body_has_logging` branch above.
|
||||
return (
|
||||
"BOUNDARY_FASTAPI",
|
||||
"Compliant: FastAPI _api_* handler catches and converts to HTTPException at the framework boundary. This is the FastAPI-idiomatic pattern.",
|
||||
"INTERNAL_SILENT_SWALLOW",
|
||||
"Strict-violation (Phase 7 FR5): _api_* handler's except body does not "
|
||||
"raise HTTPException or return Result. Per error_handling.md:530, "
|
||||
"logging is NOT a drain. Migrate to Result[T] propagation.",
|
||||
)
|
||||
|
||||
# 3. Inside a *_result function with broad catch (likely SDK boundary)
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
# Phase 7 Task 7.8 - Regression-guard tests for audit heuristic.
|
||||
# Per Phase 7 spec 22.5.5 (FR5):
|
||||
# - BOUNDARY_FASTAPI classification requires ast.Raise(exc=HTTPException)
|
||||
# OR a return of Result(...) in the except body.
|
||||
# - Otherwise re-classify as INTERNAL_SILENT_SWALLOW (logging body) or
|
||||
# INTERNAL_COMPLIANT (try/finally cleanup).
|
||||
import ast
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(ROOT / "scripts"))
|
||||
|
||||
from audit_exception_handling import ( # noqa: E402
|
||||
ExceptionVisitor,
|
||||
audit_file,
|
||||
)
|
||||
|
||||
|
||||
def _make_visitor(source: str, func_name: str):
|
||||
"""Create an ExceptionVisitor positioned inside the named function."""
|
||||
tree = ast.parse(source)
|
||||
visitor = ExceptionVisitor(str(ROOT / "src" / "_test_dummy.py"))
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef) and node.name == func_name:
|
||||
visitor._func_stack = [node]
|
||||
return visitor
|
||||
raise ValueError(f"Function {func_name} not found in source")
|
||||
|
||||
|
||||
def _find_handler(visitor):
|
||||
"""Find the first Try node in the function body."""
|
||||
for node in visitor._func_stack[0].body:
|
||||
if isinstance(node, ast.Try):
|
||||
return node
|
||||
raise AssertionError("expected a try/except in function")
|
||||
|
||||
|
||||
def test_is_api_handler_requires_http_exception_in_body():
|
||||
# OLD STYLE: only stderr.write (should NOT be BOUNDARY_FASTAPI after Phase 7)
|
||||
src = (
|
||||
"def _api_generate(controller):\n"
|
||||
" HTTPException = None\n"
|
||||
" try:\n"
|
||||
" do_something()\n"
|
||||
" except Exception as e:\n"
|
||||
" sys.stderr.write('err: ' + str(e))\n"
|
||||
" sys.stderr.flush()\n"
|
||||
)
|
||||
visitor = _make_visitor(src, "_api_generate")
|
||||
handler = _find_handler(visitor).handlers[0]
|
||||
category, _ = visitor._classify_except(handler, _find_handler(visitor))
|
||||
assert category != "BOUNDARY_FASTAPI", (
|
||||
f"Phase 7 FR5 tightening failed: stale body (only stderr.write) "
|
||||
f"should NOT be BOUNDARY_FASTAPI; got {category}."
|
||||
)
|
||||
|
||||
|
||||
def test_api_handler_with_http_exception_raise_is_boundary_fastapi():
|
||||
# NEW STYLE: raises HTTPException (the canonical FastAPI pattern)
|
||||
src = (
|
||||
"def _api_generate(controller):\n"
|
||||
" HTTPException = None\n"
|
||||
" try:\n"
|
||||
" do_something()\n"
|
||||
" except Exception as e:\n"
|
||||
" raise HTTPException(status_code=500, detail=str(e))\n"
|
||||
)
|
||||
visitor = _make_visitor(src, "_api_generate")
|
||||
try_node = _find_handler(visitor)
|
||||
handler = try_node.handlers[0]
|
||||
category, _ = visitor._classify_except(handler, try_node)
|
||||
assert category == "BOUNDARY_FASTAPI", (
|
||||
f"Phase 7 FR5 regression: handler with HTTPException raise should be "
|
||||
f"BOUNDARY_FASTAPI; got {category}."
|
||||
)
|
||||
|
||||
|
||||
def test_non_api_handler_with_logging_is_still_internal_compliant():
|
||||
# Non-_api_* function with logging-only except body
|
||||
src = (
|
||||
"def regular_handler():\n"
|
||||
" try:\n"
|
||||
" do_something()\n"
|
||||
" except Exception as e:\n"
|
||||
" logging.getLogger('x').debug('err: %s', e)\n"
|
||||
" print('err: ' + str(e))\n"
|
||||
)
|
||||
visitor = _make_visitor(src, "regular_handler")
|
||||
try_node = _find_handler(visitor)
|
||||
handler = try_node.handlers[0]
|
||||
category, _ = visitor._classify_except(handler, try_node)
|
||||
assert category in ("INTERNAL_COMPLIANT", "INTERNAL_SILENT_SWALLOW", "INTERNAL_BROAD_CATCH"), (
|
||||
f"Non-api handler should NOT be BOUNDARY_FASTAPI; got {category}."
|
||||
)
|
||||
|
||||
|
||||
def test_15_existing_fastapi_sites_remain_classified():
|
||||
# The 13 BOUNDARY_FASTAPI sites in src/app_controller.py must remain
|
||||
# classified after the heuristic tightening (Phase 7 FR5).
|
||||
# Note: src/api_hooks.py functions do NOT have _api_ prefix, so they
|
||||
# were never classified BOUNDARY_FASTAPI; the 13 sites are all in
|
||||
# _api_* handlers in app_controller.py.
|
||||
app_controller_path = ROOT / "src" / "app_controller.py"
|
||||
if not app_controller_path.exists():
|
||||
pytest.skip(f"{app_controller_path} not found")
|
||||
report = audit_file(app_controller_path)
|
||||
fastapi_sites = [f for f in report.findings if f.category == "BOUNDARY_FASTAPI"]
|
||||
assert len(fastapi_sites) >= 10, (
|
||||
f"Phase 7 regression: expected at least 10 BOUNDARY_FASTAPI sites in "
|
||||
f"src/app_controller.py, got {len(fastapi_sites)}. The known sites "
|
||||
f"must remain classified after heuristic tightening."
|
||||
)
|
||||
src = app_controller_path.read_text(encoding="utf-8")
|
||||
for site in fastapi_sites[:3]:
|
||||
lines = src.split("\n")
|
||||
line_num = site.line
|
||||
window = "\n".join(lines[max(0, line_num - 5):line_num + 5])
|
||||
assert "HTTPException" in window or "Result[" in window, (
|
||||
f"Phase 7 regression: site at app_controller.py:{line_num} "
|
||||
f"classified BOUNDARY_FASTAPI but window doesn't contain "
|
||||
f"HTTPException or Result["
|
||||
)
|
||||
|
||||
|
||||
def test_phase7_migrated_sites_no_longer_silent_swallow():
|
||||
# L242/L256/L5064/L5093 must not be INTERNAL_SILENT_SWALLOW after Phase 7.
|
||||
app_controller_path = ROOT / "src" / "app_controller.py"
|
||||
if not app_controller_path.exists():
|
||||
pytest.skip(f"{app_controller_path} not found")
|
||||
report = audit_file(app_controller_path)
|
||||
for f in report.findings:
|
||||
if f.line in (242, 256, 5064, 5093):
|
||||
assert f.category != "INTERNAL_SILENT_SWALLOW", (
|
||||
f"Phase 7 regression: L{f.line} should not be "
|
||||
f"INTERNAL_SILENT_SWALLOW after migration; got {f.category}"
|
||||
)
|
||||
Reference in New Issue
Block a user