Skip to content
Snippets Groups Projects
Commit 70590bbb authored by Dan Burkert's avatar Dan Burkert Committed by Kevin Risden
Browse files

[kudu] Update bindings (#879)

[kudu] Update Kudu binding to 1.1.0, use slf4j for logging, improve table partitioning
parent b83dd7ee
No related branches found
No related tags found
No related merge requests found
<!--
Copyright (c) 2015 YCSB contributors. All rights reserved.
Copyright (c) 2015-2016 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
......@@ -17,25 +17,30 @@ LICENSE file.
# Kudu bindings for YCSB
[Kudu](http://getkudu.io) is a storage engine that enables fast analytics on fast data.
[Apache Kudu](https://kudu.apache.org) is a storage engine that enables fast
analytics on fast data.
## Benchmarking Kudu
Use the following command line to load the initial data into an existing Kudu cluster with default
configurations.
Use the following command line to load the initial data into an existing Kudu
cluster with default configurations.
```
bin/ycsb load kudu -P workloads/workloada
```
Additional configurations:
* `kudu_master_addresses`: The master's address. The default configuration expects a master on localhost.
* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create for the table. The default
uses 4 tablets. A good rule of thumb is to use 5 per tablet server.
* `kudu_table_num_replicas`: The number of replicas that each tablet will have. The default is 3. Should
only be configured to use 1 instead, for single node tests.
* `kudu_sync_ops`: If the client should wait after every write operation. The default is true.
* `kudu_block_size`: The data block size used to configure columns. The default is 4096 bytes.
* `kudu_master_addresses`: The master's address. The default configuration
expects a master on localhost.
* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create
for the table. The default uses 4 tablets. A good rule of thumb is to use 5
per tablet server.
* `kudu_table_num_replicas`: The number of replicas that each tablet will have.
The default is 3. Should only be configured to use 1 instead, for single node tests.
* `kudu_sync_ops`: If the client should wait after every write operation. The
default is true.
* `kudu_block_size`: The data block size used to configure columns. The default
is 4096 bytes.
Then, you can run the workload:
......@@ -45,12 +50,12 @@ bin/ycsb run kudu -P workloads/workloada
## Using a previous client version
If you wish to use a different Kudu client version than the one shipped with YCSB, you can specify on the
command line with `-Dkudu.version=x`. For example:
If you wish to use a different Kudu client version than the one shipped with
YCSB, you can specify on the command line with `-Dkudu.version=x`. For example:
```
mvn -pl com.yahoo.ycsb:kudu-binding -am package -DskipTests -Dkudu.version=0.7.1
mvn -pl com.yahoo.ycsb:kudu-binding -am package -DskipTests -Dkudu.version=1.0.1
```
Note that prior to 1.0, Kudu doesn't guarantee wire or API compability between versions and only the latest
one is officially supported.
Note that only versions since 1.0 are supported, since Kudu did not guarantee
wire or API compatibility prior to 1.0.
......@@ -31,7 +31,7 @@ LICENSE file.
<dependencies>
<dependency>
<groupId>org.kududb</groupId>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
......@@ -41,18 +41,15 @@ LICENSE file.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<repositories>
<repository>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>cloudera-repo</id>
<name>Cloudera Releases</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
</repositories>
</project>
/**
* Copyright (c) 2015 YCSB contributors. All rights reserved.
* Copyright (c) 2015-2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
......@@ -23,9 +23,12 @@ import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.workloads.CoreWorkload;
import org.kududb.ColumnSchema;
import org.kududb.Schema;
import org.kududb.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.*;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -34,67 +37,51 @@ import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import static org.kududb.Type.STRING;
import static org.apache.kudu.Type.STRING;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.EQUAL;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL;
/**
* Kudu client for YCSB framework. Example to load: <blockquote>
*
*
* <pre>
* <code>
* $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
* $ ./bin/ycsb load kudu -P workloads/workloada -threads 5
* </code>
* </pre>
*
*
* </blockquote> Example to run: <blockquote>
*
*
* <pre>
* <code>
* ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5
* </code>
* </pre>
*
*
* </blockquote>
*/
public class KuduYCSBClient extends com.yahoo.ycsb.DB {
public static final String KEY = "key";
public static final Status TIMEOUT =
new Status("TIMEOUT", "The operation timed out.");
public static final int MAX_TABLETS = 9000;
public static final long DEFAULT_SLEEP = 60000;
private static final Logger LOG = LoggerFactory.getLogger(KuduYCSBClient.class);
private static final String KEY = "key";
private static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out.");
private static final int MAX_TABLETS = 9000;
private static final long DEFAULT_SLEEP = 60000;
private static final String SYNC_OPS_OPT = "kudu_sync_ops";
private static final String DEBUG_OPT = "kudu_debug";
private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors";
private static final String PRE_SPLIT_NUM_TABLETS_OPT =
"kudu_pre_split_num_tablets";
private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets";
private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
private static final String BLOCK_SIZE_OPT = "kudu_block_size";
private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
private static final int BLOCK_SIZE_DEFAULT = 4096;
private static final List<String> COLUMN_NAMES = new ArrayList<String>();
private static final List<String> COLUMN_NAMES = new ArrayList<>();
private static KuduClient client;
private static Schema schema;
private static int fieldCount;
private boolean debug = false;
private boolean printErrors = false;
private String tableName;
private KuduSession session;
private KuduTable kuduTable;
@Override
public void init() throws DBException {
if (getProperties().getProperty(DEBUG_OPT) != null) {
this.debug = getProperties().getProperty(DEBUG_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
this.printErrors =
getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) {
this.printErrors =
getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true");
}
this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table;
initClient(debug, tableName, getProperties());
String tableName = CoreWorkload.table;
initClient(tableName, getProperties());
this.session = client.newSession();
if (getProperties().getProperty(SYNC_OPS_OPT) != null
&& getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
......@@ -111,8 +98,8 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
}
}
private static synchronized void initClient(boolean debug, String tableName,
Properties prop) throws DBException {
private static synchronized void initClient(String tableName,
Properties prop) throws DBException {
if (client != null) {
return;
}
......@@ -124,65 +111,59 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
if (numTablets > MAX_TABLETS) {
throw new DBException("Specified number of tablets (" + numTablets
+ ") must be equal " + "or below " + MAX_TABLETS);
throw new DBException(String.format(
"Specified number of tablets (%s) must be equal or below %s", numTablets, MAX_TABLETS));
}
int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
client = new KuduClient.KuduClientBuilder(masterAddresses)
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
.defaultOperationTimeoutMs(DEFAULT_SLEEP)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP).build();
if (debug) {
System.out.println("Connecting to the masters at " + masterAddresses);
}
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
.defaultOperationTimeoutMs(DEFAULT_SLEEP)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
.build();
LOG.debug("Connecting to the masters at {}", masterAddresses);
fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1);
List<ColumnSchema> columns = new ArrayList<>(fieldCount + 1);
ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
.key(true).desiredBlockSize(blockSize).build();
.key(true)
.desiredBlockSize(blockSize)
.build();
columns.add(keyColumn);
COLUMN_NAMES.add(KEY);
for (int i = 0; i < fieldCount; i++) {
String name = "field" + i;
COLUMN_NAMES.add(name);
columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
.desiredBlockSize(blockSize).build());
.desiredBlockSize(blockSize)
.build());
}
schema = new Schema(columns);
CreateTableOptions builder = new CreateTableOptions();
List<String> rangePartitionColumns = new ArrayList<>(1);
rangePartitionColumns.add(KEY);
builder.setRangePartitionColumns(rangePartitionColumns);
builder.setRangePartitionColumns(new ArrayList<String>());
List<String> hashPartitionColumns = new ArrayList<>();
hashPartitionColumns.add(KEY);
builder.addHashPartitions(hashPartitionColumns, numTablets);
builder.setNumReplicas(numReplicas);
// create n-1 split keys, which will end up being n tablets master-side
for (int i = 1; i < numTablets + 0; i++) {
// We do +1000 since YCSB starts at user1.
int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000;
String startKey = String.format("%04d", startKeyInt);
PartialRow splitRow = schema.newPartialRow();
splitRow.addString(0, "user" + startKey);
builder.addSplitRow(splitRow);
}
try {
client.createTable(tableName, schema, builder);
} catch (Exception e) {
if (!e.getMessage().contains("ALREADY_PRESENT")) {
if (!e.getMessage().contains("already exists")) {
throw new DBException("Couldn't create the table", e);
}
}
}
private static int getIntFromProp(Properties prop, String propName,
int defaultValue) throws DBException {
private static int getIntFromProp(Properties prop,
String propName,
int defaultValue) throws DBException {
String intStr = prop.getProperty(propName);
if (intStr == null) {
return defaultValue;
......@@ -190,8 +171,7 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
try {
return Integer.valueOf(intStr);
} catch (NumberFormatException ex) {
throw new DBException(
"Provided number for " + propName + " isn't a valid integer");
throw new DBException("Provided number for " + propName + " isn't a valid integer");
}
}
}
......@@ -206,10 +186,11 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
}
@Override
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
Vector<HashMap<String, ByteIterator>> results =
new Vector<HashMap<String, ByteIterator>>();
public Status read(String table,
String key,
Set<String> fields,
HashMap<String, ByteIterator> result) {
Vector<HashMap<String, ByteIterator>> results = new Vector<>();
final Status status = scan(table, key, 1, fields, results);
if (!status.equals(Status.OK)) {
return status;
......@@ -222,11 +203,13 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
}
@Override
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
public Status scan(String table,
String startkey,
int recordcount,
Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
try {
KuduScanner.KuduScannerBuilder scannerBuilder =
client.newScannerBuilder(this.kuduTable);
KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(kuduTable);
List<String> querySchema;
if (fields == null) {
querySchema = COLUMN_NAMES;
......@@ -236,16 +219,10 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
scannerBuilder.setProjectedColumnNames(querySchema);
}
PartialRow lowerBound = schema.newPartialRow();
lowerBound.addString(0, startkey);
scannerBuilder.lowerBound(lowerBound);
if (recordcount == 1) {
PartialRow upperBound = schema.newPartialRow();
upperBound.addString(0, startkey + '\0');
scannerBuilder.exclusiveUpperBound(upperBound);
}
ColumnSchema column = schema.getColumnByIndex(0);
KuduPredicate.ComparisonOp predicateOp = recordcount == 1 ? EQUAL : GREATER_EQUAL;
KuduPredicate predicate = KuduPredicate.newComparisonPredicate(column, predicateOp, startkey);
scannerBuilder.addPredicate(predicate);
scannerBuilder.limit(recordcount); // currently noop
KuduScanner scanner = scannerBuilder.build();
......@@ -260,25 +237,21 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
RowResultIterator closer = scanner.close();
addAllRowsToResult(closer, recordcount, querySchema, result);
} catch (TimeoutException te) {
if (printErrors) {
System.err.println(
"Waited too long for a scan operation with start key=" + startkey);
}
LOG.info("Waited too long for a scan operation with start key={}", startkey);
return TIMEOUT;
} catch (Exception e) {
System.err.println("Unexpected exception " + e);
e.printStackTrace();
LOG.warn("Unexpected exception", e);
return Status.ERROR;
}
return Status.OK;
}
private void addAllRowsToResult(RowResultIterator it, int recordcount,
List<String> querySchema, Vector<HashMap<String, ByteIterator>> result)
throws Exception {
private void addAllRowsToResult(RowResultIterator it,
int recordcount,
List<String> querySchema,
Vector<HashMap<String, ByteIterator>> result) throws Exception {
RowResult row;
HashMap<String, ByteIterator> rowResult =
new HashMap<String, ByteIterator>(querySchema.size());
HashMap<String, ByteIterator> rowResult = new HashMap<>(querySchema.size());
if (it == null) {
return;
}
......@@ -297,8 +270,7 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
}
@Override
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
Update update = this.kuduTable.newUpdate();
PartialRow row = update.getRow();
row.addString(KEY, key);
......@@ -314,14 +286,12 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
}
@Override
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
Insert insert = this.kuduTable.newInsert();
PartialRow row = insert.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
row.addString(i, new String(
values.get(schema.getColumnByIndex(i).getName()).toArray()));
row.addString(i, values.get(schema.getColumnByIndex(i).getName()).toString());
}
apply(insert);
return Status.OK;
......@@ -339,14 +309,11 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
private void apply(Operation op) {
try {
OperationResponse response = session.apply(op);
if (response != null && response.hasRowError() && printErrors) {
System.err.println("Got a row error " + response.getRowError());
}
} catch (Exception ex) {
if (printErrors) {
System.err.println("Failed to apply an operation " + ex.toString());
ex.printStackTrace();
if (response != null && response.hasRowError()) {
LOG.info("Write operation failed: {}", response.getRowError());
}
} catch (KuduException ex) {
LOG.warn("Write operation failed", ex);
}
}
}
/*
* Copyright (c) 2014, Yahoo!, Inc. All rights reserved.
/**
* Copyright (c) 2015-2016 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
......@@ -16,7 +16,7 @@
*/
/**
* The YCSB binding for <a href="http://getkudu.io/">Kudu</a>.
* The YCSB binding for <a href="http://kudu.apache.org/">Apache Kudu</a>.
*/
package com.yahoo.ycsb.db;
# Root logger option
log4j.rootLogger=DEBUG, stderr
log4j.logger.com.stumbleupon.async=WARN
log4j.logger.org.apache.kudu=WARN
# Direct log messages to stderr
log4j.appender.stderr = org.apache.log4j.ConsoleAppender
log4j.appender.stderr.Target=System.err
log4j.appender.stderr.layout = org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern = [%p] %d{HH:mm:ss.SSS} (%F:%L) %m%n
......@@ -77,7 +77,7 @@ LICENSE file.
<geode.version>1.0.0-incubating.M3</geode.version>
<googlebigtable.version>0.2.3</googlebigtable.version>
<infinispan.version>7.2.2.Final</infinispan.version>
<kudu.version>0.9.0</kudu.version>
<kudu.version>1.1.0</kudu.version>
<openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>
<!--<mapkeeper.version>1.0</mapkeeper.version>-->
<mongodb.version>3.0.3</mongodb.version>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment