import sys
import os
from ast import (
    AsyncFunctionDef,
    FunctionDef,
)
from collections import deque
from types import FrameType
from typing import TYPE_CHECKING, Any, Callable, Optional, cast

from defer.errors import FreeVarsError
from defer.sugar.transformer import RewriteDefer

if TYPE_CHECKING:
    # Annotation-only import.  The runtime import happens lazily inside
    # ``_ParseDefer.__call__`` and tolerates ``ImportError``.
    from executing.executing import Executing


def _is_dunder(name: str) -> bool:
    """Return True when *name* starts and ends with a double underscore."""
    return name.startswith("__") and name.endswith("__")


# Path fragments: a frame whose filename contains any of the first three is
# never processed by ``_ParseDefer``.
_SITE_PACKAGES = "site-packages"
_VENV = ".venv"
_THIRDPARTY = "thirdparty"
# Path fragments: a frame is only processed when its filename contains a
# ``src`` directory component (Windows or POSIX separators).
_SRC_WIN = "\\src\\"
_SRC_UNIX = "/src/"

_EXCLUDED_PATH_MARKERS = (_SITE_PACKAGES, _VENV, _THIRDPARTY)
_SOURCE_PATH_MARKERS = (_SRC_WIN, _SRC_UNIX)


def _is_our_code(filename: str) -> bool:
    """Return True when *filename* contains a ``src`` directory component.

    An empty (or otherwise falsy) filename is never considered ours.
    """
    return bool(filename) and any(
        marker in filename for marker in _SOURCE_PATH_MARKERS
    )


class _ParseDefer:
    """Trace callable that swaps in ``RewriteDefer``-transformed function code.

    The work is split over two consecutive ``"line"`` events:

    1. When the statement about to run is a (non-dunder) ``def`` /
       ``async def``, its ``Executing`` record is pushed onto ``pending``.
    2. On a later ``"line"`` event whose frame has the same ``f_back`` as the
       newest pending record, that record is popped, the function object the
       ``def`` bound in the frame's locals is looked up, and its ``__code__``
       is replaced with the code compiled from the transformed AST.

    The instance always returns itself, so it keeps receiving events for
    every frame it is consulted on.
    """

    # No-op stand-in used when no previous trace function is supplied.  It is
    # looked up through the instance (and therefore bound), so ``*_`` also
    # absorbs ``self``.
    IDENTITY = lambda *_: None  # noqa: E731

    def __init__(self, tracefn: Optional[Callable]) -> None:
        """Wrap *tracefn*; a falsy value is replaced by ``IDENTITY``."""
        self.tracefn = tracefn or self.IDENTITY
        # ``Executing`` records of ``def`` statements seen but not yet rewritten.
        self.pending: "deque[Executing]" = deque()

    def __call__(self, frame: FrameType, event: str, arg: Any) -> "_ParseDefer":
        """Handle one trace event.

        Args:
            frame: Frame the event was raised for.
            event: Trace event name; only ``"line"`` triggers any rewriting.
            arg: Event argument, forwarded untouched to the wrapped tracer.

        Returns:
            Always ``self``.
        """
        # The wrapped tracer sees every event; its return value is ignored.
        self.tracefn(frame, event, arg)

        try:
            filename = frame.f_code.co_filename
            skip_file = (
                not filename
                or any(marker in filename for marker in _EXCLUDED_PATH_MARKERS)
                or not _is_our_code(filename)
            )
        except Exception:
            # Best effort: when the filename cannot be inspected the frame is
            # not filtered out and processing simply continues.
            skip_file = False
        if skip_file:
            return self

        if event != "line":
            return self

        try:
            from executing.executing import Source
        except ImportError:
            # Without ``executing`` there is nothing to analyse.
            return self

        try:
            executing_info = Source.executing(frame)
        except Exception:
            return self

        stmt = next(iter(executing_info.statements), None)
        if not stmt:
            return self

        if isinstance(stmt, (AsyncFunctionDef, FunctionDef)):
            # Phase 1: the ``def`` has not executed yet, so only remember it.
            if _is_dunder(stmt.name):
                return self
            self.pending.append(executing_info)
            return self

        # Phase 2: act only for a frame sharing the caller (``f_back``) of the
        # most recently recorded ``def``.
        if not self.pending or frame.f_back is not self.pending[-1].frame.f_back:
            return self

        recorded_stmts = self.pending.pop().statements
        node = cast(FunctionDef | AsyncFunctionDef, next(iter(recorded_stmts)))
        fn_name = node.name

        if fn_name not in frame.f_locals:
            return self
        fn = frame.f_locals[fn_name]
        if not callable(fn):
            return self

        try:
            code = fn.__code__
        except AttributeError:
            return self
        try:
            mod = fn.__module__
        except AttributeError:
            return self

        if mod in sys.stdlib_module_names:
            return self
        if code.co_freevars:
            # Functions with free (closure) variables are left untouched.
            # NOTE(review): presumably because re-executing the ``def`` outside
            # its enclosing scope cannot rebuild the closure cells - confirm.
            return self

        tree = RewriteDefer.transform(node)
        if not tree:
            return self

        # Execute the transformed definition in a copy of the frame's locals
        # so nothing leaks into the live namespace.  The current binding is
        # removed first so the lookup below can only find the new function.
        namespace = frame.f_locals.copy()
        del namespace[fn_name]
        exec(
            compile(tree, frame.f_code.co_filename, "exec"),
            frame.f_globals,
            namespace,
        )
        # Swap the code object in place: existing references to the function
        # pick up the transformed body.
        frame.f_locals[fn_name].__code__ = namespace[fn_name].__code__
        return self