diff --git a/bin/ycsb b/bin/ycsb
index 5782bdb01e8ece8ea880d6e423f541d2d8fd556c..a3769f18bd030338cf96285041aa607b93870590 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -62,6 +62,7 @@ DATABASES = {
     "infinispan-cs": "com.yahoo.ycsb.db.InfinispanRemoteClient",
     "infinispan"   : "com.yahoo.ycsb.db.InfinispanClient",
     "jdbc"         : "com.yahoo.ycsb.db.JdbcDBClient",
+    "kudu"         : "com.yahoo.ycsb.db.KuduYCSBClient",
     "mapkeeper"    : "com.yahoo.ycsb.db.MapKeeperClient",
     "mongodb"      : "com.yahoo.ycsb.db.MongoDbClient",
     "mongodb-async": "com.yahoo.ycsb.db.AsyncMongoDbClient",
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 6e5c0bb8b22289fa159ec4eec509231bf8f384eb..aaf8c6f577046fe7a8faee4f3db80eff9407510f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -104,6 +104,11 @@ LICENSE file.
       <artifactId>jdbc-binding</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>kudu-binding</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.yahoo.ycsb</groupId>
       <artifactId>mongodb-binding</artifactId>
diff --git a/kudu/README.md b/kudu/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..cd5cffd6387a0d92a8327638a1a1d72c1f70d949
--- /dev/null
+++ b/kudu/README.md
@@ -0,0 +1,44 @@
+<!--
+Copyright (c) 2015 YCSB contributors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+# Kudu bindings for YCSB
+
+[Kudu](http://getkudu.io) is a storage engine that enables fast analytics on fast data.
+
+## Benchmarking Kudu
+
+Use the following command line to load the initial data into an existing Kudu cluster with default
+configurations.
+
+```
+bin/ycsb load kudu -P workloads/workloada
+```
+
+Additional configurations:
+* `kudu_master_addresses`: The master's address. The default configuration expects a master on localhost.
+* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create for the table. The default
+uses 4 tablets. A good rule of thumb is to use 5 per tablet server.
+* `kudu_table_num_replicas`: The number of replicas that each tablet will have. The default is 3. Should
+only be configured to use 1 instead, for single node tests.
+* `kudu_sync_ops`: If the client should wait after every write operation. The default is true.
+* `kudu_block_size`: The data block size used to configure columns. The default is 4096 bytes.
+
+Then, you can run the workload:
+
+```
+bin/ycsb run kudu -P workloads/workloada
+```
diff --git a/kudu/pom.xml b/kudu/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..0cce21bee3e8bca07edca33ae0be363af2008a3b
--- /dev/null
+++ b/kudu/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2015 YCSB contributors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.yahoo.ycsb</groupId>
+    <artifactId>binding-parent</artifactId>
+    <version>0.5.0-SNAPSHOT</version>
+    <relativePath>../binding-parent</relativePath>
+  </parent>
+
+  <artifactId>kudu-binding</artifactId>
+  <name>Kudu DB Binding</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.kududb</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+  <repositories>
+    <repository>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+      <id>cloudera-repo</id>
+      <name>Cloudera Releases</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+    </repository>
+  </repositories>
+</project>
diff --git a/kudu/src/main/conf/log4j.properties b/kudu/src/main/conf/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..7317ad1cf75bdf3b19f45d725662dee6ed0b9bf5
--- /dev/null
+++ b/kudu/src/main/conf/log4j.properties
@@ -0,0 +1,24 @@
+#
+# Copyright (c) 2015 YCSB contributors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+#
+# Enables getting logs from the client.
+
+log4j.rootLogger = INFO, out
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.kudu = INFO
diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..9e65407a826507d11da6c4b447ed494f56270bc5
--- /dev/null
+++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java
@@ -0,0 +1,320 @@
+/**
+ * Copyright (c) 2015 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db;
+
+import com.stumbleupon.async.TimeoutException;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.StringByteIterator;
+import com.yahoo.ycsb.workloads.CoreWorkload;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.client.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Vector;
+
+import static org.kududb.Type.STRING;
+
+/**
+ * Kudu client for YCSB framework
+ * Example to load:
+ * $ ./bin/ycsb load kudu -P workloads/workloada  -threads 5
+ * Example to run:
+ * ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
+ *
+ */
+public class KuduYCSBClient extends com.yahoo.ycsb.DB {
+  public static final String KEY = "key";
+  public static final int OK = 0;
+  public static final int SERVER_ERROR = -1;
+  public static final int NO_MATCHING_RECORD = -2;
+  public static final int TIMEOUT = -3;
+  public static final int MAX_TABLETS = 9000;
+  public static final long DEFAULT_SLEEP = 60000;
+  private static final String SYNC_OPS_OPT = "kudu_sync_ops";
+  private static final String DEBUG_OPT = "kudu_debug";
+  private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors";
+  private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets";
+  private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
+  private static final String BLOCK_SIZE_OPT = "kudu_block_size";
+  private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
+  private static final int BLOCK_SIZE_DEFAULT = 4096;
+  private static final List<String> columnNames = new ArrayList<String>();
+  private static KuduClient client;
+  private static Schema schema;
+  private static int fieldCount;
+  private boolean debug = false;
+  private boolean printErrors = false;
+  private String tableName;
+  private KuduSession session;
+  private KuduTable table;
+
+  @Override
+  public void init() throws DBException {
+    if (getProperties().getProperty(DEBUG_OPT) != null) {
+      this.debug = getProperties().getProperty(DEBUG_OPT).equals("true");
+    }
+    if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
+      this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
+    }
+    if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
+      this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
+    }
+    this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table;
+    initClient(debug, tableName, getProperties());
+    this.session = client.newSession();
+    if (getProperties().getProperty(SYNC_OPS_OPT) != null &&
+        getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
+      this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
+      this.session.setMutationBufferSpace(100);
+    } else {
+      this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
+    }
+
+    try {
+      this.table = client.openTable(tableName);
+    } catch (Exception e) {
+      throw new DBException("Could not open a table because of:", e);
+    }
+  }
+
+  private synchronized static void initClient(boolean debug, String tableName, Properties prop)
+      throws DBException {
+    if (client != null) return;
+
+    String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT);
+    if (masterAddresses == null) {
+      masterAddresses = "localhost:7051";
+    }
+
+    int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
+    if (numTablets > MAX_TABLETS) {
+      throw new DBException("Specified number of tablets (" + numTablets + ") must be equal " +
+          "or below " + MAX_TABLETS);
+    }
+
+    int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
+
+    int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
+
+    client = new KuduClient.KuduClientBuilder(masterAddresses)
+        .defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
+        .defaultOperationTimeoutMs(DEFAULT_SLEEP)
+        .build();
+    if (debug) {
+      System.out.println("Connecting to the masters at " + masterAddresses);
+    }
+
+    fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
+        Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
+
+    List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1);
+
+    ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
+                             .key(true)
+                             .desiredBlockSize(blockSize)
+                             .build();
+    columns.add(keyColumn);
+    columnNames.add(KEY);
+    for (int i = 0; i < fieldCount; i++) {
+      String name = "field" + i;
+      columnNames.add(name);
+      columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
+                  .desiredBlockSize(blockSize)
+                  .build());
+    }
+    schema = new Schema(columns);
+
+    CreateTableBuilder builder = new CreateTableBuilder();
+    builder.setNumReplicas(numReplicas);
+    // create n-1 split keys, which will end up being n tablets master-side
+    for (int i = 1; i < numTablets + 0; i++) {
+      // We do +1000 since YCSB starts at user1.
+      int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000;
+      String startKey = String.format("%04d", startKeyInt);
+      PartialRow splitRow = schema.newPartialRow();
+      splitRow.addString(0, "user" + startKey);
+      builder.addSplitRow(splitRow);
+    }
+
+    try {
+      client.createTable(tableName, schema, builder);
+    } catch (Exception e) {
+      if (!e.getMessage().contains("ALREADY_PRESENT")) {
+        throw new DBException("Couldn't create the table", e);
+      }
+    }
+  }
+
+  private static int getIntFromProp(Properties prop, String propName, int defaultValue)
+      throws DBException {
+    String intStr = prop.getProperty(propName);
+    if (intStr == null) {
+      return defaultValue;
+    } else {
+      try {
+        return Integer.valueOf(intStr);
+      } catch (NumberFormatException ex) {
+        throw new DBException("Provided number for " + propName + " isn't a valid integer");
+      }
+    }
+  }
+
+  @Override
+  public void cleanup() throws DBException {
+    try {
+      this.session.close();
+    } catch (Exception e) {
+      throw new DBException("Couldn't cleanup the session", e);
+    }
+  }
+
+  @Override
+  public int read(String table, String key, Set<String> fields,
+                  HashMap<String,ByteIterator> result) {
+    Vector<HashMap<String, ByteIterator>> results = new Vector<HashMap<String, ByteIterator>>();
+    int ret = scan(table, key, 1, fields, results);
+    if (ret != OK) return ret;
+    if (results.size() != 1) return NO_MATCHING_RECORD;
+    result.putAll(results.firstElement());
+    return OK;
+  }
+
+  @Override
+  public int scan(String table, String startkey, int recordcount, Set<String> fields,
+                  Vector<HashMap<String, ByteIterator>> result) {
+    try {
+      KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(this.table);
+      List<String> querySchema;
+      if (fields == null) {
+        querySchema = columnNames;
+        // No need to set the projected columns with the whole schema.
+      } else {
+        querySchema = new ArrayList<String>(fields);
+        scannerBuilder.setProjectedColumnNames(querySchema);
+      }
+
+      PartialRow lowerBound = schema.newPartialRow();
+      lowerBound.addString(0, startkey);
+      scannerBuilder.lowerBound(lowerBound);
+      if (recordcount == 1) {
+        PartialRow upperBound = schema.newPartialRow();
+        // Keys are fixed length, just adding something at the end is safe.
+        upperBound.addString(0, startkey.concat(" "));
+        scannerBuilder.exclusiveUpperBound(upperBound);
+      }
+
+      KuduScanner scanner = scannerBuilder
+          .limit(recordcount) // currently noop
+          .build();
+
+      while (scanner.hasMoreRows()) {
+        RowResultIterator data = scanner.nextRows();
+        addAllRowsToResult(data, recordcount, querySchema, result);
+        if (recordcount == result.size()) break;
+      }
+      RowResultIterator closer = scanner.close();
+      addAllRowsToResult(closer, recordcount, querySchema, result);
+    } catch (TimeoutException te) {
+      if (printErrors) {
+        System.err.println("Waited too long for a scan operation with start key=" + startkey);
+      }
+      return TIMEOUT;
+    } catch (Exception e) {
+      System.err.println("Unexpected exception " + e);
+      e.printStackTrace();
+      return SERVER_ERROR;
+    }
+    return OK;
+  }
+
+  private void addAllRowsToResult(RowResultIterator it, int recordcount,
+                                  List<String> querySchema,
+                                  Vector<HashMap<String, ByteIterator>> result)
+      throws Exception {
+    RowResult row;
+    HashMap<String, ByteIterator> rowResult = new HashMap<String, ByteIterator>(querySchema.size());
+    if (it == null) return;
+    while (it.hasNext()) {
+      if (result.size() == recordcount) return;
+      row = it.next();
+      int colIdx = 0;
+      for (String col : querySchema) {
+        rowResult.put(col, new StringByteIterator(row.getString(colIdx)));
+        colIdx++;
+      }
+      result.add(rowResult);
+    }
+  }
+
+  @Override
+  public int update(String table, String key, HashMap<String, ByteIterator> values) {
+    Update update = this.table.newUpdate();
+    PartialRow row = update.getRow();
+    row.addString(KEY, key);
+    for (int i = 1; i < schema.getColumnCount(); i++) {
+      String columnName = schema.getColumnByIndex(i).getName();
+      if (values.containsKey(columnName)) {
+        String value = values.get(columnName).toString();
+        row.addString(columnName, value);
+      }
+    }
+    apply(update);
+    return OK;
+  }
+
+  @Override
+  public int insert(String table, String key, HashMap<String, ByteIterator> values) {
+    Insert insert = this.table.newInsert();
+    PartialRow row = insert.getRow();
+    row.addString(KEY, key);
+    for (int i = 1; i < schema.getColumnCount(); i++) {
+      row.addString(i, new String(values.get(schema.getColumnByIndex(i).getName()).toArray()));
+    }
+    apply(insert);
+    return OK;
+  }
+
+  @Override
+  public int delete(String table, String key) {
+    Delete delete = this.table.newDelete();
+    PartialRow row = delete.getRow();
+    row.addString(KEY, key);
+    apply(delete);
+    return OK;
+  }
+
+  private void apply(Operation op) {
+    try {
+      OperationResponse response = session.apply(op);
+      if (response != null && response.hasRowError() && printErrors) {
+        System.err.println("Got a row error " + response.getRowError());
+      }
+    } catch (Exception ex) {
+      if (printErrors) {
+        System.err.println("Failed to apply an operation " + ex.toString());
+        ex.printStackTrace();
+      }
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index cf5f5bc09eb4d44ad951809e363ae0b997793cd6..5e71d9837daba411d8e4b694d6c0c1ebee0322d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@ LICENSE file.
     <cassandra.cql.version>1.0.3</cassandra.cql.version>
     <gemfire.version>8.1.0</gemfire.version>
     <infinispan.version>7.2.2.Final</infinispan.version>
+    <kudu.version>0.5.0</kudu.version>
     <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
     <!--<mapkeeper.version>1.0</mapkeeper.version>-->
     <mongodb.version>3.0.3</mongodb.version>
@@ -111,6 +112,7 @@ LICENSE file.
     <module>hypertable</module>
     <module>infinispan</module>
     <module>jdbc</module>
+    <module>kudu</module>
     <!--<module>mapkeeper</module>-->
     <module>mongodb</module>
     <!--module>nosqldb</module-->