Files
manual_slop/thirdparty/defer/sugar/_parse.py
T
2026-05-13 05:55:46 -04:00

105 lines
2.9 KiB
Python

import sys
from ast import (
AsyncFunctionDef,
FunctionDef,
)
from collections import deque
from os.path import isabs
from types import FrameType
from typing import Any, Callable, Optional, cast
from defer.errors import FreeVarsError
from defer.sugar.transformer import RewriteDefer
def _is_dunder(name: str) -> bool:
    """Report whether *name* both begins and ends with a double underscore."""
    marker = "__"
    # Slices never raise on short strings, so "" and "_" simply compare unequal.
    return name[:2] == marker and name[-2:] == marker
def _is_excluded_path(filename: str) -> bool:
    """Return True when *filename* matches one of the exclusion rules.

    A filename is excluded when it is empty, when it starts with one of the
    interpreter prefix directories reported by ``sys`` or with one of the
    ``<frozen ``, ``<string>`` or ``<pytest `` pseudo-filename markers, or
    when it contains ``.venv``, ``thirdparty`` or ``__future__`` anywhere.
    """
    if not filename:
        return True
    skipped_prefixes = (
        sys.base_exec_prefix,
        sys.base_prefix,
        sys.exec_prefix,
        sys.prefix,
        "<frozen ",
        "<string>",
        "<pytest ",
    )
    skipped_fragments = (".venv", "thirdparty", "__future__")
    # str.startswith accepts a tuple and tests every candidate in one call.
    return filename.startswith(skipped_prefixes) or any(
        fragment in filename for fragment in skipped_fragments
    )
class _ParseDefer:
    """Trace-style callable that swaps in rewritten code for new functions.

    Instances take ``(frame, event, arg)``. On a "line" event whose current
    statement is a function definition, the statement info is queued. On a
    later "line" event from a frame with the same ``f_back``, the queued
    definition is passed to ``RewriteDefer.transform``; if that yields a
    tree, it is compiled and executed in a copy of the frame's namespace and
    the resulting ``__code__`` is assigned onto the function object found in
    the frame.

    NOTE(review): the call signature matches what ``sys.settrace`` expects;
    the installation site is not visible here -- confirm.
    """

    IDENTITY = lambda *_: None  # noqa: E731
    # Sentinel meaning "the optional ``executing`` package was not resolved yet".
    _UNRESOLVED = object()
    # Cached ``executing.executing.Source`` class; set per instance on first success.
    _source: Any = _UNRESOLVED

    def __init__(self, tracefn: Optional[Callable]) -> None:
        """Store the chained callable and start with an empty queue.

        Args:
            tracefn: Callable invoked with ``(frame, event, arg)`` before any
                other processing; a falsy value selects the no-op ``IDENTITY``.
        """
        self.tracefn = tracefn or self.IDENTITY
        # Holds the objects returned by ``Source.executing(frame)`` for
        # function definitions seen but not yet rewritten.
        self.pending: deque[Any] = deque()

    def __call__(self, frame: FrameType, event: str, arg: Any):
        """Process one event and return ``self``.

        Every ``Exception`` raised while processing is swallowed on purpose,
        so a failure here never reaches the traced program.
        """
        try:
            self._do_call(frame, event, arg)
        except Exception:
            pass
        return self

    def _load_source(self) -> Any:
        """Return ``executing.executing.Source``, or None if it cannot be imported.

        The class is cached after the first successful import so the import
        statement is not re-run on every traced line. A failed import is not
        cached; it is attempted again on the next call.
        """
        source_cls = self._source
        if source_cls is self._UNRESOLVED:
            try:
                from executing.executing import Source
            except ImportError:
                return None
            source_cls = self._source = Source
        return source_cls

    def _do_call(self, frame: FrameType, event: str, arg: Any):
        """Chain to ``tracefn``, then queue or rewrite a function definition.

        Raises:
            FreeVarsError: when the function about to be rewritten has free
                variables. NOTE(review): ``__call__`` catches ``Exception``,
                so this presumably never reaches callers of the instance.
        """
        self.tracefn(frame, event, arg)
        # Only "line" events are processed; test that first because it is the
        # cheapest of the filters below.
        if event != "line":
            return
        try:
            filename = frame.f_code.co_filename
        except Exception:
            return
        if _is_excluded_path(filename):
            return
        if not isabs(filename):
            return
        source_cls = self._load_source()
        if source_cls is None:
            return
        try:
            exc = source_cls.executing(frame)
        except Exception:
            return
        if not (stmt := next(iter(exc.statements), None)):
            return
        if isinstance(stmt, (AsyncFunctionDef, FunctionDef)):
            if _is_dunder(stmt.name):
                return
            # The function object does not exist yet while its ``def`` line
            # is the current statement; handle it on a later line event.
            self.pending.append(exc)
            return
        # Only act on the most recently queued definition, and only when this
        # frame has the same caller frame as the one that queued it.
        if not self.pending or frame.f_back is not self.pending[-1].frame.f_back:
            return
        stmts = self.pending.pop().statements
        node = cast(FunctionDef | AsyncFunctionDef, next(iter(stmts)))
        fn_name = node.name
        if fn_name not in frame.f_locals:
            return
        fn = frame.f_locals[fn_name]
        if fn.__module__ in sys.stdlib_module_names:
            return
        if fn.__code__.co_freevars:
            raise FreeVarsError(fn)
        if not (rewritten := RewriteDefer.transform(node)):
            return
        # Execute the rewritten definition in a scratch copy of the frame's
        # namespace, with the original binding removed, so the new function
        # is created there rather than in the live frame.
        namespace = frame.f_locals.copy()
        del namespace[fn_name]
        exec(compile(rewritten, frame.f_code.co_filename, "exec"), frame.f_globals, namespace)
        # Swap only the code object so the existing function object (and any
        # references to it) picks up the rewritten body.
        frame.f_locals[fn_name].__code__ = namespace[fn_name].__code__