diff --git a/.travis.yml b/.travis.yml index 43693e0fde13579987b465dd59ce28b55ecefcdf..a34a295a70c7aab459368db2979a121ccc86e205 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,8 +22,8 @@ language: java jdk: - oraclejdk8 - oraclejdk7 - - openjdk7 - + - openjdk7 + install: mvn install -q -DskipTests=true script: mvn test -q @@ -31,7 +31,8 @@ script: mvn test -q # Services to start for tests. services: - mongodb + - riak # Use the Container based infrastructure. -sudo: false +sudo: false \ No newline at end of file diff --git a/asynchbase/README.md b/asynchbase/README.md new file mode 100644 index 0000000000000000000000000000000000000000..1a300c9bd2358e7aec03d09f0c51d36232b039b2 --- /dev/null +++ b/asynchbase/README.md @@ -0,0 +1,59 @@ +<!-- +Copyright (c) 2016 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +# AsyncHBase Driver for YCSB + +This driver provides a YCSB workload binding for Apache HBase using an alternative to the included HBase client. AsyncHBase is completely asynchronous for all operations and is particularly useful for write heavy workloads. Note that it supports a subset of the HBase client APIs but supports all public released versions of HBase. + +## Quickstart + +### 1. Setup Hbase + +Follow directions 1 to 3 from ``hbase098``'s readme. + +### 2. Load a Workload + +Switch to the root of the YCSB repo and choose the workload you want to run and `load` it first. 
With the CLI you must provide the column family at a minimum if HBase is running on localhost. Otherwise you must provide connection properties via CLI or the path to a config file. Additional configuration parameters are available below.
+
+```
+bin/ycsb load asynchbase -p columnfamily=cf -P workloads/workloada
+
+```
+
+The `load` step only executes inserts into the datastore. After loading data, run the same workload to mix reads with writes.
+
+```
+bin/ycsb run asynchbase -p columnfamily=cf -P workloads/workloada
+
+```
+
+## Configuration Options
+
+The following options can be configured using CLI (using the `-p` parameter) or via a Java-style properties configuration file. Check the [AsyncHBase Configuration](http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html) project for additional tuning parameters.
+
+* `columnfamily`: (Required) The column family to target.
+* `config`: Optional full path to a configuration file with AsyncHBase options.
+* `hbase.zookeeper.quorum`: Zookeeper quorum list.
+* `hbase.zookeeper.znode.parent`: Path used by HBase in Zookeeper. Default is "/hbase".
+* `debug`: If true, prints debug information to standard out. The default is false.
+* `clientbuffering`: Whether or not to use client side buffering and batching of write operations. This can significantly improve performance and defaults to true.
+* `durability`: When set to false, writes and deletes bypass the WAL for quicker responses. Default is true.
+* `jointimeout`: A timeout value, in milliseconds, for waiting on operations synchronously before an error is thrown.
+* `prefetchmeta`: Whether or not to read meta for all regions in the table and connect to the proper region servers before starting operations. Defaults to false.
+
+
+Note: This module includes some Google Guava source files from version 12 that were later removed but are still required by HBase's test modules for setting up the mini cluster during integration testing. 
\ No newline at end of file diff --git a/asynchbase/pom.xml b/asynchbase/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..cf4919075ad9c9e536bff405cd45705c7d56b9a0 --- /dev/null +++ b/asynchbase/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2016 YCSB contributors. All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../binding-parent/</relativePath> + </parent> + + <artifactId>asynchbase-binding</artifactId> + <name>AsyncHBase Client Binding for Apache HBase</name> + + <dependencies> + <dependency> + <groupId>org.hbase</groupId> + <artifactId>asynchbase</artifactId> + <version>${asynchbase.version}</version> + </dependency> + + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + 
</exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + </exclusion> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <version>${hbase10.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase10.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>1.7.7</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/asynchbase/src/main/java/com/yahoo/ycsb/db/AsyncHBaseClient.java b/asynchbase/src/main/java/com/yahoo/ycsb/db/AsyncHBaseClient.java new file mode 100644 index 0000000000000000000000000000000000000000..fddd1a7aef3b9be60178d7d36cb1852b547b8086 --- /dev/null +++ b/asynchbase/src/main/java/com/yahoo/ycsb/db/AsyncHBaseClient.java @@ -0,0 +1,409 @@ +/** + * Copyright (c) 2016 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.db; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Vector; + +import org.hbase.async.Bytes; +import org.hbase.async.Config; +import org.hbase.async.DeleteRequest; +import org.hbase.async.GetRequest; +import org.hbase.async.HBaseClient; +import org.hbase.async.KeyValue; +import org.hbase.async.PutRequest; +import org.hbase.async.Scanner; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; + +/** + * Alternative Java client for Apache HBase. + * + * This client provides a subset of the main HBase client and uses a completely + * asynchronous pipeline for all calls. It is particularly useful for write heavy + * workloads. It is also compatible with all production versions of HBase. 
+ */ +public class AsyncHBaseClient extends com.yahoo.ycsb.DB { + public static final Charset UTF8_CHARSET = Charset.forName("UTF8"); + private static final String CLIENT_SIDE_BUFFERING_PROPERTY = "clientbuffering"; + private static final String DURABILITY_PROPERTY = "durability"; + private static final String PREFETCH_META_PROPERTY = "prefetchmeta"; + private static final String CONFIG_PROPERTY = "config"; + private static final String COLUMN_FAMILY_PROPERTY = "columnfamily"; + private static final String JOIN_TIMEOUT_PROPERTY = "jointimeout"; + private static final String JOIN_TIMEOUT_PROPERTY_DEFAULT = "30000"; + + /** Mutex for instantiating a single instance of the client. */ + private static final Object MUTEX = new Object(); + + /** Use for tracking running thread counts so we know when to shutdown the client. */ + private static int threadCount = 0; + + /** The client that's used for all threads. */ + private static HBaseClient client; + + /** Print debug information to standard out. */ + private boolean debug = false; + + /** The column family use for the workload. */ + private byte[] columnFamilyBytes; + + /** Cache for the last table name/ID to avoid byte conversions. */ + private String lastTable = ""; + private byte[] lastTableBytes; + + private long joinTimeout; + + /** Whether or not to bypass the WAL for puts and deletes. */ + private boolean durability = true; + + /** + * If true, buffer mutations on the client. This is the default behavior for + * AsyncHBase. For measuring insert/update/delete latencies, client side + * buffering should be disabled. 
+ * + * A single instance of this + */ + private boolean clientSideBuffering = false; + + @Override + public void init() throws DBException { + if (getProperties().getProperty(CLIENT_SIDE_BUFFERING_PROPERTY, "false") + .toLowerCase().equals("true")) { + clientSideBuffering = true; + } + if (getProperties().getProperty(DURABILITY_PROPERTY, "true") + .toLowerCase().equals("false")) { + durability = false; + } + final String columnFamily = getProperties().getProperty(COLUMN_FAMILY_PROPERTY); + if (columnFamily == null || columnFamily.isEmpty()) { + System.err.println("Error, must specify a columnfamily for HBase table"); + throw new DBException("No columnfamily specified"); + } + columnFamilyBytes = columnFamily.getBytes(); + + if ((getProperties().getProperty("debug") != null) + && (getProperties().getProperty("debug").compareTo("true") == 0)) { + debug = true; + } + + joinTimeout = Integer.parseInt(getProperties().getProperty( + JOIN_TIMEOUT_PROPERTY, JOIN_TIMEOUT_PROPERTY_DEFAULT)); + + final boolean prefetchMeta = getProperties() + .getProperty(PREFETCH_META_PROPERTY, "false") + .toLowerCase().equals("true") ? true : false; + try { + synchronized (MUTEX) { + ++threadCount; + if (client == null) { + final String configPath = getProperties().getProperty(CONFIG_PROPERTY); + final Config config; + if (configPath == null || configPath.isEmpty()) { + config = new Config(); + final Iterator<Entry<Object, Object>> iterator = getProperties() + .entrySet().iterator(); + while (iterator.hasNext()) { + final Entry<Object, Object> property = iterator.next(); + config.overrideConfig((String)property.getKey(), + (String)property.getValue()); + } + } else { + config = new Config(configPath); + } + client = new HBaseClient(config); + + // Terminate right now if table does not exist, since the client + // will not propagate this error upstream once the workload + // starts. 
+ String table = com.yahoo.ycsb.workloads.CoreWorkload.table; + try { + client.ensureTableExists(table).join(joinTimeout); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new DBException(e); + } + + if (prefetchMeta) { + try { + if (debug) { + System.out.println("Starting meta prefetch for table " + table); + } + client.prefetchMeta(table).join(joinTimeout); + if (debug) { + System.out.println("Completed meta prefetch for table " + table); + } + } catch (InterruptedException e) { + System.err.println("Interrupted during prefetch"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new DBException("Failed prefetch", e); + } + } + } + } + } catch (IOException e) { + throw new DBException("Failed instantiation of client", e); + } + } + + @Override + public void cleanup() throws DBException { + synchronized (MUTEX) { + --threadCount; + if (client != null && threadCount < 1) { + try { + if (debug) { + System.out.println("Shutting down client"); + } + client.shutdown().joinUninterruptibly(joinTimeout); + } catch (Exception e) { + System.err.println("Failed to shutdown the AsyncHBase client " + + "properly: " + e.getMessage()); + } + client = null; + } + } + } + + @Override + public Status read(String table, String key, Set<String> fields, + HashMap<String, ByteIterator> result) { + setTable(table); + + final GetRequest get = new GetRequest( + lastTableBytes, key.getBytes(), columnFamilyBytes); + if (fields != null) { + get.qualifiers(getQualifierList(fields)); + } + + try { + if (debug) { + System.out.println("Doing read from HBase columnfamily " + + Bytes.pretty(columnFamilyBytes)); + System.out.println("Doing read for key: " + key); + } + + final ArrayList<KeyValue> row = client.get(get).join(joinTimeout); + if (row == null || row.isEmpty()) { + return Status.NOT_FOUND; + } + + // got something so populate the results + for (final KeyValue column : row) { + result.put(new 
String(column.qualifier()), + // TODO - do we need to clone this array? YCSB may keep it in memory + // for a while which would mean the entire KV would hang out and won't + // be GC'd. + new ByteArrayByteIterator(column.value())); + + if (debug) { + System.out.println( + "Result for field: " + Bytes.pretty(column.qualifier()) + + " is: " + Bytes.pretty(column.value())); + } + } + return Status.OK; + } catch (InterruptedException e) { + System.err.println("Thread interrupted"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + System.err.println("Failure reading from row with key " + key + + ": " + e.getMessage()); + return Status.ERROR; + } + return Status.ERROR; + } + + @Override + public Status scan(String table, String startkey, int recordcount, + Set<String> fields, Vector<HashMap<String, ByteIterator>> result) { + setTable(table); + + final Scanner scanner = client.newScanner(lastTableBytes); + scanner.setFamily(columnFamilyBytes); + scanner.setStartKey(startkey.getBytes(UTF8_CHARSET)); + // No end key... *sniff* + if (fields != null) { + scanner.setQualifiers(getQualifierList(fields)); + } + + // no filters? *sniff* + ArrayList<ArrayList<KeyValue>> rows = null; + try { + int numResults = 0; + while ((rows = scanner.nextRows().join(joinTimeout)) != null) { + for (final ArrayList<KeyValue> row : rows) { + final HashMap<String, ByteIterator> rowResult = + new HashMap<String, ByteIterator>(row.size()); + for (final KeyValue column : row) { + rowResult.put(new String(column.qualifier()), + // TODO - do we need to clone this array? YCSB may keep it in memory + // for a while which would mean the entire KV would hang out and won't + // be GC'd. 
+ new ByteArrayByteIterator(column.value())); + if (debug) { + System.out.println("Got scan result for key: " + + Bytes.pretty(column.key())); + } + } + result.add(rowResult); + numResults++; + + if (numResults >= recordcount) {// if hit recordcount, bail out + break; + } + } + } + scanner.close().join(joinTimeout); + return Status.OK; + } catch (InterruptedException e) { + System.err.println("Thread interrupted"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + System.err.println("Failure reading from row with key " + startkey + + ": " + e.getMessage()); + return Status.ERROR; + } + + return Status.ERROR; + } + + @Override + public Status update(String table, String key, + HashMap<String, ByteIterator> values) { + setTable(table); + + if (debug) { + System.out.println("Setting up put for key: " + key); + } + + final byte[][] qualifiers = new byte[values.size()][]; + final byte[][] byteValues = new byte[values.size()][]; + + int idx = 0; + for (final Entry<String, ByteIterator> entry : values.entrySet()) { + qualifiers[idx] = entry.getKey().getBytes(); + byteValues[idx++] = entry.getValue().toArray(); + if (debug) { + System.out.println("Adding field/value " + entry.getKey() + "/" + + Bytes.pretty(entry.getValue().toArray()) + " to put request"); + } + } + + final PutRequest put = new PutRequest(lastTableBytes, key.getBytes(), + columnFamilyBytes, qualifiers, byteValues); + if (!durability) { + put.setDurable(false); + } + if (!clientSideBuffering) { + put.setBufferable(false); + try { + client.put(put).join(joinTimeout); + } catch (InterruptedException e) { + System.err.println("Thread interrupted"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + System.err.println("Failure reading from row with key " + key + + ": " + e.getMessage()); + return Status.ERROR; + } + } else { + // hooray! Asynchronous write. 
But without a callback and an async + // YCSB call we don't know whether it succeeded or not + client.put(put); + } + + return Status.OK; + } + + @Override + public Status insert(String table, String key, + HashMap<String, ByteIterator> values) { + return update(table, key, values); + } + + @Override + public Status delete(String table, String key) { + setTable(table); + + if (debug) { + System.out.println("Doing delete for key: " + key); + } + + final DeleteRequest delete = new DeleteRequest( + lastTableBytes, key.getBytes(), columnFamilyBytes); + if (!durability) { + delete.setDurable(false); + } + if (!clientSideBuffering) { + delete.setBufferable(false); + try { + client.delete(delete).join(joinTimeout); + } catch (InterruptedException e) { + System.err.println("Thread interrupted"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + System.err.println("Failure reading from row with key " + key + + ": " + e.getMessage()); + return Status.ERROR; + } + } else { + // hooray! Asynchronous write. But without a callback and an async + // YCSB call we don't know whether it succeeded or not + client.delete(delete); + } + return Status.OK; + } + + /** + * Little helper to set the table byte array. If it's different than the last + * table we reset the byte array. Otherwise we just use the existing array. + * @param table The table we're operating against + */ + private void setTable(final String table) { + if (!lastTable.equals(table)) { + lastTable = table; + lastTableBytes = table.getBytes(); + } + } + + /** + * Little helper to build a qualifier byte array from a field set. + * @param fields The fields to fetch. + * @return The column qualifier byte arrays. 
+ */ + private byte[][] getQualifierList(final Set<String> fields) { + final byte[][] qualifiers = new byte[fields.size()][]; + int idx = 0; + for (final String field : fields) { + qualifiers[idx++] = field.getBytes(); + } + return qualifiers; + } +} \ No newline at end of file diff --git a/asynchbase/src/main/java/com/yahoo/ycsb/db/package-info.java b/asynchbase/src/main/java/com/yahoo/ycsb/db/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..72faa5016162c0e8d69f866c5d8f8ab0a8929f86 --- /dev/null +++ b/asynchbase/src/main/java/com/yahoo/ycsb/db/package-info.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2016 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for HBase using the AsyncHBase client. + */ +package com.yahoo.ycsb.db; diff --git a/asynchbase/src/test/java/com/google/common/base/Stopwatch.java b/asynchbase/src/test/java/com/google/common/base/Stopwatch.java new file mode 100644 index 0000000000000000000000000000000000000000..4d46924bda1ce1f78dafa50cce93b0f70634b13b --- /dev/null +++ b/asynchbase/src/test/java/com/google/common/base/Stopwatch.java @@ -0,0 +1,278 @@ +/* + * Copyright (C) 2008 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.base;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.GwtIncompatible;

import java.util.concurrent.TimeUnit;

/**
 * An object that measures elapsed time in nanoseconds.
 *
 * NOTE(review): vendored verbatim from Guava 12-era sources because HBase's
 * test modules (mini cluster) still need this class after Guava removed it.
 * Do not modify except to re-sync with upstream.
 *
 * <p>Prefer this over direct {@link System#nanoTime} calls: an alternate
 * {@link Ticker} can be substituted for testing, and only relative (never
 * absolute) readings are exposed.
 *
 * <p>Basic usage:
 * <pre>
 * Stopwatch stopwatch = Stopwatch.{@link #createStarted createStarted}();
 * doSomething();
 * stopwatch.{@link #stop stop}(); // optional
 * long millis = stopwatch.elapsed(MILLISECONDS);
 * </pre>
 *
 * <p>Stopwatch methods are not idempotent; it is an error to start or stop a
 * stopwatch that is already in the desired state.
 *
 * <p><b>Note:</b> This class is not thread-safe.
 *
 * @author Kevin Bourrillion
 * @since 10.0
 */
@Beta
@GwtCompatible(emulated = true)
public final class Stopwatch {
  // Time source; injectable so tests can use a fake ticker.
  private final Ticker ticker;
  private boolean isRunning;
  // Accumulated nanos from completed start/stop cycles.
  private long elapsedNanos;
  // Ticker reading at the most recent start().
  private long startTick;

  /**
   * Creates (but does not start) a new stopwatch using {@link System#nanoTime}
   * as its time source.
   *
   * @since 15.0
   */
  public static Stopwatch createUnstarted() {
    return new Stopwatch();
  }

  /**
   * Creates (but does not start) a new stopwatch, using the specified time
   * source.
   *
   * @since 15.0
   */
  public static Stopwatch createUnstarted(Ticker ticker) {
    return new Stopwatch(ticker);
  }

  /**
   * Creates (and starts) a new stopwatch using {@link System#nanoTime}
   * as its time source.
   *
   * @since 15.0
   */
  public static Stopwatch createStarted() {
    return new Stopwatch().start();
  }

  /**
   * Creates (and starts) a new stopwatch, using the specified time
   * source.
   *
   * @since 15.0
   */
  public static Stopwatch createStarted(Ticker ticker) {
    return new Stopwatch(ticker).start();
  }

  /**
   * Creates (but does not start) a new stopwatch using {@link System#nanoTime}
   * as its time source.
   *
   * @deprecated Use {@link Stopwatch#createUnstarted()} instead.
   */
  @Deprecated
  public Stopwatch() {
    this(Ticker.systemTicker());
  }

  /**
   * Creates (but does not start) a new stopwatch, using the specified time
   * source.
   *
   * @deprecated Use {@link Stopwatch#createUnstarted(Ticker)} instead.
   */
  @Deprecated
  public Stopwatch(Ticker ticker) {
    this.ticker = checkNotNull(ticker, "ticker");
  }

  /**
   * Returns {@code true} if {@link #start()} has been called on this stopwatch,
   * and {@link #stop()} has not been called since the last call to {@code
   * start()}.
   */
  public boolean isRunning() {
    return isRunning;
  }

  /**
   * Starts the stopwatch.
   *
   * @return this {@code Stopwatch} instance
   * @throws IllegalStateException if the stopwatch is already running.
   */
  public Stopwatch start() {
    checkState(!isRunning, "This stopwatch is already running.");
    isRunning = true;
    startTick = ticker.read();
    return this;
  }

  /**
   * Stops the stopwatch. Future reads will return the fixed duration that had
   * elapsed up to this point.
   *
   * @return this {@code Stopwatch} instance
   * @throws IllegalStateException if the stopwatch is already stopped.
   */
  public Stopwatch stop() {
    // Read the ticker first so time spent in checkState isn't counted.
    long tick = ticker.read();
    checkState(isRunning, "This stopwatch is already stopped.");
    isRunning = false;
    elapsedNanos += tick - startTick;
    return this;
  }

  /**
   * Sets the elapsed time for this stopwatch to zero,
   * and places it in a stopped state.
   *
   * @return this {@code Stopwatch} instance
   */
  public Stopwatch reset() {
    elapsedNanos = 0;
    isRunning = false;
    return this;
  }

  // Accumulated nanos plus the in-flight interval when currently running.
  private long elapsedNanos() {
    return isRunning ? ticker.read() - startTick + elapsedNanos : elapsedNanos;
  }

  /**
   * Returns the current elapsed time shown on this stopwatch, expressed
   * in the desired time unit, with any fraction rounded down.
   *
   * <p>Note that the overhead of measurement can be more than a microsecond, so
   * it is generally not useful to specify {@link TimeUnit#NANOSECONDS}
   * precision here.
   *
   * @since 14.0 (since 10.0 as {@code elapsedTime()})
   */
  public long elapsed(TimeUnit desiredUnit) {
    return desiredUnit.convert(elapsedNanos(), NANOSECONDS);
  }

  /**
   * Returns the current elapsed time shown on this stopwatch, expressed
   * in the desired time unit, with any fraction rounded down.
   *
   * @deprecated Use {@link Stopwatch#elapsed(TimeUnit)} instead. This method is
   *     scheduled to be removed in Guava release 16.0.
   */
  @Deprecated
  public long elapsedTime(TimeUnit desiredUnit) {
    return elapsed(desiredUnit);
  }

  /**
   * Returns the current elapsed time shown on this stopwatch, expressed
   * in milliseconds, with any fraction rounded down. This is identical to
   * {@code elapsed(TimeUnit.MILLISECONDS)}.
   *
   * @deprecated Use {@code stopwatch.elapsed(MILLISECONDS)} instead. This
   *     method is scheduled to be removed in Guava release 16.0.
   */
  @Deprecated
  public long elapsedMillis() {
    return elapsed(MILLISECONDS);
  }

  /**
   * Returns a string representation of the current elapsed time.
+ */ + @GwtIncompatible("String.format()") + @Override public String toString() { + long nanos = elapsedNanos(); + + TimeUnit unit = chooseUnit(nanos); + double value = (double) nanos / NANOSECONDS.convert(1, unit); + + // Too bad this functionality is not exposed as a regular method call + return String.format("%.4g %s", value, abbreviate(unit)); + } + + private static TimeUnit chooseUnit(long nanos) { + if (SECONDS.convert(nanos, NANOSECONDS) > 0) { + return SECONDS; + } + if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) { + return MILLISECONDS; + } + if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) { + return MICROSECONDS; + } + return NANOSECONDS; + } + + private static String abbreviate(TimeUnit unit) { + switch (unit) { + case NANOSECONDS: + return "ns"; + case MICROSECONDS: + return "\u03bcs"; // μs + case MILLISECONDS: + return "ms"; + case SECONDS: + return "s"; + default: + throw new AssertionError(); + } + } +} \ No newline at end of file diff --git a/asynchbase/src/test/java/com/google/common/io/Closeables.java b/asynchbase/src/test/java/com/google/common/io/Closeables.java new file mode 100644 index 0000000000000000000000000000000000000000..4a92c9c09883709deb66f625d9cd3ade79f9a9a2 --- /dev/null +++ b/asynchbase/src/test/java/com/google/common/io/Closeables.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package com.google.common.io;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;

import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

/**
 * Utility methods for working with {@link Closeable} objects.
 *
 * NOTE(review): vendored verbatim from Guava 12-era sources because HBase's
 * test modules (mini cluster) still need this class after Guava removed it.
 * Do not modify except to re-sync with upstream.
 *
 * @author Michael Lancaster
 * @since 1.0
 */
@Beta
public final class Closeables {
  @VisibleForTesting static final Logger logger
      = Logger.getLogger(Closeables.class.getName());

  // Static utility class; never instantiated.
  private Closeables() {}

  /**
   * Closes a {@link Closeable}, with control over whether an
   * {@code IOException} may be thrown. This is primarily useful in a
   * finally block, where a thrown exception needs to be logged but not
   * propagated (otherwise the original exception will be lost).
   *
   * <p>If {@code swallowIOException} is true then we never throw
   * {@code IOException} but merely log it.
   *
   * <p>Example:
   *
   * <p><pre>public void useStreamNicely() throws IOException {
   * SomeStream stream = new SomeStream("foo");
   * boolean threw = true;
   * try {
   *   // Some code which does something with the Stream. May throw a
   *   // Throwable.
   *   threw = false; // No throwable thrown.
   * } finally {
   *   // Close the stream.
   *   // If an exception occurs, only rethrow it if (threw==false).
   *   Closeables.close(stream, threw);
   * }
   * </pre>
   *
   * @param closeable the {@code Closeable} object to be closed, or null,
   *     in which case this method does nothing
   * @param swallowIOException if true, don't propagate IO exceptions
   *     thrown by the {@code close} methods
   * @throws IOException if {@code swallowIOException} is false and
   *     {@code close} throws an {@code IOException}.
+ */ + public static void close(@Nullable Closeable closeable, + boolean swallowIOException) throws IOException { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (IOException e) { + if (swallowIOException) { + logger.log(Level.WARNING, + "IOException thrown while closing Closeable.", e); + } else { + throw e; + } + } + } + + /** + * Equivalent to calling {@code close(closeable, true)}, but with no + * IOException in the signature. + * @param closeable the {@code Closeable} object to be closed, or null, in + * which case this method does nothing + */ + public static void closeQuietly(@Nullable Closeable closeable) { + try { + close(closeable, true); + } catch (IOException e) { + logger.log(Level.SEVERE, "IOException should not have been thrown.", e); + } + } +} \ No newline at end of file diff --git a/asynchbase/src/test/java/com/google/common/io/LimitInputStream.java b/asynchbase/src/test/java/com/google/common/io/LimitInputStream.java new file mode 100644 index 0000000000000000000000000000000000000000..a529f5e127fc422cf287693f18da105fbe78aacc --- /dev/null +++ b/asynchbase/src/test/java/com/google/common/io/LimitInputStream.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.common.io; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An InputStream that limits the number of bytes which can be read. + * + * @author Charles Fry + * @since 1.0 + */ +@Beta +public final class LimitInputStream extends FilterInputStream { + + private long left; + private long mark = -1; + + /** + * Wraps another input stream, limiting the number of bytes which can be read. + * + * @param in the input stream to be wrapped + * @param limit the maximum number of bytes to be read + */ + public LimitInputStream(InputStream in, long limit) { + super(in); + Preconditions.checkNotNull(in); + Preconditions.checkArgument(limit >= 0, "limit must be non-negative"); + left = limit; + } + + @Override public int available() throws IOException { + return (int) Math.min(in.available(), left); + } + + @Override public synchronized void mark(int readlimit) { + in.mark(readlimit); + mark = left; + // it's okay to mark even if mark isn't supported, as reset won't work + } + + @Override public int read() throws IOException { + if (left == 0) { + return -1; + } + + int result = in.read(); + if (result != -1) { + --left; + } + return result; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + if (left == 0) { + return -1; + } + + len = (int) Math.min(len, left); + int result = in.read(b, off, len); + if (result != -1) { + left -= result; + } + return result; + } + + @Override public synchronized void reset() throws IOException { + if (!in.markSupported()) { + throw new IOException("Mark not supported"); + } + if (mark == -1) { + throw new IOException("Mark not set"); + } + + in.reset(); + left = mark; + } + + @Override public long skip(long n) throws IOException { + n = Math.min(n, left); + long skipped = in.skip(n); + left -= skipped; + return skipped; + } +} \ No newline at 
end of file diff --git a/asynchbase/src/test/java/com/yahoo/ycsb/db/AsyncHBaseTest.java b/asynchbase/src/test/java/com/yahoo/ycsb/db/AsyncHBaseTest.java new file mode 100644 index 0000000000000000000000000000000000000000..29a09a79677c7c42dd1b0a9177d9e24f0bbd0b53 --- /dev/null +++ b/asynchbase/src/test/java/com/yahoo/ycsb/db/AsyncHBaseTest.java @@ -0,0 +1,211 @@ +/** + * Copyright (c) 2016 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package com.yahoo.ycsb.db; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.db.AsyncHBaseClient; +import com.yahoo.ycsb.measurements.Measurements; +import com.yahoo.ycsb.workloads.CoreWorkload; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.Vector; + +/** + * Integration tests for the YCSB AsyncHBase client, using an HBase minicluster. + * These are the same as those for the hbase10 client. + */ +public class AsyncHBaseTest { + + private final static String COLUMN_FAMILY = "cf"; + + private static HBaseTestingUtility testingUtil; + private AsyncHBaseClient client; + private Table table = null; + + private static boolean isWindows() { + final String os = System.getProperty("os.name"); + return os.startsWith("Windows"); + } + + /** + * Creates a mini-cluster for use in these tests. + * + * This is a heavy-weight operation, so invoked only once for the test class. + */ + @BeforeClass + public static void setUpClass() throws Exception { + // Minicluster setup fails on Windows with an UnsatisfiedLinkError. + // Skip if windows. 
+ assumeTrue(!isWindows()); + testingUtil = HBaseTestingUtility.createLocalHTU(); + testingUtil.startMiniCluster(); + } + + /** + * Tears down mini-cluster. + */ + @AfterClass + public static void tearDownClass() throws Exception { + if (testingUtil != null) { + testingUtil.shutdownMiniCluster(); + } + } + + /** + * Sets up the mini-cluster for testing. + * + * We re-create the table for each test. + */ + @Before + public void setUp() throws Exception { + Properties p = new Properties(); + p.setProperty("columnfamily", COLUMN_FAMILY); + + Measurements.setProperties(p); + final CoreWorkload workload = new CoreWorkload(); + workload.init(p); + + table = testingUtil.createTable(TableName.valueOf(CoreWorkload.table), Bytes.toBytes(COLUMN_FAMILY)); + + final String zkQuorum = "127.0.0.1:" + testingUtil.getZkCluster().getClientPort(); + p.setProperty("hbase.zookeeper.quorum", zkQuorum); + client = new AsyncHBaseClient(); + client.setProperties(p); + client.init(); + } + + @After + public void tearDown() throws Exception { + table.close(); + testingUtil.deleteTable(CoreWorkload.table); + } + + @Test + public void testRead() throws Exception { + final String rowKey = "row1"; + final Put p = new Put(Bytes.toBytes(rowKey)); + p.addColumn(Bytes.toBytes(COLUMN_FAMILY), + Bytes.toBytes("column1"), Bytes.toBytes("value1")); + p.addColumn(Bytes.toBytes(COLUMN_FAMILY), + Bytes.toBytes("column2"), Bytes.toBytes("value2")); + table.put(p); + + final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); + final Status status = client.read(CoreWorkload.table, rowKey, null, result); + assertEquals(Status.OK, status); + assertEquals(2, result.size()); + assertEquals("value1", result.get("column1").toString()); + assertEquals("value2", result.get("column2").toString()); + } + + @Test + public void testReadMissingRow() throws Exception { + final HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>(); + final Status status = 
client.read(CoreWorkload.table, "Missing row", null, result); + assertEquals(Status.NOT_FOUND, status); + assertEquals(0, result.size()); + } + + @Test + public void testScan() throws Exception { + // Fill with data + final String colStr = "row_number"; + final byte[] col = Bytes.toBytes(colStr); + final int n = 10; + final List<Put> puts = new ArrayList<Put>(n); + for(int i = 0; i < n; i++) { + final byte[] key = Bytes.toBytes(String.format("%05d", i)); + final byte[] value = java.nio.ByteBuffer.allocate(4).putInt(i).array(); + final Put p = new Put(key); + p.addColumn(Bytes.toBytes(COLUMN_FAMILY), col, value); + puts.add(p); + } + table.put(puts); + + // Test + final Vector<HashMap<String, ByteIterator>> result = + new Vector<HashMap<String, ByteIterator>>(); + + // Scan 5 records, skipping the first + client.scan(CoreWorkload.table, "00001", 5, null, result); + + assertEquals(5, result.size()); + for(int i = 0; i < 5; i++) { + final HashMap<String, ByteIterator> row = result.get(i); + assertEquals(1, row.size()); + assertTrue(row.containsKey(colStr)); + final byte[] bytes = row.get(colStr).toArray(); + final ByteBuffer buf = ByteBuffer.wrap(bytes); + final int rowNum = buf.getInt(); + assertEquals(i + 1, rowNum); + } + } + + @Test + public void testUpdate() throws Exception{ + final String key = "key"; + final HashMap<String, String> input = new HashMap<String, String>(); + input.put("column1", "value1"); + input.put("column2", "value2"); + final Status status = client.insert(CoreWorkload.table, key, StringByteIterator.getByteIteratorMap(input)); + assertEquals(Status.OK, status); + + // Verify result + final Get get = new Get(Bytes.toBytes(key)); + final Result result = this.table.get(get); + assertFalse(result.isEmpty()); + assertEquals(2, result.size()); + for(final java.util.Map.Entry<String, String> entry : input.entrySet()) { + assertEquals(entry.getValue(), + new String(result.getValue(Bytes.toBytes(COLUMN_FAMILY), + Bytes.toBytes(entry.getKey())))); + } 
+ } + + @Test + @Ignore("Not yet implemented") + public void testDelete() { + fail("Not yet implemented"); + } +} + diff --git a/asynchbase/src/test/resources/hbase-site.xml b/asynchbase/src/test/resources/hbase-site.xml new file mode 100644 index 0000000000000000000000000000000000000000..a8b29e451f440ad7c09b3b2f25eebaf56f07e6bb --- /dev/null +++ b/asynchbase/src/test/resources/hbase-site.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2016 YCSB contributors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +<configuration> + <property> + <name>hbase.master.info.port</name> + <value>-1</value> + <description>The port for the hbase master web UI + Set to -1 if you do not want the info server to run. + </description> + </property> + <property> + <name>hbase.regionserver.info.port</name> + <value>-1</value> + <description>The port for the hbase regionserver web UI + Set to -1 if you do not want the info server to run. + </description> + </property> +</configuration> diff --git a/asynchbase/src/test/resources/log4j.properties b/asynchbase/src/test/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..a9df32e044b9374097b9c110a79f35ff34b5a793 --- /dev/null +++ b/asynchbase/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Copyright (c) 2015 YCSB contributors. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# + +# Root logger option +log4j.rootLogger=WARN, stderr + +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %c %x - %m%n + +# Suppress messages from ZKTableStateManager: Creates a large number of table +# state change messages. 
+log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKTableStateManager=ERROR diff --git a/bin/ycsb b/bin/ycsb index ef2c18598f99926a3131c104c155bb4bdb5be9bf..e85ca43d7052cd2223a65f6f653c7e2237e3f0ac 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -51,6 +51,7 @@ COMMANDS = { DATABASES = { "accumulo" : "com.yahoo.ycsb.db.accumulo.AccumuloClient", "aerospike" : "com.yahoo.ycsb.db.AerospikeClient", + "asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient", "basic" : "com.yahoo.ycsb.BasicDB", "cassandra-7" : "com.yahoo.ycsb.db.CassandraClient7", "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", @@ -79,6 +80,7 @@ DATABASES = { "nosqldb" : "com.yahoo.ycsb.db.NoSqlDbClient", "orientdb" : "com.yahoo.ycsb.db.OrientDBClient", "redis" : "com.yahoo.ycsb.db.RedisClient", + "riak" : "com.yahoo.ycsb.db.riak.RiakKVClient", "s3" : "com.yahoo.ycsb.db.S3Client", "solr" : "com.yahoo.ycsb.db.SolrClient", "tarantool" : "com.yahoo.ycsb.db.TarantoolClient", diff --git a/core/src/main/java/com/yahoo/ycsb/Workload.java b/core/src/main/java/com/yahoo/ycsb/Workload.java index eaff108f2eb00142de462eecb1bfe5e5c1a55aff..7bebafbf3f913acea21082535cabf2c3f1a237d5 100644 --- a/core/src/main/java/com/yahoo/ycsb/Workload.java +++ b/core/src/main/java/com/yahoo/ycsb/Workload.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or @@ -28,7 +28,7 @@ import java.util.Properties; * load it dynamically. Any argument-based initialization should be * done by init(). * - * If you extend this class, you should support the "insertstart" property. 
This + * If you extend this class, you should support the "insertstart" property. This * allows the Client to proceed from multiple clients on different machines, in case * the client is the bottleneck. For example, if we want to load 1 million records from * 2 machines, the first machine should have insertstart=0 and the second insertstart=500000. Additionally, @@ -54,13 +54,13 @@ public abstract class Workload { * Initialize any state for a particular client thread. Since the scenario object * will be shared among all threads, this is the place to create any state that is specific * to one thread. To be clear, this means the returned object should be created anew on each - * call to initThread(); do not return the same object multiple times. - * The returned object will be passed to invocations of doInsert() and doTransaction() + * call to initThread(); do not return the same object multiple times. + * The returned object will be passed to invocations of doInsert() and doTransaction() * for this thread. There should be no side effects from this call; all state should be encapsulated * in the returned object. If you have no state to retain for this thread, return null. (But if you have * no state to retain for this thread, probably you don't need to override initThread().) * - * @return false if the workload knows it is done for this thread. Client will terminate the thread. + * @return false if the workload knows it is done for this thread. Client will terminate the thread. * Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read * traces from a file, return true when there are more to do, false when you are done. */ @@ -73,19 +73,19 @@ public abstract class Workload { */ public void cleanup() throws WorkloadException { } - + /** - * Do one insert operation. Because it will be called concurrently from multiple client threads, this - * function must be thread safe. 
However, avoid synchronized, or the threads will block waiting for each + * Do one insert operation. Because it will be called concurrently from multiple client threads, this + * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be * synchronized, since each thread has its own threadstate instance. */ public abstract boolean doInsert(DB db, Object threadstate); - + /** - * Do one transaction operation. Because it will be called concurrently from multiple client threads, this - * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each + * Do one transaction operation. Because it will be called concurrently from multiple client threads, this + * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side * effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be * synchronized, since each thread has its own threadstate instance. @@ -95,14 +95,14 @@ public abstract class Workload { * traces from a file, return true when there are more to do, false when you are done. */ public abstract boolean doTransaction(DB db, Object threadstate); - + /** * Allows scheduling a request to stop the workload. */ public void requestStop() { stopRequested.set(true); } - + /** * Check the status of the stop request flag. * @return true if stop was requested, false otherwise. 
diff --git a/core/src/main/java/com/yahoo/ycsb/generator/IncrementingPrintableStringGenerator.java b/core/src/main/java/com/yahoo/ycsb/generator/IncrementingPrintableStringGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..82406f020a1a64c37b2c2cc0891da046c52120a8 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/generator/IncrementingPrintableStringGenerator.java @@ -0,0 +1,389 @@ +/** + * Copyright (c) 2016 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.generator; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * A generator that produces strings of {@link #length} using a set of code points + * from {@link #characterSet}. Each time {@link #nextValue()} is executed, the string + * is incremented by one character. Eventually the string may rollover to the beginning + * and the user may choose to have the generator throw a NoSuchElementException at that + * point or continue incrementing. (By default the generator will continue incrementing). 
+ * <p> + * For example, if we set a length of 2 characters and the character set includes + * [A, B] then the generator output will be: + * <ul> + * <li>AA</li> + * <li>AB</li> + * <li>BA</li> + * <li>BB</li> + * <li>AA <-- rolled over</li> + * </ul> + * <p> + * This class includes some default character sets to choose from including ASCII + * and plane 0 UTF. + */ +public class IncrementingPrintableStringGenerator extends Generator<String> { + + /** Default string length for the generator. */ + public static final int DEFAULTSTRINGLENGTH = 8; + + /** + * Set of all character types that include every symbol other than non-printable + * control characters. + */ + public static final Set<Integer> CHAR_TYPES_ALL_BUT_CONTROL; + static { + CHAR_TYPES_ALL_BUT_CONTROL = new HashSet<Integer>(24); + // numbers + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.DECIMAL_DIGIT_NUMBER); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.LETTER_NUMBER); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_NUMBER); + + // letters + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.UPPERCASE_LETTER); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.LOWERCASE_LETTER); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.TITLECASE_LETTER); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_LETTER); + + // marks + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.COMBINING_SPACING_MARK); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.NON_SPACING_MARK); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.ENCLOSING_MARK); + + // punctuation + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.CONNECTOR_PUNCTUATION); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.DASH_PUNCTUATION); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.START_PUNCTUATION); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.END_PUNCTUATION); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.INITIAL_QUOTE_PUNCTUATION); + CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.FINAL_QUOTE_PUNCTUATION); + 
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_PUNCTUATION);

    // symbols
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.MATH_SYMBOL);
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.CURRENCY_SYMBOL);
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.MODIFIER_SYMBOL);
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.OTHER_SYMBOL);

    // separators
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.SPACE_SEPARATOR);
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.LINE_SEPARATOR);
    CHAR_TYPES_ALL_BUT_CONTROL.add((int)Character.PARAGRAPH_SEPARATOR);
  }

  /**
   * Set of character types including only the upper and lower case letters.
   * (Unlike {@link #CHAR_TYPES_BASIC_ALPHANUMERICS}, decimal digits are NOT
   * included in this set.)
   */
  public static final Set<Integer> CHAR_TYPES_BASIC_ALPHA;
  static {
    CHAR_TYPES_BASIC_ALPHA = new HashSet<Integer>(2);
    CHAR_TYPES_BASIC_ALPHA.add((int)Character.UPPERCASE_LETTER);
    CHAR_TYPES_BASIC_ALPHA.add((int)Character.LOWERCASE_LETTER);
  }

  /**
   * Set of character types including only decimal digits and upper and lower
   * case letters.
   */
  public static final Set<Integer> CHAR_TYPES_BASIC_ALPHANUMERICS;
  static {
    CHAR_TYPES_BASIC_ALPHANUMERICS = new HashSet<Integer>(3);
    CHAR_TYPES_BASIC_ALPHANUMERICS.add((int)Character.DECIMAL_DIGIT_NUMBER);
    CHAR_TYPES_BASIC_ALPHANUMERICS.add((int)Character.UPPERCASE_LETTER);
    CHAR_TYPES_BASIC_ALPHANUMERICS.add((int)Character.LOWERCASE_LETTER);
  }

  /**
   * Set of character types including decimal digits, letter numbers,
   * other numbers, upper, lower, title case as well as letter modifiers
   * and other letters.
+ */ + public static final Set<Integer> CHAR_TYPE_EXTENDED_ALPHANUMERICS; + static { + CHAR_TYPE_EXTENDED_ALPHANUMERICS = new HashSet<Integer>(8); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.DECIMAL_DIGIT_NUMBER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.LETTER_NUMBER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.OTHER_NUMBER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.UPPERCASE_LETTER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.LOWERCASE_LETTER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.TITLECASE_LETTER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.MODIFIER_LETTER); + CHAR_TYPE_EXTENDED_ALPHANUMERICS.add((int)Character.OTHER_LETTER); + } + + /** The character set to iterate over. */ + private final int[] characterSet; + + /** An array indices matching a position in the output string. */ + private int[] indices; + + /** The length of the output string in characters. */ + private final int length; + + /** The last value returned by the generator. Should be null if {@link #nextValue()} + * has not been called.*/ + private String lastValue; + + /** Whether or not to throw an exception when the string rolls over. */ + private boolean throwExceptionOnRollover; + + /** Whether or not the generator has rolled over. */ + private boolean hasRolledOver; + + /** + * Generates strings of 8 characters using only the upper and lower case alphabetical + * characters from the ASCII set. + */ + public IncrementingPrintableStringGenerator() { + this(DEFAULTSTRINGLENGTH, printableBasicAlphaASCIISet()); + } + + /** + * Generates strings of {@link #length} characters using only the upper and lower + * case alphabetical characters from the ASCII set. + * @param length The length of string to return from the generator. + * @throws IllegalArgumentException if the length is less than one. 
+ */ + public IncrementingPrintableStringGenerator(final int length) { + this(length, printableBasicAlphaASCIISet()); + } + + /** + * Generates strings of {@link #length} characters using the code points in + * {@link #characterSet}. + * @param length The length of string to return from the generator. + * @param characterSet A set of code points to choose from. Code points in the + * set can be in any order, not necessarily lexical. + * @throws IllegalArgumentException if the length is less than one or the character + * set has fewer than one code points. + */ + public IncrementingPrintableStringGenerator(final int length, final int[] characterSet) { + if (length < 1) { + throw new IllegalArgumentException("Length must be greater than or equal to 1"); + } + if (characterSet == null || characterSet.length < 1) { + throw new IllegalArgumentException("Character set must have at least one character"); + } + this.length = length; + this.characterSet = characterSet; + indices = new int[length]; + } + + @Override + public String nextValue() { + if (hasRolledOver && throwExceptionOnRollover) { + throw new NoSuchElementException("The generator has rolled over to the beginning"); + } + + final StringBuilder buffer = new StringBuilder(length); + for (int i = 0; i < length; i++) { + buffer.append(Character.toChars(characterSet[indices[i]])); + } + + // increment the indices; + for (int i = length - 1; i >= 0; --i) { + if (indices[i] >= characterSet.length - 1) { + indices[i] = 0; + if (i == 0 || characterSet.length == 1 && lastValue != null) { + hasRolledOver = true; + } + } else { + ++indices[i]; + break; + } + } + + lastValue = buffer.toString(); + return lastValue; + } + + @Override + public String lastValue() { + return lastValue; + } + + /** @param exceptionOnRollover Whether or not to throw an exception on rollover. 
*/ + public void setThrowExceptionOnRollover(final boolean exceptionOnRollover) { + this.throwExceptionOnRollover = exceptionOnRollover; + } + + /** @return Whether or not to throw an exception on rollover. */ + public boolean getThrowExceptionOnRollover() { + return throwExceptionOnRollover; + } + + /** + * Returns an array of printable code points with only the upper and lower + * case alphabetical characters from the basic ASCII set. + * @return An array of code points + */ + public static int[] printableBasicAlphaASCIISet() { + final List<Integer> validCharacters = + generatePrintableCharacterSet(0, 127, null, false, CHAR_TYPES_BASIC_ALPHA); + final int[] characterSet = new int[validCharacters.size()]; + for (int i = 0; i < validCharacters.size(); i++) { + characterSet[i] = validCharacters.get(i); + } + return characterSet; + } + + /** + * Returns an array of printable code points with the upper and lower case + * alphabetical characters as well as the numeric values from the basic + * ASCII set. + * @return An array of code points + */ + public static int[] printableBasicAlphaNumericASCIISet() { + final List<Integer> validCharacters = + generatePrintableCharacterSet(0, 127, null, false, CHAR_TYPES_BASIC_ALPHANUMERICS); + final int[] characterSet = new int[validCharacters.size()]; + for (int i = 0; i < validCharacters.size(); i++) { + characterSet[i] = validCharacters.get(i); + } + return characterSet; + } + + /** + * Returns an array of printable code points with the entire basic ASCII table, + * including spaces. Excludes new lines. 
+ * @return An array of code points + */ + public static int[] fullPrintableBasicASCIISet() { + final List<Integer> validCharacters = + generatePrintableCharacterSet(32, 127, null, false, null); + final int[] characterSet = new int[validCharacters.size()]; + for (int i = 0; i < validCharacters.size(); i++) { + characterSet[i] = validCharacters.get(i); + } + return characterSet; + } + + /** + * Returns an array of printable code points with the entire basic ASCII table, + * including spaces and new lines. + * @return An array of code points + */ + public static int[] fullPrintableBasicASCIISetWithNewlines() { + final List<Integer> validCharacters =new ArrayList<Integer>(); + validCharacters.add(10); // newline + validCharacters.addAll(generatePrintableCharacterSet(32, 127, null, false, null)); + final int[] characterSet = new int[validCharacters.size()]; + for (int i = 0; i < validCharacters.size(); i++) { + characterSet[i] = validCharacters.get(i); + } + return characterSet; + } + + /** + * Returns an array of printable code points the first plane of Unicode characters + * including only the alpha-numeric values. + * @return An array of code points + */ + public static int[] printableAlphaNumericPlaneZeroSet() { + final List<Integer> validCharacters = + generatePrintableCharacterSet(0, 65535, null, false, CHAR_TYPES_BASIC_ALPHANUMERICS); + final int[] characterSet = new int[validCharacters.size()]; + for (int i = 0; i < validCharacters.size(); i++) { + characterSet[i] = validCharacters.get(i); + } + return characterSet; + } + + /** + * Returns an array of printable code points the first plane of Unicode characters + * including all printable characters. 
+ * @return An array of code points + */ + public static int[] fullPrintablePlaneZeroSet() { + final List<Integer> validCharacters = + generatePrintableCharacterSet(0, 65535, null, false, CHAR_TYPES_ALL_BUT_CONTROL); + final int[] characterSet = new int[validCharacters.size()]; + for (int i = 0; i < validCharacters.size(); i++) { + characterSet[i] = validCharacters.get(i); + } + return characterSet; + } + + /** + * Generates a list of code points based on a range and filters. + * These can be used for generating strings with various ASCII and/or + * Unicode printable character sets for use with DBs that may have + * character limitations. + * <p> + * Note that control, surrogate, format, private use and unassigned + * code points are skipped. + * @param startCodePoint The starting code point, inclusive. + * @param lastCodePoint The final code point, inclusive. + * @param characterTypesFilter An optional set of allowable character + * types. See {@link Character} for types. + * @param isFilterAllowableList Determines whether the {@code allowableTypes} + * set is inclusive or exclusive. When true, only those code points that + * appear in the list will be included in the resulting set. Otherwise + * matching code points are excluded. + * @param allowableTypes An optional list of code points for inclusion or + * exclusion. + * @return A list of code points matching the given range and filters. The + * list may be empty but is guaranteed not to be null. + */ + public static List<Integer> generatePrintableCharacterSet( + final int startCodePoint, + final int lastCodePoint, + final Set<Integer> characterTypesFilter, + final boolean isFilterAllowableList, + final Set<Integer> allowableTypes) { + + // since we don't know the final size of the allowable character list we + // start with a list then we'll flatten it to an array. 
+    final List<Integer> validCharacters = new ArrayList<Integer>(lastCodePoint);
+    
+    for (int codePoint = startCodePoint; codePoint <= lastCodePoint; ++codePoint) {
+      if (allowableTypes != null && 
+          !allowableTypes.contains(Character.getType(codePoint))) {
+        continue;
+      } else {
+        // skip control points, formats, surrogates, etc
+        final int type = Character.getType(codePoint);
+        if (type == Character.CONTROL ||
+            type == Character.SURROGATE ||
+            type == Character.FORMAT ||
+            type == Character.PRIVATE_USE ||
+            type == Character.UNASSIGNED) {
+          continue;
+        }
+      }
+      
+      if (characterTypesFilter != null) {
+        // if the filter is enabled then we need to make sure the code point
+        // is in the allowable list if it's a whitelist or that the code point
+        // is NOT in the list if it's a blacklist.
+        if ((isFilterAllowableList && !characterTypesFilter.contains(codePoint)) ||
+            (!isFilterAllowableList && characterTypesFilter.contains(codePoint))) {
+          continue;
+        }
+      }
+      
+      validCharacters.add(codePoint);
+    }
+    return validCharacters;
+  }
+  
+}
diff --git a/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java b/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java
index 3aa29491aab402a5ac785ba98a142822a056e20c..4d53385e312a5c8ab30a3a955643deb00a4b7bbc 100644
--- a/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java
+++ b/core/src/main/java/com/yahoo/ycsb/generator/SequentialGenerator.java
@@ -1,12 +1,12 @@
 /**
  * Copyright (c) 2016 YCSB Contributors All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. 
You * may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or @@ -37,7 +37,7 @@ public class SequentialGenerator extends NumberGenerator { } /** - * If the generator returns numeric (integer) values, return the next value as an int. + * If the generator returns numeric (integer) values, return the next value as an int. * Default is to return -1, which is appropriate for generators that do not return numeric values. */ public int nextInt() { @@ -45,16 +45,19 @@ public class SequentialGenerator extends NumberGenerator { setLastValue(ret); return ret; } + @Override public Number nextValue() { int ret = _countstart + counter.getAndIncrement() % _interval; setLastValue(ret); return ret; } + @Override public Number lastValue() { return counter.get() + 1; } + @Override public double mean() { throw new UnsupportedOperationException("Can't compute mean of non-stationary distribution!"); diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java index 04d5c20d32e864969cf7a3b63aa0145313bb744c..b9ff7e734cf3b3224511de51e4f1b16253f00ff7 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2010 Yahoo! Inc., 2016 YCSB Contributors All rights reserved. + * Copyright (c) 2010 Yahoo! Inc., Copyright (c) 2016 YCSB contributors. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You @@ -113,7 +113,7 @@ public class CoreWorkload extends Workload { * (favouring short records), "constant", and "histogram". 
* * If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the - * fieldlength property. If "histogram", then the histogram will be read from the filename + * fieldlength property. If "histogram", then the histogram will be read from the filename * specified in the "fieldlengthhistogram" property. */ public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution"; @@ -256,7 +256,7 @@ public class CoreWorkload extends Workload { public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform"; /** - * The name of the property for adding zero padding to record numbers in order to match + * The name of the property for adding zero padding to record numbers in order to match * string sort order. Controls the number of 0s to left pad with. */ public static final String ZERO_PADDING_PROPERTY = "zeropadding"; @@ -395,17 +395,7 @@ public class CoreWorkload extends Workload { fieldnames.add("field" + i); } fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p); - - double readproportion = Double.parseDouble( - p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT)); - double updateproportion = Double.parseDouble( - p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT)); - double insertproportion = Double.parseDouble( - p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); - double scanproportion = Double.parseDouble( - p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT)); - double readmodifywriteproportion = Double.parseDouble(p.getProperty( - READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); + recordcount = Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); if (recordcount == 0) { @@ -462,26 +452,7 @@ public class CoreWorkload extends Workload { } keysequence = new CounterGenerator(insertstart); - operationchooser = new 
DiscreteGenerator(); - if (readproportion > 0) { - operationchooser.addValue(readproportion, "READ"); - } - - if (updateproportion > 0) { - operationchooser.addValue(updateproportion, "UPDATE"); - } - - if (insertproportion > 0) { - operationchooser.addValue(insertproportion, "INSERT"); - } - - if (scanproportion > 0) { - operationchooser.addValue(scanproportion, "SCAN"); - } - - if (readmodifywriteproportion > 0) { - operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE"); - } + operationchooser = createOperationGenerator(p); transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount); if (requestdistrib.compareTo("uniform") == 0) { @@ -493,13 +464,13 @@ public class CoreWorkload extends Workload { // number of keys. // If the number of keys changes, this would shift the modulus, and we don't want that to // change which keys are popular so we'll actually construct the scrambled zipfian generator - // with a keyspace that is larger than exists at the beginning of the test. that is, we'll - // predict the number of inserts, and tell the scrambled zipfian generator the number of - // existing keys plus the number of predicted keys as the total keyspace. then, if the - // generator picks a key that hasn't been inserted yet, will just ignore it and pick another - // key. this way, the size ofthe keyspace doesn't change from the perspective of the scrambled - // zipfian generator. - + // with a keyspace that is larger than exists at the beginning of the test. that is, we'll predict + // the number of inserts, and tell the scrambled zipfian generator the number of existing keys + // plus the number of predicted keys as the total keyspace. then, if the generator picks a key + // that hasn't been inserted yet, will just ignore it and pick another key. 
this way, the size of + // the keyspace doesn't change from the perspective of the scrambled zipfian generator + final double insertproportion = Double.parseDouble( + p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY)); int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor @@ -511,7 +482,7 @@ public class CoreWorkload extends Workload { Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT)); double hotopnfraction = Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT)); - keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1, + keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1, hotsetfraction, hotopnfraction); } else { throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); @@ -661,7 +632,7 @@ public class CoreWorkload extends Workload { case "UPDATE": doTransactionUpdate(db); break; - case "INSERT": + case "INSERT": doTransactionInsert(db); break; case "SCAN": @@ -842,4 +813,51 @@ public class CoreWorkload extends Workload { transactioninsertkeysequence.acknowledge(keynum); } } + + /** + * Creates a weighted discrete values with database operations for a workload to perform. + * Weights/proportions are read from the properties list and defaults are used + * when values are not configured. + * Current operations are "READ", "UPDATE", "INSERT", "SCAN" and "READMODIFYWRITE". + * @param p The properties list to pull weights from. + * @return A generator that can be used to determine the next operation to perform. + * @throws IllegalArgumentException if the properties object was null. 
+ */ + public static DiscreteGenerator createOperationGenerator(final Properties p) { + if (p == null) { + throw new IllegalArgumentException("Properties object cannot be null"); + } + final double readproportion = Double.parseDouble( + p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT)); + final double updateproportion = Double.parseDouble( + p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT)); + final double insertproportion = Double.parseDouble( + p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); + final double scanproportion = Double.parseDouble( + p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT)); + final double readmodifywriteproportion = Double.parseDouble(p.getProperty( + READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); + + final DiscreteGenerator operationchooser = new DiscreteGenerator(); + if (readproportion > 0) { + operationchooser.addValue(readproportion, "READ"); + } + + if (updateproportion > 0) { + operationchooser.addValue(updateproportion, "UPDATE"); + } + + if (insertproportion > 0) { + operationchooser.addValue(insertproportion, "INSERT"); + } + + if (scanproportion > 0) { + operationchooser.addValue(scanproportion, "SCAN"); + } + + if (readmodifywriteproportion > 0) { + operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE"); + } + return operationchooser; + } } diff --git a/core/src/test/java/com/yahoo/ycsb/generator/TestIncrementingPrintableStringGenerator.java b/core/src/test/java/com/yahoo/ycsb/generator/TestIncrementingPrintableStringGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..eea3d507861563f21ced4c7a12ff168316539159 --- /dev/null +++ b/core/src/test/java/com/yahoo/ycsb/generator/TestIncrementingPrintableStringGenerator.java @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2016 YCSB contributors. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.generator; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; + +import java.util.NoSuchElementException; + +import org.testng.annotations.Test; + +public class TestIncrementingPrintableStringGenerator { + private final static int[] ATOC = new int[] { 65, 66, 67 }; + + @Test + public void rolloverOK() throws Exception { + final IncrementingPrintableStringGenerator gen = + new IncrementingPrintableStringGenerator(2, ATOC); + + assertNull(gen.lastValue()); + assertEquals(gen.nextValue(), "AA"); + assertEquals(gen.lastValue(), "AA"); + assertEquals(gen.nextValue(), "AB"); + assertEquals(gen.lastValue(), "AB"); + assertEquals(gen.nextValue(), "AC"); + assertEquals(gen.lastValue(), "AC"); + assertEquals(gen.nextValue(), "BA"); + assertEquals(gen.lastValue(), "BA"); + assertEquals(gen.nextValue(), "BB"); + assertEquals(gen.lastValue(), "BB"); + assertEquals(gen.nextValue(), "BC"); + assertEquals(gen.lastValue(), "BC"); + assertEquals(gen.nextValue(), "CA"); + assertEquals(gen.lastValue(), "CA"); + assertEquals(gen.nextValue(), "CB"); + assertEquals(gen.lastValue(), "CB"); + assertEquals(gen.nextValue(), "CC"); + assertEquals(gen.lastValue(), "CC"); + assertEquals(gen.nextValue(), "AA"); // <-- rollover + assertEquals(gen.lastValue(), "AA"); + } + + @Test + public void 
rolloverOneCharacterOK() throws Exception { + // It would be silly to create a generator with one character. + final IncrementingPrintableStringGenerator gen = + new IncrementingPrintableStringGenerator(2, new int[] { 65 }); + for (int i = 0; i < 5; i++) { + assertEquals(gen.nextValue(), "AA"); + } + } + + @Test + public void rolloverException() throws Exception { + final IncrementingPrintableStringGenerator gen = + new IncrementingPrintableStringGenerator(2, ATOC); + gen.setThrowExceptionOnRollover(true); + + int i = 0; + try { + while(i < 11) { + ++i; + gen.nextValue(); + } + fail("Expected NoSuchElementException"); + } catch (NoSuchElementException e) { + assertEquals(i, 10); + } + } + + @Test + public void rolloverOneCharacterException() throws Exception { + // It would be silly to create a generator with one character. + final IncrementingPrintableStringGenerator gen = + new IncrementingPrintableStringGenerator(2, new int[] { 65 }); + gen.setThrowExceptionOnRollover(true); + + int i = 0; + try { + while(i < 3) { + ++i; + gen.nextValue(); + } + fail("Expected NoSuchElementException"); + } catch (NoSuchElementException e) { + assertEquals(i, 2); + } + } + + @Test + public void invalidLengths() throws Exception { + try { + new IncrementingPrintableStringGenerator(0, ATOC); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { } + + try { + new IncrementingPrintableStringGenerator(-42, ATOC); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { } + } + + @Test + public void invalidCharacterSets() throws Exception { + try { + new IncrementingPrintableStringGenerator(2, null); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { } + + try { + new IncrementingPrintableStringGenerator(2, new int[] {}); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { } + } +} diff --git 
a/core/src/test/java/com/yahoo/ycsb/workloads/TestCoreWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestCoreWorkload.java new file mode 100644 index 0000000000000000000000000000000000000000..d52f29b248cd069596ae1dd5bf9f8473e1fcaa64 --- /dev/null +++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestCoreWorkload.java @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2016 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ +package com.yahoo.ycsb.workloads; + +import static org.testng.Assert.assertTrue; + +import java.util.Properties; + +import org.testng.annotations.Test; + +import com.yahoo.ycsb.generator.DiscreteGenerator; + +public class TestCoreWorkload { + + @Test + public void createOperationChooser() { + final Properties p = new Properties(); + p.setProperty(CoreWorkload.READ_PROPORTION_PROPERTY, "0.20"); + p.setProperty(CoreWorkload.UPDATE_PROPORTION_PROPERTY, "0.20"); + p.setProperty(CoreWorkload.INSERT_PROPORTION_PROPERTY, "0.20"); + p.setProperty(CoreWorkload.SCAN_PROPORTION_PROPERTY, "0.20"); + p.setProperty(CoreWorkload.READMODIFYWRITE_PROPORTION_PROPERTY, "0.20"); + final DiscreteGenerator generator = CoreWorkload.createOperationGenerator(p); + final int[] counts = new int[5]; + + for (int i = 0; i < 100; ++i) { + switch (generator.nextString()) { + case "READ": + ++counts[0]; + break; + case "UPDATE": + ++counts[1]; + break; + case "INSERT": + ++counts[2]; + break; + case "SCAN": + ++counts[3]; + break; + default: + ++counts[4]; + } + } + + for (int i : counts) { + // Doesn't do a wonderful job of equal distribution, but in a hundred, if we + // don't see at least one of each operation then the generator is really broke. + assertTrue(i > 1); + } + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void createOperationChooserNullProperties() { + CoreWorkload.createOperationGenerator(null); + } +} \ No newline at end of file diff --git a/couchbase2/README.md b/couchbase2/README.md index 455a4eea02205479b0cc255a82c808e063906ee9..786060da43941099e8f4f90e70f967fa6b86c273 100644 --- a/couchbase2/README.md +++ b/couchbase2/README.md @@ -91,6 +91,26 @@ to just "drive load" and disable the waiting. Note that when the "-p couchbase.s used, the measured results by YCSB can basically be thrown away. 
Still helpful sometimes during load phases to speed them up :) +## Debugging Latency +The Couchbase Java SDK has the ability to collect and dump different kinds of metrics which allow you to analyze +performance during benchmarking and production. By default this option is disabled in the benchmark, but by setting +`couchbase.networkMetricsInterval` and/or `couchbase.runtimeMetricsInterval` to something greater than 0 it will +output the information as JSON into the configured logger. The number provides is the interval in seconds. If you are +unsure what interval to pick, start with 10 or 30 seconds, depending on your runtime length. + +This is how such logs look like: + +``` +INFO: {"heap.used":{"init":268435456,"used":36500912,"committed":232259584,"max":3817865216},"gc.ps marksweep.collectionTime":0,"gc.ps scavenge.collectionTime":54,"gc.ps scavenge.collectionCount":17,"thread.count":26,"offHeap.used":{"init":2555904,"used":30865944,"committed":31719424,"max":-1},"gc.ps marksweep.collectionCount":0,"heap.pendingFinalize":0,"thread.peakCount":26,"event":{"name":"RuntimeMetrics","type":"METRIC"},"thread.startedCount":28} +INFO: {"localhost/127.0.0.1:11210":{"BINARY":{"ReplaceRequest":{"SUCCESS":{"metrics":{"percentiles":{"50.0":102,"90.0":136,"95.0":155,"99.0":244,"99.9":428},"min":55,"max":1564,"count":35787,"timeUnit":"MICROSECONDS"}}},"GetRequest":{"SUCCESS":{"metrics":{"percentiles":{"50.0":74,"90.0":98,"95.0":110,"99.0":158,"99.9":358},"min":34,"max":2310,"count":35604,"timeUnit":"MICROSECONDS"}}},"GetBucketConfigRequest":{"SUCCESS":{"metrics":{"percentiles":{"50.0":462,"90.0":462,"95.0":462,"99.0":462,"99.9":462},"min":460,"max":462,"count":1,"timeUnit":"MICROSECONDS"}}}}},"event":{"name":"NetworkLatencyMetrics","type":"METRIC"}} +``` + +It is recommended to either feed it into a program which can analyze and visualize JSON or just dump it into a JSON +pretty printer and look at it manually. 
Since the output can be changed (only by changing the code at the moment), you +can even configure to put those messages into another couchbase bucket and then analyze it through N1QL! You can learn +more about this in general [in the official docs](http://developer.couchbase.com/documentation/server/4.0/sdks/java-2.2/event-bus-metrics.html). + + ## Configuration Options Since no setup is the same and the goal of YCSB is to deliver realistic benchmarks, here are some setups that you can tune. Note that if you need more flexibility (let's say a custom transcoder), you still need to extend this driver and @@ -112,4 +132,6 @@ You can set the following properties (with the default settings applied): - couchbase.queryEndpoints=5: The number of N1QL Query sockets to open per server. - couchbase.epoll=false: If Epoll instead of NIO should be used (only available for linux. - couchbase.boost=3: If > 0 trades CPU for higher throughput. N is the number of event loops, ideally - set to the number of physical cores. Setting higher than that will likely degrade performance. \ No newline at end of file + set to the number of physical cores. Setting higher than that will likely degrade performance. + - couchbase.networkMetricsInterval=0: The interval in seconds when latency metrics will be logged. + - couchbase.runtimeMetricsInterval=0: The interval in seconds when runtime metrics will be logged. 
\ No newline at end of file diff --git a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java index 41695243ee10f392baab65d0ed00cabec02e6cae..cdd57d8462c2bc6dbdaf32188c38a7bfa2749b76 100644 --- a/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java +++ b/couchbase2/src/main/java/com/yahoo/ycsb/db/couchbase2/Couchbase2Client.java @@ -21,6 +21,10 @@ import com.couchbase.client.core.env.DefaultCoreEnvironment; import com.couchbase.client.core.env.resources.IoPoolShutdownHook; import com.couchbase.client.core.logging.CouchbaseLogger; import com.couchbase.client.core.logging.CouchbaseLoggerFactory; +import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig; +import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig; +import com.couchbase.client.core.metrics.LatencyMetricsCollectorConfig; +import com.couchbase.client.core.metrics.MetricsCollectorConfig; import com.couchbase.client.deps.com.fasterxml.jackson.core.JsonFactory; import com.couchbase.client.deps.com.fasterxml.jackson.core.JsonGenerator; import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode; @@ -87,6 +91,8 @@ import java.util.concurrent.locks.LockSupport; * <li><b>couchbase.epoll=false</b> If Epoll instead of NIO should be used (only available for linux.</li> * <li><b>couchbase.boost=3</b> If > 0 trades CPU for higher throughput. N is the number of event loops, ideally * set to the number of physical cores. 
Setting higher than that will likely degrade performance.</li> + * <li><b>couchbase.networkMetricsInterval=0</b> The interval in seconds when latency metrics will be logged.</li> + * <li><b>couchbase.runtimeMetricsInterval=0</b> The interval in seconds when runtime metrics will be logged.</li> * </ul> */ public class Couchbase2Client extends DB { @@ -117,6 +123,8 @@ public class Couchbase2Client extends DB { private int kvEndpoints; private int queryEndpoints; private int boost; + private int networkMetricsInterval; + private int runtimeMetricsInterval; @Override public void init() throws DBException { @@ -137,14 +145,31 @@ public class Couchbase2Client extends DB { queryEndpoints = Integer.parseInt(props.getProperty("couchbase.queryEndpoints", "5")); epoll = props.getProperty("couchbase.epoll", "false").equals("true"); boost = Integer.parseInt(props.getProperty("couchbase.boost", "3")); + networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0")); + runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0")); try { synchronized (INIT_COORDINATOR) { if (env == null) { + + LatencyMetricsCollectorConfig latencyConfig = networkMetricsInterval <= 0 + ? DefaultLatencyMetricsCollectorConfig.disabled() + : DefaultLatencyMetricsCollectorConfig + .builder() + .emitFrequency(networkMetricsInterval) + .emitFrequencyUnit(TimeUnit.SECONDS) + .build(); + + MetricsCollectorConfig runtimeConfig = runtimeMetricsInterval <= 0 + ? 
DefaultMetricsCollectorConfig.disabled() + : DefaultMetricsCollectorConfig.create(runtimeMetricsInterval, TimeUnit.SECONDS); + DefaultCouchbaseEnvironment.Builder builder = DefaultCouchbaseEnvironment .builder() .queryEndpoints(queryEndpoints) .callbacksOnIoPool(true) + .runtimeMetricsCollectorConfig(runtimeConfig) + .networkLatencyMetricsCollectorConfig(latencyConfig) .kvEndpoints(kvEndpoints); // Tune boosting and epoll based on settings @@ -197,6 +222,8 @@ public class Couchbase2Client extends DB { sb.append(", queryEndpoints=").append(queryEndpoints); sb.append(", epoll=").append(epoll); sb.append(", boost=").append(boost); + sb.append(", networkMetricsInterval=").append(networkMetricsInterval); + sb.append(", runtimeMetricsInterval=").append(runtimeMetricsInterval); LOGGER.info("===> Using Params: " + sb.toString()); } diff --git a/distribution/pom.xml b/distribution/pom.xml index a74d8f1b16aae89ac8eca282df8f30074a0ed643..459b4db313a6036c7ac15b420b1295da43264376 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -49,6 +49,11 @@ LICENSE file. <artifactId>aerospike-binding</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>asynchbase-binding</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.yahoo.ycsb</groupId> <artifactId>cassandra-binding</artifactId> @@ -149,6 +154,11 @@ LICENSE file. 
<artifactId>redis-binding</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>riak-binding</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.yahoo.ycsb</groupId>
       <artifactId>s3-binding</artifactId>
diff --git a/googledatastore/README.md b/googledatastore/README.md
index a6755a6522ee100c5ab1a3c3ebad088a2ba31d7b..80b6a4cf8a12d95213a1a37c7a2f42588b132b6d 100644
--- a/googledatastore/README.md
+++ b/googledatastore/README.md
@@ -20,6 +20,9 @@ LICENSE file.
 
 https://cloud.google.com/datastore/docs/concepts/overview?hl=en
 
+Please refer [here](https://cloud.google.com/datastore/docs/apis/overview) for more information on
+Google Cloud Datastore API.
+
 ## Configure
 
 YCSB_HOME - YCSB home directory
@@ -44,7 +47,7 @@ A. Configuration and setup:
 
 See this link for instructions about setting up Google Cloud Datastore and
 authentication:
-https://cloud.google.com/datastore/docs/getstarted/start_java/
+https://cloud.google.com/datastore/docs/activate#accessing_the_datastore_api_from_another_platform
 
 After you setup your environment, you will have 3 pieces of information ready:
 - datasetId,
diff --git a/googledatastore/conf/googledatastore.properties b/googledatastore/conf/googledatastore.properties
index ac95b570c4ec4f469d6d6ceda0c6c13b3bdd9d29..408acf0d0d025f1dc419fa404633ea60f88ea133 100644
--- a/googledatastore/conf/googledatastore.properties
+++ b/googledatastore/conf/googledatastore.properties
@@ -26,7 +26,7 @@
 
 # Google Cloud Datastore's read and update APIs do not support
 # reading or updating a select subset of properties for an entity.
-# (as of version v1beta2-rev1-3.0.2)
+# (as of version v1beta3)
 # Therefore, it's recommended that you set writeallfields and readallfields
 # to true to get stable and comparable performance numbers. 
writeallfields = true diff --git a/googledatastore/pom.xml b/googledatastore/pom.xml index 57db3505c288e6d27c309dbd19a0bcf1ba1fbd2e..3d636a2d523c6751240bf50d80fc6e285c5c8b52 100644 --- a/googledatastore/pom.xml +++ b/googledatastore/pom.xml @@ -31,9 +31,9 @@ LICENSE file. <dependencies> <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-datastore-protobuf</artifactId> - <version>v1beta2-rev1-3.0.2</version> + <groupId>com.google.cloud.datastore</groupId> + <artifactId>datastore-v1beta3-proto-client</artifactId> + <version>1.0.0-beta.1</version> </dependency> <dependency> <groupId>log4j</groupId> diff --git a/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java b/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java index 12fc0fac96ab0b442d4e1710160ab0fadb2d8842..a3f6553427d49565c61317474b3ceaaae74ab22e 100644 --- a/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java +++ b/googledatastore/src/main/java/com/yahoo/ycsb/db/GoogleDatastoreClient.java @@ -18,15 +18,14 @@ package com.yahoo.ycsb.db; import com.google.api.client.auth.oauth2.Credential; -import com.google.api.services.datastore.DatastoreV1.*; -import com.google.api.services.datastore.DatastoreV1.CommitRequest.Mode; -import com.google.api.services.datastore.DatastoreV1.ReadOptions - .ReadConsistency; -import com.google.api.services.datastore.client.Datastore; -import com.google.api.services.datastore.client.DatastoreException; -import com.google.api.services.datastore.client.DatastoreFactory; -import com.google.api.services.datastore.client.DatastoreHelper; -import com.google.api.services.datastore.client.DatastoreOptions; +import com.google.datastore.v1beta3.*; +import com.google.datastore.v1beta3.CommitRequest.Mode; +import com.google.datastore.v1beta3.ReadOptions.ReadConsistency; +import com.google.datastore.v1beta3.client.Datastore; +import com.google.datastore.v1beta3.client.DatastoreException; +import 
com.google.datastore.v1beta3.client.DatastoreFactory; +import com.google.datastore.v1beta3.client.DatastoreHelper; +import com.google.datastore.v1beta3.client.DatastoreOptions; import com.yahoo.ycsb.ByteIterator; import com.yahoo.ycsb.DB; @@ -165,7 +164,7 @@ public class GoogleDatastoreClient extends DB { serviceAccountEmail + ", Private Key File Path: " + privateKeyFile); datastore = DatastoreFactory.get().create( - options.credential(credential).dataset(datasetId).build()); + options.credential(credential).projectId(datasetId).build()); } catch (GeneralSecurityException exception) { throw new DBException("Security error connecting to the datastore: " + @@ -184,7 +183,7 @@ public class GoogleDatastoreClient extends DB { public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) { LookupRequest.Builder lookupRequest = LookupRequest.newBuilder(); - lookupRequest.addKey(buildPrimaryKey(table, key)); + lookupRequest.addKeys(buildPrimaryKey(table, key)); lookupRequest.getReadOptionsBuilder().setReadConsistency( this.readConsistency); // Note above, datastore lookupRequest always reads the entire entity, it @@ -219,7 +218,7 @@ public class GoogleDatastoreClient extends DB { Entity entity = response.getFound(0).getEntity(); logger.debug("Read entity: " + entity.toString()); - Map<String, Value> properties = DatastoreHelper.getPropertyMap(entity); + Map<String, Value> properties = entity.getProperties(); Set<String> propertiesToReturn = (fields == null ? properties.keySet() : fields); @@ -267,11 +266,11 @@ public class GoogleDatastoreClient extends DB { if (this.entityGroupingMode == EntityGroupingMode.MULTI_ENTITY_PER_GROUP) { // All entities are in side the same group when we are in this mode. - result.addPathElement(Key.PathElement.newBuilder().setKind(table). + result.addPath(Key.PathElement.newBuilder().setKind(table). 
setName(rootEntityName)); } - return result.addPathElement(Key.PathElement.newBuilder().setKind(table) + return result.addPath(Key.PathElement.newBuilder().setKind(table) .setName(key)); } @@ -289,25 +288,25 @@ public class GoogleDatastoreClient extends DB { commitRequest.setMode(Mode.NON_TRANSACTIONAL); if (mutationType == MutationType.DELETE) { - commitRequest.getMutationBuilder().addDelete(datastoreKey); + commitRequest.addMutationsBuilder().setDelete(datastoreKey); } else { // If this is not for delete, build the entity. Entity.Builder entityBuilder = Entity.newBuilder(); entityBuilder.setKey(datastoreKey); for (Entry<String, ByteIterator> val : values.entrySet()) { - entityBuilder.addProperty(Property.newBuilder() - .setName(val.getKey()) - .setValue(Value.newBuilder() - .setStringValue(val.getValue().toString()))); + entityBuilder.getMutableProperties() + .put(val.getKey(), + Value.newBuilder() + .setStringValue(val.getValue().toString()).build()); } Entity entity = entityBuilder.build(); logger.debug("entity built as: " + entity.toString()); if (mutationType == MutationType.UPSERT) { - commitRequest.getMutationBuilder().addUpsert(entity); + commitRequest.addMutationsBuilder().setUpsert(entity); } else if (mutationType == MutationType.UPDATE){ - commitRequest.getMutationBuilder().addUpdate(entity); + commitRequest.addMutationsBuilder().setUpdate(entity); } else { throw new RuntimeException("Impossible MutationType, code bug."); } diff --git a/kudu/README.md b/kudu/README.md index cd5cffd6387a0d92a8327638a1a1d72c1f70d949..e1f2b286438411b562d9b15d8e4dab2458cf2148 100644 --- a/kudu/README.md +++ b/kudu/README.md @@ -42,3 +42,15 @@ Then, you can run the workload: ``` bin/ycsb run kudu -P workloads/workloada ``` + +## Using a previous client version + +If you wish to use a different Kudu client version than the one shipped with YCSB, you can specify on the +command line with `-Dkudu.version=x`. 
For example: + +``` +mvn -pl com.yahoo.ycsb:kudu-binding -am package -DskipTests -Dkudu.version=0.7.1 +``` + +Note that prior to 1.0, Kudu doesn't guarantee wire or API compatibility between versions and only the latest +one is officially supported. diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java index 503c574af1f7f7dbcc2ececd8c8bff9ffc291fd1..05757b41f37b06dba7e8b604f3d062470b0c4556 100644 --- a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java +++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java @@ -233,17 +233,20 @@ public class KuduYCSBClient extends com.yahoo.ycsb.DB { scannerBuilder.setProjectedColumnNames(querySchema); } - PartialRow lowerBound = schema.newPartialRow(); - lowerBound.addString(0, startkey); - scannerBuilder.lowerBound(lowerBound); + KuduPredicate.ComparisonOp comparisonOp; if (recordcount == 1) { - PartialRow upperBound = schema.newPartialRow(); - // Keys are fixed length, just adding something at the end is safe. - upperBound.addString(0, startkey.concat(" ")); - scannerBuilder.exclusiveUpperBound(upperBound); + comparisonOp = KuduPredicate.ComparisonOp.EQUAL; + } else { + comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL; } + KuduPredicate keyPredicate = KuduPredicate.newComparisonPredicate( + schema.getColumnByIndex(0), + comparisonOp, + startkey); - KuduScanner scanner = scannerBuilder.limit(recordcount) // currently noop + KuduScanner scanner = scannerBuilder + .addPredicate(keyPredicate) + .limit(recordcount) // currently noop .build(); while (scanner.hasMoreRows()) { diff --git a/pom.xml b/pom.xml index 8261e9b32d084fe0a1ff04ed06bf97604bf71fa9..3ab675444d62a47792a6d23c82ace0d8320800fe 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ LICENSE file. 
<properties> <maven.assembly.version>2.5.5</maven.assembly.version> <maven.dependency.version>2.10</maven.dependency.version> + <asynchbase.version>1.7.1</asynchbase.version> <hbase094.version>0.94.27</hbase094.version> <hbase098.version>0.98.14-hadoop2</hbase098.version> <hbase10.version>1.0.2</hbase10.version> @@ -78,7 +79,7 @@ LICENSE file. <geode.version>1.0.0-incubating.M1</geode.version> <googlebigtable.version>0.2.3</googlebigtable.version> <infinispan.version>7.2.2.Final</infinispan.version> - <kudu.version>0.6.0</kudu.version> + <kudu.version>0.8.0</kudu.version> <openjpa.jdbc.version>2.1.1</openjpa.jdbc.version> <!--<mapkeeper.version>1.0</mapkeeper.version>--> <mongodb.version>3.0.3</mongodb.version> @@ -93,6 +94,7 @@ LICENSE file. <couchbase.version>1.4.10</couchbase.version> <couchbase2.version>2.2.6</couchbase2.version> <tarantool.version>1.6.5</tarantool.version> + <riak.version>2.0.5</riak.version> <aerospike.version>3.1.2</aerospike.version> <solr.version>5.4.0</solr.version> </properties> @@ -104,6 +106,7 @@ LICENSE file. <!-- all the datastore bindings, lex sorted please --> <module>accumulo</module> <module>aerospike</module> + <module>asynchbase</module> <module>cassandra</module> <module>cassandra2</module> <module>couchbase</module> @@ -127,6 +130,7 @@ LICENSE file. <module>nosqldb</module> <module>orientdb</module> <module>redis</module> + <module>riak</module> <module>s3</module> <module>solr</module> <module>tarantool</module> diff --git a/riak/README.md b/riak/README.md new file mode 100644 index 0000000000000000000000000000000000000000..d7160808affb90f60e2ddc0134aa9998dd7f897f --- /dev/null +++ b/riak/README.md @@ -0,0 +1,72 @@ +<!-- +Copyright (c) 2016 YCSB contributors. All rights reserved. +Copyright 2014 Basho Technologies, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. 
You +may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. +--> + +Riak KV Client for Yahoo! Cloud System Benchmark (YCSB) +======================================================= + +The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.x.y line of the Riak KV database. + +Creating a <i>bucket type</i> to use with YCSB +---------------------------- + +Perform the following operations on your Riak cluster to configure it for the benchmarks. + +Set the default backend for Riak to <i>LevelDB</i> in the `riak.conf` file of every node of your cluster. This is required to support <i>secondary indexes</i>, which are used for the `scan` transactions. You can do this by modifying the proper line as shown below. + +``` +storage_backend = leveldb +``` + +Now, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. Then, to use the <i>strong consistency model</i><sup id="a2">[2](#f2)</sup> (default), you need to follow the next two steps. + +1. In every `riak.conf` file, search for the `##strong_consistency=on` line and uncomment it. It's important that you do this <b>before you start your cluster</b>! +2. Run the following `riak-admin` commands: + + ``` + riak-admin bucket-type create ycsb '{"props":{"consistent":true}}' + riak-admin bucket-type activate ycsb + ``` + +Note that when using the strong consistency model, you **may have to specify the number of replicas to create for each object**. 
The *R* and *W* parameters (see next section) will in fact be ignored. The only information needed by this consistency model is how many nodes the system has to successfully query to consider a transaction completed. To set this parameter, you can add `"n_val":N` to the list of properties shown above (by default `N` is set to 3). + +If instead you want to use the <i>eventual consistency model</i> implemented in Riak, then type: +``` +riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}' +riak-admin bucket-type activate ycsb +``` + +Riak KV configuration parameters +---------------------------- +You can either specify these configuration parameters via command line or set them in the `riak.properties` file. + +* `riak.hosts` - <b>string list</b>, comma separated list of IPs or FQDNs. For example: `riak.hosts=127.0.0.1,127.0.0.2,127.0.0.3` or `riak.hosts=riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com`. +* `riak.port` - <b>int</b>, the port on which every node is listening. It must match the one specified in the `riak.conf` file at the line `listener.protobuf.internal`. +* `riak.bucket_type` - <b>string</b>, it must match the name of the bucket type created during setup (see section above). +* `riak.r_val` - <b>int</b>, this value represents the number of Riak nodes that must return results for a read operation before the transaction is considered successfully completed. +* `riak.w_val` - <b>int</b>, this value represents the number of Riak nodes that must report success before an insert/update transaction is considered complete. +* `riak.read_retry_count` - <b>int</b>, the number of times the client will try to read a key from Riak. +* `riak.wait_time_before_retry` - <b>int</b>, the time (in milliseconds) before the client attempts to perform another read if the previous one failed. +* `riak.transaction_time_limit` - <b>int</b>, the time (in seconds) the client waits before aborting the current transaction. 
+* `riak.strong_consistency` - <b>boolean</b>, indicates whether to use *strong consistency* (true) or *eventual consistency* (false). +* `riak.debug` - <b>boolean</b>, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur. + +<b>Note</b>: For more information on workloads and how to run them please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload + +<b id="f1">1</b> As specified in the `riak.properties` file. See parameters configuration section for further info. [↩](#a1) + +<b id="f2">2</b> <b>IMPORTANT NOTE:</b> Currently the `scan` transactions are <b>NOT SUPPORTED</b> for the benchmarks which use the strong consistency model! However this will not cause the benchmark to fail, it simply won't perform any scan transaction at all. These latter will immediately return with a `Status.NOT_IMPLEMENTED` code. [↩](#a2) diff --git a/riak/pom.xml b/riak/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..da335adfd6a955a82a517229a84fd228d91cfab9 --- /dev/null +++ b/riak/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (c) 2016 YCSB contributors. All rights reserved. +Copyright 2014 Basho Technologies, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you +may not use this file except in compliance with the License. You +may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. See accompanying +LICENSE file. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>binding-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../binding-parent</relativePath> + </parent> + + <artifactId>riak-binding</artifactId> + <name>Riak KV Binding</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.basho.riak</groupId> + <artifactId>riak-client</artifactId> + <version>2.0.5</version> + </dependency> + <dependency> + <groupId>com.yahoo.ycsb</groupId> + <artifactId>core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.collections</groupId> + <artifactId>google-collections</artifactId> + <version>1.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java new file mode 100644 index 0000000000000000000000000000000000000000..152cd86c03948dea7c570ebc6d4f6d6de1eb661d --- /dev/null +++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakKVClient.java @@ -0,0 +1,566 @@ +/** + * Copyright (c) 2016 YCSB contributors All rights reserved. + * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.riak; + +import com.basho.riak.client.api.commands.buckets.StoreBucketProperties; +import com.basho.riak.client.api.commands.kv.UpdateValue; +import com.basho.riak.client.core.RiakFuture; +import com.yahoo.ycsb.*; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.basho.riak.client.api.RiakClient; +import com.basho.riak.client.api.cap.Quorum; +import com.basho.riak.client.api.commands.indexes.IntIndexQuery; +import com.basho.riak.client.api.commands.kv.DeleteValue; +import com.basho.riak.client.api.commands.kv.FetchValue; +import com.basho.riak.client.api.commands.kv.StoreValue; +import com.basho.riak.client.api.commands.kv.StoreValue.Option; +import com.basho.riak.client.core.RiakCluster; +import com.basho.riak.client.core.RiakNode; +import com.basho.riak.client.core.query.Location; +import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.RiakObject; +import com.basho.riak.client.core.query.indexes.LongIntIndex; +import com.basho.riak.client.core.util.BinaryValue; + +import static com.yahoo.ycsb.db.riak.RiakUtils.deserializeTable; +import static com.yahoo.ycsb.db.riak.RiakUtils.getKeyAsLong; +import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable; + +/** + * Riak KV 2.x.y client for YCSB framework. 
+ * + */ +public final class RiakKVClient extends DB { + private static final String HOST_PROPERTY = "riak.hosts"; + private static final String PORT_PROPERTY = "riak.port"; + private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type"; + private static final String R_VALUE_PROPERTY = "riak.r_val"; + private static final String W_VALUE_PROPERTY = "riak.w_val"; + private static final String READ_RETRY_COUNT_PROPERTY = "riak.read_retry_count"; + private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry"; + private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit"; + private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency"; + private static final String DEBUG_PROPERTY = "riak.debug"; + + private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time."); + + private String[] hosts; + private int port; + private String bucketType; + private Quorum rvalue; + private Quorum wvalue; + private int readRetryCount; + private int waitTimeBeforeRetry; + private int transactionTimeLimit; + private boolean strongConsistency; + private boolean debug; + + private RiakClient riakClient; + private RiakCluster riakCluster; + + private void loadDefaultProperties() { + InputStream propFile = RiakKVClient.class.getClassLoader().getResourceAsStream("riak.properties"); + Properties propsPF = new Properties(System.getProperties()); + + try { + propsPF.load(propFile); + } catch (IOException e) { + e.printStackTrace(); + } + + hosts = propsPF.getProperty(HOST_PROPERTY).split(","); + port = Integer.parseInt(propsPF.getProperty(PORT_PROPERTY)); + bucketType = propsPF.getProperty(BUCKET_TYPE_PROPERTY); + rvalue = new Quorum(Integer.parseInt(propsPF.getProperty(R_VALUE_PROPERTY))); + wvalue = new Quorum(Integer.parseInt(propsPF.getProperty(W_VALUE_PROPERTY))); + readRetryCount = Integer.parseInt(propsPF.getProperty(READ_RETRY_COUNT_PROPERTY)); + 
waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY)); + transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY)); + strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY)); + debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY)); + } + + private void loadProperties() { + loadDefaultProperties(); + + Properties props = getProperties(); + + String portString = props.getProperty(PORT_PROPERTY); + if (portString != null) { + port = Integer.parseInt(portString); + } + + String hostsString = props.getProperty(HOST_PROPERTY); + if (hostsString != null) { + hosts = hostsString.split(","); + } + + String bucketTypeString = props.getProperty(BUCKET_TYPE_PROPERTY); + if (bucketTypeString != null) { + bucketType = bucketTypeString; + } + + String rValueString = props.getProperty(R_VALUE_PROPERTY); + if (rValueString != null) { + rvalue = new Quorum(Integer.parseInt(rValueString)); + } + + String wValueString = props.getProperty(W_VALUE_PROPERTY); + if (wValueString != null) { + wvalue = new Quorum(Integer.parseInt(wValueString)); + } + + String readRetryCountString = props.getProperty(READ_RETRY_COUNT_PROPERTY); + if (readRetryCountString != null) { + readRetryCount = Integer.parseInt(readRetryCountString); + } + + String waitTimeBeforeRetryString = props.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY); + if (waitTimeBeforeRetryString != null) { + waitTimeBeforeRetry = Integer.parseInt(waitTimeBeforeRetryString); + } + + String transactionTimeLimitString = props.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY); + if (transactionTimeLimitString != null) { + transactionTimeLimit = Integer.parseInt(transactionTimeLimitString); + } + + String strongConsistencyString = props.getProperty(STRONG_CONSISTENCY_PROPERTY); + if (strongConsistencyString != null) { + strongConsistency = Boolean.parseBoolean(strongConsistencyString); + } + + String debugString = 
props.getProperty(DEBUG_PROPERTY); + if (debugString != null) { + debug = Boolean.parseBoolean(debugString); + } + } + + public void init() throws DBException { + loadProperties(); + + if (debug) { + System.err.println("DEBUG ENABLED. Configuration parameters:"); + System.err.println("-----------------------------------------"); + System.err.println("Hosts: " + Arrays.toString(hosts)); + System.err.println("Port: " + port); + System.err.println("Bucket Type: " + bucketType); + System.err.println("R Val: " + rvalue.toString()); + System.err.println("W Val: " + wvalue.toString()); + System.err.println("Read Retry Count: " + readRetryCount); + System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms"); + System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s"); + System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual")); + } + + RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port); + List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts)); + riakCluster = new RiakCluster.Builder(nodes).build(); + + try { + riakCluster.start(); + riakClient = new RiakClient(riakCluster); + } catch (Exception e) { + System.err.println("Unable to properly start up the cluster. Reason: " + e.toString()); + throw new DBException(e); + } + } + + /** + * Read a record from the database. Each field/value pair from the result will be stored in a HashMap. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to read. 
+ * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status read(String table, String key, Set<String> fields, HashMap<String, ByteIterator> result) { + Location location = new Location(new Namespace(bucketType, table), key); + FetchValue fv = new FetchValue.Builder(location).withOption(FetchValue.Option.R, rvalue).build(); + FetchValue.Response response; + + try { + response = fetch(fv); + + if (response.isNotFound()) { + if (debug) { + System.err.println("Unable to read key " + key + ". Reason: NOT FOUND"); + } + + return Status.NOT_FOUND; + } + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to read key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to read key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + // Create the result HashMap. + createResultHashMap(fields, response, result); + + return Status.OK; + } + + /** + * Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in + * a HashMap. + * Note: The scan operation requires the use of secondary indexes (2i) and LevelDB. + * IMPORTANT NOTE: the 2i queries DO NOT WORK in conjunction with strong consistency (ref: http://docs.basho + * .com/riak/kv/2.1.4/developing/usage/secondary-indexes/)! + * + * @param table The name of the table (Riak bucket) + * @param startkey The record key of the first record to read. 
+ * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status scan(String table, String startkey, int recordcount, Set<String> fields, + Vector<HashMap<String, ByteIterator>> result) { + // As of 2.1.4 Riak KV version, strong consistency does not support any suitable mean capable of searching + // consecutive stored keys, as requested by a scan transaction. So, the latter WILL NOT BE PERFORMED AT ALL! + // More info at http://docs.basho.com/riak/kv/2.1.4/developing/app-guide/strong-consistency/ + if (strongConsistency) { + return Status.NOT_IMPLEMENTED; + } + + Namespace ns = new Namespace(bucketType, table); + + IntIndexQuery iiq = new IntIndexQuery + .Builder(ns, "key", getKeyAsLong(startkey), Long.MAX_VALUE) + .withMaxResults(recordcount) + .withPaginationSort(true) + .build(); + + RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq); + + try { + IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS); + List<IntIndexQuery.Response.Entry> entries = response.getEntries(); + + for (IntIndexQuery.Response.Entry entry : entries) { + Location location = entry.getRiakObjectLocation(); + + FetchValue fv = new FetchValue.Builder(location) + .withOption(FetchValue.Option.R, rvalue) + .build(); + + FetchValue.Response keyResponse = fetch(fv); + + if (keyResponse.isNotFound()) { + if (debug) { + System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. 
" + + "Reason: NOT FOUND"); + } + + return Status.NOT_FOUND; + } + + HashMap<String, ByteIterator> partialResult = new HashMap<>(); + createResultHashMap(fields, keyResponse, partialResult); + result.add(partialResult); + } + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " + + "Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to scan all records starting from key " + startkey + ", aborting transaction. " + + "Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + /** + * Tries to perform a read and, whenever it fails, retries to do it. It actually does try as many time as indicated, + * even if the function riakClient.execute(fv) throws an exception. This is needed for those situation in which the + * cluster is unable to respond properly due to overload. Note however that if the cluster doesn't respond after + * transactionTimeLimit, the transaction is discarded immediately. + * + * @param fv The value to fetch from the cluster. + */ + private FetchValue.Response fetch(FetchValue fv) throws TimeoutException { + FetchValue.Response response = null; + + for (int i = 0; i < readRetryCount; i++) { + RiakFuture<FetchValue.Response, Location> future = riakClient.executeAsync(fv); + + try { + response = future.get(transactionTimeLimit, TimeUnit.SECONDS); + + if (!response.isNotFound()) { + break; + } + } catch (TimeoutException e) { + // Let the callee decide how to handle this exception... + throw new TimeoutException(); + } catch (Exception e) { + // Sleep for a few ms before retrying... + try { + Thread.sleep(waitTimeBeforeRetry); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + + return response; + } + + /** + * Function that retrieves all the fields searched within a read or scan operation and puts them in the result + * HashMap. 
+ * + * @param fields The list of fields to read, or null for all of them. + * @param response A Vector of HashMaps, where each HashMap is a set field/value pairs for one record. + * @param resultHashMap The HashMap to return as result. + */ + private void createResultHashMap(Set<String> fields, FetchValue.Response response, HashMap<String, ByteIterator> + resultHashMap) { + // If everything went fine, then a result must be given. Such an object is a hash table containing the (field, + // value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED! + // The following line retrieves the previously serialized table which was store with an insert transaction. + byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue(); + + // Deserialize the stored response table. + HashMap<String, ByteIterator> deserializedTable = new HashMap<>(); + deserializeTable(responseFieldsAndValues, deserializedTable); + + // If only specific fields are requested, then only these should be put in the result object! + if (fields != null) { + // Populate the HashMap to provide as result. + for (Object field : fields.toArray()) { + // Comparison between a requested field and the ones retrieved. If they're equal (i.e. the get() operation + // DOES NOT return a null value), then proceed to store the pair in the resultHashMap. + ByteIterator value = deserializedTable.get(field); + + if (value != null) { + resultHashMap.put((String) field, value); + } + } + } else { + // If, instead, no field is specified, then all the ones retrieved must be provided as result. + for (String field : deserializedTable.keySet()) { + resultHashMap.put(field, deserializedTable.get(field)); + } + } + } + + /** + * Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the + * record with the specified record key. 
Also creates a secondary index (2i) for each record consisting of the key + * converted to long to be used for the scan operation. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status insert(String table, String key, HashMap<String, ByteIterator> values) { + Location location = new Location(new Namespace(bucketType, table), key); + RiakObject object = new RiakObject(); + + object.setValue(BinaryValue.create(serializeTable(values))); + object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key)); + + StoreValue store = new StoreValue.Builder(object) + .withLocation(location) + .withOption(Option.W, wvalue) + .build(); + + RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store); + + try { + future.get(transactionTimeLimit, TimeUnit.SECONDS); + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2] + .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to " + (Thread.currentThread().getStackTrace()[2] + .getMethodName().equals("update") ? "update" : "insert") + " key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + /** + * Auxiliary class needed for object substitution within the update operation. It is a fundamental part of the + * fetch-update (locally)-store cycle described by Basho to properly perform a strong-consistent update. 
+ */ + private static final class UpdateEntity extends UpdateValue.Update<RiakObject> { + private final RiakObject object; + + private UpdateEntity(RiakObject object) { + this.object = object; + } + + //Simply returns the object. + @Override + public RiakObject apply(RiakObject original) { + return object; + } + } + + /** + * Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the + * record with the specified record key, overwriting any existing values with the same field name. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status update(String table, String key, HashMap<String, ByteIterator> values) { + if (!strongConsistency) { + return insert(table, key, values); + } + + Location location = new Location(new Namespace(bucketType, table), key); + RiakObject object = new RiakObject(); + + object.setValue(BinaryValue.create(serializeTable(values))); + object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key)); + + UpdateValue update = new UpdateValue.Builder(location) + .withUpdate(new UpdateEntity(object)) + .build(); + + RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update); + + try { + // For some reason, the update transaction doesn't throw any exception when no cluster has been started, so one + // needs to check whether it was done or not. When calling the wasUpdated() function with no nodes available, a + // NullPointerException is thrown. + // Moreover, such exception could be thrown when more threads are trying to update the same key or, more + // generally, when the system is being queried by many clients (i.e. overloaded). This is a known limitation of + // Riak KV's strong consistency implementation. 
+ future.get(transactionTimeLimit, TimeUnit.SECONDS).wasUpdated(); + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to update key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to update key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + + /** + * Delete a record from the database. + * + * @param table The name of the table (Riak bucket) + * @param key The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + @Override + public Status delete(String table, String key) { + Location location = new Location(new Namespace(bucketType, table), key); + DeleteValue dv = new DeleteValue.Builder(location).build(); + + RiakFuture<Void, Location> future = riakClient.executeAsync(dv); + + try { + future.get(transactionTimeLimit, TimeUnit.SECONDS); + } catch (TimeoutException e) { + if (debug) { + System.err.println("Unable to delete key " + key + ". Reason: TIME OUT"); + } + + return TIME_OUT; + } catch (Exception e) { + if (debug) { + System.err.println("Unable to delete key " + key + ". Reason: " + e.toString()); + } + + return Status.ERROR; + } + + return Status.OK; + } + + public void cleanup() throws DBException { + try { + riakCluster.shutdown(); + } catch (Exception e) { + System.err.println("Unable to properly shutdown the cluster. Reason: " + e.toString()); + throw new DBException(e); + } + } + + /** + * Auxiliary function needed for testing. It configures the default bucket-type to take care of the consistency + * problem by disallowing the siblings creation. Moreover, it disables strong consistency, as the scan transaction + * test would otherwise fail. + * + * @param bucket The bucket name. + * @throws Exception Thrown if something bad happens. 
+ */ + void setTestEnvironment(String bucket) throws Exception { + bucketType = "default"; + strongConsistency = false; + + Namespace ns = new Namespace(bucketType, bucket); + StoreBucketProperties newBucketProperties = new StoreBucketProperties.Builder(ns).withAllowMulti(false).build(); + + riakClient.execute(newBucketProperties); + } +} diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..e4471541c8ba2e225bacd4a43113896f4156e7a6 --- /dev/null +++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/RiakUtils.java @@ -0,0 +1,147 @@ +/** + * Copyright (c) 2016 YCSB contributors All rights reserved. + * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.riak; + +import java.io.*; +import java.util.Map; +import java.util.Set; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Utility class for Riak KV Client. 
+ *
+ */
+final class RiakUtils {
+
+  private RiakUtils() {
+    super();
+  }
+
+  private static byte[] toBytes(final int anInteger) {
+    byte[] aResult = new byte[4];
+
+    aResult[0] = (byte) (anInteger >> 24);
+    aResult[1] = (byte) (anInteger >> 16);
+    aResult[2] = (byte) (anInteger >> 8);
+    aResult[3] = (byte) (anInteger /* >> 0 */);
+
+    return aResult;
+  }
+
+  private static int fromBytes(final byte[] aByteArray) {
+    checkArgument(aByteArray.length == 4);
+
+    return (aByteArray[0] << 24) | (aByteArray[1] & 0xFF) << 16 | (aByteArray[2] & 0xFF) << 8 | (aByteArray[3] & 0xFF);
+  }
+
+  private static void close(final OutputStream anOutputStream) {
+    try {
+      anOutputStream.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static void close(final InputStream anInputStream) {
+    try {
+      anInputStream.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Serializes a Map, transforming the contained list of (String, ByteIterator) couples into a byte array.
+   *
+   * @param aTable A Map to serialize.
+   * @return A byte array containing the serialized table.
+   */
+  static byte[] serializeTable(Map<String, ByteIterator> aTable) {
+    final ByteArrayOutputStream anOutputStream = new ByteArrayOutputStream();
+    final Set<Map.Entry<String, ByteIterator>> theEntries = aTable.entrySet();
+
+    try {
+      for (final Map.Entry<String, ByteIterator> anEntry : theEntries) {
+        final byte[] aColumnName = anEntry.getKey().getBytes();
+
+        anOutputStream.write(toBytes(aColumnName.length));
+        anOutputStream.write(aColumnName);
+
+        final byte[] aColumnValue = anEntry.getValue().toArray();
+
+        anOutputStream.write(toBytes(aColumnValue.length));
+        anOutputStream.write(aColumnValue);
+      }
+      return anOutputStream.toByteArray();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    } finally {
+      close(anOutputStream);
+    }
+  }
+
+  /**
+   * Deserializes an input byte array, transforming it into a list of (String, ByteIterator) couples (i.e. 
a Map). + * + * @param aValue A byte array containing the table to deserialize. + * @param theResult A Map containing the deserialized table. + */ + static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) { + final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue); + byte[] aSizeBuffer = new byte[4]; + + try { + while (anInputStream.available() > 0) { + anInputStream.read(aSizeBuffer); + final int aColumnNameLength = fromBytes(aSizeBuffer); + + final byte[] aColumnNameBuffer = new byte[aColumnNameLength]; + anInputStream.read(aColumnNameBuffer); + + anInputStream.read(aSizeBuffer); + final int aColumnValueLength = fromBytes(aSizeBuffer); + + final byte[] aColumnValue = new byte[aColumnValueLength]; + anInputStream.read(aColumnValue); + + theResult.put(new String(aColumnNameBuffer), new ByteArrayByteIterator(aColumnValue)); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } finally { + close(anInputStream); + } + } + + /** + * Obtains a Long number from a key string. This will be the key used by Riak for all the transactions. + * + * @param key The key to convert from String to Long. + * @return A Long number parsed from the key String. + */ + static Long getKeyAsLong(String key) { + String keyString = key.replaceFirst("[a-zA-Z]*", ""); + + return Long.parseLong(keyString); + } +} diff --git a/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..32d163fdcf7cc0d3b7134e382caf673d593e54b2 --- /dev/null +++ b/riak/src/main/java/com/yahoo/ycsb/db/riak/package-info.java @@ -0,0 +1,23 @@ +/** + * Copyright (c) 2016 YCSB contributors All rights reserved. + * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * The YCSB binding for <a href="http://basho.com/products/riak-kv/">Riak KV</a> 2.x.y. + * + */ +package com.yahoo.ycsb.db.riak; \ No newline at end of file diff --git a/riak/src/main/resources/riak.properties b/riak/src/main/resources/riak.properties new file mode 100644 index 0000000000000000000000000000000000000000..6e418848808a6acb852503060ef23889cde927fc --- /dev/null +++ b/riak/src/main/resources/riak.properties @@ -0,0 +1,57 @@ +## +# Copyright (c) 2016 YCSB contributors All rights reserved. +# Copyright 2014 Basho Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# + +# RiakKVClient - Default Properties +# Note: Change the properties below to set the values to use for your test. You can set them either here or from the +# command line. Note that the latter choice overrides these settings. + +# riak.hosts - string list, comma separated list of IPs or FQDNs. 
+# EX: 127.0.0.1,127.0.0.2,127.0.0.3 or riak1.mydomain.com,riak2.mydomain.com,riak3.mydomain.com +riak.hosts=127.0.0.1 + +# riak.port - int, the port on which every node is listening. It must match the one specified in the riak.conf file +# at the line "listener.protobuf.internal". +riak.port=8087 + +# riak.bucket_type - string, must match value of bucket type created during setup. See readme.md for more information +riak.bucket_type=ycsb + +# riak.r_val - int, the R value represents the number of Riak nodes that must return results for a read before the read +# is considered successful. +riak.r_val=2 + +# riak.w_val - int, the W value represents the number of Riak nodes that must report success before an update is +# considered complete. +riak.w_val=2 + +# riak.read_retry_count - int, number of times the client will try to read a key from Riak. +riak.read_retry_count=5 + +# riak.wait_time_before_retry - int, time (in milliseconds) the client waits before attempting to perform another +# read if the previous one failed. +riak.wait_time_before_retry=200 + +# riak.transaction_time_limit - int, time (in seconds) the client waits before aborting the current transaction. +riak.transaction_time_limit=10 + +# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false). +riak.strong_consistency=true + +# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark +# is started. +riak.debug=false \ No newline at end of file diff --git a/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java b/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a571fe43242601a653227f0e1fbc255780e626ef --- /dev/null +++ b/riak/src/test/java/com/yahoo/ycsb/db/riak/RiakKVClientTest.java @@ -0,0 +1,264 @@ +/** + * Copyright (c) 2016 YCSB contributors All rights reserved. 
+ * Copyright 2014 Basho Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.riak; + +import java.util.*; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNoException; +import static org.junit.Assume.assumeThat; + +/** + * Integration tests for the Riak KV client. + */ +public class RiakKVClientTest { + private static RiakKVClient riakClient; + + private static final String bucket = "testBucket"; + private static final String keyPrefix = "testKey"; + private static final int recordsToInsert = 20; + private static final int recordsToScan = 7; + private static final String firstField = "Key number"; + private static final String secondField = "Key number doubled"; + private static final String thirdField = "Key number square"; + + private static boolean testStarted = false; + + /** + * Creates a cluster for testing purposes. + */ + @BeforeClass + public static void setUpClass() throws Exception { + riakClient = new RiakKVClient(); + riakClient.init(); + + // Set the test bucket environment with the appropriate parameters. 
+ try { + riakClient.setTestEnvironment(bucket); + } catch(Exception e) { + assumeNoException("Unable to configure Riak KV for test, aborting.", e); + } + + // Just add some records to work on... + for (int i = 0; i < recordsToInsert; i++) { + // Abort the entire test whenever the dataset population operation fails. + assumeThat("Riak KV is NOT RUNNING, aborting test.", + riakClient.insert(bucket, keyPrefix + String.valueOf(i), StringByteIterator.getByteIteratorMap( + createExpectedHashMap(i))), + is(Status.OK)); + } + + // Variable to check to determine whether the test has started or not. + testStarted = true; + } + + /** + * Shuts down the cluster created. + */ + @AfterClass + public static void tearDownClass() throws Exception { + // Delete all added keys before cleanup ONLY IF TEST ACTUALLY STARTED. + if (testStarted) { + for (int i = 0; i <= recordsToInsert; i++) { + delete(keyPrefix + Integer.toString(i)); + } + } + + riakClient.cleanup(); + } + + /** + * Test method for read transaction. It is designed to read two of the three fields stored for each key, to also test + * if the createResultHashMap() function implemented in RiakKVClient.java works as expected. + */ + @Test + public void testRead() { + // Choose a random key to read, among the available ones. + int readKeyNumber = new Random().nextInt(recordsToInsert); + + // Prepare two fields to read. + Set<String> fields = new HashSet<>(); + fields.add(firstField); + fields.add(thirdField); + + // Prepare an expected result. + HashMap<String, String> expectedValue = new HashMap<>(); + expectedValue.put(firstField, Integer.toString(readKeyNumber)); + expectedValue.put(thirdField, Integer.toString(readKeyNumber * readKeyNumber)); + + // Define a HashMap to store the actual result. + HashMap<String, ByteIterator> readValue = new HashMap<>(); + + // If a read transaction has been properly done, then one has to receive a Status.OK return from the read() + // function. 
Moreover, the actual returned result MUST match the expected one. + assertEquals("Read transaction FAILED.", + Status.OK, + riakClient.read(bucket, keyPrefix + Integer.toString(readKeyNumber), fields, readValue)); + + assertEquals("Read test FAILED. Actual read transaction value is NOT MATCHING the expected one.", + expectedValue.toString(), + readValue.toString()); + } + + /** + * Test method for scan transaction. A scan transaction has to be considered successfully completed only if all the + * requested values are read (i.e. scan transaction returns with Status.OK). Moreover, one has to check if the + * obtained results match the expected ones. + */ + @Test + public void testScan() { + // Choose, among the available ones, a random key as starting point for the scan transaction. + int startScanKeyNumber = new Random().nextInt(recordsToInsert - recordsToScan); + + // Prepare a HashMap vector to store the scan transaction results. + Vector<HashMap<String, ByteIterator>> scannedValues = new Vector<>(); + + // Check whether the scan transaction is correctly performed or not. + assertEquals("Scan transaction FAILED.", + Status.OK, + riakClient.scan(bucket, keyPrefix + Integer.toString(startScanKeyNumber), recordsToScan, null, + scannedValues)); + + // After the scan transaction completes, compare the obtained results with the expected ones. + for (int i = 0; i < recordsToScan; i++) { + assertEquals("Scan test FAILED: the current scanned key is NOT MATCHING the expected one.", + createExpectedHashMap(startScanKeyNumber + i).toString(), + scannedValues.get(i).toString()); + } + } + + /** + * Test method for update transaction. The test is designed to restore the previously read key. It is assumed to be + * correct when, after performing the update transaction, one reads the just provided values. + */ + @Test + public void testUpdate() { + // Choose a random key to read, among the available ones. 
+ int updateKeyNumber = new Random().nextInt(recordsToInsert); + + // Define a HashMap to save the previously stored values for eventually restoring them. + HashMap<String, ByteIterator> readValueBeforeUpdate = new HashMap<>(); + riakClient.read(bucket, keyPrefix + Integer.toString(updateKeyNumber), null, readValueBeforeUpdate); + + // Prepare an update HashMap to store. + HashMap<String, String> updateValue = new HashMap<>(); + updateValue.put(firstField, "UPDATED"); + updateValue.put(secondField, "UPDATED"); + updateValue.put(thirdField, "UPDATED"); + + // First of all, perform the update and check whether it's failed or not. + assertEquals("Update transaction FAILED.", + Status.OK, + riakClient.update(bucket, keyPrefix + Integer.toString(updateKeyNumber), StringByteIterator + .getByteIteratorMap(updateValue))); + + // Then, read the key again and... + HashMap<String, ByteIterator> readValueAfterUpdate = new HashMap<>(); + assertEquals("Update test FAILED. Unable to read key value.", + Status.OK, + riakClient.read(bucket, keyPrefix + Integer.toString(updateKeyNumber), null, readValueAfterUpdate)); + + // ...compare the result with the new one! + assertEquals("Update transaction NOT EXECUTED PROPERLY. Values DID NOT CHANGE.", + updateValue.toString(), + readValueAfterUpdate.toString()); + + // Finally, restore the previously read key. + assertEquals("Update test FAILED. Unable to restore previous key value.", + Status.OK, + riakClient.update(bucket, keyPrefix + Integer.toString(updateKeyNumber), readValueBeforeUpdate)); + } + + /** + * Test method for insert transaction. It is designed to insert a key just after the last key inserted in the setUp() + * phase. + */ + @Test + public void testInsert() { + // Define a HashMap to insert and another one for the comparison operation. 
+ HashMap<String, String> insertValue = createExpectedHashMap(recordsToInsert); + HashMap<String, ByteIterator> readValue = new HashMap<>(); + + // Check whether the insertion transaction was performed or not. + assertEquals("Insert transaction FAILED.", + Status.OK, + riakClient.insert(bucket, keyPrefix + Integer.toString(recordsToInsert), StringByteIterator. + getByteIteratorMap(insertValue))); + + // Finally, compare the insertion performed with the one expected by reading the key. + assertEquals("Insert test FAILED. Unable to read inserted value.", + Status.OK, + riakClient.read(bucket, keyPrefix + Integer.toString(recordsToInsert), null, readValue)); + assertEquals("Insert test FAILED. Actual read transaction value is NOT MATCHING the inserted one.", + insertValue.toString(), + readValue.toString()); + } + + /** + * Test method for delete transaction. The test deletes a key, then performs a read that should give a + * Status.NOT_FOUND response. Finally, it restores the previously read key. + */ + @Test + public void testDelete() { + // Choose a random key to delete, among the available ones. + int deleteKeyNumber = new Random().nextInt(recordsToInsert); + + // Define a HashMap to save the previously stored values for its eventual restore. + HashMap<String, ByteIterator> readValueBeforeDelete = new HashMap<>(); + riakClient.read(bucket, keyPrefix + Integer.toString(deleteKeyNumber), null, readValueBeforeDelete); + + // First of all, delete the key. + assertEquals("Delete transaction FAILED.", + Status.OK, + delete(keyPrefix + Integer.toString(deleteKeyNumber))); + + // Then, check if the deletion was actually achieved. + assertEquals("Delete test FAILED. Key NOT deleted.", + Status.NOT_FOUND, + riakClient.read(bucket, keyPrefix + Integer.toString(deleteKeyNumber), null, null)); + + // Finally, restore the previously deleted key. + assertEquals("Delete test FAILED. 
Unable to restore previous key value.", + Status.OK, + riakClient.insert(bucket, keyPrefix + Integer.toString(deleteKeyNumber), readValueBeforeDelete)); + } + + private static Status delete(String key) { + return riakClient.delete(bucket, key); + } + + private static HashMap<String, String> createExpectedHashMap(int value) { + HashMap<String, String> values = new HashMap<>(); + + values.put(firstField, Integer.toString(value)); + values.put(secondField, Integer.toString(2 * value)); + values.put(thirdField, Integer.toString(value * value)); + + return values; + } +}