Getting started with the Cloudera Kudu storage engine in Python

Cloudera Kudu is a distributed storage engine for fast data analytics. The Python API is still in an alpha stage, but already usable.

Cloudera Kudu is a new distributed storage engine for fast data analytics. It was developed to bridge the gap between two existing options: the Hadoop HDFS/Parquet combination, which provides fast analytics and excels at scanning over large amounts of data but lacks support for fast random reads, writes and updates; and HBase, which allows fast random reads, writes and updates but lacks analytical scan performance.

To get more insight, you should watch the talk "Kudu: Resolving transactional and analytic trade-offs in Hadoop" by Todd Lipcon, or listen to the latest O’Reilly Data Show podcast with Ben Lorica and Todd Lipcon.

Kudu can be used standalone or as an additional storage engine for Impala. Support for Apache Spark is planned, and there is a prototype from Ted Malaska on GitHub.

The Kudu Python support is still considered alpha, but it is already usable. A simple way to get started is the Kudu Quickstart VM for VirtualBox, but precompiled binaries for RHEL and Ubuntu are also available. I also managed to compile Kudu from source on a Debian box without much trouble.

The first step is to connect to the Kudu master server and create a new table with a Kudu schema. The basic datatypes (integers, float, double, bool and UTF-8 encoded strings) are available; the timestamp type is not yet supported by the Python client:

import kudu

# Connect to the Kudu master server
client = kudu.Client("127.0.0.1:7051")

table_name = "sales"

# Drop the table if it already exists
if client.table_exists(table_name):
    client.delete_table(table_name)

cols = [kudu.ColumnSchema.create("id", kudu.INT64),
        kudu.ColumnSchema.create("product_name", kudu.STRING),
        kudu.ColumnSchema.create("location_name", kudu.STRING),
        kudu.ColumnSchema.create("price", kudu.FLOAT),
        kudu.ColumnSchema.create("quantity", kudu.INT32),
        ]

# The second argument is the number of key columns: here the
# first column ("id") becomes the primary key
schema = kudu.schema_from_list(cols, 1)
client.create_table(table_name, schema)

To interact with a table you have to open it. Basic metadata is available too:

>>> table = client.open_table(table_name)
>>> for i in range(table.num_columns):
...     print table.column(i)
...
ColumnSchema(name=id, type=int64, nullable=False)
ColumnSchema(name=product_name, type=string, nullable=False)
ColumnSchema(name=location_name, type=string, nullable=False)
ColumnSchema(name=price, type=float, nullable=False)
ColumnSchema(name=quantity, type=int32, nullable=False)

Inserting is done through a session object:

session = client.new_session()

op = table.insert()
op["id"] = 0
op["product_name"] = "Steak"
op["location_name"] = "Berlin"
op["price"] = 25.0
op["quantity"] = 1
session.apply(op)
session.flush()


records = [{"id": 1, "product_name": "Burger", "location_name": "Berlin",
            "price": 5.2, "quantity": 5},
           {"id": 2, "product_name": "Burger", "location_name": "New York",
            "price": 10.2, "quantity": 3},
           {"id": 3, "product_name": "Steak", "location_name": "New York",
            "price": 42.5, "quantity": 5}]


for r in records:
    op = table.insert()
    for k, v in r.items():
        op[k] = v
    session.apply(op)
session.flush()

To query data you have to open a table scanner:

>>> scanner = table.scanner()
>>> scanner.open()
>>> batch = scanner.read_all()
>>> batch.as_tuples()
[(0, 'Steak', 'Berlin', 25.0, 1), 
 (1, 'Burger', 'Berlin', 5.1999998092651367, 5), 
 (2, 'Burger', 'New York', 10.199999809265137, 3), 
 (3, 'Steak', 'New York', 42.5, 5)]
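Note that the prices come back as values like 5.1999998092651367 rather than 5.2: the column was declared as FLOAT, a 32-bit type, so the literal is rounded to the nearest single-precision value. The same round-trip can be sketched in pure Python with the standard struct module (no Kudu involved):

```python
import struct

# Pack 5.2 into a 32-bit float and unpack it again, mimicking
# what happens when the value is stored in a FLOAT column
(restored,) = struct.unpack("f", struct.pack("f", 5.2))
print(restored)  # close to 5.2, but not exactly 5.2

# A 64-bit DOUBLE column would preserve the Python literal
(exact,) = struct.unpack("d", struct.pack("d", 5.2))
print(exact)
```

If exact decimal display matters, declaring the column as kudu.DOUBLE avoids the visible rounding.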

Additional predicates can be used to filter rows on the Kudu server:

>>> scanner = table.scanner()
>>> scanner.add_comparison_predicate("id", kudu.GREATER_EQUAL, 1)
>>> scanner.add_comparison_predicate("id", kudu.LESS_EQUAL, 2)
>>> scanner.open()
>>> batch = scanner.read_all()
>>> batch.as_tuples()
[(1, 'Burger', 'Berlin', 5.1999998092651367, 5), 
 (2, 'Burger', 'New York', 10.199999809265137, 3)]

Documentation for the Python client is still missing, so the best resource at the moment is the Kudu Python test suite. Harsh J Chouraria has written a similar post on writing a simple Kudu Java API program, and the Kudu examples also provide some insights. More information about the design decisions can be found in the Kudu whitepaper.

Using Kudu as a backend for Impala still requires an HDFS and Hive metastore deployment, but according to Wes McKinney there is ongoing work to get rid of these dependencies.

In combination with the Ibis data analysis framework, Impala/Kudu looks like a promising backend for a non-JVM-based distributed dataframe.