"""Tests for the new heuristics added in scripts/audit_exception_handling.py. Each test creates a fixture file with a specific pattern and runs the audit script against it, verifying the pattern is classified correctly. The new heuristics (added by result_migration_review_pass_20260617): 1. list.index with ValueError fallback to default index (INTERNAL_COMPLIANT) 2. dict lookup (KeyError) with default value (INTERNAL_COMPLIANT) 3. datetime.fromisoformat(s) with ValueError: None (INTERNAL_COMPLIANT) 4. Path.resolve(strict=True) with OSError/ValueError fallback (INTERNAL_COMPLIANT) 5. Path.relative_to with ValueError: pass (INTERNAL_COMPLIANT) 6. asyncio.get_running_loop() with RuntimeError: asyncio.run() (INTERNAL_COMPLIANT) 7. Narrow except (ImportError, AttributeError) + fallback stub (INTERNAL_COMPLIANT) 8. raise NotImplementedError() as entire function body (INTERNAL_PROGRAMMER_RAISE) 9. if None: raise ImportError() validation pattern (INTERNAL_PROGRAMMER_RAISE) 10. try/except (json.JSONDecodeError, KeyError) around JSON parse + print + return (INTERNAL_COMPLIANT) """ from __future__ import annotations import json import subprocess import sys import textwrap from pathlib import Path import pytest ROOT = Path(__file__).resolve().parents[1] SCRIPT = ROOT / "scripts" / "audit_exception_handling.py" def _run_audit_on_fixture(source: str) -> dict: """Run the audit script on a fixture and return the parsed JSON output.""" # Use a temp dir outside tests/artifacts/ to avoid the audit's # default artifacts-exclusion filter. import tempfile tmpdir = Path(tempfile.mkdtemp(prefix="audit_fixture_")) fixture = tmpdir / "audit_heuristic_fixture.py" fixture.write_text(textwrap.dedent(source), encoding="utf-8") try: result = subprocess.run( [sys.executable, str(SCRIPT), "--json", "--src", str(tmpdir), "--verbose"], capture_output=True, text=True, check=False, cwd=str(ROOT), ) finally: if fixture.exists(): fixture.unlink() if tmpdir.exists(): tmpdir.rmdir() if result.returncode not in (0, 1): raise RuntimeError(f"audit failed: {result.stderr}") return json.loads(result.stdout) def _classifications_for_file(data: dict, filename_suffix: str) -> list[dict]: """Return all findings for files whose path ends with `filename_suffix`.""" return [ f for file_info in data.get("files", []) for f in file_info.get("findings", []) if file_info["filename"].endswith(filename_suffix) ] # --------------------------------------------------------------------------- # Heuristic 1: list.index with ValueError fallback to default index # --------------------------------------------------------------------------- def test_list_index_valueerror_fallback_is_compliant(): """try: list.index(x); except ValueError: idx = N is compliant.""" src = ''' def func(items, target): try: idx = items.index(target) except ValueError: idx = 0 return idx ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1, f"expected 1 except, got {len(excepts)}: {excepts}" assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"list.index+ValueError fallback should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 2: KeyError lookup with default value # --------------------------------------------------------------------------- def test_keyerror_lookup_with_default_is_compliant(): """try: dict[x]; except KeyError: val = default is compliant.""" src = ''' def func(d, key): try: val = d[key] except KeyError: val = "default" return val ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"KeyError+default should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 3: datetime.fromisoformat(s) with ValueError: None # --------------------------------------------------------------------------- def test_fromisoformat_valueerror_none_is_compliant(): """try: datetime.fromisoformat(s); except ValueError: x = None is compliant.""" src = ''' import datetime def func(s): try: d = datetime.datetime.fromisoformat(s) except ValueError: d = None return d ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"fromisoformat+ValueError:None should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 4: Path.resolve(strict=True) with OSError/ValueError fallback # --------------------------------------------------------------------------- def test_path_resolve_strict_fallback_is_compliant(): """try: Path(p).resolve(strict=True); except (OSError, ValueError): Path(p).resolve() is compliant.""" src = ''' from pathlib import Path def func(p): try: rp = Path(p).resolve(strict=True) except (OSError, ValueError): rp = Path(p).resolve() return rp ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"Path.resolve(strict=True)+fallback should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 5: Path.relative_to with ValueError: pass # --------------------------------------------------------------------------- def test_path_relative_to_valueerror_is_compliant(): """try: rp.relative_to(base); except ValueError: pass is compliant (canonical subpath check).""" src = ''' from pathlib import Path def is_subpath(rp, base): try: rp.relative_to(base) return True except ValueError: return False ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"Path.relative_to+ValueError should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 6: asyncio.get_running_loop() with RuntimeError: asyncio.run() # --------------------------------------------------------------------------- def test_asyncio_get_running_loop_fallback_is_compliant(): """try: get_running_loop(); except RuntimeError: asyncio.run() is compliant.""" src = ''' import asyncio def bridge(coro): try: loop = asyncio.get_running_loop() return loop except RuntimeError: return asyncio.run(coro) ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"get_running_loop+RuntimeError fallback should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 7: narrow except (ImportError, AttributeError) + fallback stub # --------------------------------------------------------------------------- def test_import_attr_fallback_stub_is_compliant(): """narrow except (ImportError, AttributeError) with fallback attribute assignment is compliant.""" src = ''' class Stub: available = False def get_thing(): try: import real_module return real_module except (ImportError, ModuleNotFoundError, AttributeError): return Stub() ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"narrow except + fallback stub should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 8: raise NotImplementedError() as entire function body # --------------------------------------------------------------------------- def test_raise_notimplemented_entire_body_is_programmer_error(): """raise NotImplementedError() as the entire function body is INTERNAL_PROGRAMMER_RAISE.""" src = ''' class Base: def abstract_method(self): raise NotImplementedError() ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") raises = [f for f in findings if f["kind"] == "RAISE"] assert len(raises) == 1 assert raises[0]["category"] == "INTERNAL_PROGRAMMER_RAISE", ( f"raise NotImplementedError() in entire body should be INTERNAL_PROGRAMMER_RAISE, got {raises[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 9: if None: raise ImportError() validation # --------------------------------------------------------------------------- def test_validation_raise_is_programmer_error(): """if is None: raise ImportError() is INTERNAL_PROGRAMMER_RAISE.""" src = ''' def func(dep): if dep is None: raise ImportError("dependency missing") return dep ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") raises = [f for f in findings if f["kind"] == "RAISE"] assert len(raises) == 1 assert raises[0]["category"] == "INTERNAL_PROGRAMMER_RAISE", ( f"validation raise should be INTERNAL_PROGRAMMER_RAISE, got {raises[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 10: try/except (json.JSONDecodeError, KeyError) + print + return # --------------------------------------------------------------------------- def test_json_parse_with_print_is_compliant(): """try/except (json.JSONDecodeError, KeyError) around JSON parse with print is compliant.""" src = ''' import json def parse(s): try: data = json.loads(s) if not isinstance(data, list): print("not a list") return return data except json.JSONDecodeError as e: print(f"json error: {e}") except KeyError as e: print(f"missing key: {e}") ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 2 for e in excepts: assert e["category"] == "INTERNAL_COMPLIANT", ( f"json parse with print should be INTERNAL_COMPLIANT, got {e['category']}" ) # --------------------------------------------------------------------------- # Heuristic 22: Narrow except + return fallback value (REJECTED Phase 11) # --------------------------------------------------------------------------- @pytest.mark.xfail(reason="Heuristic #22 REVERTED in Phase 11 (laundering heuristic; full Result[T] migration required). See conductor/tracks/result_migration_small_files_20260617/plan.md §11.1.1.") def test_narrow_except_returns_fallback_is_compliant(): """REJECTED in Phase 11. Heuristic #22 classified narrow-catch + fallback as compliant, which is WRONG. The convention requires `Result[T]`; this test is preserved as xfail for traceability and to ensure the count of 11 test tiers is maintained.""" src = ''' def get_git_commit(git_dir): try: r = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True, cwd=git_dir, timeout=5) return r.stdout.strip() if r.returncode == 0 else "" except (OSError, subprocess.SubprocessError, subprocess.TimeoutExpired): return "" ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"narrow except returning fallback should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic 23: Narrow except + use error inline (REJECTED Phase 11) # --------------------------------------------------------------------------- @pytest.mark.xfail(reason="Heuristic #23 REVERTED in Phase 11 (laundering heuristic; full Result[T] migration required). See conductor/tracks/result_migration_small_files_20260617/plan.md §11.1.2.") def test_narrow_except_uses_error_inline_is_compliant(): """REJECTED in Phase 11. Heuristic #23 classified narrow-catch + use-error-inline as compliant, which is WRONG. The convention requires `Result[T]`; this test is preserved as xfail for traceability and to ensure the count of 11 test tiers is maintained.""" src = ''' def write_script(ps1_path, script): try: ps1_path.write_text(script, encoding="utf-8") except (OSError, UnicodeEncodeError) as exc: ps1_name = f"(write error: {exc})" return ps1_name ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"narrow except using error inline should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Heuristic A: Result-returning recovery in non-*_result function (Phase 11.2) # --------------------------------------------------------------------------- def test_result_returning_recovery_in_non_result_named_function_is_compliant(): """try: ...; except SpecificError: return Result(data=..., errors=[ErrorInfo(...)]) is compliant. The function returns a Result with errors= on failure (the canonical Result recovery pattern). The convention requires Result[T] for try/except sites that can fail; this pattern satisfies the requirement. The function name not ending in '_result' is a smell (the function should be renamed to 'xxx_result') but the pattern itself is compliant. This is the pattern used by src/hot_reloader.py:reload(), src/warmup.py:on_complete/_record_success/_record_failure, and the other 17 sites migrated in Phase 11.3. """ src = ''' from src.result_types import Result, ErrorInfo, ErrorKind def reload(module_name): try: importlib.reload(sys.modules[module_name]) return Result(data=True) except (ImportError, ModuleNotFoundError) as e: return Result(data=False, errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="hot_reloader.reload", original=e)]) ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"Result-returning recovery in non-*_result function should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) def test_result_returning_recovery_in_result_named_function_is_compliant(): """Same pattern but with a function name ending in '_result' is also compliant (and ideal). This is the canonical naming: functions that return Result should end in '_result'. """ src = ''' from src.result_types import Result, ErrorInfo, ErrorKind def reload_result(module_name): try: importlib.reload(sys.modules[module_name]) return Result(data=True) except (ImportError, ModuleNotFoundError) as e: return Result(data=False, errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="hot_reloader.reload_result", original=e)]) ''' data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"Result-returning recovery in *_result function should be INTERNAL_COMPLIANT, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Phase 12.1: Heuristic #19 REMOVED - narrow except + log is INTERNAL_SILENT_SWALLOW # --------------------------------------------------------------------------- def test_narrow_except_with_log_only_is_silent_swallow(): """try: ...; except (SpecificError): sys.stderr.write(...) is INTERNAL_SILENT_SWALLOW (a violation). Per error_handling.md "The Broad-Except Distinction" table and the user's principle (2026-06-17): "logging is NOT a drain". sys.stderr.write alone loses the error context; the propagation does NOT terminate visibly to the user. The convention requires Result[T] propagation to a true drain point. Heuristic #19 (which classified this as compliant) was REMOVED in Phase 12.1. """ src = ( 'def log_failure(path, e):\n' ' try:\n' ' path.write_text("x", encoding="utf-8")\n' ' except (OSError, UnicodeEncodeError):\n' ' sys.stderr.write(f"write failed: {e}")\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_SILENT_SWALLOW", ( f"narrow except + log only should be INTERNAL_SILENT_SWALLOW (logging is NOT a drain), got {excepts[0]['category']}" ) def test_narrow_except_with_logging_error_is_silent_swallow(): """try: ...; except (SpecificError): logging.error(...) is INTERNAL_SILENT_SWALLOW (a violation). Same principle as test_narrow_except_with_log_only_is_silent_swallow but with the logging module. Logging alone loses the error context. """ src = ( 'def log_failure_via_logging(path):\n' ' try:\n' ' path.write_text("x", encoding="utf-8")\n' ' except (OSError, UnicodeEncodeError) as e:\n' ' logging.error(f"write failed: {e}")\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_SILENT_SWALLOW", ( f"narrow except + logging.error should be INTERNAL_SILENT_SWALLOW, got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Phase 12.2: visit_Try recursion fix - nested Trys in try body are visited # --------------------------------------------------------------------------- def test_visit_try_recurses_into_try_body(): """A nested try inside the try body should be visited and its handlers recorded. The audit's visit_Try had a bug where it did NOT recurse into node.body. This test constructs a source with an outer try containing an inner try, and asserts BOTH outer and inner handlers appear in the findings. """ src = ( 'def outer():\n' ' try:\n' ' try:\n' ' do_inner()\n' ' except ValueError:\n' ' handle_inner()\n' ' do_outer_thing()\n' ' except (OSError, IOError):\n' ' handle_outer()\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 2, ( f"visit_Try should recurse into try body; expected 2 EXCEPT findings, got {len(excepts)}: {excepts}" ) # --------------------------------------------------------------------------- # Phase 12.3: Heuristic D.1 - HTTP error response drain point # --------------------------------------------------------------------------- def test_drain_point_http_error_response_is_compliant(): """try: ...; except (SpecificError): self.send_response(500, ...) is INTERNAL_COMPLIANT (drain point D.1). Per error_handling.md Drain Points section, Pattern 1: HTTP error response in a BaseHTTPRequestHandler subclass IS a drain point. The HTTP status code IS the visible user feedback; the propagation terminates at the HTTP response. Heuristic D.1 recognizes this pattern. """ src = ( 'class Handler(BaseHTTPRequestHandler):\n' ' def do_GET(self):\n' ' try:\n' ' self._read_body()\n' ' except (OSError, ValueError) as e:\n' ' self.send_response(500)\n' ' self.send_header("Content-Type", "application/json")\n' ' self.wfile.write(b\'{"error": "internal"}\')\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"HTTP error response should be INTERNAL_COMPLIANT (drain point D.1), got {excepts[0]['category']}: {excepts[0].get('note', '')}" ) # --------------------------------------------------------------------------- # Phase 12.3: Heuristic D.2 - GUI error display drain point # --------------------------------------------------------------------------- def test_drain_point_gui_error_display_is_compliant(): """try: ...; except (SpecificError): imgui.open_popup(...) is INTERNAL_COMPLIANT (drain point D.2). Per error_handling.md Drain Points section, Pattern 2: GUI error display via imgui.open_popup IS a drain point. The user sees the error modal. """ src = ( 'def show_load_error():\n' ' try:\n' ' do_load()\n' ' except (OSError, ValueError):\n' ' imgui.open_popup("Load Error")\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"GUI error display should be INTERNAL_COMPLIANT (drain point D.2), got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Phase 12.3: Heuristic D.3 - Intentional app termination drain point # --------------------------------------------------------------------------- def test_drain_point_app_termination_is_compliant(): """try: ...; except (SpecificError): sys.exit(1) is INTERNAL_COMPLIANT (drain point D.3). Per error_handling.md Drain Points section, Pattern 3: intentional app termination via sys.exit IS a drain point. The process exit IS the termination of the propagation. """ src = ( 'def critical_init():\n' ' try:\n' ' load_config()\n' ' except (OSError, ValueError):\n' ' sys.stderr.write("FATAL: config missing")\n' ' sys.exit(1)\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"app termination should be INTERNAL_COMPLIANT (drain point D.3), got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Phase 12.3: Heuristic D.4 - Telemetry emission drain point # --------------------------------------------------------------------------- def test_drain_point_telemetry_emit_is_compliant(): """try: ...; except (SpecificError): telemetry.emit_error(...) is INTERNAL_COMPLIANT (drain point D.4). Per error_handling.md Drain Points section, Pattern 4: telemetry emission IS a drain point. The error reaches the monitoring system. """ src = ( 'def report_failure():\n' ' try:\n' ' do_thing()\n' ' except (OSError, ValueError):\n' ' telemetry.emit_error(operation="do_thing", kind="INTERNAL", message="failed")\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"telemetry emit should be INTERNAL_COMPLIANT (drain point D.4), got {excepts[0]['category']}" ) # --------------------------------------------------------------------------- # Phase 12.3: Heuristic D.5 - Bounded retry drain point # --------------------------------------------------------------------------- def test_drain_point_bounded_retry_is_compliant(): """try: ...; except (SpecificError): for attempt in range(3): ...; return None is INTERNAL_COMPLIANT (drain point D.5). Per error_handling.md Drain Points section, Pattern 5: bounded retry followed by return None IS a drain point. The retry is bounded (no infinite loop); the final None propagates to a visible error UI. """ src = ( 'def load_with_retry():\n' ' for attempt in range(3):\n' ' try:\n' ' do_load()\n' ' return "ok"\n' ' except (OSError, ValueError):\n' ' time.sleep(1)\n' ' return None\n' ) data = _run_audit_on_fixture(src) findings = _classifications_for_file(data, "audit_heuristic_fixture.py") excepts = [f for f in findings if f["kind"] == "EXCEPT"] assert len(excepts) == 1 assert excepts[0]["category"] == "INTERNAL_COMPLIANT", ( f"bounded retry should be INTERNAL_COMPLIANT (drain point D.5), got {excepts[0]['category']}" )