Peter Hoffmann

Getting Started with the Cloudera Kudu Storage Engine in Python

Cloudera Kudu is a new distributed storage engine for fast data analytics. It was developed to bridge the gap between two existing Hadoop storage options: the HDFS/Parquet combination, which excels at scanning large amounts of data for analytics but lacks support for fast random reads, writes, and updates; and HBase, which allows fast random reads, writes, and updates but lacks analytical scan performance.

For more insight, watch the talk Kudu: Resolving transactional and analytic trade-offs in Hadoop by Todd Lipcon, or listen to the O’Reilly Data Show podcast episode with Ben Lorica and Todd Lipcon.

Kudu can be used stand-alone or as an additional storage engine for Impala. Support for Apache Spark is planned, and there is a prototype by Ted Malaska on GitHub.

The Kudu Python client is still considered alpha but already usable. A simple way to get started is to use the Kudu Quickstart VM for VirtualBox, but precompiled binaries for RHEL and Ubuntu are also available. I also managed to compile Kudu from source on a Debian box without much trouble.

The first step is to connect to the Kudu master (listening on port 7051 by default) and create a new table with a Kudu schema. The basic data types int, float, double, bool, and UTF-8–encoded string are available; the timestamp type is not yet supported by the Python client:

import kudu

# Connect to the Kudu master running on localhost.
client = kudu.Client("127.0.0.1:7051")

table_name = "sales"

# Drop the table if it exists so the example starts from a clean slate.
if client.table_exists(table_name):
    client.delete_table(table_name)

# Define the columns; the leading column will serve as the primary key.
cols = [kudu.ColumnSchema.create("id", kudu.INT64),
        kudu.ColumnSchema.create("product_name", kudu.STRING),
        kudu.ColumnSchema.create("location_name", kudu.STRING),
        kudu.ColumnSchema.create("price", kudu.FLOAT),
        kudu.ColumnSchema.create("quantity", kudu.INT32)]

# The second argument is the number of leading columns that make up the key.
schema = kudu.schema_from_list(cols, 1)
client.create_table(table_name, schema)
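
A quick sanity check with the same table_exists call confirms that the new table is visible:

>>> client.table_exists(table_name)
True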

To interact with a table, you must open it. Basic metadata is also available:

>>> table = client.open_table(table_name)
>>> for i in range(table.num_columns):
...     print table.column(i)
...
ColumnSchema(name=id, type=int64, nullable=False)
ColumnSchema(name=product_name, type=string, nullable=False)
ColumnSchema(name=location_name, type=string, nullable=False)
ColumnSchema(name=price, type=float, nullable=False)
ColumnSchema(name=quantity, type=int32, nullable=False)

Inserts are done through a session object:

session = client.new_session()

# An insert operation behaves like a dict keyed by column name.
op = table.insert()
op["id"] = 0
op["product_name"] = "Steak"
op["location_name"] = "Berlin"
op["price"] = 25.0
op["quantity"] = 1
session.apply(op)  # queue the operation in the session
session.flush()    # send the buffered operations to the server

records = [{"id": 1, "product_name": "Burger", "location_name": "Berlin",
            "price": 5.2, "quantity": 5},
           {"id": 2, "product_name": "Burger", "location_name": "New York",
            "price": 10.2, "quantity": 3},
           {"id": 3, "product_name": "Steak", "location_name": "New York",
            "price": 42.5, "quantity": 5}]

for r in records:
    op = table.insert()
    for k, v in r.items():
        op[k] = v
    session.apply(op)
session.flush()
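
For larger loads it can pay off to flush periodically rather than buffering everything in a single session. The following is a minimal sketch reusing only the calls shown above; the bulk_insert helper and its batch size of 1000 are illustrative choices, not part of the Kudu API:

def bulk_insert(client, table, rows, batch_size=1000):
    # Insert an iterable of dicts, flushing every batch_size operations.
    session = client.new_session()
    pending = 0
    for row in rows:
        op = table.insert()
        for column, value in row.items():
            op[column] = value
        session.apply(op)
        pending += 1
        if pending >= batch_size:
            session.flush()  # push the buffered operations to the server
            pending = 0
    if pending:
        session.flush()  # flush the final partial batch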

To query data, open a table scanner:

>>> scanner = table.scanner()
>>> scanner.open()
>>> batch = scanner.read_all()
>>> batch.as_tuples()
[(0, 'Steak', 'Berlin', 25.0, 1), 
 (1, 'Burger', 'Berlin', 5.1999998092651367, 5), 
 (2, 'Burger', 'New York', 10.199999809265137, 3), 
 (3, 'Steak', 'New York', 42.5, 5)]
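
The price values come back as 5.1999998092651367 rather than 5.2 because the column was declared as a 32-bit FLOAT. Since the scanner returns plain Python tuples, the results can be post-processed with nothing but the standard library; a small sketch that totals revenue per location (column order as created above):

from collections import defaultdict

revenue = defaultdict(float)
for _id, product, location, price, quantity in batch.as_tuples():
    revenue[location] += price * quantity

print dict(revenue)  # per-location totals, subject to float32 rounding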

Additional predicates can be used to filter rows on the Kudu server:

>>> scanner = table.scanner()
>>> scanner.add_comparison_predicate("id", kudu.GREATER_EQUAL, 1)
>>> scanner.add_comparison_predicate("id", kudu.LESS_EQUAL, 2)
>>> scanner.open()
>>> batch = scanner.read_all()
>>> batch.as_tuples()
[(1, 'Burger', 'Berlin', 5.1999998092651367, 5), 
 (2, 'Burger', 'New York', 10.199999809265137, 3)]
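
Predicates are evaluated on the server side, so only matching rows are sent over the wire. The same mechanism should also work on non-key columns; a sketch filtering on price (that comparison predicates are accepted on non-key columns is an assumption based on the client’s test suite):

>>> scanner = table.scanner()
>>> scanner.add_comparison_predicate("price", kudu.GREATER_EQUAL, 10.0)
>>> scanner.open()
>>> scanner.read_all().as_tuples()
[(0, 'Steak', 'Berlin', 25.0, 1),
 (2, 'Burger', 'New York', 10.199999809265137, 3),
 (3, 'Steak', 'New York', 42.5, 5)]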

Documentation for the Python client is still limited, so the best resource at the moment is the Kudu Python test suite. Harsh J. Chouraria has written a similar post on writing a simple Kudu Java API program, and the Kudu examples also provide insights. More information about the design decisions can be found in the Kudu whitepaper.

Using Kudu as a back end for Impala still requires HDFS and the Hive Metastore, but according to Wes McKinney there is work underway to remove these dependencies.

In combination with the Ibis data analysis framework, Impala/Kudu looks like a promising back end for a non-Java/JVM-based distributed data frame.