Mar 23, 2017

Dask Release 0.14.1

This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

I’m pleased to announce the release of Dask version 0.14.1. This release contains a variety of performance and feature improvements. This blogpost includes some notable features and changes since the last release on February 27th.

As always, you can conda install from conda-forge:

conda install -c conda-forge dask distributed

or you can pip install from PyPI:

pip install dask[complete] --upgrade

Arrays

Recent work in distributed computing and machine learning has motivated new performance-oriented and usability changes to how we handle arrays.

Automatic chunking and operation on NumPy arrays

Many interactions between Dask arrays and NumPy arrays work smoothly. NumPy arrays are made lazy and are appropriately chunked to match the operation and the Dask array.

>>> import numpy as np
>>> import dask.array as da

>>> x = np.ones(10)                 # a numpy array
>>> y = da.arange(10, chunks=(5,))  # a dask array
>>> z = x + y                       # combined, becomes a dask.array
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,)>

>>> z.compute()
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.])

Reshape

Reshaping distributed arrays is simple in simple cases, and can be quite complex in complex cases. Reshape now supports a much broader set of shape transformations where any dimension can be collapsed or merged into other dimensions.

>>> x = da.ones((2, 3, 4, 5, 6), chunks=(2, 2, 2, 2, 2))
>>> x.reshape((6, 2, 2, 30, 1))
dask.array<reshape, shape=(6, 2, 2, 30, 1), dtype=float64, chunksize=(3, 1, 2, 6, 1)>

This operation ends up being quite useful in a number of distributed array cases.

Optimize Slicing to Minimize Communication

Dask.array slicing optimizations are now careful to produce graphs that avoid situations that could cause excess inter-worker communication. The details of how they do this are a bit out of scope for a short blogpost, but the history here is interesting.

Historically dask.arrays were used almost exclusively by researchers with large on-disk arrays stored as HDF5 or NetCDF files. These users primarily used the single machine multi-threaded scheduler. We heavily tailored Dask array optimizations to this situation and made that community pretty happy.

Now as some of that community switches to cluster computing on larger datasets the optimization goals shift a bit. We have tons of distributed disk bandwidth but really want to avoid communicating large results between workers. Supporting both use cases is possible and I think that we’ve achieved that in this release so far, but it’s starting to require increasing levels of care.
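As a rough illustration of the kind of slicing being optimized (this example is mine, not from the release), the slice gets applied to each chunk where it lives rather than shipping whole chunks between workers only to discard most of their contents:

>>> import dask.array as da

>>> x = da.ones((10000, 10000), chunks=(1000, 1000))  # a 10 x 10 grid of chunks
>>> y = x[::2, 5000:]                                 # strided and contiguous slicing
>>> y.sum().compute()
25000000.0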

Micro-optimizations

With distributed computing also come larger graphs and a growing importance of graph-creation overhead. This has been optimized somewhat in this release. We expect this to be a focus going forward.

DataFrames

Set_index

Set_index is smarter in two ways:

  1. If you set_index on a column that happens to be sorted then we’ll identify that and avoid a costly shuffle. This was always possible with the sorted= keyword but users rarely used this feature. Now this is automatic (see the sketch after this list).
  2. Similarly when setting the index we can look at the size of the data and determine if there are too many or too few partitions and rechunk the data while shuffling. This can significantly improve performance if there are too many partitions (a common case).
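A minimal sketch of both behaviors (the frame, column name, and partition counts here are illustrative, not from the release):

import pandas as pd
import dask.dataframe as dd

# a small frame whose "id" column is already sorted
pdf = pd.DataFrame({"id": range(1000), "value": range(1000, 2000)})
df = dd.from_pandas(pdf, npartitions=10)

# previously you had to say so yourself to skip the shuffle
df_sorted = df.set_index("id", sorted=True)

# now set_index detects the sorted column on its own, and when a shuffle is
# needed it can also choose a more sensible number of output partitions
df_auto = df.set_index("id")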

Shuffle performance

We’ve micro-optimized some parts of dataframe shuffles. Big thanks to the Pandas developers for the help here. This accelerates set_index, joins, groupby-applies, and so on.

Fastparquet

The fastparquet library has seen a lot of use lately and has undergone a number of community bugfixes.

Importantly, Fastparquet now supports Python 2.

We strongly recommend Parquet as the standard data storage format for Dask dataframes (and Pandas DataFrames).
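As a quick, illustrative round-trip (the path and frame below are made up; fastparquet performs the actual I/O):

import pandas as pd
import dask.dataframe as dd

# an illustrative dataframe to round-trip through Parquet
df = dd.from_pandas(pd.DataFrame({"value": range(1000)}), npartitions=4)

# write a Parquet dataset, one file per partition
df.to_parquet("example.parquet")

# read it back lazily
df2 = dd.read_parquet("example.parquet")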

dask/fastparquet #87

Distributed Scheduler

Replay remote exceptions

Debugging is hard in part because exceptions happen on remote machines where normal debugging tools like pdb can’t reach. Previously we were able to bring back the traceback and exception, but you couldn’t dive into the stack trace to investigate what went wrong:

def div(x, y):
    return x / y

>>> future = client.submit(div, 1, 0)
>>> future
<Future: status: error, key: div-4a34907f5384bcf9161498a635311aeb>

>>> future.result() # getting result re-raises exception locally
<ipython-input-3-398a43a7781e> in div()
1 def div(x, y):
----> 2 return x / y

ZeroDivisionError: division by zero

Now Dask can bring a failing task and all necessary data back to the local machine and rerun it so that users can leverage the normal Python debugging toolchain.

>>> client.recreate_error_locally(future)
<ipython-input-3-398a43a7781e> in div(x, y)
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero

Now if you’re in IPython or a Jupyter notebook you can use the %debug magic to jump into the stacktrace, investigate local variables, and so on.

In [8]: %debug
> <ipython-input-3-398a43a7781e>(2)div()
1 def div(x, y):
----> 2 return x / y

ipdb> pp x
1
ipdb> pp y
0

dask/distributed #894

Async/await syntax

Dask.distributed uses Tornado for network communication and Tornado coroutines for concurrency. Normal users rarely interact with Tornado coroutines; they aren’t familiar to most people so we opted instead to copy the concurrent.futures API. However some complex situations are much easier to solve if you know a little bit of async programming.

Fortunately, the Python ecosystem seems to be embracing this change towards native async code with the async/await syntax in Python 3. In an effort to motivate people to learn async programming and to gently nudge them towards Python 3, Dask.distributed now supports async/await in a few cases.

You can wait on a dask Future:

async def f():
    future = client.submit(func, *args, **kwargs)
    result = await future

You can put the as_completed iterator into an async for loop:

async for future in as_completed(futures):
    result = await future
    ... do stuff with result ...

And, because Tornado supports the await protocols you can also use the existing shadow concurrency API (everything prepended with an underscore) with await. (This was doable before.)

results = client.gather(futures) # synchronous
...
results = await client._gather(futures) # asynchronous

If you’re in Python 2 you can always do this with normal yield and the tornado.gen.coroutine decorator.
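For example, a rough sketch (reusing the client and futures from the snippets above):

from tornado import gen

@gen.coroutine
def f():
    # the underscored "shadow" methods return Tornado-compatible futures,
    # so they can be yielded inside a gen.coroutine on Python 2
    results = yield client._gather(futures)
    raise gen.Return(results)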

dask/distributed #952

Inproc transport

In the last release we enabled Dask to communicate over more things than just TCP. In practice this doesn’t come up (TCP is pretty useful). However in this release we now support single-machine “clusters” where the clients, scheduler, and workers are all in the same process and transfer data cost-free over in-memory queues.

This allows the in-memory user community to use some of the more advanced features (asynchronous computation, spill-to-disk support, web-diagnostics) that are only available in the distributed scheduler.

This is on by default if you create a cluster with LocalCluster without using Nanny processes.

>>> from dask.distributed import LocalCluster, Client

>>> cluster = LocalCluster(nanny=False)

>>> client = Client(cluster)

>>> client
<Client: scheduler='inproc://192.168.1.115/8437/1' processes=1 cores=4>

>>> from threading import Lock # Not serializable
>>> lock = Lock() # Won't survive going over a socket
>>> [future] = client.scatter([lock]) # Yet we can send to a worker
>>> future.result() # ... and back
<unlocked _thread.lock object at 0x7fb7f12d08a0>

dask/distributed #919

Connection pooling for inter-worker communications

Workers now maintain a pool of sustained connections between each other. This pool is of a fixed size and removes connections with a least-recently-used policy. It avoids re-connection delays when transferring data between workers. In practice this shaves off a millisecond or two from every communication.

This is actually a revival of an old feature that we had turned off last year when it became clear that the performance here wasn’t a problem.

Along with other enhancements, this takes our round-trip latency down to 11ms on my laptop.

In [10]: %%time
    ...: for i in range(1000):
    ...:     future = client.submit(inc, i)
    ...:     result = future.result()
    ...:
CPU times: user 4.96 s, sys: 348 ms, total: 5.31 s
Wall time: 11.1 s

There may be room for improvement here though. For comparison here is the same test with the concurrent.futures.ProcessPoolExecutor.

In [14]: e = ProcessPoolExecutor(8)

In [15]: %%time
    ...: for i in range(1000):
    ...:     future = e.submit(inc, i)
    ...:     result = future.result()
    ...:
CPU times: user 320 ms, sys: 56 ms, total: 376 ms
Wall time: 442 ms

Also, just to be clear, this measures total roundtrip latency, not overhead. Dask’s distributed scheduler overhead remains in the low hundreds of microseconds.

dask/distributed #935

Related Projects

There has been activity around Dask and machine learning:

  • dask-learn is undergoing some performance enhancements. It turns out that when you offer distributed grid search people quickly want to scale up their computations to hundreds of thousands of trials.
  • dask-glm now has a few decent algorithms for convex optimization. The authors of this wrote a blogpost very recently if you’re interested: Developing Convex Optimization Algorithms in Dask
  • dask-xgboost lets you take distributed data in Dask dataframes or arrays and hand it directly to a distributed XGBoost system (that Dask will nicely set up and tear down for you). This was a nice example of easy hand-off between two distributed services running in the same processes.

Acknowledgements

The following people contributed to the dask/dask repository since the 0.14.0 release on February 27th:

  • Antoine Pitrou
  • Brian Martin
  • Elliott Sales de Andrade
  • Erik Welch
  • Francisco de la Peña
  • jakirkham
  • Jim Crist
  • Jitesh Kumar Jha
  • Julien Lhermitte
  • Martin Durant
  • Matthew Rocklin
  • Markus Gonser
  • Talmaj

The following people contributed to the dask/distributed repository since the 1.16.0 release on February 27th:

  • Antoine Pitrou
  • Ben Schreck
  • Elliott Sales de Andrade
  • Martin Durant
  • Matthew Rocklin
  • Phil Elson