This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
Dask just grew to version 0.13.0. This is a significant release for arrays, dataframes, and the distributed scheduler. This blogpost outlines some of the major changes since the last release on November 4th.
You can install new versions using Conda or Pip
conda install -c conda-forge dask distributed
or
pip install dask[complete] distributed --upgrade
Dask and all necessary dependencies are now available on conda-forge for Python 3.6.
Thousand-core Dask deployments have become significantly more common in the last few months. This has highlighted scaling issues in some of the Dask.array and Dask.dataframe algorithms, which were originally designed for single workstations. Algorithmic and API changes can be grouped into the following two categories:
Dask dataframes now include a fuller set of the Pandas API (a brief usage sketch appears below), including the following:
Additionally, collaboration with some of the larger Dask deployments has highlighted scaling issues in some algorithms, resulting in the following improvements:
These same collaborations have also yielded better handling of open file descriptors, changes upstream to Tornado, and upstream changes to the conda-forge CPython recipe itself to increase the default file descriptor limit on Windows up from 512.
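To give a flavor of what the Pandas-style interface looks like in practice, here is a minimal sketch. The file path and column names (name, amount, id) are purely illustrative, and these particular methods are examples of the general Dask dataframe API rather than a changelog of what is new in this release.

import dask.dataframe as dd

df = dd.read_csv('data-*.csv')             # lazily read many CSV files
totals = df.groupby('name').amount.sum()   # split-apply-combine, Pandas-style
top = df.amount.nlargest(10)               # ordered reduction on a column
n_ids = df.id.nunique()                    # count of distinct values
totals.compute()                           # trigger the parallel computation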
You can now convert Dask dataframes into Dask arrays. This is mostly to support efforts of groups building statistics and machine learning applications, where this conversion is common. For example, you can load a terabyte of CSV or Parquet data, do some basic filtering and manipulation, and then convert to a Dask array to do more numeric work like SVDs, regressions, etc.
import dask.dataframe as dd
import dask.array as da
df = dd.read_csv('s3://...') # Read raw data
x = df.values # Convert to dask.array
u, s, v = da.linalg.svd(x) # Perform serious numerics
This should help machine learning and statistics developers generally, as many of the more sophisticated algorithms can be implemented more easily with the Dask array model than with distributed dataframes. This change was done specifically to support the nascent third-party dask-glm project by Chris White at Capital One.
Previously this was hard because Dask.array wanted to know the size of every chunk of data, which Dask dataframes can't provide (because, for example, it is impossible to lazily tell how many rows are in a CSV file without actually looking through it). Now that Dask.arrays have relaxed this requirement they can also support other unknown-shape operations, like indexing an array with another array.
y = x[x > 0]
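For instance, in this small self-contained sketch (tiny in-memory data, illustrative column name), the row chunks of the resulting array are unknown, showing up as nan in .chunks, yet boolean indexing and the final compute still work:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'a': [-1.0, 0.0, 1.0, 2.0]})
df = dd.from_pandas(pdf, npartitions=2)
x = df.values        # 2-d dask array with unknown row counts per chunk
x.chunks             # ((nan, nan), (1,)) -- row lengths are not known
y = x[x > 0]         # boolean indexing despite the unknown shape
y.compute()          # array([ 1.,  2.])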
Dask.dataframe now supports Parquet, a columnar binary store for tabular data commonly used in distributed clusters and the Hadoop ecosystem.
import dask.dataframe as dd
df = dd.read_parquet('myfile.parquet') # Read from Parquet
df.to_parquet('myfile.parquet', compression='snappy') # Write to Parquet
This is done through the new fastparquet library, a Numba-accelerated version of the pure-Python parquet-python. Fastparquet was built and is maintained by Martin Durant. It's also exciting to see the Parquet-cpp project gain Python support through Arrow and work by Wes McKinney and Uwe Korn. Parquet has gone from inaccessible in Python to having multiple competing implementations, which is a wonderful and exciting change for the “Big Data” Python ecosystem.
The internals of the distributed scheduler and workers are significantly modified. Users shouldn't experience much change here except for general performance enhancement, more upcoming features, and much deeper visual diagnostics through Bokeh servers.
We’ve pushed some of the scheduling logic from the scheduler onto the workers. This lets us do two things:
While optimizing scheduler performance we built several new visual diagnostics using Bokeh. There is now a Bokeh server running within the scheduler and within every worker.
Current Dask.distributed users will be familiar with the existing diagnostic dashboards:
These plots provide intuition about the state of the cluster and the computations currently in flight. These dashboards are generally well loved.
There are now many more of these, though they focus on internal state and timings that will be of more interest to developers and power users than to typical users. Here are a couple of the new pages (of which there are seven) that show various timings and counters of various parts of the worker and scheduler internals.
The previous Bokeh dashboards were served from a separate process that queried the scheduler periodically (every 100ms). Now there are new Bokeh servers within every worker and a new Bokeh server within the scheduler process itself rather than in a separate process. Because these servers are embedded they have direct access to the state of the scheduler and workers, which significantly reduces barriers for us to build out new visuals. However, this also adds some load to the scheduler, which can often be compute bound. These pages are available at new ports, 8788 for the scheduler and 8789 for the worker by default.
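As a rough sketch of where these pages live for a locally running scheduler and worker (the addresses below assume the default ports; only 8788 and 8789 are new):

from dask.distributed import Client

client = Client('tcp://127.0.0.1:8786')   # connect to a running scheduler
# Existing diagnostic dashboard:     http://127.0.0.1:8787
# New scheduler-internals pages:     http://127.0.0.1:8788
# New worker-internals pages:        http://127.0.0.1:8789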
This is actually a change that occurred in the last release, but I haven't written about it and it's important, so I'm including it here.
Previously inter-worker communication of data was accomplished with Pickle/Cloudpickle and optional generic compression like LZ4/Snappy. This was robust and worked mostly fine, but left out some exotic data types and did not provide optimal performance.
Now we can serialize different types with special consideration. This allows special types, like NumPy arrays, to pass through without unnecessary memory copies and also allows us to use more exotic, data-type-specific compression techniques like Blosc.
It also allows Dask to serialize some previously unserializable types. In particular this was intended to solve the Dask.array climate science community's concern about HDF5 and NetCDF files, which (correctly) are unpicklable and so restricted to single-machine use.
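Here is a rough sketch of what the type-aware path looks like, using the serialize/deserialize helpers from the distributed.protocol module (the module location and behavior are my reading of the codebase, not release documentation); the real gains appear when the resulting frames are sent over the network or compressed with Blosc:

import numpy as np
from distributed.protocol import serialize, deserialize

x = np.arange(1000000)
header, frames = serialize(x)     # NumPy-aware: frames reference the raw buffer
y = deserialize(header, frames)   # reconstruct the array on the receiving side
assert (x == y).all()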
This is also the first step towards two frequently requested features (neither of these exist yet):
So what should we expect to see in the future for Dask?
You can install or upgrade using Conda or Pip
conda install -c conda-forge dask distributed
or
pip install dask[complete] distributed --upgrade
You can learn more about Dask and its distributed scheduler at these websites:
Since the last main release the following developers have contributed to the core Dask repository (parallel algorithms, arrays, dataframes, etc.)
And the following developers have contributed to the Dask/distributed repository (distributed scheduling, network communication, etc.)