Feb 22, 2016

Pandas on HDFS with Dask Dataframes

This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project.

In this post we use Pandas in parallel across an HDFS cluster to read CSV data. We coordinate these computations with dask.dataframe. A screencast version of this blogpost is available here and the previous post in this series is available here.

To start, we connect to our scheduler, import the hdfs module from the distributed library, and read our CSV data from HDFS.

>>> from distributed import Executor, hdfs, progress
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=64 threads=64>

>>> nyc2014 = hdfs.read_csv('/nyctaxi/2014/*.csv',
...                         parse_dates=['pickup_datetime', 'dropoff_datetime'],
...                         skipinitialspace=True)

>>> nyc2015 = hdfs.read_csv('/nyctaxi/2015/*.csv',
...                         parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

>>> nyc2014, nyc2015 = e.persist([nyc2014, nyc2015])
>>> progress(nyc2014, nyc2015)

Our data comes from the New York City Taxi and Limousine Commission, which publishes all yellow cab taxi rides in NYC for various years. This is a nice model dataset for computational tabular data because it’s large enough to be annoying while also deep enough to be broadly appealing. Each year is about 25GB on disk and about 60GB in memory as a Pandas DataFrame.

HDFS breaks up our CSV files into 128MB chunks on various hard drives spread throughout the cluster. The dask.distributed workers each read the chunks of bytes local to them and call the pandas.read_csv function on these bytes, producing 391 separate Pandas DataFrame objects spread throughout the memory of our eight worker nodes. The returned objects, nyc2014 and nyc2015, are dask.dataframe objects which present a subset of the Pandas API to the user, but farm out all of the work to the many Pandas dataframes they control across the network.
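As a quick sanity check on that partitioning, a dask.dataframe will report how many Pandas pieces it manages (npartitions is part of the dask.dataframe API; the count below is for the 2015 files and matches the timing discussion later in this post):

>>> nyc2015.npartitions   # one Pandas DataFrame per ~128MB HDFS block
178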

Play with Distributed Data

If we wait for the data to load fully into memory then we can perform pandas-style analysis at interactive speeds.

>>> nyc2015.head()

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  \
0         2  2015-01-15 19:05:39   2015-01-15 19:23:42                1           1.59
1         1  2015-01-10 20:33:38   2015-01-10 20:53:28                1           3.30
2         1  2015-01-10 20:33:38   2015-01-10 20:43:41                1           1.80
3         1  2015-01-10 20:33:39   2015-01-10 20:35:31                1           0.50
4         1  2015-01-10 20:33:39   2015-01-10 20:52:58                1           3.00

   pickup_longitude  pickup_latitude  RateCodeID store_and_fwd_flag  dropoff_longitude  dropoff_latitude  \
0        -73.993896        40.750111           1                  N         -73.974785         40.750618
1        -74.001648        40.724243           1                  N         -73.994415         40.759109
2        -73.963341        40.802788           1                  N         -73.951820         40.824413
3        -74.009087        40.713818           1                  N         -74.004326         40.719986
4        -73.971176        40.762428           1                  N         -74.004181         40.742653

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
0             1         12.0    1.0      0.5        3.25             0                    0.3         17.05
1             1         14.5    0.5      0.5        2.00             0                    0.3         17.80
2             2          9.5    0.5      0.5        0.00             0                    0.3         10.80
3             2          3.5    0.5      0.5        0.00             0                    0.3          4.80
4             2         15.0    0.5      0.5        0.00             0                    0.3         16.30

>>> len(nyc2014)
165114373

>>> len(nyc2015)
146112989

Interestingly, it appears that the NYC cab industry has contracted a bit in the last year. There are fewer cab rides in 2015 than in 2014.

When we ask for something like the length of the full dask.dataframe we actually ask for the length of all of the hundreds of Pandas dataframes and then sum them up. This process of reaching out to all of the workers completes in around 200-300 ms, which is generally fast enough to feel snappy in an interactive session.
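Schematically, that reduction looks like the following sketch, where partitions stands in for the many per-worker Pandas DataFrames (a hypothetical name, just for illustration):

>>> # what len(nyc2014) does conceptually:
>>> lengths = [len(df) for df in partitions]  # one tiny task per Pandas DataFrame, run on the workers
>>> total = sum(lengths)                      # small final reduction returned to the client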

The dask.dataframe API looks just like the Pandas API, except that we call .compute() when we want an actual result.

>>> nyc2014.passenger_count.sum().compute()
279997507.0

>>> nyc2015.passenger_count.sum().compute()
245566747

Dask.dataframes build a plan to get your result and the distributed scheduler coordinates that plan on all of the little Pandas dataframes on the workers that make up our dataset.
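If you would rather inspect such a plan than run it, dask collections can render their task graph to an image file (this needs the optional graphviz dependency; the filename here is just an example):

>>> nyc2014.passenger_count.sum().visualize('sum.png')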

Pandas for Metadata

Let’s appreciate for a moment all the work we didn’t have to do around CSV handling because Pandas magically handled it for us.

>>> nyc2015.dtypes
VendorID int64
tpep_pickup_datetime datetime64[ns]
tpep_dropoff_datetime datetime64[ns]
passenger_count int64
trip_distance float64
pickup_longitude float64
pickup_latitude float64
RateCodeID int64
store_and_fwd_flag object
dropoff_longitude float64
dropoff_latitude float64
payment_type int64
fare_amount float64
extra float64
mta_tax float64
tip_amount float64
tolls_amount float64
improvement_surcharge float64
total_amount\r float64
dtype: object

We didn’t have to find columns or specify data-types. We didn’t have to parse each value with an int or float function as appropriate. We didn’t have to parse the datetimes, but instead just specified a parse_dates= keyword. The CSV parsing happened about as quickly as can be expected for this format, clocking in at a network total of a bit under 1 GB/s.
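For contrast, here is roughly what the call might look like if we had to spell everything out ourselves — a sketch that assumes keywords like dtype= are forwarded to pandas.read_csv just as parse_dates= is in the calls above, with the column listing abridged:

>>> nyc2015 = hdfs.read_csv('/nyctaxi/2015/*.csv',
...                         dtype={'VendorID': 'int64',
...                                'trip_distance': 'float64',
...                                'store_and_fwd_flag': 'object'},  # ...and so on for every column
...                         parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])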

Pandas is well loved because it removes all of these little hurdles from the life of the analyst. If we tried to reinvent a new “Big-Data-Frame” we would have to reimplement all of the work already well done inside of Pandas. Instead, dask.dataframe just coordinates and reuses the code within the Pandas library. It is successful largely due to work from core Pandas developers, notably Masaaki Horikoshi (@sinhrks), who have done tremendous work to align the API precisely with the Pandas core library.

Analyze Tips and Payment Types

In an effort to demonstrate the abilities of dask.dataframe we ask a simple question of our data: “how do New Yorkers tip?” The 2015 NYC Taxi data is quite good about breaking down the total cost of each ride into the fare amount, tip amount, and various taxes and fees. In particular this lets us measure the percentage that each rider decided to pay in tip.

>>> nyc2015[['fare_amount', 'tip_amount', 'payment_type']].head()

   fare_amount  tip_amount  payment_type
0         12.0        3.25             1
1         14.5        2.00             1
2          9.5        0.00             2
3          3.5        0.00             2
4         15.0        0.00             2

In the first two lines we see evidence supporting the 15-20% tip standard common in the US. The following three lines interestingly show zero tip. Judging only by these first five lines (a very small sample) we see a strong correlation here with the payment type. We analyze this a bit more by counting occurrences in the payment_type column, both for the full dataset and filtered by zero tip:

>>> %time nyc2015.payment_type.value_counts().compute()
CPU times: user 132 ms, sys: 0 ns, total: 132 ms
Wall time: 558 ms

1 91574644
2 53864648
3 503070
4 170599
5 28
Name: payment_type, dtype: int64

>>> %time nyc2015[nyc2015.tip_amount == 0].payment_type.value_counts().compute()
CPU times: user 212 ms, sys: 4 ms, total: 216 ms
Wall time: 1.69 s

2 53862557
1 3365668
3 502025
4 170234
5 26
Name: payment_type, dtype: int64

We find that almost all zero-tip rides correspond to payment type 2, and that almost all payment type 2 rides don’t tip. My un-scientific hypothesis here is that payment type 2 corresponds to cash fares and that we’re observing a tendency of drivers not to record cash tips. However, we would need more domain knowledge about our data to actually make this claim with any degree of authority.
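One quick way to put a number on that correlation is to divide the two value counts above. This is a small follow-up sketch, not part of the original analysis; both results are small Pandas Series, so we compute them locally and let Pandas align the indexes:

>>> zero_tip = nyc2015[nyc2015.tip_amount == 0].payment_type.value_counts().compute()
>>> all_rides = nyc2015.payment_type.value_counts().compute()
>>> zero_tip / all_rides   # share of each payment type's rides that carry zero tip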

Analyze Tip Fractions

Let’s make a new column, tip_fraction, and then look at the average of this column grouped by day of week and grouped by hour of day.

First, we need to filter out bad rows, both rows with this odd payment type and rows with zero fare (there are a surprising number of free cab rides in NYC). Second, we create a new column equal to the ratio of tip_amount / fare_amount.

>>> df = nyc2015[(nyc2015.fare_amount > 0) & (nyc2015.payment_type != 2)]
>>> df = df.assign(tip_fraction=(df.tip_amount / df.fare_amount))

Next we choose to groupby the pickup datetime column in order to see how the average tip fraction changes by day of week and by hour. The groupby and datetime handling of Pandas makes these operations trivial.

>>> dayofweek = df.groupby(df.tpep_pickup_datetime.dt.dayofweek).tip_fraction.mean()
>>> hour = df.groupby(df.tpep_pickup_datetime.dt.hour).tip_fraction.mean()

>>> dayofweek, hour = e.persist([dayofweek, hour])
>>> progress(dayofweek, hour)

Grouping by day-of-week doesn’t show anything too striking to my eye. However I would like to note how generous NYC cab riders seem to be. A 23-25% tip can be quite nice:

>>> dayofweek.compute()
tpep_pickup_datetime
0 0.237510
1 0.236494
2 0.236073
3 0.246007
4 0.242081
5 0.232415
6 0.259974
Name: tip_fraction, dtype: float64

But grouping by hour shows that late night and early morning riders are more likely to tip extravagantly:

>>> hour.compute()
tpep_pickup_datetime
0 0.263602
1 0.278828
2 0.293536
3 0.276784
4 0.348649
5 0.248618
6 0.233257
7 0.216003
8 0.221508
9 0.217018
10 0.225618
11 0.231396
12 0.225186
13 0.235662
14 0.237636
15 0.228832
16 0.234086
17 0.240635
18 0.237488
19 0.272792
20 0.235866
21 0.242157
22 0.243244
23 0.244586
Name: tip_fraction, dtype: float64

We plot this with matplotlib and see a nice trough during business hours with a surge in the early morning and an astonishing peak of almost 35% at 4am:
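The figure itself is not reproduced here, but the plotting code might look something like this minimal matplotlib sketch, where hour is the persisted series from above:

>>> import matplotlib.pyplot as plt
>>> hourly = hour.compute()        # a 24-element Pandas Series; cheap to bring local
>>> ax = hourly.plot()
>>> ax.set_xlabel('Hour of day')
>>> ax.set_ylabel('Mean tip fraction')
>>> plt.show()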

Performance

Let’s dive into a few operations that run at different time scales. This gives a good understanding of the strengths and limits of the scheduler.

>>> %time nyc2015.head()
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 20.9 ms

This head computation is about as fast as a film projector. You could perform this roundtrip computation between every consecutive frame of a movie; to a human eye this appears fluid. In the last post we asked about how low we could bring latency. In that post we were running computations from my laptop in California and so were bound by transcontinental latencies of 200ms. This time, because we’re operating from the cluster, we can get down to 20ms. We’re only able to be this fast because we touch only a single data element, the first partition. Things change when we need to touch the entire dataset.

>>> %time len(nyc2015)
CPU times: user 48 ms, sys: 0 ns, total: 48 ms
Wall time: 271 ms

The length computation takes 200-300 ms. This computation takes longer because we touch every individual partition of the data, of which there are 178. The scheduler incurs about 1ms of overhead per task; add a bit of latency and you get the ~200ms total. This means that the scheduler will likely be the bottleneck whenever computations are very fast, such as is the case for computing len. Really, this is good news; it means that by improving the scheduler we can reduce these durations even further.
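A back-of-the-envelope version of that arithmetic, using my rough numbers rather than a measurement:

>>> 178 * 0.001   # 178 tasks at ~1ms of scheduler overhead each, in seconds
0.178
>>> # plus network latency and a small final reduction: roughly the ~271 ms wall time above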

If you look at the groupby computations above you can add the numbers in the progress bars to show that we computed around 3000 tasks in around 7s. It looks like this computation is about half scheduler overhead and about half bound by actual computation.

Conclusion

We used dask+distributed on a cluster to read CSV data from HDFS into a dask dataframe. We then used dask.dataframe, which looks identical to the Pandas dataframe, to manipulate our distributed dataset intuitively and efficiently.

We looked a bit at the performance characteristics of simple computations.

What doesn’t work

As always I’ll have a section like this that honestly says what doesn’t work well and what I would have done with more time.

  • Dask dataframe implements a commonly used subset of Pandas functionality, not all of it. It’s surprisingly hard to communicate the exact bounds of this subset to users. Notably, in the distributed setting we don’t have a shuffle algorithm, so groupby(...).apply(...) and some joins are not yet possible.
  • If you want to use threads, you’ll need Pandas 0.18.0 which, at the time of this writing, was still in release candidate stage. This Pandas release fixes some important GIL related issues.
  • The 1ms overhead per task limit is significant. While we can still scale out to clusters far larger than what we have here, we probably won’t be able to strongly accelerate very quick operations until we reduce this number.
  • We use the hdfs3 library to read data from HDFS. This library seems to work great but is new and could use more active users to flush out bug reports.

Setup and Data

You can obtain public data from the New York City Taxi and Limousine Commission here. I downloaded this onto the head node and dumped it into HDFS with commands like the following:

wget https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-{01..12}.csv
hdfs dfs -mkdir /nyctaxi
hdfs dfs -mkdir /nyctaxi/2015
hdfs dfs -put yellow*.csv /nyctaxi/2015/

The cluster was hosted on EC2 and comprised nine m3.2xlarges with 8 cores and 30GB of RAM each. Eight of these nodes were used as workers; they used processes for parallelism, not threads.