Peter Hoffmann Director Data Engineering at Blue Yonder. Python Developer, Conference Speaker, Mountaineer

Azure Data Explorer and Parquet files in the Azure Blob Storage

Last summer Microsoft rebranded the Azure Kusto query engine as Azure Data Explorer. While it does not support fully elastic scaling, it at least allows scaling a cluster up and out via an API or the Azure portal to adapt to different workloads. It also offers Parquet support out of the box, which made me spend some time looking into it.

Azure Data Explorer

With the heavy use of Apache Parquet datasets within my team at Blue Yonder, we are always looking for managed, scalable/elastic query engines on flat files besides the usual suspects like Drill, Hive, Presto or Impala.

For the following tests I deployed an Azure Data Explorer cluster with two Standard_D14_v2 instances, each with 16 vCores, 112 GiB RAM, 800 GiB SSD storage and the network bandwidth class "extremely high" (which corresponds to 8 NICs).

Data Preparation: NY Taxi Dataset

As in the understanding parquet predicate pushdown blog post, we are using the NY Taxi dataset for the tests because it has a reasonable size, has some nice properties like different data types, and includes some messy data (like all real-world data engineering problems).

mkdir csv
mkdir parquet
cd csv
for i in {01..12}; do
  wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-$i.csv
done

We can convert the CSV files to Parquet with pandas and pyarrow:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

months = range(1,13)

def csv_to_parquet(month):
    filename = "csv/yellow_tripdata_2018-{:02d}.csv".format(month)
    df = pd.read_csv(
        filename,
        dtype={"store_and_fwd_flag": "bool"},
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        index_col=False,
        infer_datetime_format=True,
        true_values=["Y"],
        false_values=["N"],
    )
    # Drop rows whose pickup timestamp does not belong to the month of the file.
    df = df[
        (df["tpep_pickup_datetime"].dt.year == 2018)
        & (df["tpep_pickup_datetime"].dt.month == month)
    ]
    filename = "parquet/yellow_tripdata_2018-{:02d}.parquet".format(month)
    df.to_parquet(filename)

for m in months:
    csv_to_parquet(m)

Each CSV file is about 700 MiB, each Parquet file about 180 MiB, and each file contains roughly 10 million rows.
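
To double-check those numbers, one can inspect the Parquet footers directly with pyarrow; a small sketch, assuming the files live in the parquet/ directory created above:

import os

import pyarrow.parquet as pq

# Print the on-disk size and the row count stored in each Parquet footer.
for month in range(1, 13):
    path = "parquet/yellow_tripdata_2018-{:02d}.parquet".format(month)
    size_mib = os.path.getsize(path) / 1024 ** 2
    num_rows = pq.ParquetFile(path).metadata.num_rows
    print("{}: {:.0f} MiB, {:,} rows".format(path, size_mib, num_rows))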

Data Ingestion

Azure Data Explorer supports control and query commands to interact with the cluster. Kusto control commands always start with a dot and are used to manage the service, query information about it, and explore, create and alter tables. The primary query language is the Kusto Query Language (KQL), but a subset of T-SQL is also supported.
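
As a minimal sketch of this distinction from Python, using the azure-kusto-data package (cluster URL, database name and the authentication method are placeholders, not part of the original setup):

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Placeholder cluster URL and database name.
cluster = "https://<clustername>.<region>.kusto.windows.net"
kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster)
client = KustoClient(kcsb)

# Control commands start with a dot and go through execute_mgmt ...
client.execute_mgmt("nytaxi_db", ".show tables")

# ... while Kusto queries go through execute_query.
response = client.execute_query("nytaxi_db", "nytaxi | count")
print(response.primary_results[0])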

The table schema definition supports a number of scalar data types:

Type       Storage Type (internal name)
bool       I8
datetime   DateTime
guid       UniqueId
int        I32
long       I64
real       R64
string     StringBuffer
timespan   TimeSpan
decimal    Decimal

To create a table for the NY Taxi dataset we can use the following control command with the table and column names and the corresponding data types:

.create table nytaxi(
    VendorID: int,
    tpep_pickup_datetime: datetime,
    tpep_dropoff_datetime: datetime,
    passenger_count: int,
    trip_distance: real,
    RatecodeID: int,
    store_and_fwd_flag: bool,
    PULocationID: int,
    DOLocationID: int,
    payment_type: int,
    fare_amount: real,
    extra: real,
    mta_tax: real,
    tip_amount: real,
    tolls_amount: real,
    improvement_surcharge: real,
    total_amount: real
)
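
Instead of typing the schema by hand, it can also be derived from the Parquet files themselves; a hedged sketch with pyarrow, where the Arrow-to-Kusto type mapping is my own rough assumption covering only the types that occur in this dataset:

import pyarrow.parquet as pq

# Rough Arrow -> Kusto type mapping (an assumption, not an official table).
ARROW_TO_KUSTO = {
    "bool": "bool",
    "int32": "int",
    "int64": "long",
    "double": "real",
    "string": "string",
    "timestamp[ns]": "datetime",
    "timestamp[us]": "datetime",
}

schema = pq.read_schema("parquet/yellow_tripdata_2018-01.parquet")
columns = ",\n    ".join(
    "{}: {}".format(field.name, ARROW_TO_KUSTO.get(str(field.type), "string"))
    for field in schema
)
print(".create table nytaxi (\n    {}\n)".format(columns))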

Ingest Data into Azure Data Explorer

The .ingest into table command can read the data from Azure Blob Storage or Azure Data Lake Storage and import it into the cluster. This means the data is ingested and stored locally in the cluster for better performance. Authentication is done with Azure SAS tokens.

Importing one month of CSV data takes about 110 seconds. As a reference, parsing the same CSV file with pandas.read_csv takes about 19 seconds.
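
A reference measurement of that kind can be reproduced with a simple wall-clock timing; a sketch, not the original benchmark setup:

import time

import pandas as pd

# Wall-clock time for parsing one month of CSV data with pandas.
start = time.monotonic()
df = pd.read_csv("csv/yellow_tripdata_2018-01.csv")
print("read_csv took {:.1f} s for {:,} rows".format(time.monotonic() - start, len(df)))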

One should always use obfuscated string literals (the h prefix in front of the string values) to ensure that the SAS token is never recorded or logged:

.ingest into table nytaxi
h'https://phoffmann.blob.core.windows.net/nytaxi/csv/yellow_tripdata_2018-01.csv?sp=r&st=2020-02-01T09:20:07Z&se=2020-02-24T20:20:07Z&spr=https&sv=2019-02-02&sr=b&sig=XXX'
with (ignoreFirstRecord=true)
;
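
A read-only SAS token like the one in the URL above can be generated, for example, with the azure-storage-blob package; a sketch with placeholder credentials (the account key and expiry are assumptions):

from datetime import datetime, timedelta

from azure.storage.blob import BlobSasPermissions, generate_blob_sas

# Placeholder storage account key; in practice this comes from a secret store.
sas = generate_blob_sas(
    account_name="phoffmann",
    container_name="nytaxi",
    blob_name="csv/yellow_tripdata_2018-01.csv",
    account_key="<storage-account-key>",
    permission=BlobSasPermissions(read=True),
    expiry=datetime.utcnow() + timedelta(days=7),
)
url = (
    "https://phoffmann.blob.core.windows.net/nytaxi/"
    "csv/yellow_tripdata_2018-01.csv?" + sas
)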

Ingesting Parquet data from Azure Blob Storage uses a similar command; the file format is determined from the file extension. Besides CSV and Parquet, quite a few more data formats like JSON, JSON Lines, ORC and Avro are supported. According to the documentation it is also possible to specify the format explicitly by appending with (format="parquet").

.ingest into table nytaxi_parquet
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx'
;

Loading the data from Parquet only took 30 seconds, which already gives us a nice speedup. One can also pass multiple Parquet files from the blob store to load the data in one run, but I did not see a performance improvement (i.e. nothing better than the single-file duration times the number of files, which I interpret as the import not happening in parallel):

.ingest into table nytaxi_parquet(
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx',
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-02.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx',
h'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-03.parquet?sp=r&st=2020-02-01T09:17:43Z&se=2020-02-27T20:17:43Z&spr=https&sv=2019-02-02&sr=b&sig=xxx'
)
;

Once the data is ingested, one can nicely query it using the Azure Data Explorer, either in the Kusto Query Language or in T-SQL:

Kusto Query
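
For example, an illustrative aggregation (trips and average tip per pickup month) run through the Python SDK; the cluster and database names are placeholders and the query itself is my own example, not from the original post:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

# Placeholder cluster URL and database name.
cluster = "https://<clustername>.<region>.kusto.windows.net"
client = KustoClient(KustoConnectionStringBuilder.with_aad_device_authentication(cluster))

query = """
nytaxi
| summarize trips = count(), avg_tip = avg(tip_amount)
    by month = startofmonth(tpep_pickup_datetime)
| order by month asc
"""

# Convert the primary result table into a pandas DataFrame.
df = dataframe_from_result_table(client.execute("nytaxi_db", query).primary_results[0])
print(df)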

Query External Tables

Loading the data into the cluster gives the best performance, but often one just wants to run an ad hoc query on Parquet data in blob storage. External tables support exactly this scenario. And this time, using multiple files/partitioning did help to speed up the query.

.create external table nytaxi_parquet_external(
VendorID: int,
tpep_pickup_datetime: datetime,
tpep_dropoff_datetime: datetime,
passenger_count: int,
trip_distance: real,
RatecodeID: int,
store_and_fwd_flag: bool,
PULocationID: int,
DOLocationID: int,
payment_type: int,
fare_amount: real,
extra: real,
mta_tax: real,
tip_amount: real,
tolls_amount: real,
improvement_surcharge: real,
total_amount: real)
kind=blob
dataformat=parquet (
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-01.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-02.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-03.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-04.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-05.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-06.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-07.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-08.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-09.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-10.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-11.parquet;xxx',
h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/yellow_tripdata_2018-12.parquet;xxx'
)
with (docstring = "NyTaxiDataset")
;
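
Listing twelve connection strings by hand is tedious; they can just as well be generated, as sketched below (the ;xxx suffix stands in for the storage account key, as in the definition above):

# Build the twelve monthly blob URIs used in the external table definition.
uris = [
    "h@'https://phoffmann.blob.core.windows.net/nytaxi/parquet/"
    "yellow_tripdata_2018-{:02d}.parquet;xxx'".format(m)
    for m in range(1, 13)
]
print(",\n".join(uris))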

Querying external data looks similar, but has the benefit that one does not have to load the data into the cluster first. In a follow-up post I'll do some performance benchmarks. Based on my first experiences, it seems the query engine is aware of some of the Parquet properties like columnar storage and predicate pushdown, because queries return results faster than loading the full data from blob storage (with the 30 MB/s limit) would take.

external_table("nytaxi_parquet_external")
    | take 100
;

external table

Export Data

For the sake of completeness, I'll just show an example of how to export data from the cluster back to a Parquet file in Azure Blob Storage:

.export
  to parquet (
    h@"https://phoffmann.blob.core.windows.net/nytaxi/export/test.parquet;xxx"
  )
  <| nytaxi | limit 100
  ;