Oct 9, 2015

## Distributed Prototype

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

tl;dr: We demonstrate a prototype distributed computing library and discuss data locality.

## Distributed Computing

Here’s a new prototype library for distributed computing. It could use some critical feedback.

This blogpost uses distributed on a toy example. I won’t talk about the design here, but the docs should be a quick and informative read. I recommend the quickstart in particular.

We’re going to do a simple computation a few different ways on a cluster of four nodes. The computation will be

1. Make 1000 random numpy arrays, each of size 1,000,000
2. Compute the sum of each array
3. Compute the total sum of the sums

We’ll do this directly with a distributed Pool and again with a dask graph.
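For reference, the computation itself is simple. Here is a scaled-down, purely local sketch (plain numpy, no cluster; sizes shrunk to 10 arrays of 10,000 so it runs instantly):

```python
import numpy as np

# Scaled-down local version: 10 arrays of 10,000 elements
# instead of 1000 arrays of 1,000,000.
arrays = [np.random.random(10_000) for _ in range(10)]  # step 1: make arrays
sums = [np.sum(x) for x in arrays]                      # step 2: sum each
total = np.sum(sums)                                    # step 3: sum the sums

# Uniform [0, 1) numbers average 0.5, so total is near 0.5 * 10 * 10_000
print(total)
```

The distributed versions below perform these same three steps, but the arrays live on worker machines instead of in one local process.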

## Start up a Cluster

I have a cluster of four m3.xlarges on EC2:

ssh node1
dcenter

ssh node2
dworker node1:8787

ssh node3
dworker node1:8787

ssh node4
dworker node1:8787

Notes on how I set up my cluster.

## Pool

On the client side we spin up a distributed Pool and point it to the center node.

>>> from distributed import Pool
>>> pool = Pool('node1:8787')

Then we create a bunch of random numpy arrays:

>>> import numpy as np
>>> arrays = pool.map(np.random.random, [1000000] * 1000)

Our result is a list of proxy objects that point back to individual numpy arrays on the worker computers. We don’t move data until we need to. (Though we could call .get() on this to collect the numpy array from the worker.)

>>> arrays[0]
RemoteData<center=10.141.199.202:8787, key=3e446310-6...>

Further computations on this data happen on the cluster, on the worker nodes that hold the data already.

>>> sums = pool.map(np.sum, arrays)

This avoids costly data transfer times. Data transfer will happen when necessary though, as when we compute the final sum. This forces communication because all of the intermediate sums must move to one node for the final addition.

>>> total = pool.apply(np.sum, args=(sums,))
>>> total.get() # finally transfer result to local machine
499853416.82058007
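The Pool API deliberately mirrors the map/apply pattern from the standard library. As a purely local stand-in (stdlib ThreadPool instead of the distributed Pool, with sizes shrunk; this is an illustration of the pattern, not of distributed itself), the same three steps look like:

```python
import numpy as np
from multiprocessing.pool import ThreadPool

pool = ThreadPool(4)

# map: build the arrays (these live on workers in the distributed case)
arrays = pool.map(np.random.random, [10_000] * 10)

# map: one partial sum per array
sums = pool.map(np.sum, arrays)

# apply: combine the partial sums into one total
total = pool.apply(np.sum, args=(sums,))
print(total)  # near 0.5 * 10 * 10_000 = 50,000
```

The distributed Pool keeps this same shape but returns RemoteData proxies instead of concrete arrays, so only the final .get() moves data to the client.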

Now we do the same computation all at once by manually constructing a dask graph (beware, this can get gnarly; friendlier approaches exist below).

>>> dsk = dict()
>>> for i in range(1000):
...     dsk[('x', i)] = (np.random.random, 1000000)
...     dsk[('sum', i)] = (np.sum, ('x', i))

>>> dsk['total'] = (sum, [('sum', i) for i in range(1000)])

>>> get('node1', 8787, dsk, 'total')
500004095.00759566
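A dask graph is just a dict mapping keys to tasks, where a task is a tuple of a callable followed by its arguments, and an argument may itself be a key in the graph. A toy single-machine executor (hypothetical, just to show the convention; the real schedulers add caching, parallelism, and here, data locality) fits in a few lines:

```python
import numpy as np

def execute(dsk, key):
    # Evaluate one key in a dask-style graph {key: (func, arg, ...)}.
    # Arguments that are themselves keys are resolved recursively;
    # lists of arguments are resolved element-wise.
    def resolve(arg):
        if isinstance(arg, list):
            return [resolve(a) for a in arg]
        try:
            if arg in dsk:           # the argument is itself a key
                return execute(dsk, arg)
        except TypeError:            # unhashable, so not a key
            pass
        return arg                   # a literal argument
    task = dsk[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        return func(*[resolve(a) for a in args])
    return task

# The graph from above, shrunk: 5 arrays of 100 elements
dsk = dict()
for i in range(5):
    dsk[('x', i)] = (np.random.random, 100)
    dsk[('sum', i)] = (np.sum, ('x', i))
dsk['total'] = (sum, [('sum', i) for i in range(5)])

print(execute(dsk, 'total'))  # near 0.5 * 5 * 100 = 250
```

The distributed get does the same traversal, except that each task runs on a worker node and intermediate results stay on the cluster.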

Apparently not everyone finds dask dictionaries to be pleasant to write by hand. You could also use this with dask.imperative or dask.array.

def get2(dsk, keys):
    """ Make a `get` scheduler that hardcodes the IP and port """
    return get('node1', 8787, dsk, keys)

>>> from dask.imperative import do
>>> arrays = [do(np.random.random)(1000000) for i in range(1000)]
>>> sums = [do(np.sum)(x) for x in arrays]
>>> total = do(np.sum)(sums)

>>> total.compute(get=get2)
499993637.00844824

>>> import dask.array as da
>>> x = da.random.random(1000000*1000, chunks=(1000000,))
>>> x.sum().compute(get=get2)
500000250.44921482

The dask approach was smart enough to delete all of the intermediates that it didn’t need. It could have run intelligently on far more data than even our cluster could hold. With the pool we manage data ourselves manually.

>>> from distributed import delete
>>> delete(('node1', 8787), arrays)

## Mix and Match

We can also mix these abstractions and put the results from the pool into dask graphs.

>>> arrays = pool.map(np.random.random, [1000000] * 1000)
>>> dsk = {('sum', i): (np.sum, x) for i, x in enumerate(arrays)}
>>> dsk['total'] = (sum, [('sum', i) for i in range(1000)])

## Discussion

The Pool and get user interfaces are independent from each other, but both use the same underlying network and both build off of the same codebase. With distributed I wanted to build a system that would allow me to experiment easily. I’m mostly happy with the result so far.

One non-trivial theme here is data locality. We keep intermediate results on the cluster and schedule jobs on computers that already hold the relevant data when possible. The workers can communicate with each other so that any worker can do any job, but we arrange jobs so that workers avoid communication unless it is necessary.
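The scheduling decision can be illustrated with a toy heuristic (hypothetical, not distributed's actual algorithm): given which worker holds each of a task's inputs and how large they are, run the task where the most input bytes already live, so the least data has to move.

```python
from collections import Counter

def pick_worker(dep_locations, dep_sizes):
    """Choose the worker holding the most input bytes for a task.

    dep_locations: {dependency_key: worker holding it}
    dep_sizes:     {dependency_key: size in bytes}
    """
    bytes_held = Counter()
    for key, worker in dep_locations.items():
        bytes_held[worker] += dep_sizes[key]
    return bytes_held.most_common(1)[0][0]

# Two inputs on node2 (8 MB total) outweigh one input on node3 (1 MB),
# so the task runs on node2 and only 1 MB has to cross the network.
where = pick_worker(
    {('sum', 0): 'node2', ('sum', 1): 'node2', ('sum', 2): 'node3'},
    {('sum', 0): 4_000_000, ('sum', 1): 4_000_000, ('sum', 2): 1_000_000},
)
print(where)  # node2
```

Workers that lack an input fetch it from a peer, so this heuristic only changes how much moves, not what is computable where.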