Peter Hoffmann

Understanding Predicate Pushdown at the Row-Group Level in Parquet with PyArrow and Python

Demo Dataset

We are using the NY Taxi Dataset throughout this blog post because it is a real-world dataset of reasonable size with some useful properties, such as different data types. It also includes some messy data (as all real-world data engineering problems do).

mkdir input
cd input
for i in {01..12}; do
  wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-$i.csv
done

Looking at the first rows of the data gives us some insight into the columns and the data format:

$ head -n4 input/yellow_tripdata_2019-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,

Each of the files is roughly 700MiB of uncompressed data:

$ du -sh input/yellow_tripdata_2018-0*
737M	input/yellow_tripdata_2018-01.csv
715M	input/yellow_tripdata_2018-02.csv
794M	input/yellow_tripdata_2018-03.csv
784M	input/yellow_tripdata_2018-04.csv
777M	input/yellow_tripdata_2018-05.csv
734M	input/yellow_tripdata_2018-06.csv
661M	input/yellow_tripdata_2018-07.csv
661M	input/yellow_tripdata_2018-08.csv
678M	input/yellow_tripdata_2018-09.csv

To convert the data to Parquet, we are going to use pandas to read the CSV files and store the data in one large Parquet file:

import glob
import pandas as pd

files = glob.glob("input/yellow_tripdata_2018-*.csv")

def read_csv(filename):
    return pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
dfs = list(map(read_csv, files))
df = pd.concat(dfs)
df.to_parquet("yellow_tripdata_2018.parquet")

The resulting Parquet file has a size of 2.2GiB, while the original CSV files add up to 11GiB. Pandas supports two Parquet implementations, fastparquet and pyarrow, each with its own strengths and weaknesses; a comparison would be a topic for another blog post. Here we are going to use PyArrow to analyze the data.

PyArrow can open a Parquet file without directly reading all the data. It exposes metadata and only reads the necessary byte ranges of the file to obtain this information. This is extremely helpful when you are working with Parquet files that are not available locally and are stored in a remote location (like Amazon S3 or Azure Blob Storage), because you are only reading a few kB instead of gigabytes of data to understand your dataset.
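
As a minimal sketch of such a remote read, assuming s3fs is installed and the file lives in a hypothetical bucket, one could pass a file-like object to PyArrow:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()  # credentials are picked up from the environment
# constructing the ParquetFile only fetches the footer metadata, not the column data
with fs.open("some-bucket/yellow_tripdata_2018.parquet", "rb") as f:
    remote_file = pq.ParquetFile(f)
    print(remote_file.metadata.num_rows)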

import pyarrow.parquet as pq

filename = "yellow_tripdata_2018.parquet"
pq_file = pq.ParquetFile(filename)

data = [["columns:", pq_file.metadata.num_columns],
        ["rows:", pq_file.metadata.num_rows],
        ["row_groups:", pq_file.metadata.num_row_groups]
        ]

So we are working with roughly 103 million records across 18 columns, and the file has 2 row groups.

  columns:          18
     rows:   102804250
row_groups:          2
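
The snippets in this post collect their results in small data lists and render them with two helpers, sizeof_fmt and print_table, which are used below but not defined in the post; a minimal sketch of what they could look like:

def sizeof_fmt(num, suffix="B"):
    # format a byte count as a human-readable string, e.g. 1536 -> "1.5KiB"
    for unit in ["", "Ki", "Mi", "Gi", "Ti"]:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, "Pi", suffix)

def print_table(rows):
    # right-align every cell to the widest value in its column
    widths = [max(len(str(row[i])) for row in rows) for i in range(len(rows[0]))]
    for row in rows:
        print("  ".join(str(cell).rjust(w) for cell, w in zip(row, widths)))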

The next step is to have a look at the schema of the Parquet file:

s = pq_file.metadata.schema
data = [[s.column(i).name, s.column(i).physical_type, s.column(i).logical_type] for i in range(len(s))]

Column                  physical  logical
VendorID                INT64     NONE
tpep_pickup_datetime    INT64     TIMESTAMP_MILLIS
tpep_dropoff_datetime   INT64     TIMESTAMP_MILLIS
passenger_count         INT64     NONE
trip_distance           DOUBLE    NONE
RatecodeID              INT64     NONE
store_and_fwd_flag      BOOLEAN   NONE
PULocationID            INT64     NONE
DOLocationID            INT64     NONE
payment_type            INT64     NONE
fare_amount             DOUBLE    NONE
extra                   DOUBLE    NONE
mta_tax                 DOUBLE    NONE
tip_amount              DOUBLE    NONE
tolls_amount            DOUBLE    NONE
improvement_surcharge   DOUBLE    NONE
total_amount            DOUBLE    NONE

Each column has a physical type that defines how the column is stored on disk and an optional logical type that is used to determine the actual data type. In the case of tpep_pickup_datetime and tpep_dropoff_datetime, the values are stored as INT64 types on disk but are represented as timestamps in Pandas.

Logical types are used to extend the types that Parquet can store by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses Parquet's efficient encodings. For example, strings are stored as byte arrays (binary) with a UTF8 annotation; the Parquet logical type definitions provide comprehensive documentation.
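
A small, hedged example of this annotation mechanism (the scratch file name below is just for illustration): writing a table with a string column and inspecting its column metadata shows the BYTE_ARRAY physical type paired with a string logical type.

import pyarrow as pa
import pyarrow.parquet as pq

# write a tiny table with a string column to a scratch file
table = pa.table({"label": ["yellow", "green", "yellow"]})
pq.write_table(table, "logical_type_example.parquet")

col = pq.ParquetFile("logical_type_example.parquet").metadata.schema.column(0)
print(col.physical_type, col.logical_type)  # prints BYTE_ARRAY and a String logical type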

Now let's dive a little deeper into the file. A Parquet file consists of one or more row groups, which are a logical horizontal partitioning of the data into rows.

s = pq_file.metadata.schema
data = []
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, rg_meta.num_rows, sizeof_fmt(rg_meta.total_byte_size)])

As we have written the Parquet file with the pandas default settings, we get row groups with a size on disk between 512MiB and 1.5GiB.

row group        rows      size
        0    67108864    1.4GiB
        1    35695386  753.0MiB

To understand the default row group sizing, some historical context is helpful. The Parquet file format was developed as a columnar storage format for the Apache Hadoop ecosystem and its underlying Hadoop Distributed File System (HDFS):

Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.

When working with Parquet in Python, one typically does not use HDFS as a storage backend, but either the local file system or a cloud blob storage like Amazon S3 or Azure Blob Storage. Depending on the read scenarios, different row group sizes make sense.
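
If the data already fits into memory as a single table, one knob to experiment with is the row_group_size argument of pyarrow.parquet.write_table; the target of 5 million rows below is only an illustrative choice, not a recommendation:

import pyarrow.parquet as pq

# note: read_table materializes the full dataset in memory
table = pq.read_table("yellow_tripdata_2018.parquet")
# rewrite the file so that no row group contains more than ~5 million rows
pq.write_table(table, "yellow_tripdata_2018-small-rowgroups.parquet", row_group_size=5_000_000)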

Instead of concatenating the CSV files in Pandas and writing them in one batch, one can use pyarrow.ParquetWriter directly to control how many row groups are written:

import pandas as pd

import pyarrow as pa
import pyarrow.parquet as pq

months = range(1,13)

def read_csv(month):
    filename = "input/yellow_tripdata_2018-{:02d}.csv".format(month)
    df = pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
    # keep only rows whose pickup timestamp actually falls into the month of the file
    return df[(df['tpep_pickup_datetime'].dt.year == 2018) & (df['tpep_pickup_datetime'].dt.month == month)]

dfs = list(map(read_csv, months))

# open the writer with the schema of the first monthly chunk
table = pa.Table.from_pandas(dfs[0], preserve_index=False)
writer = pq.ParquetWriter('yellow_tripdata_2018-rowgroups.parquet', table.schema)

# every monthly DataFrame is written with its own write_table call and ends up in its own row group
for df in dfs:
    table = pa.Table.from_pandas(df, preserve_index=False)
    writer.write_table(table)
writer.close()

I have also added some data cleansing: as mentioned earlier, the taxi dataset includes some messy data, and we only want to keep rows whose tpep_pickup_datetime actually falls within the month of the respective file.

If we analyze the new Parquet file again, we can see that we now have a row group for each month of data:

row group      rows  total_byte_size
        0   8492076         152.0MiB
        1   8173231         150.5MiB
        2   8040133         148.1MiB
        3   9430376         169.8MiB
        4   8821105         162.4MiB
        5   7849748         142.9MiB
        6   9305515         168.0MiB
        7   8145164         149.8MiB
        8   9224063         167.1MiB
        9   7849134         142.9MiB
       10   8713831         157.8MiB
       11   8759874         157.9MiB

In addition to the data in the row groups, the Parquet format specifies metadata that is written per row group:

rg_meta = pq_file.metadata.row_group(0)
rg_meta.column(1)  # tpep_pickup_datetime

Per column we can retrieve metadata such as compression, size, and data type, but also statistical information about the values stored in the row group for the particular column.

<pyarrow._parquet.ColumnChunkMetaData object at 0x7fa958ab72d0>
  file_offset: 43125536
  file_path:
  physical_type: INT64
  num_values: 8759557
  path_in_schema: tpep_pickup_datetime
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fa958ab7510>
      has_min_max: True
      min: 2001-01-05 11:45:23
      max: 2018-01-31 23:59:57
      null_count: 0
      distinct_count: 0
      num_values: 8759557
      physical_type: INT64
      logical_type: Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
  has_dictionary_page: True
  dictionary_page_offset: 1312236
  data_page_offset: 2117164
  total_compressed_size: 41813300
  total_uncompressed_size: 68701768
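
Because the compressed and uncompressed sizes are part of this metadata, one can, for example, estimate the compression ratio of a column chunk without touching the data; a small sketch based on the attributes shown above:

col_meta = pq_file.metadata.row_group(0).column(1)  # tpep_pickup_datetime
ratio = col_meta.total_compressed_size / col_meta.total_uncompressed_size
print("compression ratio: {:.2f}".format(ratio))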

Looking at the min and max statistics of the tpep_pickup_datetime column:

column = 1 # tpep_pickup_datetime
data = [["row group", "min", "max"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, str(rg_meta.column(column).statistics.min), str(rg_meta.column(column).statistics.max)])
print_table(data)

The statistics reveal an interesting property: the value ranges of the row groups are disjoint. This means that, without reading the full data, a reader already knows which values to expect in which row group.

row group                  min                  max
        0  2018-01-01 00:00:00  2018-01-31 23:59:57
        1  2018-02-01 00:00:00  2018-02-28 23:59:58
        2  2018-03-01 00:00:00  2018-03-31 23:59:57
        3  2018-04-01 00:00:00  2018-04-30 23:59:58
        4  2018-05-01 00:00:00  2018-05-31 23:59:59
        5  2018-06-01 00:00:00  2018-06-30 23:59:59
        6  2018-07-01 00:00:00  2018-07-31 23:59:59
        7  2018-08-01 00:00:00  2018-08-31 23:59:59
        8  2018-09-01 00:00:00  2018-09-30 23:59:59
        9  2018-10-01 00:00:00  2018-10-31 23:59:58
       10  2018-11-01 00:00:00  2018-11-30 23:59:59
       11  2018-12-01 00:00:00  2018-12-31 23:59:58

If columns are sorted and/or row groups have disjoint value ranges, readers can take advantage of this through a feature called predicate pushdown. To get all taxi trips on a certain day, say 2018-02-20, a Parquet reader can look at the row group statistics, check for each row group whether tpep_pickup_datetime.min <= 2018-02-20 and tpep_pickup_datetime.max >= 2018-02-20, and read only the parts of the file that can potentially contain rows for that day. In our case, one would only have to read row group 1, and thus about 150MiB instead of roughly 2.2GiB.
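
A minimal sketch of such a row-group-level filter on top of the exposed statistics could look like the following; the day boundaries and the final in-memory filter are my own additions for illustration:

import datetime

import pyarrow as pa
import pyarrow.parquet as pq

pq_file = pq.ParquetFile("yellow_tripdata_2018-rowgroups.parquet")
column = 1  # tpep_pickup_datetime
day_start = datetime.datetime(2018, 2, 20)
day_end = day_start + datetime.timedelta(days=1)

tables = []
for rg in range(pq_file.metadata.num_row_groups):
    stats = pq_file.metadata.row_group(rg).column(column).statistics
    # read the row group only if its [min, max] range can overlap the requested day
    if stats is None or not stats.has_min_max or (stats.min < day_end and stats.max >= day_start):
        tables.append(pq_file.read_row_group(rg))

df = pa.concat_tables(tables).to_pandas()
# the selected row groups may still contain rows from other days, so filter in memory
df = df[(df["tpep_pickup_datetime"] >= day_start) & (df["tpep_pickup_datetime"] < day_end)]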

In contrast, if we print the statistics for the column trip_distance:

row group  min        max
        0  0.0  189483.84
        1  0.0     1061.2
        2  0.0      302.8
        3  0.0      943.5
        4  0.0      910.8
        5  0.0      833.1
        6  0.0    7655.76
        7  0.0     5381.5
        8  0.0     329.63
        9  0.0      302.0
       10  0.0      932.9
       11  0.0      602.3

Even if readers are only interested in rows with a certain trip_distance, they would have to read almost the whole dataset: the value ranges of all row groups overlap, and only for distances greater than 1000 could some of the row groups be skipped.

Summary

Query engines that work on Parquet files, like Hive, Presto, or Dremio, provide predicate pushdown out of the box to speed up query times and reduce I/O.

In the Python ecosystem, fastparquet supports predicate pushdown at the row-group level, while PyArrow has an open ticket for an efficient implementation in the Parquet C++ reader.

Implementing predicate pushdown in Python on top of the exposed statistics is straightforward. In my team, we have done this within kartothek to speed up reads of large datasets from Azure Blob Storage.