Writing a Dask Task
Flyte has an optional plugin that makes it possible to run Dask jobs natively on your Kubernetes cluster. It makes it easy to run your Dask code as a Flyte task. The plugin creates a new ephemeral Dask cluster for each Dask task, and Flyte manages that cluster's lifecycle.
Dask in flytekit
For a more complete example, refer to How Flytekit Simplifies the Usage of Dask in User Code.
1. Enable the Dask plugin in the backend, following the steps from the previous section.
2. Install the flytekit Dask plugin:
pip install flytekitplugins-dask
3. Write regular Dask code, with one change in the @task decorator. Refer to the example below:
from dask.distributed import Client
from flytekit import Resources, task
from flytekitplugins.dask import Dask, WorkerGroup


@task(
    task_config=Dask(
        workers=WorkerGroup(
            number_of_workers=10,
            limits=Resources(cpu="4", mem="10Gi"),
        ),
    ),
    limits=Resources(cpu="1", mem="2Gi"),
    cache_version="1",
    cache=True,
)
def hello_dask(size: int) -> float:
    ...
    client = Client()  # Create a client as you would in local code
    # Regular Dask code
    ...
4. Run it locally.
5. Use it in a workflow.
6. Run it on a remote cluster.
How Flytekit Simplifies the Usage of Dask in User Code
hello_dask runs on a new Dask cluster: when run locally, it starts a local Dask cluster, and when run remotely, it spins up a cluster whose size is determined by the specified Dask configuration.
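To make the local case concrete, here is a rough plain-Dask analogue (no Flyte involved) of running a computation on a local cluster, assuming dask[distributed] is installed. It is not how the plugin is implemented; mean_on_local_cluster is a hypothetical helper name.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster


def mean_on_local_cluster(size: int, n_workers: int = 2) -> float:
    # processes=False keeps the workers as threads inside this process, and
    # dashboard_address=None disables the diagnostics dashboard
    with LocalCluster(
        n_workers=n_workers, processes=False, dashboard_address=None
    ) as cluster:
        # While this Client is open it is the default scheduler, so
        # .compute() executes the task graph on the local cluster
        with Client(cluster):
            array = da.random.random(size)
            return float(array.mean().compute())


if __name__ == "__main__":
    print(mean_on_local_cluster(size=1000))
```

Thread-based workers and a disabled dashboard are choices made here to keep the sketch lightweight; they are not what the plugin necessarily uses.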
The following imports are required to configure the Dask cluster in Flyte:
from flytekitplugins.dask import Dask, WorkerGroup
Dask Task Sample
This example shows how a Dask task can be written simply by adding a @task(task_config=Dask(...), ...) decorator. Refer to the Dask class to understand the various configuration options.
import dask.array as da
from flytekit import Resources, task


@task(
    task_config=Dask(
        workers=WorkerGroup(
            number_of_workers=10,
            limits=Resources(cpu="4", mem="10Gi"),
        ),
    ),
    limits=Resources(cpu="1", mem="2Gi"),
    cache_version="1",
    cache=True,
)
def hello_dask(size: int) -> float:
    # Dask will implicitly create a Client in the background by calling Client(). When executing
    # remotely, this Client() will use the deployed ``dask`` cluster.
    array = da.random.random(size)
    return float(array.mean().compute())
The hello_dask function can be executed locally. In a local Python script, guard the call with the following line so that it only runs when the script is executed directly:
if __name__ == '__main__':