Jan 24, 2017

Custom Parallel Algorithms on a Cluster with Dask

This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

This post describes Dask as a computational task scheduler that fits somewhere on a spectrum between big data computing frameworks like Hadoop/Spark and task schedulers like Airflow/Celery/Luigi. We see how, by combining elements from both of these types of systems, Dask is able to handle complex data science problems particularly well.

This post is in contrast to two recent posts on structured parallel collections:

  1. Distributed DataFrames
  2. Distributed Arrays

Big Data Collections

Most distributed computing systems like Hadoop or Spark or SQL databases implement a small but powerful set of parallel operations like map, reduce, groupby, and join. As long as you write your programs using only those operations, the platforms understand your program and serve you well. Most of the time this is great because most big data problems are pretty simple.
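
For example, the classic word count fits entirely within that vocabulary. Here is a minimal sketch using Spark's RDD API; the file pattern and the local SparkContext are placeholders for illustration:

from pyspark import SparkContext

sc = SparkContext('local', 'wordcount')          # placeholder local context

counts = (sc.textFile('mydata-*.dat')            # hypothetical input files
            .flatMap(lambda line: line.split())  # map-like: one record -> many words
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b)     # groupby + reduce
            .collect())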

However, as we explore new complex algorithms or messier data science problems, these large parallel operations start to become insufficiently flexible. For example, consider the following data loading and cleaning problem:

  1. Load data from 100 different files (this is a simple map operation)
  2. Also load a reference dataset from a SQL database (not parallel at all, but could run alongside the map above)
  3. Normalize each of the 100 datasets against the reference dataset (sort of like a map, but with another input)
  4. Consider a sliding window of every three normalized datasets (Might be able to hack this with a very clever join? Not sure.)
  5. Of all of the 98 outputs of the last stage, consider all pairs. (Join or cartesian product) However, because we don’t want to compute all ~10,000 possibilities, let’s just evaluate a random sample of these pairs
  6. Find the best of all of these possibilities (reduction)

In sequential for-loopy code this might look like the following:

filenames = ['mydata-%d.dat' % i for i in range(100)]
data = [load(fn) for fn in filenames]

reference = load_from_sql('sql://mytable')
processed = [process(d, reference) for d in data]

rolled = []
for i in range(len(processed) - 2):
    a = processed[i]
    b = processed[i + 1]
    c = processed[i + 2]
    r = roll(a, b, c)
    rolled.append(r)

compared = []
for i in range(200):
    a = random.choice(rolled)
    b = random.choice(rolled)
    c = compare(a, b)
    compared.append(c)

best = reduction(compared)

This code is clearly parallelizable, but it’s not clear how to write it down as a MapReduce program, a Spark computation, or a SQL query. These tools generally fail when asked to express complex or messy problems. We can still use Hadoop/Spark to solve this problem, but we are often forced to change and simplify our objectives a bit. (This problem is not particularly complex, and I suspect that there are clever ways to do it, but it’s not trivial and often inefficient.)

Task Schedulers

So instead people use task schedulers like Celery, Luigi, or Airflow. These systems track hundreds of tasks, each of which is just a normal Python function that runs on some normal Python data. The task scheduler tracks dependencies between tasks and so runs as many as it can at once if they don’t depend on each other.

This is a far more granular approach than the Big-Bulk-Collection approach of MapReduce and Spark. However, systems like Celery, Luigi, and Airflow are also generally less efficient. This is both because they know less about their computations (map is much easier to schedule than an arbitrary graph) and because they just don’t have machinery for inter-worker communication, efficient serialization of custom datatypes, etc.
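
To make that model concrete, here is a toy sketch, in plain Python rather than any of these libraries’ real APIs, of what “track dependencies and run whatever is ready” means:

from concurrent.futures import ThreadPoolExecutor

# Each task is a normal Python function plus the names of the tasks it needs.
tasks = {
    'load-a':  (lambda: 1,          []),
    'load-b':  (lambda: 2,          []),
    'combine': (lambda a, b: a + b, ['load-a', 'load-b']),
}

results = {}
with ThreadPoolExecutor() as pool:
    pending = dict(tasks)
    while pending:
        # Find every task whose dependencies have already finished ...
        ready = [name for name, (fn, deps) in pending.items()
                 if all(dep in results for dep in deps)]
        # ... run them concurrently, then record their results
        futures = {name: pool.submit(pending[name][0],
                                     *[results[d] for d in pending[name][1]])
                   for name in ready}
        for name, future in futures.items():
            results[name] = future.result()
            del pending[name]

print(results['combine'])  # 3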

Dask Mixes Task Scheduling with Efficient Computation

Dask is both a big data system like Hadoop/Spark that is aware of resilience, inter-worker communication, live state, etc. and also a general task scheduler like Celery, Luigi, or Airflow, capable of arbitrary task execution.

Many Dask users use something like Dask dataframe, which generates these graphs automatically, and so never really observe the task scheduler aspect of Dask. This is, however, the core of what distinguishes Dask from other systems like Hadoop and Spark. Dask is incredibly flexible in the kinds of algorithms it can run. This is because, at its core, it can run any graph of tasks and not just map, reduce, groupby, join, etc. Users can do this natively, without having to subclass anything or extend Dask to get this extra power.
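
At the lowest level such a graph is just a Python dictionary that maps keys to tasks, where a task is a tuple of a function and its arguments. A minimal sketch, run with one of Dask’s local schedulers:

from operator import add
from dask.threaded import get

def inc(x):
    return x + 1

# A hand-written Dask graph: values are either data or (function, args) tuples
dsk = {'x': 1,
       'y': (inc, 'x'),         # y = inc(x)
       'z': (add, 'x', 'y')}    # z = add(x, y)

get(dsk, 'z')  # returns 3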

There are significant performance advantages to this. For example:

  1. Dask.dataframe can easily represent nearest neighbor computations for fast time-series algorithms
  2. Dask.array can implement complex linear algebra solvers or SVD algorithms from the latest research
  3. Complex Machine Learning algorithms are often easier to implement in Dask, allowing it to be more efficient through smarter algorithms, as well as through scalable computing.
  4. Complex hierarchies from bespoke data storage solutions can be explicitly modeled and loaded into other Dask systems

This doesn’t come for free. Dask’s scheduler has to be very intelligent to smoothly schedule arbitrary graphs while still optimizing for data locality, worker failure, minimal communication, load balancing, scarce resources like GPUs, and more. It’s a tough job.

Dask.delayed

So let’s go ahead and run the data ingestion job described above with Dask.

We craft some fake functions to simulate actual work:

import random
from time import sleep

def load(address):
    sleep(random.random() / 2)

def load_from_sql(address):
    sleep(random.random() / 2 + 0.5)

def process(data, reference):
    sleep(random.random() / 2)

def roll(a, b, c):
    sleep(random.random() / 5)

def compare(a, b):
    sleep(random.random() / 10)

def reduction(seq):
    sleep(random.random() / 1)

We annotate these functions with dask.delayed, which changes a function so that instead of running immediately it captures its inputs and puts everything into a task graph for future execution.

from dask import delayed

load = delayed(load)
load_from_sql = delayed(load_from_sql)
process = delayed(process)
roll = delayed(roll)
compare = delayed(compare)
reduction = delayed(reduction)
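
Equivalently, delayed works as a decorator, so the stub functions above could have been written like this at definition time:

@delayed
def load(address):
    sleep(random.random() / 2)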

Now we just call our normal Python for-loopy code from before. However, now rather than run immediately, our functions capture a computational graph that can be run elsewhere.

filenames = ['mydata-%d.dat' % i for i in range(100)]
data = [load(fn) for fn in filenames]

reference = load_from_sql('sql://mytable')
processed = [process(d, reference) for d in data]

rolled = []
for i in range(len(processed) - 2):
    a = processed[i]
    b = processed[i + 1]
    c = processed[i + 2]
    r = roll(a, b, c)
    rolled.append(r)

compared = []
for i in range(200):
    a = random.choice(rolled)
    b = random.choice(rolled)
    c = compare(a, b)
    compared.append(c)

best = reduction(compared)

Here is an image of that graph for a smaller input of only 10 files and 20 random pairs:

Custom ETL Dask Graph
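
A graph image like this can be produced directly from the delayed value with its visualize method (the filename here is arbitrary, and rendering requires the optional graphviz dependency):

best.visualize(filename='custom-etl-graph.png')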

We can connect to a small cluster with 20 cores:

from dask.distributed import Client
client = Client('scheduler-address:8786')
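
(If no cluster is available, calling Client() with no arguments starts a scheduler and workers locally, which is enough to try this out on a single machine.)

from dask.distributed import Client
client = Client()  # no address given: start a local scheduler and workers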

We compute the result and see the trace of the computation running in real time.

result = best.compute()

Custom ETL Task Stream

The completed Bokeh image below is interactive. You can pan and zoom by selecting the tools in the upper right. You can see every task, which worker it ran on, and how long it took by hovering over the rectangles.

We see that we use all 20 cores well. Intermediate results are transferred between workers as necessary (these are the red rectangles). We can scale this up as needed; Dask scales to thousands of cores.

Final Thoughts

Dask’s ability to write down arbitrary computational graphs Celery/Luigi/Airflow-style, and yet run them with the scalability promises of Hadoop/Spark, allows for a pleasant freedom to write comfortably and yet still compute scalably. This ability opens up new possibilities, both to support more sophisticated algorithms and to handle messy situations that arise in the real world (enterprise data systems are sometimes messy), while still remaining within the bounds of “normal and supported” Dask operation.