This work is supported by Continuum Analytics and the Data Driven Discovery Initiative from the Moore Foundation.
We measure the performance of Dask’s distributed scheduler for a variety of different workloads under increasing scales of both problem and cluster size. This helps to answer questions about Dask’s scalability and also helps to educate readers on the sorts of computations that scale well.
We will vary our computations in a few ways to see how they stress performance, considering different computational patterns (embarrassingly parallel maps, tree reductions, nearest-neighbor dependencies, and fully sequential chains) and different collections (raw tasks, arrays, and dataframes).
We will start with benchmarks for straight tasks, which are the most flexible system and also the easiest to understand. This will help us to understand scaling limits on arrays and dataframes.
Note: we did not tune our benchmarks or configuration at all for these experiments. They are well below what is possible, but perhaps representative of what a beginning user might experience upon setting up a cluster without expertise or thinking about configuration.
You can safely skip this section if you’re in a rush.
This is a technical document, not a marketing piece. These benchmarks adhere to the principles laid out in this blogpost and attempt to avoid the pitfalls around developer bias described there.
We estimate that expert use would result in about a 5-10x scaling improvement over what we’ll see. We’ll detail how to improve scaling with expert methods at the bottom of the post.
All that being said, the author of this blogpost is paid to write this software, and so you probably shouldn’t trust him. We invite readers to explore things independently. All configuration, notebooks, plotting code, and data are available below.
We start by benchmarking the task scheduling API. Dask’s task scheduling APIs are at the heart of the other “big data” APIs (like dataframes). We start with tasks because they’re the simplest and most raw representation of Dask. Mostly we’ll run the following functions on integers, but you could fill in any function here, like a pandas dataframe method or sklearn routine.
import time

def inc(x):
    return x + 1

def add(x, y):
    return x + y

def slowinc(x, delay=0.1):
    time.sleep(delay)
    return x + 1

def slowadd(x, y, delay=0.1):
    time.sleep(delay)
    return x + y

def slowsum(L, delay=0.1):
    time.sleep(delay)
    return sum(L)
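The snippets below assume a distributed client connected to the cluster, along with the scale parameters n and n_cores used throughout. A minimal sketch of that setup follows; the scheduler address and the choice of n here are assumptions, not taken from the benchmark configuration:

from dask.distributed import Client, wait

client = Client('scheduler-address:8786')  # assumed address; otherwise default settings
n_cores = sum(client.ncores().values())    # total cores across all workers
n = n_cores                                # problem-size multiplier (an assumption)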
We run the following code on our cluster and measure how long each computation takes to complete:
futures = client.map(slowinc, range(4 * n), delay=1) # 1s delay
wait(futures)
futures = client.map(slowinc, range(100 * n_cores)) # 100ms delay
wait(futures)
futures = client.map(inc, range(n_cores * 200)) # fast
wait(futures)
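For reference, here is a hedged sketch of how these timings might be collected (the actual harness lives in the linked notebooks); tasks per second is simply the number of futures divided by the elapsed wall-clock time:

start = time.time()
futures = client.map(inc, range(n_cores * 200))  # fast tasks
wait(futures)
elapsed = time.time() - start
print(len(futures) / elapsed, 'tasks per second')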
We see that for fast tasks the system can process around 2000-3000 tasks per second. This is mostly bound by scheduler and client overhead. Adding more workers into the system doesn’t give us any more tasks per second. However, if our tasks take any amount of time (like 100ms or 1s) then we see decent speedups.
If you switch to linear scales on the plots, you’ll see that as we get out to 512 cores we start to slow down by about a factor of two. I’m surprised to see this behavior (hooray benchmarks) because all of Dask’s scheduling decisions are independent of cluster size. My first guess is that the scheduler may be getting swamped with administrative messages, but we’ll have to dig in a bit deeper here.
Not all computations are embarrassingly parallel. Many computations have dependencies between them. Consider a tree reduction, where we combine neighboring elements until there is only one left. This stresses task dependencies and small data movement.
from dask import delayed

L = range(2**7 * n)
while len(L) > 1:  # while there is more than one element left
    # add neighbors together
    L = [delayed(slowadd)(a, b) for a, b in zip(L[::2], L[1::2])]

L[0].compute()
We see similar scaling to the embarrassingly parallel case. Things proceed linearly until they get to around 3000 tasks per second, at which point they fall behind linear scaling. Dask doesn’t seem to mind dependencies, even custom situations like this one.
Nearest neighbor computations are common in data analysis when you need to share a bit of data between neighboring elements, as frequently occurs in timeseries computations on dataframes, overlapping image processing on arrays, or PDE computations.
L = range(20 * n)
L = client.map(slowadd, L[:-1], L[1:])
L = client.map(slowadd, L[:-1], L[1:])
wait(L)
Scaling is similar to the tree reduction case. Interesting dependency structures don’t incur significant overhead or scaling costs.
We consider a computation that isn’t parallel at all, but is instead highly sequential. Increasing the number of workers shouldn’t help here (there is only one thing to do at a time) but this does demonstrate the extra stresses that arise from a large number of workers. Note that we have turned off task fusion for this, so here we’re measuring how many roundtrips can occur between the scheduler and worker every second.
x = 1
for i in range(100):
    x = delayed(inc)(x)

x.compute()
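The text above mentions that task fusion was turned off, but not how. One possible way (a hedged sketch, not necessarily the mechanism used in these benchmarks) is to skip graph optimization when computing, so that each inc call stays a separate task:

x.compute(optimize_graph=False)  # skip optimization so the chain is not fused into one task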
So we get something like 100 roundtrips per second, or around 10ms roundtrip latencies. It turns out that a decent chunk of this cost was due to an optimization; workers prefer to batch small messages for higher throughput. In this case that optimization hurts us. Still though, we’re about 2-4x faster than video frame-rate here (video runs at around 24Hz, or 40ms between frames).
Finally we consider a reduction that consumes whichever futures finish first and adds them together. This is an example of using client-side logic within the computation, which is often helpful in complex algorithms. This also scales a little bit better because there are fewer dependencies to track within the scheduler. The client takes on a bit of the load.
from dask.distributed import as_completed

futures = client.map(slowinc, range(n * 20))

pool = as_completed(futures)
batches = pool.batches()

while True:
    try:
        batch = next(batches)
        if len(batch) == 1:
            batch += next(batches)
    except StopIteration:
        break
    future = client.submit(slowsum, batch)
    pool.add(future)
We show most of the plots from above for comparison.
When we combine NumPy arrays with the task scheduling system above we get dask.array, a distributed multi-dimensional array. This section shows computations like the last section (maps, reductions, nearest-neighbor), but now these computations are motivated by actual data-oriented computations and involve real data movement.
We make a square array with somewhat random data. This array scales with the number of cores. We cut it into uniform chunks of size 2000 by 2000.
import math
import dask.array as da

N = int(5000 * math.sqrt(n_cores))
x = da.random.randint(0, 10000, size=(N, N), chunks=(2000, 2000))
x = x.persist()
wait(x)
Creating this array is embarrassingly parallel. There is an odd corner in the graph here that I’m not able to explain.
We perform some numerical computation element-by-element on this array.
y = da.sin(x) ** 2 + da.cos(x) ** 2
y = y.persist()
wait(y)
This is also embarrassingly parallel. Each task here takes around 300ms (the time it takes to call this on a single 2000 by 2000 numpy array chunk).
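To see where that per-task cost comes from, here is a hedged sketch that times the same elementwise expression on a single in-memory chunk with plain NumPy (the ~300ms figure comes from the text above; your hardware may differ):

import numpy as np

chunk = np.random.randint(0, 10000, size=(2000, 2000))
start = time.time()
np.sin(chunk) ** 2 + np.cos(chunk) ** 2
print(time.time() - start, 'seconds for one chunk')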
We reduce the array, here by computing its standard deviation. Reductions like this are implemented as tree reductions.
x.std().compute()
We get a single element from the array. This shouldn’t get any faster with more workers, but it may get slower depending on how much baseline load a worker adds to the scheduler.
x[1234, 4567].compute()
We get around 400-800 bytes per second, which for an eight-byte element translates to response times of 10-20ms, about twice the speed of video framerate. We see that performance does degrade once we have a hundred or so active connections.
We add the array to its transpose. This forces different chunks to move around the network so that they can add to each other. Roughly half of the array moves on the network.
y = x + x.T
y = y.persist()
wait(y)
The task structure of this computation is something like nearest-neighbors. It has a regular pattern with a small number of connections per task. It’s really more a test of the network hardware, which we see does not impose any additional scaling limitations (this looks like normal slightly-sub-linear scaling).
Sometimes communication is composed of many small transfers. For example, if you have a time series of images so that each image is a chunk, you might want to rechunk the data so that all of the time values for each pixel are in a chunk instead. Doing this can be very challenging because every output chunk requires a little bit of data from every input chunk, resulting in potentially n-squared transfers.
y = x.rechunk((20000, 200)).persist()
wait(y)
y = y.rechunk((200, 20000)).persist()
wait(y)
This computation can be very hard. We see that Dask does it more slowly than fast computations like reductions, but it still scales decently well up to hundreds of workers.
Dask.array includes the ability to overlap small bits of neighboring blocks to enable functions that require a bit of continuity, like derivatives or spatial smoothing functions.
y = x.map_overlap(slowinc, depth=1, delay=0.1).persist()
wait(y)
We can combine Pandas Dataframes with Dask to obtain Dask dataframes, distributed tables. This section will be much like the last section on arrays but will instead focus on pandas-style computations.
We make an array of random integers with ten columns and two million rows per core, cut into chunks of one million rows each. We turn this into a dataframe of integers:
import dask.dataframe as dd

N = 2000000 * n_cores  # two million rows per core, as described above
x = da.random.randint(0, 10000, size=(N, 10), chunks=(1000000, 10))
df = dd.from_dask_array(x).persist()
wait(df)
We can perform 100ms tasks or try out a bunch of arithmetic.
y = df.map_partitions(slowinc, meta=df).persist()
wait(y)
y = (df[0] + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10).persist()
wait(y)
Similarly we can try random access with loc.
df.loc[123456].compute()
We can try reductions along the full dataset or a single series:
df.std().compute()
df[0].std().compute()
Groupby aggregations like df.groupby(...).column.mean() operate very similarly to reductions, just with slightly more complexity.
df.groupby(0)[1].mean().compute()
However, operations like df.groupby(...).apply(...) are much harder to accomplish because we actually need to construct the groups. This requires a full shuffle of all of the data, which can be quite expensive.
This is also the same operation that occurs when we sort or call set_index.
df.groupby(0).apply(len).compute() # this would be faster as df.groupby(0).size()
y = df.set_index(1).persist()
wait(y)
This still performs decently and scales well out to a hundred or so workers.
Timeseries operations often require nearest neighbor computations. Here we look at rolling aggregations, but cumulative operations, resampling, and so on are all much the same.
y = df.rolling(5).mean().persist()
wait(y)
Let’s start with a few main observations: most of these workloads scale linearly until the scheduler saturates at a few thousand tasks per second, after which performance falls behind, especially at the largest cluster sizes and smallest task lengths. This suggests that we may have an overhead issue that needs to be resolved.
So given our experience here, let’s now tweak settings to make Dask run well. We want to avoid two things: lots of independent worker processes and lots of very small tasks. So let’s change some things: we use fewer worker processes with more threads each, and we use larger chunks so that each task does more work.
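The exact worker counts, thread counts, and chunk sizes used for the improved runs aren’t given in this text, so the values below are assumptions; this is only a sketch of the kind of change involved (more threads per worker process, bigger blocks):

# Fewer worker processes, each with more threads (thread count is an assumption):
#   dask-worker scheduler-address:8786 --nthreads 8

# Larger chunks, so each task does more work per scheduling decision
# (array size here is illustrative):
x = da.random.randint(0, 10000, size=(50000, 50000), chunks=(10000, 10000))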
When we make these changes we find that all metrics improve at larger scales. Some notable improvements are included in a table below (sorry for not having pretty plots in this case).
We see that for some operations we can get significant improvements (dask.dataframe is now churning through data at 60 GB/s) and for other operations that are largely scheduler or network bound this doesn’t strongly improve the situation (and sometimes hurts).
Still though, even with naive settings we’re routinely pushing through 10s of gigabytes a second on a modest cluster. These speeds are available for a very wide range of computations.
Hopefully these notes help people to understand Dask’s scalability. Like all tools it has limits, but even under normal settings Dask should scale well out to a hundred workers or so. Once you reach this limit you might want to start taking other factors into consideration, especially threads-per-worker and block size, both of which can help push well into the thousands-of-cores range.
The included notebooks are self-contained, with code to both run and time the computations as well as produce the Bokeh figures. I would love to see other people reproduce these benchmarks (or others!) on different hardware or with different settings.
This blogpost made use of the following tools: