Aug 16, 2016

Dask for Institutions

This work is supported by Continuum Analytics

Introduction

Institutions use software differently than individuals. Over the last few months I’ve had dozens of conversations about using Dask within larger organizations like universities, research labs, private companies, and non-profit learning systems. This post provides a very coarse summary of those conversations and extracts common questions. I’ll then try to answer those questions.

Note: some of this post will be necessarily vague at points. Some companies prefer privacy. All details here are either in public Dask issues or have come up with enough institutions (say at least five) that I’m comfortable listing the problem here.

Common story

Institution X, a university/research lab/company/… has many scientists/analysts/modelers who develop models and analyze data with Python, the PyData stack like NumPy/Pandas/SKLearn, and a large amount of custom code. These models/data sometimes grow to be large enough to need a moderately large amount of parallel computing.

Fortunately, Institution X has an in-house cluster acquired for exactly this purpose of accelerating modeling and analysis of large computations and datasets. Users can submit jobs to the cluster using a job scheduler like SGE/LSF/Mesos/Other.

However the cluster is still under-utilized and the users are still asking for help with parallel computing. Either users aren’t comfortable using the SGE/LSF/Mesos/Other interface, it doesn’t support sufficiently complex/dynamic workloads, or the interaction times aren’t good enough for the interactive use that users appreciate.

There was an internal effort to build a more complex/interactive/Pythonic system on top of SGE/LSF/Mesos/Other but it’s not particularly mature and definitely isn’t something that Institution X wants to pursue. It turned out to be a harder problem than expected to design/build/maintain such a system in-house. They’d love to find an open source solution that was well featured and maintained by a community.

The Dask.distributed scheduler looks like it’s 90% of the system that Institution X needs. However there are a few open questions:

  • How do we integrate dask.distributed with the SGE/LSF/Mesos/Other job scheduler?
  • How can we grow and shrink the cluster dynamically based on use?
  • How do users manage software environments on the workers?
  • How secure is the distributed scheduler?
  • Dask is resilient to worker failure, how about scheduler failure?
  • What happens if dask-workers are in two different data centers? Can we scale in an asymmetric way?
  • How do we handle multiple concurrent users and priorities?
  • How does this compare with Spark?

So for the rest of this post I’m going to answer these questions. As usual, few of the answers will be of the form “Yes, Dask can solve all of your problems.” These are open questions, not the questions that were easy to answer. We’ll get into what’s possible today and how we might solve these problems in the future.

How do we integrate dask.distributed with SGE/LSF/Mesos/Other?

It’s not difficult to deploy dask.distributed at scale within an existing cluster using a tool like SGE/LSF/Mesos/Other. In many cases there is already a researcher within the institution doing this manually by running dask-scheduler on some static node in the cluster and launching dask-worker a few hundred times with their job scheduler and a small job script.

The goal now is to formalize this process for the individual version of SGE/LSF/Mesos/Other used within the institution while also developing and maintaining a standard Pythonic interface so that all of these tools can be maintained cheaply by Dask developers into the foreseeable future. In some cases Institution X is happy to pay for the development of a convenient “start dask on my job scheduler” tool, but they are less excited about paying to maintain it forever.

We want Python users to be able to say something like the following:

from dask.distributed import Executor, SGECluster

c = SGECluster(nworkers=200, **options)  # ask the job scheduler for 200 dask-workers
e = Executor(c)                          # connect to the cluster and start submitting work

… and have this same interface be standardized across different job schedulers.

How can we grow and shrink the cluster dynamically based on use?

Alternatively, we could have a single dask.distributed deployment running 24/7 that scales itself up and down dynamically based on current load. Again, this is entirely possible today if you want to do it manually (you can add and remove workers on the fly) but we should add some signals to the scheduler like the following:

  • “I’m under duress, please add workers”
  • “I’ve been idling for a while, please reclaim workers”

and connect these signals to a manager that talks to the job scheduler. This removes an element of control from the users and places it in the hands of a policy that IT can tune to play more nicely with their other services on the same network.
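As a purely illustrative sketch (none of this exists in Dask today), such a manager might be a small loop that watches the scheduler and asks the job scheduler for more workers when the count falls below some target. The scheduler address, the target worker count, and the start-dask-worker.sh job script below are all hypothetical placeholders:

import subprocess
import time

from dask.distributed import Executor

TARGET_WORKERS = 20                        # illustrative policy knob, tuned by IT

e = Executor('scheduler-address:8786')     # hypothetical scheduler address

while True:
    n_workers = len(e.ncores())            # one entry per connected worker
    if n_workers < TARGET_WORKERS:
        # hypothetical hook: ask SGE for one more dask-worker via a job script
        subprocess.call(['qsub', 'start-dask-worker.sh'])
    time.sleep(60)

A real policy would also reclaim idle workers and respond to the “under duress” and “idling” signals above rather than to a fixed target count.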

How do users manage software environments on the workers?

Today Dask assumes that all users and workers share the exact same software environment. There are some small tools to send updated .py and .egg files to the workers but that’s it.
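As a small example of those tools, upload_file ships a single local file to every connected worker; the scheduler address and module name below are illustrative:

from dask.distributed import Executor

e = Executor('scheduler-address:8786')   # hypothetical scheduler address
e.upload_file('my_analysis.py')          # workers can now import my_analysis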

Generally Dask trusts that the full software environment will be handled by something else. This might be a network file system (NFS) mount on traditional cluster setups, or it might be handled by moving Docker or conda environments around by some other tool like knit for YARN deployments or something more custom. For example Continuum sells proprietary software that does this.

Getting the standard software environment set up generally isn’t such a big deal for institutions. They typically have some system in place to handle this already. Where things become interesting is when users want to use drastically different environments from the system environment, like using Python 2 vs Python 3 or installing a bleeding-edge scikit-learn version. They may also want to change the software environment many times in a single session.

The best solution I can think of here is to pass around fully downloaded conda environments using the dask.distributed network (it’s good at moving large binary blobs throughout the network) and then teach the dask-workers to bootstrap themselves within this environment. We should be able to tear everything down and restart things within a small number of seconds. This requires some work; first to make relocatable conda binaries (which is usually fine but is not always fool-proof due to links) and then to help the dask-workers learn to bootstrap themselves.

Somewhat related, Hussain Sultan of Capital One recently contributed a dask-submit command to run scripts on the cluster: http://distributed.readthedocs.io/en/latest/submitting-applications.html

How secure is the distributed scheduler?

Dask.distributed is incredibly insecure. It allows anyone with network access to the scheduler to execute arbitrary code in an unprotected environment. Data is sent in the clear. Any malicious actor can both steal your secrets and then cripple your cluster.

This is entirely the norm, however. Security is usually handled by other services that manage computational frameworks like Dask.

For example we might rely on Docker to isolate workers from destroying their surrounding environment and rely on network access controls to protect data access.

Because Dask runs on Tornado, a serious networking library and web framework, there are some things we can do easily like enabling SSL, authentication, etc. However I hesitate to jump into providing “just a little bit of security” without going all the way for fear of providing a false sense of security. In short, I have no plans to work on this without a lot of encouragement. Even then I would strongly recommend that institutions couple Dask with tools intended for security. I believe that is common practice for distributed computational systems generally.

Dask is resilient to worker failure, how about scheduler failure?

Workers can come and go. Clients can come and go. The state in the scheduler is currently irreplaceable and no attempt is made to back it up. There are a few things you could imagine here:

  1. Back up state and recent events to some persistent storage so that state can be recovered in case of catastrophic loss
  2. Have a hot failover node that gets a copy of every action that the scheduler takes
  3. Have multiple peer schedulers operate simultaneously in a way that they can pick up slack from lost peers
  4. Have clients remember what they have submitted and resubmit when a scheduler comes back online

Currently option 4 is the most feasible and gets us most of the way there. However options 2 or 3 would probably be necessary if Dask were to ever run as critical infrastructure in a giant institution. We’re not there yet.
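As a purely illustrative sketch of option 4, a thin wrapper could remember what was submitted so that it can be replayed against a fresh scheduler; none of this bookkeeping exists in dask.distributed today and the scheduler address is a placeholder:

from dask.distributed import Executor

e = Executor('scheduler-address:8786')   # hypothetical scheduler address
submitted = []                           # record of (function, args) pairs sent so far

def submit(func, *args):
    submitted.append((func, args))
    return e.submit(func, *args)

def resubmit_all():
    # replay everything we remember once a replacement scheduler is up
    return [e.submit(func, *args) for func, args in submitted]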

As of recent work spurred on by Stefan van der Walt at UC Berkeley/BIDS, the scheduler can now die and come back and everyone will reconnect. The state for computations in flight is entirely lost but the computational infrastructure remains intact so that people can resubmit jobs without significant loss of service.

Dask has a bit of a harder time with this topic because it offers a persistent stateful interface. This problem is much easier for distributed database projects that run ephemeral queries off of persistent storage, return the results, and then clear out state.

What happens if dask-workers are in two different data centers? Can we scale in an asymmetric way?

The short answer is no. Other than number of cores and available RAM, all workers are considered equal to each other (except when the user explicitly specifies otherwise).
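As a rough illustration of that explicit opt-out, tasks can be restricted to particular workers when they are submitted; the scheduler address and worker hostname below are illustrative:

from dask.distributed import Executor

e = Executor('scheduler-address:8786')                 # hypothetical scheduler address

def process(x):
    return x + 1

future = e.submit(process, 1, workers=['gpu-node-1'])  # run only on the named worker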

However this problem and problems like it have come up a lot lately. Here are a few examples of similar cases:

  1. Multiple data centers geographically distributed around the country
  2. Multiple racks within a single data center
  3. Multiple workers that have GPUs that can move data between each other easily
  4. Multiple processes on a single machine

Having some notion of hierarchical worker group membership or inter-worker preferred relationships is probably inevitable long term. As with all distributed scheduling questions the hard part isn’t deciding that this is useful, or even coming up with a sensible design, but rather figuring out how to make decisions on the sensible design that are foolproof and operate in constant time. I don’t personally see a good approach here yet but expect one to arise as more high priority use cases come in.

How do we handle multiple concurrent users and priorities?

There are several sub-questions here:

  • Can multiple users use Dask on my cluster at the same time?

Yes, either by spinning up separate scheduler/worker sets or by sharing the same set.
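Sharing the same set just means pointing more than one Executor at the same scheduler, for example (the address is illustrative):

from dask.distributed import Executor

alice = Executor('scheduler-address:8786')   # two independent users or sessions
bob = Executor('scheduler-address:8786')     # connected to the same scheduler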

  • If they’re sharing the same workers then won’t they clobber each other’s data?

This is very unlikely. Dask is careful about naming tasks, so it’s very unlikely that the two users will submit conflicting computations that compute to different values but occupy the same key in memory. However if they both submit computations that overlap somewhat then the scheduler will nicely avoid recomputation. This can be very nice when you have many people doing slightly different computations on the same hardware. This works in the same way that Git works.
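As a rough illustration of that naming, pure delayed calls with identical functions and arguments hash to identical keys, so the same piece of work is stored and computed only once:

from dask import delayed

def inc(x):
    return x + 1

a = delayed(inc, pure=True)(1)
b = delayed(inc, pure=True)(1)
assert a.key == b.key   # identical work gets an identical key, so it is computed once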

  • If they’re sharing the same workers then won’t they clobber each other’s resources?

Yes, this is definitely possible. If you’re concerned about this then you should give everyone their own scheduler/workers (which is easy and standard practice). There is not currently much user management built into Dask.

How does this compare with Spark?

At an institutional level Spark seems to primarily target ETL + Database-like computations. While Dask modules like Dask.bag and Dask.dataframe can happily play in this space, this doesn’t seem to be the focus of recent conversations.

Recent conversations are almost entirely around supporting interactive custom parallelism (lots of small tasks with complex dependencies between them) rather than the big Map->Filter->Groupby->Join abstractions you often find in a database or Spark. That’s not to say that these operations aren’t hugely important; there is a lot of selection bias here. The people I talk to are people for whom Spark/Databases are clearly not an appropriate fit. They are tackling problems that are way more complex, more heterogeneous, and with a broader variety of users.

I usually describe this situation with an analogy comparing “Big data” systems to human transportation mechanisms in a city. Here we go:

  • A Database is like a train: it goes between a set of well defined points with great efficiency, speed, and predictability. These are popular and profitable routes that many people travel between (e.g. business analytics). You do have to get from home to the train station on your own (ETL), but once you’re in the database/train you’re quite comfortable.
  • Spark is like an automobile: it takes you door-to-door from your home to your destination with a single tool. While this may not be as fast as the train for the long-distance portion, it can be extremely convenient to do ETL, Database work, and some machine learning all from the comfort of a single system.
  • Dask is like an all-terrain vehicle: it takes you out of town on rough ground that hasn’t been properly explored before. This is a good match for the Python community, which typically does a lot of exploration into new approaches. You can also drive your ATV around town and you’ll be just fine, but if you want to do thousands of SQL queries then you should probably invest in a proper database or in Spark.

Again, there is a lot of selection bias here: if what you want is a database then you should probably get a database. Dask is not a database.

This is also wildly over-simplifying things. Databases like Oracle have lots of ETL and analytics tools, Spark is known to go off road, etc. I obviously have a bias towards Dask. You really should never trust an author of a project to give a fair and unbiased view of the capabilities of the tools in the surrounding landscape.

Conclusion

That’s a rough sketch of current conversations and open problems for “How Dask might evolve to support institutional use cases.” It’s really quite surprising just how prevalent this story is across the full spectrum from universities to hedge funds.

The problems listed above are by no means halting adoption. I’m not listing the 100 or so questions that are answered with “yes, that’s already supported quite well”. Right now I’m seeing Dask being adopted by individuals and small groups within various institutions. Those individuals and small groups are pushing that interest up the stack. It’s still several months before any 1000+ person organization adopts Dask as infrastructure, but the speed at which momentum is building is quite encouraging.

I’d also like to thank the several nameless people who exercise Dask on various infrastructures at various scales on interesting problems and have reported serious bugs. These people don’t show up on the GitHub issue tracker but their help in flushing out bugs is invaluable.

As interest in Dask grows it’s interesting to see how it will evolve. Culturally, Dask has managed to cater simultaneously to both the open science crowd and the private-sector crowd. The project gets both financial support and open source contributions from each side. So far there hasn’t been any conflict of interest (everyone is pushing in roughly the same direction), which has been a really fruitful experience for all involved, I think.