Decorating tasks

Tags: Intermediate

You can easily change how tasks behave by using decorators to wrap your task functions.

To make sure the decorated function keeps the type annotations and docstring that Flyte needs, apply the wraps() decorator from the standard-library functools module to the wrapper function inside your decorator.
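To see what wraps() preserves, the following minimal sketch compares a wrapper written without it to one written with it. It uses only the standard library; the names plain_decorator, wrapped_decorator, add_one_plain and add_one_wrapped are illustrative and not part of Flyte.

```python
import inspect
from functools import wraps


def plain_decorator(fn):
    # No @wraps: the wrapper keeps its own name, docstring, and signature.
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def wrapped_decorator(fn):
    # @wraps copies metadata such as __name__ and __doc__ from fn
    # and sets wrapper.__wrapped__ = fn, which inspect.signature follows.
    @wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


@plain_decorator
def add_one_plain(x: int) -> int:
    """Return x + 1."""
    return x + 1


@wrapped_decorator
def add_one_wrapped(x: int) -> int:
    """Return x + 1."""
    return x + 1


print(add_one_plain.__name__, inspect.signature(add_one_plain))
# wrapper (*args, **kwargs)
print(add_one_wrapped.__name__, inspect.signature(add_one_wrapped))
# add_one_wrapped (x: int) -> int
```

Without wraps(), any tool that inspects the function sees only the generic (*args, **kwargs) wrapper, so the typed inputs and outputs are no longer visible.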

To begin, import the required dependencies.

import logging
from functools import partial, wraps

from flytekit import task, workflow

Create a logger to monitor the execution’s progress.

logger = logging.getLogger(__file__)

Using a single decorator

We define a decorator that logs the input and output details for a decorated task.

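def log_io(fn):
    # Preserve fn's name, docstring, and type annotations on the wrapper.
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.info(f"task {fn.__name__} called with args: {args}, kwargs: {kwargs}")
        out = fn(*args, **kwargs)
        logger.info(f"task {fn.__name__} output: {out}")
        return out

    return wrapper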

We create a task named t1 that is decorated with log_io.


The order of invoking the decorators is important. @task should always be the outer-most decorator.

@task
@log_io
def t1(x: int) -> int:
    return x + 1
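The ordering note can be illustrated without Flyte. In the sketch below, fake_task is a hypothetical stand-in for a decorator that captures the callable it receives; it is not how flytekit is implemented, and REGISTRY and calls are likewise invented for the illustration. Decorators are applied bottom-up, so only the outermost decorator sees the fully wrapped function.

```python
from functools import wraps

REGISTRY = {}  # hypothetical: what a task framework might store
calls = []     # records which functions went through the logging wrapper


def fake_task(fn):
    # Hypothetical stand-in for @task: remembers the callable it was given.
    REGISTRY[fn.__name__] = fn
    return fn


def log_io(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        calls.append(fn.__name__)
        return fn(*args, **kwargs)

    return wrapper


@fake_task   # outermost: receives the log_io wrapper
@log_io
def outer_ok(x: int) -> int:
    return x + 1


@log_io
@fake_task   # innermost: receives the bare function, logging is added too late
def outer_wrong(x: int) -> int:
    return x + 1


REGISTRY["outer_ok"](x=1)     # runs through the logging wrapper
REGISTRY["outer_wrong"](x=1)  # bypasses the logging wrapper
print(calls)  # ['outer_ok']
```

Under this assumption, a capturing decorator that is not outermost never sees the behavior added by the decorators above it.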

Stacking multiple decorators

You can also stack multiple decorators on top of each other as long as @task is the outer-most decorator.

We define a decorator that checks that the output of the decorated function is greater than floor (0 by default, so the output must be positive) before returning it. If the check fails, it raises a ValueError.

def validate_output(fn=None, *, floor=0):
    # Called with arguments only, e.g. @validate_output(floor=10):
    # return a decorator with floor already bound.
    if fn is None:
        return partial(validate_output, floor=floor)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of task {fn.__name__} must be a positive number, found {out}")
        return out

    return wrapper


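The validate_output decorator uses partial() to accept parameters: when it is called with keyword arguments only, for example @validate_output(floor=10), it returns a copy of itself with floor already bound, and that copy then receives the function to decorate.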
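As a minimal sketch of the partial() pattern, the example below applies the same decorator to plain functions, once bare and once with a floor argument. It needs only the standard library; plus_one and plus_ten are illustrative names, and the error message is reworded for this sketch.

```python
from functools import partial, wraps


def validate_output(fn=None, *, floor=0):
    # @validate_output(floor=...) calls this with fn=None first;
    # partial() returns a decorator that remembers floor.
    if fn is None:
        return partial(validate_output, floor=floor)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of {fn.__name__} must be greater than {floor}, found {out}")
        return out

    return wrapper


@validate_output              # bare form: floor defaults to 0
def plus_one(x: int) -> int:
    return x + 1


@validate_output(floor=10)    # parameterized form
def plus_ten(x: int) -> int:
    return x + 10


print(plus_one(x=0))  # 1
print(plus_ten(x=5))  # 15

try:
    plus_ten(x=0)     # 10 is not greater than floor=10
except ValueError as err:
    print(err)        # output of plus_ten must be greater than 10, found 10
```

Making floor keyword-only is what lets the decorator tell the two forms apart: a positional argument can only be the function being decorated.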

We define a task that uses both the logging and validation decorators.

@task
@log_io
@validate_output(floor=10)
def t2(x: int) -> int:
    return x + 10
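When decorators are stacked, the one closest to the function runs innermost at call time. The following sketch records the order of events with plain functions; log_io and validate_positive here are simplified stand-ins written for this illustration, and events and shift are hypothetical names.

```python
from functools import wraps

events = []  # records the order in which each layer runs


def log_io(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        events.append("log: before")
        out = fn(*args, **kwargs)
        events.append("log: after")
        return out

    return wrapper


def validate_positive(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        events.append("validate")
        if out <= 0:
            raise ValueError(f"output of {fn.__name__} must be positive, found {out}")
        return out

    return wrapper


@log_io               # outer layer: runs first and last
@validate_positive    # inner layer: runs right after the function body
def shift(x: int) -> int:
    events.append("body")
    return x + 10


shift(x=1)
print(events)  # ['log: before', 'body', 'validate', 'log: after']

events.clear()
try:
    shift(x=-20)      # -10 fails validation
except ValueError:
    pass
print(events)  # ['log: before', 'body', 'validate']
```

Because the validator sits inside the logger, a failed validation raises before the logger records the output.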

Finally, we compose a workflow that calls t1 and t2.

@workflow
def decorating_task_wf(x: int) -> int:
    return t2(x=t1(x=x))

if __name__ == "__main__":
    print(f"Running decorating_task_wf(x=10) {decorating_task_wf(x=10)}")

Run the example on the Flyte cluster

To run the provided workflow on the Flyte cluster, use the following command:

pyflyte run --remote \ \
  decorating_task_wf --x 10

In this example, you learned how to modify the behavior of tasks with function decorators, using functools.wraps() to preserve each task function's metadata. To learn how to extend Flyte at a deeper level, for example by creating custom types, custom tasks, or backend plugins, see Extending Flyte.