Tasks, Workflows and LaunchPlans#

In “Getting started with workflow development”, we got a basic sense of how Flyte works by creating and running a simple workflow made up of a few tasks. In this guide, you’ll learn more about how tasks and workflows fit into the Flyte programming model.

Tasks#

Flyte tasks are the core building blocks of larger, more complex workflows.

Tasks are Containerized Blocks of Compute#

You can think of a Flyte task as a containerized block of compute. When a task runs on a Flyte backend, it’s isolated within its own container, separate from all other tasks. Consider this simple one:

from typing import List
from flytekit import task

@task
def mean(values: List[float]) -> float:
    return sum(values) / len(values)

As you can see, a task is just a regular Python function that’s decorated with @task. We can run this function just like any other Python function:

mean(values=[float(i) for i in range(1, 11)])
5.5

Important

There are three important things to note here:

  • Most of the Flyte tasks you’ll ever write can be executed locally.

  • Tasks and workflows must be invoked with keyword arguments (see the example after this list).

  • When a task runs on a Flyte cluster, it runs on a Kubernetes Pod, where Flyte orchestrates what task to run at what time in the context of a workflow.
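
To illustrate the second point, invoking a task with positional arguments raises an error, even when running locally (the exact message varies across flytekit versions):

try:
    mean([float(i) for i in range(1, 11)])  # positional, not keyword
except Exception as e:
    print(e)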

Tasks are Strongly Typed#

You might also notice that the mean function signature is type-annotated with Python type hints. Flyte uses these annotations to check the input and output types of the task when it’s compiled or invoked.

Under the hood, Flyte uses its own type system that translates values between Flyte types and SDK-language types, in this case Python types. The Flyte type system uses Python type annotations to make sure that the data passing through tasks and workflows is compatible with the explicitly stated types that we define through a function signature.

So if we call the mean function with the wrong types, we get an error:

try:
    mean(values="hi")
except Exception as e:
    print(e)
Failed to convert inputs of task 'mean':
  Failed argument 'values': Expected a list

This may not seem like much for this simple example, but as you start dealing with more complex data types and pipelines, Flyte’s type system becomes invaluable for catching bugs early.

Flyte’s type system is also used for caching, data lineage tracking, and automatic serialization and deserialization of data as it’s passed from one task to another. You can learn more about it in the User Guide.
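
For instance, because inputs and outputs are typed, a task’s results can be cached keyed on its inputs. Here’s a minimal sketch using the cache and cache_version arguments of @task:

@task(cache=True, cache_version="1.0")
def cached_mean(values: List[float]) -> float:
    # on a Flyte cluster, repeated calls with identical inputs reuse the cached output
    return sum(values) / len(values)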

Workflows#

Workflows compose multiple tasks – or other workflows – into meaningful steps of computation to produce some useful set of outputs or outcomes.

Suppose the mean task is just one building block of a larger computation. This is where Flyte workflows can help us manage the added complexity.

Workflows Build Execution Graphs#

Suppose that we want to mean-center and standard-deviation-scale a set of values. In addition to a mean function, we also need to compute standard deviation and implement the centering and scaling logic.

Let’s go ahead and implement those as tasks:

from math import sqrt
from flytekit import workflow


@task
def standard_deviation(values: List[float], mu: float) -> float:
    variance = sum([(x - mu) ** 2 for x in values])
    return sqrt(variance)

@task
def standard_scale(values: List[float], mu: float, sigma: float) -> List[float]:
    return [(x - mu) / sigma for x in values]

Then we put all the pieces together into a workflow, which is a function that’s decorated with @workflow:

@workflow
def standard_scale_workflow(values: List[float]) -> List[float]:
    mu = mean(values=values)
    sigma = standard_deviation(values=values, mu=mu)
    return standard_scale(values=values, mu=mu, sigma=sigma)

Just like tasks, workflows are executable in a regular Python runtime:

standard_scale_workflow(values=[float(i) for i in range(1, 11)])
[-0.4954336943068623,
 -0.3853373177942262,
 -0.27524094128159016,
 -0.1651445647689541,
 -0.055048188256318034,
 0.055048188256318034,
 0.1651445647689541,
 0.27524094128159016,
 0.3853373177942262,
 0.4954336943068623]

Workflows versus Tasks Under the Hood#

Although Flyte workflow syntax looks like Python code, it’s actually a domain-specific language (DSL) for building execution graphs where tasks – and other workflows – serve as the building blocks.

This means that the workflow function body only supports a subset of Python’s semantics:

  • In workflows, you shouldn’t use non-deterministic operations like random.random() or datetime.now(). These functions are invoked at compile time, so your workflows will not behave as you expect them to.

  • Within workflows, the workflow’s inputs and the outputs of task calls are promises under the hood, so you can’t access and operate on them like typical Python function outputs. You can only pass promises into tasks, workflows, and other Flyte constructs.

  • Regular Python conditionals won’t work as intended in workflows: you need to use the conditional construct (see the sketch below).

In contrast to workflow code, the code within tasks is actually executed by a Python interpreter when it’s run locally or inside a container when run on a Flyte cluster.
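
Here’s a minimal sketch of that conditional construct; the double and halve tasks are hypothetical stand-ins:

from flytekit import conditional

@task
def double(x: float) -> float:
    return x * 2

@task
def halve(x: float) -> float:
    return x / 2

@workflow
def branching_workflow(x: float) -> float:
    # x is a promise here, so x > 1.0 builds a condition evaluated at run time
    return (
        conditional("branch")
        .if_(x > 1.0)
        .then(double(x=x))
        .else_()
        .then(halve(x=x))
    )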

Workflows Deal with Promises#

A promise is essentially a placeholder for a value that hasn’t been materialized yet. To show you what this means concretely, let’s re-define the workflow above but let’s also print the output of one of the tasks:

@workflow
def standard_scale_workflow_with_print(values: List[float]) -> List[float]:
    mu = mean(values=values)
    print(mu)  # this is not the actual float value!
    sigma = standard_deviation(values=values, mu=mu)
    return standard_scale(values=values, mu=mu, sigma=sigma)

Notice that we haven’t even executed the workflow, yet the print statement has already run, and the mu it displays is a promise, not a float. So what’s happening here?

When we decorate standard_scale_workflow_with_print with @workflow, Flyte compiles the execution graph defined inside the function body, but it doesn’t actually run the computations. Therefore, when Flyte compiles a workflow, the outputs of task calls are promises rather than regular Python values.

Workflows are Strongly Typed Too#

Since both tasks and workflows are strongly typed, Flyte can actually catch type errors! When we learn more about packaging and registering in the next few guides, we’ll see that Flyte can also catch compile-time errors even before you run any code!

For now, however, we can run the workflow locally to see that we get an error if we introduce a bug into the standard_scale logic:

@task
def buggy_standard_scale(values: List[float], mu: float, sigma: float) -> float:
    """
    🐞 The implementation and output type of this task is incorrect! It should
    be List[float] instead of a sum of all the scaled values.
    """
    return sum([(x - mu) / sigma for x in values])

@workflow
def buggy_standard_scale_workflow(values: List[float]) -> List[float]:
    mu = mean(values=values)
    sigma = standard_deviation(values=values, mu=mu)
    return buggy_standard_scale(values=values, mu=mu, sigma=sigma)

try:
    buggy_standard_scale_workflow(values=[float(i) for i in range(1, 11)])
except Exception as e:
    print(e)
Encountered error while executing workflow 'buggy_standard_scale_workflow':
  Failed to convert output in position 0 of value <FlyteLiteral scalar { primitive { float_value: -5.5511151231257827e-17 } }>, expected type typing.List[float].

Workflows can be Embedded in Other Workflows#

When a workflow uses another workflow as part of the execution graph, we call the inner workflow a subworkflow. Subworkflows are strongly typed and can be invoked just like tasks when defining the outer workflow.

For example, we can embed standard_scale_workflow inside workflow_with_subworkflow, which uses a generate_data task to supply the data for scaling:

import random

@task
def generate_data(num_samples: int, seed: int) -> List[float]:
    random.seed(seed)
    return [random.random() for _ in range(num_samples)]

@workflow
def workflow_with_subworkflow(num_samples: int, seed: int) -> List[float]:
    data = generate_data(num_samples=num_samples, seed=seed)
    return standard_scale_workflow(values=data)

workflow_with_subworkflow(num_samples=10, seed=3)
[-0.1770160403096259,
 0.20693563724701722,
 -0.011544780023631346,
 0.2817676225164289,
 0.30909774347569724,
 -0.3931918658076169,
 -0.45883459142410704,
 0.5745587174373288,
 -0.15020102101136665,
 -0.18157142210012475]

Important

Learn more about subworkflows in the User Guide.

Specifying Dependencies without Passing Data#

You can also specify dependencies between tasks and subworkflows without passing data from the upstream entity to the downstream entity using the >> right shift operator:

@workflow
def wf():
    # task1, task2, and subworkflow are placeholders for entities defined elsewhere
    promise1 = task1()
    promise2 = task2()
    promise3 = subworkflow()
    promise1 >> promise2
    promise2 >> promise3

In this workflow, task1 will execute before task2, but it won’t pass any of its data to task2. Similarly, task2 will execute before subworkflow.
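
Here’s a runnable variant of the same pattern, using two hypothetical tasks that return nothing:

@task
def prepare() -> None:
    print("preparing resources")

@task
def process() -> None:
    print("processing")

@workflow
def chained_workflow():
    p1 = prepare()
    p2 = process()
    p1 >> p2  # prepare completes before process, with no data passed between them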

Important

Learn more about chaining flyte entities in the User Guide.

Launch plans#

A Flyte LaunchPlan is a partial or complete binding of the inputs necessary to launch a workflow. You can think of it like the functools.partial() function in the Python standard library, where you can define default (overrideable) and fixed (non-overrideable) inputs.
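
For comparison, here’s how functools.partial pre-binds arguments in plain Python:

from functools import partial

def scale(values: List[float], factor: float) -> List[float]:
    return [v * factor for v in values]

double_values = partial(scale, factor=2.0)  # factor is pre-bound; values is still required
double_values(values=[1.0, 2.0, 3.0])
[2.0, 4.0, 6.0]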

Note

Additionally, launch plans provide an interface for specifying run-time overrides such as notifications, schedules, and more.

Create a launch plan like so:

from flytekit import LaunchPlan

standard_scale_launch_plan = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="standard_scale_lp",
    default_inputs={"values": [3.0, 4.0, 5.0]}
)
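
To make an input non-overrideable instead, you can bind it with fixed_inputs (a sketch; the launch plan name here is illustrative):

fixed_scale_launch_plan = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="standard_scale_fixed_lp",
    fixed_inputs={"values": [3.0, 4.0, 5.0]},  # callers cannot override this input
)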

Invoking LaunchPlans Locally#

You can run a LaunchPlan locally, that is, using a local Python interpreter (REPL). It will use the default_inputs dictionary whenever it’s invoked:

standard_scale_launch_plan()
[-0.7071067811865475, 0.0, 0.7071067811865475]

Of course, these defaults can be overridden:

standard_scale_launch_plan(values=[float(x) for x in range(20, 30)])
[-0.4954336943068623,
 -0.3853373177942262,
 -0.27524094128159016,
 -0.1651445647689541,
 -0.055048188256318034,
 0.055048188256318034,
 0.1651445647689541,
 0.27524094128159016,
 0.3853373177942262,
 0.4954336943068623]

Later, you’ll learn how to run a launch plan on a cron schedule, but for the time being you can think of them as a way for you to templatize workflows for some set of related use cases, such as model training with a fixed dataset for reproducibility purposes.
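
As a preview, a schedule can be attached when a launch plan is created. A minimal sketch, assuming flytekit’s CronSchedule (the launch plan name here is illustrative):

from flytekit import CronSchedule

scheduled_lp = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="standard_scale_hourly_lp",
    default_inputs={"values": [3.0, 4.0, 5.0]},
    schedule=CronSchedule(schedule="0 * * * *"),  # run at the top of every hour
)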

LaunchPlans can be Embedded in Workflows#

Similar to subworkflows, launch plans can be used in a workflow definition:

@workflow
def workflow_with_launchplan(num_samples: int, seed: int) -> List[float]:
    data = generate_data(num_samples=num_samples, seed=seed)
    return standard_scale_launch_plan(values=data)

workflow_with_launchplan(num_samples=10, seed=3)
[-0.1770160403096259,
 0.20693563724701722,
 -0.011544780023631346,
 0.2817676225164289,
 0.30909774347569724,
 -0.3931918658076169,
 -0.45883459142410704,
 0.5745587174373288,
 -0.15020102101136665,
 -0.18157142210012475]

The main difference between subworkflows and launch plans invoked in workflows is that the latter will kick off a new workflow execution on the Flyte cluster with its own execution name, while the former will execute within the context of the parent workflow’s execution.

Important

Learn more about launch plans in the User Guide.

What’s Next?#

So far we’ve been working with small code snippets and self-contained scripts. Next, we’ll see how to organize a Flyte project that follows software engineering best practices, including organizing code into meaningful modules, defining third-party dependencies, and creating a container image for making our workflows reproducible.