Peter Hoffmann

Azure Data Explorer and Parquet Files in Azure Blob Storage

Azure Data Explorer

Because my team at Blue Yonder makes heavy use of Apache Parquet datasets, we are always on the lookout for managed, scalable, elastic query engines that work directly on flat files, besides the usual suspects like Drill, Hive, Presto, or Impala.

For the following tests, I deployed an Azure Data Explorer cluster with two instances of Standard_D14_v2 servers, each with 16 vCores, 112 GiB RAM, 800 GiB SSD storage, and an Extremely High network bandwidth class (which corresponds to 8 NICs).

Data Preparation: NY Taxi Dataset

As in the Understanding Parquet Predicate Pushdown blog post, we use the NY Taxi dataset for these tests: it has a reasonable size and useful properties, such as different data types, and it includes some messy data (like all real-world data engineering problems).

# download the 2018 NYC Yellow Taxi trip data, one CSV file per month
mkdir csv
mkdir parquet
cd csv
for i in {01..12}; do
  wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-$i.csv
done

We can convert the CSV files to Parquet with pandas and pyarrow:

import pandas as pd
# pyarrow provides the Parquet engine used by DataFrame.to_parquet
import pyarrow as pa
import pyarrow.parquet as pq

months = range(1, 13)

def csv_to_parquet(month):
    # read one month of CSV data; store_and_fwd_flag uses "Y"/"N" and is mapped to bool
    filename = "csv/yellow_tripdata_2018-{:02d}.csv".format(month)
    df = pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
    # keep only rows whose pickup timestamp actually falls into the month of the file
    # (the raw data contains some stray timestamps)
    df = df[
        (df["tpep_pickup_datetime"].dt.year == 2018)
        & (df["tpep_pickup_datetime"].dt.month == month)
    ]
    filename = "parquet/yellow_tripdata_2018-{:02d}.parquet".format(month)
    df.to_parquet(filename)

for m in months:
    csv_to_parquet(m)

Each CSV file is about 700 MiB; the Parquet files are about 180 MiB; each file has about 10 million rows.

Data Ingestion

Azure Data Explorer supports control and query commands to interact with the cluster. Kusto control commands always start with a dot and are used to manage the service, query information about it, and explore, create, and alter tables. The primary query language is the Kusto Query Language, but a subset of T-SQL is also supported.
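
To make the distinction concrete, here is a minimal illustrative pair (the nytaxi table is created further below): a control command that lists the tables of the current database, followed by a plain query.

// control command: starts with a dot and returns metadata about the service
.show tables

// query: plain Kusto Query Language, no leading dot
nytaxi
| take 10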

The table schema definition supports a number of scalar data types:

Type       Storage Type (internal name)
bool       I8
datetime   DateTime
guid       UniqueId
int        I32
long       I64
real       R64
string     StringBuffer
timespan   TimeSpan
decimal    Decimal

To create a table for the NY Taxi dataset, we can use the following control command with the table and column names and the corresponding data types:

.create table nytaxi(
    VendorID: int,
    tpep_pickup_datetime: datetime,
    tpep_dropoff_datetime: datetime,
    passenger_count: int,
    trip_distance: real,
    RatecodeID: int,
    store_and_fwd_flag: bool,
    PULocationID: int,
    DOLocationID: int,
    payment_type: int,
    fare_amount: real,
    extra: real,
    mta_tax: real,
    tip_amount: real,
    tolls_amount: real,
    improvement_surcharge: real,
    total_amount: real
)

Ingest Data into Azure Data Explorer

The .ingest into table command reads data from Azure Blob Storage or Azure Data Lake Storage and imports it into the cluster, i.e. the data is stored locally on the cluster for better query performance. Authentication is handled with Azure SAS tokens.

Importing one month of CSV data takes about 110 seconds. As a reference, parsing the same CSV file with pandas.read_csv takes about 19 seconds.

You should always use obfuscated strings (the h prefix) to ensure that the SAS token is never recorded or logged:

.ingest into table nytaxi
h'https://phoffmann.blob.core.windows.net/nytaxi/csv/yellow_tripdata_2018-01.csv?sp=r&st=2020-02-01T09:20:07Z&se=2020-02-24T20:20:07Z&spr=https&sv=2019-02-02&sr=b&sig=XXX'
with (ignoreFirstRecord=true)
;
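
As a quick illustrative sanity check after the ingest, a simple count should return roughly the 10 million rows per month mentioned above:

// illustrative check: row count of the ingested table
nytaxi
| count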

Ingesting Parquet data from Azure Blob Storage works with a similar command; the file format is determined from the file extension. Besides CSV and Parquet, many more data formats such as JSON, JSON Lines, ORC, and Avro are supported. According to the documentation, it is also possible to specify the format explicitly by appending with (format="parquet") to the command.

.ingest into table nytaxi_parquet
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx'
;
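
The explicit variant would look roughly like this, with the format stated instead of inferred from the file extension (a sketch based on the documented with (format="parquet") option):

.ingest into table nytaxi_parquet
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx'
with (format="parquet")
;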

Loading one month of data from Parquet took only about 30 seconds, which is already a nice speedup. You can also list multiple Parquet files from the blob store to load them in one run, but I did not see a performance improvement over duration × number of files, which I interpret as the import not happening in parallel:

.ingest into table nytaxi_parquet(
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx',
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-02.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx',
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-03.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx'
)
;
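
One way to check how long the individual ingest commands actually ran is the .show commands control command; the following is only a sketch, and the projected column names (Text, State, Duration, StartedOn) are assumptions about its output schema:

// sketch: list recent management commands (including the ingests) with state and duration
// the column names used below are assumptions and may differ between service versions
.show commands
| where StartedOn > ago(1h)
| project Text, State, Duration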

Once the data is ingested, one can query it using Azure Data Explorer, either with the Kusto Query Language or T-SQL:

Kusto Query
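
As an illustrative example (the query itself is just a sketch), a daily aggregation over the ingested table could look like this:

// illustrative: trips and average fare per day in January 2018
nytaxi
| where tpep_pickup_datetime >= datetime(2018-01-01) and tpep_pickup_datetime < datetime(2018-02-01)
| summarize trips = count(), avg_fare = avg(fare_amount) by pickup_day = bin(tpep_pickup_datetime, 1d)
| order by pickup_day asc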

Query External Tables

Loading the data into the cluster gives the best performance, but often you just want to run an ad hoc query on Parquet data in Blob storage. External tables support exactly this scenario. This time, using multiple files (i.e. partitioning the dataset across files) did help to speed up the query.

.create external table nytaxi_parquet_external(
VendorID: int,
tpep_pickup_datetime: datetime,
tpep_dropoff_datetime: datetime,
passenger_count: int,
trip_distance: real,
RatecodeID: int,
store_and_fwd_flag: bool,
PULocationID: int,
DOLocationID: int,
payment_type: int,
fare_amount: real,
extra: real,
mta_tax: real,
tip_amount: real,
tolls_amount: real,
improvement_surcharge: real,
total_amount: real)
kind=blob
dataformat=parquet (
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-02.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-03.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-04.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-05.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-06.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-07.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-08.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-09.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-10.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-11.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-12.parquet;xxx'
)
with (docstring = "NyTaxiDataset")
;

Querying external data looks similar, but has the benefit that you do not have to load the data into the cluster first. I'll do some performance benchmarks in a follow-up post. Based on my initial experience, the query engine seems to be aware of some Parquet properties, such as columnar storage and predicate pushdown, because queries return results faster than it would take to download the full data from Blob storage at its 30 MB/s limit.

external_table("nytaxi_parquet_external")
    | take 100
;

external table
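
To probe the columnar storage and predicate pushdown behaviour mentioned above, a query that touches only a few columns and filters on the pickup time is more telling than a plain take 100; again, just an illustrative sketch:

// illustrative: references only three of the 17 columns and filters on pickup time
external_table("nytaxi_parquet_external")
| where tpep_pickup_datetime >= datetime(2018-03-01) and tpep_pickup_datetime < datetime(2018-04-01)
| summarize trips = count(), revenue = sum(total_amount) by payment_type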

Export Data

For the sake of completeness, I'll just show an example of how to export data from the cluster back to a Parquet file in Azure Blob Storage:

.export
  to parquet (
    h@"https://phoffmann.blob.core.windows.net/nytaxi/export/test.parquet;xxx"
  )
  <| nytaxi | limit 100
  ;