Jul 23, 2015

Custom Parallel Workflows

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

tl;dr: We motivate the expansion of parallel programming beyond big collections. We discuss the usability of custom dask graphs.

Recent Parallel Work Focuses on Big Collections

Parallel databases, Spark, and Dask collections all provide large distributed collections that handle parallel computation for you. You put data into the collection, program with a small set of operations like map or groupby, and the collections handle the parallel processing. This idea has become so popular that there are now a dozen projects promising big and friendly Pandas clones.

This is good. These collections provide usable, high-level interfaces for a large class of common problems.
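As a concrete illustration of this small map/groupby-style vocabulary, here is a minimal sketch using dask.bag, one such collection (assuming a recent dask installation; the specific numbers are arbitrary):

```python
import dask.bag as db

# A small partitioned collection; operations only build a task graph,
# and compute() hands that graph to a scheduler to execute.
b = db.from_sequence(range(10), npartitions=2)
result = (b.map(lambda x: x * 2)       # double each element
           .filter(lambda x: x > 5)    # keep 6, 8, ..., 18
           .sum()
           .compute(scheduler='synchronous'))  # single-threaded, for a deterministic demo
print(result)  # -> 84
```

The user never touches the parallelism directly; the collection decides how to split the work across partitions.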

Custom Workloads

However, many workloads are too complex for these collections. Workloads might be complex either because they come from sophisticated algorithms (as we saw in a recent post on SVD) or because they come from the real world, where problems tend to be messy.

In these cases I tend to see people do one of two things:

  1. Fall back to multiprocessing, MPI, or some other explicit form of parallelism
  2. Perform mental gymnastics to fit their problem into Spark using a clever choice of keys. These cases often fail to achieve much speedup.

Direct Dask Graphs

Historically I’ve recommended the manual construction of dask graphs in these cases. Manual construction of dask graphs lets you specify fairly arbitrary workloads that then use the dask schedulers to execute in parallel. The dask docs hold the following example of a simple data processing pipeline:

def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    ...

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store')  # executes in parallel
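To build intuition for this dict format, here is a toy single-threaded evaluator for such graphs (an illustration only, not dask's actual scheduler, which handles caching, ordering, and parallelism):

```python
# Toy evaluator for the dict-of-tasks graph format shown above.
# Keys map to either literal values or tuples of (function, *arguments);
# an argument that is itself a key in the graph refers to that task's result.

def toy_get(dsk, key):
    """Recursively evaluate `key` in the task graph `dsk` (no caching)."""
    def resolve(arg):
        if isinstance(arg, list):               # list of arguments
            return [resolve(a) for a in arg]
        if isinstance(arg, str) and arg in dsk:  # reference to another task
            return toy_get(dsk, arg)
        return arg                               # literal value (e.g. a filename)
    task = dsk[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        return func(*[resolve(a) for a in args])
    return resolve(task)

# Tiny demo graph: add 'x' and 2, then double the result
toy_dsk = {'x': 1,
           'y': (lambda a, b: a + b, 'x', 2),
           'z': (lambda v: v * 2, 'y')}
print(toy_get(toy_dsk, 'z'))  # -> 6
```

Because tasks only reference each other by key, a real scheduler is free to evaluate independent keys (like the three load tasks above) at the same time.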

Feedback from users is that this is interesting and powerful but that programming directly in dictionaries is not intuitive, doesn’t integrate well with IDEs, and is prone to error.

Introducing dask.do

To create the same custom parallel workloads using normal-ish Python code we use the dask.do function. This do function turns any normal Python function into a delayed version that adds to a dask graph. The do function lets us rewrite the computation above as follows:

from dask import do

loads = [do(load)('myfile.a.data'),
         do(load)('myfile.b.data'),
         do(load)('myfile.c.data')]

cleaned = [do(clean)(x) for x in loads]

analysis = do(analyze)(cleaned)
result = do(store)(analysis)

The explicit function calls here don’t perform work directly; instead they build up a dask graph which we can then execute in parallel with our choice of scheduler.

from dask.multiprocessing import get
result.compute(get=get)

This interface was suggested by Gael Varoquaux based on his experience with joblib. It was implemented by Jim Crist in PR (#408).

Example: Nested Cross Validation

I sat down with a machine learning student, Gabriel Krummenacher, and worked to parallelize a small code to do nested cross validation. Below is a comparison of a sequential implementation with one that has been parallelized using dask.do:

You can safely skip reading this code in depth. The take-away is that it’s somewhat involved but that the addition of parallelism is light.

Parallelized cross validation code
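The overall shape of nested cross validation can be sketched in plain Python (every helper below is a hypothetical stand-in, not the code from the actual notebook); in the parallel version, each of these function calls is wrapped in do(...):

```python
# Hypothetical stand-ins for a model: fit remembers a parameter and the
# training mean; score measures how close the model gets to the test mean.
def fit(train, param):
    return (param, sum(train) / len(train))

def score(model, test):
    param, mean = model
    return -abs(param * mean - sum(test) / len(test))

def kfold(data, k):
    """Yield (train, test) splits by dealing the data into k folds."""
    folds = [data[i::k] for i in range(k)]
    for i in range(k):
        test = folds[i]
        train = [x for j, fold in enumerate(folds) if j != i for x in fold]
        yield train, test

def nested_cv(data, params, outer_k=3, inner_k=3):
    outer_scores = []
    for train, test in kfold(data, outer_k):
        # Inner loop: select the best parameter using only the training data
        inner = {p: sum(score(fit(tr, p), te) for tr, te in kfold(train, inner_k))
                 for p in params}
        best = max(inner, key=inner.get)
        # Outer loop: evaluate the selected parameter on held-out data
        outer_scores.append(score(fit(train, best), test))
    return sum(outer_scores) / len(outer_scores)

print(nested_cv(list(range(20)), params=[0.5, 1.0, 2.0]))
```

Each fit/score call in the inner and outer loops is independent of the others, which is exactly why wrapping them in do produces a graph with plenty of parallelism.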

The parallel version runs about four times faster on my notebook. Disclaimer: the sequential version presented here is just a downgraded version of the parallel code, which is why they look so similar. This is available on github.

So the result of our normal imperative-style for-loop code is a fully parallelizable dask graph. We visualize that graph below.

test_score.visualize()

Cross validation dask graph

Help

Is this a useful interface? It would be great if people could try this out and generate feedback on dask.do.

For more information on dask.do see the dask imperative documentation.