Peter Hoffmann, Director Data Engineering at Blue Yonder. Python Developer, Conference Speaker, Mountaineer

Understanding predicate pushdown on row group level in Parquet with pyarrow and Python

Apache Parquet is a columnar file format for working with gigabytes of data. Reading and writing parquet files is efficiently exposed to Python with pyarrow. Additional statistics allow clients to use predicate pushdown to read only subsets of the data, which reduces I/O. Organizing data by column allows for better compression, as the data is more homogeneous. Better compression also reduces the bandwidth required to read the input.

Demo Dataset

We are using the NY Taxi Dataset throughout this blog post because it is a real world dataset of reasonable size, it has some nice properties like different datatypes, and it includes some messy data (like all real world data engineering problems).

mkdir input
cd input
for i in {01..12}; do
  wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-$i.csv
done

Looking at the first rows of the data gives us some insight into the columns and the data format:

$ head -n4 input/yellow_tripdata_2019-01.csv
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.50,1,N,151,239,1,7,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.60,1,N,239,246,1,14,0.5,0.5,1,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,.00,1,N,236,236,1,4.5,0.5,0.5,0,0,0.3,5.8,

Each of the files holds roughly 700MiB of uncompressed data:

$ du -sh input/yellow_tripdata_2018-0*
737M    input/yellow_tripdata_2018-01.csv
715M    input/yellow_tripdata_2018-02.csv
794M    input/yellow_tripdata_2018-03.csv
784M    input/yellow_tripdata_2018-04.csv
777M    input/yellow_tripdata_2018-05.csv
734M    input/yellow_tripdata_2018-06.csv
661M    input/yellow_tripdata_2018-07.csv
661M    input/yellow_tripdata_2018-08.csv
678M    input/yellow_tripdata_2018-09.csv

To convert the data to parquet, we are going to use pandas to read the CSV files and store them in one large parquet file:

import glob
import pandas as pd

files = glob.glob("input/yellow_tripdata_2018-*.csv")

def read_csv(filename):
    return pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
dfs = list(map(read_csv, files))
df = pd.concat(dfs)
df.to_parquet("yellow_tripdata_2018.parquet")

The resulting parquet file has a size of 2.2GiB, while the original CSV files add up to 11GiB. Pandas supports two parquet implementations, fastparquet and pyarrow, each with its own strengths and weaknesses. A comparison is a topic for another blog post; here we are going to use pyarrow to analyze the data.
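To make this choice explicit, pandas lets you select the engine when writing; the default engine "auto" prefers pyarrow if it is installed and falls back to fastparquet otherwise. A minimal sketch:

# write with the pyarrow engine explicitly instead of relying on the "auto" default
df.to_parquet("yellow_tripdata_2018.parquet", engine="pyarrow")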

pyarrow can open a parquet file without directly reading all the data. It exposes the metadata and only reads the necessary byte ranges of the file to get this information. This is extremely helpful when you are working with parquet files that are not available locally but stored in a remote location (like Amazon S3 or Azure Blob Storage), because you only read a few kilobytes instead of gigabytes of data to understand your dataset.
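The snippets below format their output with two small helper functions, print_table and sizeof_fmt. These are not part of pyarrow; a minimal sketch of such helpers could look like this:

def print_table(rows):
    # right-align each column based on the widest value in that column
    widths = [max(len(str(row[i])) for row in rows) for i in range(len(rows[0]))]
    for row in rows:
        print("  ".join(str(value).rjust(width) for value, width in zip(row, widths)))


def sizeof_fmt(num, suffix="B"):
    # format a byte count as a human readable string, e.g. 152.0MiB
    for unit in ["", "Ki", "Mi", "Gi", "Ti"]:
        if abs(num) < 1024.0:
            return "{:3.1f}{}{}".format(num, unit, suffix)
        num /= 1024.0
    return "{:.1f}Pi{}".format(num, suffix)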

import pyarrow.parquet as pq

filename = "yellow_tripdata_2018.parquet"
pq_file = pq.ParquetFile(filename)

data = [["columns:", pq_file.metadata.num_columns],
        ["rows:", pq_file.metadata.num_rows],
        ["row_roups:", pq_file.metadata.num_row_groups]
        ]

So we are working with roughly 100 million records and 18 columns, and the file has 2 row groups.

   columns:         18
      rows:  102804250
row_groups:          2

The next step is to have a look at the schema of the parquet file:

s = pq_file.metadata.schema
data = [["Column", "physical", "logical"]]
data += [[s.column(i).name, s.column(i).physical_type, s.column(i).logical_type] for i in range(len(s))]
print_table(data)
Column physical logical
VendorID INT64 NONE
tpep_pickup_datetime INT64 TIMESTAMP_MILLIS
tpep_dropoff_datetime INT64 TIMESTAMP_MILLIS
passenger_count INT64 NONE
trip_distance DOUBLE NONE
RatecodeID INT64 NONE
store_and_fwd_flag BOOLEAN NONE
PULocationID INT64 NONE
DOLocationID INT64 NONE
payment_type INT64 NONE
fare_amount DOUBLE NONE
extra DOUBLE NONE
mta_tax DOUBLE NONE
tip_amount DOUBLE NONE
tolls_amount DOUBLE NONE
improvement_surcharge DOUBLE NONE
total_amount DOUBLE NONE

Each column has a physical type that defines how the column is stored on disk and an optional logical type that is used to determine the actual data type. In the case of tpep_pickup_datetime and tpep_dropoff_datetime the values are stored as INT64 on disk but are represented as timestamps in pandas.

Logical types extend the types that parquet can store by specifying how the primitive types should be interpreted. This keeps the set of primitive types to a minimum and reuses parquet's efficient encodings. For example, strings are stored as byte arrays (binary) with a UTF8 annotation. The parquet logical type definitions provide comprehensive documentation.
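As a quick illustration (a minimal sketch that writes a throwaway file named strings.parquet), a string column ends up with a BYTE_ARRAY physical type plus a string/UTF8 logical annotation:

import pyarrow as pa
import pyarrow.parquet as pq

# write a single string column and inspect how it is stored on disk
table = pa.table({"name": ["alice", "bob"]})
pq.write_table(table, "strings.parquet")

col = pq.ParquetFile("strings.parquet").metadata.schema.column(0)
print(col.physical_type, col.logical_type)  # BYTE_ARRAY with a String/UTF8 annotation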

Now let's dive a little deeper into the file. A parquet file consists of one or more row groups, which are a logical horizontal partitioning of the data into rows.

data = [["rowgroup", "rows", "size"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, rg_meta.num_rows, sizeof_fmt(rg_meta.total_byte_size)])
print_table(data)

As we have written the parquet file with the default settings in pandas, we get row groups with a size on disk between 512MiB and 1.5GiB:

rowgroup rows size
0 67108864 1.4GiB
1 35695386 753.0MiB

To understand the defaults of the row group sizing, a little bit of historical context is necessary. The parquet file format was developed as a columnar data storage format for the Apache Hadoop ecosystem and its underlying Hadoop distributed file system (HDFS):

Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.

When working with parquet in Python, one typically does not use HDFS as a storage backend, but either the local file system or a cloud blob storage like Amazon S3 or Azure Blob Storage. Depending on the read scenario, different row group sizes make sense.
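If you only want smaller row groups, you do not need to restructure the write at all: pyarrow's write_table accepts a row_group_size argument. A minimal sketch, reusing the concatenated df from above and a hypothetical output filename:

import pyarrow as pa
import pyarrow.parquet as pq

# cap the number of rows per row group, e.g. at roughly 5 million rows,
# instead of relying on pyarrow's default chunking
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, "yellow_tripdata_2018-small-rowgroups.parquet", row_group_size=5_000_000)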

Instead of concatenating the CSV files in pandas and writing them in one batch, one can use pyarrow's ParquetWriter directly to control how many row groups are written:

import pandas as pd

import pyarrow as pa
import pyarrow.parquet as pq

months = range(1,13)

def read_csv(month):
    filename ="input/yellow_tripdata_2018-{:02d}.csv".format(month)
    df = pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
    return df[(df['tpep_pickup_datetime'].dt.year==2018) & (df['tpep_pickup_datetime'].dt.month==month)]

dfs = list(map(read_csv, months))

table = pa.Table.from_pandas(dfs[0], preserve_index=False)
writer = pq.ParquetWriter('yellow_tripdata_2018-rowgroups.parquet', table.schema)

for df in dfs:
    table = pa.Table.from_pandas(df, preserve_index=False)
    writer.write_table(table)
writer.close()

I have also added some data cleansing because, as mentioned earlier, the taxi dataset includes some messy data, and we only want to keep the rows of each monthly dataset whose tpep_pickup_datetime actually falls into that month.

If we analyze the new parquet file again, we can see that we now have a row group for each month of data:

rowgroup rows total_byte_size
0 8492076 152.0MiB
1 8173231 150.5MiB
2 8040133 148.1MiB
3 9430376 169.8MiB
4 8821105 162.4MiB
5 7849748 142.9MiB
6 9305515 168.0MiB
7 8145164 149.8MiB
8 9224063 167.1MiB
9 7849134 142.9MiB
10 8713831 157.8MiB
11 8759874 157.9MiB

In addition to the data in the row groups, the parquet format specifies some metadata that is written per row group:

rg_meta = pq_file.metadata.row_group(0)
rg_meta.column(1)  # tpep_pickup_datetime

Per column we can retrieve metadata like compression, sizing and datatype, but also statistical information about the values stored in the rowgroup for the particular column.

<pyarrow._parquet.ColumnChunkMetaData object at 0x7fa958ab72d0>
  file_offset: 43125536
  file_path:
  physical_type: INT64
  num_values: 8759557
  path_in_schema: tpep_pickup_datetime
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7fa958ab7510>
      has_min_max: True
      min: 2001-01-05 11:45:23
      max: 2018-01-31 23:59:57
      null_count: 0
      distinct_count: 0
      num_values: 8759557
      physical_type: INT64
      logical_type: Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
  has_dictionary_page: True
  dictionary_page_offset: 1312236
  data_page_offset: 2117164
  total_compressed_size: 41813300
  total_uncompressed_size: 68701768

Looking at the min and max statistics of the tpep_pickup_datetime column:

column = 1 # tpep_pickup_datetime
data = [["rowgroup", "min", "max"]]
for rg in range(pq_file.metadata.num_row_groups):
    rg_meta = pq_file.metadata.row_group(rg)
    data.append([rg, str(rg_meta.column(column).statistics.min), str(rg_meta.column(column).statistics.max)])
print_table(data)

The statistics show an interesting property: the value ranges per row group are disjoint. This means that without reading the full data you already know which values to expect in which row group.

rowgroup min max
0 2018-01-01 00:00:00 2018-01-31 23:59:57
1 2018-02-01 00:00:00 2018-02-28 23:59:58
2 2018-03-01 00:00:00 2018-03-31 23:59:57
3 2018-04-01 00:00:00 2018-04-30 23:59:58
4 2018-05-01 00:00:00 2018-05-31 23:59:59
5 2018-06-01 00:00:00 2018-06-30 23:59:59
6 2018-07-01 00:00:00 2018-07-31 23:59:59
7 2018-08-01 00:00:00 2018-08-31 23:59:59
8 2018-09-01 00:00:00 2018-09-30 23:59:59
9 2018-10-01 00:00:00 2018-10-31 23:59:58
10 2018-11-01 00:00:00 2018-11-30 23:59:59
11 2018-12-01 00:00:00 2018-12-31 23:59:58

If columns are sorted and/or row groups have disjoint value ranges in a dataset, readers can take advantage of this through a feature called predicate pushdown. To get all taxi trips of a certain day, say 2018-02-20, the parquet reader can now look at the row group statistics, check the predicate tpep_pickup_datetime.min <= 2018-02-20 and tpep_pickup_datetime.max >= 2018-02-20 against them, and only read the parts of the file that can potentially contain rows for that day. In our case one would only have to read row group 1, i.e. roughly 150MiB instead of 2.1GiB.
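A hand-rolled sketch of this idea on top of the exposed statistics could look like the following; this only illustrates what a predicate pushdown implementation does under the hood, it is not the API of a query engine:

import datetime

import pyarrow as pa
import pyarrow.parquet as pq

pq_file = pq.ParquetFile("yellow_tripdata_2018-rowgroups.parquet")

column = 1  # tpep_pickup_datetime
start = datetime.datetime(2018, 2, 20)
end = start + datetime.timedelta(days=1)

tables = []
for rg in range(pq_file.metadata.num_row_groups):
    stats = pq_file.metadata.row_group(rg).column(column).statistics
    # only read row groups whose [min, max] range overlaps the requested day
    if stats.min < end and stats.max >= start:
        tables.append(pq_file.read_row_group(rg))

# the statistics only tell us which row groups might contain matching rows,
# so the exact filter is still applied, but on a much smaller amount of data
df = pa.concat_tables(tables).to_pandas()
df = df[(df["tpep_pickup_datetime"] >= start) & (df["tpep_pickup_datetime"] < end)]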

In contrast, if we print the statistics for the column trip_distance:

rowgroup min max
0 0.0 189483.84
1 0.0 1061.2
2 0.0 302.8
3 0.0 943.5
4 0.0 910.8
5 0.0 833.1
6 0.0 7655.76
7 0.0 5381.5
8 0.0 329.63
9 0.0 302.0
10 0.0 932.9
11 0.0 602.3

Even if readers were only interested in rows with a certain trip_distance, they would have to read the whole dataset most of the time. Only for distances greater than 1000 could one skip some of the row groups.

Summary

Query engines that work on parquet files, like Hive, Presto or Dremio, provide predicate pushdown out of the box to speed up query times and reduce I/O.

In the Python ecosystem, fastparquet has support for predicate pushdown on row group level, while pyarrow has an open ticket for an efficient implementation in the parquet C++ reader.

Implementing predicate pushdown in Python on top of the exposed statistics is not that hard. In my team we have done this within kartothek to speed up reads of large datasets from Azure Blob Storage.