This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project
tl;dr: We motivate the expansion of parallel programming beyond big collections. We discuss the usability of custom dask graphs.
Parallel databases, Spark, and Dask collections all provide large distributed collections that handle parallel computation for you. You put data into the collection, program with a small set of operations like map or groupby, and the collections handle the parallel processing. This idea has become so popular that there are now a dozen projects promising big and friendly Pandas clones.
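For readers who haven't used one of these systems, here is a minimal sketch of that collection style with dask.bag (this example is mine, not from the post): you put data in, chain a few high-level operations, and the library handles the parallelism.

import dask.bag as db

# Distribute a small sequence across partitions, then express the work
# with high-level operations; dask runs the pieces in parallel.
b = db.from_sequence(range(10), npartitions=2)
result = b.filter(lambda x: x % 2 == 0).map(lambda x: x * 2)
print(result.compute())  # [0, 4, 8, 12, 16]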
This is good. These collections provide usable, high-level interfaces for a large class of common problems.
However, many workloads are too complex for these collections. Workloads might be complex either because they come from sophisticated algorithms (as we saw in a recent post on SVD) or because they come from the real world, where problems tend to be messy.
In these cases I tend to see people do two things
Historically I've recommended the manual construction of dask graphs in these cases. Manual construction of dask graphs lets you specify fairly arbitrary workloads that then use the dask schedulers to execute in parallel. The dask docs hold the following example of a simple data processing pipeline:
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
...
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
Feedback from users is that this is interesting and powerful but that programming directly in dictionaries is not intuitive, doesn't integrate well with IDEs, and is prone to error.
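As a hypothetical illustration of that last point (not taken from the post): a mistyped key isn't caught when you build the dictionary, because any string that doesn't match a key is passed through as a literal argument, so the mistake only surfaces when the scheduler runs the task.

dsk_typo = {'load-1': (load, 'myfile.a.data'),
            'clean-1': (clean, 'laod-1')}  # typo: 'laod-1' matches no key, so clean()
                                           # receives the literal string instead of data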
To create the same custom parallel workloads using normal-ish Python code we use the dask.do function. This do function turns any normal Python function into a delayed version that adds to a dask graph. The do function lets us rewrite the computation above as follows:
from dask import do
loads = [do(load)('myfile.a.data'),
do(load)('myfile.b.data'),
do(load)('myfile.c.data')]
cleaned = [do(clean)(x) for x in loads]
analysis = do(analyze)(cleaned)
result = do(store)(analysis)
The explicit function calls here don't perform work directly; instead they build up a dask graph which we can then execute in parallel with our choice of scheduler.
from dask.multiprocessing import get
result.compute(get=get)
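Because the scheduler is just a get function handed to compute, swapping schedulers is a one-line change. Here is a small sketch using the other single-machine schedulers that dask shipped at the time of writing (these module paths may have moved in later releases):

from dask.threaded import get as threaded_get
result.compute(get=threaded_get)  # thread pool: good when the work releases the GIL

from dask.async import get_sync
result.compute(get=get_sync)      # single-threaded: handy for debugging with pdb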
This interface was suggested by Gael Varoquaux based on his experience with joblib. It was implemented by Jim Crist in PR (#408).
I sat down with a machine learning student, Gabriel Krummenacher, and worked to parallelize a small piece of code that does nested cross validation. Below is a comparison of a sequential implementation that has been parallelized using dask.do:
You can safely skip reading this code in depth. The take-away is that it's somewhat involved but that the addition of parallelism is light.
The parallel version runs about four times faster on my notebook. Disclaimer: the sequential version presented here is just a downgraded version of the parallel code, which is why they look so similar. This is available on github.
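Since the full comparison isn't reproduced here, the toy sketch below shows the shape of the parallel version: the loops stay ordinary Python and each call that does real work is wrapped in do. The fitting and scoring helpers and the folds are hypothetical stand-ins, not the functions from that repository.

from dask import do
import random

def fit_and_score(train, test, param):
    # hypothetical stand-in: fit a model on train, return a score on test
    return random.random() * param

def best_param(scores, params):
    # hypothetical stand-in: pick the parameter with the best inner score
    return max(zip(scores, params))[1]

params = [0.1, 1.0, 10.0]
outer_folds = [((1, 2), (3,)), ((2, 3), (1,)), ((1, 3), (2,))]  # toy train/test splits
inner_folds = [((1,), (2,)), ((2,), (1,))]

scores = []
for outer_train, outer_test in outer_folds:
    inner = [do(fit_and_score)(tr, te, p)
             for tr, te in inner_folds
             for p in params]
    chosen = do(best_param)(inner, params * len(inner_folds))
    scores.append(do(fit_and_score)(outer_train, outer_test, chosen))

test_score = do(sum)(scores)  # a single lazy value holding the whole graph

Deleting the do calls recovers the sequential version, which is the sense in which the addition of parallelism is light.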
So the result of our normal imperative-style for-loop code is a fully parallelizable dask graph. We visualize that graph below.
test_score.visualize()
Is this a useful interface? It would be great if people could try this out and generate feedback on dask.do.
For more information on dask.do see the dask imperative documentation.