Authoring Structure

Enabling users to write tasks and workflows is the core feature of flytekit; it is why the library exists. This document goes over how some of the internals work.

Background

Please see the design doc.

Types and Type Engine

Flyte has its own type system, which is codified in the IDL. Python, of course, has its own typing system, even though it's a dynamic language; it is mostly described in PEP 484. In order to work properly, flytekit needs to be able to convert between these two type systems.

Type Engine

The primary way this happens is through the flytekit.extend.TypeEngine. This engine works by invoking a series of TypeTransformers. Each transformer is responsible for providing the functionality that the engine needs for a given native Python type.
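
For example, here is a rough sketch of the round trip the engine performs (using the flytekit.FlyteContext discussed later in this document; the specific value and type here are arbitrary):

from flytekit import FlyteContext
from flytekit.core.type_engine import TypeEngine

ctx = FlyteContext.current_context()

# Ask the engine for the Flyte IDL type corresponding to a Python type.
literal_type = TypeEngine.to_literal_type(int)

# Convert a Python value into a Flyte IDL literal...
literal = TypeEngine.to_literal(ctx, 3, int, literal_type)

# ...and convert it back into a Python native value.
assert TypeEngine.to_python_value(ctx, literal, int) == 3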

Callable Entities

Tasks, workflows, and launch plans form the core of the Flyte user experience. Each of these concepts is backed by one or more Python classes. These classes, in turn, are instantiated by decorators (in the case of tasks and workflows) or by a normal Python call (in the case of launch plans).

Tasks

This is the current task class hierarchy.

Inheritance diagram of flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, flytekit.extras.sqlite3.task.SQLite3Task

Please see the documentation on each of the classes for details.

class flytekit.core.base_task.Task(task_type, name, interface=None, metadata=None, task_type_version=0, security_ctx=None, **kwargs)[source]

The base of all Tasks in flytekit. This task is closest to the FlyteIDL TaskTemplate: it captures the information in the FlyteIDL specification and does not have a Python native interface associated with it. Refer to the derived classes for examples of how to extend this class.

class flytekit.core.base_task.PythonTask(*args, **kwargs)[source]

Base class for all Tasks with a Python native Interface. This should be used directly for task types that do not have a Python function to be executed. Otherwise, refer to flytekit.PythonFunctionTask.

Parameters
  • task_type (str) – defines a unique task type for every new extension. If a backend plugin is required, this has to be done in concert with the backend plugin identifier

  • name (str) – A unique name for the task instantiation. This is unique for every instance of a task.

  • task_config (T) – Configuration for the task. This is used to configure the specific plugin that handles this task

  • interface (Optional[Interface]) – A python native typed interface (inputs) -> outputs that declares the signature of the task

  • environment (Optional[Dict[str, str]]) – Any environment variables that should be supplied during the execution of the task. Supplied as a dictionary of key/value pairs
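
As a sketch of how this might be extended (the class, task-type string, and logic below are hypothetical, for illustration only):

from typing import Any

from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface

class EchoTask(PythonTask):
    """Hypothetical task type with no user-supplied Python function."""

    def __init__(self, name: str, **kwargs):
        super().__init__(
            task_type="echo",  # hypothetical task-type string
            name=name,
            task_config=None,
            interface=Interface(inputs={"a": str}, outputs={"o0": str}),
            **kwargs,
        )

    def execute(self, **kwargs) -> Any:
        # The business logic lives on the class, not in a decorated function.
        return kwargs["a"]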

class flytekit.core.python_auto_container.PythonAutoContainerTask(*args, **kwargs)[source]

A Python AutoContainer task should be used as the base for all extensions that want the user's code to be in the container and the container information to be automatically captured. This base will auto-configure the image and image version to be used for all its derivatives.

If you are looking to extend, you might prefer to use PythonFunctionTask or PythonInstanceTask instead.

Parameters
  • name – unique name for the task, usually the function’s module and name.

  • task_config – Configuration object for Task. Should be a unique type for that specific Task

  • task_type – String task type to be associated with this Task

  • container_image – String FQN for the image.

  • requests – custom resource request settings.

  • limits – custom resource limit settings.

  • environment – Environment variables you want the task to have when run.

  • task_resolver – Custom resolver - will pick up the default resolver if empty, or the resolver set in the compilation context if one is set.

  • secret_requests (List[Secret]) –

    Secrets that are requested by this container execution. These secrets will be mounted based on the configuration in the Secret and will be available through the SecretManager, using the name of the secret as the group. Ideally, the secret keys should also be semi-descriptive. The key values will be available at runtime, if the backend is configured to provide secrets and if secrets are available in the configured secrets store.
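
For instance, a task might request and read a secret like this (a minimal sketch; the group and key names are made up, and the runtime lookup only works if the backend is configured to supply the secret):

from flytekit import Secret, current_context, task

@task(secret_requests=[Secret(group="my-group", key="my-key")])
def uses_secret() -> str:
    # Only available at run time, and only if the configured secret
    # store actually holds this group/key.
    return current_context().secrets.get("my-group", "my-key")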

class flytekit.core.python_function_task.PythonFunctionTask(*args, **kwargs)[source]

A Python Function task should be used as the base for all extensions that have a Python function. It will automatically detect the interface of the Python function and, when serialized on the hosted Flyte platform, handles writing the execution command that executes the function.

It is recommended that this task be used via the @task decorator, as follows:
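
@task
def my_task(a: int) -> str:
    return str(a)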

In the above code, the name of the function, the module, and the interface (inputs = int, outputs = str) will be auto-detected.

Parameters
  • task_config (T) – Configuration object for Task. Should be a unique type for that specific Task

  • task_function (Callable) – Python function that has type annotations and works for the task

  • ignore_input_vars (Optional[List[str]]) – When supplied, these input variables will be removed from the interface. This can be used to inject some client side variables only. Prefer using ExecutionParams

  • execution_mode (Optional[ExecutionBehavior]) – Defines how the execution should behave, for example executing normally or specially handling a dynamic case.

  • task_type (str) – String task type to be associated with this Task

Workflows

There are two workflow classes, which both inherit from the WorkflowBase class.

class flytekit.core.workflow.PythonFunctionWorkflow(*args, **kwargs)[source]

Please read Workflows first for a high-level understanding of what workflows are in Flyte. This Python object represents a workflow defined by a function and decorated with the @workflow decorator. Please see notes on that object for additional information.

class flytekit.core.workflow.ImperativeWorkflow(name, failure_policy=None, interruptible=False)[source]

An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.

Assuming you have some tasks like so

@task
def t1(a: str) -> str:
    return a + " world"

@task
def t2():
    print("side effect")

You could create a workflow imperatively like so

# Create the workflow with a name. This needs to be unique within the project and takes the place of the function
# name that's used for regular decorated function-based workflows.
wb = Workflow(name="my_workflow")
# Adds a top level input to the workflow. This is like an input to a workflow function.
wb.add_workflow_input("in1", str)
# Call your tasks.
node = wb.add_entity(t1, a=wb.inputs["in1"])
wb.add_entity(t2)
# This is analogous to a return statement
wb.add_workflow_output("from_n0t1", node.outputs["o0"])

This workflow would be identical on the back-end to

nt = typing.NamedTuple("wf_output", from_n0t1=str)

@workflow
def my_workflow(in1: str) -> nt:
    x = t1(a=in1)
    t2()
    return nt(
        x,
    )

Note that the only reason we need the NamedTuple is so that we can name the output the same thing as in the imperative example. The imperative paradigm makes naming workflow outputs easier, but this isn't a big deal in function-based workflows because names tend not to be necessary.


Launch Plan

There is only one LaunchPlan class.

class flytekit.core.launch_plan.LaunchPlan(name, workflow, parameters, fixed_inputs, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, auth_role=None, max_parallelism=None)[source]

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the core concepts if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

@workflow
def wf(a: int, c: str) -> str:
    ...

Create the default launch plan with

from flytekit import LaunchPlan

LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

from flytekit.core import launch_plan

launch_plan.LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_1", default_inputs={"a": 3}, fixed_inputs={"c": "4"}
)

Additionally, a launch plan can be configured to run on a schedule and emit notifications. Please see the relevant Schedule and Notification objects as well.

from flytekit import CronSchedule
from flytekit.core import notification
from flytekit.models.core import execution as _execution_model

sched = CronSchedule("* * ? * * *", kickoff_time_input_arg="abc")
email_notif = notification.Email(
    phases=[_execution_model.WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"]
)
launch_plan.LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_2", schedule=sched, notifications=[email_notif]
)

To configure the remaining parameters, you'll need to import the relevant model objects.

from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use them as follows:

auth_role_model = AuthRole(assumable_iam_role="my:iam:role")
launch_plan.LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_3",
    auth_role=auth_role_model,
)

labels_model = Labels({"label": "foo"})
annotations_model = Annotations({"annotate": "bar"})
launch_plan.LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_4",
    auth_role=auth_role_model,
    labels=labels_model,
    annotations=annotations_model,
)

raw_output_data_config = RawOutputDataConfig("s3://foo/output")
launch_plan.LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_5", raw_output_data_config=raw_output_data_config
)
Parameters
  • name (str) –

  • workflow (_annotated_workflow.WorkflowBase) –

  • parameters (_interface_models.ParameterMap) –

  • fixed_inputs (_literal_models.LiteralMap) –

  • schedule (_schedule_model.Schedule) –

  • notifications (List[_admin_common.Notification]) –

  • labels (_admin_common.Labels) –

  • annotations (_admin_common.Annotations) –

  • raw_output_data_config (_admin_common.RawOutputDataConfig) –

  • auth_role (flytekit.models.admin.common.AuthRole) –

  • max_parallelism (int) –

Exception Handling

Exception handling is done along two dimensions

  • System vs. User: We try to differentiate between user exceptions and flytekit/system-level exceptions. For instance, if flytekit fails to upload its outputs, that's a system exception. If you, the user, raise a ValueError because of unexpected input in the task code, that's a user exception.

  • Recoverable vs. Non-recoverable: Recoverable errors will be retried and count against your task's retry count. Non-recoverable errors will simply fail. System exceptions are recoverable by default (since there's a good chance the failure was just a blip).

This is the user exception tree. Feel free to raise any of these exception classes. Note that the FlyteRecoverableException is the only recoverable one. All others, along with all non-flytekit defined exceptions, are non-recoverable.

Inheritance diagram of flytekit.common.exceptions.user.FlyteValidationException, flytekit.common.exceptions.user.FlyteEntityAlreadyExistsException, flytekit.common.exceptions.user.FlyteValueException, flytekit.common.exceptions.user.FlyteTimeout, flytekit.common.exceptions.user.FlyteAuthenticationException, flytekit.common.exceptions.user.FlyteRecoverableException
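
For example, a task that wants transient failures to be retried might look like this (a minimal sketch; the task body and retry count are made up):

from flytekit import task
from flytekit.common.exceptions.user import FlyteRecoverableException

@task(retries=3)
def flaky(a: int) -> int:
    if a < 0:
        # A plain ValueError is a non-recoverable user error: the task
        # fails immediately, with no retries.
        raise ValueError("a must be non-negative")
    try:
        return a * 2  # stand-in for a call that can fail transiently
    except ConnectionError as e:
        # Recoverable: the failure counts against retries=3 and is retried.
        raise FlyteRecoverableException(str(e))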

Implementation

For those who want to dig a bit deeper, take a look at the flytekit.common.exceptions.scopes.FlyteScopedException classes. There are also two decorators, which you'll find interspersed throughout the codebase.

flytekit.common.exceptions.scopes.system_entry_point(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)[source]

The reason these two decorators exist (see the user one below) is to categorize non-Flyte exceptions raised at arbitrary locations. For example, while there is a separate ecosystem of Flyte-defined user and system exceptions (see the FlyteException hierarchy), which we can easily understand and categorize, if flytekit comes upon a random ValueError or some other non-flytekit-defined error, how would we know whether it was a bug in flytekit, an error in user code, or a failure in something the user called? The purpose of these decorators is to categorize those exceptions.

Decorator for wrapping functions that enter a system context. This should decorate every method that may invoke some user code later on down the line. This will allow us to add differentiation between what is a user error and what is a system failure. Furthermore, we will clean the exception trace so as to make more sense to the user – allowing them to know if they should take action themselves or pass on to the platform owners. We will dispatch metrics and such appropriately.

flytekit.common.exceptions.scopes.user_entry_point(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)[source]

See the comment for the system_entry_point above as well.

Decorator for wrapping functions that enter into a user context. This will help us differentiate user-created failures even when it is re-entrant into system code.

Note: a user_entry_point can ONLY ever be called from within a @system_entry_point wrapped function, therefore, we can always ensure we will hit a system_entry_point to correctly reformat our exceptions. Also, any exception we create here will only be handled within our system code so we don’t need to worry about leaking weird exceptions to the user.
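
As a rough sketch of how the two scopes nest (the function names here are hypothetical; only the decorators are real):

from flytekit.common.exceptions import scopes as exception_scopes

@exception_scopes.user_entry_point
def run_user_code(fn, **kwargs):
    # A bare ValueError raised in here gets re-tagged as a user error.
    return fn(**kwargs)

@exception_scopes.system_entry_point
def dispatch(fn, **kwargs):
    # Entered first: failures in flytekit-internal code count as system
    # errors. Per the note above, the user scope is only ever entered
    # from inside a system scope.
    return run_user_code(fn, **kwargs)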

Call Patterns

The three entities above are all callable. In Flyte terms, that means they can be invoked to yield a unit (or units) of work. In Python terms, that means you can add () to the end of one of them, which invokes the __call__ method on the object.

What happens when a callable entity is called depends on the current context, specifically the current flytekit.FlyteContext

Raw Task Execution

This is what happens when a task is run by itself, say as part of a unit test. The @task decorator actually turns the decorated function into an instance of the PythonFunctionTask object, but when a user calls it outside of a workflow, e.g. task1(), the original function is called without interference from flytekit.
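
For instance (a hypothetical task; note that flytekit tasks must be called with keyword arguments):

from flytekit import task

@task
def double(a: int) -> int:
    return a * 2

# Outside of a workflow, this is effectively a plain function call.
assert double(a=2) == 4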

Task Execution Inside Workflow

This is what happens to the task when a workflow is being run locally, say as part of a unit test for the workflow.

Before going further, there is a special object that’s worth mentioning, the flytekit.extend.Promise.

class flytekit.core.promise.Promise(var, val)[source]

This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like

@task
def t1() -> (int, str): ...

  1. Handling the duality between compilation and local execution - when the task function is run in a local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.

  2. One needs to be able to call

    x = t1().with_overrides(...)
    

    If the task returned a plain integer or an (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. The Promise object is what makes this call possible.

  3. Assorted handling for conditionals.

Parameters
  • var (str) –

  • val (Union[NodeOutput, _literal_models.Literal]) –

Let’s assume we have a workflow like

@task
def t1(a: int) -> (int, str):
    return a + 2, "world"

@task
def t2(a: str, b: str) -> str:
    return b + a

@workflow
def my_wf(a: int, b: str) -> (int, str):
    x, y = t1(a=a).with_overrides(...)
    d = t2(a=y, b=b)
    return x, d

As discussed in the Promise object’s documentation, when a task is called from inside a workflow, the Python native values that the raw underlying functions return are first converted into Flyte IDL literals, and then wrapped inside Promise objects. One Promise is created for every return variable.

When the next task is called, logic is triggered to unwrap these promises.

Compilation

When a workflow is compiled, the Promise objects produced do not wrap literal values; they wrap a flytekit.core.promise.NodeOutput instead. This is how data dependencies are tracked between tasks.

Branch Skip

If it's been determined that a conditional is not true, then flytekit will skip actually calling the task, which means that any side effects in the task logic will not run.
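
A minimal sketch of this behavior (hypothetical tasks; the branch that isn't taken is never called, so its print side effect never happens):

from flytekit import conditional, task, workflow

@task
def send_alert() -> str:
    print("side effect")  # skipped entirely when the branch is false
    return "alerted"

@task
def noop() -> str:
    return "ok"

@workflow
def wf(x: int) -> str:
    return (
        conditional("alert")
        .if_(x > 10)
        .then(send_alert())
        .else_()
        .then(noop())
    )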

Note

Even though the discussion above focused on a task's execution pattern, the same actually applies to workflows and launch plans.