Authoring Structure

Enabling users to write tasks and workflows is the core feature of flytekit; it is the reason flytekit exists. This document describes how some of the internals work.

Background

Please see the design doc.

Types and Type Engine

Flyte has its own type system, which is codified in the IDL. Python, of course, has its own typing system, even though it is a dynamic language; it is described mostly in PEP 484. To work properly, flytekit needs to be able to convert between the two.

Type Engine

The primary way this happens is through the flytekit.extend.TypeEngine. This engine works by invoking a series of TypeTransformers. Each transformer is responsible for providing the functionality that the engine needs for a given native Python type.
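To make the dispatch idea concrete, here is a minimal sketch that does not use flytekit at all. ToyLiteral, ToyTransformer, and ToyTypeEngine are hypothetical stand-ins for an IDL literal, a TypeTransformer, and the TypeEngine; their methods are simplified assumptions, not flytekit's real signatures. The engine keeps one transformer per native Python type and delegates conversion in both directions to it.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Type


@dataclass(frozen=True)
class ToyLiteral:
    """Hypothetical stand-in for a Flyte IDL literal: a type tag plus a payload."""
    tag: str
    value: Any


@dataclass
class ToyTransformer:
    """Hypothetical transformer responsible for exactly one native Python type."""
    python_type: Type
    tag: str
    to_wire: Callable[[Any], Any]
    from_wire: Callable[[Any], Any]

    def to_literal(self, python_val: Any) -> ToyLiteral:
        # Python value -> "IDL" literal
        return ToyLiteral(self.tag, self.to_wire(python_val))

    def to_python_value(self, lit: ToyLiteral) -> Any:
        # "IDL" literal -> Python value, rejecting literals of another type
        if lit.tag != self.tag:
            raise TypeError(f"expected tag {self.tag!r}, got {lit.tag!r}")
        return self.from_wire(lit.value)


class ToyTypeEngine:
    """Registry that dispatches every conversion to the transformer for a type."""
    _registry: Dict[Type, ToyTransformer] = {}

    @classmethod
    def register(cls, transformer: ToyTransformer) -> None:
        cls._registry[transformer.python_type] = transformer

    @classmethod
    def get_transformer(cls, python_type: Type) -> ToyTransformer:
        try:
            return cls._registry[python_type]
        except KeyError:
            raise TypeError(f"no transformer registered for {python_type}") from None

    @classmethod
    def to_literal(cls, python_val: Any, python_type: Type) -> ToyLiteral:
        return cls.get_transformer(python_type).to_literal(python_val)

    @classmethod
    def to_python_value(cls, lit: ToyLiteral, python_type: Type) -> Any:
        return cls.get_transformer(python_type).to_python_value(lit)


# Register transformers for two primitive types.
ToyTypeEngine.register(ToyTransformer(int, "integer", int, int))
ToyTypeEngine.register(ToyTransformer(str, "string", str, str))
```

In this sketch, supporting a new Python type only means registering one more transformer; the engine itself does not change.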

Callable Entities

Tasks, workflows, and launch plans form the core of the Flyte user experience. Each of these concepts is backed by one or more Python classes. These classes, in turn, are instantiated by decorators (in the case of tasks and workflows) or by a normal Python call (in the case of launch plans).

Tasks

This is the current task class hierarchy.

Inheritance diagram of flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, flytekit.extras.sqlite3.task.SQLite3Task

Please see the documentation on each of the classes for details.

class flytekit.core.base_task.Task(task_type: str, name: str, interface: Optional[flytekit.models.interface.TypedInterface] = None, metadata: Optional[flytekit.core.base_task.TaskMetadata] = None, task_type_version=0, security_ctx: Optional[flytekit.models.security.SecurityContext] = None, **kwargs)

The base of all tasks in flytekit. This task is the closest to the FlyteIDL TaskTemplate: it captures the information in the FlyteIDL specification and has no Python-native interface associated with it. For any real extension, refer to the derived classes.

class flytekit.core.base_task.PythonTask(*args, **kwargs)

Base class for all tasks with a Python-native interface. Use it directly for task types that do not have a Python function to execute; otherwise, refer to flytekit.PythonFunctionTask.

class flytekit.core.python_function_task.PythonAutoContainerTask(*args, **kwargs)

A Python AutoContainer task should be used as the base for all extensions that want the user’s code to be in the container and the container information to be captured automatically. This base automatically configures the image and image version used by all its derivatives.

If you are looking to extend flytekit, you might prefer to use PythonFunctionTask or PythonInstanceTask.

class flytekit.core.python_function_task.PythonFunctionTask(*args, **kwargs)

A Python Function task should be used as the base for all extensions that have a Python function. It automatically detects the interface of the Python function and creates the right execution command to run the function.

The recommended way to use this task is through the @task decorator. When @task is applied to a function that takes an int input and returns a str, the name of the function, the module, and the interface (inputs = int and outputs = str) are detected automatically.
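Interface detection of this kind can be approximated with the standard library alone. The sketch below is an assumption about the general approach, not flytekit's code: detect_interface and my_func are hypothetical, and naming the outputs o0, o1, ... is a convention chosen for the sketch. It reads the function's type hints, treating each parameter as an input and the return annotation as the output (or outputs, for a Tuple return).

```python
import inspect
import typing
from typing import Any, Dict, Tuple


def detect_interface(fn) -> Tuple[str, str, Dict[str, Any], Dict[str, Any]]:
    """Return (module, name, inputs, outputs) for a fully annotated function."""
    hints = typing.get_type_hints(fn)
    params = inspect.signature(fn).parameters
    # Every parameter must be annotated; a missing hint raises KeyError here.
    inputs = {name: hints[name] for name in params}
    ret = hints.get("return", type(None))
    if typing.get_origin(ret) is tuple:
        # Tuple[int, str] -> two outputs
        outputs = {f"o{i}": t for i, t in enumerate(typing.get_args(ret))}
    elif ret is type(None):
        # "-> None" or no return annotation -> no outputs
        outputs = {}
    else:
        outputs = {"o0": ret}
    return fn.__module__, fn.__name__, inputs, outputs


def my_func(a: int) -> str:
    return str(a)


module, name, inputs, outputs = detect_interface(my_func)
```

Here inputs is {"a": int} and outputs is {"o0": str}, which matches the inputs = int and outputs = str case described above.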

Workflows

There is currently only one Workflow class.

class flytekit.core.workflow.PythonFunctionWorkflow(*args, **kwargs)

Please read Workflows first for a high-level understanding of what workflows are in Flyte. This Python object represents a workflow defined by a function and decorated with the @workflow decorator. Please see notes on that object for additional information.

Launch Plan

There is also only one LaunchPlan class.

class flytekit.core.launch_plan.LaunchPlan(name: str, workflow: flytekit.core.workflow.WorkflowBase, parameters: flytekit.models.interface.ParameterMap, fixed_inputs: flytekit.models.literals.LiteralMap, schedule: Optional[flytekit.models.schedule.Schedule] = None, notifications: Optional[List[flytekit.models.common.Notification]] = None, labels: Optional[flytekit.models.common.Labels] = None, annotations: Optional[flytekit.models.common.Annotations] = None, raw_output_data_config: Optional[flytekit.models.common.RawOutputDataConfig] = None, auth_role: Optional[flytekit.models.common.AuthRole] = None)

Call Patterns

The three entities above are all callable. In Flyte terms, that means they can be invoked to yield a unit (or units) of work. In Python terms, it means you can add () to the end of one of them, which invokes the __call__ method on the object.

What happens when a callable entity is called depends on the current context, specifically the current flytekit.FlyteContext.

Raw Task Execution

This is what happens when a task is simply run as part of a unit test. The @task decorator turns the decorated function into an instance of the PythonFunctionTask object, but when a user calls that object (for example, task1()) outside of a workflow, the original function is called without interference from flytekit.
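The following toy model illustrates this pattern under simplifying assumptions. It does not use flytekit: ToyFunctionTask and toy_task are hypothetical names, and there is no context object at all, so it only models the raw-execution case. The decorator replaces the function with an object, and calling that object falls straight through to the wrapped function.

```python
import functools
from typing import Any, Callable


class ToyFunctionTask:
    """Hypothetical stand-in for PythonFunctionTask: an object wrapping a function."""

    def __init__(self, fn: Callable[..., Any]):
        self.fn = fn
        # Copy __name__, __doc__, etc. so the object still looks like the function.
        functools.update_wrapper(self, fn)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Raw execution: no workflow is running, so call the original function as-is.
        return self.fn(*args, **kwargs)


def toy_task(fn: Callable[..., Any]) -> ToyFunctionTask:
    """Hypothetical decorator: returns a task object instead of the plain function."""
    return ToyFunctionTask(fn)


@toy_task
def task1(a: int) -> int:
    return a + 1
```

After decoration, task1 is a ToyFunctionTask instance, yet task1(a=1) simply returns 2, just as the undecorated function would.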

Task Execution Inside Workflow

This is what happens to the task when a workflow is run locally, for example as part of a unit test for the workflow.

Before going further, there is a special object worth mentioning: flytekit.extend.Promise.

class flytekit.core.promise.Promise(var: str, val: Union[flytekit.core.promise.NodeOutput, flytekit.models.literals.Literal])

This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like

import typing
from flytekit import task
@task
def t1() -> typing.Tuple[int, str]: ...
  1. Handling the duality between compilation and local execution: when the task function is run in local execution mode inside a workflow function, it produces a Python integer and a string. When the task is compiled as part of the workflow, the task call instead creates a Node and returns two Promise objects that point to that Node.

  2. One needs to be able to call

    x = t1().with_overrides(...)
    

    If the task returned a plain integer or an (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. The Promise object provides this method.

  3. Assorted handling for conditionals.
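The second point can be sketched with a minimal wrapper. This is a hypothetical, heavily simplified model rather than flytekit's Promise: ToyPromise, ToyOutputs, call_t1_in_workflow, and the retries keyword are all assumptions made for illustration. A bare int or tuple has no with_overrides method, whereas a wrapper can expose one and still be unpacked like a tuple.

```python
from typing import Any, Dict


class ToyPromise:
    """Hypothetical wrapper around one output of a task call."""

    def __init__(self, var: str, val: Any):
        self.var = var  # output variable name, e.g. "o0"
        self.val = val  # the produced value in this local-execution sketch
        self.overrides: Dict[str, Any] = {}

    def with_overrides(self, **overrides: Any) -> "ToyPromise":
        # Record the overrides and return self so the call can be chained.
        self.overrides.update(overrides)
        return self


class ToyOutputs(tuple):
    """Hypothetical tuple of promises that still supports x, y = ... unpacking."""

    def with_overrides(self, **overrides: Any) -> "ToyOutputs":
        for promise in self:
            promise.with_overrides(**overrides)
        return self


def call_t1_in_workflow(a: int) -> ToyOutputs:
    """Simulates calling t1 inside a workflow: each output gets its own promise."""
    raw = (a + 2, "world")  # what the undecorated function would return
    return ToyOutputs(ToyPromise(f"o{i}", v) for i, v in enumerate(raw))


x, y = call_t1_in_workflow(1).with_overrides(retries=3)
```

Because with_overrides returns the same tuple of promises, the x, y = t1(...).with_overrides(...) pattern keeps working.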

Let’s assume we have a workflow like

import typing

from flytekit import task, workflow

@task
def t1(a: int) -> typing.Tuple[int, str]:
    return a + 2, "world"

@task
def t2(a: str, b: str) -> str:
    return b + a

@workflow
def my_wf(a: int, b: str) -> typing.Tuple[int, str]:
    x, y = t1(a=a).with_overrides(...)
    d = t2(a=y, b=b)
    return x, d

As discussed in the Promise object’s documentation, when a task is called from inside a workflow, the Python native values that the raw underlying functions return are first converted into Flyte IDL literals, and then wrapped inside Promise objects. One Promise is created for every return variable.

When the next task is called, logic is triggered to unwrap these promises.
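The wrap-then-unwrap cycle for my_wf can be modelled as follows. This sketch makes several simplifying assumptions: LocalPromise and run_locally are hypothetical names, no flytekit is involved, and plain Python values stand in for the IDL literals that flytekit would actually produce. Each task call unwraps incoming promises, runs the raw function, and wraps every return value in a new promise.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class LocalPromise:
    """Hypothetical promise; val is a plain value standing in for an IDL literal."""
    var: str
    val: Any


def run_locally(fn: Callable[..., Any], **kwargs: Any) -> Tuple[LocalPromise, ...]:
    """Hypothetical local-execution path for a task called inside a workflow."""
    # 1. Unwrap any promises produced by upstream tasks.
    native = {k: v.val if isinstance(v, LocalPromise) else v for k, v in kwargs.items()}
    # 2. Call the raw, undecorated function.
    result = fn(**native)
    if not isinstance(result, tuple):
        result = (result,)
    # 3. Wrap every return value in its own promise.
    return tuple(LocalPromise(f"o{i}", v) for i, v in enumerate(result))


def t1(a: int) -> Tuple[int, str]:
    return a + 2, "world"


def t2(a: str, b: str) -> str:
    return b + a


def my_wf(a: int, b: str) -> Tuple[int, str]:
    x, y = run_locally(t1, a=a)
    (d,) = run_locally(t2, a=y, b=b)
    return x.val, d.val
```

With these definitions, my_wf(a=3, b="hello ") returns (5, "hello world"): t2 receives the unwrapped string "world", not the promise around it.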

Compilation

When a workflow is compiled, task calls still produce Promise objects, but instead of wrapping literal values they wrap a flytekit.core.promise.NodeOutput. This is how data dependencies between tasks are tracked.
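A toy version of this bookkeeping is sketched below, under stated assumptions: NodeOutputRef, ToyCompiler, the "wf-inputs" pseudo-node, and the explicit n_outputs argument are all hypothetical (a real compiler would derive the output count from the task interface). During compilation no task body runs; each call records a node, returns references to that node's outputs, and any input that is such a reference becomes a dependency edge.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class NodeOutputRef:
    """Hypothetical stand-in for NodeOutput: which node, which output variable."""
    node_id: str
    var: str


@dataclass
class ToyCompiler:
    nodes: Dict[str, str] = field(default_factory=dict)          # node id -> task name
    edges: List[Tuple[str, str]] = field(default_factory=list)  # (upstream, downstream)

    def call(self, fn: Callable[..., Any], n_outputs: int, **kwargs: Any) -> Tuple[NodeOutputRef, ...]:
        node_id = f"n{len(self.nodes)}"
        self.nodes[node_id] = fn.__name__
        # Inputs that reference another node's output are data dependencies.
        for value in kwargs.values():
            if isinstance(value, NodeOutputRef):
                self.edges.append((value.node_id, node_id))
        # The task body is NOT executed; hand back references to this node's outputs.
        return tuple(NodeOutputRef(node_id, f"o{i}") for i in range(n_outputs))


def t1(a: int) -> Tuple[int, str]:
    raise RuntimeError("task bodies are not executed during compilation")


def t2(a: str, b: str) -> str:
    raise RuntimeError("task bodies are not executed during compilation")


compiler = ToyCompiler()
wf_a = NodeOutputRef("wf-inputs", "a")  # workflow inputs modelled as a pseudo-node
wf_b = NodeOutputRef("wf-inputs", "b")
x, y = compiler.call(t1, 2, a=wf_a)
(d,) = compiler.call(t2, 1, a=y, b=wf_b)
```

The recorded edge ("n0", "n1") captures that t2 consumes an output of t1, which is the dependency the workflow graph needs.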

Branch Skip

If flytekit determines that a conditional branch is not taken, it skips calling the task in that branch, which means that any side effects in the task logic are not run.
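The observable effect can be shown with a deliberately tiny sketch. toy_conditional, t_true, t_false, and the side_effects list are hypothetical; this is not flytekit's conditional API, only a model of the guarantee that a task in an untaken branch is never invoked.

```python
from typing import Any, Callable, List

side_effects: List[str] = []


def t_true() -> str:
    side_effects.append("t_true ran")
    return "true branch"


def t_false() -> str:
    side_effects.append("t_false ran")
    return "false branch"


def toy_conditional(cond: bool, then_task: Callable[[], Any], else_task: Callable[[], Any]) -> Any:
    """Hypothetical local evaluation of a two-way branch: only one task is called."""
    # The task in the untaken branch is never invoked, so its side effects never happen.
    return then_task() if cond else else_task()


result = toy_conditional(False, t_true, t_false)
```

Afterwards, result is "false branch" and side_effects contains only "t_false ran".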

Note

Although the discussion above focused on a task’s execution pattern, the same applies to workflows and launch plans.