This work is supported by Anaconda Inc
To increase transparency I’m trying to blog more often about the current work going on around Dask and related projects. Nothing here is ready for production. This blogpost is written in haste, so refined polish should not be expected.
Over the last two weeks we’ve seen activity in the following areas:
Some advanced workloads want to directly manage and mutate state on workers. A task-based framework like Dask can be forced into this kind of workload using long-running tasks, but it’s an uncomfortable experience. To address this we’ve been adding an experimental Actors framework to Dask alongside the standard task-scheduling system. This provides reduced latencies, removes scheduling overhead, and provides the ability to directly mutate state on a worker, but loses niceties like resilience and diagnostics.
The idea to adopt Actors was shamelessly stolen from the Ray Project :)
Work for Actors is happening in dask/distributed #2133.
class Counter:
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

counter = client.submit(Counter, actor=True).result()

>>> future = counter.increment()
Many Scikit-Learn-style estimators feature a partial_fit method that enables incremental training on batches of data. This is particularly well suited for systems like Dask array or Dask dataframe, which are built from many batches of Numpy arrays or Pandas dataframes. It’s a nice fit because all of the computational algorithm work is already done in Scikit-Learn; Dask just has to administratively move models around to data and call Scikit-Learn (or other machine learning models that follow the fit/transform/predict/score API). This approach provides a nice community interface between parallelism and machine learning developers.
However, this training is inherently sequential because the model only trains on one batch of data at a time. We’re leaving a lot of processing power on the table.
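A minimal sketch of that sequential partial_fit loop, using a toy stand-in estimator (a hypothetical MeanModel, not a real Scikit-Learn class) to show the interface:

```python
# Toy illustration of the partial_fit pattern.  MeanModel is a
# hypothetical stand-in estimator, not a Scikit-Learn class; the point
# is the interface: one partial_fit call per batch, in order.

class MeanModel:
    """Incrementally learns the mean of everything it has seen."""
    def __init__(self):
        self.total = 0.0
        self.count = 0

    def partial_fit(self, batch):
        self.total += sum(batch)
        self.count += len(batch)
        return self

    def predict(self):
        return self.total / self.count

# Stand-ins for the blocks of a Dask array or partitions of a Dask dataframe
batches = [[1, 2, 3], [4, 5], [6]]

model = MeanModel()
for batch in batches:           # inherently sequential: one batch at a time
    model = model.partial_fit(batch)

print(model.predict())          # mean of all six values
```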
To address this we can combine incremental training with hyper-parameter selection and train several models on the same data at the same time. This is often required anyway, and lets us be more efficient with our computation.
However there are many ways to do incremental training with hyper-parameter selection, and the right algorithm likely depends on the problem at hand. This is an active field of research, so it’s hard for a general project like Dask to pick and implement a single method that works well for everyone. There are probably a handful of methods that will be necessary, with various options on them.
To help experimentation here we’ve been building some lower-level tooling that we think will be helpful in a variety of cases. It accepts a policy from the user as a Python function that receives scores from recent evaluations and returns how much further to progress on each set of hyper-parameters before checking in again. This lets us easily model a few common situations, like random search with early-stopping conditions, successive halving, and variations on those, without having to write any Dask code:
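As a sketch of the idea in plain Python (no Dask; the names successive_halving, run, and the simulated scores are all illustrative, not the actual API under development):

```python
# Illustrative sketch of a policy-driven incremental search, with a
# successive-halving policy.  All names here are hypothetical; the real
# tooling runs the training on a Dask cluster instead of simulating it.

def successive_halving(history):
    """Policy: given {model_id: [scores...]}, return how many more
    partial_fit steps each model should get (0 means drop it)."""
    if len(history) == 1:
        return {k: 0 for k in history}              # one survivor: stop
    ranked = sorted(history, key=lambda k: history[k][-1], reverse=True)
    keep = ranked[: max(1, len(ranked) // 2)]       # keep the best half
    return {k: (2 if k in keep else 0) for k in history}

def score(model, t):
    # Deterministic stand-in for a validation score that improves with time
    return 1.0 - 1.0 / (1.0 + model["lr"] * t)

def run(policy, models):
    """Driver loop: train, score, then ask the policy how to proceed."""
    steps = {k: 0 for k in models}
    history = {k: [] for k in models}
    active = {k: 1 for k in models}                 # initial work per model
    while active:
        for k, n in active.items():
            steps[k] += n                           # simulate n partial_fit calls
            history[k].append(score(models[k], steps[k]))
        decision = policy({k: history[k] for k in active})
        active = {k: n for k, n in decision.items() if n > 0}
    return history

models = {"a": {"lr": 0.1}, "b": {"lr": 0.5}, "c": {"lr": 1.0}, "d": {"lr": 2.0}}
history = run(successive_halving, models)
```

The policy function sees only scores and returns work budgets, so random search with early stopping or other halving variants are just different policy functions over the same driver.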
This work is done by Scott Sievert and myself
To improve the speed of training large models Scott Sievert has been using Actors (mentioned above) to develop simple examples for parameter servers. These are helping to identify and motivate performance and diagnostic improvements within Dask itself:
These parameter servers manage the communication of models produced by different workers, and leave the computation to the underlying deep learning library. This is ongoing work.
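In plain Python the pattern looks roughly like this (a hypothetical sketch, not Scott’s actual code; in the Dask version a class like this would be submitted with actor=True, as in the Counter example above, so every worker talks to one stateful copy):

```python
# Hypothetical sketch of the parameter-server pattern.  The server holds
# the shared model parameters; workers pull them, compute gradients on
# their own batches of data, and push the gradients back.

class ParameterServer:
    def __init__(self, params):
        self.params = dict(params)

    def pull(self):
        # Workers fetch the latest parameters before computing gradients
        return dict(self.params)

    def push(self, grads, lr=0.1):
        # Workers send gradients; the server mutates its shared state
        for k, g in grads.items():
            self.params[k] -= lr * g

ps = ParameterServer({"w": 1.0, "b": 0.0})
params = ps.pull()                  # a worker grabs current parameters
ps.push({"w": 2.0, "b": -1.0})      # ...and pushes back its gradients
```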
We’ve started to orient some of the Dask-ML work around case studies. Our first, written by Scott Sievert, uses the Criteo dataset for ads. It’s a good example of a combined dense/sparse dataset that can be somewhat large (around 1TB). The first challenge we’re running into is preprocessing. This has led to a few preprocessing improvements:
Some of these are also based on improved dataframe-handling features in the upcoming 0.20 release of Scikit-Learn.
Profiling concurrent code is hard. Traditional profilers like cProfile become confused by the passing of control between all of the different coroutines. This means that we haven’t done a very comprehensive job of profiling and tuning the distributed scheduler and workers. Statistical profilers, on the other hand, tend to do a bit better. We’ve taken the statistical profiler that we usually use on Dask worker threads (available in the dashboard on the “Profile” tab) and have applied it to the central administrative threads running the Tornado event loop as well. This has highlighted a few issues that we weren’t able to spot before, and should hopefully result in reduced overhead in future releases.
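The core trick is small enough to sketch in plain Python (an illustration of the general technique, not distributed’s actual implementation): a separate thread periodically inspects another thread’s current stack frame and tallies where it finds it.

```python
import collections
import sys
import threading
import time

def sample(thread, interval=0.001, duration=0.2):
    """Statistically profile `thread`: periodically look up its current
    Python stack frame and count which function it is executing."""
    counts = collections.Counter()
    deadline = time.time() + duration
    while time.time() < deadline:
        frame = sys._current_frames().get(thread.ident)
        if frame is not None:
            counts[frame.f_code.co_name] += 1
        time.sleep(interval)
    return counts

def busy():
    # A stand-in for an event loop doing work
    deadline = time.time() + 0.3
    while time.time() < deadline:
        sum(range(1000))

worker = threading.Thread(target=busy)
worker.start()
counts = sample(worker)     # counts will be dominated by "busy"
worker.join()
```

Because it only samples, this adds very little overhead to the profiled thread, which is what makes it safe to point at the scheduler’s event loop.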
There is a new release of Dask-Yarn and of the underlying library for managing Yarn jobs, Skein. These include a number of bug fixes and improved concurrency primitives for YARN applications. The new features are documented here, and were implemented in jcrist/skein #40.
This work was done by Jim Crist
The dask.distributed test suite has been suffering from intermittent failures recently. These are tests that fail very infrequently, and so are hard to catch when writing them, but show up when future unrelated PRs run the test suite on continuous integration and get failures. They add friction to the development process, but are expensive to track down (testing distributed systems is hard).
We’re taking a bit of time this week to track these down. Progress here: