To increase transparency I’m blogging weekly(ish) about the work done on Dask and related projects during the previous week. This log covers work done between 2017-04-20 and 2017-04-28. Nothing here is ready for production. This blogpost is written in haste, so refined polish should not be expected.
Development in Dask and Dask-related projects during the last week includes the following notable changes:
Scikit-learn parallelizes most of its algorithms with Joblib, which provides a simple interface for embarrassingly parallel computations. Dask has been able to hijack joblib code and serve as the backend for some time now, but this had some limitations, particularly because we would repeatedly send data back and forth from a worker to the client for every batch of computations.
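from joblib import Parallel, delayed, parallel_backend
import distributed.joblib  # registers the 'dask.distributed' backend with joblib

with parallel_backend('dask.distributed', scheduler_host='HOST:PORT'):
    # normal Joblib code, e.g. Parallel()(delayed(func)(x) for x in data)
    ...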
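For reference, the "normal Joblib code" that the Dask backend takes over looks something like the sketch below. It uses only joblib's own `Parallel` and `delayed` with a local thread pool (the `prefer="threads"` choice is an assumption for a quick local run); inside the `parallel_backend('dask.distributed', ...)` block the same calls would be routed to Dask workers instead.

```python
from math import sqrt

from joblib import Parallel, delayed

# Each delayed(sqrt)(i ** 2) call is an independent task: an embarrassingly
# parallel workload, which is exactly the shape joblib is designed for.
results = Parallel(n_jobs=2, prefer="threads")(
    delayed(sqrt)(i ** 2) for i in range(10)
)
print(results)
```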
Now there is a scatter= keyword, which allows you to pre-scatter select variables out to all of the Dask workers. This significantly cuts down on overhead, especially on machine learning workloads where most of the data doesn’t change very much.
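# Send the training data only once to each worker
with parallel_backend('dask.distributed', scheduler_host='localhost:8786',
                      scatter=[X, y]):  # X, y: placeholder training data
    search.fit(X, y)  # search: placeholder scikit-learn estimator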
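To see why pre-scattering matters, here is a back-of-the-envelope model (not how Dask actually serializes or routes data) comparing bytes sent when every task carries its own copy of the data against sending the data once per worker and then passing only a small key. The function names and the `"training-data"` key are hypothetical, and pickle size is used as a rough proxy for network traffic.

```python
import pickle


def naive_bytes(data, n_tasks):
    """Every task ships its own serialized copy of the data."""
    return len(pickle.dumps(data)) * n_tasks


def prescattered_bytes(data, n_tasks, n_workers, key="training-data"):
    """The data is shipped once per worker; each task then carries only a key."""
    data_cost = len(pickle.dumps(data)) * n_workers
    key_cost = len(pickle.dumps(key)) * n_tasks
    return data_cost + key_cost


data = list(range(100_000))
naive = naive_bytes(data, n_tasks=1_000)
scattered = prescattered_bytes(data, n_tasks=1_000, n_workers=4)
print(naive, scattered)
```

With many tasks and few workers the pre-scattered cost is dominated by one copy per worker, which is why workloads that reuse the same training data across many batches benefit the most.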
Early trials indicate that computations like scikit-learn’s RandomForest scale nicely on a cluster without any additional code.
This is particularly nice because it allows Dask and Scikit-Learn to play well together without having to introduce Dask within the Scikit-Learn codebase at all. From a maintenance perspective this combination is very attractive.
The convex optimization solvers in the dask-glm project allow us to solve common machine learning and statistics problems in parallel and at scale. Historically this young library has contained only optimization solvers and relatively little in the way of user API.
This week dask-glm grew new LogisticRegression and LinearRegression estimators that expose the scalable convex optimization algorithms within dask-glm through a Scikit-Learn compatible interface. This can either speed up solutions on a single computer or provide solutions for datasets that were previously too large to fit in memory.
from dask_glm.estimators import LogisticRegression
est = LogisticRegression()
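To illustrate what "a convex solver behind a Scikit-Learn compatible interface" means, here is a minimal NumPy sketch. The class name `GradientDescentLogisticRegression` and its parameters are hypothetical; it uses plain batch gradient descent on the mean log-loss, which is only one of the simplest convex solvers and not necessarily what dask-glm runs. The point is the shape of the API: `fit(X, y)` runs the optimizer and stores `coef_`/`intercept_`, and `predict(X)` uses them.

```python
import numpy as np


class GradientDescentLogisticRegression:
    """Hypothetical minimal estimator: fit()/predict() wrap a convex solver."""

    def __init__(self, lr=0.1, max_iter=1000):
        self.lr = lr
        self.max_iter = max_iter

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        Xb = np.hstack([X, np.ones((X.shape[0], 1))])  # append intercept column
        w = np.zeros(Xb.shape[1])
        for _ in range(self.max_iter):
            p = 1.0 / (1.0 + np.exp(-Xb @ w))  # predicted probabilities
            grad = Xb.T @ (p - y) / len(y)     # gradient of the mean log-loss
            w -= self.lr * grad                # one gradient descent step
        self.coef_ = w[:-1]
        self.intercept_ = w[-1]
        return self

    def predict_proba(self, X):
        z = np.asarray(X, dtype=float) @ self.coef_ + self.intercept_
        return 1.0 / (1.0 + np.exp(-z))

    def predict(self, X):
        return (self.predict_proba(X) >= 0.5).astype(int)


rng = np.random.default_rng(0)
X = rng.normal(size=(200, 2))
y = (X[:, 0] + X[:, 1] > 0).astype(int)  # linearly separable labels
est = GradientDescentLogisticRegression().fit(X, y)
accuracy = (est.predict(X) == y).mean()
```

Because the estimator only needs matrix products and elementwise operations, the same solver logic can in principle run on chunked arrays, which is what makes this style of API attractive for out-of-memory data.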
This notebook compares performance to the latest release of scikit-learn on a 5,000,000 dataset running on a single machine. Dask-glm beats scikit-learn by a factor of four, which is also roughly the number of cores on the development machine. However, in response, this notebook by Olivier Grisel shows the development version of scikit-learn (with a new algorithm) beating out dask-glm by a factor of six. This just goes to show that being smarter about your algorithms is almost always a better use of time than adopting parallelism.
The Parquet format is quickly becoming a standard for parallel and distributed dataframes. There are currently two Parquet reader/writers accessible from Python: fastparquet, a NumPy/Numba solution, and Parquet-CPP, a C++ solution with wrappers provided by Arrow. Dask.dataframe has supported Parquet for a while now with fastparquet.
However, users will now have the option to use Arrow instead by switching the engine= keyword in the dd.read_parquet function.
import dask.dataframe as dd

df = dd.read_parquet('/path/to/mydata.parquet', engine='fastparquet')
df = dd.read_parquet('/path/to/mydata.parquet', engine='arrow')
Hopefully this capability increases the use of both projects and results in greater feedback to those libraries so that they can continue to advance Python’s access to the Parquet format. As a gentle reminder, you can typically get much faster query times by switching from CSV to Parquet. This is often much more effective than parallel computing.
There is a small multi-dimensional sparse array library here: https://github.com/mrocklin/sparse. It allows us to represent arrays compactly in memory when most entries are zero. This differs from the standard solution in scipy.sparse, which can only support arrays of dimension two (matrices) and not greater.
pip install sparse
>>> import numpy as np
>>> x = np.random.random(size=(10, 10, 10, 10))
>>> x[x < 0.9] = 0
>>> import sparse
>>> s = sparse.COO(x)
>>> s
<COO: shape=(10, 10, 10, 10), dtype=float64, nnz=1074>
>>> sparse.tensordot(s, s, axes=((1, 0, 3), (2, 1, 0))).sum(axis=1)
array([ 100.93868073, 128.72312323, 119.12997217, 118.56304153,
133.24522101, 98.33555365, 90.25304866, 98.99823973,
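The idea behind a COO (coordinate) array can be sketched in a few lines of NumPy: keep one row of indices per axis plus the list of nonzero values, which works for any number of dimensions. The helpers `to_coo` and `from_coo` below are hypothetical illustrations, not the sparse library's internals.

```python
import numpy as np


def to_coo(dense):
    """Store only the nonzero entries of an N-d array in coordinate form."""
    coords = np.array(np.nonzero(dense))  # shape (ndim, nnz): one row per axis
    data = dense[tuple(coords)]           # nonzero values, in the same order
    return coords, data, dense.shape


def from_coo(coords, data, shape):
    """Rebuild the dense array from coordinates and values."""
    dense = np.zeros(shape, dtype=data.dtype)
    dense[tuple(coords)] = data
    return dense


rng = np.random.default_rng(0)
x = rng.random((10, 10, 10, 10))
x[x < 0.9] = 0                            # roughly 10% of entries stay nonzero
coords, data, shape = to_coo(x)
dense_bytes = x.nbytes
coo_bytes = coords.nbytes + data.nbytes
print(dense_bytes, coo_bytes)
```

Each nonzero costs one index per axis plus its value, so COO only saves memory when the array is sparse enough to outweigh that per-entry overhead.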
Additionally, this sparse library more faithfully follows the numpy.ndarray API, which is exactly what dask.array expects. Because of this close API matching, dask.array is able to parallelize around sparse arrays just as easily as it parallelizes around dense numpy arrays. This gives us a decent distributed multidimensional sparse array library relatively cheaply.
>>> import dask.array as da
>>> x = da.random.random(size=(10000, 10000, 10000, 10000),
... chunks=(100, 100, 100, 100))
>>> x = da.where(x < 0.9, 0, x)  # zero out roughly 90% of the entries
>>> s = x.map_blocks(sparse.COO) # parallel array of sparse arrays
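The "parallel array of sparse arrays" works because dask.array applies a function to each chunk independently. The toy below mimics that idea with a thread pool over 2-D chunks; `map_blocks_2d` is a hypothetical helper, far simpler than dask.array's task graphs, and it uses `np.count_nonzero` as the per-block function just so the result is easy to check.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def map_blocks_2d(func, array, chunks):
    """Apply func to each chunk of a 2-D array in parallel threads.

    Returns a dict mapping block index (i, j) to func(block), a toy stand-in
    for the grid of per-chunk results that dask.array keeps.
    """
    rows, cols = chunks
    starts = itertools.product(range(0, array.shape[0], rows),
                               range(0, array.shape[1], cols))

    def run(start):
        r, c = start
        return (r // rows, c // cols), func(array[r:r + rows, c:c + cols])

    with ThreadPoolExecutor() as pool:
        return dict(pool.map(run, starts))


rng = np.random.default_rng(0)
x = rng.random((100, 100))
x[x < 0.9] = 0
nnz_per_block = map_blocks_2d(np.count_nonzero, x, (25, 25))
```

Swapping `np.count_nonzero` for a sparse constructor gives a grid of sparse blocks, which is the same structure that `x.map_blocks(sparse.COO)` builds lazily.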
I’ve been playing with a 50GB sample of the 1TB Criteo dataset on my laptop (this is where I’m using sparse arrays). To make computations flow a bit faster I’ve improved the performance of Dask’s spill-to-disk policies.
Now, rather than depend on (cloud)pickle, we use Dask’s network protocol, which handles data more efficiently, compresses well, and has special handling for common and important types like NumPy arrays and things built out of NumPy arrays (like sparse arrays).
As a result, reading and writing excess data to disk is significantly faster. When performing machine learning computations (which are fairly heavy-weight), disk access is now fast enough that I don’t notice it in practice, and running out of memory doesn’t significantly impact performance.
This is only really relevant when using common types (like numpy arrays) and when your ratio of computation to disk access is relatively high (as is the case for analytic workloads), but it was a simple fix and yielded a nice boost to my personal productivity.
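The general spill-to-disk pattern, with type-specific serialization, can be sketched as a mapping that keeps a few values in memory and writes the least recently used ones to files. `SpillDict` is a hypothetical toy, not Dask's worker code or its network protocol: it uses `np.save`/`np.load` for NumPy arrays (raw buffer plus a small header) and falls back to pickle for everything else, mirroring the idea of special-casing common array types.

```python
import os
import pickle
import tempfile
from collections import OrderedDict

import numpy as np


class SpillDict:
    """Toy mapping: keep at most `max_in_memory` values in RAM and spill the
    least recently used ones to files in `directory`."""

    def __init__(self, directory, max_in_memory=2):
        self.directory = directory
        self.max_in_memory = max_in_memory
        self.memory = OrderedDict()  # key -> value, least recently used first
        self.on_disk = {}            # key -> file path

    def _spill_oldest(self):
        key, value = self.memory.popitem(last=False)
        if isinstance(value, np.ndarray):
            path = os.path.join(self.directory, f"{key}.npy")
            np.save(path, value)     # fast path for NumPy arrays
        else:
            path = os.path.join(self.directory, f"{key}.pkl")
            with open(path, "wb") as f:
                pickle.dump(value, f)  # generic fallback
        self.on_disk[key] = path

    def __setitem__(self, key, value):
        stale = self.on_disk.pop(key, None)
        if stale is not None:        # overwriting a spilled key: drop old file
            os.remove(stale)
        self.memory[key] = value
        self.memory.move_to_end(key)
        while len(self.memory) > self.max_in_memory:
            self._spill_oldest()

    def __getitem__(self, key):
        if key in self.memory:
            self.memory.move_to_end(key)
            return self.memory[key]
        path = self.on_disk.pop(key)
        if path.endswith(".npy"):
            value = np.load(path)
        else:
            with open(path, "rb") as f:
                value = pickle.load(f)
        os.remove(path)
        self[key] = value            # reload into memory (may spill another)
        return value


with tempfile.TemporaryDirectory() as d:
    store = SpillDict(d, max_in_memory=2)
    store["a"] = np.arange(5)
    store["b"] = {"k": 1}
    store["c"] = np.ones(3)
    spilled_first = "a" in store.on_disk    # "a" was the least recently used
    a_back = store["a"]
    b_back = store["b"]
    in_memory_after = len(store.memory)
```

How much this costs in practice depends on the serialization speed for the dominant data type, which is why special-casing arrays pays off for array-heavy workloads.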
Work by myself in dask/distributed #946.
The Dask.distributed scheduler maintains a fully asynchronous API for use with non-blocking systems like Tornado or AsyncIO. Because Dask supports Python 2, all of our internal code is written with Tornado. While Tornado and AsyncIO can work together, this generally requires a bit of excess book-keeping, like turning Tornado futures into AsyncIO futures, etc.
Now there is an AsyncIO-specific Client that only includes non-blocking methods that are AsyncIO native. This allows for more idiomatic asynchronous code in Python 3.
from distributed.asyncio import AioClient

async def main():
    async with AioClient('scheduler-address:8786') as c:
        future = c.submit(func, *args, **kwargs)  # func, args, kwargs: your own
        result = await future
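The usage pattern above (an `async with` block that manages the connection, and a `submit` that returns something you can `await`) can be mimicked with the standard library alone. `ToyAioClient` is a hypothetical stand-in that runs functions in a local thread pool via `loop.run_in_executor`; it only illustrates the shape of an AsyncIO-native client, not how Dask's client talks to a scheduler.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


class ToyAioClient:
    """Hypothetical asyncio-native client: submit() returns an awaitable
    asyncio future, and `async with` handles setup and teardown."""

    async def __aenter__(self):
        self._executor = ThreadPoolExecutor(max_workers=4)
        self._loop = asyncio.get_running_loop()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._executor.shutdown(wait=True)
        return False  # do not suppress exceptions

    def submit(self, func, *args, **kwargs):
        # run_in_executor only forwards positional args, so bind kwargs first
        call = functools.partial(func, *args, **kwargs)
        return self._loop.run_in_executor(self._executor, call)


async def main():
    async with ToyAioClient() as c:
        future = c.submit(pow, 2, 10)
        many = [c.submit(sum, [i, i]) for i in range(3)]
        return await future, await asyncio.gather(*many)


result, gathered = asyncio.run(main())
```

Because `submit` hands back a native asyncio future, user code can mix it freely with `asyncio.gather`, timeouts, and other AsyncIO tools without any Tornado conversion step.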
TLS (previously called SSL) is a common and trusted solution for authentication and encryption. It is a feature commonly requested by companies and institutions where intra-network security is important. This is currently being worked on at dask/distributed#1034. I encourage anyone whom this may affect to engage on that pull request.
This recent change in NumPy (literally merged as I was typing this blogpost) allows other array libraries to take control of the existing NumPy ufuncs, so if you call something like np.exp(my_dask_array) this will no longer convert to a NumPy array, but will rather call the appropriate dask.array.exp function. This is a big step towards writing generic array code that works both on NumPy arrays and on other array projects like dask.array, xarray, bcolz, sparse, etc.
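The mechanism is the `__array_ufunc__` protocol: when a ufunc such as `np.exp` sees an input defining this method, NumPy calls it instead of converting the input to an ndarray. The `WrappedArray` class below is a hypothetical minimal example; a real library like dask.array would dispatch to its own lazy implementation rather than unwrapping to NumPy data.

```python
import numpy as np


class WrappedArray:
    """Hypothetical array type that takes over NumPy ufuncs via __array_ufunc__."""

    def __init__(self, data):
        self.data = np.asarray(data)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        # Only handle plain calls like np.exp(x) or np.add(x, 1); for anything
        # else (reduce, accumulate, out=...) defer, and NumPy raises TypeError.
        if method != "__call__" or kwargs.get("out") is not None:
            return NotImplemented
        # Unwrap our own type, apply the ufunc to the raw data, rewrap the result
        raw = [x.data if isinstance(x, WrappedArray) else x for x in inputs]
        return WrappedArray(ufunc(*raw, **kwargs))


w = WrappedArray([0.0, 1.0, 2.0])
e = np.exp(w)     # dispatches to WrappedArray.__array_ufunc__
s = np.add(w, 1)  # mixed inputs work too
```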
As with all large changes in NumPy, this was accomplished through a collaboration of many people. PR in numpy/numpy #8247.