Feb 7, 2017

Two Easy Ways to Use Scikit Learn and Dask

By

This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

This post describes two simple ways to use Dask to parallelize Scikit-Learn operations either on a single computer or across a cluster.

  1. Use the Dask Joblib backend
  2. Use the dklearn project’s drop-in replacements for Pipeline, GridSearchCV, and RandomSearchCV

For the impatient, these look like the following:

### Joblib

import distributed.joblib  # registers the 'dask.distributed' backend with joblib
from joblib import parallel_backend

with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    # your now-cluster-ified sklearn code here
    ...


### Dask-learn pipeline and GridSearchCV drop-in replacements

# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline

However, neither of these techniques is perfect. They are the easiest things to try, but not always the best solutions. This blogpost focuses on low-hanging fruit.

Joblib

Scikit-Learn already parallelizes across a multi-core CPU using Joblib, a simple but powerful and mature library that provides an extensible map operation. Here is a simple example of using Joblib on its own without sklearn:

# Sequential code
from time import sleep
def slowinc(x):
    sleep(1)  # take a bit of time to simulate real work
    return x + 1

>>> [slowinc(i) for i in range(10)] # this takes 10 seconds
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Parallel code
from joblib import Parallel, delayed
>>> Parallel(n_jobs=4)(delayed(slowinc)(i) for i in range(10)) # this takes 3 seconds
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Dask users will recognize the delayed function modifier. Dask stole the delayed decorator from Joblib.

Many of Scikit-learn’s parallel algorithms use Joblib internally. If we can extend Joblib to clusters then we get some added parallelism from joblib-enabled Scikit-learn functions immediately.
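
As a concrete illustration (this estimator and its parameters are my own example, not from the original post), anything in Scikit-learn that accepts an n_jobs= keyword is using Joblib under the hood:

from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=0)

# n_jobs=-1 asks Scikit-learn to fan the tree-building out over all local
# cores; Joblib handles that parallelism internally
clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
clf.fit(X, y)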

Distributed Joblib

Fortunately Joblib provides an interface for other parallel systems to step in and act as an execution engine. We can do this with the parallel_backend context manager to run with hundreds or thousands of cores in a nearby cluster:

import distributed.joblib
from joblib import parallel_backend

with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    print(Parallel()(delayed(slowinc)(i) for i in list(range(100))))

The main value for Scikit-learn users here is that Scikit-learn already uses joblib.Parallel within its code, so this trick works with the Scikit-learn code that you already have.

So we can use Joblib to parallelize normally on our multi-core processor:

estimator = GridSearchCV(n_jobs=4, ...) # use joblib on local multi-core processor

or we can use Joblib together with Dask.distributed to parallelize across a multi-node cluster:

with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    estimator = GridSearchCV(...)  # use joblib with Dask cluster

(There will be a more thorough example towards the end.)

Limitations

Joblib is used throughout many algorithms in Scikit-learn, but not all. Generally any operation that accepts an n_jobs= parameter is a possible choice.

From Dask’s perspective Joblib’s interface isn’t ideal. For example it will always collect intermediate results back to the main process, rather than leaving them on the cluster until necessary. For computationally intense operations this is fine but does add some unnecessary communication overhead. Also Joblib doesn’t allow for operations more complex than a parallel map, so the range of algorithms that this can parallelize is somewhat limited.
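
For comparison, here is a minimal sketch, not from the original post, of the same slowinc work expressed directly against the Dask.distributed futures API, which keeps intermediate results on the workers and composes operations beyond a flat map:

from dask.distributed import Client

client = Client('scheduler-address:8786')

# the intermediate results stay on the workers as futures ...
futures = client.map(slowinc, range(100))

# ... and can feed further distributed work before anything is gathered back
total = client.submit(sum, futures)
print(total.result())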

Still though, given the wide use of Joblib-accelerated workflows (particularly within Scikit-learn), this is a simple thing to try if you have a cluster nearby, with a possibly large payoff.

Dask-learn Pipeline and Gridsearch

In July 2016, Jim Crist built and wrote about a small project, dask-learn. This project was a collaboration with SKLearn developers and an attempt to see which parts of Scikit-learn were trivially and usefully parallelizable. By far the most productive thing to come out of this work was a set of Dask variants of Scikit-learn’s Pipeline, GridSearchCV, and RandomSearchCV objects that better handle nested parallelism. Jim observed significant speedups over SKLearn code by using these drop-in replacements.

So if you replace the following imports you may get both better single-threaded performance and the ability to scale out to a cluster:

# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline

Here is a simple example from Jim’s more in-depth blogpost:

from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000,
                           n_features=500,
                           n_classes=2,
                           n_redundant=250,
                           random_state=42)

from sklearn import linear_model, decomposition
from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline

logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
                       ('logistic', logistic)])


# Parameters of pipelines can be set using '__' separated parameter names:
grid = dict(pca__n_components=[50, 100, 150, 250],
            logistic__C=[1e-4, 1.0, 10, 1e4],
            logistic__penalty=['l1', 'l2'])

# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV

estimator = GridSearchCV(pipe, grid)

estimator.fit(X, y)

SKLearn performs this computation in around 40 seconds while the dask-learn drop-in replacements take around 10 seconds. Also, if you add the following lines to connect to a running cluster the whole thing scales out:

from dask.distributed import Client
c = Client('scheduler-address:8786')
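
If you don’t have a standing cluster to point at, one option (my own aside, not part of Jim’s example) is to spin up a small local “cluster” of worker processes with dask.distributed, similar in spirit to the eight-process setup described below:

from dask.distributed import Client, LocalCluster

# a tiny local cluster of separate worker processes, purely for experimentation
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)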

Here is a live Bokeh plot of the computation on a tiny eight-process “cluster” running on my own laptop. I’m using processes here to highlight the costs of communication between processes (red). It’s actually about 30% faster to run this computation within the same single process.

Conclusion

This post showed a couple of simple mechanisms for scikit-learn users to accelerate their existing workflows with Dask. These aren’t particularly sophisticated, nor are they performance-optimal, but they are easy to understand and easy to try out. In a future blogpost I plan to cover more complex ways in which Dask can accelerate sophisticated machine learning workflows.

What we could have done better

As always, I include a brief section on what went wrong or what we could have done better with more time.

  • See the bottom of Jim’s post for a more thorough explanation of “what we could have done better” for dask-learn’s pipeline and gridsearch
  • Joblib + Dask.distributed interaction is convenient, but leaves some performance on the table. It’s not clear how Dask can help the sklearn codebase without being too invasive.
  • It would have been nice to spin up an actual cluster on parallel hardware for this post. I wrote this quickly (in a few hours) so decided to skip this. If anyone wants to write a follow-on experiment I would be happy to publish it.