Azure Data Lake Storage Gen2 with Python
Microsoft has released a beta version of the Python client azure-storage-file-datalake for the Azure Data Lake Storage Gen2 service.
The service offers blob storage capabilities with file system semantics, atomic operations, and a hierarchical namespace. Azure Data Lake Storage Gen2 is built on top of Azure Blob Storage and shares the same scaling and pricing structure (only transaction costs are slightly higher). Multi-protocol access allows you to use data created with Azure Blob Storage APIs in the data lake and vice versa. This enables a smooth migration path if you already use Blob Storage with tools like kartothek and simplekv to store your datasets in Parquet. Terminology differs slightly: what the Blob Storage APIs call a container is a file system in the ADLS context.
pip install azure-storage-file-datalake --pre
The entry point into Azure Data Lake Storage is the DataLakeServiceClient, which interacts with the service at the storage account level. It can authenticate using the account name and key, a SAS token, or a service principal. A storage account can contain many file systems (also known as blob containers), each storing data isolated from the others.
import os
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core.exceptions import ResourceExistsError
account_name = os.getenv("STORAGE_ACCOUNT_NAME")
credential = os.getenv("STORAGE_ACCOUNT_KEY")
account_url = "https://{}.dfs.core.windows.net/".format(account_name)
datalake_service = DataLakeServiceClient(
    account_url=account_url, credential=credential
)
file_system = "testfs"
try:
    filesystem_client = datalake_service.create_file_system(file_system=file_system)
except ResourceExistsError:
    filesystem_client = datalake_service.get_file_system_client(file_system)
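The example above authenticates with the shared account key. The client should also accept the token credentials from the azure-identity package, so signing in with a service principal could look roughly like this (a sketch: the environment variable names are illustrative, and the principal needs a data-plane role such as Storage Blob Data Contributor on the account):
from azure.identity import ClientSecretCredential

# Illustrative environment variables holding the service principal details
sp_credential = ClientSecretCredential(
    tenant_id=os.getenv("AZURE_TENANT_ID"),
    client_id=os.getenv("AZURE_CLIENT_ID"),
    client_secret=os.getenv("AZURE_CLIENT_SECRET"),
)
datalake_service = DataLakeServiceClient(
    account_url=account_url, credential=sp_credential
)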
The FileSystemClient represents interactions with the directories and files within a file system. So let's create some data in the storage.
dir_client = filesystem_client.get_directory_client("incoming")
dir_client.create_directory()
data = """name,population
Berlin, 3406000
Munich, 1275000
"""
file_client = dir_client.create_file("cities.txt")
file_client.append_data(data, 0, len(data))
file_client.flush_data(len(data))
>>> [(i.name, i.is_directory) for i in filesystem_client.get_paths("")]
[('incoming', True), ('incoming/cities.txt', False)]
If the FileClient is created from a DirectoryClient, it inherits the path of the directory, but you can also instantiate it directly from the FileSystemClient with an absolute path:
>>> file_client = filesystem_client.get_file_client('incoming/cities.txt')
>>> file_client.read_file()
b'name,population\nBerlin, 3406000\nMunich, 1275000\n'
These interactions with Azure Data Lake do not differ much from the existing Blob Storage API, and the Data Lake client also uses the Azure Blob Storage client behind the scenes.
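Thanks to multi-protocol access, the same file can also be read through the Blob Storage endpoint. A minimal sketch, assuming the azure-storage-blob (v12) package is installed; the blob_service name is ours:
from azure.storage.blob import BlobServiceClient

# Same account, but the "blob" endpoint instead of the "dfs" endpoint
blob_service = BlobServiceClient(
    account_url="https://{}.blob.core.windows.net/".format(account_name),
    credential=credential,
)
blob_client = blob_service.get_blob_client(container="testfs", blob="incoming/cities.txt")
blob_client.download_blob().readall()
# b'name,population\nBerlin, 3406000\nMunich, 1275000\n'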
What differs, and is much more interesting, is the hierarchical namespace support in Azure Data Lake Storage Gen2. The convention of using slashes in object names/keys has long been used to organize Blob Storage content into a hierarchy, and prefix scans over keys make it possible to list the contents of such a "folder". What was missing from the Azure Blob Storage API was a way to perform atomic operations on directories.
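For reference, such a prefix scan with the plain Blob Storage client looks roughly like this (a sketch reusing the blob_service client from the previous example):
# List the "contents of a folder" by scanning over a key prefix
container_client = blob_service.get_container_client("testfs")
for blob in container_client.list_blobs(name_starts_with="incoming/"):
    print(blob.name, blob.size)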
A typical use case is data pipelines where the data is partitioned over multiple files using a Hive-like partitioning scheme:
incoming/date=2019-01-01/part1.parquet
incoming/date=2019-01-01/part2.parquet
incoming/date=2019-01-01/part3.parquet
incoming/date=2019-01-02/part1.parquet
incoming/date=2019-01-02/part2.parquet
...
If you work with large datasets with thousands of files, moving a daily subset of the data to a processed state would involve looping over the files in the Azure Blob API and moving each file individually. This is not only inconvenient and slow but also lacks atomicity.
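With only the Blob Storage API, such a move amounts to a copy-and-delete loop over every blob under the prefix; a sketch (reusing the container_client from above) illustrates why this is neither fast nor atomic:
prefix = "incoming/date=2019-01-01/"
for blob in container_client.list_blobs(name_starts_with=prefix):
    source = container_client.get_blob_client(blob.name)
    target = container_client.get_blob_client(
        blob.name.replace("incoming/", "processed/", 1)
    )
    # start_copy_from_url is asynchronous on the server side; real code
    # would poll the copy status before deleting the source blob
    target.start_copy_from_url(source.url)
    source.delete_blob()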
With the new Azure Data Lake API, it is now easy to do this in one operation:
directory_client = filesystem_client.get_directory_client('processed')
directory_client.create_directory()
directory_client = filesystem_client.get_directory_client('incoming/date=2019-01-01')
directory_client.rename_directory('testfs/processed/date=2019-01-01')
>>> [(i.name, i.is_directory) for i in filesystem_client.get_paths("")]
[('incoming', True),
 ('incoming/date=2019-01-02', True),
 ('incoming/date=2019-01-02/part1.parquet', False),
 ('incoming/date=2019-01-02/part2.parquet', False),
 ('processed', True),
 ('processed/date=2019-01-01', True),
 ('processed/date=2019-01-01/part1.parquet', False),
 ('processed/date=2019-01-01/part2.parquet', False),
 ('processed/date=2019-01-01/part3.parquet', False)]
Deleting directories (and their files) is also supported as an atomic operation:
directory_client = filesystem_client.get_directory_client('incoming/date=2019-01-02')
directory_client.delete_directory()
In particular, hierarchical namespace support and atomic operations make the new Azure Data Lake API attractive for distributed data pipelines. Additional security features, such as POSIX permissions on individual directories and files, are also notable.
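As a closing example, setting such permissions could look roughly like this; a sketch using the access control methods of the directory client, with an illustrative POSIX permission string:
directory_client = filesystem_client.get_directory_client("processed")
# Grant the owner full access, the owning group read/execute, others nothing
directory_client.set_access_control(permissions="rwxr-x---")
directory_client.get_access_control()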
