This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.
A screencast version of this post is available here: https://www.youtube.com/watch?v=FkPlEqB8AnE
Dask.distributed lets you submit individual tasks to the cluster. We use this ability combined with Scikit Learn to train and run a distributed random forest on distributed tabular NYC Taxi data.
Our machine learning model does not perform well, but we do learn how to execute ad-hoc computations easily.
In the past few posts we analyzed data on a cluster with Dask collections such as dask.dataframe.
Often our computations don’t fit neatly into the bag, dataframe, or array abstractions. In these cases we want the flexibility of normal code with for loops, but still with the computational power of a cluster. With the dask.distributed task interface, we achieve something close to this.
As a motivating application we build a random forest algorithm from the ground up using the single-machine Scikit Learn library, and dask.distributed’s ability to quickly submit individual tasks to run on the cluster. Our algorithm will look like the following:

1. Pull the CSV data from S3 into many Pandas dataframes on the cluster
2. Train one RandomForestClassifier on each training dataframe
3. Replicate a held-out testing dataframe to every worker
4. Have every trained classifier predict passenger counts on the testing data
5. Combine the independent predictions by majority vote, using a tree reduction to avoid moving too much data to any one machine
As in our blogpost on distributed dataframes we use the data on all NYC Taxi rides in 2015. This is around 20GB on disk and 60GB in RAM.
We predict the number of passengers in each cab given the other numeric columns like pickup and destination location, fare breakdown, distance, etc.
We do this first on a small bit of data on a single machine and then on the entire dataset on the cluster. Our cluster is composed of twelve m4.xlarges (4 cores, 15GB RAM each).
Disclaimer and Spoiler Alert: I am not an expert in machine learning. Our algorithm will perform very poorly. If you’re excited about machine learning you can stop reading here. However, if you’re interested in how to build distributed algorithms with Dask then you may want to read on, especially if you happen to know enough machine learning to improve upon my naive solution.
We use a small number of dask.distributed functions to build our computation:
futures = executor.scatter(data) # scatter data
future = executor.submit(function, *args, **kwargs) # submit single task
futures = executor.map(function, sequence) # submit many tasks
results = executor.gather(futures) # gather results
executor.replicate(futures, n=number_of_replications)  # replicate data across workers
In particular, functions like executor.submit(function, *args) let us send individual functions out to our cluster thousands of times a second. Because these functions can consume the results of other tasks (futures) we can create complex workflows that stay entirely on the cluster and trust the distributed scheduler to move data around intelligently.
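For example, here is a minimal sketch (with made-up helper functions inc and add, not part of the taxi workflow) of chaining tasks by passing futures back into executor.submit, so that intermediate values never leave the cluster:

def inc(x):                      # toy helpers for illustration only
    return x + 1

def add(x, y):
    return x + y

x = executor.submit(inc, 1)      # inc(1) runs on some worker
y = executor.submit(inc, 2)      # inc(2) runs on another worker
z = executor.submit(add, x, y)   # consumes the futures x and y where they live
z.result()                       # 5; only this final value travels back to us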
First we load data from Amazon S3. We use the s3.read_csv(..., collection=False) function to load 178 Pandas DataFrames on our cluster from CSV data on S3. We get back a list of Future objects that refer to these remote dataframes. The use of collection=False gives us this list of futures rather than a single cohesive Dask.dataframe object.
from distributed import Executor, s3
e = Executor('52.91.1.177:8786')
dfs = s3.read_csv('dask-data/nyc-taxi/2015',
parse_dates=['tpep_pickup_datetime',
'tpep_dropoff_datetime'],
collection=False)
dfs = e.compute(dfs)
Each of these is a lightweight Future pointing to a pandas.DataFrame on the cluster.
>>> dfs[:5]
[<Future: status: finished, type: DataFrame, key: finalize-a06c3dd25769f434978fa27d5a4cf24b>,
<Future: status: finished, type: DataFrame, key: finalize-7dcb27364a8701f45cb02d2fe034728a>,
<Future: status: finished, type: DataFrame, key: finalize-b0dfe075000bd59c3a90bfdf89a990da>,
<Future: status: finished, type: DataFrame, key: finalize-1c9bb25cefa1b892fac9b48c0aef7e04>,
<Future: status: finished, type: DataFrame, key: finalize-c8254256b09ae287badca3cf6d9e3142>]
If we’re willing to wait a bit then we can pull data from any future back to our local process using the .result() method. We don’t want to do this too much though; data transfer can be expensive and we can’t hold the entire dataset in the memory of a single machine. Here we just bring back one of the dataframes:
>>> df = dfs[0].result()
>>> df.head()
  VendorID  tpep_pickup_datetime  tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID  store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
0        2   2015-01-15 19:05:39    2015-01-15 19:23:42                1           1.59        -73.993896        40.750111           1                   N         -73.974785         40.750618             1         12.0    1.0      0.5        3.25             0                    0.3         17.05
1        1   2015-01-10 20:33:38    2015-01-10 20:53:28                1           3.30        -74.001648        40.724243           1                   N         -73.994415         40.759109             1         14.5    0.5      0.5        2.00             0                    0.3         17.80
2        1   2015-01-10 20:33:38    2015-01-10 20:43:41                1           1.80        -73.963341        40.802788           1                   N         -73.951820         40.824413             2          9.5    0.5      0.5        0.00             0                    0.3         10.80
3        1   2015-01-10 20:33:39    2015-01-10 20:35:31                1           0.50        -74.009087        40.713818           1                   N         -74.004326         40.719986             2          3.5    0.5      0.5        0.00             0                    0.3          4.80
4        1   2015-01-10 20:33:39    2015-01-10 20:52:58                1           3.00        -73.971176        40.762428           1                   N         -74.004181         40.742653             2         15.0    0.5      0.5        0.00             0                    0.3         16.30
To start, let’s go through the standard Scikit Learn fit/predict/score cycle with this small bit of data on a single machine.
from sklearn.ensemble import RandomForestClassifier
from sklearn.cross_validation import train_test_split
df_train, df_test = train_test_split(df)
columns = ['trip_distance', 'pickup_longitude', 'pickup_latitude',
'dropoff_longitude', 'dropoff_latitude', 'payment_type',
'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount']
est = RandomForestClassifier(n_estimators=4)
est.fit(df_train[columns], df_train.passenger_count)
This builds a RandomForestClassifier with four decision trees and then trains it against the numeric columns in the data, trying to predict the passenger_count column. It takes around 10 seconds to train on a single core. We now see how well we do on the holdout testing data:
>>> est.score(df_test[columns], df_test.passenger_count)
0.65808188654721012
This 65% accuracy is actually pretty poor. About 70% of the rides in NYC have a single passenger, so the model of “always guess one” would outperform our fancy random forest.
>>> from sklearn.metrics import accuracy_score
>>> import numpy as np
>>> accuracy_score(df_test.passenger_count,
... np.ones_like(df_test.passenger_count))
0.70669390028780987
This is where my ignorance in machine learning really kills us. There is likely a simple way to improve this. However, because I’m more interested in showing how to build distributed computations with Dask than in actually doing machine learning I’m going to go ahead with this naive approach. Spoiler alert: we’re going to do a lot of computation and still not beat the “always guess one” strategy.
First we build a function that does just what we did before: build a random forest and then train it on a dataframe.
def fit(df):
est = RandomForestClassifier(n_estimators=4)
est.fit(df[columns], df.passenger_count)
return est
Second we call this function on all of our training dataframes on the cluster using the standard e.map(function, sequence) function. This sends out many small tasks for the cluster to run. We use all but the last dataframe for training data and hold out the last dataframe for testing. There are more principled ways to do this, but again we’re going to charge ahead here.
train = dfs[:-1]
test = dfs[-1]
estimators = e.map(fit, train)
This takes around two minutes to train on all of the 177 dataframes and now we have 177 independent estimators, each capable of guessing how many passengers a particular ride had. There is relatively little overhead in this computation.
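Each element of estimators is just another future. As a quick sanity check (a sketch that costs one data transfer) we could pull a single trained model back locally with the same .result() method we used for the dataframes, and score it against the local holdout split from before:

local_est = estimators[0].result()   # a trained RandomForestClassifier, moved to our local process
local_est.score(df_test[columns], df_test.passenger_count)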
Recall that we kept separate a future, test, that points to a Pandas dataframe on the cluster that was not used to train any of our 177 estimators. We’re going to replicate this dataframe across all workers on the cluster and then ask each estimator to predict the number of passengers for each ride in this dataset.
e.replicate([test], n=48)
def predict(est, X):
return est.predict(X[columns])
predictions = [e.submit(predict, est, test) for est in estimators]
Here we used the executor.submit(function, *args, **kwargs) function in a list comprehension to individually launch many tasks. The scheduler determines when and where to run these tasks for optimal computation time and minimal data transfer. As with all executor functions, this returns futures that we can use to collect data later if we want.
Developers note: we explicitly replicate here in order to take advantage of efficient tree-broadcasting algorithms. This is purely a performance consideration; everything would have worked fine without this, but the explicit broadcast turns a 30s communication+computation into a 2s communication+computation.
For each estimator we now have an independent prediction of the passenger counts for all of the rides in our test data. In other words, for each ride we have 177 different opinions on how many passengers were in the cab. By averaging these opinions together we hope to achieve a more accurate consensus opinion.
For example, consider the first four prediction arrays:
>>> a_few_predictions = e.gather(predictions[:4]) # remote futures -> local arrays
>>> a_few_predictions
[array([1, 2, 1, ..., 2, 2, 1]),
array([1, 1, 1, ..., 1, 1, 1]),
array([2, 1, 1, ..., 1, 1, 1]),
array([1, 1, 1, ..., 1, 1, 1])]
For the first ride/column we see that three of the four predictions are for a single passenger while one prediction disagrees and is for two passengers. We create a consensus opinion by taking the mode of the stacked arrays:
from scipy.stats import mode
import numpy as np
def mymode(*arrays):
array = np.stack(arrays, axis=0)
return mode(array)[0][0]
>>> mymode(*a_few_predictions)
array([1, 1, 1, ..., 1, 1, 1])
And so when we take the mode of these four prediction arrays we see that the majority opinion of one passenger dominates for all of the six rides visible here.
We could call our mymode function on all of our predictions like this:
>>> mode_prediction = e.submit(mymode, *predictions) # this doesn't scale well
Unfortunately this would move all of our results to a single machine to compute the mode there. This might swamp that single machine.
Instead we batch our predictions into groups of size 10, take the mode of each group, and then repeat the process with the smaller set of predictions until we have only one left. This sort of multi-step reduction is called a tree reduction. We can write it up with a couple of nested loops and executor.submit. This is only an approximation of the mode, but it’s a much more scalable computation. This finishes in about 1.5 seconds.
from toolz import partition_all
while len(predictions) > 1:
predictions = [e.submit(mymode, *chunk)
for chunk in partition_all(10, predictions)]
result = e.gather(predictions)[0]
>>> result
array([1, 1, 1, ..., 1, 1, 1])
Finally, after completing all of our work on our cluster we can see how well our distributed random forest algorithm does.
>>> accuracy_score(result, test.result().passenger_count)
0.67061974451423045
Still worse than the naive “always guess one” strategy. This just goes to show that, no matter how sophisticated your Big Data solution is, there is no substitute for common sense and a little bit of domain expertise.
As always I’ll have a section like this that honestly says what doesn’t work well and what I would have done with more time.
This blogpost gives a concrete example using basic task submission with executor.map and executor.submit to build a non-trivial computation. This approach is straightforward and not restrictive. Personally this interface excites me more than collections like Dask.dataframe; there is a lot of freedom in arbitrary task submission.