Peter Hoffmann, Director Data Engineering at Blue Yonder. Python Developer, Conference Speaker, Mountaineer

Azure Data Lake Storage Gen 2 with Python

Microsoft has released a beta version of the python client azure-storage-file-datalake for the Azure Data Lake Storage Gen 2 service with support for hierarchical namespaces.

The service offers blob storage capabilities with filesystem semantics, atomic operations, and a hierarchical namespace. Azure Data Lake Storage Gen 2 is built on top of Azure Blob Storage and shares the same scaling and pricing structure (only transaction costs are slightly higher). Multi protocol access allows you to use data created with the Azure Blob Storage APIs in the data lake and vice versa. This enables a smooth migration path if you already use blob storage with tools like kartothek and simplekv to store your datasets in Parquet. The naming terminology differs a little: what is called a container in the blob storage APIs is a file system in the ADLS context.

pip install azure-storage-file-datalake --pre

The entry point into the Azure Data Lake is the DataLakeServiceClient, which interacts with the service on the storage account level. It can be authenticated with the account name and storage key, a SAS token, or a service principal. A storage account can have many file systems (aka blob containers) to store data isolated from each other.

import os
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core.exceptions import ResourceExistsError

account_name = os.getenv("STORAGE_ACCOUNT_NAME")
credential = os.getenv("STORAGE_ACCOUNT_KEY")
account_url = "https://{}.dfs.core.windows.net/".format(account_name)

datalake_service = DataLakeServiceClient(
    account_url=account_url, credential=credential
)

file_system = "testfs"
try:
    filesystem_client = datalake_service.create_file_system(file_system=file_system)
except ResourceExistsError:
    # the file system already exists, so just get a client for it
    filesystem_client = datalake_service.get_file_system_client(file_system)
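
Besides the storage account key, a service principal can be used for authentication via the azure-identity package (pip install azure-identity). A minimal sketch, assuming the service principal data is provided in the AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_CLIENT_SECRET environment variables:

from azure.identity import ClientSecretCredential

# assumption: the service principal data comes from environment variables
token_credential = ClientSecretCredential(
    tenant_id=os.getenv("AZURE_TENANT_ID"),
    client_id=os.getenv("AZURE_CLIENT_ID"),
    client_secret=os.getenv("AZURE_CLIENT_SECRET"),
)

datalake_service = DataLakeServiceClient(
    account_url=account_url, credential=token_credential
)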

The FileSystemClient represents interactions with the directories and files within a file system. So let's create some data in the storage.

dir_client = filesystem_client.get_directory_client("incoming")
dir_client.create_directory()

data = """name,population
Berlin, 3406000
Munich, 1275000
"""

file_client = dir_client.create_file("cities.txt")
# append_data uploads the bytes at the given offset, flush_data commits them to the file
file_client.append_data(data, 0, len(data))
file_client.flush_data(len(data))

>>> [(i.name, i.is_directory) for i in filesystem_client.get_paths("")]
... [('incoming', True),
     ('incoming/cities.txt', False)]

If the FileClient is created from a DirectoryClient it inherits the path of the directory, but you can also instantiate it directly from the FileSystemClient with an absolute path:

>>> file_client = filesystem_client.get_file_client('incoming/cities.txt')
>>> file_client.read_file()
... b'name,population\nBerlin, 3406000\nMunich, 1275000\n'

These interactions with the Azure Data Lake do not differ that much from the existing blob storage API, and the data lake client also uses the Azure Blob Storage client behind the scenes.
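
Thanks to the multi protocol access mentioned above, the file written through the data lake API can also be read with the plain blob API, for example with the azure-storage-blob package. A minimal sketch, assuming that package is installed and the same account credentials are reused:

from azure.storage.blob import BlobServiceClient

# the same storage account, addressed through the blob endpoint
blob_service = BlobServiceClient(
    account_url="https://{}.blob.core.windows.net/".format(account_name),
    credential=credential,
)

# the data lake file system shows up as a regular blob container
container_client = blob_service.get_container_client("testfs")
content = container_client.download_blob("incoming/cities.txt").readall()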

What differs, and is much more interesting, is the hierarchical namespace support in Azure Data Lake Gen 2. The convention of using slashes in the name/key of objects/files has already been used to organize the content in blob storage into a hierarchy. With prefix scans over the keys it has also been possible to list the contents of a folder. What has been missing in the Azure Blob Storage API is a way to work on directories with atomic operations.
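
With the blob API such a prefix scan looks roughly like this (a sketch, reusing the container_client from above):

# emulate listing the "folder" incoming/ with a prefix scan over the flat namespace
for blob in container_client.list_blobs(name_starts_with="incoming/"):
    print(blob.name)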

A typical use case is a data pipeline where the data is partitioned over multiple files using a Hive-like partitioning scheme:

incoming/date=2019-01-01/part1.parquet
incoming/date=2019-01-01/part2.parquet
incoming/date=2019-01-01/part3.parquet
incoming/date=2019-01-02/part1.parquet
incoming/date=2019-01-02/part2.parquet
...

If you work with large datasets with thousands of files, moving a daily subset of the data to a processed state would have involved looping over the files in the Azure Blob API and moving each file individually. This is not only inconvenient and rather slow, it also lacks the characteristics of an atomic operation.
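
With the blob API alone such a move boils down to a copy and a delete per blob, roughly like the following sketch (again reusing the container_client from above; depending on the setup the copy source may additionally need a SAS token):

# move incoming/date=2019-01-01/* to processed/date=2019-01-01/* blob by blob
prefix = "incoming/date=2019-01-01/"
for blob in container_client.list_blobs(name_starts_with=prefix):
    source = container_client.get_blob_client(blob.name)
    target = container_client.get_blob_client(
        blob.name.replace("incoming/", "processed/", 1)
    )
    target.start_copy_from_url(source.url)  # server side copy, not atomic overall
    source.delete_blob()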

With the new Azure Data Lake API it is now possible to do this in one operation:

directory_client = filesystem_client.get_directory_client('processed')
directory_client.create_directory()

directory_client = filesystem_client.get_directory_client('incoming/date=2019-01-01')
# the new path is given with the file system name as prefix
directory_client.rename_directory('testfs/processed/date=2019-01-01')

>>> [(i.name, i.is_directory) for i in filesystem_client.get_paths("")]
... [('incoming', True),
     ('incoming/date=2019-01-02', True),
     ('incoming/date=2019-01-02/part1.parquet', False),
     ('incoming/date=2019-01-02/part2.parquet', False),
     ('processed', True),
     ('processed/date=2019-01-01', True),
     ('processed/date=2019-01-01/part1.parquet', False),
     ('processed/date=2019-01-01/part2.parquet', False),
     ('processed/date=2019-01-01/part3.parquet', False)]

Deleting a directory and the files within it is also supported as an atomic operation:

directory_client = filesystem_client.get_directory_client('incoming/date=2019-01-02')
directory_client.delete_directory()

So especially the hierarchical namespace support and the atomic operations make the new Azure Data Lake API interesting for distributed data pipelines. Extra security features like POSIX permissions on individual directories and files are also notable.
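
These permissions can be read and modified per directory or file through the access control methods of the directory and file clients. A minimal sketch (the symbolic permission string is just an example):

directory_client = filesystem_client.get_directory_client('processed')

# inspect the current POSIX style access control settings
access = directory_client.get_access_control()
print(access['owner'], access['group'], access['permissions'])

# restrict the directory to owner rwx and group r-x
directory_client.set_access_control(permissions="rwxr-x---")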