Nov 21, 2017

Dask Release 0.16.0


This work is supported by Anaconda Inc. and the Data Driven Discovery Initiative from the Moore Foundation.

I’m pleased to announce the release of Dask version 0.16.0. This is a major release with new features, breaking changes, and stability improvements. This blogpost outlines notable changes since the 0.15.3 release on September 24th.

You can conda install Dask:

conda install dask

or pip install from PyPI:

pip install dask[complete] --upgrade

Conda packages are available on both conda-forge and default channels.

Full changelogs are available in the dask/dask and dask/distributed repositories.

Some notable changes follow.

Breaking Changes

  • The dask.async module was moved to dask.local for Python 3.7 compatibility. This was previously deprecated and is now fully removed (a short migration sketch follows this list).
  • The distributed scheduler’s diagnostic JSON pages have been removed and replaced by more informative templated HTML.
  • The commonly-used private methods _keys and _optimize have been replaced with the Dask collection interface (see below).
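
For code that imported the old scheduler module directly, the migration is a one-line rename. A minimal sketch, assuming get_sync is the single-threaded scheduler function now living in dask.local (the example graph is purely illustrative):

from dask.local import get_sync   # previously: from dask.async import get_sync

dsk = {'x': 1, 'y': (lambda a: a + 1, 'x')}
print(get_sync(dsk, 'y'))  # 2, computed in the calling thread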

Dask collection interface

It is now easier to implement custom collections using the Dask collection interface.

Dask collections (arrays, dataframes, bags, delayed) interact with Dask schedulers (single-machine, distributed) with a few internal methods. We formalized this interface into protocols like .__dask_graph__() and .__dask_keys__() and have published that interface. Any object that implements the methods described in that document will interact with all Dask scheduler features as a first-class Dask object.

class MyDaskCollection(object):
    def __dask_graph__(self):
        ...

    def __dask_keys__(self):
        ...

    def __dask_optimize__(self, ...):
        ...

    ...
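
To make this concrete, here is a minimal sketch of a custom collection that wraps a single key in a plain task graph. The Scalar class, its graph, and its finalization are illustrative assumptions, not part of Dask itself; it simply fills in the protocol methods named above.

import dask
from dask.threaded import get as threaded_get


class Scalar(object):
    """A toy collection: one key in a plain dict task graph."""
    def __init__(self, dsk, key):
        self._dsk = dsk
        self._key = key

    def __dask_graph__(self):
        return self._dsk

    def __dask_keys__(self):
        return [self._key]

    @staticmethod
    def __dask_optimize__(dsk, keys, **kwargs):
        return dsk                      # no graph optimization in this sketch

    __dask_scheduler__ = staticmethod(threaded_get)   # default to the threaded scheduler

    def __dask_postcompute__(self):
        # unpack the single result matching __dask_keys__()
        return (lambda results: results[0]), ()


x = Scalar({'a': 1, 'b': (lambda v: v + 1, 'a')}, 'b')
print(dask.compute(x))  # (2,)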

This interface has already been implemented within the XArray project for labeled and indexed arrays. Now all XArray classes (DataSet, DataArray, Variable) are fully understood by all Dask schedulers. They are as first-class as dask.arrays or dask.dataframes.

import xarray as xa
from dask.distributed import Client

client = Client()

ds = xa.open_mfdataset('*.nc', ...)

ds = client.persist(ds)  # XArray objects integrate seamlessly with Dask schedulers

Work on Dask’s collection interfaces was primarily done by Jim Crist.

Bandwidth and Tornado 5 compatibility

Dask is built on the Tornado library for concurrent network programming. In an effort to improve inter-worker bandwidth on exotic hardware (Infiniband), Dask developers are proposing changes to Tornado’s network infrastructure.

However, in order to use these changes Dask itself needs to run on the next version of Tornado in development, Tornado 5.0.0, which breaks a number of interfaces on which Dask has relied. Dask developers have been resolving these and we encourage other PyData developers to do the same. For example, neither Bokeh nor Jupyter work on Tornado 5.0.0-dev.

Dask inter-worker bandwidth is peaking at around 1.5-2GB/s on a network theoretically capable of 3GB/s. GitHub issue: pangeo #6

[Figure: Dask worker bandwidth]

Network performance and Tornado compatibility are primarily being handled by Antoine Pitrou.

Parquet Compatibility

Dask.dataframe can use either of the two common Parquet libraries in Python, Apache Arrow and Fastparquet. Each has its own strengths and its own base of users who prefer it. We’ve significantly extended Dask’s parquet test suite to cover each library, extending roundtrip compatibility. Notably, you can now both read and write with PyArrow.

df.to_parquet('...', engine='fastparquet')
df = dd.read_parquet('...', engine='pyarrow')
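
As a self-contained illustration of such a round-trip (the file name and dataframe contents are invented for this sketch, and both fastparquet and pyarrow are assumed to be installed):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
df = dd.from_pandas(pdf, npartitions=1)

df.to_parquet('roundtrip.parquet', engine='fastparquet')      # write with fastparquet
df2 = dd.read_parquet('roundtrip.parquet', engine='pyarrow')  # read back with pyarrow
print(df2.compute())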

There is still work to be done here. The variety of parquet reader/writers and conventions out there makes completely solving this problem difficult. It’s nice seeing the various projects slowly converge on common functionality.

This work was jointly done by Uwe Korn, Jim Crist, and Martin Durant.

Retrying Tasks

One of the most requested features for the Dask.distributed scheduler is the ability to retry failed tasks. This is particularly useful to people using Dask as a task queue, rather than as a big dataframe or array.

future = client.submit(func, *args, retries=5)
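
For example, a small sketch of retrying a task that fails intermittently; the flaky function and its failure rate are invented for illustration, and a running Client is assumed:

import random
from dask.distributed import Client

client = Client()  # local cluster for the sketch

def flaky(x):
    if random.random() < 0.3:                  # fails roughly 30% of the time
        raise RuntimeError("transient failure")
    return x + 1

future = client.submit(flaky, 10, retries=5)   # re-run up to 5 more times on failure
print(future.result())                         # 11, unless every attempt failed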

Task retries were primarily built by Antoine Pitrou.

Transactional Work Stealing

The Dask.distributed task scheduler performs load balancing through work stealing. Previously this would sometimes result in the same task running simultaneously in two locations. Now stealing is transactional, meaning that it will avoid accidentally running the same task twice. This behavior is especially important for people using Dask tasks for side effects.

It is still possible for the same task to run twice, but now this only happens in more extreme situations, such as when a worker dies or a TCP connection is severed, neither of which are common on standard hardware.

Transactional work stealing was primarily implemented by Matthew Rocklin.

New Diagnostic Pages

There is a new set of diagnostic web pages available in the Info tab of the dashboard. These pages provide more in-depth information about each worker and task, but are not dynamic in any way. They use Tornado templates rather than Bokeh plots, which means that they are less responsive but are much easier to build. This is an easy and cheap way to expose more scheduler state.

[Figure: Task page of Dask's scheduler info dashboard]

Nested compute calls

Calling .compute() within a task now invokes the same distributed scheduler. This enables writing more complex workloads with less thought to starting worker clients.

import dask
from dask.distributed import Client
client = Client()  # only works for the newer scheduler

@dask.delayed
def f(x):
    ...
    return dask.compute(...)  # can call dask.compute within delayed task

dask.compute([f(i) for ...])

Nested compute calls were primarily developed by Matthew Rocklin and Olivier Grisel.

More aggressive Garbage Collection

The workers now explicitly call gc.collect() at various times when under memory pressure and when releasing data. This helps to avoid some memory leaks, especially when using Pandas dataframes. Doing this carefully proved to require a surprising degree of detail.

Improved garbage collection was primarily implemented and tested by Fabian Keller and Olivier Grisel, with recommendations by Antoine Pitrou.

Related projects

Dask-ML

A variety of Dask Machine Learning projects are now being assembled under one unified repository, dask-ml. We encourage users and researchers alike to read through that project. We believe there are many useful and interesting approaches contained within.

The work to assemble and curate these algorithms is primarily being handled by Tom Augspurger.

XArray

The XArray project for indexed and labeled arrays is also releasing their major 0.10.0 release this week, which includes many performance improvements, particularly for using Dask on larger datasets.

Acknowledgements

The following people contributed to the dask/dask repository since the 0.15.3 release on September 24th:

  • Ced4
  • Christopher Prohm
  • fjetter
  • Hai Nguyen Mau
  • Ian Hopkinson
  • James Bourbeau
  • James Munroe
  • Jesse Vogt
  • Jim Crist
  • John Kirkham
  • Keisuke Fujii
  • Matthias Bussonnier
  • Matthew Rocklin
  • mayl
  • Martin Durant
  • Olivier Grisel
  • severo
  • Simon Perkins
  • Stephan Hoyer
  • Thomas A Caswell
  • Tom Augspurger
  • Uwe L. Korn
  • Wei Ji
  • xwang777

The following people contributed to the dask/distributed repository since the 1.19.1 release on September 24th:

  • Alvaro Ulloa
  • Antoine Pitrou
  • chkoar
  • Fabian Keller
  • Ian Hopkinson
  • Jim Crist
  • Kelvin Yang
  • Krisztián Szűcs
  • Matthew Rocklin
  • Mike DePalatis
  • Olivier Grisel
  • rbubley
  • Tom Augspurger

The following people contributed to the dask/dask-ml repository:

  • Evan Welch
  • Matthew Rocklin
  • severo
  • Tom Augspurger
  • Trey Causey

In addition, we are proud to announce that Olivier Grisel has accepted commit rights to the Dask projects. Olivier has been particularly active on the distributed scheduler, and on related projects like Joblib, SKLearn, and Cloudpickle.