Apr 20, 2016

Ad Hoc Distributed Random Forests

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

A screencast version of this post is available here: https://www.youtube.com/watch?v=FkPlEqB8AnE

TL;DR.

Dask.distributed lets you submit individual tasks to the cluster. We use this ability combined with Scikit Learn to train and run a distributed random forest on distributed tabular NYC Taxi data.

Our machine learning model does not perform well, but we do learn how to execute ad-hoc computations easily.

Motivation

In the past few posts we analyzed data on a cluster with Dask collections:

  1. Dask.bag on JSON records
  2. Dask.dataframe on CSV data
  3. Dask.array on HDF5 data

Often our computations don’t fit neatly into the bag, dataframe, or array abstractions. In these cases we want the flexibility of normal code with for loops, but still with the computational power of a cluster. With the dask.distributed task interface, we achieve something close to this.
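To make the task interface concrete, here is a toy sketch (not from this post), assuming a dask.distributed scheduler is already running at a placeholder address; the clean function and the small list of strings are hypothetical stand-ins:

from distributed import Executor

executor = Executor('127.0.0.1:8786')    # placeholder scheduler address

def clean(record):                       # any ordinary Python function
    return record.strip().lower()

futures = []
for record in ['  Alice', 'BOB ', ' Carol ']:      # a plain for loop over toy data
    futures.append(executor.submit(clean, record)) # one remote task per element

print(executor.gather(futures))          # ['alice', 'bob', 'carol']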

Application: Naive Distributed Random Forest Algorithm

As a motivating application we build a random forest algorithm from the ground up using the single-machine Scikit Learn library, and dask.distributed’s ability to quickly submit individual tasks to run on the cluster. Our algorithm will look like the following:

  1. Pull data from some external source (S3) into several dataframes on the cluster
  2. For each dataframe, create and train one RandomForestClassifier
  3. Scatter single testing dataframe to all machines
  4. For each RandomForestClassifier predict output on test dataframe
  5. Aggregate independent predictions from each classifier together by a majority vote. To avoid bringing too much data to any one machine, perform this majority vote as a tree reduction.

Data: NYC Taxi 2015

As in our blogpost on distributed dataframes we use the data on all NYC Taxi rides in 2015. This is around 20GB on disk and 60GB in RAM.

We predict the number of passengers in each cab given the other numeric columns like pickup and destination location, fare breakdown, distance, etc.

We do this first on a small bit of data on a single machine and then on the entire dataset on the cluster. Our cluster is composed of twelve m4.xlarges (4 cores, 15GB RAM each).

Disclaimer and Spoiler Alert: I am not an expert in machine learning. Our algorithm will perform very poorly. If you’re excited about machine learning you can stop reading here. However, if you’re interested in how to build distributed algorithms with Dask then you may want to read on, especially if you happen to know enough machine learning to improve upon my naive solution.

API: submit, map, gather

We use a small number of dask.distributed functions to build our computation:

futures = executor.scatter(data) # scatter data
future = executor.submit(function, *args, **kwargs) # submit single task
futures = executor.map(function, sequence) # submit many tasks
results = executor.gather(futures) # gather results
executor.replicate(futures, n=number_of_replications)

In particular, functions like executor.submit(function, *args) let us send individual functions out to our cluster thousands of times a second. Because these tasks can consume the results of other tasks, we can create complex workflows that stay entirely on the cluster and trust the distributed scheduler to move data around intelligently.
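As a further toy illustration (again hypothetical, reusing the placeholder executor from the earlier sketch, with stand-in load and summarize functions), futures returned by submit can be fed straight back into later submit calls, so intermediate data never leaves the cluster:

def load(i):                             # stand-in for loading a block of remote data
    return list(range(i * 10, (i + 1) * 10))

def summarize(block):                    # stand-in for per-block work
    return sum(block)

blocks = [executor.submit(load, i) for i in range(4)]       # data stays on the cluster
partials = [executor.submit(summarize, b) for b in blocks]  # futures used as inputs
total = executor.submit(sum, partials)                      # final reduction, still remote
print(total.result())                                       # only one small number returns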

Load Pandas from S3

First we load data from Amazon S3. We use the s3.read_csv(..., collection=False) function to load 178 Pandas DataFrames on our cluster from CSV data on S3. We get back a list of Future objects that refer to these remote dataframes. The use of collection=False gives us this list of futures rather than a single cohesive Dask.dataframe object.

from distributed import Executor, s3
e = Executor('52.91.1.177:8786')

dfs = s3.read_csv('dask-data/nyc-taxi/2015',
                  parse_dates=['tpep_pickup_datetime',
                               'tpep_dropoff_datetime'],
                  collection=False)
dfs = e.compute(dfs)

Each of these is a lightweight Future pointing to a pandas.DataFrame on the cluster.

>>> dfs[:5]
[<Future: status: finished, type: DataFrame, key: finalize-a06c3dd25769f434978fa27d5a4cf24b>,
 <Future: status: finished, type: DataFrame, key: finalize-7dcb27364a8701f45cb02d2fe034728a>,
 <Future: status: finished, type: DataFrame, key: finalize-b0dfe075000bd59c3a90bfdf89a990da>,
 <Future: status: finished, type: DataFrame, key: finalize-1c9bb25cefa1b892fac9b48c0aef7e04>,
 <Future: status: finished, type: DataFrame, key: finalize-c8254256b09ae287badca3cf6d9e3142>]

If we’re willing to wait a bit then we can pull data from any future back to our local process using the .result() method. We don’t want to do this too much though; data transfer can be expensive and we can’t hold the entire dataset in the memory of a single machine. Here we just bring back one of the dataframes:

>>> df = dfs[0].result()
>>> df.head()

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID store_and_fwd_flag
0         2  2015-01-15 19:05:39   2015-01-15 19:23:42                1           1.59        -73.993896        40.750111           1                  N
1         1  2015-01-10 20:33:38   2015-01-10 20:53:28                1           3.30        -74.001648        40.724243           1                  N
2         1  2015-01-10 20:33:38   2015-01-10 20:43:41                1           1.80        -73.963341        40.802788           1                  N
3         1  2015-01-10 20:33:39   2015-01-10 20:35:31                1           0.50        -74.009087        40.713818           1                  N
4         1  2015-01-10 20:33:39   2015-01-10 20:52:58                1           3.00        -73.971176        40.762428           1                  N

   dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
0         -73.974785         40.750618             1         12.0    1.0      0.5        3.25             0                    0.3         17.05
1         -73.994415         40.759109             1         14.5    0.5      0.5        2.00             0                    0.3         17.80
2         -73.951820         40.824413             2          9.5    0.5      0.5        0.00             0                    0.3         10.80
3         -74.004326         40.719986             2          3.5    0.5      0.5        0.00             0                    0.3          4.80
4         -74.004181         40.742653             2         15.0    0.5      0.5        0.00             0                    0.3         16.30

Train on a single machine

To start let’s go through the standard Scikit Learn fit/predict/score cycle with this small bit of data on a single machine.

from sklearn.ensemble import RandomForestClassifier
from sklearn.cross_validation import train_test_split

df_train, df_test = train_test_split(df)

columns = ['trip_distance', 'pickup_longitude', 'pickup_latitude',
           'dropoff_longitude', 'dropoff_latitude', 'payment_type',
           'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount']

est = RandomForestClassifier(n_estimators=4)
est.fit(df_train[columns], df_train.passenger_count)

This builds a RandomForestClassifier with four decision trees and then trains it against the numeric columns in the data, trying to predict the passenger_count column. It takes around 10 seconds to train on a single core. We now see how well we do on the holdout testing data:

>>> est.score(df_test[columns], df_test.passenger_count)
0.65808188654721012

This 65% accuracy is actually pretty poor. About 70% of the rides in NYC have a single passenger, so the model of “always guess one” would out-perform our fancy random forest.

>>> from sklearn.metrics import accuracy_score
>>> import numpy as np
>>> accuracy_score(df_test.passenger_count,
...                np.ones_like(df_test.passenger_count))
0.70669390028780987

This is where my ignorance in machine learning really kills us. There is likely a simple way to improve this. However, because I’m more interested in showing how to build distributed computations with Dask than in actually doing machine learning I’m going to go ahead with this naive approach. Spoiler alert: we’re going to do a lot of computation and still not beat the “always guess one” strategy.

Fit across the cluster with executor.map

First we build a function that does just what we did before: build a random forest and then train it on a dataframe.

def fit(df):
    est = RandomForestClassifier(n_estimators=4)
    est.fit(df[columns], df.passenger_count)
    return est

Second we call this function on all of our training dataframes on the cluster using the standard e.map(function, sequence) function. This sends out many small tasks for the cluster to run. We use all but the last dataframe for training data and hold out the last dataframe for testing. There are more principled ways to do this, but again we’re going to charge ahead here.

train = dfs[:-1]
test = dfs[-1]

estimators = e.map(fit, train)

This takes around two minutes to train on all of the 177 dataframes and now we have 177 independent estimators, each capable of guessing how many passengers a particular ride had. There is relatively little overhead in this computation.

Predict on testing data

Recall that we kept separate a future, test, that points to a Pandas dataframe on the cluster that was not used to train any of our 177 estimators. We’re going to replicate this dataframe across all workers on the cluster and then ask each estimator to predict the number of passengers for each ride in this dataset.

e.replicate([test], n=48)

def predict(est, X):
    return est.predict(X[columns])

predictions = [e.submit(predict, est, test) for est in estimators]

Here we used the executor.submit(function, *args, **kwargs) function in a list comprehension to individually launch many tasks. The scheduler determines when and where to run these tasks for optimal computation time and minimal data transfer. As with all functions, this returns futures that we can use to collect data if we want in the future.

Developer’s note: we explicitly replicate here in order to take advantage of efficient tree-broadcasting algorithms. This is purely a performance consideration; everything would have worked fine without this, but the explicit broadcast turns a 30s communication+computation into a 2s communication+computation.

Aggregate predictions by majority vote

For each estimator we now have an independent prediction of the passenger counts for all of the rides in our test data. In other words, for each ride we have 177 different opinions on how many passengers were in the cab. By averaging these opinions together we hope to achieve a more accurate consensus opinion.

For example, consider the first four prediction arrays:

>>> a_few_predictions = e.gather(predictions[:4])  # remote futures -> local arrays
>>> a_few_predictions
[array([1, 2, 1, ..., 2, 2, 1]),
 array([1, 1, 1, ..., 1, 1, 1]),
 array([2, 1, 1, ..., 1, 1, 1]),
 array([1, 1, 1, ..., 1, 1, 1])]

For the first ride/column we see that three of the four predictions are for a single passenger while one prediction disagrees and is for two passengers. We create a consensus opinion by taking the mode of the stacked arrays:

from scipy.stats import mode
import numpy as np

def mymode(*arrays):
    array = np.stack(arrays, axis=0)
    return mode(array)[0][0]

>>> mymode(*a_few_predictions)
array([1, 1, 1, ..., 1, 1, 1])

And so when we take the mode of these four prediction arrays we see that the majority opinion of one passenger dominates for all of the six rides visible here.

Tree Reduction

We could call our mymode function on all of our predictions like this:

>>> mode_prediction = e.submit(mymode, *predictions) # this doesn't scale well

Unfortunately this would move all of our results to a single machine to compute the mode there. This might swamp that single machine.

Instead we batch our predictions into groups of size 10, take the mode of each group, and then repeat the process with the smaller set of predictions until we have only one left. This sort of multi-step reduction is called a tree reduction. We can write it up with a couple of nested loops and executor.submit. This is only an approximation of the true mode (a toy example of the difference follows the code below), but it’s a much more scalable computation. This finishes in about 1.5 seconds.

from toolz import partition_all

while len(predictions) > 1:
    predictions = [e.submit(mymode, *chunk)
                   for chunk in partition_all(10, predictions)]

result = e.gather(predictions)[0]

>>> result
array([1, 1, 1, ..., 1, 1, 1])
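To see why this is only an approximation of the true mode, here is a small hypothetical check (not from the post) that reuses the mymode and partition_all defined above on nine made-up votes for a single ride; the per-chunk winners can disagree with the overall winner:

>>> # made-up votes from nine estimators for one ride, each a length-1 array
>>> votes = [np.array([v]) for v in [2, 2, 1, 2, 2, 1, 1, 1, 1]]

>>> mymode(*votes)                    # true mode: five 1s beat four 2s
array([1])

>>> chunk_modes = [mymode(*chunk)     # tree reduction in chunks of three
...                for chunk in partition_all(3, votes)]
>>> mymode(*chunk_modes)              # chunk winners are 2, 2, 1
array([2])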

Final Score

Finally, after completing all of our work on our cluster we can see how well our distributed random forest algorithm does.

>>> accuracy_score(result, test.result().passenger_count)
0.67061974451423045

Still worse than the naive “always guess one” strategy. This just goes to show that, no matter how sophisticated your Big Data solution is, there is no substitute for common sense and a little bit of domain expertise.

What didn’t work

As always I’ll have a section like this that honestly says what doesn’t work well and what I would have done with more time.

  • Clearly this would have benefited from more machine learning knowledge. What would have been a good approach for this problem?
  • I’ve been thinking a bit about memory management of replicated data on the cluster. In this exercise we specifically replicated out the test data. Everything would have worked fine without this step but it would have been much slower as every worker gathered data from the single worker that originally had the test dataframe. Replicating data is great until you start filling up distributed RAM. It will be interesting to think of policies about when to start cleaning up redundant data and when to keep it around.
  • Several people from both open source users and Continuum customers have asked about a general Dask library for machine learning, something akin to Spark’s MLlib. Ideally a future Dask.learn module would leverage Scikit-Learn in the same way that Dask.dataframe leverages Pandas. It’s not clear how to cleanly break up and parallelize Scikit-Learn algorithms.

Conclusion

This blogpost gives a concrete example using basic task submission with executor.map and executor.submit to build a non-trivial computation. This approach is straightforward and not restrictive. Personally this interface excites me more than collections like Dask.dataframe; there is a lot of freedom in arbitrary task submission.

Links