Getting Started with the Cloudera Kudu Storage Engine in Python
Cloudera Kudu is a new distributed storage engine for fast data analytics. It was developed to bridge the gap between the Hadoop HDFS/Parquet combination, which excels at scanning large amounts of data but does not support fast random reads, writes, and updates, and HBase, which offers fast random reads, writes, and updates but lacks fast analytical scan performance.
For more insight, watch the talk Kudu: Resolving transactional and analytic trade-offs in Hadoop by Todd Lipcon, or listen to the O’Reilly Data Show podcast episode with Ben Lorica and Todd Lipcon.
Kudu can be used stand-alone or as an additional storage engine for Impala. Support for Apache Spark is planned, and there is a prototype by Ted Malaska on GitHub.
The Kudu Python client is still considered alpha but already usable. A simple way to get started is to use the Kudu Quickstart VM for VirtualBox, but precompiled binaries for RHEL and Ubuntu are also available. I also managed to compile Kudu from source on a Debian box without much trouble.
The first step is to connect to the Kudu master and create a new table with a Kudu schema. The basic data types int, float, double, bool, and UTF-8-encoded strings are available; the timestamp type is not yet supported by the Python client:
import kudu

# Connect to the Kudu master
client = kudu.Client("127.0.0.1:7051")

table_name = "sales"
if client.table_exists(table_name):
    client.delete_table(table_name)

cols = [kudu.ColumnSchema.create("id", kudu.INT64),
        kudu.ColumnSchema.create("product_name", kudu.STRING),
        kudu.ColumnSchema.create("location_name", kudu.STRING),
        kudu.ColumnSchema.create("price", kudu.FLOAT),
        kudu.ColumnSchema.create("quantity", kudu.INT32)]

# The second argument is the number of key columns: here the first
# column ("id") forms the primary key.
schema = kudu.schema_from_list(cols, 1)
client.create_table(table_name, schema)
To interact with a table, you must open it. Basic metadata is also available:
>>> table = client.open_table(table_name)
>>> for i in range(table.num_columns):
...     print table.column(i)
...
ColumnSchema(name=id, type=int64, nullable=False)
ColumnSchema(name=product_name, type=string, nullable=False)
ColumnSchema(name=location_name, type=string, nullable=False)
ColumnSchema(name=price, type=float, nullable=False)
ColumnSchema(name=quantity, type=int32, nullable=False)
Inserts are done through a session object:
session = client.new_session()
op = table.insert()
op["id"] = 0
op["product_name"] = "Steak"
op["location_name"] = "Berlin"
op["price"] = 25.0
op["quantity"] = 1
session.apply(op)
session.flush()
records = [{"id": 1, "product_name": "Burger", "location_name": "Berlin",
            "price": 5.2, "quantity": 5},
           {"id": 2, "product_name": "Burger", "location_name": "New York",
            "price": 10.2, "quantity": 3},
           {"id": 3, "product_name": "Steak", "location_name": "New York",
            "price": 42.5, "quantity": 5}]

for r in records:
    op = table.insert()
    for k, v in r.items():
        op[k] = v
    session.apply(op)
session.flush()
To query data, open a table scanner:
>>> scanner = table.scanner()
>>> scanner.open()
>>> batch = scanner.read_all()
>>> batch.as_tuples()
[(0, 'Steak', 'Berlin', 25.0, 1),
(1, 'Burger', 'Berlin', 5.1999998092651367, 5),
(2, 'Burger', 'New York', 10.199999809265137, 3),
(3, 'Steak', 'New York', 42.5, 5)]
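As an aside, the prices come back as 5.1999998... rather than 5.2 because the price column was declared as kudu.FLOAT, a 32-bit type; the stored single-precision value is widened to a 64-bit Python float on read. The same round-off can be reproduced with NumPy, used here only to mimic the single-precision storage:

import numpy as np

# 5.2 has no exact 32-bit representation; the nearest float32 value is
# what a FLOAT column stores and what the scanner returns, widened back
# to a 64-bit Python float.
print("%.10f" % np.float32(5.2))  # prints 5.1999998093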
Additional predicates can be used to filter rows on the Kudu server:
>>> scanner = table.scanner()
>>> scanner.add_comparison_predicate("id", kudu.GREATER_EQUAL, 1)
>>> scanner.add_comparison_predicate("id", kudu.LESS_EQUAL, 2)
>>> scanner.open()
>>> batch = scanner.read_all()
>>> batch.as_tuples()
[(1, 'Burger', 'Berlin', 5.1999998092651367, 5),
(2, 'Burger', 'New York', 10.199999809265137, 3)]
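Because the scanner simply returns Python tuples, the results can be handed to pandas for further analysis. A minimal sketch, assuming pandas is installed and reusing the column names from the schema above:

import pandas as pd

scanner = table.scanner()
scanner.open()
df = pd.DataFrame(scanner.read_all().as_tuples(),
                  columns=["id", "product_name", "location_name",
                           "price", "quantity"])

# Client-side aggregation with pandas: revenue per location
df["revenue"] = df["price"] * df["quantity"]
print df.groupby("location_name")["revenue"].sum()

Note that this aggregation happens on the client; pushing such work down to the engine is where Impala and Ibis come into play, as discussed below.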
Documentation for the Python client is still limited, so the best resource at the moment is the Kudu Python Tests. Harsh J. Chouraria has written a similar post on writing a simple Kudu Java API program, and the Kudu examples also provide insights. More information about design decisions can be found in the Kudu Whitepaper.
Using Kudu as a back end for Impala still requires HDFS and the Hive Metastore, but according to Wes McKinney there is work being done to remove these dependencies:
@sampullara @tlipcon it's being worked on. Impala presently depends on HDFS and Hive metastore but is capable of reading from local FS too
— Wes McKinney (@wesmckinn) September 30, 2015
In combination with the Ibis data analysis framework, Impala/Kudu looks like a promising back end for a non-Java/JVM-based distributed data frame.
