This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
I’m pleased to announce the release of Dask version 0.14.1. This release contains a variety of performance and feature improvements. This blogpost includes some notable features and changes since the last release on February 27th.
As always, you can conda install from conda-forge:
conda install -c conda-forge dask distributed
or you can pip install from PyPI:
pip install dask[complete] --upgrade
Recent work in distributed computing and machine learning has motivated new performance-oriented and usability changes to how we handle arrays.
Many interactions between Dask arrays and NumPy arrays work smoothly. NumPy arrays are made lazy and are appropriately chunked to match the operation and the Dask array.
>>> x = np.ones(10) # a numpy array
>>> y = da.arange(10, chunks=(5,)) # a dask array
>>> z = x + y # combined, they become a dask.array
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,)>
>>> z.compute()
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.])
Reshaping distributed arrays is simple in simple cases, and can be quite complex in complex cases. Reshape now supports a much broader set of shape transformations where any dimension is collapsed or merged into other dimensions.
>>> x = da.ones((2, 3, 4, 5, 6), chunks=(2, 2, 2, 2, 2))
>>> x.reshape((6, 2, 2, 30, 1))
dask.array<reshape, shape=(6, 2, 2, 30, 1), dtype=float64, chunksize=(3, 1, 2, 6, 1)>
This operation ends up being quite useful in a number of distributed array cases.
Dask.array slicing optimizations are now careful to produce graphs that avoid situations that could cause excess inter-worker communication. The details of how they do this are a bit out of scope for a short blogpost, but the history here is interesting.
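As a small illustration of the kind of operation these optimizations target (the array and slice here are made up for the example, not taken from the release):

```python
import dask.array as da

# A chunked array: 10 chunks of 100 elements each
x = da.ones(1000, chunks=(100,))

# Slicing only touches the chunks it needs; a well-optimized graph
# avoids moving whole chunks between workers just to take a view
y = x[::2]

print(y.shape)            # (500,)
print(y.sum().compute())  # 500.0
```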
Historically dask.arrays were used almost exclusively by researchers with large on-disk arrays stored as HDF5 or NetCDF files. These users primarily used the single-machine multi-threaded scheduler. We heavily tailored Dask array optimizations to this situation and made that community pretty happy.

Now, as some of that community switches to cluster computing on larger datasets, the optimization goals shift a bit. We have tons of distributed disk bandwidth but really want to avoid communicating large results between workers. Supporting both use cases is possible and I think that we’ve achieved that in this release so far, but it’s starting to require increasing levels of care.
With distributed computing also comes larger graphs and a growing importance of graph-creation overhead. This has been optimized somewhat in this release. We expect this to be a focus going forward.
Set_index is smarter in a couple of ways.
We’ve micro-optimized some parts of dataframe shuffles. Big thanks to the Pandas developers for the help here. This accelerates set_index, joins, groupby-applies, and so on.
The fastparquet library has seen a lot of use lately and has undergone a number of community bugfixes.
Importantly, Fastparquet now supports Python 2.
We strongly recommend Parquet as the standard data storage format for Dask dataframes (and Pandas DataFrames).
Debugging is hard in part because exceptions happen on remote machines where normal debugging tools like pdb can’t reach. Previously we were able to bring back the traceback and exception, but you couldn’t dive into the stack trace to investigate what went wrong:
def div(x, y):
    return x / y
>>> future = client.submit(div, 1, 0)
>>> future
<Future: status: error, key: div-4a34907f5384bcf9161498a635311aeb>
>>> future.result() # getting result re-raises exception locally
<ipython-input-3-398a43a7781e> in div()
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero
Now Dask can bring a failing task and all necessary data back to the local machine and rerun it so that users can leverage the normal Python debugging toolchain.
>>> client.recreate_error_locally(future)
<ipython-input-3-398a43a7781e> in div(x, y)
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero
Now if you’re in IPython or a Jupyter notebook you can use the %debug magic to jump into the stacktrace, investigate local variables, and so on.
In [8]: %debug
> <ipython-input-3-398a43a7781e>(2)div()
1 def div(x, y):
----> 2 return x / y
ipdb> pp x
1
ipdb> pp y
0
Dask.distributed uses Tornado for network communication and Tornado coroutines for concurrency. Normal users rarely interact with Tornado coroutines; they aren’t familiar to most people so we opted instead to copy the concurrent.futures API. However some complex situations are much easier to solve if you know a little bit of async programming.
Fortunately, the Python ecosystem seems to be embracing this change towards native async code with the async/await syntax in Python 3. In an effort to motivate people to learn async programming and to gently nudge them towards Python 3, Dask.distributed now supports async/await in a few cases.
You can wait on a dask Future:
async def f():
    future = client.submit(func, *args, **kwargs)
    result = await future
You can put the as_completed iterator into an async for loop:
async for future in as_completed(futures):
    result = await future
    ...  # do stuff with result
And, because Tornado supports the await protocols, you can also use the existing shadow concurrency API (everything prepended with an underscore) with await. (This was doable before.)
results = client.gather(futures) # synchronous
...
results = await client._gather(futures) # asynchronous
If you’re in Python 2 you can always do this with normal yield and the tornado.gen.coroutine decorator.
In the last release we enabled Dask to communicate over more things than just TCP. In practice this doesn’t come up (TCP is pretty useful). However, in this release we now support single-machine “clusters” where the clients, scheduler, and workers are all in the same process and transfer data cost-free over in-memory queues.
This allows the in-memory user community to use some of the more advanced features (asynchronous computation, spill-to-disk support, web diagnostics) that are only available in the distributed scheduler.
This is on by default if you create a cluster with LocalCluster without using Nanny processes.
>>> from dask.distributed import LocalCluster, Client
>>> cluster = LocalCluster(nanny=False)
>>> client = Client(cluster)
>>> client
<Client: scheduler='inproc://192.168.1.115/8437/1' processes=1 cores=4>
>>> from threading import Lock # Not serializable
>>> lock = Lock() # Won't survive going over a socket
>>> [future] = client.scatter([lock]) # Yet we can send to a worker
>>> future.result() # ... and back
<unlocked _thread.lock object at 0x7fb7f12d08a0>
Workers now maintain a pool of sustained connections between each other. This pool is of a fixed size and removes connections with a least-recently-used policy. It avoids re-connection delays when transferring data between workers. In practice this shaves off a millisecond or two from every communication.
This is actually a revival of an old feature that we had turned off last year when it became clear that the performance here wasn’t a problem.
Along with other enhancements, this takes our round-trip latency down to 11ms on my laptop.
In [10]: %%time
...: for i in range(1000):
...: future = client.submit(inc, i)
...: result = future.result()
...:
CPU times: user 4.96 s, sys: 348 ms, total: 5.31 s
Wall time: 11.1 s
There may be room for improvement here though. For comparison, here is the same test with concurrent.futures.ProcessPoolExecutor.
In [14]: e = ProcessPoolExecutor(8)
In [15]: %%time
...: for i in range(1000):
...: future = e.submit(inc, i)
...: result = future.result()
...:
CPU times: user 320 ms, sys: 56 ms, total: 376 ms
Wall time: 442 ms
Also, just to be clear, this measures total roundtrip latency, not overhead. Dask’s distributed scheduler overhead remains in the low hundreds of microseconds.
There has also been recent activity around Dask and machine learning.
The following people contributed to the dask/dask repository since the 0.14.0 release on February 27th.
The following people contributed to the dask/distributed repository since the 1.16.0 release on February 27th.