Nov 1, 2019

Dask Deployment Updates

Summary

Over the last six months many Dask developers have worked on making Dask easier to deploy in a wide variety of situations. This post summarizes those efforts, and provides links to ongoing work.

What we mean by Deployment

In order to run Dask on a cluster, you need to set up a scheduler on one machine:

$ dask-scheduler
Scheduler running at tcp://192.168.0.1

And start Dask workers on many other machines:

$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786

$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786

$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786

$ dask-worker tcp://192.168.0.1
Waiting to connect to: tcp://scheduler:8786
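
Once the scheduler and workers are running, you connect to them from your own Python session with a Client. A minimal sketch, assuming the scheduler is reachable at the address above on the default port:

from dask.distributed import Client

# Point the client at the scheduler's address and port
client = Client("tcp://192.168.0.1:8786")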

For informal clusters people might do this manually, by logging into each machine and running these commands themselves. However, it’s much more common to use a cluster resource manager such as Kubernetes, Yarn (Hadoop/Spark), HPC batch schedulers (SGE, PBS, SLURM, LSF, …), some cloud service, or some custom system.

As Dask is used by more institutions, and used more broadly within those institutions, making deployment smooth and natural becomes increasingly important. This is so important, in fact, that there have been seven separate efforts to improve deployment in some regard or another by a few different groups.

We’ll briefly summarize and link to this work below, and then we’ll finish up by talking about some internal design that helped to make this work more consistent.

Dask-SSH

According to our user survey, the most common deployment mechanism was still SSH. Dask has long had a dask-ssh command-line tool to make deploying over SSH easier. We recently updated this to also include a Python interface, which provides more control.

>>> from dask.distributed import Client, SSHCluster
>>> cluster = SSHCluster(
... ["host1", "host2", "host3", "host4"],
... connect_options={"known_hosts": None},
... worker_options={"nthreads": 2},
... scheduler_options={"port": 0, "dashboard_address": ":8797"}
... )
>>> client = Client(cluster)

This isn’t what we recommend for large institutions, but it can be helpful for more informal groups who are just getting started.

Dask-Jobqueue and Dask-Kubernetes Rewrite

We’ve rewritten Dask-Jobqueue (for the SLURM/PBS/LSF/SGE cluster managers typically found in HPC centers) and Dask-Kubernetes. These now share a common codebase along with Dask-SSH, and so are much more consistent and hopefully bug-free.

Ideally users shouldn’t notice much difference with existing workloads, but new features like asynchronous operation, integration with the Dask JupyterLab extension, and so on are more consistently available. Also, we’ve been able to unify development and reduce our maintenance burden considerably.
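
For example, a typical Dask-Jobqueue session on a PBS system looks roughly like the following sketch (the resource values and queue name are illustrative, not recommendations):

from dask.distributed import Client
from dask_jobqueue import PBSCluster

# Each job submitted to the PBS queue runs one Dask worker with these resources
cluster = PBSCluster(cores=24, memory="100GB", queue="regular")
cluster.scale(10)  # ask for ten workers
client = Client(cluster)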

The new version of Dask-Jobqueue where these changes take place is 0.7.0, and the work was done in dask/dask-jobqueue #307. The new version of Dask-Kubernetes is 0.10.0, and the work was done in dask/dask-kubernetes #162.

Dask-CloudProvider

For cloud deployments we generally recommend using a hosted Kubernetes or Yarn service, and then using Dask-Kubernetes or Dask-Yarn on top of these.
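
For example, on a hosted Kubernetes service this might look roughly like the following (a sketch; the pod specification file name is illustrative):

from dask.distributed import Client
from dask_kubernetes import KubeCluster

# Workers run as Kubernetes pods described by a pod specification file
cluster = KubeCluster.from_yaml("worker-spec.yml")
cluster.scale(10)
client = Client(cluster)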

However, some institutions have made decisions or commitments to use certain vendor-specific technologies, and it’s more convenient to use APIs that are more native to the particular cloud. The new Dask-Cloudprovider package handles this today for Amazon’s ECS API, which has been around for a long while and is widely accepted.

# Connect to an existing ECS cluster
from dask_cloudprovider import ECSCluster
cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")

# Or let Dask-Cloudprovider create the infrastructure for you on AWS Fargate
from dask_cloudprovider import FargateCluster
cluster = FargateCluster()

Dask-Gateway

In some cases users may not have access to the cluster manager. For example, the institution may not give all of their data science users access to the Yarn or Kubernetes cluster. In this case the Dask-Gateway project may be useful. It can launch and manage Dask jobs, and provide a proxy connection to these jobs if necessary. It is typically deployed with elevated permissions but managed directly by IT, giving them a point of greater control.
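
From a user’s perspective, connecting through a gateway looks roughly like this sketch (the gateway address is a placeholder for whatever IT provides):

from dask_gateway import Gateway

# The gateway server is run and secured by IT; users only need its address
gateway = Gateway("http://gateway-address")
cluster = gateway.new_cluster()
cluster.scale(4)
client = cluster.get_client()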

GPUs and Dask-CUDA

While using Dask with multi-GPU deployments, the NVIDIA RAPIDS team has needed the ability to specify increasingly complex setups of Dask workers. They recommend the following deployment strategy:

  1. One Dask-worker per GPU on a machine
  2. Specify the CUDA_VISIBLE_DEVICES environment variable to pin that worker to that GPU
  3. If your machine has multiple network interfaces then choose the network interface that has the best connection to that GPU
  4. If your machine has multiple CPUs then set thread affinities to use the closest CPU
  5. … and more

For this reason we wanted to specify these configurations in code, like the following:

import dask.distributed

specification = {
    "worker-0": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "0,1,2,3"}, "interface": "ib0"},
    },
    "worker-1": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "1,2,3,0"}, "interface": "ib0"},
    },
    "worker-2": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "2,3,0,1"}, "interface": "ib1"},
    },
    "worker-3": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "3,0,1,2"}, "interface": "ib1"},
    },
}

We then pass this specification to the new SpecCluster class to deploy these workers:

cluster = SpecCluster(workers=specification)

We’ve used this technique in the Dask-CUDA project to provide convenient functions for deployment on multi-GPU systems.
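
On a single multi-GPU machine this reduces to roughly the following sketch, where LocalCUDACluster applies the per-GPU conventions described above automatically:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Starts one worker per GPU, with CUDA_VISIBLE_DEVICES set appropriately for each
cluster = LocalCUDACluster()
client = Client(cluster)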

This class was generic enough that it ended up forming the base of the SSH, Jobqueue, and Kubernetes solutions as well.

Standards and Conventions

The solutions above are built by different teams that work at different companies. This is great because those teams have hands-on experience with the cluster managers in the wild, but it has historically made it somewhat challenging to standardize the user experience. This is particularly challenging when we build other tools, like IPython widgets or the Dask JupyterLab extension, that want to interoperate with all of the Dask deployment solutions.

The recent rewrite of Dask-SSH, Dask-Jobqueue, and Dask-Kubernetes, and the new Dask-Cloudprovider and Dask-CUDA libraries, places them all under the same dask.distributed.SpecCluster superclass, so we can expect a high degree of uniformity from them. Additionally, all of these classes now match the dask.distributed.Cluster interface, which standardizes things like adaptivity, IPython widgets, logs, and some basic reporting.

  • Cluster
      • SpecCluster
          • Kubernetes
          • JobQueue (PBS/SLURM/LSF/SGE/Torque/Condor/Moab/OAR)
          • SSH
          • CloudProvider (ECS)
          • CUDA (LocalCUDACluster, DGX)
          • LocalCluster
      • Yarn
      • Gateway
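
Because of this shared interface, the same user-facing code works with any of these cluster classes. A rough sketch, where cluster is an instance of any of the classes above:

from dask.distributed import Client

cluster.scale(10)                      # ask for ten workers explicitly
cluster.adapt(minimum=1, maximum=100)  # or let the cluster scale itself with load
client = Client(cluster)               # connect as usual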

Future Work

There is still plenty to do. Here are some of the themes we’ve seen among current development:

  1. Move the Scheduler off to a separate job/pod/container in the network, which is often helpful for complex networking situations
  2. Improve proxying of the dashboard in these situations
  3. Optionally separate the life-cycle of the cluster from the lifetime of the Python process that requested the cluster
  4. Write up best practices for how to compose GPU support with all of the cluster managers