Mar 28, 2017

Dask and Pandas and XGBoost

By Matthew Rocklin

This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation

Summary

This post talks about distributing Pandas Dataframes with Dask and then handing them over to distributed XGBoost for training.

More generally it discusses the value of launching multiple distributed systems in the same shared-memory processes and smoothly handing data back and forth between them.

Introduction

XGBoost is a well-loved library for a popular class of machine learning algorithms, gradient boosted trees. It is used widely in business and is one of the most popular solutions in Kaggle competitions. For larger datasets or faster training, XGBoost also comes with its own distributed computing system that lets it scale to multiple machines on a cluster. Fantastic. Distributed gradient boosted trees are in high demand.

However, before we can use distributed XGBoost we need to do three things:

  1. Prepare and clean our possibly large data, probably with a lot of Pandas wrangling
  2. Set up XGBoost master and workers
  3. Hand our cleaned data from a bunch of distributed Pandas dataframes to XGBoost workers across our cluster

This ends up being surprisingly easy. This blogpost gives a quick example using Dask.dataframe to do distributed Pandas data wrangling, then using a new dask-xgboost package to set up an XGBoost cluster inside the Dask cluster and perform the handoff.

After this example we’ll talk about the general design and what this means for other distributed systems.

Example

We have a ten-node cluster with eight cores each (m4.2xlarges on EC2):

import dask
from dask.distributed import Client, progress

>>> client = Client('172.31.33.0:8786')
>>> client.restart()
<Client: scheduler='tcp://172.31.33.0:8786' processes=10 cores=80>
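
As an aside, if you don’t have a cluster handy, calling Client() with no arguments starts a scheduler and workers locally on your own machine. A minimal sketch:

from dask.distributed import Client

client = Client()  # starts a local scheduler and worker processes on this machine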

We load the Airlines dataset using dask.dataframe (just a bunch of Pandas dataframes spread across a cluster) and do a bit of preprocessing:

import dask.dataframe as dd

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv', usecols=cols,
                 storage_options={'anon': True})

df = df.sample(frac=0.2)  # XGBoost requires a bit of RAM, we need a larger cluster

is_delayed = (df.DepDelay.fillna(16) > 15)  # column of labels
del df['DepDelay']  # Remove delay information from training dataframe

df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)

df, is_delayed = dask.persist(df, is_delayed)  # start work in the background

This loaded a few hundred pandas dataframes from CSV data on S3. We then had to downsample because how we are going to use XGBoost in the future seems to require a lot of RAM. I am not an XGBoost expert. Please forgive my ignorance here. Since dask.persist only starts this work in the background, we can watch it run with a progress bar (see the sketch after this list). At the end we have two dataframes:

  • df: Data from which we will learn if flights are delayed
  • is_delayed: Whether or not those flights were delayed.
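
Because dask.persist returns immediately and computes in the background, we can watch that work with the progress function imported earlier. A minimal sketch:

from dask.distributed import progress

# Displays a progress bar tracking the background computation
# of both persisted collections
progress(df, is_delayed)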

Data scientists familiar with Pandas will probably be familiar with the code above. Dask.dataframe is very similar to Pandas, but operates on a cluster.

>>> df.head()

        Year  Month  DayOfWeek  CRSDepTime UniqueCarrier Origin Dest  Distance
182193  2000      1          2         800            WN    LAX  OAK       337
83424   2000      1          6        1650            DL    SJC  SLC       585
346781  2000      1          5        1140            AA    ORD  LAX      1745
375935  2000      1          2        1940            DL    PHL  ATL       665
309373  2000      1          4        1028            CO    MCI  IAH       643

>>> is_delayed.head()
182193 False
83424 False
346781 False
375935 False
309373 False
Name: DepDelay, dtype: bool
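
One difference worth keeping in mind: most dask.dataframe operations are lazy, building up a task graph that only runs when we call .compute() or .persist(). A small sketch (the file path here is made up for illustration):

import dask.dataframe as dd

df = dd.read_csv('data/2000-*.csv')  # lazy: builds a task graph, reads nothing yet
mean = df.Distance.mean()            # still lazy
result = mean.compute()              # runs the computation in parallel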

Categorize and One Hot Encode

XGBoost doesn’t want to work with text data like destination=”LAX”. Instead we create new indicator columns for each of the known airports and carriers. This expands our data into many boolean columns. Fortunately Dask.dataframe has convenience functions for all of this baked in (thank you Pandas!)

>>> df2 = dd.get_dummies(df.categorize()).persist()

This expands our data out considerably, but makes it easier to train on.

>>> len(df2.columns)
685
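
To see what .categorize() and get_dummies do at a small scale, here is the same idea on a tiny, made-up pandas frame:

import pandas as pd

toy = pd.DataFrame({'Origin': ['LAX', 'SJC', 'LAX']})
toy['Origin'] = toy['Origin'].astype('category')  # what .categorize() does per column
pd.get_dummies(toy)
#    Origin_LAX  Origin_SJC
# 0           1           0
# 1           0           1
# 2           1           0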

Split and Train

Great, now we’re ready to split our distributed dataframes. Using the same random_state in both calls keeps the rows of the data and the labels aligned:

data_train, data_test = df2.random_split([0.9, 0.1],
                                         random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1],
                                                    random_state=1234)

Start up a distributed XGBoost instance and train on this data:

%%time
import dask_xgboost as dxgb

params = {'objective': 'binary:logistic', 'nround': 1000,
          'max_depth': 16, 'eta': 0.01, 'subsample': 0.5,
          'min_child_weight': 1, 'tree_method': 'hist',
          'grow_policy': 'lossguide'}

bst = dxgb.train(client, params, data_train, labels_train)

CPU times: user 355 ms, sys: 29.7 ms, total: 385 ms
Wall time: 54.5 s

Great, so we were able to train an XGBoost model on this data in about a minute using our ten machines. What we get back is just a plain XGBoost Booster object.

>>> bst
<xgboost.core.Booster at 0x7fa1c18c4c18>
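
Because this is an ordinary Booster, all of XGBoost’s usual methods apply to it. For example we could serialize it to disk and reload it later without any Dask involvement (the filename here is arbitrary):

bst.save_model('airline-delay.model')  # standard XGBoost serialization

import xgboost as xgb
bst2 = xgb.Booster()
bst2.load_model('airline-delay.model')  # usable from any plain Python process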

We could use this on normal Pandas data locally:

import xgboost as xgb
pandas_df = data_test.head()
dtest = xgb.DMatrix(pandas_df)

>>> bst.predict(dtest)
array([ 0.464578 , 0.46631625, 0.47434333, 0.47245741, 0.46194169], dtype=float32)
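
These are probabilities of delay rather than hard labels; if we wanted boolean predictions we could threshold them, for example:

probs = bst.predict(dtest)
delayed = probs > 0.5  # boolean delay predictions at a 0.5 cutoff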

Or we can use dask-xgboost again to predict on our distributed holdout data, getting back another Dask series.

>>> predictions = dxgb.predict(client, bst, data_test).persist()
>>> predictions
Dask Series Structure:
npartitions=93
None float32
None ...
...
None ...
None ...
Name: predictions, dtype: float32
Dask Name: _predict_part, 93 tasks

Evaluate

We can bring these predictions to the local process and use normal Scikit-learn operations to evaluate the results.

>>> from sklearn.metrics import roc_auc_score, roc_curve
>>> print(roc_auc_score(labels_test.compute(),
... predictions.compute()))
0.654800768411

import matplotlib.pyplot as plt

fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())
# Taken from
# http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

We might want to play with our parameters above or try different data to improve our solution. The point here isn’t that we predicted airline delays well, it was that if you are a data scientist who knows Pandas and XGBoost, everything we did above seemed pretty familiar. There wasn’t a whole lot of new material in the example above. We’re using the same tools as before, just at a larger scale.

Analysis

OK, now that we’ve demonstrated that this works, let’s talk a bit about what just happened and what that means generally for cooperation between distributed services.

What dask-xgboost does

The dask-xgboost project is pretty small and pretty simple (200 TLOC). Given a Dask cluster of one central scheduler and several distributed workers it starts up an XGBoost scheduler in the same process running the Dask scheduler and starts up an XGBoost worker within each of the Dask workers. They share the same physical processes and memory spaces. Dask was built to support this kind of situation, so this is relatively easy.

Then we ask the Dask.dataframe to fully materialize in RAM and we ask where all of the constituent Pandas dataframes live. We tell each Dask worker to give all of the Pandas dataframes that it has to its local XGBoost worker and then just let XGBoost do its thing. Dask doesn’t power XGBoost, it just sets it up, gives it data, and lets it do its work in the background.
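
The hook that makes this possible is Dask’s ability to run arbitrary functions inside every worker process. The sketch below uses client.run to illustrate the pattern; it is illustrative only, not the actual dask-xgboost internals:

import os
import socket

def inspect_worker():
    # Runs once inside each Dask worker process; dask-xgboost uses the
    # same kind of in-process call to start an XGBoost worker there
    return socket.gethostname(), os.getpid()

client.run(inspect_worker)  # maps each worker's address to its result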

People often ask what machine learning capabilities Dask provides, and how they compare with other distributed machine learning libraries like H2O or Spark’s MLlib. For gradient boosted trees the 200-line dask-xgboost package is the answer. Dask has no need to make such an algorithm because XGBoost already exists, works well, and provides Dask users with a fully featured and efficient solution.

Because both Dask and XGBoost can live in the same Python process they can share bytes between each other without cost, can monitor each other, etc. These two distributed systems co-exist together in multiple processes in the same way that NumPy and Pandas operate together within a single process. Sharing distributed processes with multiple systems can be really beneficial if you want to use multiple specialized services easily and avoid large monolithic frameworks.

Connecting to other distributed systems

A while ago I wrote a similar blogpost about hosting TensorFlow from Dask in exactly the same way that we’ve done here. It was similarly easy to set up TensorFlow alongside Dask, feed it data, and let TensorFlow do its thing.

Generally speaking this “serve other libraries” approach is how Dask operates when possible. We’re only able to cover the breadth of functionality that we do today because we lean heavily on the existing open source ecosystem. Dask.arrays use NumPy arrays, Dask.dataframes use Pandas, and now the answer to gradient boosted trees with Dask is just to make it really really easy to use distributed XGBoost. Ta da! We get a fully featured solution that is maintained by other devoted developers, and the entire connection process was done over a weekend (see dmlc/xgboost#2032 for details).

Since this has come out we’ve had requests to support other distributed systems like Elemental and to do general hand-offs to MPI computations. If we’re able to start both systems with the same set of processes then all of this is pretty doable. Many of the challenges of inter-system collaboration go away when you can hand numpy arrays between the workers of one system to the workers of the other system within the same processes.

Acknowledgements

Thanks to Tianqi Chen and Olivier Grisel for their help when building and testing dask-xgboost. Thanks to Will Warner for his help in editing this post.