Feb 27, 2017

Dask Release 0.14.0

By

This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

Dask just released version 0.14.0. This release contains some significant internal changes as well as the usual set of increased API coverage and bug fixes. This blogpost outlines some of the major changes since the last release on January 27th, 2017.

  1. Structural sharing of graphs between collections
  2. Refactor communications system
  3. Many small dataframe improvements
  4. Top-level persist function

You can install new versions using Conda or Pip

conda install -c conda-forge dask distributed

or

pip install dask[complete] distributed --upgrade

Share Graphs between Collections

Dask collections (arrays, bags, dataframes, delayed) hold onto task graphs that have all of the tasks necessary to create the desired result. For larger datasets or complex calculations these graphs may have thousands, or sometimes even millions of tasks. In some cases the overhead of handling these graphs can become significant.

This is especially true because dask collections don’t modify their graphs in place; they make new graphs with updated computations. Copying graph data structures with millions of nodes can take seconds and interrupt interactive workflows.

To address this, dask.array and dask.delayed collections now use special graph data structures with structural sharing. This significantly cuts down on the amount of overhead when building repetitive computations.

import dask.array as da

x = da.ones(1000000, chunks=(1000,)) # 1000 chunks of size 1000

Version 0.13.0

%time for i in range(100): x = x + 1
CPU times: user 2.69 s, sys: 96 ms, total: 2.78 s
Wall time: 2.78 s

Version 0.14.0

%time for i in range(100): x = x + 1
CPU times: user 756 ms, sys: 8 ms, total: 764 ms
Wall time: 763 ms

The difference in this toy problem is moderate, but for real-world cases this difference can grow fairly large. This was also one of the blockers identified by the climate science community stopping them from handling petabyte-scale analyses.

We chose to roll this out for arrays and delayed first just because those are the two collections that typically produce large task graphs. Dataframes and bags remain as before for the time being.

Communications System

Dask communicates over TCP sockets. It uses Tornado’s IOStreams to handle non-blocking communication, framing, etc. We’ve run into some performance issues with Tornado when moving large amounts of data. Some of this has been improved upstream in Tornado directly, but we still want the ability to optionally drop Tornado’s byte-handling communication stack in the future. This is especially important as dask gets used in institutions with faster and more exotic interconnects (supercomputers). We’ve been asked a few times to support other transport mechanisms like MPI.

The first step (and probably hardest step) was to make Dask’s communication system pluggable so that we can use different communication options without significant source-code changes. We managed this a month ago and now it is possible to add other transports to Dask relatively easily. TCP remains the only real choice today, though there is also an experimental ZeroMQ option (which provides little-to-no performance benefit over TCP) as well as a fully in-memory option in development.

For users the main difference you’ll see is that tcp:// is now prepended in many places. For example:

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Scheduler at: tcp://192.168.1.115:8786
...
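
Client code uses the same full address, including the protocol prefix. A minimal sketch, reusing the placeholder address from the scheduler output above:

from dask.distributed import Client

# Connect to the scheduler over plain TCP using the full tcp:// address
client = Client('tcp://192.168.1.115:8786')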

Variety of Dataframe Changes

As usual the Pandas API has been more fully covered by community contributors. Some representative changes include the following:

  1. Support non-uniform categoricals: We no longer need to do a full pass through the data when categorizing a column. Instead we categorize each partition independently (even if they have different category values) and then unify these categories only when necessary.

     df['x'] = df['x'].astype('category')  # this is now fast

  2. Groupby cumulative reductions:

     df.groupby('x').cumsum()

  3. Support appending to Parquet collections:

     df.to_parquet('/path/to/foo.parquet', append=True)

  4. A new string and HTML representation of dask.dataframes. Typically Pandas prints dataframes on the screen by rendering the first few rows of data. However, because Dask.dataframes are lazy we don’t have this data and so typically render some metadata about the dataframe:

     >>> df  # version 0.13.0
     dd.DataFrame<make-ti..., npartitions=366, divisions=(Timestamp('2000-01-01
     00:00:00', freq='D'), Timestamp('2000-01-02 00:00:00', freq='D'),
     Timestamp('2000-01-03 00:00:00', freq='D'), ..., Timestamp('2000-12-31
     00:00:00', freq='D'), Timestamp('2001-01-01 00:00:00', freq='D'))>

     This rendering, while informative, can be improved. Now we render dataframes as a Pandas dataframe, but place metadata in the dataframe instead of the actual data:

     >>> df  # version 0.14.0
     Dask DataFrame Structure:
                           x        y      z
     npartitions=366
     2000-01-01      float64  float64  int64
     2000-01-02          ...      ...    ...
     ...                 ...      ...    ...
     2000-12-31          ...      ...    ...
     2001-01-01          ...      ...    ...
     Dask Name: make-timeseries, 366 tasks

     Additionally this renders nicely as an HTML table in a Jupyter notebook.

Variety of Distributed System Changes

There have also been a wide variety of changes to the distributed system. I’ll include a representative sample here to give a flavor of what has been happening:

  1. Ensure first-come-first-served priorities when dealing with multiple clients.

  2. Send small amounts of data through Channels. Channels are a way for multiple clients/users connected to the same scheduler to publish and exchange data between themselves. Previously they only transmitted Futures (which could in turn point to larger data living on the cluster). However we found that it was useful to communicate small bits of metadata as well, for example to signal progress or stopping criteria between clients collaborating on the same workloads. Now you can publish any msgpack-serializable data on Channels.

     # Publishing Client
     scores = client.channel('scores')
     scores.append(123.456)

     # Subscribing Client
     scores = client.channel('scores')
     while scores.data[-1] < THRESHOLD:
         ...  # continue working

  3. We’re better at estimating the in-memory size of SciPy sparse matrices and Keras models. This allows Dask to make smarter choices about when it should and should not move data around for load balancing. Additionally Dask can now also serialize Keras models. A rough sketch of this size estimation follows this list.

  4. To help people deploying on clusters that have a shared network file system (as is often the case in scientific or academic institutions) the scheduler and workers can now communicate connection information using the --scheduler-file keyword.

     dask-scheduler --scheduler-file /path/to/scheduler.json
     dask-worker --scheduler-file /path/to/scheduler.json
     dask-worker --scheduler-file /path/to/scheduler.json

     >>> client = Client(scheduler_file='/path/to/scheduler.json')

     Previously we needed to communicate the address of the scheduler, which could be challenging when we didn’t know on which node the scheduler would be run.
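
As a rough sketch of the size estimation mentioned above (not Dask’s exact internal code; it assumes SciPy is installed and uses dask.sizeof.sizeof, the dispatch function Dask consults when estimating object sizes):

import scipy.sparse
from dask.sizeof import sizeof  # size-estimation dispatch consulted by Dask

# Build a sparse matrix and ask Dask how many bytes it thinks it occupies;
# estimates like this feed into load-balancing decisions about moving data.
m = scipy.sparse.random(1000, 1000, density=0.01, format='csr')
print(sizeof(m))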

Other

There are a number of smaller details not mentioned in this blogpost. For more information visit the changelogs and documentation.

Additionally a great deal of Dask work over the last month has happened outside of these core dask repositories.

You can install or upgrade using Conda or Pip

conda install -c conda-forge dask distributed

or

pip install dask[complete] distributed --upgrade

Acknowledgements

Since the last 0.13.0 release on January 27th the following developers have contributed to the dask/dask repository:

  • Antoine Pitrou
  • Chris Barber
  • Daniel Davis
  • Elmar Ritsch
  • Erik Welch
  • jakirkham
  • Jim Crist
  • John Crickett
  • jspreston
  • Juan Luis Cano Rodríguez
  • kayibal
  • Kevin Ernst
  • Markus Gonser
  • Matthew Rocklin
  • Martin Durant
  • Nir
  • Sinhrks
  • Talmaj Marinc
  • Vlad Frolov
  • Will Warner

And the following developers have contributed to the dask/distributed repository:

  • Antoine Pitrou
  • Ben Schreck
  • bmaisonn
  • Brett Naul
  • Demian Wassermann
  • Israel Saeta Pérez
  • John Crickett
  • Joseph Crail
  • Malte Gerken
  • Martin Durant
  • Matthew Rocklin
  • Min RK
  • strets123