Map Tasks

A map task lets you run a pod task or a regular task over a list of inputs within a single workflow node. This means you can run thousands of instances of the task without creating a node for every instance, providing valuable performance gains!

Some use cases of map tasks include:

  • Several inputs must run through the same code logic

  • Multiple data batches need to be processed in parallel

  • Hyperparameter optimization

Let’s look at an example now!

First, we import the libraries.

import typing

from flytekit import Resources, map_task, task, workflow

Next, we define a task that we will use in our map task.

Note

A map task can only accept one input and produce one output.

@task
def a_mappable_task(a: int) -> str:
    inc = a + 2
    stringified = str(inc)
    return stringified
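Because the mapped task takes a single input, one way to pass several values to each instance is to pack them into one value. The sketch below is plain Python, not flytekit: it assumes a hypothetical hyperparameter grid and a JSON-string encoding, and the names build_inputs and score are illustrative only.

```python
import itertools
import json
import typing


def build_inputs(grid: typing.Dict[str, typing.List[float]]) -> typing.List[str]:
    """Encode every combination in the (hypothetical) grid as one JSON string."""
    keys = sorted(grid)
    return [
        json.dumps(dict(zip(keys, values)))
        for values in itertools.product(*(grid[k] for k in keys))
    ]


def score(packed: str) -> str:
    """Stand-in for a mappable function: one str in, one str out."""
    params = json.loads(packed)
    return f"lr={params['lr']},reg={params['reg']}"


# Each element of `inputs` is the single input for one mapped instance.
inputs = build_inputs({"lr": [0.1, 0.01], "reg": [0.0, 1.0]})
outputs = [score(p) for p in inputs]
```

Each packed string would play the role of one element in the list handed to the map task, so the one-input, one-output shape is preserved.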

We also define a task to reduce the mapped output to a string.

@task
def coalesce(b: typing.List[str]) -> str:
    coalesced = "".join(b)
    return coalesced

We pass a_mappable_task to the map_task() function, which runs it across a collection of inputs. In our example, the input a is of type typing.List[int], and a_mappable_task runs once for each element in the list.

with_overrides is useful for setting resources on an individual map task.

@workflow
def my_map_workflow(a: typing.List[int]) -> str:
    mapped_out = map_task(a_mappable_task)(a=a).with_overrides(
        requests=Resources(mem="300Mi"),
        limits=Resources(mem="500Mi"),
        retries=1,
    )
    coalesced = coalesce(b=mapped_out)
    return coalesced

Lastly, we can run the workflow locally!

if __name__ == "__main__":
    result = my_map_workflow(a=[1, 2, 3, 4, 5])
    print(result)
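To sanity-check the result, the same map-then-reduce logic can be sketched in plain Python. This is only an illustration of what the workflow computes for this input, not how flytekit executes it; the function names here are hypothetical stand-ins for the tasks above.

```python
import typing


def add_two_and_stringify(a: int) -> str:
    # Mirrors a_mappable_task: add 2, then convert to a string.
    return str(a + 2)


def coalesce_plain(b: typing.List[str]) -> str:
    # Mirrors coalesce: concatenate the mapped outputs in order.
    return "".join(b)


def map_then_reduce(a: typing.List[int]) -> str:
    # Apply the mapped function to every element, then reduce.
    mapped = [add_two_and_stringify(x) for x in a]
    return coalesce_plain(mapped)


result = map_then_reduce([1, 2, 3, 4, 5])
print(result)  # 34567
```

For the input [1, 2, 3, 4, 5], each element is incremented by 2 and the resulting strings are joined in order.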

Map tasks can also run on alternate execution backends, such as AWS Batch, a provisioned service that can scale to very large workloads.
