Sep 22, 2016

Dask Cluster Deployments

By Matthew Rocklin

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

All code in this post is experimental. It should not be relied upon. People looking to deploy dask.distributed on a cluster should refer instead to the documentation.

Dask is deployed today on the following systems in the wild:

  • SGE
  • SLURM
  • Torque
  • Condor
  • LSF
  • Mesos
  • Marathon
  • Kubernetes
  • SSH and custom scripts
  • … there may be more. This is what I know of first-hand.

These systems provide users access to cluster resources and ensure that many distributed services / users play nicely together. They’re essential for any modern cluster deployment.

The people deploying Dask on these cluster resource managers are power users; they know how their resource managers work and they read the documentation on how to set up Dask clusters. Generally these users are pretty happy; however, we should reduce this barrier so that non-power-users with access to a cluster resource manager can use Dask on their cluster just as easily.

Unfortunately, there are a few challenges:

  1. Several cluster resource managers exist, each with significant adoption. Finite developer time stops us from supporting all of them.
  2. Policies for scaling out vary widely. For example, we might want a fixed number of workers, or we might want workers that scale out based on current use. Different groups will want different solutions.
  3. Individual cluster deployments are highly configurable. Dask needs to get out of the way quickly and let existing technologies configure themselves.

This post talks about some of these issues. It does not contain a definitive solution.

Example: Kubernetes

For example, both Olivier Grisel (INRIA, scikit-learn) and Tim O’Donnell (Mount Sinai, Hammer lab) publish instructions on how to deploy Dask.distributed on Kubernetes.

These instructions are well organized. They include Dockerfiles, published images, Kubernetes config files, and instructions on how to interact with cloud providers’ infrastructure. Olivier and Tim both obviously know what they’re doing and care about helping others to do the same.

Tim (who came second) wasn’t aware of Olivier’s solution and wrote up his own. Tim was capable of doing this, but many beginners wouldn’t be.

One solution would be to include a prominent registry of solutions like these within the Dask documentation so that people can find quality references to use as starting points. I’ve started a list of resources here: dask/distributed #547. Comments pointing to other resources would be most welcome.

However, even if Tim did find Olivier’s solution I suspect he would still need to change it. Tim has different software and scalability needs than Olivier. This raises the question of “What should Dask provide and what should it leave to administrators?” It may be that the best we can do is to support copy-paste-edit workflows.

What is Dask-specific, what is resource-manager-specific, and what needs to be configured by hand each time?

Adaptive Deployments

In order to explore this topic of separable solutions I built a small adaptive deployment system for Dask.distributed on Marathon, an orchestration platform on top of Mesos.

This solution does two things:

  1. It scales a Dask cluster dynamically based on the current use. If there are more tasks in the scheduler than the current workers can handle, it asks for more workers.
  2. It deploys those workers using Marathon.

To encourage replication, these two different aspects are solved in two different pieces of code with a clean API boundary.

  1. A backend-agnostic piece for adaptivity that says when to scale workers up and how to scale them down safely
  2. A Marathon-specific piece that deploys or destroys dask-workers using the Marathon HTTP API

This combines a policy (adaptive scaling) with a backend (Marathon) such that either can be replaced easily. For example, we could replace the adaptive policy with a fixed one to always keep N workers online, or we could replace Marathon with Kubernetes or Yarn.

My hope is that this demonstration encourages others to develop third party packages. The rest of this post will be about diving into this particular solution.

Adaptivity

The distributed.deploy.Adaptive class wraps around a Scheduler and determines when we should scale up and by how many nodes, and when we should scale down, specifying which idle workers to release.

The current policy is fairly straightforward (a rough code sketch follows the list below):

  1. If there are unassigned tasks or any stealable tasks and no idle workers, or if the average memory use is over 50%, then increase the number of workers by a fixed factor (defaults to two).
  2. If there are idle workers and the average memory use is below 50%, then reclaim the idle workers with the least data on them (after moving data to nearby workers) until we’re near 50%.
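
In code, the decision amounts to roughly the following. This is a minimal sketch, not the actual Adaptive source: the function name, signature, and arguments are hypothetical stand-ins for state that the real class reads off the scheduler.

def desired_workers(n_workers, unassigned_tasks, idle_workers,
                    avg_memory_fraction, scale_factor=2):
    """Return how many workers this policy would like to have."""
    if (unassigned_tasks and not idle_workers) or avg_memory_fraction > 0.5:
        # Scale up by a fixed factor (two by default)
        return max(n_workers, 1) * scale_factor
    if idle_workers and avg_memory_fraction < 0.5:
        # Release idle workers (the real policy picks those with the least
        # data, after moving that data to nearby workers)
        return max(n_workers - len(idle_workers), 1)
    return n_workers

# Example: four busy workers, a backlog of unassigned tasks, memory at 30%
desired_workers(4, unassigned_tasks=10, idle_workers=[],
                avg_memory_fraction=0.3)   # -> 8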

Think this policy could be improved or have other thoughts? Great. It was easy to implement and entirely separable from the main code so you should be able to edit it easily or create your own. The current implementation is about 80 lines (source).

However, this Adaptive class doesn’t actually know how to perform the scaling. Instead it depends on being handed a separate object with two methods, scale_up and scale_down:

class MyCluster(object):
    def scale_up(self, n):
        """
        Bring the total count of workers up to ``n``

        This function/coroutine should bring the total number of workers up to
        the number ``n``.
        """
        raise NotImplementedError()

    def scale_down(self, workers):
        """
        Remove ``workers`` from the cluster

        Given a list of worker addresses this function should remove those
        workers from the cluster.
        """
        raise NotImplementedError()

This cluster object contains the backend-specific bits of how to scale up and down, but none of the adaptive logic of when to scale up and down. The single-machine LocalCluster object serves as a reference implementation.
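
To make that interface concrete, here is a hypothetical, heavily simplified backend that launches dask-worker subprocesses on the local machine. The SubprocessCluster name and the worker-naming scheme are made up for illustration; they are not part of Dask, and LocalCluster remains the real reference implementation.

import itertools
import subprocess


class SubprocessCluster(object):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.processes = {}          # worker name -> subprocess.Popen
        self._names = ('worker-%d' % i for i in itertools.count())

    def scale_up(self, n):
        # Launch local dask-worker processes until we have ``n`` of them
        while len(self.processes) < n:
            name = next(self._names)
            proc = subprocess.Popen(['dask-worker', self.scheduler.address,
                                     '--name', name])
            self.processes[name] = proc

    def scale_down(self, workers):
        # Map worker addresses back to the processes we started and stop them
        for w in workers:
            name = self.scheduler.worker_info[w]['name']
            proc = self.processes.pop(name, None)
            if proc is not None:
                proc.terminate()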

So we combine this adaptive scheme with a deployment scheme. We’ll use a tiny Dask-Marathon deployment library available here:

from dask_marathon import MarathonCluster
from distributed import Scheduler
from distributed.deploy import Adaptive

s = Scheduler()
mc = MarathonCluster(s, cpus=1, mem=4000,
                     docker_image='mrocklin/dask-distributed')
ac = Adaptive(s, mc)

This combines a policy, Adaptive, with a deployment scheme, Marathon, in a composable way. The Adaptive cluster watches the scheduler and calls the scale_up/down methods on the MarathonCluster as necessary.
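
As a usage sketch only (assuming the scheduler above has actually been started and is reachable), driving the cluster from a client is what triggers those calls: a backlog of tasks makes Adaptive ask Marathon for workers, and an idle cluster lets it release them again. Client here is the standard dask.distributed client (called Executor in older releases).

from distributed import Client

c = Client(s.address)                            # connect to the scheduler above
futures = c.map(lambda x: x ** 2, range(1000))   # a backlog triggers scale-up
total = c.submit(sum, futures).result()          # idle workers are released later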

Marathon code

Because we’ve isolated all of the “when” logic to the Adaptive code, the Marathon-specific code is blissfully short and specific. We include a slightly simplified version below. There is a fair amount of Marathon-specific setup in the constructor and then simple scale_up/down methods below:

import uuid

from marathon import MarathonClient, MarathonApp
from marathon.models.container import MarathonContainer


class MarathonCluster(object):
    def __init__(self, scheduler,
                 executable='dask-worker',
                 docker_image='mrocklin/dask-distributed',
                 marathon_address='http://localhost:8080',
                 name=None, cpus=1, mem=4000, **kwargs):
        self.scheduler = scheduler

        # Create Marathon App to run dask-worker
        args = [
            executable,
            scheduler.address,
            '--nthreads', str(cpus),
            '--name', '$MESOS_TASK_ID',  # use Mesos task ID as worker name
            '--worker-port', '$PORT_WORKER',
            '--nanny-port', '$PORT_NANNY',
            '--http-port', '$PORT_HTTP'
        ]

        ports = [{'port': 0,
                  'protocol': 'tcp',
                  'name': name}
                 for name in ['worker', 'nanny', 'http']]

        args.extend(['--memory-limit',
                     str(int(mem * 0.6 * 1e6))])

        kwargs['cmd'] = ' '.join(args)
        container = MarathonContainer({'image': docker_image})

        app = MarathonApp(instances=0,
                          container=container,
                          port_definitions=ports,
                          cpus=cpus, mem=mem, **kwargs)

        # Connect and register app
        self.client = MarathonClient(marathon_address)
        self.app = self.client.create_app(name or 'dask-%s' % uuid.uuid4(), app)

    def scale_up(self, instances):
        self.client.scale_app(self.app.id, instances=instances)

    def scale_down(self, workers):
        for w in workers:
            self.client.kill_task(self.app.id,
                                  self.scheduler.worker_info[w]['name'],
                                  scale=True)

This isn’t trivial; you need to know about Marathon for this to make sense, but fortunately you don’t need to know much else. My hope is that people familiar with other cluster resource managers will be able to write similar objects and will publish them as third party libraries, as I have with this Marathon solution here: https://github.com/mrocklin/dask-marathon (thanks go to Ben Zaitlen for setting up a great testing harness for this and getting everything started).

Adaptive Policies

Similarly, we can design new policies for deployment. You can read more about the policies for the Adaptive class in the documentation or the source (about eighty lines long). I encourage people to implement and use other policies and contribute back those policies that are useful in practice.
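
For example, the simplest possible alternative is no adaptivity at all: pin the cluster at a fixed size and reuse whatever backend object provides scale_up/scale_down. The FixedPolicy name below is made up for illustration and is not part of distributed.

class FixedPolicy(object):
    """Hypothetical policy: always keep exactly ``n`` workers online."""
    def __init__(self, cluster, n):
        self.cluster = cluster
        self.n = n
        cluster.scale_up(n)      # bring the cluster up to n workers once

A MarathonCluster, a Kubernetes-backed equivalent, or the subprocess sketch above could all be handed to such a policy unchanged, which is the point of keeping the two pieces separate.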

Final thoughts

We laid out a problem:

  • How does a distributed system support a variety of cluster resource managers and a variety of scheduling policies while remaining sensible?

We proposed two solutions:

  1. Maintain a registry of links to solutions, supporting copy-paste-edit practices
  2. Develop an API boundary that encourages separable development of third party libraries.

It’s not clear that either solution is sufficient, or that the current implementation of either solution is any good. This is an important problem though, as Dask.distributed is, today, still mostly used by super-users. I would like to engage community creativity here as we search for a good solution.