Understanding Predicate Pushdown at the Row-Group Level in Parquet with PyArrow and Python
Apache Parquet is a columnar file format for working with gigabytes of data, and reading and writing Parquet files is efficiently exposed to Python through PyArrow. Additional statistics stored in the file allow clients to use predicate pushdown to read only subsets of the data, which reduces I/O. Organizing data by column also allows for better compression, as the data within a column is more homogeneous; better compression in turn reduces the bandwidth required to read the input.
Demo Dataset
We are using the NY Taxi Dataset throughout this blog post because it is a real-world dataset, has a reasonable size, and has some useful properties like different data types. It also includes some messy data (like all real-world data engineering problems).
mkdir input
cd input
for i in {01..12}; do
    wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-$i.csv
done
Looking at the first rows of the data gives us some insight into the columns and the data format:
$ head -n4 input/yellow_tripdata_2019-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,
Each of the files is roughly 700MiB of uncompressed data:
$ du -sh input/yellow_tripdata_2018-0*
737M input/yellow_tripdata_2018-01.csv
715M input/yellow_tripdata_2018-02.csv
794M input/yellow_tripdata_2018-03.csv
784M input/yellow_tripdata_2018-04.csv
777M input/yellow_tripdata_2018-05.csv
734M input/yellow_tripdata_2018-06.csv
661M input/yellow_tripdata_2018-07.csv
661M input/yellow_tripdata_2018-08.csv
678M input/yellow_tripdata_2018-09.csv
To convert the data to Parquet, we are going to use pandas to read the CSV and store it in one large Parquet file:
import glob
import pandas as pd

files = glob.glob("input/yellow_tripdata_2018-*.csv")

def read_csv(filename):
    return pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )

dfs = list(map(read_csv, files))
df = pd.concat(dfs)
df.to_parquet("yellow_tripdata_2018.parquet")
The resulting Parquet file has a size of 2.2GiB, while the sum of the original CSV files was 11GiB. Pandas supports two Parquet implementations, fastparquet and pyarrow, which both have their strengths and weaknesses. A comparison of the two should be the topic of another blog post; here we are going to use PyArrow to analyze the data.
PyArrow can open a Parquet file without directly reading all the data. It exposes metadata and only reads the necessary byte ranges of the file to obtain this information. This is extremely helpful when you are working with Parquet files that are not available locally and are stored in a remote location (like Amazon S3 or Azure Blob Storage), because you are only reading a few kB instead of gigabytes of data to understand your dataset.
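The snippets in the remainder of this post render their results with two small helper functions, sizeof_fmt and print_table, which are not part of PyArrow. Their exact implementation does not matter; a minimal sketch could look like this:

def sizeof_fmt(num, suffix="B"):
    # Format a byte count as a human-readable string (KiB, MiB, GiB, ...).
    for unit in ["", "Ki", "Mi", "Gi", "Ti"]:
        if abs(num) < 1024.0:
            return "{:.1f}{}{}".format(num, unit, suffix)
        num /= 1024.0
    return "{:.1f}Pi{}".format(num, suffix)

def print_table(data):
    # Print a list of rows as whitespace-aligned columns.
    widths = [max(len(str(row[col])) for row in data) for col in range(len(data[0]))]
    for row in data:
        print("  ".join(str(value).ljust(width) for value, width in zip(row, widths)))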
import pyarrow.parquet as pq
filename = "yellow_tripdata_2018.parquet"
pq_file = pq.ParquetFile(filename)
data = [["columns:", pq_file.metadata.num_columns],
["rows:", pq_file.metadata.num_rows],
["row_groups:", pq_file.metadata.num_row_groups]
]
So we are working with roughly 103 million records and 18 columns, stored in 2 row groups.
columns: 18
rows: 102804250
row_groups: 2
The next step is to have a look at the schema of the Parquet file:
s = pq_file.metadata.schema
data = [[s.column(i).name, s.column(i).physical_type, s.column(i).logical_type] for i in range(len(s))]
print_table(data)
| column | physical type | logical type |
|---|---|---|
| VendorID | INT64 | NONE |
| tpep_pickup_datetime | INT64 | TIMESTAMP_MILLIS |
| tpep_dropoff_datetime | INT64 | TIMESTAMP_MILLIS |
| passenger_count | INT64 | NONE |
| trip_distance | DOUBLE | NONE |
| RatecodeID | INT64 | NONE |
| store_and_fwd_flag | BOOLEAN | NONE |
| PULocationID | INT64 | NONE |
| DOLocationID | INT64 | NONE |
| payment_type | INT64 | NONE |
| fare_amount | DOUBLE | NONE |
| extra | DOUBLE | NONE |
| mta_tax | DOUBLE | NONE |
| tip_amount | DOUBLE | NONE |
| tolls_amount | DOUBLE | NONE |
| improvement_surcharge | DOUBLE | NONE |
| total_amount | DOUBLE | NONE |
Each column has a physical type that defines how the column is stored on disk and an
optional logical type that is used to determine the actual data type. In the case of
tpep_pickup_datetime and tpep_dropoff_datetime, the values are stored as INT64
types on disk but are represented as timestamps in Pandas.
Logical types are used to extend the types that Parquet can store by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses Parquet's efficient encodings. For example, strings are stored as byte arrays (binary) with a UTF8 annotation; the Parquet logical type definitions provide comprehensive documentation.
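To see this in action, we can write a single string column with PyArrow and inspect its metadata; the file name below is just an example:

import pyarrow as pa
import pyarrow.parquet as pq

# Write a tiny table with one string column to a throwaway file.
table = pa.Table.from_pydict({"city": ["Berlin", "New York", "Paris"]})
pq.write_table(table, "logical_type_example.parquet")

# The column is stored with the BYTE_ARRAY physical type and is
# annotated with the String (UTF8) logical type.
column = pq.ParquetFile("logical_type_example.parquet").metadata.schema.column(0)
print(column.physical_type, column.logical_type)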
Now let's dive a little deeper into the file. A Parquet file consists of one or more row groups, which are a logical horizontal partitioning of the data into rows.

s = pq_file.metadata.schema
data = []
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, rg_meta.num_rows, sizeof_fmt(rg_meta.total_byte_size)])
print_table(data)
Since we have written the Parquet file with Pandas' default settings, we get row groups with a size on disk between 512MiB and 1.5GiB.
| row group | rows | size |
|---|---|---|
| 0 | 67108864 | 1.4GiB |
| 1 | 35695386 | 753.0MiB |
To understand the defaults of the row group sizing, some historical context is helpful. The Parquet file format was developed as a columnar data storage format of the Apache Hadoop ecosystem and its underlying Hadoop Distributed File System (HDFS):
Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
When working with Parquet in Python, one typically does not use HDFS as a storage backend, but either the local file system or a cloud blob storage like Amazon S3 or Azure Blob Storage. Depending on the read scenarios, different row group sizes make sense.
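If you only want smaller row groups and do not need full control over how the data is batched, PyArrow's write_table also accepts a row_group_size parameter; a minimal sketch (the row count and the output file name are just examples) could be:

import pyarrow.parquet as pq

# Re-write the file with at most ~8.5 million rows per row group;
# write_table splits the table into multiple row groups accordingly.
table = pq.read_table("yellow_tripdata_2018.parquet")
pq.write_table(
    table,
    "yellow_tripdata_2018-smaller-rowgroups.parquet",
    row_group_size=8_500_000,
)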
Instead of concatenating the CSV files in Pandas and writing them in one batch, one can use pyarrow.ParquetWriter directly to control how many row groups are written:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

months = range(1, 13)

def read_csv(month):
    filename = "input/yellow_tripdata_2018-{:02d}.csv".format(month)
    df = pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
    return df[(df['tpep_pickup_datetime'].dt.year == 2018) & (df['tpep_pickup_datetime'].dt.month == month)]

dfs = list(map(read_csv, months))

table = pa.Table.from_pandas(dfs[0], preserve_index=False)
writer = pq.ParquetWriter('yellow_tripdata_2018-rowgroups.parquet', table.schema)
for df in dfs:
    table = pa.Table.from_pandas(df, preserve_index=False)
    writer.write_table(table)
writer.close()
I have also added some data cleansing because, as mentioned earlier, the taxi dataset includes some messy data, and we only want to keep the rows of each monthly dataset that actually have a pickup_datetime within that month.
If we analyze the new Parquet file again, we can see that we now have a row group for each month of data:
| row group | rows | total_byte_size |
|---|---|---|
| 0 | 8492076 | 152.0MiB |
| 1 | 8173231 | 150.5MiB |
| 2 | 8040133 | 148.1MiB |
| 3 | 9430376 | 169.8MiB |
| 4 | 8821105 | 162.4MiB |
| 5 | 7849748 | 142.9MiB |
| 6 | 9305515 | 168.0MiB |
| 7 | 8145164 | 149.8MiB |
| 8 | 9224063 | 167.1MiB |
| 9 | 7849134 | 142.9MiB |
| 10 | 8713831 | 157.8MiB |
| 11 | 8759874 | 157.9MiB |
In addition to the data in the row groups, the Parquet format specifies metadata that is written per row group:
rg_meta = pq_file.metadata.row_group(0)
rg_meta.column(0)
For each column, we can retrieve metadata such as compression, size, and data type, as well as statistical information about the values stored in the row group for that particular column.
<pyarrow._parquet.ColumnChunkMetaData object at 0x7fa958ab72d0>
file_offset: 43125536
file_path:
physical_type: INT64
num_values: 8759557
path_in_schema: tpep_pickup_datetime
is_stats_set: True
statistics:
<pyarrow._parquet.Statistics object at 0x7fa958ab7510>
has_min_max: True
min: 2001-01-05 11:45:23
max: 2018-01-31 23:59:57
null_count: 0
distinct_count: 0
num_values: 8759557
physical_type: INT64
logical_type: Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
converted_type (legacy): NONE
compression: SNAPPY
encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
has_dictionary_page: True
dictionary_page_offset: 1312236
data_page_offset: 2117164
total_compressed_size: 41813300
total_uncompressed_size: 68701768
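Since this metadata is exposed for every column chunk, we can, for example, compare compressed and uncompressed sizes across all columns of a row group using the helpers from above; this is just a small sketch:

rg_meta = pq_file.metadata.row_group(0)
data = [["column", "compressed", "uncompressed"]]
for col in range(rg_meta.num_columns):
    col_meta = rg_meta.column(col)
    data.append([
        col_meta.path_in_schema,
        sizeof_fmt(col_meta.total_compressed_size),
        sizeof_fmt(col_meta.total_uncompressed_size),
    ])
print_table(data)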
Looking at the min and max statistics of the tpep_pickup_datetime column:
column = 1  # tpep_pickup_datetime
data = [["row group", "min", "max"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, str(rg_meta.column(column).statistics.min), str(rg_meta.column(column).statistics.max)])
print_table(data)
The statistics reveal an interesting property: the value ranges per row group are disjoint. This means that, without reading the full data, you already know which values to expect in which row group.
| row group | min | max |
|---|---|---|
| 0 | 2018-01-01 00:00:00 | 2018-01-31 23:59:57 |
| 1 | 2018-02-01 00:00:00 | 2018-02-28 23:59:58 |
| 2 | 2018-03-01 00:00:00 | 2018-03-31 23:59:57 |
| 3 | 2018-04-01 00:00:00 | 2018-04-30 23:59:58 |
| 4 | 2018-05-01 00:00:00 | 2018-05-31 23:59:59 |
| 5 | 2018-06-01 00:00:00 | 2018-06-30 23:59:59 |
| 6 | 2018-07-01 00:00:00 | 2018-07-31 23:59:59 |
| 7 | 2018-08-01 00:00:00 | 2018-08-31 23:59:59 |
| 8 | 2018-09-01 00:00:00 | 2018-09-30 23:59:59 |
| 9 | 2018-10-01 00:00:00 | 2018-10-31 23:59:58 |
| 10 | 2018-11-01 00:00:00 | 2018-11-30 23:59:59 |
| 11 | 2018-12-01 00:00:00 | 2018-12-31 23:59:58 |
If columns are sorted and/or row groups have disjoint value ranges in a dataset, readers can take advantage of this through a feature called predicate pushdown. To get all taxi trips on a certain day, say 2018-02-20, the Parquet reader can look at the row group statistics, check for each row group whether tpep_pickup_datetime.min <= 2018-02-20 and tpep_pickup_datetime.max >= 2018-02-20, and only read the parts of the file that can potentially include rows for that day. In our case, one would only have to read row group 1 and thus roughly 150MiB instead of about 2.2GiB.
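Until a reader does this for us automatically, the same logic is easy to express by hand on top of the exposed statistics. The following sketch (the function name read_day is made up, and the column index is the same assumption as above) reads only the row groups whose pickup statistics overlap the requested day and then filters the remaining rows exactly:

import datetime

import pandas as pd
import pyarrow.parquet as pq

def read_day(filename, day):
    # Read only the row groups whose tpep_pickup_datetime statistics
    # overlap the requested day, then filter the rows exactly.
    start = datetime.datetime.combine(day, datetime.time.min)
    end = datetime.datetime.combine(day, datetime.time.max)
    pq_file = pq.ParquetFile(filename)
    column = 1  # tpep_pickup_datetime
    tables = []
    for rg in range(pq_file.metadata.num_row_groups):
        stats = pq_file.metadata.row_group(rg).column(column).statistics
        # Skip row groups that cannot contain any rows for the requested day.
        if stats.min <= end and stats.max >= start:
            tables.append(pq_file.read_row_group(rg))
    if not tables:
        return pd.DataFrame()
    df = pd.concat([t.to_pandas() for t in tables])
    # Statistics only prune at row-group granularity, so apply the exact filter.
    return df[df["tpep_pickup_datetime"].dt.date == day]

trips = read_day("yellow_tripdata_2018-rowgroups.parquet", datetime.date(2018, 2, 20))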
In contrast, if we print the statistics for the column trip_distance:
| row group | min | max |
|---|---|---|
| 0 | 0.0 | 189483.84 |
| 1 | 0.0 | 1061.2 |
| 2 | 0.0 | 302.8 |
| 3 | 0.0 | 943.5 |
| 4 | 0.0 | 910.8 |
| 5 | 0.0 | 833.1 |
| 6 | 0.0 | 7655.76 |
| 7 | 0.0 | 5381.5 |
| 8 | 0.0 | 329.63 |
| 9 | 0.0 | 302.0 |
| 10 | 0.0 | 932.9 |
| 11 | 0.0 | 602.3 |
Even if a reader is only interested in rows with a certain trip_distance, it would have to read the whole dataset most of the time. Only for distances greater than 1000 could some of the row groups be skipped.
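We can verify this directly with the statistics: for a hypothetical predicate like trip_distance > 1000, a reader can only skip the row groups whose maximum is below the threshold:

column = 4  # trip_distance
threshold = 1000.0
for rg in range(pq_file.metadata.num_row_groups):
    stats = pq_file.metadata.row_group(rg).column(column).statistics
    if stats.max <= threshold:
        print("row group {} can be skipped".format(rg))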
Summary
Query engines that work on Parquet files, such as Hive, Presto, or Dremio, provide predicate pushdown out of the box to speed up query times and reduce I/O.
In the Python ecosystem, fastparquet has support for predicate pushdown at the row-group level, while PyArrow has an open ticket for an efficient implementation in the Parquet C++ reader.
Implementing predicate pushdown in Python on top of the exposed statistics is straightforward, as sketched above. In my team, we have done this within kartothek to speed up reads of large datasets from Azure Blob Storage.
