Jun 23, 2015

Distributed Scheduling

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project

tl;dr: We evaluate dask graphs with a variety of schedulers and introduce a new distributed memory scheduler.

Dask.distributed is new and is not battle-tested. Use at your own risk and adjust expectations accordingly.

Evaluate dask graphs

Most dask users use the dask collections, Array, Bag, and DataFrame. These collections are convenient ways to produce dask graphs. A dask graph is a dictionary of tasks. A task is a tuple with a function and arguments.

The graph comprising a dask collection (like a dask.array) is available through its .dask attribute.

>>> import dask.array as da
>>> x = da.arange(15, chunks=(5,)) # 0..14 in three chunks of size five

>>> x.dask # dask array holds the graph to create the full array
{('x', 0): (np.arange, 0, 5),
 ('x', 1): (np.arange, 5, 10),
 ('x', 2): (np.arange, 10, 15)}

Further operations on x create more complex graphs

>>> z = (x + 100).sum()
>>> z.dask
{('x', 0): (np.arange, 0, 5),
 ('x', 1): (np.arange, 5, 10),
 ('x', 2): (np.arange, 10, 15),
 ('y', 0): (add, ('x', 0), 100),
 ('y', 1): (add, ('x', 1), 100),
 ('y', 2): (add, ('x', 2), 100),
 ('z', 0): (np.sum, ('y', 0)),
 ('z', 1): (np.sum, ('y', 1)),
 ('z', 2): (np.sum, ('y', 2)),
 ('z',): (sum, [('z', 0), ('z', 1), ('z', 2)])}

Hand-made dask graphs

We can make dask graphs by hand without dask collections. This involves creating a dictionary of tuples of functions.

>>> def add(a, b):
...     return a + b

>>> # x = 1
>>> # y = 2
>>> # z = add(x, y)

>>> dsk = {'x': 1,
...        'y': 2,
...        'z': (add, 'x', 'y')}

We evaluate these graphs with one of the dask schedulers

>>> from dask.threaded import get
>>> get(dsk, 'z') # Evaluate graph with multiple threads
3

>>> from dask.multiprocessing import get
>>> get(dsk, 'z') # Evaluate graph with multiple processes
3

We separate the evaluation of the graphs from their construction.
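
To make that separation concrete, here is a toy synchronous scheduler, a minimal sketch rather than anything dask actually ships: it evaluates a key by recursively resolving keys, executing task tuples, and passing literals through. The names toy_get, _evaluate, and _is_key are invented for illustration.

from operator import add   # stands in for the add() defined above

def _is_key(dsk, obj):
    # Keys must be hashable; lists and other unhashable objects can't be keys
    try:
        return obj in dsk
    except TypeError:
        return False

def _evaluate(dsk, task):
    if isinstance(task, list):
        # Lists, e.g. in (sum, [('z', 0), ('z', 1), ('z', 2)]), recurse element-wise
        return [_evaluate(dsk, t) for t in task]
    if isinstance(task, tuple) and task and callable(task[0]):
        # A task: call the function on its recursively evaluated arguments
        func, args = task[0], task[1:]
        return func(*[_evaluate(dsk, arg) for arg in args])
    if _is_key(dsk, task):
        # A key referring to another entry in the graph
        return toy_get(dsk, task)
    return task   # a literal value

def toy_get(dsk, key):
    """Evaluate `key` in graph `dsk`: single-threaded, no caching, no parallelism."""
    return _evaluate(dsk, dsk[key])

toy_get({'x': 1, 'y': 2, 'z': (add, 'x', 'y')}, 'z')   # returns 3

The real schedulers implement this same contract, but add caching, task ordering, threads, processes, or a network of machines behind the same get(dsk, key) interface.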

Distributed Scheduling

The separation of graphs from evaluation allows us to create new schedulers. In particular there exists a scheduler that operates on multiple machines in parallel, communicating over ZeroMQ.

This system has a single centralized scheduler, several workers, and potentially several clients.

Workers and clients connecting to a scheduler

Clients send graphs to the central scheduler which farms out those tasks to workers and coordinates the execution of the graph. While the scheduler centralizes metadata, the workers themselves handle transfer of intermediate data in a peer-to-peer fashion. Once the graph completes the workers send data to the scheduler which passes it through to the appropriate user/client.

Example

And so now we can execute our dask graphs in parallel across multiple machines.

$ ipython  # On your laptop

>>> def add(a, b):
...     return a + b

>>> dsk = {'x': 1,
...        'y': 2,
...        'z': (add, 'x', 'y')}

>>> from dask.threaded import get
>>> get(dsk, 'z')  # use threads
3

>>> from dask.distributed import Client
>>> c = Client('tcp://notebook:5555')

>>> c.get(dsk, 'z')  # use distributed network
3

$ ipython  # Remote Process #1: Scheduler

>>> from dask.distributed import Scheduler
>>> s = Scheduler(port_to_workers=4444,
...               port_to_clients=5555,
...               hostname='notebook')

$ ipython  # Remote Process #2: Worker

>>> from dask.distributed import Worker
>>> w = Worker('tcp://notebook:4444')

$ ipython  # Remote Process #3: Worker

>>> from dask.distributed import Worker
>>> w = Worker('tcp://notebook:4444')

Choose Your Scheduler

This graph is small. We didn’t need a distributed network of machines to compute it (a single thread would have been much faster) but this simple example can be easily extended to more important cases, including generic use with the dask collections (Array, Bag, DataFrame). You can control the scheduler with a keyword argument to any compute call.

>>> import dask.array as da
>>> x = da.random.normal(10, 0.1, size=(1000000000,), chunks=(1000000,))

>>> x.mean().compute(get=get) # use threads
>>> x.mean().compute(get=c.get) # use distributed network

Alternatively you can set the default scheduler in dask with dask.set_options

>>> import dask
>>> dask.set_options(get=c.get) # use distributed scheduler by default
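
In recent versions set_options also works as a context manager, which scopes the scheduler choice to a block of code; the snippet below assumes that behavior and reuses the client c from the example above.

>>> import dask
>>> with dask.set_options(get=c.get):  # distributed scheduler inside this block only
...     x.mean().compute()

>>> x.mean().compute()                 # back to the previously configured scheduler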

Known Limitations

We intentionally made the simplest and dumbest distributed scheduler we could think of. Because dask separates graphs from schedulers we can iterate on this problem many times, building better schedulers as we learn what is important. This current scheduler learns from our single-machine shared-memory schedulers but is the first dask scheduler that has to think about distributed memory. As a result it has the following known limitations:

  1. It does not consider data locality. While linear chains of tasks will execute on the same machine, we don’t think much about executing multi-input tasks on nodes where only some of the data is local.
  2. In particular, this scheduler isn’t optimized for data-local file systems like HDFS. It’s still happy to read data from HDFS, but this results in unnecessary network communication. We’ve found that it’s great when paired with S3.
  3. This scheduler is new and hasn’t yet had its tires kicked. Vocal beta users are most welcome.
  4. We haven’t thought much about deployment. E.g. somehow you need to ssh into a bunch of machines and start up workers, then tear them down when you’re done. Dask.distributed can bootstrap off of an IPython Parallel cluster, and we’ve integrated it into anaconda-cluster, but deployment remains a tough problem.

The dask.distributed module is available in the last release but I suggest using the development master branch. There will be another release in early July.

Further Information

Blake Griffith has been playing with dask.distributed and dask.bag together on data from http://githubarchive.org. He plans to write a blogpost to give a better demonstration of the use of dask.distributed on real world problems. Look for that post in the next week or two.

You can read more about the internal design of dask.distributed at the dask docs.

Thanks

Special thanks to Min Regan-Kelley, John Jacobsen, Ben Zaitlen, and Hugo Shi for their advice on building distributed systems.

Also thanks to Blake Griffith for serving as original user/developer and for smoothing over the user experience.