Jan 12, 2017

Distributed Pandas on a Cluster with Dask DataFrames


This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

Dask Dataframe extends the popular Pandas library to operate on big datasets on a distributed cluster. We show its capabilities by running through common dataframe operations on a common dataset. We break up these computations into the following sections:

  1. Introduction: Pandas is intuitive and fast, but needs Dask to scale
  2. Read CSV and Basic operations
     1. Read CSV
     2. Basic Aggregations and Groupbys
  3. Joins and Correlations
  4. Shuffles and Time Series
  5. Parquet I/O
  6. Final thoughts
  7. What we could have done better

Accompanying Plots

Throughout this post we accompany computational examples with profiles of exactly what task ran where on our cluster and when. These profiles are interactive Bokeh plots that include every task that every worker in our cluster runs over time. For example the following read_csv computation produces the following profile:

>>> df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv')

If you are reading this through a syndicated website like planet.python.org or through an RSS reader then these plots will not show up. You may want to visit /2017/01/12/dask-dataframes directly.

Dask.dataframe breaks up reading this data into many small tasks of different types, for example reading bytes and parsing those bytes into pandas dataframes. Each rectangle corresponds to one task. The y-axis enumerates each of the worker processes. We have 64 processes spread over 8 machines so there are 64 rows. You can hover over any rectangle to get more information about that task. You can also use the tools in the upper right to zoom around and focus on different regions in the computation. In this computation we can see that workers interleave reading bytes from S3 (light green) and parsing bytes to dataframes (dark green). The entire computation took about a minute and most of the workers were busy the entire time (little white space). Inter-worker communication is always depicted in red (which is absent in this relatively straightforward computation).

Introduction

Pandas provides an intuitive, powerful, and fast data analysis experience on tabular data. However, because Pandas uses only one thread of execution and requires all data to be in memory at once, it doesn’t scale well to datasets much beyond the gigabyte scale. That component is missing. Generally people move to Spark DataFrames on HDFS or a proper relational database to resolve this scaling issue. Dask is a Python library for parallel and distributed computing that aims to fill this need for parallelism among the PyData projects (NumPy, Pandas, Scikit-Learn, etc.). Dask dataframes combine Dask and Pandas to deliver a faithful “big data” version of Pandas operating in parallel over a cluster.

I’ve written about this topic before. This blogpost is newer and will focus on performance and newer features like fast shuffles and the Parquet format.

CSV Data and Basic Operations

I have an eight node cluster on EC2 of m4.2xlarges (eight cores, 30GB RAM each). Dask is running on each node with one process per core.

We have the 2015 Yellow Cab NYC Taxi data as 12 CSV files on S3. We look at that data briefly with s3fs:

>>> import s3fs
>>> s3 = s3fs.S3FileSystem(anon=True)  # anonymous access to this public bucket
>>> s3.ls('dask-data/nyc-taxi/2015/')
['dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-02.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-03.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-04.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-05.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-06.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-07.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-08.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-09.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-10.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-11.csv',
'dask-data/nyc-taxi/2015/yellow_tripdata_2015-12.csv']

This data is too large to fit into Pandas on a single computer. However, it can fit in memory if we break it up into many small pieces and load these pieces onto different computers across a cluster.

We connect a client to our Dask cluster, composed of one centralized dask-scheduler process and several dask-worker processes running on each of the machines in our cluster.

from dask.distributed import Client
client = Client('scheduler-address:8786')

And we load our CSV data using dask.dataframe which looks and feels just like Pandas, even though it’s actually coordinating hundreds of small Pandas dataframes. This takes about a minute to load and parse.

import dask.dataframe as dd

df = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'anon': True})
df = client.persist(df)

This cuts up our 12 CSV files on S3 into a few hundred blocks of bytes, each 64MB large. On each of these 64MB blocks we then call pandas.read_csv to create a few hundred Pandas dataframes across our cluster, one for each block of bytes. Our single Dask Dataframe object, df, coordinates all of those Pandas dataframes. Because we’re just using Pandas calls it’s very easy for Dask dataframes to use all of the tricks from Pandas. For example we can use most of the keyword arguments from pd.read_csv in dd.read_csv without having to relearn anything.
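
For instance, here is a hedged sketch (not from the original post) of passing pandas-style keywords, plus a Dask-specific blocksize, through dd.read_csv; the particular usecols and dtype choices are just illustrative picks from this dataset:

import dask.dataframe as dd

# Illustrative only: most pandas.read_csv keywords pass straight through to the
# underlying pandas calls, while blocksize controls how each CSV file is split
# into (roughly 64MB) partitions.
subset = dd.read_csv('s3://dask-data/nyc-taxi/2015/*.csv',
                     blocksize=64 * 2**20,
                     usecols=['passenger_count', 'trip_distance'],
                     dtype={'passenger_count': 'uint8'},
                     storage_options={'anon': True})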

This data is about 20GB on disk or 60GB in RAM. It’s not huge, but is also larger than we’d like to manage on a laptop, especially if we value interactivity. The interactive image above is a trace over time of what each of our 64 cores was doing at any given moment. By hovering your mouse over the rectangles you can see that cores switched between downloading byte ranges from S3 and parsing those bytes with pandas.read_csv.

Our dataset includes every cab ride in the city of New York in the year of 2015, including when and where it started and stopped, a breakdown of the fare, etc.

>>> df.head()

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
0         2  2015-01-15 19:05:39   2015-01-15 19:23:42                1           1.59        -73.993896        40.750111           1                  N         -73.974785         40.750618             1         12.0    1.0      0.5        3.25           0.0                    0.3         17.05
1         1  2015-01-10 20:33:38   2015-01-10 20:53:28                1           3.30        -74.001648        40.724243           1                  N         -73.994415         40.759109             1         14.5    0.5      0.5        2.00           0.0                    0.3         17.80
2         1  2015-01-10 20:33:38   2015-01-10 20:43:41                1           1.80        -73.963341        40.802788           1                  N         -73.951820         40.824413             2          9.5    0.5      0.5        0.00           0.0                    0.3         10.80
3         1  2015-01-10 20:33:39   2015-01-10 20:35:31                1           0.50        -74.009087        40.713818           1                  N         -74.004326         40.719986             2          3.5    0.5      0.5        0.00           0.0                    0.3          4.80
4         1  2015-01-10 20:33:39   2015-01-10 20:52:58                1           3.00        -73.971176        40.762428           1                  N         -74.004181         40.742653             2         15.0    0.5      0.5        0.00           0.0                    0.3         16.30

Basic Aggregations and Groupbys

As a quick exercise, we compute the length of the dataframe. When we call len(df) Dask.dataframe translates this into many len calls on each of the constituent Pandas dataframes, followed by communication of the intermediate results to one node, followed by a sum of all of the intermediate lengths.

>>> len(df)
146112989

This takes around 400-500ms. You can see that a few hundred length computations happened quickly on the left, followed by some delay, then a bit of data transfer (the red bar in the plot), and a final summation call.
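
A rough, purely illustrative sketch of that decomposition written by hand with map_partitions (the real len(df) path is implemented inside Dask; this just shows the idea):

# Illustrative only: one len per constituent pandas dataframe, then a local sum
# of the small partial results.
partition_lengths = df.map_partitions(len)   # lazy: one length per partition
total = partition_lengths.compute().sum()    # gather the lengths and add them up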

More complex operations like simple groupbys look similar, although sometimes with more communications. Throughout this post we’re going to do more and more complex computations and our profiles will similarly become more and more rich with information. Here we compute the average trip distance, grouped by number of passengers. We find that single and double person rides go far longer distances on average. We achieve this one big-data groupby by performing many small Pandas groupbys and then cleverly combining their results.

>>> df.groupby(df.passenger_count).trip_distance.mean().compute()
passenger_count
0 2.279183
1 15.541413
2 11.815871
3 1.620052
4 7.481066
5 3.066019
6 2.977158
9 5.459763
7 3.303054
8 3.866298
Name: trip_distance, dtype: float64

As a more complex operation we see how well New Yorkers tip by hour of day and by day of week.

df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)] # filter out bad rows
df2['tip_fraction'] = df2.tip_amount / df2.fare_amount # make new column

dayofweek = (df2.groupby(df2.tpep_pickup_datetime.dt.dayofweek)
                .tip_fraction
                .mean())
hour = (df2.groupby(df2.tpep_pickup_datetime.dt.hour)
           .tip_fraction
           .mean())
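
The original post showed the resulting chart (the “tip fraction by hour” plot below). A minimal, hypothetical way to materialize these small grouped results locally and chart them would be something like:

import matplotlib.pyplot as plt

# Hypothetical plotting step (not in the original code): the grouped results are
# tiny, so we pull them back as ordinary pandas Series and plot with matplotlib.
hour_local = hour.compute()            # average tip fraction indexed by hour of day
dayofweek_local = dayofweek.compute()  # average tip fraction indexed by day of week

hour_local.plot(title='Tip fraction by hour of day')
plt.show()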

tip fraction by hour

We see that New Yorkers are generally pretty generous, tipping around 20%-25% on average. We also notice that they become very generous at 4am, tipping an average of 38%.

This more complex operation uses more of the Dask dataframe API (which mimics the Pandas API). Pandas users should find the code above fairly familiar. We remove rows with zero fare or zero tip (not every tip gets recorded), make a new column which is the ratio of the tip amount to the fare amount, and then groupby the day of week and hour of day, computing the average tip fraction for each hour/day.

Dask evaluates this computation with thousands of small Pandas calls across the cluster (try clicking the wheel zoom icon in the upper right of the image above and zooming in). The answer comes back in about 3 seconds.

Joins and Correlations

To show off more basic functionality we’ll join this Dask dataframe against a smaller Pandas dataframe that includes names of some of the more cryptic columns. Then we’ll correlate two derived columns to determine if there is a relationship between paying Cash and the recorded tip.

>>> import pandas as pd
>>> payments = pd.Series({1: 'Credit Card',
                          2: 'Cash',
                          3: 'No Charge',
                          4: 'Dispute',
                          5: 'Unknown',
                          6: 'Voided trip'},
                         name='payment_name')  # this name becomes the merged column used below

>>> df2 = df.merge(payments, left_on='payment_type', right_index=True)
>>> df2.groupby(df2.payment_name).tip_amount.mean().compute()
payment_name
Cash 0.000217
Credit Card 2.757708
Dispute -0.011553
No Charge 0.003902
Unknown 0.428571
Name: tip_amount, dtype: float64

We see that while the average tip for a credit card transaction is $2.75, the average tip for a cash transaction is very close to zero. At first glance it seems like cash tips aren’t being reported. To investigate this a bit further let’s compute the Pearson correlation between paying cash and having zero tip. Again, this code should look very familiar to Pandas users.

zero_tip = df2.tip_amount == 0
cash = df2.payment_name == 'Cash'

dd.concat([zero_tip, cash], axis=1).corr().compute()

              tip_amount  payment_name
tip_amount      1.000000      0.943123
payment_name    0.943123      1.000000

So we see that standard operations like row filtering, column selection, groupby-aggregations, joining with a Pandas dataframe, correlations, etc. all look and feel like the Pandas interface. Additionally, we’ve seen through profile plots that most of the time is spent just running Pandas functions on our workers, so Dask.dataframe is, in most cases, adding relatively little overhead. These little functions represented by the rectangles in these plots are just pandas functions. For example the plot above has many rectangles labeled merge if you hover over them. This is just the standard pandas.merge function that we love and know to be very fast in memory.

Shuffles and Time Series

Distributed dataframe experts will know that none of the operations above require a shuffle. That is, we can do most of our work with relatively little inter-node communication. However not all operations can avoid communication like this and sometimes we need to exchange most of the data between different workers.

For example if our dataset is sorted by customer ID but we want to sort it by time then we need to collect all the rows for January over to one Pandas dataframe, all the rows for February over to another, etc. This operation is called a shuffle and is the base of computations like groupby-apply, distributed joins on columns that are not the index, etc.
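
As a hedged illustration (not from the original post), a groupby-apply is one such operation: every row of a group has to end up on the same worker before the user function can run, so Dask shuffles the data first.

# Illustrative only: groupby-apply forces a shuffle, because all rows sharing a
# passenger_count must land in the same partition before `apply` runs.
spread = (df.groupby('passenger_count')
            .apply(lambda part: part.trip_distance.max() - part.trip_distance.min(),
                   meta=('trip_distance_spread', 'f8'))
            .compute())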

You can do a lot with dask.dataframe without performing shuffles, but sometimes it’s necessary. In the following example we sort our data by pickup datetime. This will allow fast lookups, fast joins, and fast time series operations, all common cases. We do one shuffle ahead of time to make all future computations fast.

We set the index as the pickup datetime column. This takes anywhere from 25-40s and is largely network bound (60GB, some text, eight machines with eight cores each on AWS non-enhanced network). This also requires running something like 16000 tiny tasks on the cluster. It’s worth zooming in on the plot below.

>>> df = client.persist(df.set_index('tpep_pickup_datetime'))

This operation is expensive, far more expensive than it was with Pandas when all of the data was in the same memory space on the same computer. This is a good time to point out that you should only use distributed tools like Dask.dataframe and Spark after tools like Pandas break down. We should only move to distributed systems when absolutely necessary. However, when it does become necessary, it’s nice knowing that Dask.dataframe can faithfully execute Pandas operations, even if some of them take a bit longer.

As a result of this shuffle our data is now nicely sorted by time, which will keep future operations close to optimal. We can see how the dataset is sorted by pickup time by quickly looking at the first entries, last entries, and entries for a particular day.

>>> df.head() # has the first entries of 2015

                      VendorID tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
tpep_pickup_datetime
2015-01-01 00:00:00          2   2015-01-01 00:00:00                3           1.56        -74.001320        40.729057           1                  N         -74.010208         40.719662             1          7.5    0.5      0.5         0.0           0.0                    0.3           8.8
2015-01-01 00:00:00          2   2015-01-01 00:00:00                1           1.68        -73.991547        40.750069           1                  N           0.000000          0.000000             2         10.0    0.0      0.5         0.0           0.0                    0.3          10.8
2015-01-01 00:00:00          1   2015-01-01 00:11:26                5           4.00        -73.971436        40.760201           1                  N         -73.921181         40.768269             2         13.5    0.5      0.5         0.0           0.0                    0.0          14.5

>>> df.tail() # has the last entries of 2015

                      VendorID tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
tpep_pickup_datetime
2015-12-31 23:59:56          1   2016-01-01 00:09:25                1           1.00        -73.973900        40.742893           1                  N         -73.989571         40.750549             1          8.0    0.5      0.5        1.85           0.0                    0.3         11.15
2015-12-31 23:59:58          1   2016-01-01 00:05:19                2           2.00        -73.965271        40.760281           1                  N         -73.939514         40.752388             2          7.5    0.5      0.5        0.00           0.0                    0.3          8.80
2015-12-31 23:59:59          2   2016-01-01 00:10:26                1           1.96        -73.997559        40.725693           1                  N         -74.017120         40.705322             2          8.5    0.5      0.5        0.00           0.0                    0.3          9.80

>>> df.loc['2015-05-05'].head() # has the entries for just May 5th

                      VendorID tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
tpep_pickup_datetime
2015-05-05                   2   2015-05-05 00:00:00                1           1.20        -73.981941        40.766460           1                  N         -73.972771         40.758007             2          6.5    1.0      0.5        0.00          0.00                    0.3          8.30
2015-05-05                   1   2015-05-05 00:10:12                1           1.70        -73.994675        40.750507           1                  N         -73.980247         40.738560             1          9.0    0.5      0.5        2.57          0.00                    0.3         12.87
2015-05-05                   1   2015-05-05 00:07:50                1           2.50        -74.002930        40.733681           1                  N         -74.013603         40.702362             2          9.5    0.5      0.5        0.00          0.00                    0.3         10.80

Because we know exactly which Pandas dataframe holds which data we can execute row-local queries like this very quickly. The total round trip from pressing enter in the interpreter or notebook is about 40ms. For reference, 40ms is the delay between two frames in a movie running at 25 Hz. This means that it’s fast enough that human users perceive this query to be entirely fluid.

Time Series

Additionally, once we have a nice datetime index all of Pandas’ time series functionality becomes available to us.

For example we can resample by day:

>>> (df.passenger_count
       .resample('1d')
       .mean()
       .compute()
       .plot())

resample by day

We observe a strong periodic signal here. The number of passengers is reliably higher on the weekends.

We can perform a rolling aggregation in about a second:

>>> s = client.persist(df.passenger_count.rolling(10).mean())

Because Dask.dataframe inherits the Pandas index all of these operations become very fast and intuitive.
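
As one more hedged sketch (not in the original post) of what the sorted index buys us: a date-range selection only needs to touch the partitions that overlap the requested range, rather than scanning the full year.

# Illustrative only: with a sorted DatetimeIndex and known divisions, this slice
# reads only the partitions overlapping May and June 2015.
spring = df.loc['2015-05-01':'2015-06-30']
rides_per_week = spring.passenger_count.resample('1W').count().compute()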

Parquet

Pandas’ standard “fast” recommended storage solution has generally been the HDF5 data format. Unfortunately the HDF5 file format is not ideal for distributed computing, so most Dask dataframe users have had to switch down to CSV historically. This is unfortunate because CSV is slow, doesn’t support partial queries (you can’t read in just one column), and also isn’t supported well by the other standard distributed Dataframe solution, Spark. This makes it hard to move data back and forth.

Fortunately there are now two decent Python readers for Parquet, a fast columnar binary store that shards nicely on distributed data stores like the Hadoop File System (HDFS, not to be confused with HDF5) and Amazon’s S3. The already fast Parquet-cpp project has been growing Python and Pandas support through Arrow, and the Fastparquet project, which is an offshoot from the pure-python parquet library, has been growing speed through use of NumPy and Numba.

Using Fastparquet under the hood, Dask.dataframe users can now happily read and write to Parquet files. This increases speed, decreases storage costs, and provides a shared format that both Dask dataframes and Spark dataframes can understand, improving the ability to use both computational systems in the same workflow.

Writing our Dask dataframe to S3 can be as simple as the following:

df.to_parquet('s3://dask-data/nyc-taxi/tmp/parquet')

However there are also a variety of options we can use to store our data more compactly through compression, encodings, etc. Expert users will probably recognize some of the terms below.

df = df.astype({'VendorID': 'uint8',
                'passenger_count': 'uint8',
                'RateCodeID': 'uint8',
                'payment_type': 'uint8'})

df.to_parquet('s3://dask-data/nyc-taxi/tmp/parquet',
              compression='snappy',
              has_nulls=False,
              object_encoding='utf8',
              fixed_text={'store_and_fwd_flag': 1})

We can then read our nicely indexed dataframe back with the dd.read_parquet function:

>>> df2 = dd.read_parquet('s3://dask-data/nyc-taxi/tmp/parquet')

The main benefit here is that we can quickly compute on single columns. The following computation runs in around 6 seconds, even though we don’t have any data in memory to start (recall that we started this blogpost with a minute-long call to read_csv and client.persist).

>>> df2.passenger_count.value_counts().compute()
1 102991045
2 20901372
5 7939001
3 6135107
6 5123951
4 2981071
0 40853
7 239
8 181
9 169
Name: passenger_count, dtype: int64
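
Because Parquet is columnar we can also restrict the read itself to just the columns we need. Here is a hedged sketch (not in the original post) using the columns= keyword of dd.read_parquet, against the same path written above:

# Illustrative only: read a single column from the Parquet dataset on S3.
counts = (dd.read_parquet('s3://dask-data/nyc-taxi/tmp/parquet',
                          columns=['passenger_count'])
            .passenger_count
            .value_counts()
            .compute())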

Final Thoughts

With the recent addition of faster shuffles and Parquet support, Dask dataframes become significantly more attractive. This blogpost gave a few categories of common computations, along with precise profiles of their execution on a small cluster. Hopefully people find this combination of Pandas syntax and scalable computing useful.

Now would also be a good time to remind people that Dask dataframe is only one module among many within the Dask project. Dataframes are nice, certainly, but Dask’s main strength is its flexibility to move beyond just plain dataframe computations to handle even more complex problems.

Learn More

If you’d like to learn more about Dask dataframe, the Dask distributed system, or other components, you should look at the following documentation:

  1. http://dask.pydata.org/en/latest/
  2. http://distributed.readthedocs.io/en/latest/

The workflows presented here are captured in the following notebooks (among other examples):

  1. NYC Taxi example, shuffling, others
  2. Parquet

What we could have done better

As always with computational posts we include a section on what went wrong, or what could have gone better.

  1. The 400ms computation of len(df) is a regression from previous versions where this was closer to 100ms. We’re getting bogged down somewhere in many small inter-worker communications.
  2. It would be nice to repeat this computation at a larger scale. Dask deployments in the wild are often closer to 1000 cores rather than the 64 core cluster we have here and datasets are often in the terabyte scale rather than our 60 GB NYC Taxi dataset. Unfortunately representative large open datasets are hard to find.
  3. The Parquet timings are nice, but there is still room for improvement. We seem to be making many small expensive queries of S3 when reading Thrift headers.
  4. It would be nice to support both Python Parquet readers: the Numba solution fastparquet and the C++ solution parquet-cpp.