Running a Dask Task#

The plugin establishes a distinct virtual and short-lived cluster for each Dask task, with Flyte overseeing the entire cluster lifecycle.

To begin, import the required dependencies.

from flytekit import ImageSpec, Resources, task

Create an ImageSpec to encompass all the dependencies needed for the Dask task.

custom_image = ImageSpec(registry="ghcr.io/flyteorg", packages=["flytekitplugins-dask"])

Important

Replace ghcr.io/flyteorg with a container registry you’ve access to publish to. To upload the image to the local registry in the demo cluster, indicate the registry as localhost:30000.

The following imports are required to configure the Dask cluster in Flyte. You can load them on demand.

if custom_image.is_container():
    from dask import array as da
    from flytekitplugins.dask import Dask, WorkerGroup

When executed locally, Flyte launches a Dask cluster on the local environment. However, when executed remotely, Flyte triggers the creation of a cluster with a size determined by the specified Dask configuration.

@task(
    task_config=Dask(
        workers=WorkerGroup(
            number_of_workers=10,
            limits=Resources(cpu="4", mem="10Gi"),
        ),
    ),
    limits=Resources(cpu="1", mem="2Gi"),
    container_image=custom_image,
)
def hello_dask(size: int) -> float:
    # Dask will automatically generate a client in the background using the Client() function.
    # When running remotely, the Client() function will utilize the deployed Dask cluster.
    array = da.random.random(size)
    return float(array.mean().compute())

You can execute the task locally as follows:

if __name__ == "__main__":
    print(hello_dask(size=1000))