Tasks

A task is a fundamental building block and an extension point of Flyte that encapsulates the users’ code. Tasks have the following properties:

  1. Versioned (usually tied to the git sha)

  2. Strong interfaces (specified inputs and outputs)

  3. Declarative

  4. Independently executable

  5. Unit testable

A task in Flytekit can be of two types:

  1. A task that has a Python function associated with it. Executing the task is equivalent to executing this function.

  2. A task that does not have a Python function, e.g., an SQL query, a portable task such as a SageMaker prebuilt algorithm, or something that simply invokes an API.

Flyte has multiple plugins for tasks, some of which are backend plugins (for example, Athena).

In this example, you will learn how to write and execute a Python function task. Other types of tasks will be covered in the later sections.

For any task in Flyte, there is one required import:

from flytekit import task

A PythonFunctionTask must always be decorated with the @task decorator (flytekit.task()). The task itself is a regular Python function, with one exception: all of its inputs and outputs must be annotated with their types. The types are regular Python types; the type-system section covers them in more detail.

@task
def square(n: int) -> int:
    """
    Parameters:
        n (int): name of the parameter for the task will be derived from the name of the input variable
            the type will be automatically deduced to be Types.Integer

    Return:
        int: The label for the output will be automatically assigned and type will be deduced from the annotation

    """
    return n * n

This task has one input, n, of type int. The task square takes n and returns its square as a new integer.

Note

Flytekit will assign a default name to the output variable, such as o0. In case of multiple outputs, each output is numbered in order starting with 0, e.g., o0, o1, o2, and so on.
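To make the multiple-output case concrete, here is a minimal sketch of a task that returns two values. The task name square_and_cube is hypothetical and not part of the original example. The try/except fallback is an assumption added only so that the snippet also runs where flytekit is not installed; in a real Flyte project you would import task directly.

```python
import typing

try:
    from flytekit import task
except ImportError:
    # Assumption for illustration only: without flytekit installed, fall back
    # to a pass-through decorator so the plain Python logic still runs.
    def task(fn):
        return fn


@task
def square_and_cube(n: int) -> typing.Tuple[int, int]:
    # Two unnamed outputs; they receive default names in declaration order.
    return n * n, n * n * n


if __name__ == "__main__":
    # When called locally, the outputs can be unpacked like a regular tuple.
    squared, cubed = square_and_cube(n=3)
    print(squared, cubed)
```

The outputs are unpacked positionally here, so the snippet does not depend on the default output names.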

You can execute a Flyte task like any normal function.

if __name__ == "__main__":
    print(square(n=10))
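Because a task can be called locally like a regular function, it can also be checked with ordinary assertions, which is what makes tasks unit testable. The following is a small sketch of such a test. The function name test_square is our own choice, and the fallback decorator is an assumption that exists only so the snippet runs without flytekit installed.

```python
try:
    from flytekit import task
except ImportError:
    # Assumption for illustration only: pass-through decorator used when
    # flytekit is not available in the current environment.
    def task(fn):
        return fn


@task
def square(n: int) -> int:
    return n * n


def test_square() -> None:
    # Call the task locally with keyword arguments, as in the example above.
    assert square(n=10) == 100
    assert square(n=-3) == 9
    assert square(n=0) == 0


if __name__ == "__main__":
    test_square()
    print("all checks passed")
```

A test runner such as pytest should be able to collect test_square as written, since it is a plain function with plain assertions.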

Single Task Execution

Tasks are the atomic units of execution in Flyte. Although workflows are traditionally composed of multiple tasks with dependencies defined by shared inputs and outputs, it can be helpful to execute a single task while iterating on its definition. Writing a new workflow definition every time you want to execute a single task under development is tedious; “single task executions” let you iterate on task logic easily.

You can launch a task from the Flyte console by providing an IAM role or a Kubernetes service account.

Alternatively, you can use flytectl to launch the task. Run the following commands in the cookbook directory.

Note

This example builds a Docker image and makes it available to the sandbox only. For a non-sandbox deployment, you will have to push the image to a Docker registry.

Build a Docker image to package the task.

flytectl sandbox exec -- docker build . --tag "flytebasics:v1" -f core/Dockerfile

Package the task.

pyflyte --pkgs core.flyte_basics package --image flytebasics:v1

Register the task.

flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v1

Generate an execution spec file.

flytectl get task --domain development --project flytesnacks core.flyte_basics.task.square --version v1 --execFile exec_spec.yaml

Create an execution using the exec spec file.

flytectl create execution --project flytesnacks --domain development --execFile exec_spec.yaml

Note

For subsequent executions, you can simply run flytectl create execution ... and skip the previous commands. Alternatively, you can launch the task from the Flyte console.

Monitor the execution by providing the execution name from the create execution command.

flytectl get execution --project flytesnacks --domain development <execname>
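If you repeat these steps often, the build, package, register, and launch commands from this section can be collected in a small helper script. The sketch below is one possible way to do that, not an official Flyte tool: the names build_commands and run_all are hypothetical, the command strings are copied from the steps above, and by default the script only prints the commands (a dry run) instead of executing them.

```python
import shlex
import subprocess
from typing import List

# Values taken from the commands in this section.
PROJECT = "flytesnacks"
DOMAIN = "development"
VERSION = "v1"
IMAGE = f"flytebasics:{VERSION}"


def build_commands() -> List[str]:
    """Return the commands from this section, in the order they are run."""
    scope = f"--project {PROJECT} --domain {DOMAIN}"
    return [
        f'flytectl sandbox exec -- docker build . --tag "{IMAGE}" -f core/Dockerfile',
        f"pyflyte --pkgs core.flyte_basics package --image {IMAGE}",
        f"flytectl register files {scope} --archive flyte-package.tgz --version {VERSION}",
        f"flytectl get task {scope} core.flyte_basics.task.square "
        f"--version {VERSION} --execFile exec_spec.yaml",
        f"flytectl create execution {scope} --execFile exec_spec.yaml",
    ]


def run_all(dry_run: bool = True) -> None:
    """Print each command; execute it as well when dry_run is False."""
    for command in build_commands():
        print(f"$ {command}")
        if not dry_run:
            # check=True stops the sequence at the first failing step.
            subprocess.run(shlex.split(command), check=True)


if __name__ == "__main__":
    run_all(dry_run=True)
```

Calling run_all(dry_run=False) would execute the steps and therefore requires flytectl, pyflyte, and a running sandbox; it should be run from the cookbook directory, as noted earlier.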
