May 8, 2017

Dask Release 0.14.3

This work is supported by Continuum Analytics and the Data Driven Discovery Initiative from the Moore Foundation.

I’m pleased to announce the release of Dask version 0.14.3. This release contains a variety of performance and feature improvements. This blogpost includes some notable features and changes since the last release on March 22nd.

As always you can conda install from conda-forge

conda install -c conda-forge dask distributed

or you can pip install from PyPI

pip install dask[complete] --upgrade

Conda packages should be on the default channel within a few days.

Arrays

Sparse Arrays

Dask.arrays now support sparse arrays and mixed dense/sparse arrays.

>>> import dask.array as da

>>> x = da.random.random(size=(10000, 10000, 10000, 10000),
...                      chunks=(100, 100, 100, 100))
>>> x[x < 0.99] = 0

>>> import sparse
>>> s = x.map_blocks(sparse.COO) # parallel array of sparse arrays

In order to support sparse arrays we did two things:

  1. Made dask.array support ndarray containers other than NumPy, as long as they were API compatible
  2. Made a small sparse array library that was API compatible with the numpy.ndarray

This process was pretty easy and could be extended to other systems. This also allows for different kinds of ndarrays in the same Dask array, as long as interactions between the arrays are well defined (using the standard NumPy protocols like __array_priority__ and so on).
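
As a small, hypothetical illustration (tiny sizes and a single chunk so it actually runs comfortably in memory), ordinary dask.array operations simply carry the sparse blocks through:

>>> import dask.array as da
>>> import sparse

>>> x = da.random.random((1000, 1000), chunks=(1000, 1000))  # one small block for illustration
>>> x[x < 0.95] = 0                                          # mostly zeros
>>> s = x.map_blocks(sparse.COO)                             # the block is now a sparse.COO array

>>> y = (2 * s + s).compute()   # elementwise math runs on the sparse block; returns a sparse.COO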

Documentation: http://dask.pydata.org/en/latest/array-sparse.html

Update: there is already a pull request for Masked arrays.

Reworked FFT code

The da.fft submodule has been extended to include most of the functions in np.fft, with the caveat that multi-dimensional FFTs will only work along single-chunk dimensions. Still, given that rechunking is decently fast today this can be very useful for large image stacks.
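
For example (a hypothetical small array, names chosen just for illustration), an FFT along an axis that has a single chunk works directly, while other axes need a rechunk first:

>>> import dask.array as da

>>> x = da.random.random((8, 4096), chunks=(1, 4096))   # one chunk along the last axis
>>> X = da.fft.fft(x, axis=-1)                          # fine: the FFT axis has a single chunk
>>> x2 = x.rechunk((8, 512))                            # make axis 0 a single chunk instead
>>> X2 = da.fft.fft(x2, axis=0)                         # fine after rechunking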

Documentation: http://dask.pydata.org/en/latest/array-api.html#fast-fourier-transforms

Constructor Plugins

You can now run arbitrary code whenever a dask array is constructed. This empowers users to build in their own policies like rechunking, warning users, or eager evaluation. A dask.array plugin takes in a dask.array and returns either a new dask array, or returns None, in which case the original will be returned.

>>> def f(x):
...     print('%d bytes' % x.nbytes)

>>> with dask.set_options(array_plugins=[f]):
...     x = da.ones((10, 1), chunks=(5, 1))
...     y = x.dot(x.T)
80 bytes
80 bytes
800 bytes
800 bytes

This can be used, for example, to convert dask.array code into numpy code to identify bugs quickly:

>>> with dask.set_options(array_plugins=[lambda x: x.compute()]):
...     x = da.arange(5, chunks=2)

>>> x # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])

Or to warn users if they accidentally produce an array with large chunks:

import itertools
import warnings

import numpy as np

def warn_on_large_chunks(x):
    shapes = list(itertools.product(*x.chunks))
    nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
    if any(nb > 1e9 for nb in nbytes):
        warnings.warn("Array contains very large chunks")

with dask.set_options(array_plugins=[warn_on_large_chunks]):
    ...

These features were heavily requested by the climate science community, which tends to serve both highly technical computer scientists, and less technical climate scientists who were running into issues with the nuances of chunking.

DataFrames

Dask.dataframe changes are both numerous and very small, making it difficult to give a representative accounting of recent changes within a blogpost. Typically these include small changes to either track new Pandas development, or to fix slight inconsistencies in corner cases (of which there are many).

Still, two highlights follow:

Rolling windows with time intervals
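
Dask.dataframe now handles Pandas-style rolling windows specified by a time interval, like '2s', in addition to a fixed number of rows. The series s below is a hypothetical setup (one value per second over ten seconds) so the call underneath it has something to operate on:

>>> import pandas as pd
>>> import dask.dataframe as dd

>>> index = pd.date_range('2017-01-01', periods=10, freq='1s')
>>> s = dd.from_pandas(pd.Series(1.0, index=index), npartitions=2)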

>>> s.rolling('2s').count().compute()
2017-01-01 00:00:00 1.0
2017-01-01 00:00:01 2.0
2017-01-01 00:00:02 2.0
2017-01-01 00:00:03 2.0
2017-01-01 00:00:04 2.0
2017-01-01 00:00:05 2.0
2017-01-01 00:00:06 2.0
2017-01-01 00:00:07 2.0
2017-01-01 00:00:08 2.0
2017-01-01 00:00:09 2.0
dtype: float64

Read Parquet data with Arrow

Dask now supports reading Parquet data with both fastparquet (a NumPy/Numba solution) and Arrow and Parquet-CPP.

df = dd.read_parquet('/path/to/mydata.parquet', engine='fastparquet')
df = dd.read_parquet('/path/to/mydata.parquet', engine='arrow')

Hopefully this capability increases the use of both projects and results in greater feedback to those libraries so that they can continue to advance Python’s access to the Parquet format.

Graph Optimizations

Dask performs a few passes of simple linear-time graph optimizations before sending a task graph to the scheduler. These optimizations currently vary by collection type, for example dask.arrays have different optimizations than dask.dataframes. These optimizations can greatly improve performance in some cases, but can also increase overhead, which becomes very important for large graphs.

As Dask has grown into more communities, each with strong and differing performance constraints, we’ve found that we needed to allow each community to define its own optimization schemes. The defaults have not changed, but now you can override them with your own. This can be set globally or with a context manager.

def my_optimize_function(graph, keys):
    """ Takes a task graph and a list of output keys, returns new graph """
    new_graph = {...}
    return new_graph

with dask.set_options(array_optimize=my_optimize_function,
                      dataframe_optimize=None,
                      delayed_optimize=my_other_optimize_function):
    x, y = dask.compute(x, y)
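
Because dask.set_options can also be called outside of a with block, the same override can be applied globally; a minimal sketch, reusing the hypothetical function from above:

import dask

# Set the custom array optimization globally instead of within a context manager
dask.set_options(array_optimize=my_optimize_function)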

Documentation: http://dask.pydata.org/en/latest/optimize.html#customizing-optimization

Speed improvements

Additionally, task fusion has been significantly accelerated. This is very important for large graphs, particularly in dask.array computations.

Web Diagnostics

The distributed scheduler’s web diagnostic page is now served from within the dask scheduler process. This is both good and bad:

  • Good: It is much easier to make new visuals
  • Bad: Dask and Bokeh now share a single CPU

Because Bokeh and Dask now share the same Tornado event loop we no longer need to send messages between them to then send out to a web browser. The Bokeh server has full access to all of the scheduler state. This lets us build new diagnostic pages more easily. This has been around for a while but was largely used for development. In this version we’ve made the new version the default and turned off the old one.

The cost here is that the Bokeh server can take 10-20% of the CPU use. If you are running a computation that heavily taxes the scheduler then you might want to close your diagnostic pages. Fortunately, this almost never happens. The dask scheduler is typically fast enough to never get close to this limit.

Tornado difficulties

Beware that the current versions of Bokeh (0.12.5) and Tornado (4.5) do not play well together. This has been fixed in development versions, and installing with conda is fine, but if you naively pip install then you may experience bad behavior.

Joblib

The Dask.distributed Joblib backend now includes a scatter= keyword, allowing you to pre-scatter select variables out to all of the Dask workers. This significantly cuts down on overhead, especially on machine learning workloads where most of the data doesn’t change very much.
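
The snippet below borrows names from the usual scikit-learn digits grid-search example; the setup sketched here is a hypothetical reconstruction (in particular, importing distributed.joblib is what registers the 'dask.distributed' backend):

# Hypothetical setup for the snippet below, following the scikit-learn digits example
import distributed.joblib                              # registers the 'dask.distributed' backend
from sklearn.externals.joblib import parallel_backend
from sklearn import datasets
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

digits = datasets.load_digits()
param_grid = {'n_estimators': [20, 50, 100]}
search = GridSearchCV(RandomForestClassifier(), param_grid, n_jobs=-1)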

# Send the training data only once to each worker
with parallel_backend('dask.distributed', scheduler_host='localhost:8786',
                      scatter=[digits.data, digits.target]):
    search.fit(digits.data, digits.target)

Early trials indicate that computations like scikit-learn’s RandomForest scale nicely on a cluster without any additional code.

Documentation: http://distributed.readthedocs.io/en/latest/joblib.html

Preload scripts

When starting a dask.distributed scheduler or worker people often want to include a bit of custom setup code, for example to configure loggers, authenticate with some network system, and so on. This has always been possible if you start schedulers and workers from within Python but is tricky if you want to use the command line interface. Now you can write your custom code as a separate standalone script and ask the command line interface to run it for you at startup:

# scheduler-setup.py
from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    """ Prints a message whenever a worker is added to the cluster """
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    plugin = MyPlugin()
    scheduler.add_plugin(plugin)

dask-scheduler --preload scheduler-setup.py

This makes it easier for people to adapt Dask to their particular institution.

Documentation: http://distributed.readthedocs.io/en/latest/setup.html#customizing-initialization

Network Interfaces (for infiniband)

Many people use Dask on high performance supercomputers. This hardware differs from typical commodity clusters or cloud services in several ways, including very high performance network interconnects like InfiniBand. Typically these systems also have normal ethernet and other networks. You’re probably familiar with this on your own laptop when you have both ethernet and wireless:

$ ifconfig
lo          Link encap:Local Loopback                        # Localhost
            inet addr:127.0.0.1  Mask:255.0.0.0
            inet6 addr: ::1/128 Scope:Host
eth0        Link encap:Ethernet  HWaddr XX:XX:XX:XX:XX:XX    # Ethernet
            inet addr:192.168.0.101
            ...
ib0         Link encap:Infiniband                            # Fast InfiniBand
            inet addr:172.42.0.101

The default systems Dask uses to determine network interfaces often choose ethernet by default. If you are on an HPC system then this is likely not optimal. You can direct Dask to choose a particular network interface with the --interface keyword:

$ dask-scheduler --interface ib0
distributed.scheduler - INFO - Scheduler at: tcp://172.42.0.101:8786

$ dask-worker tcp://172.42.0.101:8786 --interface ib0

Efficient as_completed

The as_completed iterator returns futures in the order in which they complete. It is the base of many asynchronous applications using Dask.
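
The snippets below assume a Client connected to a running scheduler and some trivial function inc; a hypothetical setup might look like this:

>>> from distributed import Client, as_completed
>>> client = Client('localhost:8786')    # assumes a scheduler already running at this address

>>> def inc(x):                          # the usual toy function from the Dask documentation
...     return x + 1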

>>> x, y, z = client.map(inc, [0, 1, 2])
>>> for future in as_completed([x, y, z]):
... print(future.result())
2
0
1

It can now also wait to yield an element only after the result also arrives

>>> for future, result in as_completed([x, y, z], with_results=True):
... print(result)
2
0
1

And also yield all futures (and results) that have finished up until this point.

>>> for futures in as_completed([x, y, z]).batches():
... print(client.gather(futures))
(2, 0)
(1,)

Both of these help to decrease the overhead of tight inner loops within asynchronous applications.

Example blogpost here: /2017/04/19/dask-glm-2

Co-released libraries

This release is aligned with a number of other related libraries, notably Pandas, and several smaller libraries for accessing data, including s3fs, hdfs3, fastparquet, and python-snappy, each of which has seen numerous updates over the past few months. Much of the work of these latter libraries is being coordinated by Martin Durant.

Acknowledgements

The following people contributed to the dask/dask repository since the 0.14.1 release on March 22nd:

  • Antoine Pitrou
  • Dmitry Shachnev
  • Erik Welch
  • Eugene Pakhomov
  • Jeff Reback
  • Jim Crist
  • John A Kirkham
  • Joris Van den Bossche
  • Martin Durant
  • Matthew Rocklin
  • Michal Ficek
  • Noah D Brenowitz
  • Stuart Archibald
  • Tom Augspurger
  • Wes McKinney
  • wikiped

The following people contributed to the dask/distributed repository since the 1.16.1 release on March 22nd:

  • Antoine Pitrou
  • Bartosz Marcinkowski
  • Ben Schreck
  • Jim Crist
  • Jens Nie
  • Krisztián Szűcs
  • Lezyes
  • Luke Canavan
  • Martin Durant
  • Matthew Rocklin
  • Phil Elson