Jun 28, 2017

Use Apache Parquet


This work is supported by Continuum Analytics and the Data Driven Discovery Initiative from the Moore Foundation.

This is a tiny blogpost to encourage you to use Parquet instead of CSV for your dataframe computations. I'll use Dask.dataframe here, but Pandas would work just as well. I'll also use my local laptop here, but Parquet is an excellent format to use on a cluster.

CSV is convenient, but slow

I have the NYC taxi cab dataset on my laptop stored as CSV

~/data/nyc/csv$ ls
yellow_tripdata_2015-01.csv yellow_tripdata_2015-07.csv
yellow_tripdata_2015-02.csv yellow_tripdata_2015-08.csv
yellow_tripdata_2015-03.csv yellow_tripdata_2015-09.csv
yellow_tripdata_2015-04.csv yellow_tripdata_2015-10.csv
yellow_tripdata_2015-05.csv yellow_tripdata_2015-11.csv
yellow_tripdata_2015-06.csv yellow_tripdata_2015-12.csv

This is a convenient format for humans because we can read it directly.

~/data/nyc/csv$ head yellow_tripdata_2015-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896484375,40.750110626220703,1,N,-73.974784851074219,40.750617980957031,1,12,1,0.5,3.25,0,0.3,17.05
1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.30,-74.00164794921875,40.7242431640625,1,N,-73.994415283203125,40.759109497070313,1,14.5,0.5,0.5,2,0,0.3,17.8
1,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.80,-73.963340759277344,40.802787780761719,1,N,-73.951820373535156,40.824413299560547,2,9.5,0.5,0.5,0,0,0.3,10.8
1,2015-01-10 20:33:39,2015-01-10 20:35:31,1,.50,-74.009086608886719,40.713817596435547,1,N,-74.004325866699219,40.719985961914063,2,3.5,0.5,0.5,0,0,0.3,4.8
1,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.00,-73.971176147460938,40.762428283691406,1,N,-74.004180908203125,40.742652893066406,2,15,0.5,0.5,0,0,0.3,16.3
1,2015-01-10 20:33:39,2015-01-10 20:53:52,1,9.00,-73.874374389648438,40.7740478515625,1,N,-73.986976623535156,40.758193969726563,1,27,0.5,0.5,6.7,5.33,0.3,40.33
1,2015-01-10 20:33:39,2015-01-10 20:58:31,1,2.20,-73.9832763671875,40.726009368896484,1,N,-73.992469787597656,40.7496337890625,2,14,0.5,0.5,0,0,0.3,15.3
1,2015-01-10 20:33:39,2015-01-10 20:42:20,3,.80,-74.002662658691406,40.734142303466797,1,N,-73.995010375976563,40.726325988769531,1,7,0.5,0.5,1.66,0,0.3,9.96
1,2015-01-10 20:33:39,2015-01-10 21:11:35,3,18.20,-73.783042907714844,40.644355773925781,2,N,-73.987594604492187,40.759357452392578,2,52,0,0.5,0,5.33,0.3,58.13

We can use tools like Pandas or Dask.dataframe to read in all of this data. Because the data is large-ish, I'll use Dask.dataframe.

~/data/nyc/csv$ du -hs .
22G .

In [1]: import dask.dataframe as dd

In [2]: %time df = dd.read_csv('yellow_tripdata_2015-*.csv')
CPU times: user 340 ms, sys: 12 ms, total: 352 ms
Wall time: 377 ms

In [3]: df.head()
Out[3]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count \
0 2 2015-01-15 19:05:39 2015-01-15 19:23:42 1
1 1 2015-01-10 20:33:38 2015-01-10 20:53:28 1
2 1 2015-01-10 20:33:38 2015-01-10 20:43:41 1
3 1 2015-01-10 20:33:39 2015-01-10 20:35:31 1
4 1 2015-01-10 20:33:39 2015-01-10 20:52:58 1

trip_distance pickup_longitude pickup_latitude RateCodeID \
0 1.59 -73.993896 40.750111 1
1 3.30 -74.001648 40.724243 1
2 1.80 -73.963341 40.802788 1
3 0.50 -74.009087 40.713818 1
4 3.00 -73.971176 40.762428 1

store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \
0 N -73.974785 40.750618 1
1 N -73.994415 40.759109 1
2 N -73.951820 40.824413 2
3 N -74.004326 40.719986 2
4 N -74.004181 40.742653 2

fare_amount extra mta_tax tip_amount tolls_amount \
0 12.0 1.0 0.5 3.25 0.0
1 14.5 0.5 0.5 2.00 0.0
2 9.5 0.5 0.5 0.00 0.0
3 3.5 0.5 0.5 0.00 0.0
4 15.0 0.5 0.5 0.00 0.0

improvement_surcharge total_amount
0 0.3 17.05
1 0.3 17.80
2 0.3 10.80
3 0.3 4.80
4 0.3 16.30

In [4]: from dask.diagnostics import ProgressBar

In [5]: ProgressBar().register()

In [6]: df.passenger_count.sum().compute()
[########################################] | 100% Completed | 3min 58.8s
Out[6]: 245566747

We were able to ask questions about this data (and learn that roughly 250 million people rode in cabs in 2015) even though it is too large to fit into memory. This is because Dask is able to operate lazily from disk. It reads in the data on an as-needed basis and then forgets it when it no longer needs it. This takes a while (about four minutes) but does just work.
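To make that laziness concrete, here is a rough sketch (illustrative, not part of the original session) of where the work actually happens:

import dask.dataframe as dd

# read_csv only inspects the files and builds a task graph;
# it does not stream the full 22 GB through memory yet
df = dd.read_csv('yellow_tripdata_2015-*.csv')

# operations keep extending the graph lazily
total = df.passenger_count.sum()

# only compute() actually reads the CSV blocks, one chunk at a time,
# discarding each chunk once it has been folded into the running sum
print(total.compute())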

However, when we read this data many times from disk we start to become frustrated by this four-minute cost. In Pandas we suffered this cost once as we moved data from disk to memory. On larger datasets, when we don't have enough RAM, we suffer this cost many times.
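For comparison, a minimal in-memory Pandas sketch (one month only, since the full 22 GB would not fit in RAM on this laptop):

import pandas as pd

# pay the parsing cost once, pulling a single month fully into memory
df = pd.read_csv('yellow_tripdata_2015-01.csv')

# later computations are fast because the data already lives in RAM
print(df.passenger_count.sum())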

Parquet is faster

Let's try this same process with Parquet. I happen to have the same exact data stored in Parquet format on my hard drive.

~/data/nyc$ du -hs nyc-2016.parquet/
17G nyc-2016.parquet/

It is stored as a bunch of individual files, but we don't actually care about that. We'll always refer to the directory as the dataset. These files are stored in binary format. We can't read them as humans:

~/data/nyc$ head nyc-2016.parquet/part.0.parquet
<a bunch of illegible bytes>

But computers are much better able to both read and navigate this data. Let's do the same experiment as before:

In [1]: import dask.dataframe as dd

In [2]: df = dd.read_parquet('nyc-2016.parquet/')

In [3]: df.head()
Out[3]:
tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count \
0 2015-01-01 00:00:00 2 2015-01-01 00:00:00 3
1 2015-01-01 00:00:00 2 2015-01-01 00:00:00 1
2 2015-01-01 00:00:00 1 2015-01-01 00:11:26 5
3 2015-01-01 00:00:01 1 2015-01-01 00:03:49 1
4 2015-01-01 00:00:03 2 2015-01-01 00:21:48 2

trip_distance pickup_longitude pickup_latitude RateCodeID \
0 1.56 -74.001320 40.729057 1
1 1.68 -73.991547 40.750069 1
2 4.00 -73.971436 40.760201 1
3 0.80 -73.860847 40.757294 1
4 2.57 -73.969017 40.754269 1

store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type \
0 N -74.010208 40.719662 1
1 N 0.000000 0.000000 2
2 N -73.921181 40.768269 2
3 N -73.868111 40.752285 2
4 N -73.994133 40.761600 2

fare_amount extra mta_tax tip_amount tolls_amount \
0 7.5 0.5 0.5 0.0 0.0
1 10.0 0.0 0.5 0.0 0.0
2 13.5 0.5 0.5 0.0 0.0
3 5.0 0.5 0.5 0.0 0.0
4 14.5 0.5 0.5 0.0 0.0

improvement_surcharge total_amount
0 0.3 8.8
1 0.3 10.8
2 0.0 14.5
3 0.0 6.3
4 0.3 15.8

In [4]: from dask.diagnostics import ProgressBar

In [5]: ProgressBar().register()

In [6]: df.passenger_count.sum().compute()
[########################################] | 100% Completed | 2.8s
Out[6]: 245566747

Same values, but now our computation happens in about three seconds, rather than four minutes. We're cheating a little bit here (pulling out the passenger count column is especially easy for Parquet) but generally Parquet will be much faster than CSV. This lets us work from disk comfortably without worrying about how much memory we have.
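That columnar layout is easy to take advantage of directly: if you only need a couple of columns, you can ask for just those when reading, using the columns= keyword of dd.read_parquet (a sketch, not from the original session):

import dask.dataframe as dd

# only the requested columns are read from disk; Parquet skips the rest
df = dd.read_parquet('nyc-2016.parquet/',
                     columns=['passenger_count', 'trip_distance'])

print(df.passenger_count.sum().compute())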

Convert

So do yourself a favor and convert your data:

In [1]: import dask.dataframe as dd
In [2]: df = dd.read_csv('csv/yellow_tripdata_2015-*.csv')
In [3]: from dask.diagnostics import ProgressBar
In [4]: ProgressBar().register()
In [5]: df.to_parquet('yellow_tripdata.parquet')
[############ ] | 30% Completed | 1min 54.7s

If you want to be more clever you can specify dtypes and compression when converting. This can definitely help give you significantly greater speedups, but just using the default settings will still be a large improvement.
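As a hedged sketch of what that might look like (the dtype choices and snappy compression below are illustrative, not the settings used for the timings above):

import dask.dataframe as dd

# declare narrower dtypes up front so the resulting Parquet schema is compact
df = dd.read_csv('csv/yellow_tripdata_2015-*.csv',
                 dtype={'passenger_count': 'int8',
                        'payment_type': 'int8'})

# snappy is cheap to decompress and is a common default choice
df.to_parquet('yellow_tripdata.parquet', compression='snappy')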

Advantages

Parquet enables the following:

  1. Binary representation of data, allowing for speedy conversion of bytes-on-disk to bytes-in-memory
  2. Columnar storage, meaning that you can load in as few columns as you need without loading the entire dataset
  3. Row-chunked storage so that you can pull out data from a particular range without touching the others
  4. Per-chunk statistics so that you can find subsets quickly (points 2-4 are sketched in the example after this list)
  5. Compression
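A rough sketch of points 2-4 in practice, assuming the filters= keyword of dd.read_parquet (it consults the per-row-group statistics, so chunks whose min/max values rule out the predicate are never read; how much actually gets skipped depends on how the statistics line up):

import dask.dataframe as dd

# point 2: only two columns are read from disk
# points 3-4: row groups whose trip_distance statistics exclude the
# predicate can be skipped entirely
df = dd.read_parquet('nyc-2016.parquet/',
                     columns=['trip_distance', 'total_amount'],
                     filters=[('trip_distance', '>', 50)])

print(df.total_amount.mean().compute())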

Parquet Versions

There are two nice Python packages with support for the Parquet format:

  1. pyarrow: Python bindings for the Apache Arrow and Apache Parquet C++ libraries
  2. fastparquet: a direct NumPy + Numba implementation of the Parquet format

Both are good. Both can do most things. Each has separate strengths. The code above used fastparquet by default, but you can change this in Dask with the engine='arrow' keyword if desired.
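For example (a sketch; the engine keyword as described above):

import dask.dataframe as dd

# default engine (fastparquet, at the time of writing)
df_fastparquet = dd.read_parquet('nyc-2016.parquet/')

# the same dataset read through the Arrow bindings instead
df_arrow = dd.read_parquet('nyc-2016.parquet/', engine='arrow')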