This post was written by Jim Crist. The original post lives athttp://jcrist.github.io/dask-sklearn-part-1.html(with better styling)
This is the first of a series of posts discussing some recent experimentscombining dask andscikit-learn. A small (and extremely alpha)library has been built up from these experiments, and can be foundhere.
Before we start, I would like to make the following caveats:
There are several ways of parallelizing algorithms in machine learning. Somealgorithms can be made to be data-parallel (either across features or acrosssamples). In this post we’ll look instead at model-parallelism (use same dataacross different models), and dive into a daskified implementation ofGridSearchCV.
Many machine learning algorithms have hyperparameters which can be tuned toimprove the performance of the resulting estimator. A gridsearchis one way of optimizing these parameters — it works by doing a parametersweep across a cartesian product of a subset of these parameters (the “grid”),and then choosing the best resulting estimator. Since this is fitting manyindependent estimators across the same set of data, it can be fairly easilyparallelized.
In scikit-learn, a grid search is performed using the GridSearchCV class, andcan (optionally) be automatically parallelized usingjoblib.
This is best illustrated with an example. First we’ll make an example datasetfor doing classification against:
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000,
n_features=500,
n_classes=2,
n_redundant=250,
random_state=42)
To solve this classification problem, we’ll create a pipeline of a PCA and aLogisticRegression:
from sklearn import linear_model, decomposition
from sklearn.pipeline import Pipeline
logistic = linear_model.LogisticRegression()
pca = decomposition.PCA()
pipe = Pipeline(steps=[('pca', pca),
('logistic', logistic)])
Both of these classes take several hyperparameters, we’ll do a grid-searchacross only a few of them:
#Parameters of pipelines can be set using ‘__’ separated parameter names:
grid = dict(pca__n_components=[50, 100, 250],
logistic__C=[1e-4, 1.0, 1e4],
logistic__penalty=['l1', 'l2'])
Finally, we can create an instance of GridSearchCV, and perform the gridsearch. The parameter n_jobs=-1 tells joblib to use as many processes as Ihave cores (8).
>>> from sklearn.grid_search import GridSearchCV
>>> estimator = GridSearchCV(pipe, grid, n_jobs=-1)
>>> %time estimator.fit(X, y)
CPU times: user 5.3 s, sys: 243 ms, total: 5.54 s
Wall time: 21.6 s
What happened here was:
The corresponding best score, parameters, and estimator can all be found asattributes on the resulting object:
>>> estimator.best_score_
0.89290000000000003
>>> estimator.best_params_
{'logistic__C': 0.0001, 'logistic__penalty': 'l2', 'pca__n_components': 50}
>>> estimator.best_estimator_
Pipeline(steps=[('pca', PCA(copy=True, n_components=50, whiten=False)), ('logistic', LogisticRegression(C=0.0001, class_weight=None, dual=False,
fit_intercept=True, intercept_scaling=1, max_iter=100,
multi_class='ovr', n_jobs=1, penalty='l2', random_state=None,
solver='liblinear', tol=0.0001, verbose=0, warm_start=False))])<div class=md_output>
{'logistic__C': 0.0001, 'logistic__penalty': 'l2', 'pca__n_components': 50}
Here we’ll repeat the same fit using dask-learn. I’ve tried to match thescikit-learn interface as much as possible, although not everything isimplemented. Here the only thing that really changes is the GridSearchCVimport. We don’t need the n_jobs keyword, as this will be parallelized acrossall cores by default.
>>> from dklearn.grid_search import GridSearchCV as DaskGridSearchCV
>>> destimator = DaskGridSearchCV(pipe, grid)
>>> %time destimator.fit(X, y)
CPU times: user 16.3 s, sys: 1.89 s, total: 18.2 s
Wall time: 5.63 s
As before, the best score, parameters, and estimator can all be found asattributes on the object. Here we’ll just show that they’re equivalent:
>>> destimator.best_score_ == estimator.best_score_
True
>>> destimator.best_params_ == estimator.best_params_
True
>>> destimator.best_estimator_
Pipeline(steps=[('pca', PCA(copy=True, n_components=50, whiten=False)), ('logistic', LogisticRegression(C=0.0001, class_weight=None, dual=False,
fit_intercept=True, intercept_scaling=1, max_iter=100,
multi_class='ovr', n_jobs=1, penalty='l2', random_state=None,
solver='liblinear', tol=0.0001, verbose=0, warm_start=False))])<div class=md_output>
{'logistic__C': 0.0001, 'logistic__penalty': 'l2', 'pca__n_components': 50}
If you look at the times above, you’ll note that the dask version was ~4Xfaster than the scikit-learn version. This is not because we have optimized anyof the pieces of the Pipeline, or that there’s a significant amount ofoverhead to joblib (on the contrary, joblib does some pretty amazing things,and I had to construct a contrived example to beat it this badly). The reasonis simply that the dask version is doing less work.
This maybe best explained in pseudocode. The scikit-learn version of the above(in serial) looks something like (pseudocode):
for X_train, X_test, y_train, y_test in cv:
for n in grid['pca__n_components']:
for C in grid['logistic__C']:
for penalty in grid['logistic__penalty']:
# Create and fit a PCA on the input data
pca = PCA(n_components=n).fit(X_train, y_train)
# Transform both the train and test data
X_train2 = pca.transform(X_train)
X_test2 = pca.transform(X_test)
# Create and fit a LogisticRegression on the transformed data
logistic = LogisticRegression(C=C, penalty=penalty)
logistic.fit(X_train2, y_train)
# Score the total pipeline
score = logistic.score(X_test2, y_test)
# Save the score and parameters
scores_and_params.append((score, n, C))
# Find the best set of parameters (for some definition of best)
find_best_parameters(scores)
This is looping through a cartesian product of the cross-validation sets andall the parameter combinations, and then creating and fitting a new estimatorfor each combination. While embarassingly parallel, this can also result inrepeated work, as earlier stages in the pipeline are refit multiple times onthe same parameter + data combinations.
In contrast, the dask version hashes all inputs (forming a sort of MerkleDAG), resulting in the intermediateresults being shared. Keeping with the pseudocode above, the dask version mightlook like:
for X_train, X_test, y_train, y_test in cv:
for n in grid['pca__n_components']:
# Create and fit a PCA on the input data
pca = PCA(n_components=n).fit(X_train, y_train)
# Transform both the train and test data
X_train2 = pca.transform(X_train)
X_test2 = pca.transform(X_test)
for C in grid['logistic__C']:
for penalty in grid['logistic__penalty']:
# Create and fit a LogisticRegression on the transformed data
logistic = LogisticRegression(C=C, penalty=penalty)
logistic.fit(X_train2, y_train)
# Score the total pipeline
score = logistic.score(X_test2, y_test)
# Save the score and parameters
scores_and_params.append((score, n, C, penalty))
# Find the best set of parameters (for some definition of best)
find_best_parameters(scores)
This can still be parallelized, but in a less straightforward manner - thegraph is a bit more complicated than just a simple map-reduce pattern.Thankfully the daskschedulers are wellequipped to handle arbitrary graph topologies. Below is a GIF showing how thedask scheduler (the threaded scheduler specifically) executed the grid searchperformed above. Each rectangle represents data, and each circle represents atask. Each is categorized by color:
Looking at the trace, a few things stand out:
The schedulers usedin dask are configurable. The default (used above) is the threaded scheduler,but we can just as easily swap it out for the distributed scheduler. Here I’vejust spun up two local workers to demonstrate, but this works equally wellacross multiple machines.
>>> from distributed import Executor
>>> # Create an Executor, and set it as the default scheduler
>>> exc = Executor('10.0.0.3:8786', set_as_default=True)
>>> exc
<Executor: scheduler="10.0.0.3:8786" processes=2 cores=8>
>>> %time destimator.fit(X, y)
CPU times: user 1.69 s, sys: 433 ms, total: 2.12 s
Wall time: 7.66 s
>>> %time destimator.fit(X, y)
CPU times: user 1.69 s, sys: 433 ms, total: 2.12 s
Wall time: 7.66 s
>>> (destimator.best_score_ == estimator.best_score_ and
... destimator.best_params_ == estimator.best_params_)
True
Note that this is slightly slower than the threaded execution, so it doesn’tmake sense for this workload, but for others it might.
I am not a machine learning expert. Is any of this useful? Do you havesuggestions for improvements (or better yet PRs for improvements :))? Pleasefeel free to reach out in the comments below, or ongithub.
This work is supported by Continuum Analytics and theXDATA program as part of the BlazeProject.