Jul 21, 2020

Faster Scheduling

By

Summary

This post discusses Dask overhead costs for task scheduling, and then lays out a rough plan for acceleration.

This post is written for other maintainers, and often refers to internal details. It is not intended for broad readability.

How does this problem present?

When we submit large graphs there is a bit of a delay between us calling .compute() and work actually starting on the workers. In some cases, that delay can affect usability and performance.

Additionally, in far fewer cases, the gaps in between tasks can be an issue, especially if those tasks are very short and for some reason cannot be made longer.

Who cares?

First, this is a problem that affects about 1-5% of Dask users. These are people who want to process millions of tasks relatively quickly. Let’s list a few use cases:

  1. Xarray/Pangeo workloads at the 10-100TB scale
  2. NVIDIA RAPIDS workloads on large tabular data (GPUs make computing fast, so other costs become relatively larger)
  3. Some mystery use cases inside of some hedge funds

It does not affect the everyday user, who processes 100GB to a few TB of data, and doesn’t mind waiting 10s for things to start running.

Coarse breakdown of costs

When you call x.sum().compute() a few things happen:

  1. Graph generation: Some Python code in a Dask collection library, like dask array, calls the sum function, which generates a task graph on the client side.
  2. Graph Optimization: We then optimize that graph, also on the client side, in order to remove unnecessary work, fuse tasks, apply important high level optimizations, and more.
  3. Graph Serialization: We now pack up that graph in a form that can be sent over to the scheduler.
  4. Graph Communication: We fire those bytes across a wire over to the scheduler
  5. Scheduler.update_graph: The scheduler receives these bytes, unpacks them, and then updates its own internal data structures
  6. Scheduling: The scheduler then assigns ready tasks to workers
  7. Communicate to workers: The scheduler sends out lots of smaller messages to each of the workers with the tasks that they can perform
  8. Workers work: The workers then perform this work, and start communicating back and forth with the scheduler to receive new instructions
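To get a feel for why the client-side stages grow with graph size, here is a toy sketch of stages 1-3. It assumes a plain dict-of-tuples graph and uses `pickle` as a stand-in serializer; the function names are hypothetical and none of this is Dask's actual code path. The point is only that every stage touches every task, so small per-task Python costs add up.

```python
import pickle
import time
from operator import add


def generate_graph(n):
    """Toy stage 1: a chain of n tasks, each adding 1 to the previous result."""
    graph = {("x", 0): 0}
    for i in range(1, n):
        graph[("x", i)] = (add, ("x", i - 1), 1)
    return graph


def optimize_graph(graph):
    """Toy stage 2: a stand-in pass that visits every task once."""
    return {key: task for key, task in graph.items()}


def serialize_graph(graph):
    """Toy stage 3: pack the whole graph into bytes for the wire."""
    return pickle.dumps(graph)


def client_side_costs(n):
    """Run toy stages 1-3 and report wall-clock time and payload size."""
    t0 = time.perf_counter()
    graph = generate_graph(n)
    t1 = time.perf_counter()
    graph = optimize_graph(graph)
    t2 = time.perf_counter()
    payload = serialize_graph(graph)
    t3 = time.perf_counter()
    return {
        "tasks": len(graph),
        "generate_s": t1 - t0,
        "optimize_s": t2 - t1,
        "serialize_s": t3 - t2,
        "bytes": len(payload),
    }


for n in (10_000, 100_000):
    print(client_side_costs(n))
```

Timings will vary by machine, but both the times and the payload size should scale roughly linearly with the number of tasks, which is exactly what hurts at millions of tasks.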

Generally most people today are concerned with steps 1-6. Once things get out to the workers and progress bars start moving people tend to care a bit less (but not zero).

What do other people do?

Let’s look at a few things that other projects do, and see if there are things that we can learn. These are commonly suggested, but there are challenges with most of them.

  1. Rewrite the scheduler in C++/Rust/C/Cython
     Proposal: Python is slow. Want to make it faster? Don’t use Python. See academic projects.
     Challenge: This makes sense for some parts of the pipeline above, but not for others. It also makes it harder to attract maintainers.
     What we should consider: Some parts of the scheduler and optimization algorithms could be written in a lower level language, maybe Cython. We’ll need to be careful about maintainability.
  2. Distributed scheduling
     Proposal: The scheduler is slow, maybe have many schedulers? See Ray.
     Challenge: It’s actually really hard to make the kinds of decisions that Dask has to make if scheduling state is spread on many computers. Distributed scheduling works better when the workload is either very uniform or highly decoupled. Distributed scheduling is really attractive to people who like solving interesting/hard problems.
     What we should consider: We can move some simple logic down to the workers. We’ve already done this with the easy stuff though. It’s not clear how much additional benefit there is here.
  3. Build specialty scheduling around collections
     Proposal: If Dask were to become just a dataframe library or just an array computing library then it could special-case things more effectively. See Spark, Mars, and others.
     Challenge: Yes, but Dask is not a dataframe library or an array library. The three use cases we mention above are all very different.
     What we should consider: Modules like dask array and dask dataframe should develop high level query blocks, and we should endeavor to communicate these subgraphs over the wire directly so that they are more compact.

What should we actually do?

Because our pipeline has many stages, each of which can be slow for different reasons, we’ll have to do many things. Additionally, this is a hard problem because changing one piece of the project at this level has repercussions for many other pieces. The rest of this post tries to lay out a consistent set of changes. Let’s start with a summary:

  1. For Dask array/dataframe let’s use high level graphs more aggressively so that we can communicate only abstract representations between the client and scheduler.
  2. But this breaks low level graph optimizations, in particular fusion, culling, and slice fusion. We can make these unnecessary with two changes:
     a. We can make high level graphs considerably smarter to handle culling and slice fusion
     b. We can move a bit more of the scheduling down to the workers to replicate the advantages of low-level fusion there
  3. Then, once all of the graph manipulation happens on the scheduler, let’s try to accelerate it, hopefully in a language that the current dev community can understand, like Cython
  4. At the same time in parallel, let’s take a look at our network stack

We’ll go into these in more depth below.

Graph Generation

High Level Graph History

A year or two ago we moved graph generation costs from user-code-typing time to graph-optimization time with high level graphs:

y = x + 1 # graph generation used to happen here
(y,) = dask.optimize(y,) # now it happens here

This really improved usability, and also let us do some high level optimizations which sometimes allowed us to skip some lower-level optimization costs.
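The shift can be pictured with a toy lazy layer. The class below is hypothetical (it is not Dask's `HighLevelGraph` or `Blockwise` API); it only shows why creating a layer is O(1) while materializing it into per-block tasks is O(number of blocks), and that the expensive part is deferred until optimization.

```python
from operator import add


class LazyElementwiseLayer:
    """Hypothetical layer that records *what* to compute, not each task.

    Building it costs O(1) regardless of how many blocks it covers.
    """

    def __init__(self, name, func, input_name, nblocks, *args):
        self.name = name
        self.func = func
        self.input_name = input_name
        self.nblocks = nblocks
        self.args = args

    def materialize(self):
        """Expand into a low-level dict of tasks; this is the O(nblocks) part."""
        return {
            (self.name, i): (self.func, (self.input_name, i), *self.args)
            for i in range(self.nblocks)
        }


# Like "y = x + 1": cheap, no per-block work happens yet.
layer = LazyElementwiseLayer("y", add, "x", 100_000, 1)

# Like "dask.optimize(y)": only now do we pay per-block costs.
tasks = layer.materialize()
print(len(tasks))  # 100000
```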

Can we push this further?

The first four stages of our pipeline happen on the client:

  1. Graph generation: Some Python code in a Dask collection library, like dask array, calls the sum function, which generates a task graph on the client side.
  2. Graph Optimization: We then optimize that graph, also on the client side, in order to remove unnecessary work, fuse tasks, apply important high level optimizations, and more.
  3. Graph Serialization: We now pack up that graph in a form that can be sent over to the scheduler.
  4. Graph Communication: We fire those bytes across a wire over to the scheduler

If we’re able to stay with the high level graph representation through these stages all the way until graph communication, then we can communicate a far more compact representation up to the scheduler. We can drop a lot of these costs, at least for the high level collection APIs (delayed and client.submit would still be slow, client.map might be ok though).
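A rough sense of "far more compact": the sketch below pickles the same elementwise operation twice, once as a materialized task dict and once as a hypothetical abstract description (the `"elementwise-add"` type name and field names are made up for illustration). Real layer encodings would differ, but the asymmetry is the point: one grows with the number of blocks, the other does not.

```python
import pickle
from operator import add

NBLOCKS = 100_000

# Low-level representation: one entry per task, as a fully materialized graph.
low_level = {("y", i): (add, ("x", i), 1) for i in range(NBLOCKS)}

# Hypothetical abstract representation: just enough for a trusted,
# scheduler-side layer type to regenerate exactly the same tasks.
abstract = {
    "type": "elementwise-add",
    "output": "y",
    "input": "x",
    "nblocks": NBLOCKS,
    "operand": 1,
}

low_bytes = len(pickle.dumps(low_level))
abstract_bytes = len(pickle.dumps(abstract))
print(f"materialized: {low_bytes:,} bytes, abstract: {abstract_bytes:,} bytes")
```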

This has a couple of other nice benefits:

  1. Users’ code won’t block, and we can alert the user immediately that we’re on the job
  2. We’ve centralized costs in just the scheduler, so there is now only one place where we might have to think about low-level code

(some conversation here: https://github.com/dask/distributed/issues/3872)

However, low-level graph optimizations are going to be a problem

In principle changing the distributed scheduler to accept a variety of graph layer types is a tedious but straightforward problem. I’m not concerned.

The bigger concern is what to do with low-level graph optimizations. Today we have three of these that really matter:

  1. Task fusion: this is what keeps your read_parquet task merged with your subsequent blockwise tasks
  2. Culling: this is what makes df.head() or x[0] fast
  3. Slice fusion: this is why x[:100][5] works well
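Of the three, culling is the easiest to show in isolation. Below is a minimal sketch of low-level culling on a toy graph, assuming string keys and `(func, *args)` task tuples (Dask's own graphs and `dask.optimization.cull` differ in the details). Note that it needs the whole materialized graph in hand, which is precisely what sending abstract layers is meant to avoid.

```python
from operator import add


def dependencies(task, graph):
    """Keys of `graph` referenced anywhere inside `task`.

    Toy convention: keys are strings, tasks are tuples (func, *args).
    """
    if isinstance(task, str):
        return {task} if task in graph else set()
    if isinstance(task, (tuple, list)):
        deps = set()
        for item in task:
            deps |= dependencies(item, graph)
        return deps
    return set()


def cull(graph, outputs):
    """Keep only the tasks that the requested outputs transitively need."""
    keep = set()
    stack = list(outputs)
    while stack:
        key = stack.pop()
        if key in keep:
            continue
        keep.add(key)
        stack.extend(dependencies(graph[key], graph))
    return {key: graph[key] for key in keep}


graph = {f"x-{i}": i for i in range(1000)}
graph.update({f"y-{i}": (add, f"x-{i}", 1) for i in range(1000)})

# Asking only for the first output block, as x[0] or df.head() would.
culled = cull(graph, ["y-0"])
print(sorted(culled))  # ['x-0', 'y-0']
```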

In order for us to transmit abstract graph layers up to the scheduler, we need to remove the need for these low level graph optimizations. I think that we can do this with a combination of two approaches:

More clever high level graph manipulation

We already do this a bit with blockwise, which has its own fusion, and which removes much of the need for fusion generally. But other blockwise-like operations, like read_*, will probably have to join the Blockwise family.

Getting culling to work properly may require us to teach each of the individual graph layers how to track dependencies in each layer type and cull themselves. This may get tricky.
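One way to picture layers culling themselves: each layer type knows its own block-level dependency pattern, so culling can walk layers from output back to input without building any tasks. The two layer classes and `cull_layers` below are hypothetical, not existing Dask classes; the neighbor pattern loosely resembles an overlap computation.

```python
from dataclasses import dataclass


@dataclass
class ElementwiseLayer:
    """Hypothetical layer: output block i needs only input block i."""
    name: str
    nblocks: int

    def input_blocks(self, output_blocks):
        return {b for b in output_blocks if 0 <= b < self.nblocks}


@dataclass
class NeighborLayer:
    """Hypothetical layer: output block i needs input blocks i-1, i, i+1."""
    name: str
    nblocks: int

    def input_blocks(self, output_blocks):
        needed = set()
        for b in output_blocks:
            needed.update(j for j in (b - 1, b, b + 1) if 0 <= j < self.nblocks)
        return needed


def cull_layers(layers, output_blocks):
    """Cull a chain of layers without materializing any tasks.

    `layers` is ordered from input to output. Returns the blocks each layer
    must still compute, plus the blocks needed from the original input.
    """
    needed = {}
    blocks = set(output_blocks)
    for layer in reversed(layers):
        needed[layer.name] = blocks
        blocks = layer.input_blocks(blocks)
    return needed, blocks


layers = [NeighborLayer("smoothed", 1_000_000), ElementwiseLayer("scaled", 1_000_000)]
needed, input_blocks = cull_layers(layers, {0})
print(needed, input_blocks)  # {'scaled': {0}, 'smoothed': {0}} {0, 1}
```

The cost here depends on how many blocks are requested, not on the million blocks each layer nominally covers; the tricky part the text anticipates is writing a correct `input_blocks` for every layer type.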

Slicing is doable; we just need someone to go in, grok all of the current slicing optimizations, and make high level graph layers for these computations. This would be a great project for a sharp master’s student.
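As a taste of the arithmetic involved, here is a 1-D toy that fuses `x[first][second]` into one index into `x`, which is the essence of why `x[:100][5]` can avoid touching more than it needs. It leans on the fact that Python `range` objects already compose slices correctly. `fuse_getitem` is a hypothetical helper; real Dask slicing is n-dimensional and chunk-aware.

```python
def fuse_getitem(length, first, second):
    """Fuse x[first][second] into a single index into x (1-D toy version).

    `first` is a slice; `second` is an int or a slice. Python's range objects
    already implement slice composition, so we let them do the arithmetic.
    """
    composed = range(length)[first][second]
    if isinstance(composed, int):
        return composed
    if len(composed) == 0:
        return slice(0, 0)
    # A negative stop here can only mean "continue through index 0",
    # which a slice spells as None.
    stop = composed.stop if composed.stop >= 0 else None
    return slice(composed.start, stop, composed.step)


print(fuse_getitem(1000, slice(None, 100), 5))  # 5
print(fuse_getitem(1000, slice(10, 100), slice(None, None, 2)))  # slice(10, 100, 2)
```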

Send speculative tasks to the workers

High level Blockwise fusion handles many of the use cases for low-level fusion, but not all. For example I/O layers like dd.read_parquet or da.from_zarr aren’t fused at a high level.

We can resolve this either by making them blockwise layers (this requires expanding the blockwise abstraction, which may be hard) or alternatively we can start sending not-yet-ready tasks to workers before all of their dependencies are finished if we’re highly confident that we know where they’re going to go. This would give us some of the same results of fusion, but would keep all of the task types separate (which would be nice for diagnostics) and might still give us some of the same performance benefits that we get from fusion.
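A toy version of "highly confident that we know where they’re going to go": send a waiting task early only when all of its unfinished dependencies are headed to one and the same worker. The function and policy below are hypothetical and far simpler than the real scheduler's placement logic, but they show how a linear chain (say, a read task followed by a couple of blockwise tasks) ends up on one worker, much as fusion would arrange.

```python
def speculative_assignments(dependencies, finished, assigned):
    """Choose not-yet-ready tasks to send to workers early (toy policy).

    dependencies: task -> set of tasks it depends on
    finished:     tasks that have already completed
    assigned:     task -> worker, for tasks already sent out

    A waiting task is sent early only when all of its unfinished
    dependencies are headed to the same single worker.
    """
    assigned = dict(assigned)  # don't mutate the caller's mapping
    speculative = {}
    changed = True
    while changed:  # repeat so whole linear chains get sent, not just one hop
        changed = False
        for task, deps in dependencies.items():
            if task in finished or task in assigned:
                continue
            pending = deps - finished
            if not pending:
                continue  # already ready: normal scheduling handles it
            workers = {assigned.get(dep) for dep in pending}
            if len(workers) == 1 and None not in workers:
                worker = workers.pop()
                assigned[task] = worker
                speculative[task] = worker
                changed = True
    return speculative


dependencies = {
    "read-0": set(),
    "add-0": {"read-0"},
    "mul-0": {"add-0"},
    "read-1": set(),
    "total": {"mul-0", "read-1"},
}
assigned = {"read-0": "worker-a", "read-1": "worker-b"}
print(speculative_assignments(dependencies, set(), assigned))
# {'add-0': 'worker-a', 'mul-0': 'worker-a'}
```

The `total` task stays unassigned because its inputs live on two different workers, so there is no confident guess to act on.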

Unpack abstract graph layers on the scheduler

So after we’ve removed the need for low level optimizations, and we just send the abstract graph layers up to the scheduler directly, we’ll need to teach the scheduler how to unpack those graph layers.

This is a little tricky because the Scheduler can’t run user Python code (for security reasons). We’ll have to register layer types (like blockwise, rechunk, dataframe shuffle) that the scheduler knows about and trusts ahead of time. We’ll still always support custom layers, and these will be at the same speed that they’ve always been, but hopefully there will be far less need for these if we go all-in on high level layers.
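A minimal sketch of the registration idea, assuming a simple in-process registry: clients send plain data describing a layer, and the scheduler only runs unpacking code it already has. The registry, decorator, and `"elementwise-add"` layer type are all hypothetical; real layer types (blockwise, rechunk, shuffle) would carry much richer specs.

```python
from operator import add

# Hypothetical registry of layer types that the scheduler knows about and
# trusts ahead of time. Clients send plain data describing a layer; the
# scheduler only ever runs unpacking code it already has.
TRUSTED_LAYERS = {}


def register_layer(type_name):
    """Decorator that records an unpack function under a layer type name."""
    def decorator(unpack):
        TRUSTED_LAYERS[type_name] = unpack
        return unpack
    return decorator


@register_layer("elementwise-add")
def unpack_elementwise_add(spec):
    return {
        f"{spec['output']}-{i}": (add, f"{spec['input']}-{i}", spec["operand"])
        for i in range(spec["nblocks"])
    }


def unpack_layer(spec):
    """Turn one layer description from a client into low-level tasks."""
    if spec["type"] == "custom":
        # Custom layers arrive fully materialized, at the same speed as today.
        return dict(spec["tasks"])
    unpack = TRUSTED_LAYERS.get(spec["type"])
    if unpack is None:
        raise ValueError(f"unknown layer type: {spec['type']!r}")
    return unpack(spec)


tasks = unpack_layer(
    {"type": "elementwise-add", "output": "y", "input": "x", "nblocks": 3, "operand": 1}
)
print(sorted(tasks))  # ['y-0', 'y-1', 'y-2']
```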

Rewrite scheduler in low-level language

Once most of the finicky bits are moved to the scheduler, we’ll have one place where we can focus on low level graph state manipulation.

Dask’s distributed scheduler is two things:

  1. A Tornado TCP application that receives signals from clients and workers and sends signals out to clients and workers. This is async-heavy networking code.
  2. A complex state machine internally that responds to those state changes. This is complex, data-structure-heavy Python code.

Networking

Jim has an interesting project here that shows promise: https://github.com/jcrist/ery. Reducing latency between workers and the scheduler would be good, and would help to accelerate stages 7-8 in the pipeline listed at the top of this post.

State machine

Rewriting the state machine in some lower level language would be fine. Ideally this would be in a language that was easy for the current maintainer community to maintain (Cython?), but we may also consider making a more firm interface here that would allow other groups to experiment safely.

There are some advantages to this (more experimentation by different groups) but also some costs (splitting of core efforts and mismatches for users). Also, I suspect that splitting out probably means that we’ll lose the dashboard, unless those other groups are very careful to expose the same state to Bokeh.

There is more exploration to do here. Regardless I think that it probably makes sense to try to isolate the state machine from the networking system. Maybe this also makes it easier for people to profile in isolation.
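What "isolate the state machine from the networking system" could look like, as a hypothetical sketch: each method handles one event and returns `(recipient, message)` pairs, and some separate layer (Tornado, asyncio, or anything else) would be responsible for delivery. The class, message format, and round-robin placement are all made up for illustration and bear no relation to the real `Scheduler` internals.

```python
from collections import defaultdict


class SchedulerState:
    """Hypothetical scheduler state machine with no networking at all.

    Each method handles one incoming event and returns a list of
    (recipient, message) pairs for some other layer to deliver.
    """

    def __init__(self, workers):
        self.workers = list(workers)
        self.waiting_on = {}                # task -> unfinished dependencies
        self.dependents = defaultdict(set)  # task -> tasks that wait on it
        self.n_assigned = 0

    def _assign(self, task):
        # Deliberately naive round-robin placement, just for the sketch.
        worker = self.workers[self.n_assigned % len(self.workers)]
        self.n_assigned += 1
        return (worker, {"op": "compute-task", "key": task})

    def update_graph(self, dependencies):
        """Event: a client submitted tasks (task -> set of dependencies)."""
        for task, deps in dependencies.items():
            self.waiting_on[task] = set(deps)
            for dep in deps:
                self.dependents[dep].add(task)
        return [self._assign(t) for t, deps in dependencies.items() if not deps]

    def task_finished(self, key):
        """Event: a worker reports that `key` is done."""
        messages = []
        for dependent in sorted(self.dependents.pop(key, ())):
            pending = self.waiting_on[dependent]
            pending.discard(key)
            if not pending:
                messages.append(self._assign(dependent))
        return messages


# Driving the state machine needs no sockets or event loop, which also
# makes it easy to test and profile in isolation.
state = SchedulerState(["w1", "w2"])
print(state.update_graph({"a": set(), "b": set(), "c": {"a", "b"}}))
print(state.task_finished("a"))  # []
print(state.task_finished("b"))
```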

In speaking with a few different groups most people have expressed reservation about having multiple different state machine codes. This was done in MapReduce and Spark and resulted in difficult to maintain community dynamics.

High Level Graph Optimizations

Once we have everything in smarter high level graph layers, we will also be more ripe for optimization.

We’ll need a better way to write down these optimizations with a separated traversal system and a set of rules. A few of us have written these things before; maybe it’s time we revisit them.
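To make "a separated traversal system and a set of rules" concrete, here is a small sketch over a hypothetical expression-tree format: `("source", name)` and `("map", func, child)`, where a map applies its function to each block. The traversal knows nothing about any particular optimization; each rule is a small function that either returns a rewritten node or `None`. The single rule shown fuses two consecutive maps, in the spirit of blockwise fusion.

```python
def compose(f, g):
    """Return a function computing f(g(block))."""
    def composed(block):
        return f(g(block))
    return composed


# Rules: each looks at one node and returns a replacement or None.

def fuse_maps(node):
    """map(f, map(g, x)) -> map(f . g, x): one task per block instead of two."""
    if node[0] == "map" and node[2][0] == "map":
        _, f, (_, g, child) = node
        return ("map", compose(f, g), child)
    return None


# Traversal: knows nothing about any particular optimization.

def rewrite(node, rules):
    """Rewrite children first, then apply rules at this node until none fire."""
    if node[0] == "map":
        node = ("map", node[1], rewrite(node[2], rules))
    changed = True
    while changed:
        changed = False
        for rule in rules:
            new = rule(node)
            if new is not None:
                node, changed = new, True
    return node


def evaluate(node, sources):
    """Toy executor: a source is a list of blocks; map applies per block."""
    if node[0] == "source":
        return list(sources[node[1]])
    return [node[1](block) for block in evaluate(node[2], sources)]


expr = ("map", lambda b: b * 2, ("map", lambda b: b + 1, ("source", "x")))
optimized = rewrite(expr, [fuse_maps])
print(optimized[2])                          # ('source', 'x')
print(evaluate(optimized, {"x": [1, 2, 3]}))  # [4, 6, 8]
```

Adding a new optimization then means adding a rule function to the list, without touching the traversal.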

What we need

This would require some effort, but I think that it would hit several high-profile problems at once. There are a few tricky things to get right:

  1. A framework for high level graph layers
  2. An optimization system for high level graph layers
  3. Separation of the scheduler into two parts

For this I think that we’ll need people who are fairly familiar with Dask to do this right.

And then there is a fair amount of follow-on work:

  1. Build a hierarchy of layers for dask dataframe
  2. Build a hierarchy of layers for dask array
  3. Build optimizations for those to remove the need for low level graph optimizations
  4. Rewrite core parts of the scheduler in Cython
  5. Experiment with the networking layer, maybe with a new Comm

I’ve been thinking about the right way to enact this change. Historically most Dask changes over the past few years have been incremental or peripheral, due to how burdened the maintainers are. There might be enough pressure on this problem, though, that we can get some dedicated engineering effort from a few organizations, which might change how possible this is. We’ve gotten 25% time from a few groups. I’m curious if we can get 100% time for some people for a few months.