Aug 2, 2018

Dask Development Log

This work is supported by Anaconda Inc

To increase transparency I’m trying to blog more often about the current work going on around Dask and related projects. Nothing here is ready for production. This blogpost is written in haste, so refined polish should not be expected.

Over the last two weeks we’ve seen activity in the following areas:

  1. An experimental Actor solution for stateful processing
  2. Machine learning experiments with hyper-parameter selection and parameter servers.
  3. Development of more preprocessing transformers
  4. Statistical profiling of the distributed scheduler’s internal event loop thread and internal optimizations
  5. A new release of dask-yarn
  6. A new narrative on dask-stories about modelling mobile networks
  7. Support for LSF clusters in dask-jobqueue
  8. Test suite cleanup for intermittent failures

Stateful processing with Actors

Some advanced workloads want to directly manage and mutate state on workers. A task-based framework like Dask can be forced into this kind of workload using long-running tasks, but it’s an uncomfortable experience. To address this we’ve been adding an experimental Actors framework to Dask alongside the standard task-scheduling system. This provides reduced latencies, removes scheduling overhead, and provides the ability to directly mutate state on a worker, but loses niceties like resilience and diagnostics.

The idea to adopt Actors was shamelessly stolen from the Ray Project :)

Work for Actors is happening in dask/distributed #2133.

class Counter:
    """A small stateful object that lives on a single worker."""

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

# `client` is an existing dask.distributed Client; actor=True places the
# object on one worker and returns a proxy to it.
counter = client.submit(Counter, actor=True).result()

>>> future = counter.increment()   # method calls return ActorFutures
>>> future.result()
1

Machine learning experiments

Hyper parameter optimization on incrementally trained models

Many Scikit-Learn-style estimators feature a partial_fit method that enables incremental training on batches of data. This is particularly well suited for systems like Dask array or Dask dataframe, which are built from many batches of NumPy arrays or Pandas dataframes. It’s a nice fit because all of the computational algorithm work is already done in Scikit-Learn; Dask just has to administratively move models around to data and call scikit-learn (or other machine learning models that follow the fit/transform/predict/score API). This approach provides a nice community interface between parallelism and machine learning developers.
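As a rough illustration of the idea (not the Dask-ML API itself), the sketch below trains a scikit-learn estimator incrementally by walking over the blocks of a Dask array and calling partial_fit on each one; the data and estimator are placeholders.

import numpy as np
import dask.array as da
from sklearn.linear_model import SGDClassifier

# Placeholder data: ten blocks of 1,000 rows each.
X = da.random.random((10_000, 20), chunks=(1_000, 20))
y = (da.random.random(10_000, chunks=1_000) > 0.5).astype(int)

est = SGDClassifier(tol=1e-3)
classes = np.array([0, 1])

# Feed each block to the estimator's partial_fit method, one at a time.
for X_block, y_block in zip(X.to_delayed().flatten(), y.to_delayed().flatten()):
    est.partial_fit(X_block.compute(), y_block.compute(), classes=classes)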

However, this training is inherently sequential because the model only trains on one batch of data at a time. We’re leaving a lot of processing power on the table.

To address this we can combine incremental training with hyper-parameter selection and train several models on the same data at the same time. This is often required anyway, and lets us be more efficient with our computation.

However, there are many ways to do incremental training with hyper-parameter selection, and the right algorithm likely depends on the problem at hand. This is an active field of research, so it’s hard for a general project like Dask to pick and implement a single method that works well for everyone. There are probably a handful of methods that will be necessary, with various options on them.

To help here we’ve been experimenting with some lower-level tooling that we think will be helpful in a variety of cases. This accepts a policy from the user as a Python function that gets scores from recent evaluations, and asks for how much further to progress on each set of hyper-parameters before checking in again. This allows us to model a few common situations like random search with early stopping conditions, successive halving, and variations of those easily without having to write any Dask code:
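The interface is still in flux, but a policy in this spirit might look something like the hypothetical sketch below, which encodes a crude successive-halving rule; the function name and the shape of its input are purely illustrative, not the API under development.

# Hypothetical sketch: `scores` maps each hyper-parameter set's id to the list
# of scores it has received so far.  The policy returns how many more
# partial_fit calls each set should get (zero means stop training that set).
def successive_halving_policy(scores):
    if not scores:
        return {}
    latest = {ident: s[-1] for ident, s in scores.items()}
    cutoff = sorted(latest.values())[len(latest) // 2]   # median of latest scores
    # Keep the better-scoring half and double their budget; drop the rest.
    return {ident: (2 * len(s) if latest[ident] >= cutoff else 0)
            for ident, s in scores.items()}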

This work is done by Scott Sievert and myself.

Successive halving and random search

Parameter Servers

To improve the speed of training large models Scott Sievert has been using Actors (mentioned above) to develop simple examples for parameter servers. These are helping to identify and motivate performance and diagnostic improvements within Dask itself:

These parameter servers manage the communication of models produced by different workers, and leave the computation to the underlying deep learning library. This is ongoing work.
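As a very rough sketch of the pattern (assuming an existing dask.distributed client, and leaving out the deep learning side entirely), an Actor-based parameter server might look like the following; the class and method names are illustrative rather than taken from Scott's examples.

import numpy as np

class ParameterServer:
    def __init__(self, n_params):
        self.weights = np.zeros(n_params)

    def push(self, gradient, lr=0.01):
        # Workers push gradients; the actor mutates its state in place.
        self.weights -= lr * gradient

    def pull(self):
        # Workers pull the current weights before computing the next gradient.
        return self.weights

ps = client.submit(ParameterServer, 1_000_000, actor=True).result()
ps.push(np.random.random(1_000_000)).result()   # blocks until the update lands
weights = ps.pull().result()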

Dataframe Preprocessing Transformers

We’ve started to orient some of the Dask-ML work around case studies. Our first, written by Scott Sievert, uses the Criteo dataset for ads. It’s a good example of a combined dense/sparse dataset that can be somewhat large (around 1TB). The first challenge we’re running into is preprocessing. This has led to a few preprocessing improvements:

Some of these are also based on improved dataframe handling features in the upcoming 0.20 release of Scikit-Learn.
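For a sense of what this looks like in practice, here is a small sketch using two transformers that already exist in dask_ml.preprocessing (Categorizer and DummyEncoder); the toy dataframe is a stand-in, and these are not necessarily the transformers added in this round of work.

import pandas as pd
import dask.dataframe as dd
from dask_ml.preprocessing import Categorizer, DummyEncoder
from sklearn.pipeline import make_pipeline

# A toy stand-in for a mixed categorical/numeric ads dataframe.
df = dd.from_pandas(
    pd.DataFrame({"ad_type": ["banner", "video", "banner"], "clicks": [1, 0, 1]}),
    npartitions=1,
)

# Convert object columns to categoricals, then one-hot encode them.
pipe = make_pipeline(Categorizer(), DummyEncoder())
transformed = pipe.fit_transform(df)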

This work is done by Roman Yurchak, James Bourbeau, Daniel Severo, and Tom Augspurger.

Profiling the main thread

Profiling concurrent code is hard. Traditional profilers like cProfile become confused by passing control between all of the different coroutines. This means that we haven’t done a very comprehensive job of profiling and tuning the distributed scheduler and workers. Statistical profilers, on the other hand, tend to do a bit better. We’ve taken the statistical profiler that we usually use on Dask worker threads (available in the dashboard on the “Profile” tab) and have applied it to the central administrative threads running the Tornado event loop as well. This has highlighted a few issues that we weren’t able to spot before, and should hopefully result in reduced overhead in future releases.

Profile of event loop thread
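The underlying mechanism is simple: sample the stack of the target thread at a fixed interval and count what it was doing. The sketch below shows that bare idea using sys._current_frames(); the profiler in distributed is more elaborate, so treat this only as an illustration.

import sys
import time
from collections import Counter

def sample_thread(thread_id, interval=0.01, duration=1.0):
    """Periodically sample one thread's stack and count the active functions."""
    counts = Counter()
    deadline = time.time() + duration
    while time.time() < deadline:
        frame = sys._current_frames().get(thread_id)
        if frame is not None:
            # Record which function the thread was executing at this sample.
            counts[(frame.f_code.co_filename, frame.f_code.co_name)] += 1
        time.sleep(interval)
    return counts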

New release of Dask-Yarn

There is a new release of Dask-Yarn and the underlying library for managing Yarn jobs, Skein. These include a number of bug fixes and improved concurrency primitives for YARN applications. The new features are documented here, and were implemented in jcrist/skein #40.

This work was done by Jim Crist.

Support for LSF clusters in Dask-Jobqueue

Dask-jobqueue supports Dask use on traditional HPC cluster managers like SGE, SLURM, PBS, and others. We’ve recently added support for LSF clusters.
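Usage follows the same pattern as the other dask-jobqueue cluster classes; the queue name and resource values below are illustrative and entirely site-specific.

from dask_jobqueue import LSFCluster
from dask.distributed import Client

# Queue, cores, memory, and walltime all depend on your LSF installation.
cluster = LSFCluster(queue="general", cores=8, memory="16GB", walltime="01:00")
cluster.scale(10)          # ask LSF for ten worker jobs
client = Client(cluster)   # connect a Dask client to those workers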

Work was done in dask/dask-jobqueue #78 by Ray Bell.

New Dask Story on mobile networks

The Dask Stories repository holds narratives about how people use Dask. Sameer Lalwani recently added a story about using Dask to model mobile communication networks. It’s worth a read.

Test suite cleanup

The dask.distributed test suite has been suffering from intermittent failures recently. These are tests that fail very infrequently, and so are hard to catch when writing them, but show up when future unrelated PRs run the test suite on continuous integration and get failures. They add friction to the development process, but are expensive to track down (testing distributed systems is hard).

We’re taking a bit of time this week to track these down. Progress here: