Skip to content
Snippets Groups Projects
Commit 336a173e authored by Todd Lipcon's avatar Todd Lipcon Committed by Sean Busbey
Browse files

[kudu] Kudu binding update (#1098)

* Make Kudu buffer size configurable
* update to latest client and avoid string encoding
* support using multiple KuduClient instances
parent 3b6059bb
No related branches found
No related tags found
No related merge requests found
......@@ -69,98 +69,136 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
private static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out.");
private static final int MAX_TABLETS = 9000;
private static final long DEFAULT_SLEEP = 60000;
private static final int DEFAULT_NUM_CLIENTS = 1;
private static final int DEFAULT_NUM_REPLICAS = 3;
private static final String SYNC_OPS_OPT = "kudu_sync_ops";
private static final String BUFFER_NUM_OPS_OPT = "kudu_buffer_num_ops";
private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets";
private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas";
private static final String BLOCK_SIZE_OPT = "kudu_block_size";
private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses";
private static final String NUM_CLIENTS_OPT = "kudu_num_clients";
private static final int BLOCK_SIZE_DEFAULT = 4096;
private static final int BUFFER_NUM_OPS_DEFAULT = 2000;
private static final List<String> COLUMN_NAMES = new ArrayList<>();
private static KuduClient client;
private static Schema schema;
private static List<KuduClient> clients = new ArrayList<>();
private static int clientRoundRobin = 0;
private static boolean tableSetup = false;
private KuduClient client;
private Schema schema;
private String tableName;
private KuduSession session;
private KuduTable kuduTable;
@Override
public void init() throws DBException {
String tableName = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
initClient(tableName, getProperties());
this.tableName = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
initClient();
this.session = client.newSession();
if (getProperties().getProperty(SYNC_OPS_OPT) != null
&& getProperties().getProperty(SYNC_OPS_OPT).equals("false")) {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
this.session.setMutationBufferSpace(100);
this.session.setMutationBufferSpace(
getIntFromProp(getProperties(), BUFFER_NUM_OPS_OPT, BUFFER_NUM_OPS_DEFAULT));
} else {
this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
}
try {
this.kuduTable = client.openTable(tableName);
this.schema = kuduTable.getSchema();
} catch (Exception e) {
throw new DBException("Could not open a table because of:", e);
}
}
private static synchronized void initClient(String tableName,
Properties prop) throws DBException {
if (client != null) {
return;
}
/**
* Initialize the 'clients' member with the configured number of
* clients.
*/
private void initClients() throws DBException {
synchronized (KuduYCSBClient.class) {
if (!clients.isEmpty()) {
return;
}
String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT);
if (masterAddresses == null) {
masterAddresses = "localhost:7051";
}
Properties prop = getProperties();
int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
if (numTablets > MAX_TABLETS) {
throw new DBException(String.format(
"Specified number of tablets (%s) must be equal or below %s", numTablets, MAX_TABLETS));
}
String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT,
"localhost:7051");
LOG.debug("Connecting to the masters at {}", masterAddresses);
int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3);
int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
client = new KuduClient.KuduClientBuilder(masterAddresses)
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
.defaultOperationTimeoutMs(DEFAULT_SLEEP)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
.build();
LOG.debug("Connecting to the masters at {}", masterAddresses);
int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
List<ColumnSchema> columns = new ArrayList<>(fieldCount + 1);
ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
.key(true)
.desiredBlockSize(blockSize)
.build();
columns.add(keyColumn);
COLUMN_NAMES.add(KEY);
for (int i = 0; i < fieldCount; i++) {
String name = "field" + i;
COLUMN_NAMES.add(name);
columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
.desiredBlockSize(blockSize)
int numClients = getIntFromProp(prop, NUM_CLIENTS_OPT, DEFAULT_NUM_CLIENTS);
for (int i = 0; i < numClients; i++) {
clients.add(new KuduClient.KuduClientBuilder(masterAddresses)
.defaultSocketReadTimeoutMs(DEFAULT_SLEEP)
.defaultOperationTimeoutMs(DEFAULT_SLEEP)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
.build());
}
}
}
private void initClient() throws DBException {
initClients();
synchronized (clients) {
client = clients.get(clientRoundRobin++ % clients.size());
}
schema = new Schema(columns);
setupTable();
}
CreateTableOptions builder = new CreateTableOptions();
builder.setRangePartitionColumns(new ArrayList<String>());
List<String> hashPartitionColumns = new ArrayList<>();
hashPartitionColumns.add(KEY);
builder.addHashPartitions(hashPartitionColumns, numTablets);
builder.setNumReplicas(numReplicas);
try {
client.createTable(tableName, schema, builder);
} catch (Exception e) {
if (!e.getMessage().contains("already exists")) {
throw new DBException("Couldn't create the table", e);
private void setupTable() throws DBException {
Properties prop = getProperties();
synchronized (KuduYCSBClient.class) {
if (tableSetup) {
return;
}
int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4);
if (numTablets > MAX_TABLETS) {
throw new DBException("Specified number of tablets (" + numTablets
+ ") must be equal " + "or below " + MAX_TABLETS);
}
int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, DEFAULT_NUM_REPLICAS);
int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT);
int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY,
Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1);
ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING)
.key(true)
.desiredBlockSize(blockSize)
.build();
columns.add(keyColumn);
COLUMN_NAMES.add(KEY);
for (int i = 0; i < fieldCount; i++) {
String name = "field" + i;
COLUMN_NAMES.add(name);
columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING)
.desiredBlockSize(blockSize)
.build());
}
schema = new Schema(columns);
CreateTableOptions builder = new CreateTableOptions();
builder.setRangePartitionColumns(new ArrayList<String>());
List<String> hashPartitionColumns = new ArrayList<>();
hashPartitionColumns.add(KEY);
builder.addHashPartitions(hashPartitionColumns, numTablets);
builder.setNumReplicas(numReplicas);
try {
client.createTable(tableName, schema, builder);
} catch (Exception e) {
if (!e.getMessage().contains("already exists")) {
throw new DBException("Couldn't create the table", e);
}
}
tableSetup = true;
}
}
......@@ -277,9 +315,9 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
String columnName = schema.getColumnByIndex(i).getName();
if (values.containsKey(columnName)) {
String value = values.get(columnName).toString();
row.addString(columnName, value);
ByteIterator b = values.get(columnName);
if (b != null) {
row.addStringUtf8(columnName, b.toArray());
}
}
apply(update);
......@@ -292,7 +330,7 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB {
PartialRow row = insert.getRow();
row.addString(KEY, key);
for (int i = 1; i < schema.getColumnCount(); i++) {
row.addString(i, values.get(schema.getColumnByIndex(i).getName()).toString());
row.addStringUtf8(i, values.get(schema.getColumnByIndex(i).getName()).toArray());
}
apply(insert);
return Status.OK;
......
......@@ -93,7 +93,7 @@ LICENSE file.
<hbase20.version>2.0.0-beta-2</hbase20.version>
<hypertable.version>0.9.5.6</hypertable.version>
<infinispan.version>7.2.2.Final</infinispan.version>
<kudu.version>1.1.0</kudu.version>
<kudu.version>1.6.0</kudu.version>
<maprhbase.version>1.1.8-mapr-1710</maprhbase.version>
<!--<mapkeeper.version>1.0</mapkeeper.version>-->
<mongodb.version>3.6.3</mongodb.version>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment