tl;dr We show off a multi-threaded shared-memory task scheduler. We share two techniques for space-constrained computing. We end with pretty GIFs.
Disclaimer: This post describes experimental, buggy code. This is not ready for public use.
Disclaimer 2: This post is technical and intended for people who care about task scheduling, not for traditional users.
In this post we outline a less elegant but more effective scheduler that uses multiple threads and caching to achieve performance on an interesting class of array operations.
We create scheduling policies to minimize the memory footprint of our computation.
First, we establish value by doing a hard thing well. Given two large arrays stored in HDF5:
import h5py

f = h5py.File('myfile.hdf5')
A = f.create_dataset(name='A', shape=(4000, 2000000), dtype='f8',
chunks=(250, 250), fillvalue=1.0)
B = f.create_dataset(name='B', shape=(4000, 4000), dtype='f8',
chunks=(250, 250), fillvalue=1.0)
We do a transpose and dot product.
import h5py
from blaze import Data, into
from dask.obj import Array

f = h5py.File('myfile.hdf5')
a = into(Array, f['A'], blockshape=(1000, 1000), name='A')
b = into(Array, f['B'], blockshape=(1000, 1000), name='B')
A = Data(a)
B = Data(b)
expr = A.T.dot(B)
result = into('myfile.hdf5::/C', expr)
This uses all of our cores and can be done with only 100MB or so of RAM. This is impressive because neither the inputs, outputs, nor any intermediate stage of the computation can fit in memory.
We failed to achieve this exactly (see note at bottom) but still, in theory, we’re great!
To keep a small memory footprint we avoid holding on to unnecessary intermediate data. The full computation graph of a smaller problem might look like the following:
Boxes represent data, circles represent functions that run on that data, and arrows specify which functions produce/consume which data.
The top row of circles represents the actual blocked dot products (note the many data dependence arrows originating from them). The bottom row of circles represents pulling blocks of data from the A HDF5 dataset into in-memory numpy arrays. The second row transposes the blocks from the first row and adds more blocks from B.
Naively performed, this graph can be very bad; we replicate our data four times here, once in each of the rows. We pull out all of the chunks, transpose each of them, and then finally do a dot product. If we couldn’t fit the original data in memory then there is no way that this will work.
We resolve this in two ways. First, we don’t cache intermediate values for fast-running functions (like np.transpose). Instead we inline fast functions into tasks involving expensive functions (like np.dot). We may end up running the same quick function twice, but at least we won’t have to store the result. We trade computation for memory.
The result of the graph above with all access and transpose operations inlined looks like the following:
Now our tasks nest (see below). We run all functions within a nested task asa single operation. (non-LISP-ers avert your eyes):
('x_6', 6, 0): (dotmany, [(np.transpose, (ndslice, 'A', (1000, 1000), 0, 6)),
(np.transpose, (ndslice, 'A', (1000, 1000), 1, 6))],
[(ndslice, 'B', (1000, 1000), 0, 0),
(ndslice, 'B', (1000, 1000), 1, 0)]),
This effectively shoves all of the storage responsibility back down to the HDF5 store. We end up pulling out the same blocks multiple times, but repeated disk access is inevitable on large complex problems.
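The way such nested tasks run can be sketched as a small recursive interpreter (a simplified stand-in for illustration, not dask’s actual code): any argument that is itself a (function, *args) tuple is evaluated in place, so intermediate results live only on the call stack and never enter the shared cache.

```python
def istask(x):
    # A task is a tuple whose first element is callable
    return isinstance(x, tuple) and bool(x) and callable(x[0])

def execute(arg):
    # Recursively evaluate nested (function, *args) tuples in one shot
    if istask(arg):
        func, args = arg[0], arg[1:]
        return func(*[execute(a) for a in args])
    if isinstance(arg, list):
        return [execute(a) for a in arg]
    return arg

# A nested task runs as one operation:
result = execute((sum, [(max, [1, 2]), 3]))  # == 5
```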
This is automatic. Dask now includes an inline function that does this for you. You just give it a set of “fast” functions to ignore, e.g.
dsk2 = inline(dsk, [np.transpose, ndslice, add, mul, ...])
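To show the idea, here is a toy version of such an inlining step (a hypothetical simplification, not dask’s implementation): references to keys produced by fast functions are replaced by the tasks themselves, and the fast keys are dropped from the graph.

```python
import operator

def inline_fast(dsk, fast_functions):
    # Keys whose tasks are headed by a "fast" function
    fast = {key for key, task in dsk.items()
            if isinstance(task, tuple) and task[0] in fast_functions}

    def substitute(arg):
        # Replace references to fast keys with their tasks, recursively
        if isinstance(arg, tuple):
            return tuple(substitute(a) for a in arg)
        if isinstance(arg, str) and arg in fast:
            return substitute(dsk[arg])
        return arg

    return {key: substitute(task)
            for key, task in dsk.items() if key not in fast}

dsk = {'x': 1,
       'y': (operator.neg, 'x'),       # cheap: inline into consumers
       'z': (operator.add, 'y', 10)}   # expensive: keep and cache
dsk2 = inline_fast(dsk, {operator.neg})
# dsk2 == {'x': 1, 'z': (operator.add, (operator.neg, 'x'), 10)}
```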
Now that we have a nice dask to crunch on, we run those tasks with multiple worker threads. This is the job of a scheduler.
We build and document such a scheduler here. It targets a shared-memory single-process multi-threaded environment. It replaces the elegant 20-line reference solution with a large blob of ugly code filled with locks and mutable state. Still, it manages the computation sanely, performs some critical optimizations, and uses all of our hardware cores (Moore’s law is dead! Long live Moore’s law!)
Many NumPy operations release the GIL and so are highly amenable to multi-threading. NumPy programs do not suffer the single-active-core-per-process limitation of most Python code.
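For instance, several threads can make progress on matrix multiplications simultaneously because NumPy releases the GIL during the heavy numeric work (array sizes here are arbitrary):

```python
from concurrent.futures import ThreadPoolExecutor
import numpy as np

arrays = [np.random.random((500, 500)) for _ in range(8)]

# Each dot product releases the GIL, so four threads can run
# four matrix multiplications concurrently
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda x: x.dot(x), arrays))
```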
We follow a fairly standard model. We create a ThreadPool with a fixed number of workers. We analyze the dask to determine “ready to run” tasks. We send a task to each of our worker threads. As they finish they update the run-state, marking jobs as finished, inserting their results into a shared cache, and then marking new jobs as ready based on the newly available data. This update process is fully indexed and handled by the worker threads themselves (with appropriate locks), making the overhead negligible and hopefully scalable to complex workloads.
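A heavily simplified sketch of this model (flat string keys only, no error handling, no task-selection policy; dask’s actual scheduler is far more involved): workers update shared run-state under a lock and themselves submit any tasks that become ready.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import operator

def threaded_get(dsk, key, nworkers=4):
    # Which keys does each task depend on?
    deps = {k: {a for a in (t[1:] if isinstance(t, tuple) else ())
                if isinstance(a, str) and a in dsk}
            for k, t in dsk.items()}
    dependents = {k: set() for k in dsk}
    for k, ds in deps.items():
        for d in ds:
            dependents[d].add(k)

    cache = {}                                  # shared cache of results
    remaining = {k: set(ds) for k, ds in deps.items()}
    lock = threading.Lock()
    done = threading.Event()
    pool = ThreadPoolExecutor(nworkers)

    def run(k):
        t = dsk[k]
        if isinstance(t, tuple):
            args = [cache[a] if isinstance(a, str) and a in dsk else a
                    for a in t[1:]]
            val = t[0](*args)
        else:
            val = t
        with lock:                              # update run-state
            cache[k] = val
            for child in dependents[k]:
                remaining[child].discard(k)
                if not remaining[child]:        # child became ready
                    pool.submit(run, child)
            if k == key:
                done.set()

    for k, ds in deps.items():                  # kick off ready tasks
        if not ds:
            pool.submit(run, k)
    done.wait()
    pool.shutdown()
    return cache[key]

dsk = {'x': 1, 'y': 2, 'z': (operator.add, 'x', 'y')}
result = threaded_get(dsk, 'z')  # == 3
```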
When a newly available worker selects a new ready task, it often has several to choose from. The choice we make here is very powerful. The next section describes our selection policy:
Select tasks that immediately free data resources on completion
Our policy to prefer tasks that free resources lets us run many computations in a very small space. We now show three expressions, their resulting schedules, and an animation showing the scheduler walk through those schedules. These were taken from live runs.
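In pseudocode the policy can be sketched like this (a hypothetical helper with made-up names, for illustration only): among the ready tasks, prefer the one that is the last remaining consumer of the most data blocks.

```python
def choose_task(ready, dependencies, pending_consumers):
    # dependencies:      task -> set of data keys it reads
    # pending_consumers: data key -> set of not-yet-run tasks reading it
    def blocks_freed(task):
        # A block is freed when this task is its last pending consumer
        return sum(1 for dep in dependencies[task]
                   if pending_consumers[dep] == {task})
    return max(ready, key=blocks_freed)

dependencies = {'t1': {'a'}, 't2': {'b'}}
pending_consumers = {'a': {'t1'}, 'b': {'t2', 't3'}}
# 't1' wins: finishing it frees block 'a', while 'b' is still needed by 't3'
choice = choose_task(['t1', 't2'], dependencies, pending_consumers)
```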
On the right we show an animated GIF of the progress of the following embarrassingly parallel computation:
expr = (((A + 1) * 2) ** 3)
Circles represent computations, boxes represent data.
Red means actively taking up resources. Red is bad.
Blue means finished or released. Blue is good.
We want to turn all nodes blue while minimizing the number of red boxes we have at any given time.
The policy to execute tasks that free resources causes “vertical” execution when possible. In this example our approach is optimal because the number of red boxes is kept small throughout the computation. We have only one red box for each of our four threads.
We show a more complex expression:
expr = (B - B.mean(axis=0)) + (B.T / B.std())
This extends the class of expressions that we’ve seen so far to reductions and reductions along an axis. The per-chunk reductions start at the bottom and depend only on the chunk from which they originated. These per-chunk results are then concatenated together and re-reduced with the large circles (zoom in to see the text concatenate in there). The next level takes these (small) results and the original data again (note the long edges back down to the bottom data resources), which results in per-chunk subtractions and divisions.
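The per-chunk-then-recombine pattern for the mean can be sketched like so (a toy NumPy illustration of the pattern, not Blaze’s generated code):

```python
import numpy as np

def blocked_mean(chunks, axis=0):
    # Per-chunk partial reductions: small results, one per chunk
    sums = [c.sum(axis=axis) for c in chunks]
    counts = [c.shape[axis] for c in chunks]
    # Concatenate and re-reduce the small per-chunk results
    return sum(sums) / sum(counts)

# Two chunks stacked along axis 0
chunks = [np.ones((2, 3)), 2 * np.ones((4, 3))]
result = blocked_mean(chunks)  # matches np.vstack(chunks).mean(axis=0)
```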
From there on out the work is embarrassingly parallel, resembling the computation above. In this case we have relatively little parallelism, so the frontier of red boxes covers the entire image; fortunately the dataset is small.
We show a case where our greedy solution fails miserably:
expr = (A.T.dot(B) - B.mean(axis=0))
The two large functions on the second level from the bottom are the computation of the mean. These are cheap and, once completed, allow each of the blocks of the dot product to terminate quickly and release memory.
Tragically these mean computations execute at the last possible moment, keeping all of the dot product results trapped in the cache. We have almost a full row of red boxes at one point in the computation.
In this case our greedy solution was short-sighted; a slightly more global solution would select these large circles to run early. Perhaps betweenness centrality would resolve this problem.
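As a rough sketch of that idea (a crude stand-in, not true betweenness centrality): score each node by ancestors × descendants in the task DAG, so that small nodes gating many downstream tasks, like the mean, rank highly and get scheduled early.

```python
def reachable(edges, node):
    # All nodes reachable from `node` by following `edges`
    seen, stack = set(), [node]
    while stack:
        for m in edges.get(stack.pop(), ()):
            if m not in seen:
                seen.add(m)
                stack.append(m)
    return seen

def priority(dependents, dependencies, node):
    # Crude proxy for betweenness centrality: a node with many
    # ancestors and many descendants lies on many paths through the DAG
    return (len(reachable(dependencies, node))
            * len(reachable(dependents, node)))

# A "mean"-like node fed by two blocks and gating three dot products
dependents   = {'a': {'m'}, 'b': {'m'}, 'm': {'x', 'y', 'z'}}
dependencies = {'m': {'a', 'b'}, 'x': {'m'}, 'y': {'m'}, 'z': {'m'}}
score = priority(dependents, dependencies, 'm')  # == 6, the highest here
```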
We’ll never build a good enough scheduler. We need to be able to fall back to on-disk caching. Actually this isn’t terrible. High-performance SSDs get close to 1 GB/second throughput, and in the complex cases where data-aware scheduling fails we probably compute slower than that anyway.
I have a little disk-backed dictionary project, chest, for this but it’s immature. In general I’d like to see more projects implement the dict interface with interesting policies.
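The shape of such a project, sketched as a toy (chest does this far more carefully): a MutableMapping that keeps a bounded number of values in memory and pickles the rest to disk.

```python
import os
import pickle
import tempfile
from collections.abc import MutableMapping

class DiskDict(MutableMapping):
    """Toy spill-to-disk mapping: keep at most `max_items` values in
    memory and pickle the rest to files (a crude sketch only)."""

    def __init__(self, max_items=100):
        self.max_items = max_items
        self.memory = {}              # hot, in-memory values
        self.on_disk = set()          # keys spilled to disk
        self.dir = tempfile.mkdtemp()

    def _path(self, key):
        return os.path.join(self.dir, str(hash(key)))

    def __setitem__(self, key, value):
        self.on_disk.discard(key)
        self.memory[key] = value
        if len(self.memory) > self.max_items:
            # Evict the oldest in-memory entry to disk
            old = next(iter(self.memory))
            with open(self._path(old), 'wb') as f:
                pickle.dump(self.memory.pop(old), f)
            self.on_disk.add(old)

    def __getitem__(self, key):
        if key in self.memory:
            return self.memory[key]
        if key in self.on_disk:
            with open(self._path(key), 'rb') as f:
                return pickle.load(f)
        raise KeyError(key)

    def __delitem__(self, key):
        if key in self.memory:
            del self.memory[key]
        elif key in self.on_disk:
            os.remove(self._path(key))
            self.on_disk.discard(key)
        else:
            raise KeyError(key)

    def __iter__(self):
        yield from self.memory
        yield from self.on_disk

    def __len__(self):
        return len(self.memory) + len(self.on_disk)

d = DiskDict(max_items=2)
d['a'], d['b'], d['c'] = 1, 2, 3   # inserting 'c' spills 'a' to disk
```

A real implementation would also bound memory by bytes rather than item count and evict with a smarter policy than insertion order.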
I have a confession: the first computation, the very large dot product, sometimes crashes my machine. While the scheduler manages memory well, I have a memory leak somewhere. I suspect that I use HDF5 improperly.
Blaze currently generates dasks for the following:
We also comply with the NumPy API on these operations. At the time of writing, notable missing elements include the following:
I’d love to hear what’s important to the community. Re-implementing all of NumPy is hard; re-implementing a few choice pieces of NumPy is relatively straightforward. Knowing what those few choice pieces are requires community involvement.
My experience building dynamic schedulers is limited and my approach is likely suboptimal. It would be great to see other approaches. None of the logic in this post is specific to Blaze or even to NumPy. To build a scheduler you only need to understand the model of a graph of computations with data dependencies.
If we were ambitious we might consider a distributed scheduler to execute these task graphs across many nodes in a distributed-memory situation (like a cluster). This is a hard problem but it would open up a different class of computational solutions. The Blaze code to generate the dasks would not change; the graphs that we generate are orthogonal to our choice of scheduler.
I could use help with all of this, either as open source work (links below) orfor money. Continuum has funding for employees and ample interesting problems.