Azure Synapse SQL On-Demand OPENROWSET Common Table Expression with SQLAlchemy
In a previous post I showed how to use turbodbc to access Azure Synapse SQL-on-Demand endpoints. A common pattern is to use the OPENROWSET function to query Parquet data from an external data source such as Azure Blob Storage:
SELECT
    result.filepath(1) AS [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [sales_euro] float
    ) AS [result]
WHERE c_date = '2020-09-01'
Common table expressions help make the SQL code more readable, especially if more than one external data source is queried. Once you have defined the CTE statements at the top, you can use them like normal tables inside your queries:
WITH location AS
(SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) AS [result]
),
sales AS
(SELECT
    result.filepath(1) AS [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [sales_euro] float
    ) AS [result]
)
SELECT location.l_id, sales.sales_euro
FROM sales JOIN location ON sales.l_id = location.l_id
WHERE sales.c_date = '2020-01-01'
Still, writing such queries in data pipelines soon becomes cumbersome and error-prone. So once we moved from writing the queries in the Azure Synapse Workbench to using them in our daily workflows with Python, we wanted a better way to programmatically generate SQL statements.
SQLAlchemy is still our library of choice for working with SQL in Python. It already supports Microsoft SQL Server, so most of the Azure Synapse SQL-on-Demand features are covered. I have not yet found a native way to express OPENROWSET queries, but it is easy to use the text() construct to inject the missing raw SQL.
import sqlalchemy as sa

# The raw fragment starts with the column list because sa.select() adds the SELECT keyword.
cte_location_raw = '''
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) AS [result]
'''

cte = sa.select([sa.text(cte_location_raw)]).cte('location')
q = sa.select([sa.column('l_id'), sa.column('l_name')]).select_from(cte)
The cte() call returns a Common Table Expression instance, which can be used like any other selectable inside further statements; compiling q generates the following SQL:
WITH location AS
(SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) AS [result]
)
SELECT l_id, l_name FROM location
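To see what SQLAlchemy will emit, you can compile the statement yourself. A minimal sketch of this, with a stand-in text fragment instead of the full OPENROWSET body (some_table is purely illustrative), assuming SQLAlchemy 1.4 or newer where select() also accepts columns positionally:

```python
import sqlalchemy as sa

# Stand-in fragment; the OPENROWSET body from the post would go here instead.
cte = sa.select(sa.text("* FROM some_table")).cte("location")
q = sa.select(sa.column("l_id"), sa.column("l_name")).select_from(cte)

# str() on the compiled statement shows the generated SQL; pass
# dialect=sqlalchemy.dialects.mssql.dialect() to get T-SQL-specific rendering.
print(str(q.compile()))
```

This is also the quickest way to debug composition problems before sending anything to the Synapse endpoint.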
The CTE does not know about its columns because it only receives raw SQL text. But you can annotate the sa.text() statement with a typemap dictionary so that it exposes which columns are available. With that annotation in place, we can reference columns through the cte.c.column attribute instead of writing sa.column('l_name') as above.
import sqlalchemy as sa

cte_location_raw = '''
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) AS [result]
'''

# Map each exposed column to its SQLAlchemy type.
typemap = {"l_id": sa.Integer, "l_name": sa.String, "latitude": sa.Float, "longitude": sa.Float}
cte = sa.select([sa.text(cte_location_raw, typemap=typemap)]).cte('location')
q = sa.select([cte.c.l_id, cte.c.l_name]).select_from(cte)
So, putting everything together, you can define and test your CTEs in Python:
import sqlalchemy as sa

cte_sales_raw = '''
SELECT
    result.filepath(1) AS [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [sales_euro] float
    ) AS [result]
'''

cte_location_raw = '''
SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) AS [result]
'''

typemap_location = {"l_id": sa.Integer, "l_name": sa.String, "latitude": sa.Float, "longitude": sa.Float}
location = sa.select([sa.text(cte_location_raw, typemap=typemap_location).alias("tmp1")]).cte('location')

typemap_sales = {"l_id": sa.Integer, "c_date": sa.Date, "sales_euro": sa.Float}
sales = sa.select([sa.text(cte_sales_raw, typemap=typemap_sales).alias("tmp2")]).cte('sales')
and then compose more complex statements as with any other SQLAlchemy table definitions:
cols = [sales.c.c_date, sales.c.l_id, location.c.l_name, location.c.latitude, location.c.longitude]
q = sa.select(cols).select_from(sales.join(location, sales.c.l_id == location.c.l_id))
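Note that newer SQLAlchemy releases (1.4 and later) removed the typemap argument to text(); the same annotation is expressed with TextClause.columns(). A minimal, self-contained sketch of the same join composition in that style, where sales_raw and location_raw are illustrative stand-ins for the full OPENROWSET subqueries:

```python
import sqlalchemy as sa

# Stand-in fragments; in the real pipeline these would be the OPENROWSET subqueries.
sales = (
    sa.text("SELECT l_id, c_date, sales_euro FROM sales_raw")
    .columns(
        sa.column("l_id", sa.Integer),
        sa.column("c_date", sa.Date),
        sa.column("sales_euro", sa.Float),
    )
    .cte("sales")
)
location = (
    sa.text("SELECT l_id, l_name FROM location_raw")
    .columns(sa.column("l_id", sa.Integer), sa.column("l_name", sa.String))
    .cte("location")
)

# Compose the join and filter exactly as with regular table definitions.
q = (
    sa.select(sales.c.c_date, sales.c.l_id, location.c.l_name)
    .select_from(sales.join(location, sales.c.l_id == location.c.l_id))
    .where(sales.c.c_date == "2020-01-01")
)
print(str(q.compile()))
```

The columns() annotation plays the same role as typemap here: it tells SQLAlchemy which columns the raw fragment exposes, so they can be referenced through the .c accessor.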
In our production data pipelines at Blue Yonder, we typically provide the building blocks for complex queries in libraries maintained by a central team. Testing these smaller parts is much easier with SQLAlchemy than with monolithic SQL strings, and data scientists can simply plug them together and focus on the high-level model logic.
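Such building blocks can be unit-tested without any database connection by asserting on the compiled SQL. A small sketch of this pattern, with an illustrative fragment and SQLAlchemy 1.4+ TextClause.columns() annotation:

```python
import sqlalchemy as sa

def location_cte():
    # Illustrative building block, as a central library might expose it;
    # "location_raw" stands in for the real OPENROWSET body.
    fragment = sa.text("SELECT l_id, l_name FROM location_raw").columns(
        sa.column("l_id", sa.Integer),
        sa.column("l_name", sa.String),
    )
    return fragment.cte("location")

def test_location_cte_compiles():
    # No database needed: compile the statement and assert on the SQL string.
    cte = location_cte()
    sql = str(sa.select(cte.c.l_id).compile())
    assert "WITH location AS" in sql
    assert "location.l_id" in sql

test_location_cte_compiles()
```

In practice such checks would live in the library's pytest suite, so a broken fragment fails fast instead of surfacing as a runtime error in a pipeline.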
We like the power of Azure Synapse SQL-on-Demand, but managing and testing complex SQL statements remains a challenge, as the SQL generated by the code above shows. At least SQLAlchemy and Python make it easier:
WITH sales AS
(SELECT l_id AS l_id, c_date AS c_date, sales_euro AS sales_euro
FROM (
SELECT
    result.filepath(1) AS [c_date],
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/sales/table/c_date=*/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [sales_euro] float
    ) AS [result]
) AS tmp2),
location AS
(SELECT l_id AS l_id, l_name AS l_name, latitude AS latitude, longitude AS longitude
FROM (
SELECT
    *
FROM
    OPENROWSET(
        BULK 'https://<storage_account>.dfs.core.windows.net/<filesystem>/location/table/*.parquet',
        FORMAT='PARQUET'
    ) WITH (
        [l_id] bigint,
        [l_name] varchar(100),
        [latitude] float,
        [longitude] float
    ) AS [result]
) AS tmp1)
SELECT sales.c_date, sales.l_id, location.l_name, location.latitude, location.longitude
FROM sales JOIN location ON sales.l_id = location.l_id
