Enabling users to write tasks and workflows is the core feature of flytekit; it is why the library exists. This document goes over how some of the internals work.
Types and Type Engine¶
Flyte has its own type system, which is codified in the IDL. Python has its own typing system as well, even though it is a dynamic language; it is mostly described in PEP 484. In order to work properly, flytekit needs to be able to convert between the two.
Tasks, workflows, and launch plans form the core of the Flyte user experience. Each of these concepts is backed by one or more Python classes. These classes, in turn, are instantiated by decorators (in the case of tasks and workflows) or a normal Python call (in the case of launch plans).
This is the current task class hierarchy.
Please see the documentation on each of the classes for details.
Task(task_type: str, name: str, interface: Optional[flytekit.models.interface.TypedInterface] = None, metadata: Optional[flytekit.core.base_task.TaskMetadata] = None, task_type_version=0, security_ctx: Optional[flytekit.models.security.SecurityContext] = None, **kwargs)
The base of all tasks in flytekit. This class is closest to the FlyteIDL TaskTemplate: it captures the information in the FlyteIDL specification and does not have a Python native interface associated with it. For any real extension, please refer to the derived classes.
The base class for all tasks with a Python native interface. It should be used directly for task types that do not have a Python function to execute; otherwise, refer to the classes below.
A Python AutoContainer task should be used as the base for all extensions that want the user's code to be in the container and the container information to be captured automatically. This base will auto-configure the image and image version to be used for all its derivatives.
If you are looking to extend it, you might prefer to use one of its derived classes instead.
A Python Function task should be used as the base for all extensions that have a Python function. It will automatically detect the interface of the Python function and create the right execution command to execute the function.
It is advised that this task be created via the @task decorator, as follows.
In the above code, the name of the function, the module, and the interface (inputs = int and outputs = str) will be auto-detected.
There is currently only one workflow class in flytekit.
Please read Workflows first for a high-level understanding of what workflows are in Flyte. This Python object represents a workflow defined by a function and decorated with the @workflow decorator. Please see the notes on that object for additional information.
There is also only one launch plan class.
LaunchPlan(name: str, workflow: flytekit.core.workflow.WorkflowBase, parameters: flytekit.models.interface.ParameterMap, fixed_inputs: flytekit.models.literals.LiteralMap, schedule: Optional[flytekit.models.schedule.Schedule] = None, notifications: Optional[List[flytekit.models.common.Notification]] = None, labels: Optional[flytekit.models.common.Labels] = None, annotations: Optional[flytekit.models.common.Annotations] = None, raw_output_data_config: Optional[flytekit.models.common.RawOutputDataConfig] = None, auth_role: Optional[flytekit.models.common.AuthRole] = None)
The three entities above are all callable. In Flyte terms that means they can be invoked to yield a unit (or units) of work.
In Python terms, that means you can add () to the end of one of them, which invokes the __call__ method on the object.
What happens when a callable entity is called depends on the current context, specifically the current FlyteContext.
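A toy sketch of that context-dependent dispatch (the class and attribute names here are invented for illustration; they are not flytekit's actual API):

```python
# Toy model: one global context whose mode decides what a call does.
class ToyContext:
    mode = "local"  # flip to "compilation" when building a workflow spec

class ToyEntity:
    def __init__(self, fn):
        self._fn = fn

    def __call__(self, **kwargs):
        if ToyContext.mode == "compilation":
            # during compilation, record a node rather than running code
            return ("node", self._fn.__name__, sorted(kwargs))
        # in local mode, just execute the wrapped function
        return self._fn(**kwargs)

def add_one(a: int) -> int:
    return a + 1

entity = ToyEntity(add_one)
print(entity(a=1))  # in local mode, runs the function and prints 2
```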
Raw Task Execution¶
This is what happens when a task is run on its own, for example as part of a unit test. The @task decorator actually turns the decorated function into an instance of the PythonFunctionTask object, but when a user calls it outside of a workflow, e.g. task1(), the original function is called without interference by flytekit.
Task Execution Inside Workflow¶
This is what happens to a task when the workflow containing it is run locally, say as part of a unit test for the workflow.
Before going further, there is a special object worth mentioning:
Promise(var: str, val: Union[flytekit.core.promise.NodeOutput, flytekit.models.literals.Literal])
This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like
@task
def t1() -> (int, str): ...
Handling the duality between compilation and local execution: when the task function is run in local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.
One needs to be able to call

x = t1().with_overrides(...)

If the task returned a plain integer or an (int, str) tuple, calling with_overrides on the result would throw an error; the Promise object adds that capability.
Assorted handling for conditionals.
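The roles above can be sketched with a toy wrapper (ToyPromise is an invented name; flytekit's real Promise class is considerably richer than this):

```python
class ToyPromise:
    """Toy stand-in for flytekit's Promise object."""

    def __init__(self, var, val):
        self.var = var  # name of the output variable, e.g. "o0"
        self.val = val  # a literal value locally, or a node-output
                        # reference when compiling

    def with_overrides(self, **overrides):
        # a bare int or str return value would not support this call;
        # wrapping every output gives a uniform place for such methods
        self._overrides = overrides
        return self
```

Returning `self` from with_overrides keeps the call fluent, so `x = t1().with_overrides(...)` still binds x to the promise.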
Let’s assume we have a workflow like
@task
def t1(a: int) -> (int, str):
    return a + 2, "world"

@task
def t2(a: str, b: str) -> str:
    return b + a

@workflow
def my_wf(a: int, b: str) -> (int, str):
    x, y = t1(a=a).with_overrides(...)
    d = t2(a=y, b=b)
    return x, d
As discussed in the Promise object's documentation, when a task is called from inside a workflow, the Python native values that the raw underlying function returns are first converted into Flyte IDL literals and then wrapped inside Promise objects, one Promise for every return variable.
When the next task is called, logic is triggered to unwrap these promises.
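That unwrapping can be sketched as follows (again with invented names; flytekit's real logic also performs type conversion between literals and Python values):

```python
class ToyPromise:
    def __init__(self, var, val):
        self.var, self.val = var, val

def unwrap_inputs(**kwargs):
    # replace any promise-wrapped argument with the value it carries
    # before handing the inputs to the next task's function
    return {k: v.val if isinstance(v, ToyPromise) else v
            for k, v in kwargs.items()}

print(unwrap_inputs(a=ToyPromise("o1", "world"), b="hello"))
# prints {'a': 'world', 'b': 'hello'}
```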
When a workflow is compiled, instead of producing Promise objects that wrap literal values, the promises wrap a flytekit.core.promise.NodeOutput. This is how data dependencies are tracked between tasks.
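A toy illustration of the two cases (invented names; see flytekit.core.promise for the real classes):

```python
class ToyNodeOutput:
    """Points at a specific output of an upstream node."""
    def __init__(self, node_id, var):
        self.node_id = node_id  # e.g. "n0", the node producing the value
        self.var = var          # which of that node's outputs, e.g. "o0"

class ToyPromise:
    def __init__(self, var, val):
        self.var, self.val = var, val

# local execution: the promise wraps a concrete (literal) value
local_promise = ToyPromise("o0", 42)

# compilation: the promise wraps a pointer to an upstream node's output;
# following these pointers is how the data dependency graph is built
compiled_promise = ToyPromise("o0", ToyNodeOutput("n0", "o0"))
```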
If it’s been determined that a conditional is not true, then flytekit will skip actually calling the task which means that any side-effects in the task logic will not be run.
Even though the discussion above focused on a task's execution pattern, the same actually applies to workflows and launch plans.