This post discusses Dask overhead costs for task scheduling, and then lays out a rough plan for acceleration.
This post is written for other maintainers, and often refers to internal details. It is not intended for broad readability.
When we submit large graphs there is a bit of a delay between us calling .compute() and work actually starting on the workers. In some cases, that delay can affect usability and performance.
Additionally, in far fewer cases, the gaps in between tasks can be an issue, especially if those tasks are very short and for some reason cannot be made longer.
First, this is a problem that affects about 1-5% of Dask users. These are people who want to process millions of tasks relatively quickly. Let’s list a few use cases:
It does not affect the everyday user, who processes 100GB to a few TB of data, and doesn’t mind waiting 10s for things to start running.
When you call x.sum().compute() a few things happen:
Generally most people today are concerned with steps 1-6. Once things get out to the workers and progress bars start moving, people tend to care a bit less (but not zero).
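As a rough, hedged illustration (not from the original post, and the sizes are arbitrary), the sketch below times the client-side graph handling separately from the rest of the compute call. With many tiny chunks the per-task overhead dominates the actual work; with a distributed Client, serialization and communication to the scheduler would show up here as well.

```python
import time

import dask
import dask.array as da

# Many tiny chunks -> many tasks, so per-task overhead dominates the real work.
x = da.ones(100_000, chunks=10)      # ~10,000 chunks
y = x.sum()

t0 = time.time()
(y_opt,) = dask.optimize(y)          # client-side graph optimization
print("optimize:", time.time() - t0)

t0 = time.time()
y_opt.compute(optimize_graph=False)  # scheduling, execution, gathering results
print("compute: ", time.time() - t0)
```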
Let’s look at a few things that other projects do, and see if there are things that we can learn. These are commonly suggested, but there are challenges with most of them.
Because our pipeline has many stages, each of which can be slow for different reasons, we’ll have to do many things. Additionally, this is a hard problem because changing one piece of the project at this level has repercussions for many other pieces. The rest of this post tries to lay out a consistent set of changes. Let’s start with a summary:
We’ll go into these in more depth below.
A year or two ago we moved graph generation costs from user-code-typing time to graph-optimization time with high level graphs:
```python
y = x + 1                # graph generation used to happen here
(y,) = dask.optimize(y)  # now it happens here
```
This really improved usability, and also let us do some high level optimizations which sometimes allowed us to skip some lower-level optimization costs.
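For example (a small sketch, using public attributes of the collection protocol), an array expression now carries a HighLevelGraph of named layers rather than a flat task dict, which we can inspect directly:

```python
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))
y = (x + 1).sum()

hlg = y.__dask_graph__()   # a HighLevelGraph, not a flat dict of tasks
print(type(hlg))
print(list(hlg.layers))    # roughly one named layer per collection operation
print(hlg.dependencies)    # which layers depend on which
```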
The first four stages of our pipeline happen on the client:
If we’re able to stay with the high level graph representation through these stages, all the way until graph communication, then we can communicate a far more compact representation up to the scheduler. We can drop a lot of these costs, at least for the high level collection APIs (delayed and client.submit would still be slow; client.map might be OK though).
This has a couple of other nice benefits:
(some conversation here: https://github.com/dask/distributed/issues/3872)
In principle, changing the distributed scheduler to accept a variety of graph layer types is a tedious but straightforward problem. I’m not concerned.
The bigger concern is what to do with low-level graph optimizations. Today we have three of these that really matter: task fusion, culling, and slicing.
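For reference, here is a minimal sketch of what two of these (culling and low-level fusion) do today on a flat dict graph; this is roughly the behavior that any higher level replacement has to cover:

```python
from operator import add, mul

from dask.optimization import cull, fuse

dsk = {
    "a": 1,
    "b": (add, "a", 1),
    "c": (mul, "b", 2),
    "unused": (add, "a", 100),   # not needed for the requested output
}

# Culling drops tasks that the requested keys don't depend on.
culled, deps = cull(dsk, keys=["c"])
print(sorted(culled))            # "unused" is gone

# Fusion collapses linear chains of tasks into single tasks.
fused, deps = fuse(culled, keys=["c"])
print(sorted(fused))
```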
In order for us to transmit abstract graph layers up to the scheduler, we need to remove the need for these low level graph optimizations. I think that we can do this with a combination of two approaches:
We already do this a bit with blockwise, which has its own fusion, and which removes much of the need for fusion generally. But other blockwise-like operations, like read_*, will probably have to join the Blockwise family.
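As a small sketch of that high level path (assuming dask.blockwise.optimize_blockwise, which the array optimizer uses), adjacent Blockwise layers collapse into one without materializing a low-level graph:

```python
import dask.array as da
from dask.blockwise import optimize_blockwise

x = da.ones(10, chunks=5)
y = (x + 1) * 2           # a stack of elementwise (Blockwise) layers

hlg = y.__dask_graph__()
print(len(hlg.layers))    # layer count before fusion

fused = optimize_blockwise(hlg, keys=y.__dask_keys__())
print(len(fused.layers))  # typically fewer: adjacent Blockwise layers are fused
```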
Getting culling to work properly may require us to teach each of the individual graph layer types how to track dependencies and cull themselves. This may get tricky.
Slicing is doable; we just need someone to go in, grok all of the current slicing optimizations, and make high level graph layers for these computations. This would be a great project for a sharp master’s student.
High level Blockwise fusion handles many of the use cases for low-level fusion, but not all. For example, I/O layers like dd.read_parquet or da.from_zarr aren’t fused at a high level.
We can resolve this either by making them Blockwise layers (this requires expanding the blockwise abstraction, which may be hard), or alternatively we can start sending not-yet-ready tasks to workers before all of their dependencies are finished, if we’re highly confident that we know where they’re going to go. This would give us some of the same results as fusion, but would keep all of the task types separate (which would be nice for diagnostics) and might still give us some of the same performance benefits that we get from fusion.
So after we’ve removed the need for low level optimizations, and we just send the abstract graph layers up to the scheduler directly, we’ll need to teach the scheduler how to unpack those graph layers.
This is a little tricky because the Scheduler can’t run user Python code (for security reasons). We’ll have to register layer types (like blockwise, rechunk, dataframe shuffle) that the scheduler knows about and trusts ahead of time. We’ll still always support custom layers, and these will be at the same speed that they’ve always been, but hopefully there will be far less need for these if we go all-in on high level layers.
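A purely hypothetical sketch of that registration, with invented names (none of this is real Dask API), might look something like this:

```python
# Hypothetical sketch only: a registry of trusted layer types that the
# scheduler knows how to expand into concrete tasks.  Invented names.
from typing import Callable, Dict

LAYER_UNPACKERS: Dict[str, Callable[[dict], dict]] = {}

def register_layer(name: str):
    """Register an unpacker for a known, trusted layer type."""
    def decorator(func: Callable[[dict], dict]) -> Callable[[dict], dict]:
        LAYER_UNPACKERS[name] = func
        return func
    return decorator

@register_layer("blockwise")
def unpack_blockwise(spec: dict) -> dict:
    # Expand a compact description (a function applied blockwise over one
    # input collection) into one concrete task per output block.
    return {
        (spec["name"], i): (spec["func"], (spec["input"], i))
        for i in range(spec["numblocks"])
    }

# Unrecognized custom layers would still be sent pre-materialized,
# exactly as graphs are sent today.
```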
Once most of the finicky bits are moved to the scheduler, we’ll have one place where we can focus on low level graph state manipulation.
Dask’s distributed scheduler is really two things: a networking system that talks to clients and workers, and an internal scheduling state machine.
Jim has an interesting project here that shows promise: https://github.com/jcrist/ery. Reducing latency between workers and the scheduler would be good, and would help to accelerate stages 7-8 in the pipeline listed at the top of this post.
Rewriting the state machine in some lower level language would be fine. Ideally this would be in a language that is easy for the current maintainer community to maintain (Cython?), but we may also consider making a firmer interface here that would allow other groups to experiment safely.
There are some advantages to this (more experimentation by different groups) but also some costs (splitting of core efforts and mismatches for users). Also, I suspect that splitting out probably means that we’ll lose the dashboard, unless those other groups are very careful to expose the same state to Bokeh.
There is more exploration to do here. Regardless, I think that it probably makes sense to try to isolate the state machine from the networking system. Maybe this also makes it easier for people to profile it in isolation.
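As a purely hypothetical sketch of that separation (invented names, not Dask’s actual classes), the state machine could be a plain object that consumes events and returns decisions, with all networking kept in whatever wraps it:

```python
# Hypothetical sketch: a scheduling state machine with no sockets, coroutines,
# or Tornado in sight, so it can be profiled (or ported) in isolation.
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class SchedulerState:
    waiting: Dict[str, Set[str]] = field(default_factory=dict)  # key -> unfinished deps
    ready: List[str] = field(default_factory=list)

    def add_task(self, key: str, deps: Set[str]) -> None:
        if deps:
            self.waiting[key] = set(deps)
        else:
            self.ready.append(key)

    def task_finished(self, key: str) -> List[str]:
        """Return keys that just became ready; the networking layer decides
        how and where to send them."""
        newly_ready = []
        for other, deps in list(self.waiting.items()):
            deps.discard(key)
            if not deps:
                del self.waiting[other]
                newly_ready.append(other)
        self.ready.extend(newly_ready)
        return newly_ready
```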
In speaking with a few different groups, most people have expressed reservations about having multiple different state machine codebases. This was done in MapReduce and Spark and resulted in difficult-to-maintain community dynamics.
Once we have everything in smarter high level graph layers, we will also be more ripe for optimization.
We’ll need a better way to write down these optimizations, with a separated traversal system and a set of rules. A few of us have written these kinds of systems before; maybe it’s time we revisit them.
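A hypothetical sketch of such a system (all names invented) might separate a generic traversal from a list of rewrite rules applied to adjacent layers:

```python
# Hypothetical sketch: apply rewrite rules to a list of high level layers
# until nothing changes.  Invented names only.
from typing import Callable, List, Optional

Layer = dict                                        # stand-in for a real layer object
Rule = Callable[[Layer, Layer], Optional[Layer]]    # fuse two layers, or return None

def rewrite(layers: List[Layer], rules: List[Rule]) -> List[Layer]:
    changed = True
    while changed:
        changed = False
        for i in range(len(layers) - 1):
            for rule in rules:
                merged = rule(layers[i], layers[i + 1])
                if merged is not None:
                    layers[i : i + 2] = [merged]
                    changed = True
                    break
            if changed:
                break
    return layers

def fuse_elementwise(a: Layer, b: Layer) -> Optional[Layer]:
    # Example rule: fuse two adjacent elementwise layers into one.
    if a.get("kind") == b.get("kind") == "elementwise":
        return {"kind": "elementwise", "ops": a["ops"] + b["ops"]}
    return None

layers = [
    {"kind": "elementwise", "ops": ["add"]},
    {"kind": "elementwise", "ops": ["mul"]},
    {"kind": "reduction", "ops": ["sum"]},
]
print(rewrite(layers, [fuse_elementwise]))
```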
This would require some effort, but I think that it would hit several high-profile problems at once. There are a few tricky things to get right:
For this, I think we’ll need people who are fairly familiar with Dask to do it right.
And then there is a fair amount of follow-on work.
I’ve been thinking about the right way to enact this change. Historically, most Dask changes over the past few years have been incremental or peripheral, due to how burdened the maintainers are. There might be enough pressure on this problem, though, that we can get some dedicated engineering effort from a few organizations, which might change how possible this is. We’ve gotten 25% time from a few groups. I’m curious if we can get 100% time for some people for a few months.