Writing a Dask Task

Flyte has an optional plugin that runs Dask jobs natively on your Kubernetes cluster, so existing Dask code can run as a Flyte task with minimal changes. The plugin creates a new ephemeral (virtual) cluster for each Dask task, and Flyte manages the cluster lifecycle.

Dask in flytekit

For a more complete example, refer to "How Flytekit Simplifies the Usage of dask in User Code" under Examples.

  1. Enable the dask plugin in the backend, following the steps from the previous section

  2. Install the flytekit dask plugin

    pip install flytekitplugins-dask
    
  3. Write regular Dask code, with one change: pass a Dask task_config to the @task decorator. Refer to the example below:

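    from dask.distributed import Client
    from flytekit import Resources, task
    from flytekitplugins.dask import Dask, WorkerGroup

    @task(
        task_config=Dask(
            # Resources for the Dask workers
            workers=WorkerGroup(
                number_of_workers=10,
                limits=Resources(cpu="4", mem="10Gi"),
            ),
        ),
        # Resources for the pod that runs the task function itself
        limits=Resources(cpu="1", mem="2Gi"),
        cache_version="1",
        cache=True,
    )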
    def hello_dask(size: int) -> float:
        ...
        client = Client()  # Create a client as you would in local code
        # Regular dask code
        ...
    
  4. Run it locally

    hello_dask(size=10)
    
  5. Use it in a workflow

  6. Run it on a remote cluster

Examples

How Flytekit Simplifies the Usage of dask in User Code

The task hello_dask runs on a new Dask cluster. When run locally, it starts a local Dask cluster; when run remotely, Flyte spins up a cluster sized according to the specified Dask configuration.
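As a rough illustration of the local half of that behavior, the sketch below uses plain `dask.distributed`, without Flyte: creating a `Client` without an address starts a local cluster on the current machine. The helper name `local_mean` and the `processes=False` and `dashboard_address=None` settings are choices made for this illustration, not something the plugin requires.

```python
from dask import array as da
from dask.distributed import Client


def local_mean(size: int) -> float:
    # No scheduler address is given, so Client starts a local cluster.
    # processes=False keeps the workers as threads in this process, and
    # dashboard_address=None skips starting the diagnostics dashboard.
    with Client(processes=False, dashboard_address=None):
        array = da.random.random(size)
        # compute() is scheduled on the client created above.
        return float(array.mean().compute())


if __name__ == "__main__":
    print(local_mean(1000))
```

Leaving the `with` block closes the client and the local cluster it started, which loosely mirrors the per-task cluster lifecycle described for remote runs.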

from dask import array as da
from flytekit import Resources, task

The following imports are required to configure the Dask cluster in Flyte:

from flytekitplugins.dask import Dask, WorkerGroup

Dask Task Sample

This example shows how a Dask task can be written simply by adding a @task(task_config=Dask(...), ...) decorator. Refer to the Dask class to understand the various configuration options.

@task(
    task_config=Dask(
        workers=WorkerGroup(
            number_of_workers=10,
            limits=Resources(cpu="4", mem="10Gi"),
        ),
    ),
    limits=Resources(cpu="1", mem="2Gi"),
    cache_version="1",
    cache=True,
)
def hello_dask(size: int) -> float:
    # Dask will implicitly create a Client in the background by calling Client(). When executing
    # remotely, this Client() will use the deployed ``dask`` cluster.
    array = da.random.random(size)
    return float(array.mean().compute())

The function can be executed locally. In a local Python script, guard the call with an if __name__ == "__main__": check:

if __name__ == "__main__":
    print(hello_dask(size=1000))
