This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
This post describes two simple ways to use Dask to parallelize Scikit-Learn operations either on a single computer or across a cluster.
For the impatient, these look like the following:
### Joblib
import distributed.joblib  # registers the 'dask.distributed' joblib backend
from joblib import parallel_backend

with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    # your now-cluster-ified sklearn code here
    ...
### Dask-learn pipeline and GridSearchCV drop-in replacements
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
However, neither of these techniques is perfect. These are the easiest things to try, but not always the best solutions. This blogpost focuses on low-hanging fruit.
### Joblib

Scikit-Learn already parallelizes across a multi-core CPU using Joblib, a simple but powerful and mature library that provides an extensible map operation. Here is a simple example of using Joblib on its own without sklearn:
# Sequential code
from time import sleep
def slowinc(x):
    sleep(1)  # take a bit of time to simulate real work
    return x + 1
>>> [slowinc(i) for i in range(10)] # this takes 10 seconds
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Parallel code
from joblib import Parallel, delayed
>>> Parallel(n_jobs=4)(delayed(slowinc)(i) for i in range(10)) # this takes 3 seconds
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Dask users will recognize the delayed function modifier. Dask stole the delayed decorator from Joblib.
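As an aside, the same computation looks almost identical with Dask’s own delayed decorator. This is a minimal sketch for comparison only (it assumes the slowinc function defined above) and isn’t needed for the joblib examples that follow:

import dask

lazy_results = [dask.delayed(slowinc)(i) for i in range(10)]
print(dask.compute(*lazy_results))  # runs the tasks in parallel with Dask's default scheduler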
Many of Scikit-learn’s parallel algorithms use Joblib internally. If we can extend Joblib to clusters then we get some added parallelism from joblib-enabled Scikit-learn functions immediately.
Fortunately Joblib provides an interface for other parallel systems to step in and act as an execution engine. We can do this with the parallel_backend context manager to run with hundreds or thousands of cores in a nearby cluster:
import distributed.joblib
from joblib import parallel_backend, Parallel, delayed

with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    print(Parallel()(delayed(slowinc)(i) for i in range(100)))
The main value for Scikit-learn users here is that Scikit-learn already uses joblib.Parallel within its code, so this trick works with the Scikit-learn code that you already have.
So we can use Joblib to parallelize normally on our multi-core processor:
estimator = GridSearchCV(n_jobs=4, ...) # use joblib on local multi-core processor
or we can use Joblib together with Dask.distributed to parallelize across a multi-node cluster:
with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    estimator = GridSearchCV(...)  # use joblib with Dask cluster
(There will be a more thorough example towards the end)
Joblib is used throughout many algorithms in Scikit-learn, but not all. Generally any operation that accepts an n_jobs= parameter is a possible choice.
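As a concrete sketch, here is what that looks like with RandomForestClassifier, which accepts n_jobs= and fits its trees through Joblib. The scheduler address below is a placeholder for your own cluster:

import distributed.joblib  # registers the 'dask.distributed' backend
from joblib import parallel_backend
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=0)
forest = RandomForestClassifier(n_estimators=200, n_jobs=-1)

with parallel_backend('dask.distributed', scheduler_host='scheduler-address:8786'):
    forest.fit(X, y)  # per-tree fits go through joblib, and so out to the cluster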
From Dask’s perspective Joblib’s interface isn’t ideal. For example it will always collect intermediate results back to the main process, rather than leaving them on the cluster until necessary. For computationally intense operations this is fine but does add some unnecessary communication overhead. Also Joblib doesn’t allow for operations more complex than a parallel map, so the range of algorithms that this can parallelize is somewhat limited.
Still though, given the wide use of Joblib-accelerated workflows (particularly within Scikit-learn), this is a simple thing to try if you have a cluster nearby, with a possibly large payoff.
### Dask-learn Pipeline and GridSearchCV

In July 2016, Jim Crist built and wrote about a small project, dask-learn. This project was a collaboration with SKLearn developers and an attempt to see which parts of Scikit-learn were trivially and usefully parallelizable. By far the most productive things to come out of this work were Dask variants of Scikit-learn’s Pipeline, GridSearchCV, and RandomizedSearchCV objects that better handle nested parallelism. Jim observed significant speedups over SKLearn code by using these drop-in replacements.
So if you replace the following imports you may get both better single-threaded performance and the ability to scale out to a cluster:
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
Here is a simple example from Jim’s more in-depth blogpost:
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000,
                           n_features=500,
                           n_classes=2,
                           n_redundant=250,
                           random_state=42)
from sklearn import linear_model, decomposition
# from sklearn.pipeline import Pipeline
from dklearn.pipeline import Pipeline
logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
                       ('logistic', logistic)])
# Parameters of pipelines can be set using ‘__’ separated parameter names:
grid = dict(pca__n_components=[50, 100, 150, 250],
            logistic__C=[1e-4, 1.0, 10, 1e4],
            logistic__penalty=['l1', 'l2'])
# from sklearn.grid_search import GridSearchCV
from dklearn.grid_search import GridSearchCV
estimator = GridSearchCV(pipe, grid)
estimator.fit(X, y)
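Once fit, the estimator should expose the familiar Scikit-learn attributes, since dklearn aims to be a drop-in replacement (a quick sketch):

print(estimator.best_params_)  # best parameter combination found by the search
print(estimator.best_score_)   # its mean cross-validated score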
SKLearn performs this computation in around 40 seconds while the dask-learn drop-in replacements take around 10 seconds. Also, if you add the following lines to connect to a running cluster the whole thing scales out:
from dask.distributed import Client
c = Client('scheduler-address:8786')
Here is a live Bokeh plot of the computation on a tiny eight-process “cluster” running on my own laptop. I’m using processes here to highlight the costs of communication between processes (red). It’s actually about 30% faster to run this computation within the same single process.
This post showed a couple of simple mechanisms for scikit-learn users to accelerate their existing workflows with Dask. These aren’t particularly sophisticated, nor are they performance-optimal, but they are easy to understand and easy to try out. In a future blogpost I plan to cover more complex ways in which Dask can accelerate sophisticated machine learning workflows.
As always, I include a brief section on what went wrong or what we could have done better with more time.