Source code for flytekit.exceptions.scopes

import os
import sys
import traceback
from functools import wraps as _wraps
from sys import exc_info as _exc_info
from traceback import format_tb as _format_tb

import flytekit
from flytekit.exceptions import base as _base_exceptions
from flytekit.exceptions import system as _system_exceptions
from flytekit.exceptions import user as _user_exceptions
from flytekit.loggers import is_rich_logging_enabled
from flytekit.models.core import errors as _error_model


class FlyteScopedException(Exception):
    def __init__(self, context, exc_type, exc_value, exc_tb, top_trim=0, bottom_trim=0, kind=None):
        self._exc_type = exc_type
        self._exc_value = exc_value
        self._exc_tb = exc_tb
        self._top_trim = top_trim
        self._bottom_trim = bottom_trim
        self._context = context
        self._kind = kind
        super(FlyteScopedException, self).__init__(str(self.value))

    @property
    def verbose_message(self):
        tb = self.traceback
        to_trim = self._top_trim
        while to_trim > 0 and tb.tb_next is not None:
            tb = tb.tb_next

        top_tb = tb
        limit = 0
        while tb is not None:
            limit += 1
            tb = tb.tb_next
        limit = max(0, limit - self._bottom_trim)

        lines = _format_tb(top_tb, limit=limit)
        lines = [line.rstrip() for line in lines]
        lines = "\n".join(lines).split("\n")
        traceback_str = "\n    ".join([""] + lines)

        format_str = "Traceback (most recent call last):\n" "{traceback}\n" "\n" "Message:\n" "\n" "    {message}"
        return format_str.format(traceback=traceback_str, message=f"{self.type.__name__}: {self.value}")

    def __str__(self):
        return str(self.value)

    @property
    def value(self):
        if isinstance(self._exc_value, FlyteScopedException):
            return self._exc_value.value
        return self._exc_value

    @property
    def traceback(self):
        if isinstance(self._exc_value, FlyteScopedException):
            return self._exc_value.traceback
        return self._exc_tb

    @property
    def type(self):
        if isinstance(self._exc_value, FlyteScopedException):
            return self._exc_value.type
        return self._exc_type

    @property
    def error_code(self):
        """
        :rtype: Text
        """
        if isinstance(self._exc_value, FlyteScopedException):
            return self._exc_value.error_code

        if hasattr(type(self._exc_value), "error_code"):
            return type(self._exc_value).error_code
        return "{}:Unknown".format(self._context)

    @property
    def kind(self) -> int:
        """
        :rtype: int
        """
        if self._kind is not None:
            # If kind is overridden, return it.
            return self._kind
        elif isinstance(self._exc_value, FlyteScopedException):
            # Otherwise, go lower in the scope to find the kind of exception.
            return self._exc_value.kind
        elif isinstance(self._exc_value, _base_exceptions.FlyteRecoverableException):
            # If it is an exception that is recoverable, we return it as such.
            return _error_model.ContainerError.Kind.RECOVERABLE
        else:
            # The remaining exceptions are considered unrecoverable.
            return _error_model.ContainerError.Kind.NON_RECOVERABLE


class FlyteScopedSystemException(FlyteScopedException):
    def __init__(self, exc_type, exc_value, exc_tb, **kwargs):
        super(FlyteScopedSystemException, self).__init__("SYSTEM", exc_type, exc_value, exc_tb, **kwargs)

    @property
    def verbose_message(self):
        """
        :rtype: Text
        """
        base_msg = super(FlyteScopedSystemException, self).verbose_message
        base_msg += "\n\nSYSTEM ERROR! Contact platform administrators."
        return base_msg


class FlyteScopedUserException(FlyteScopedException):
    def __init__(self, exc_type, exc_value, exc_tb, **kwargs):
        super(FlyteScopedUserException, self).__init__("USER", exc_type, exc_value, exc_tb, **kwargs)

    @property
    def verbose_message(self):
        """
        :rtype: Text
        """
        base_msg = super(FlyteScopedUserException, self).verbose_message
        base_msg += "\n\nUser error."
        return base_msg


_NULL_CONTEXT = 0
_USER_CONTEXT = 1
_SYSTEM_CONTEXT = 2

# Keep the stack with a null-context so we never have to range check when peeking back.
_CONTEXT_STACK = [_NULL_CONTEXT]


def _is_base_context():
    return _CONTEXT_STACK[-2] == _NULL_CONTEXT


def _decorator(outer_f):
    """Decorate a function with signature func(wrapped, args, kwargs)."""

    @_wraps(outer_f)
    def inner_decorator(inner_f):
        @_wraps(inner_f)
        def f(*args, **kwargs):
            return outer_f(inner_f, args, kwargs)

        return f

    return inner_decorator


[docs] @_decorator def system_entry_point(wrapped, args, kwargs): """ The reason these two (see the user one below) decorators exist is to categorize non-Flyte exceptions at arbitrary locations. For example, while there is a separate ecosystem of Flyte-defined user and system exceptions (see the FlyteException hierarchy), and we can easily understand and categorize those, if flytekit comes upon a random ``ValueError`` or other non-flytekit defined error, how would we know if it was a bug in flytekit versus an error with user code or something the user called? The purpose of these decorators is to categorize those (see the last case in the nested try/catch below. Decorator for wrapping functions that enter a system context. This should decorate every method that may invoke some user code later on down the line. This will allow us to add differentiation between what is a user error and what is a system failure. Furthermore, we will clean the exception trace so as to make more sense to the user -- allowing them to know if they should take action themselves or pass on to the platform owners. We will dispatch metrics and such appropriately. """ try: _CONTEXT_STACK.append(_SYSTEM_CONTEXT) if _is_base_context(): # If this is the first time either of this decorator, or the one below is called, then we unwrap the # exception. The first time these decorators are used is currently in the entrypoint.py file. The scoped # exceptions are unwrapped because at that point, we want to return the underlying error to the user. try: return wrapped(*args, **kwargs) except FlyteScopedException as ex: raise ex.value else: try: return wrapped(*args, **kwargs) except FlyteScopedException as scoped: raise scoped except _user_exceptions.FlyteUserException: # Re-raise from here. raise FlyteScopedUserException(*_exc_info()) except Exception: # This is why this function exists - arbitrary exceptions that we don't know what to do with are # interpreted as system errors. # System error, raise full stack-trace all the way up the chain. raise FlyteScopedSystemException(*_exc_info(), kind=_error_model.ContainerError.Kind.RECOVERABLE) finally: _CONTEXT_STACK.pop()
[docs] @_decorator def user_entry_point(wrapped, args, kwargs): """ See the comment for the system_entry_point above as well. Decorator for wrapping functions that enter into a user context. This will help us differentiate user-created failures even when it is re-entrant into system code. Note: a user_entry_point can ONLY ever be called from within a @system_entry_point wrapped function, therefore, we can always ensure we will hit a system_entry_point to correctly reformat our exceptions. Also, any exception we create here will only be handled within our system code so we don't need to worry about leaking weird exceptions to the user. """ try: _CONTEXT_STACK.append(_USER_CONTEXT) if _is_base_context(): # See comment at this location for system_entry_point fn_name = wrapped.__name__ try: return wrapped(*args, **kwargs) except FlyteScopedException as exc: raise exc.type(f"Error encountered while executing '{fn_name}':\n {exc.value}") from exc except Exception as exc: exc_type, exc_value, tb = sys.exc_info() tb = tb.tb_next # Remove the top frame [wrapped(*args, **kwargs)] from the stack if is_rich_logging_enabled(): from rich.console import Console from rich.traceback import Traceback console = Console() trace = Traceback.extract(exc_type, exc_value, tb) console.print(Traceback(trace)) else: traceback.print_tb(tb, file=sys.stderr) execution_state = flytekit.FlyteContextManager().current_context().execution_state if execution_state.is_local_execution() and os.environ.get("FLYTE_EXIT_ON_USER_EXCEPTION") != "0": exit(1) else: raise type(exc)(f"Error encountered while executing '{fn_name}':\n {exc}") from exc else: try: return wrapped(*args, **kwargs) except FlyteScopedException as scoped: raise scoped except _user_exceptions.FlyteUserException: raise FlyteScopedUserException(*_exc_info()) except _system_exceptions.FlyteSystemException: raise FlyteScopedSystemException(*_exc_info()) except Exception: # This is why this function exists - arbitrary exceptions that we don't know what to do with are # interpreted as user exceptions. # This will also catch FlyteUserException re-raised by the system_entry_point handler raise FlyteScopedUserException(*_exc_info()) finally: _CONTEXT_STACK.pop()