Jan 3, 2017

Dask Release 0.13.0

This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

Dask just grew to version 0.13.0. This is a significant release for arrays, dataframes, and the distributed scheduler. This blogpost outlines some of the major changes since the last release on November 4th.

  1. Python 3.6 support
  2. Algorithmic and API improvements for DataFrames
  3. Dataframe to Array conversions for Machine Learning
  4. Parquet support
  5. Scheduling Performance and Worker Rewrite
  6. Pervasive Visual Diagnostics with Embedded Bokeh Servers
  7. Windows continuous integration
  8. Custom serialization

You can install new versions using Conda or Pip

conda install -c conda-forge dask distributed

or

pip install dask[complete] distributed --upgrade

Python 3.6 Support

Dask and all necessary dependencies are now available on conda-forge for Python 3.6.

Algorithmic and API Improvements for DataFrames

Thousand-core Dask deployments have become significantly more common in the last few months. This has highlighted scaling issues in some of the Dask.array and Dask.dataframe algorithms, which were originally designed for single workstations. Algorithmic and API changes can be grouped into the following two categories:

  1. Filling out the Pandas API
  2. Algorithms that needed to be changed or added due to scaling issues

Dask Dataframes now include a fuller set of the Pandas API, including the following:

  1. Inplace operations like df['x'] = df.y + df.z
  2. The full Groupby-aggregate syntax like df.groupby(...).aggregate({'x': 'sum', 'y': ['min', 'max']})
  3. Resample on dataframes as well as series
  4. Pandas’ new rolling syntax df.x.rolling(10).mean()
  5. And much more
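
To get a feel for the expanded API, here is a minimal sketch against a small in-memory dataframe (the column names and toy data are made up purely for illustration):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'y': range(8), 'z': range(8, 16), 'g': list('abababab')},
                   index=pd.date_range('2017-01-01', periods=8, freq='h'))
df = dd.from_pandas(pdf, npartitions=2)   # small dask dataframe, just to exercise the API

df['x'] = df.y + df.z                                               # inplace column assignment
agg = df.groupby('g').aggregate({'x': 'sum', 'y': ['min', 'max']})  # dict-style groupby-aggregate
roll = df.x.rolling(3).mean()                                       # pandas-style rolling
res = df[['x', 'y']].resample('2h').mean()                          # resample a datetime-indexed frame

print(agg.compute())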

Additionally, collaboration with some of the larger Dask deployments has highlighted scaling issues in some algorithms, resulting in the following improvements:

  1. Tree reductions for groupbys, aggregations, etc.
  2. Multi-output-partition aggregations for groupby-aggregations with millions of groups, drop_duplicates, etc..
  3. Approximate algorithms for nunique
  4. etc..
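
For example, the approximate algorithm for nunique is exposed as a method on series, trading a little accuracy for much less data movement at scale. A minimal sketch (the nunique_approx method name is taken from the Dask dataframe API docs):

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({'x': list(range(1000)) * 10}), npartitions=10)

exact = df.x.nunique().compute()          # exact distinct count, moves more data
approx = df.x.nunique_approx().compute()  # HyperLogLog-based estimate, cheaper at scale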

These same collaborations have also yielded better handling of open file descriptors, changes upstream to Tornado, and upstream changes to the conda-forge CPython recipe itself to increase the default file descriptor limit on Windows up from 512.

Dataframe to Array Conversions

You can now convert Dask dataframes into Dask arrays. This is mostly to support efforts of groups building statistics and machine learning applications, where this conversion is common. For example you can load a terabyte of CSV or Parquet data, do some basic filtering and manipulation, and then convert to a Dask array to do more numeric work like SVDs, regressions, etc..

import dask.dataframe as dd
import dask.array as da

df = dd.read_csv('s3://...') # Read raw data

x = df.values # Convert to dask.array

u, s, v = da.linalg.svd(x) # Perform serious numerics

This should help machine learning and statistics developers generally, as many of the more sophisticated algorithms can be more easily implemented with the Dask array model than can be done with distributed dataframes. This change was done specifically to support the nascent third-party dask-glm project by Chris White at Capital One.

Previously this was hard because Dask.array wanted to know the size of every chunk of data, which Dask dataframes can’t provide (because, for example, it is impossible to lazily tell how many rows are in a CSV file without actually looking through it). Now that Dask.arrays have relaxed this requirement they can also support other unknown shape operations, like indexing an array with another array.

y = x[x > 0]
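
The relaxed requirement shows up in the chunk metadata: dimensions whose sizes can’t be known until compute time are reported as NaN. A small self-contained sketch:

import numpy as np
import dask.array as da

x = da.from_array(np.random.random(100), chunks=10)

y = x[x > 0.5]             # boolean indexing: the result's length is unknown ahead of time

print(y.chunks)            # ((nan, nan, ..., nan),) -- chunk lengths are unknown
print(y.compute().shape)   # the real shape only appears after computing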

Parquet Support

Dask.dataframe now supports Parquet, a columnar binary store for tabular data commonly used in distributed clusters and the Hadoop ecosystem.

import dask.dataframe as dd

df = dd.read_parquet('myfile.parquet') # Read from Parquet

df.to_parquet('myfile.parquet', compression='snappy') # Write to Parquet
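
Because Parquet is columnar, reading back only the columns you need avoids touching the rest of the file. Continuing the snippet above, a small sketch (assuming the file contains a column named x):

df = dd.read_parquet('myfile.parquet', columns=['x'])  # read just one column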

This is done through the new fastparquet library, a Numba-accelerated version of the pure Python parquet-python. Fastparquet was built and is maintained by Martin Durant. It’s also exciting to see the Parquet-cpp project gain Python support through Arrow and work by Wes McKinney and Uwe Korn. Parquet has gone from inaccessible in Python to having multiple competing implementations, which is a wonderful and exciting change for the “Big Data” Python ecosystem.

Scheduling Performance and Worker Rewrite

The internals of the distributed scheduler and workers are significantly modified. Users shouldn’t experience much change here except for general performance enhancement, more upcoming features, and much deeper visual diagnostics through Bokeh servers.

We’ve pushed some of the scheduling logic from the scheduler onto the workers. This lets us do two things:

  1. We keep a much larger backlog of tasks on the workers. This allows workers to optimize and saturate their hardware more effectively. As a result, complex computations end up being significantly faster.
  2. We can more easily deliver on a rising number of requests for complex scheduling features. For example, GPU users will be happy to learn that you can now specify abstract resource constraints like “this task requires a GPU” and “this worker has four GPUs” and the scheduler and workers will allocate tasks accordingly. This is just one example of a feature that was easy to implement after the scheduler/worker redesign and is now available.
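
As a sketch of the resource-constraint feature mentioned in the second point (the scheduler address and the train function here are placeholders): workers advertise abstract resources when they start, and tasks declare what they need when submitted.

# On each GPU machine, start the worker with its resources advertised, e.g.:
#     dask-worker scheduler-address:8786 --resources "GPU=4"

from distributed import Client

client = Client('scheduler-address:8786')   # connect to the running scheduler

def train(data):                            # stand-in for a GPU-bound computation
    return len(data)

future = client.submit(train, [1, 2, 3], resources={'GPU': 1})  # runs only where a GPU "slot" is free
print(future.result())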

Pervasive Visual Diagnostics with Embedded Bokeh Servers

While optimizing scheduler performance we built several new visual diagnostics using Bokeh. There is now a Bokeh Server running within the scheduler and within every worker.

Current Dask.distributed users will be familiar with the current diagnostic dashboards:

[Image: Dask Bokeh plots]

These plots provide intuition about the state of the cluster and the computations currently in flight. These dashboards are generally well loved.

There are now many more of these, though they focus more on internal state and timings that will be of interest to developers and power users than to typical users. Here are a couple of the new pages (of which there are seven) that show various timings and counters of various parts of the worker and scheduler internals.

[Image: Dask Bokeh counters page]

The previous Bokeh dashboards were served from a separate process that queried the scheduler periodically (every 100ms). Now there are new Bokeh servers within every worker and a new Bokeh server within the scheduler process itself rather than in a separate process. Because these servers are embedded they have direct access to the state of the scheduler and workers, which significantly reduces barriers for us to build out new visuals. However, this also adds some load to the scheduler, which can often be compute bound. These pages are available at new ports, 8788 for the scheduler and 8789 for the worker by default.

Custom Serialization

This is actually a change that occurred in the last release, but I haven’t written about it and it’s important, so I’m including it here.

Previously inter-worker communication of data was accomplished with Pickle/Cloudpickle and optional generic compression like LZ4/Snappy. This was robust and worked mostly fine, but left out some exotic data types and did not provide optimal performance.

Now we can serialize different types with special consideration. This allows special types, like NumPy arrays, to pass through without unnecessary memory copies and also allows us to use more exotic data-type specific compression techniques like Blosc.
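
The machinery behind this lives in distributed.protocol, where serialize and deserialize dispatch on type; a minimal sketch with a NumPy array:

import numpy as np
from distributed.protocol import serialize, deserialize

x = np.arange(10)

header, frames = serialize(x)     # NumPy-aware path: frames carry the raw buffer rather than a pickle
y = deserialize(header, frames)   # rebuild the array from header metadata + frames

assert (x == y).all()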

It also allows Dask to serialize some previously unserializable types. In particular this was intended to solve the Dask.array climate science community’s concern about HDF5 and NetCDF files, which (correctly) are unpicklable and so restricted to single-machine use.

This is also the first step towards two frequently requested features (neither of these exist yet):

  1. Better support for GPU-GPU specific serialization options. We are now a large step closer to generalizing away our assumption of TCP Sockets as the universal communication mechanism.
  2. Passing data between workers of different runtime languages. By embracing protocols other than Pickle we begin to allow for the communication of data between workers of different software environments.

What’s Next

So what should we expect to see in the future for Dask?

  • Communication: Now that workers are more fully saturated we’ve found that communication issues are arising more frequently as bottlenecks. This might be because everything else is nearing optimal or it might be because of the increased contention in the workers now that they are idle less often. Many of our new diagnostics are intended to measure components of the communication pipeline.
  • Third Party Tools: We’re seeing a nice growth of utilities like dask-drmaa for launching clusters on DRMAA job schedulers (SGE, SLURM, LSF) and dask-glm for solvers for GLM-like machine-learning algorithms. I hope that external projects like these become the main focus of Dask development going forward as Dask penetrates new domains.
  • Blogging: I’ll be launching a few fun blog posts throughout the next couple of weeks. Stay tuned.

Learn More

You can install or upgrade using Conda or Pip

conda install -c conda-forge dask distributed

or

pip install dask[complete] distributed --upgrade

You can learn more about Dask and its distributed scheduler in the Dask and Dask.distributed documentation.

Acknowledgements

Since the last main release the following developers have contributed to the core Dask repository (parallel algorithms, arrays, dataframes, etc..)

  • Alexander C. Booth
  • Antoine Pitrou
  • Christopher Prohm
  • Frederic Laliberte
  • Jim Crist
  • Martin Durant
  • Matthew Rocklin
  • Mike Graham
  • Rolando (Max) Espinoza
  • Sinhrks
  • Stuart Archibald

And the following developers have contributed to the Dask/distributed repository (distributed scheduling, network communication, etc..)

  • Antoine Pitrou
  • jakirkham
  • Jeff Reback
  • Jim Crist
  • Martin Durant
  • Matthew Rocklin
  • rbubley
  • Stephan Hoyer
  • strets123
  • Travis E. Oliphant