flytekit.map_task

flytekit.map_task(task_function, concurrency=0, min_success_ratio=1.0, **kwargs)

Use a map task for parallelizable tasks that run across a list of an input type. A map task can be composed of any individual flytekit.PythonFunctionTask.

Invoke a map task with arguments using the list version of the expected input.

Usage:

import typing

from flytekit import Resources, TaskMetadata, map_task, task, workflow

@task
def my_mappable_task(a: int) -> typing.Optional[str]:
    return str(a)

@workflow
def my_wf(x: typing.List[int]) -> typing.List[typing.Optional[str]]:
    return map_task(
        my_mappable_task,
        metadata=TaskMetadata(retries=1),
        concurrency=10,
        min_success_ratio=0.75,
    )(a=x).with_overrides(requests=Resources(cpu="10M"))

At run time, the underlying task runs once for every value in the input collection. Attributes such as flytekit.TaskMetadata and with_overrides are applied to each individual instance of the mapped task.

Map Task Plugins

Two plugins for running map tasks ship as part of flyteplugins:

  1. K8s Array

  2. AWS Batch

Plugins are enabled in the plugin configuration in values-sandbox.yaml.

K8s Array

By default, the map task uses the K8s Array plugin. It executes array tasks by launching a pod for every instance in the array. It’s simple to use, has a straightforward implementation, and works out of the box.

AWS Batch

For setup details, see the AWS Batch setup and configuration documentation.

A custom plugin can also be implemented to handle the task type.

Parameters
  • task_function (Union[flytekit.core.python_function_task.PythonFunctionTask, flytekit.core.python_function_task.PythonInstanceTask, functools.partial]) – This argument is implicitly passed and represents the repeatable function

  • concurrency (int) – If specified, limits the number of mapped tasks that can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, multiple batches run serially until all inputs are processed. If left unspecified (the default, 0), concurrency is unbounded.

  • min_success_ratio (float) – The minimum fraction of total jobs that must complete successfully before this task is terminated and marked successful. Defaults to 1.0, meaning every job must succeed.
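
The interaction between concurrency and min_success_ratio can be easier to see in a small model. The following is a minimal pure-Python sketch of the semantics described above, not flytekit's or Flyte's actual implementation. The names simulate_map_task and flaky are hypothetical, and two details are assumptions made for illustration: the required number of successes is rounded up, and a failed instance contributes None to the output list (which is why the usage example above declares typing.Optional[str] outputs).

```python
import math
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def simulate_map_task(
    fn: Callable[[T], R],
    inputs: Sequence[T],
    concurrency: int = 0,
    min_success_ratio: float = 1.0,
) -> List[Optional[R]]:
    """Toy model of the documented behaviour (hypothetical helper)."""
    # concurrency == 0 is modelled as "unbounded": one batch holds everything.
    batch_size = concurrency if concurrency > 0 else max(len(inputs), 1)

    results: List[Optional[R]] = []
    succeeded = 0
    # Batches are processed serially, one after another.
    for start in range(0, len(inputs), batch_size):
        batch = inputs[start:start + batch_size]
        # A real backend would run the items of a batch in parallel;
        # here they run sequentially to keep the sketch deterministic.
        for item in batch:
            try:
                results.append(fn(item))
                succeeded += 1
            except Exception:
                # Assumption: a failed instance yields None in the output.
                results.append(None)

    # Assumption: the success threshold is rounded up to a whole job count.
    required = math.ceil(min_success_ratio * len(inputs))
    if succeeded < required:
        raise RuntimeError(
            f"only {succeeded} of {len(inputs)} instances succeeded; "
            f"{required} required"
        )
    return results


def flaky(a: int) -> str:
    """Hypothetical task body that fails for one particular input."""
    if a == 3:
        raise ValueError("simulated failure")
    return str(a)


# Two batches of two; 3 of 4 instances succeed, which meets the 0.75 ratio.
outputs = simulate_map_task(flaky, [1, 2, 3, 4], concurrency=2, min_success_ratio=0.75)
# outputs == ["1", "2", None, "4"]
```

With the default min_success_ratio of 1.0, the same call would raise instead, because one of the four instances fails.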