Authoring Structure

One of the core features of Flytekit is enabling users to write tasks and workflows. This section explains how that works internally.


Please refer to the design doc.

Types and Type Engine

Flyte has its own type system, which is codified in the IDL. Python, despite being a dynamic language, also has a type system, primarily specified in PEP 484. Flytekit has to bridge the gap between these two type systems.

Type Engine

This primarily happens through flytekit.extend.TypeEngine, which works by invoking a series of TypeTransformers. Each transformer provides the functionality that the engine requires for a given native Python type.
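To make the dispatch concrete, here is a toy model of it. It is not flytekit's API: ToyTypeEngine, ToyTransformer, and the dict-shaped "literal" are hypothetical stand-ins. The method names echo to_literal and to_python_value on flytekit's TypeTransformer, but the real methods take extra arguments (such as a context and the expected type) and produce actual Flyte IDL literals.

```python
from typing import Any, Callable, Dict


class ToyTransformer:
    """Hypothetical transformer: converts one Python type to and from a toy literal."""

    def __init__(self, python_type: type, idl_name: str,
                 to_lit: Callable[[Any], Any], from_lit: Callable[[Any], Any]) -> None:
        self.python_type = python_type
        self.idl_name = idl_name
        self._to_lit = to_lit
        self._from_lit = from_lit

    def to_literal(self, value: Any) -> Dict[str, Any]:
        # The toy "literal" is a dict tagged with an IDL-like type name.
        return {"type": self.idl_name, "value": self._to_lit(value)}

    def to_python_value(self, literal: Dict[str, Any]) -> Any:
        return self._from_lit(literal["value"])


class ToyTypeEngine:
    """Hypothetical engine: looks up a transformer by Python type and delegates to it."""

    _registry: Dict[type, ToyTransformer] = {}

    @classmethod
    def register(cls, transformer: ToyTransformer) -> None:
        cls._registry[transformer.python_type] = transformer

    @classmethod
    def get_transformer(cls, python_type: type) -> ToyTransformer:
        if python_type not in cls._registry:
            raise TypeError(f"no transformer registered for {python_type!r}")
        return cls._registry[python_type]

    @classmethod
    def to_literal(cls, value: Any, python_type: type) -> Dict[str, Any]:
        return cls.get_transformer(python_type).to_literal(value)

    @classmethod
    def to_python_value(cls, literal: Dict[str, Any], python_type: type) -> Any:
        return cls.get_transformer(python_type).to_python_value(literal)


# One transformer per supported native type.
ToyTypeEngine.register(ToyTransformer(int, "INTEGER", int, int))
ToyTypeEngine.register(ToyTransformer(str, "STRING", str, str))
ToyTypeEngine.register(ToyTransformer(float, "FLOAT", float, float))
```

Supporting a new native type means registering one more transformer; the engine itself does not change. In this toy, asking for an unregistered type such as list raises TypeError.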

Callable Entities

Tasks, workflows, and launch plans form the core of the Flyte user experience. Each of these concepts is backed by one or more Python classes. These classes, in turn, are instantiated by decorators (for tasks and workflows) or by a regular Python call (for launch plans).
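A rough sketch of the two construction styles, using hypothetical ToyEntity, toy_task, toy_workflow, and ToyLaunchPlan names rather than flytekit's classes: the decorators replace a function with an instance of a class, while the launch plan is built with an ordinary constructor call.

```python
from typing import Any, Callable, Dict


class ToyEntity:
    """Hypothetical callable entity wrapping a Python function."""

    def __init__(self, fn: Callable[..., Any], kind: str) -> None:
        self.fn = fn
        self.kind = kind
        # Derive the entity name from the function's module and name.
        self.name = f"{fn.__module__}.{fn.__name__}"

    def __call__(self, **kwargs: Any) -> Any:
        return self.fn(**kwargs)


def toy_task(fn: Callable[..., Any]) -> ToyEntity:
    # The decorator replaces the function with an instance of the entity class.
    return ToyEntity(fn, "task")


def toy_workflow(fn: Callable[..., Any]) -> ToyEntity:
    return ToyEntity(fn, "workflow")


class ToyLaunchPlan:
    """Hypothetical launch plan: built with an ordinary constructor call, no decorator."""

    def __init__(self, name: str, workflow: ToyEntity, fixed_inputs: Dict[str, Any]) -> None:
        self.name = name
        self.workflow = workflow
        self.fixed_inputs = fixed_inputs


@toy_task
def add_one(a: int) -> int:
    return a + 1


@toy_workflow
def wf(a: int) -> int:
    return add_one(a=a)


lp = ToyLaunchPlan(name="wf_lp", workflow=wf, fixed_inputs={"a": 1})
```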


This is the current task class hierarchy:

Inheritance diagram of flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, flytekit.extras.sqlite3.task.SQLite3Task

Please see the documentation on each of the classes for details.

class flytekit.core.base_task.Task(task_type, name, interface=None, metadata=None, task_type_version=0, security_ctx=None, **kwargs)

The base of all tasks in flytekit. This task is closest to the FlyteIDL TaskTemplate: it captures information in the FlyteIDL specification and has no Python-native interface associated with it. Refer to the derived classes for examples of how to extend this class.

class flytekit.core.base_task.PythonTask(*args, **kwargs)

Base class for all tasks with a Python-native interface. Use it directly for task types that do not have a Python function to execute; otherwise, refer to flytekit.PythonFunctionTask.

Parameters:

  • task_type (str) – defines a unique task type for every new extension. If a backend plugin is required, this has to be set in concert with the backend plugin identifier.

  • name (str) – A unique name for the task instantiation; every task instance has its own name.

  • task_config (T) – Configuration for the task. This is used to configure the specific plugin that handles this task.

  • interface (Optional[Interface]) – A Python-native typed interface (inputs) -> outputs that declares the signature of the task.

  • environment (Optional[Dict[str, str]]) – Any environment variables that should be supplied during the execution of the task, as a dictionary of key/value pairs.

  • disable_deck (bool) – If True, this task will not output a Deck HTML file.
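For task types without a user function (SQLite3Task in the hierarchy above is one example), the subclass supplies the logic itself. In flytekit that is done by overriding execute(**kwargs); the sketch below only mimics that shape with hypothetical ToyPythonTask and ToyTemplateTask classes, and the input check in __call__ is a simplification of what a typed interface enables.

```python
from typing import Any, Dict


class ToyPythonTask:
    """Hypothetical base: a name, a task type, a typed interface, and an abstract execute()."""

    def __init__(self, name: str, task_type: str,
                 inputs: Dict[str, type], outputs: Dict[str, type]) -> None:
        self.name = name
        self.task_type = task_type
        self.inputs = inputs
        self.outputs = outputs

    def execute(self, **kwargs: Any) -> Any:
        raise NotImplementedError("subclasses provide the task logic")

    def __call__(self, **kwargs: Any) -> Any:
        # Check the inputs against the declared interface before executing.
        for key, expected in self.inputs.items():
            if not isinstance(kwargs.get(key), expected):
                raise TypeError(f"{self.name}: input {key!r} must be {expected.__name__}")
        return self.execute(**kwargs)


class ToyTemplateTask(ToyPythonTask):
    """No user function: the behavior comes from a template fixed at construction time."""

    def __init__(self, name: str, template: str) -> None:
        super().__init__(name, task_type="toy-template",
                         inputs={"value": str}, outputs={"o0": str})
        self.template = template

    def execute(self, **kwargs: Any) -> str:
        return self.template.format(**kwargs)


greet = ToyTemplateTask(name="greet", template="hello, {value}")
```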

class flytekit.core.python_auto_container.PythonAutoContainerTask(*args, **kwargs)

A Python AutoContainer task should be used as the base for all extensions that want the user's code to be in the container and the container information to be automatically captured. This base automatically configures the image and image version to be used for all its derivatives.

If you are looking to extend this class, you might prefer PythonFunctionTask or PythonInstanceTask.

Parameters:

  • name – unique name for the task, usually the function’s module and name.

  • task_config – Configuration object for Task. Should be a unique type for that specific Task.

  • task_type – String task type to be associated with this Task

  • container_image – String FQN for the image.

  • requests – custom resource request settings.

  • limits – custom resource limit settings.

  • environment – Environment variables you want the task to have when run.

  • task_resolver – Custom resolver. If empty, the default resolver is used, or the resolver set in the compilation context if one is set.

  • secret_requests (List[Secret]) –

    Secrets that are requested by this container execution. These secrets will be mounted based on the configuration in the Secret and made available through the SecretManager, using the name of the secret as the group. Ideally, the secret keys should also be semi-descriptive. The key values will be available at runtime if the backend is configured to provide secrets and if secrets are available in the configured secrets store. Possible options for secret stores are

class flytekit.core.python_function_task.PythonFunctionTask(*args, **kwargs)

A Python Function task should be used as the base for all extensions that have a Python function. It automatically detects the interface of the Python function and, when serialized for the hosted Flyte platform, handles writing the execution command that runs the function.

It is advised to use this task through the @task decorator, applied to a type-annotated Python function.

For example, for a decorated function that takes an int and returns a str, the function's name, its module, and its interface (inputs = int and outputs = str) are detected automatically.

Parameters:

  • task_config (T) – Configuration object for Task. Should be a unique type for that specific Task

  • task_function (Callable) – Python function that has type annotations and works for the task

  • ignore_input_vars (Optional[List[str]]) – When supplied, these input variables will be removed from the interface. This can be used to inject some client side variables only. Prefer using ExecutionParams

  • execution_mode (Optional[ExecutionBehavior]) – Defines how the execution should behave, for example executing normally or specially handling a dynamic case.

  • task_type (str) – String task type to be associated with this Task
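The automatic interface detection can be approximated with the standard library. The detect_interface helper below is a hypothetical sketch, not flytekit's implementation; it assumes outputs are named o0, o1, and so on, matching the node.outputs["o0"] naming in the ImperativeWorkflow example.

```python
import inspect
import typing
from typing import Any, Callable, Dict, Tuple


def detect_interface(fn: Callable[..., Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper: derive (inputs, outputs) from a function's type annotations."""
    hints = typing.get_type_hints(fn)
    inputs: Dict[str, Any] = {}
    for name in inspect.signature(fn).parameters:
        if name not in hints:
            raise TypeError(f"{fn.__name__}: parameter {name!r} needs a type annotation")
        inputs[name] = hints[name]

    ret = hints.get("return", type(None))
    if ret is type(None):
        outputs: Dict[str, Any] = {}
    elif typing.get_origin(ret) is tuple:
        # A Tuple return is treated as several outputs, named o0, o1, ...
        outputs = {f"o{i}": t for i, t in enumerate(typing.get_args(ret))}
    else:
        outputs = {"o0": ret}
    return inputs, outputs


def my_func(a: int) -> str:
    return str(a)


def pair(a: int) -> typing.Tuple[int, str]:
    return a, str(a)
```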


There are two workflow classes, and both inherit from the WorkflowBase class.

class flytekit.core.workflow.PythonFunctionWorkflow(*args, **kwargs)

Please read Workflows first for a high-level understanding of what workflows are in Flyte. This Python object represents a workflow defined by a function and decorated with the @workflow decorator. Please see notes on that object for additional information.

class flytekit.core.workflow.ImperativeWorkflow(name, failure_policy=None, interruptible=False)

An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.

Assume you have some tasks like these:

from flytekit import task

@task
def t1(a: str) -> str:
    return a + " world"

@task
def t2():
    print("side effect")

You could create a workflow imperatively like so:

from flytekit import Workflow

# Create the workflow with a name. This needs to be unique within the project and takes the place of the function
# name that's used for regular decorated function-based workflows.
wb = Workflow(name="my_workflow")
# Adds a top level input to the workflow. This is like an input to a workflow function.
wb.add_workflow_input("in1", str)
# Call your tasks.
node = wb.add_entity(t1, a=wb.inputs["in1"])
wb.add_entity(t2)
# This is analogous to a return statement
wb.add_workflow_output("from_n0t1", node.outputs["o0"])

On the back end, this workflow would be identical to the following function-based workflow:

import typing
from flytekit import workflow

nt = typing.NamedTuple("wf_output", [("from_n0t1", str)])

@workflow
def my_workflow(in1: str) -> nt:
    x = t1(a=in1)
    t2()
    return nt(x)

Note that the NamedTuple is needed only so that the output gets the same name as in the imperative example. The imperative paradigm makes naming workflow outputs easier, but this matters little in function-based workflows, where output names are rarely needed.


Launch Plan

There is only one LaunchPlan class.

class flytekit.core.launch_plan.LaunchPlan(name, workflow, parameters, fixed_inputs, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None)

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the core concepts if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set: no default values, fixed values, schedules, etc. Assume you have the following workflow:

from flytekit import LaunchPlan, workflow

@workflow
def wf(a: int, c: str) -> str:
    ...

Create the default launch plan with:

LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you'll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:

LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_1", default_inputs={"a": 3}, fixed_inputs={"c": "4"}
)

Additionally, a launch plan can be configured to run on a schedule and emit notifications. Please see the relevant Schedule and Notification objects as well.

from flytekit import CronSchedule
from flytekit.core import notification
from flytekit.models.core import execution as _execution_model

sched = CronSchedule("* * ? * * *", kickoff_time_input_arg="abc")
email_notif = notification.Email(
    phases=[_execution_model.WorkflowExecutionPhase.SUCCEEDED], recipients_email=[""]
)
LaunchPlan.get_or_create(workflow=wf, name="your_lp_name_2", schedule=sched, notifications=[email_notif])

To configure the remaining parameters, you'll need to import the relevant model objects as well:

from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use them as follows:

auth_role_model = AuthRole(assumable_iam_role="my:iam:role")
LaunchPlan.get_or_create(workflow=wf, name="your_lp_name_3", auth_role=auth_role_model)

labels_model = Labels({"label": "foo"})
annotations_model = Annotations({"annotate": "bar"})
LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_4", labels=labels_model, annotations=annotations_model
)

raw_output_data_config = RawOutputDataConfig("s3://foo/output")
LaunchPlan.get_or_create(
    workflow=wf, name="your_lp_name_5", raw_output_data_config=raw_output_data_config
)

Parameters:

  • name (str) –

  • workflow (_annotated_workflow.WorkflowBase) –

  • parameters (_interface_models.ParameterMap) –

  • fixed_inputs (_literal_models.LiteralMap) –

  • schedule (_schedule_model.Schedule) –

  • notifications (List[_common_models.Notification]) –

  • labels (_common_models.Labels) –

  • annotations (_common_models.Annotations) –

  • raw_output_data_config (_common_models.RawOutputDataConfig) –

  • max_parallelism (int) –

  • security_context (Optional[security.SecurityContext]) –
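The difference between default and fixed inputs can be sketched as a merge, assuming the usual launch plan semantics: defaults may be overridden when an execution is launched, fixed inputs may not. resolve_launch_inputs is a hypothetical helper, and it does not check for required inputs that have no value.

```python
from typing import Any, Dict, Optional


def resolve_launch_inputs(default_inputs: Dict[str, Any],
                          fixed_inputs: Dict[str, Any],
                          provided: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical merge of launch plan inputs for a single execution."""
    provided = provided or {}
    # Fixed inputs cannot be overridden by the caller.
    clashes = set(provided) & set(fixed_inputs)
    if clashes:
        raise ValueError(f"cannot override fixed inputs: {sorted(clashes)}")
    merged = dict(default_inputs)  # start from the defaults
    merged.update(provided)        # caller values replace defaults
    merged.update(fixed_inputs)    # fixed values always apply
    return merged
```

With the values from your_lp_name_1 (default a=3, fixed c="4"), launching without inputs yields both values, and passing a=10 replaces only the default.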

Exception Handling

Exception handling takes place along two dimensions:

  • System vs. User: We try to differentiate between user exceptions and Flytekit/system-level exceptions. For instance, if Flytekit fails to upload its outputs, that’s a system exception. If the user raises a ValueError because of an unexpected input in the task code, that’s a user exception.

  • Recoverable vs. Non-recoverable: Recoverable errors will be retried and counted against the task’s retry count. Non-recoverable errors will simply fail. System exceptions are by default recoverable (since there’s a good chance it was just a blip).

Here’s the user exception tree. Feel free to raise any of these exception classes. Note that the FlyteRecoverableException is the only recoverable exception. All others, along with all the non-Flytekit defined exceptions, are non-recoverable.

Inheritance diagram of flytekit.exceptions.user.FlyteValidationException, flytekit.exceptions.user.FlyteEntityAlreadyExistsException, flytekit.exceptions.user.FlyteValueException, flytekit.exceptions.user.FlyteTimeout, flytekit.exceptions.user.FlyteAuthenticationException, flytekit.exceptions.user.FlyteRecoverableException
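The two dimensions above reduce to a small retry rule. The sketch uses hypothetical Toy* classes in place of flytekit's exceptions, and a plain boolean for the system/user scope, which flytekit actually determines with the entry-point decorators described below.

```python
class ToyFlyteUserException(Exception):
    """Hypothetical stand-in for a Flyte user exception."""


class ToyFlyteRecoverableException(ToyFlyteUserException):
    """Hypothetical stand-in for the one recoverable user exception."""


class ToyFlyteValueException(ToyFlyteUserException):
    """Hypothetical stand-in for a non-recoverable user exception."""


def should_retry(exc: BaseException, is_system: bool) -> bool:
    """Hypothetical retry rule following the system/user and recoverable dimensions."""
    if is_system:
        # System exceptions are recoverable by default.
        return True
    # In user code, only the recoverable exception type is retried.
    return isinstance(exc, ToyFlyteRecoverableException)
```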


For those who want to dig deeper, take a look at the flytekit.exceptions.scopes.FlyteScopedException classes. There are two decorators that are interspersed throughout the codebase.

flytekit.exceptions.scopes.system_entry_point(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)

These two decorators (this one and user_entry_point, below) exist to categorize non-Flyte exceptions at arbitrary locations. Flyte defines its own ecosystem of user and system exceptions (see the FlyteException hierarchy), and those are easy to understand and categorize. But if flytekit comes across a random ValueError or another non-flytekit error, how would it know whether the error is a bug in flytekit or an error in user code (or in something the user code called)? The purpose of these decorators is to categorize such errors.

Decorator for wrapping functions that enter a system context. This should decorate every method that may invoke some user code later on down the line. This will allow us to add differentiation between what is a user error and what is a system failure. Furthermore, we will clean the exception trace so as to make more sense to the user – allowing them to know if they should take action themselves or pass on to the platform owners. We will dispatch metrics and such appropriately.

flytekit.exceptions.scopes.user_entry_point(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)

See the comment for the system_entry_point above as well.

Decorator for wrapping functions that enter into a user context. This will help us differentiate user-created failures even when it is re-entrant into system code.

Note: a user_entry_point can ONLY ever be called from within a @system_entry_point-wrapped function, so we can always be sure of hitting a system_entry_point that correctly reformats our exceptions. Also, any exception we create here will only be handled within our system code, so we don't need to worry about leaking unexpected exceptions to the user.
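A simplified model of the two decorators, assuming only the behavior described above: the innermost entry point that sees an unrecognized exception labels it with its scope, and outer entry points pass already-labelled errors through unchanged. All names are hypothetical. flytekit's real decorators appear to be built with wrapt (the FunctionWrapper proxy in their signatures suggests so) and also handle Flyte-defined exceptions and trace cleanup, which this toy leaves out.

```python
import functools
from typing import Any, Callable, Optional


class ToyScopedException(Exception):
    """Hypothetical wrapper recording which scope an error escaped from."""

    def __init__(self, scope: str, original: BaseException) -> None:
        super().__init__(f"{scope} error: {original!r}")
        self.scope = scope
        self.original = original


def _entry_point(scope: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return fn(*args, **kwargs)
            except ToyScopedException:
                # Already categorized by an inner entry point; keep that verdict.
                raise
            except Exception as exc:
                raise ToyScopedException(scope, exc) from exc
        return wrapper
    return decorator


toy_system_entry_point = _entry_point("system")
toy_user_entry_point = _entry_point("user")


@toy_user_entry_point
def user_code(x: int) -> int:
    if x < 0:
        raise ValueError("negative input")
    return x * 2


@toy_system_entry_point
def run_task(x: int) -> int:
    # System code that calls into user code.
    return user_code(x)


@toy_system_entry_point
def upload_outputs() -> None:
    raise OSError("upload failed")


def scope_of(fn: Callable[..., Any], *args: Any) -> Optional[str]:
    """Return the scope an error was attributed to, or None if the call succeeded."""
    try:
        fn(*args)
    except ToyScopedException as exc:
        return exc.scope
    return None
```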

Call Patterns

The above-mentioned entities (tasks, workflows, and launch plans) are callable. They can be invoked to yield a unit (or units) of work in Flyte.

In Pythonic terms, when you add () to the end of one of the entities, it invokes the __call__ method on the object.

What happens when a callable entity is called depends on the current context, specifically the current flytekit.FlyteContext.
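A toy model of context-dependent calling. It assumes a single contextvars variable holding a mode string in place of flytekit.FlyteContext, and covers only two modes, raw execution and compilation; local execution inside a workflow, which produces Promises, is left out here. ToyTask, in_mode, and call_in_mode are hypothetical names.

```python
import contextvars
from contextlib import contextmanager
from typing import Any, Callable, Iterator

# Hypothetical stand-in for the current FlyteContext: only an execution mode.
_mode = contextvars.ContextVar("mode", default="raw")


@contextmanager
def in_mode(mode: str) -> Iterator[None]:
    token = _mode.set(mode)
    try:
        yield
    finally:
        _mode.reset(token)


class ToyTask:
    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __call__(self, **kwargs: Any) -> Any:
        mode = _mode.get()
        if mode == "raw":
            # Outside any workflow: run the original function untouched.
            return self.fn(**kwargs)
        if mode == "compile":
            # During compilation: describe a node instead of running the function.
            return f"<node {self.fn.__name__}({', '.join(sorted(kwargs))})>"
        raise RuntimeError(f"unsupported mode {mode!r}")


def call_in_mode(mode: str, task: ToyTask, **kwargs: Any) -> Any:
    with in_mode(mode):
        return task(**kwargs)


@ToyTask
def double(x: int) -> int:
    return x * 2
```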

Raw Task Execution

This is what happens when a task is run on its own, for example as part of a unit test. The @task decorator turns the decorated function into an instance of the PythonFunctionTask object, but when a user calls task() outside of a workflow, the original function is called without any interference from Flytekit.

Task Execution Inside Workflow

When a workflow is run locally (say, as part of a unit test), the tasks it calls behave differently than when they are called on their own.

Before going further, one special object is worth mentioning: flytekit.extend.Promise.

class flytekit.core.promise.Promise(var, val)

This object is a wrapper and exists for three main reasons. Assume we're dealing with a task like:

@task
def t1() -> Tuple[int, str]: ...

  1. Handling the duality between compilation and local execution: when the task function is run in local execution mode inside a workflow function, a Python integer and string are produced. When the task is compiled as part of the workflow, the task call instead creates a Node, and the task returns two Promise objects that point to that Node.

  2. One needs to be able to call

    x = t1().with_overrides(...)

    If the task returned a plain integer or an (int, str) tuple, as the raw t1 function above does, calling with_overrides on the result would raise an error. The Promise object provides that method.

  3. Assorted handling for conditionals.

Parameters:

  • var (str) –

  • val (Union[NodeOutput, _literal_models.Literal]) –
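A minimal sketch of the second reason, with a hypothetical ToyPromise: a raw int has no with_overrides method, but a wrapper can offer one and return itself so the call can be chained. Multi-output tasks and the Node that a real Promise points to during compilation are not modelled.

```python
from typing import Any, Dict


class ToyPromise:
    """Hypothetical Promise: names an output (var) and holds its value (val)."""

    def __init__(self, var: str, val: Any) -> None:
        self.var = var
        self.val = val
        self.overrides: Dict[str, Any] = {}

    def with_overrides(self, **overrides: Any) -> "ToyPromise":
        # Record the overrides and return self so the call can be chained.
        self.overrides.update(overrides)
        return self


p = ToyPromise("o0", 3).with_overrides(retries=2)
```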

Assume we have a workflow like:

from typing import Tuple
from flytekit import task, workflow

@task
def t1(a: int) -> Tuple[int, str]:
    return a + 2, "world"

@task
def t2(a: str, b: str) -> str:
    return b + a

@workflow
def my_wf(a: int, b: str) -> Tuple[int, str]:
    x, y = t1(a=a).with_overrides(...)
    d = t2(a=y, b=b)
    return x, d

As discussed in the Promise object’s documentation, when a task is called from inside a workflow, the Python native values returned by the raw underlying functions are first converted into Flyte IDL literals and then wrapped inside Promise objects. One Promise is created for every return variable.

When the next task is called, logic is triggered to unwrap these Promises so that their values can be passed to the task as inputs.
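The wrap-and-unwrap cycle for the my_wf example can be sketched as follows. ToyPromise and ToyLocalTask are hypothetical, and the conversion to Flyte IDL literals is skipped: the toy Promise simply holds the Python value.

```python
from typing import Any, Callable, Tuple


class ToyPromise:
    """Hypothetical Promise holding an already-computed output value."""

    def __init__(self, var: str, val: Any) -> None:
        self.var = var
        self.val = val


class ToyLocalTask:
    """Hypothetical task as called inside a workflow during local execution."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __call__(self, **kwargs: Any) -> Any:
        # Unwrap Promises produced by upstream tasks into plain Python values.
        native = {k: v.val if isinstance(v, ToyPromise) else v for k, v in kwargs.items()}
        result = self.fn(**native)
        # Wrap each return value in its own Promise, named o0, o1, ...
        values = result if isinstance(result, tuple) else (result,)
        promises = tuple(ToyPromise(f"o{i}", v) for i, v in enumerate(values))
        return promises if len(promises) > 1 else promises[0]


@ToyLocalTask
def t1(a: int) -> Tuple[int, str]:
    return a + 2, "world"


@ToyLocalTask
def t2(a: str, b: str) -> str:
    return b + a


def my_wf(a: int, b: str) -> Tuple[int, str]:
    x, y = t1(a=a)
    d = t2(a=y, b=b)
    # At the workflow boundary, unwrap the final Promises back into values.
    return x.val, d.val
```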


When a workflow is compiled, the Promise objects it produces wrap a flytekit.core.promise.NodeOutput instead of a literal value. This helps track data dependencies between tasks.
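A sketch of how output references let a compiler record data dependencies without running anything. ToyNodeOutput and ToyCompiler are hypothetical, and the node IDs (n0, n1, ...) are an assumed naming scheme.

```python
from typing import Any, Dict, List, Tuple


class ToyNodeOutput:
    """Hypothetical reference to output `var` of node `node_id`."""

    def __init__(self, node_id: str, var: str) -> None:
        self.node_id = node_id
        self.var = var


class ToyCompiler:
    """Hypothetical compiler that records nodes and the data edges between them."""

    def __init__(self) -> None:
        self.nodes: Dict[str, str] = {}          # node id -> task name
        self.edges: List[Tuple[str, str]] = []   # (upstream id, downstream id)

    def add_node(self, task_name: str, n_outputs: int, **inputs: Any) -> Tuple[ToyNodeOutput, ...]:
        node_id = f"n{len(self.nodes)}"
        self.nodes[node_id] = task_name
        for value in inputs.values():
            # Consuming another node's output creates a data dependency.
            if isinstance(value, ToyNodeOutput):
                edge = (value.node_id, node_id)
                if edge not in self.edges:
                    self.edges.append(edge)
        # Nothing runs here; the outputs are only references to the new node.
        return tuple(ToyNodeOutput(node_id, f"o{i}") for i in range(n_outputs))


compiler = ToyCompiler()
x, y = compiler.add_node("t1", 2, a="<workflow input a>")
(d,) = compiler.add_node("t2", 1, a=y, b="<workflow input b>")
```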

Branch Skip

If the condition of a flytekit.conditional() branch evaluates to false, Flytekit skips calling the task in that branch, so the unintended task does not run.
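A toy model of branch skipping, assuming a context flag that marks a branch whose condition was false; the real flytekit.conditional() API is not reproduced here. ToySkippableTask, toy_branch, and the calls list are hypothetical names.

```python
import contextvars
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List

# Hypothetical flag: True while inside a branch whose condition evaluated to false.
_branch_skipped = contextvars.ContextVar("branch_skipped", default=False)
calls: List[str] = []  # records which task functions actually ran


@contextmanager
def toy_branch(condition: bool) -> Iterator[None]:
    # A nested branch stays skipped if any enclosing branch was skipped.
    token = _branch_skipped.set(_branch_skipped.get() or not condition)
    try:
        yield
    finally:
        _branch_skipped.reset(token)


class ToySkippableTask:
    """Hypothetical task whose __call__ checks the branch flag first."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def __call__(self, **kwargs: Any) -> Any:
        if _branch_skipped.get():
            # Skipped branch: return without calling the function.
            return None
        calls.append(self.fn.__name__)
        return self.fn(**kwargs)


@ToySkippableTask
def expensive(x: int) -> int:
    return x * 10


def run(flag: bool) -> Any:
    with toy_branch(flag):
        return expensive(x=1)
```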


We discussed a task's execution pattern above. The same pattern applies to workflows and launch plans too.