diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java index ebd6a8225f6bb1ce81fb1165b9a684b9d0ed5f2b..02827cadb5411bba60e3e0f1b1faa164bd8e73d6 100644 --- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java +++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java @@ -69,98 +69,136 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB { private static final Status TIMEOUT = new Status("TIMEOUT", "The operation timed out."); private static final int MAX_TABLETS = 9000; private static final long DEFAULT_SLEEP = 60000; + private static final int DEFAULT_NUM_CLIENTS = 1; + private static final int DEFAULT_NUM_REPLICAS = 3; + private static final String SYNC_OPS_OPT = "kudu_sync_ops"; + private static final String BUFFER_NUM_OPS_OPT = "kudu_buffer_num_ops"; private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets"; private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas"; private static final String BLOCK_SIZE_OPT = "kudu_block_size"; private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses"; + private static final String NUM_CLIENTS_OPT = "kudu_num_clients"; + private static final int BLOCK_SIZE_DEFAULT = 4096; + private static final int BUFFER_NUM_OPS_DEFAULT = 2000; private static final List<String> COLUMN_NAMES = new ArrayList<>(); - private static KuduClient client; - private static Schema schema; + + private static List<KuduClient> clients = new ArrayList<>(); + private static int clientRoundRobin = 0; + private static boolean tableSetup = false; + private KuduClient client; + private Schema schema; + private String tableName; private KuduSession session; private KuduTable kuduTable; @Override public void init() throws DBException { - String tableName = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); - initClient(tableName, getProperties()); + this.tableName = getProperties().getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); + initClient(); this.session = client.newSession(); if (getProperties().getProperty(SYNC_OPS_OPT) != null && getProperties().getProperty(SYNC_OPS_OPT).equals("false")) { this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND); - this.session.setMutationBufferSpace(100); + this.session.setMutationBufferSpace( + getIntFromProp(getProperties(), BUFFER_NUM_OPS_OPT, BUFFER_NUM_OPS_DEFAULT)); } else { this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC); } try { this.kuduTable = client.openTable(tableName); + this.schema = kuduTable.getSchema(); } catch (Exception e) { throw new DBException("Could not open a table because of:", e); } } - private static synchronized void initClient(String tableName, - Properties prop) throws DBException { - if (client != null) { - return; - } + /** + * Initialize the 'clients' member with the configured number of + * clients. + */ + private void initClients() throws DBException { + synchronized (KuduYCSBClient.class) { + if (!clients.isEmpty()) { + return; + } - String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT); - if (masterAddresses == null) { - masterAddresses = "localhost:7051"; - } + Properties prop = getProperties(); - int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4); - if (numTablets > MAX_TABLETS) { - throw new DBException(String.format( - "Specified number of tablets (%s) must be equal or below %s", numTablets, MAX_TABLETS)); - } + String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT, + "localhost:7051"); + LOG.debug("Connecting to the masters at {}", masterAddresses); - int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3); - int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT); - - client = new KuduClient.KuduClientBuilder(masterAddresses) - .defaultSocketReadTimeoutMs(DEFAULT_SLEEP) - .defaultOperationTimeoutMs(DEFAULT_SLEEP) - .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP) - .build(); - LOG.debug("Connecting to the masters at {}", masterAddresses); - - int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY, - Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); - - List<ColumnSchema> columns = new ArrayList<>(fieldCount + 1); - - ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING) - .key(true) - .desiredBlockSize(blockSize) - .build(); - columns.add(keyColumn); - COLUMN_NAMES.add(KEY); - for (int i = 0; i < fieldCount; i++) { - String name = "field" + i; - COLUMN_NAMES.add(name); - columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING) - .desiredBlockSize(blockSize) + int numClients = getIntFromProp(prop, NUM_CLIENTS_OPT, DEFAULT_NUM_CLIENTS); + for (int i = 0; i < numClients; i++) { + clients.add(new KuduClient.KuduClientBuilder(masterAddresses) + .defaultSocketReadTimeoutMs(DEFAULT_SLEEP) + .defaultOperationTimeoutMs(DEFAULT_SLEEP) + .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP) .build()); + } + } + } + + private void initClient() throws DBException { + initClients(); + synchronized (clients) { + client = clients.get(clientRoundRobin++ % clients.size()); } - schema = new Schema(columns); + setupTable(); + } - CreateTableOptions builder = new CreateTableOptions(); - builder.setRangePartitionColumns(new ArrayList<String>()); - List<String> hashPartitionColumns = new ArrayList<>(); - hashPartitionColumns.add(KEY); - builder.addHashPartitions(hashPartitionColumns, numTablets); - builder.setNumReplicas(numReplicas); - try { - client.createTable(tableName, schema, builder); - } catch (Exception e) { - if (!e.getMessage().contains("already exists")) { - throw new DBException("Couldn't create the table", e); + private void setupTable() throws DBException { + Properties prop = getProperties(); + synchronized (KuduYCSBClient.class) { + if (tableSetup) { + return; + } + int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4); + if (numTablets > MAX_TABLETS) { + throw new DBException("Specified number of tablets (" + numTablets + + ") must be equal " + "or below " + MAX_TABLETS); + } + int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, DEFAULT_NUM_REPLICAS); + int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT); + int fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY, + Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); + + List<ColumnSchema> columns = new ArrayList<ColumnSchema>(fieldCount + 1); + + ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING) + .key(true) + .desiredBlockSize(blockSize) + .build(); + columns.add(keyColumn); + COLUMN_NAMES.add(KEY); + for (int i = 0; i < fieldCount; i++) { + String name = "field" + i; + COLUMN_NAMES.add(name); + columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING) + .desiredBlockSize(blockSize) + .build()); + } + schema = new Schema(columns); + + CreateTableOptions builder = new CreateTableOptions(); + builder.setRangePartitionColumns(new ArrayList<String>()); + List<String> hashPartitionColumns = new ArrayList<>(); + hashPartitionColumns.add(KEY); + builder.addHashPartitions(hashPartitionColumns, numTablets); + builder.setNumReplicas(numReplicas); + + try { + client.createTable(tableName, schema, builder); + } catch (Exception e) { + if (!e.getMessage().contains("already exists")) { + throw new DBException("Couldn't create the table", e); + } } + tableSetup = true; } } @@ -277,9 +315,9 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB { row.addString(KEY, key); for (int i = 1; i < schema.getColumnCount(); i++) { String columnName = schema.getColumnByIndex(i).getName(); - if (values.containsKey(columnName)) { - String value = values.get(columnName).toString(); - row.addString(columnName, value); + ByteIterator b = values.get(columnName); + if (b != null) { + row.addStringUtf8(columnName, b.toArray()); } } apply(update); @@ -292,7 +330,7 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB { PartialRow row = insert.getRow(); row.addString(KEY, key); for (int i = 1; i < schema.getColumnCount(); i++) { - row.addString(i, values.get(schema.getColumnByIndex(i).getName()).toString()); + row.addStringUtf8(i, values.get(schema.getColumnByIndex(i).getName()).toArray()); } apply(insert); return Status.OK; diff --git a/pom.xml b/pom.xml index af13d1b156342eceb224b3351ea3bd738b71e808..98a413052aeef391daa2899e8d36a6ceb314f9c8 100644 --- a/pom.xml +++ b/pom.xml @@ -93,7 +93,7 @@ LICENSE file. <hbase20.version>2.0.0-beta-2</hbase20.version> <hypertable.version>0.9.5.6</hypertable.version> <infinispan.version>7.2.2.Final</infinispan.version> - <kudu.version>1.1.0</kudu.version> + <kudu.version>1.6.0</kudu.version> <maprhbase.version>1.1.8-mapr-1710</maprhbase.version> <!--<mapkeeper.version>1.0</mapkeeper.version>--> <mongodb.version>3.6.3</mongodb.version>