Decorating tasks

Tags: Intermediate

You can easily change how tasks behave by using decorators to wrap your task functions.

In order to make sure that your decorated function contains all the type annotation and docstring information that Flyte needs, you will need to use the built-in wraps() decorator.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

To begin, import the required dependencies.

advanced_composition/decorating_tasks.py
import logging
from functools import partial, wraps

from flytekit import task, workflow

Create a logger to monitor the execution’s progress.

advanced_composition/decorating_tasks.py
logger = logging.getLogger(__file__)

Using a single decorator

We define a decorator that logs the input and output details for a decorated task.

advanced_composition/decorating_tasks.py
def log_io(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.info(f"task {fn.__name__} called with args: {args}, kwargs: {kwargs}")
        out = fn(*args, **kwargs)
        logger.info(f"task {fn.__name__} output: {out}")
        return out

    return wrapper

We create a task named t1 that is decorated with log_io.

Note

The order of invoking the decorators is important. @task should always be the outer-most decorator.

advanced_composition/decorating_tasks.py
@task
@log_io
def t1(x: int) -> int:
    return x + 1

Stacking multiple decorators

You can also stack multiple decorators on top of each other as long as @task is the outer-most decorator.

We define a decorator that verifies if the output from the decorated function is a positive number before it’s returned. If this assumption is violated, it raises a ValueError exception.

advanced_composition/decorating_tasks.py
def validate_output(fn=None, *, floor=0):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of task {fn.__name__} must be a positive number, found {out}")
        return out

    if fn is None:
        return partial(validate_output, floor=floor)

    return wrapper

Note

The output of the validate_output task uses partial() to implement parameterized decorators.

We define a function that uses both the logging and validator decorators.

advanced_composition/decorating_tasks.py
@task
@log_io
@validate_output(floor=10)
def t2(x: int) -> int:
    return x + 10

Finally, we compose a workflow that calls t1 and t2.

advanced_composition/decorating_tasks.py
@workflow
def decorating_task_wf(x: int) -> int:
    return t2(x=t1(x=x))


if __name__ == "__main__":
    print(f"Running decorating_task_wf(x=10) {decorating_task_wf(x=10)}")

Run the example on the Flyte cluster

To run the provided workflow on the Flyte cluster, use the following command:

pyflyte run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/master/examples/advanced_composition/advanced_composition/decorating_tasks.py \
  decorating_task_wf --x 10

In this example, you learned how to modify the behavior of tasks via function decorators using the built-in wraps() decorator pattern. To learn more about how to extend Flyte at a deeper level, for example creating custom types, custom tasks or backend plugins, see Extending Flyte.