Commit 679366eb authored by nygard_89

[riak] Added a workaround to allow strong-consistent scan transactions.

[riak] Moved the utility function createResultHashMap() to the utility class RiakUtils.java.
[riak] Added riak.strong_consistent_scans_bucket_type property to riak.properties file to set the fake bucket-type name.
[riak] Updated README.md with a guide on how to allow strong-consistent scan transactions.
parent 465fb7ef
@@ -21,7 +21,7 @@ Riak KV Client for Yahoo! Cloud System Benchmark (YCSB)
The Riak KV YCSB client is designed to work with the Yahoo! Cloud System Benchmark (YCSB) project (https://github.com/brianfrankcooper/YCSB) to support performance testing for the 2.x.y line of the Riak KV database.
Creating a <i>bucket-type</i> to use with YCSB
----------------------------
Perform the following operations on your Riak cluster to configure it for the benchmarks.
@@ -31,8 +31,11 @@ Set the default backend for Riak to <i>LevelDB</i> in the `riak.conf` file of ev
```
storage_backend = leveldb
```
After this, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging into one of the nodes in your cluster. You are now ready to configure the cluster to operate under either the strong or the eventual consistency model, as shown in the next two subsections.

### Strong consistency model
To use the <i>strong consistency model</i> (default), follow the next two steps.
1. In every `riak.conf` file, search for the `##strong_consistency=on` line and uncomment it. It's important that you do this <b>before you start your cluster</b>!
2. Run the following `riak-admin` commands:
@@ -42,9 +45,24 @@ Now, create a bucket type named "ycsb"<sup id="a1">[1](#f1)</sup> by logging int
```
riak-admin bucket-type activate ycsb
```
When using this model, you **may want to specify the number of replicas to create for each object**<sup id="a2">[2](#f2)</sup>: the *R* and *W* parameters (see next section) will in fact be ignored. The only information this consistency model needs is how many nodes the system has to query successfully to consider a transaction completed. To set this parameter, add `"n_val":N` to the list of properties shown above (by default `N` is set to 3).
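For example, assuming the bucket-type is created with the standard `consistent` property from the Riak strong-consistency documentation (the exact property list is elided in this diff, so treat this as a sketch), a five-replica setup would look like:
```
riak-admin bucket-type create ycsb '{"props":{"consistent":true,"n_val":5}}'
riak-admin bucket-type activate ycsb
```
A higher `n_val` improves fault tolerance at the cost of more work per transaction.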
#### A note on scan transactions
Currently, `scan` transactions are not _directly_ supported, as there is no suitable means to perform them properly. This will not cause the benchmark to fail; it simply won't perform any scan transaction at all (these will immediately return with a `Status.NOT_IMPLEMENTED` code).
However, a possible workaround has been provided: since Riak doesn't allow strong-consistent bucket-types to use secondary indexes, we can create an eventually consistent one just to store (*key*, *2i indexes*) pairs. This bucket-type is used only to obtain the keys where the objects are located, which will then be used to retrieve the actual objects from the strong-consistent bucket. If you want to use this workaround, you have to create and activate a "_fake bucket-type_" using the following commands:
```
riak-admin bucket-type create fakeBucketType '{"props":{"allow_mult":"false","n_val":1,"dvv_enabled":false,"last_write_wins":true}}'
riak-admin bucket-type activate fakeBucketType
```
A bucket-type so defined isn't allowed to _create siblings_ (`"allow_mult":"false"`) and it will have just _one replica_ (`"n_val":1`), which will store the _last value provided_ (`"last_write_wins":true`). Note that setting `"n_val":1` means that `scan` transactions won't be very *fault-tolerant*. You can increase this value, but doing so necessarily requires more work from the cluster. So, the choice is yours to make!
Then set the `riak.strong_consistent_scans_bucket_type` property (see next section) to the name you gave the aforementioned "fake bucket-type" (e.g. `fakeBucketType` in this case).
Please note that this workaround involves a **double store operation for each insert transaction**: one to store the actual object and another to save the corresponding 2i index. In practice, the client won't notice any difference, as the latter operation is performed asynchronously. However, the cluster will obviously be more loaded, which is why the proposed "fake bucket-type" is as _resource-light_ as possible.
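The whole workaround relies on each record being indexed under a numeric 2i key derived from its YCSB key, so that a range query over `[startkey, Long.MAX_VALUE)` returns consecutive records. A minimal sketch of such a derivation (`KeySketch` and `keyAsLong` are hypothetical names for illustration; the client's actual `RiakUtils.getKeyAsLong()` may be implemented differently):

```java
// Hypothetical sketch of mapping a YCSB record key to a numeric 2i key.
public class KeySketch {

    // Strip everything but digits ("user1234" -> 1234) so consecutive YCSB
    // keys map to consecutive longs, which is what the 2i range query needs.
    static long keyAsLong(String key) {
        String digits = key.replaceAll("\\D+", "");
        return digits.isEmpty() ? 0L : Long.parseLong(digits);
    }

    public static void main(String[] args) {
        System.out.println(keyAsLong("user1234")); // 1234
    }
}
```

Because the mapping preserves key order, scanning `recordcount` records reduces to one paginated 2i range query starting at the numeric form of `startkey`.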
### Eventual consistency model
If you want to use the <i>eventual consistency model</i> implemented in Riak, just type:
```
riak-admin bucket-type create ycsb '{"props":{"allow_mult":"false"}}'
riak-admin bucket-type activate ycsb
```
@@ -63,10 +81,12 @@ You can either specify these configuration parameters via command line or set th
* `riak.wait_time_before_retry` - <b>int</b>, the time (in milliseconds) before the client attempts to perform another read if the previous one failed.
* `riak.transaction_time_limit` - <b>int</b>, the time (in seconds) the client waits before aborting the current transaction.
* `riak.strong_consistency` - <b>boolean</b>, indicates whether to use *strong consistency* (true) or *eventual consistency* (false).
* `riak.strong_consistent_scans_bucket_type` - <b>string</b>, indicates the bucket-type to use to allow scan transactions when using the strong consistency model.
* `riak.debug` - <b>boolean</b>, enables debug mode. This displays all the properties (specified or defaults) when a benchmark is started. Moreover, it shows error causes whenever these occur.
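Putting the scan-workaround parameters together, a `riak.properties` fragment enabling strong consistency with the fake bucket-type might look like the following (values are illustrative, not the project's shipped defaults):
```
riak.hosts=127.0.0.1
riak.port=8087
riak.bucket_type=ycsb
riak.strong_consistency=true
riak.strong_consistent_scans_bucket_type=fakeBucketType
```
If `riak.strong_consistent_scans_bucket_type` is left unset while strong consistency is on, scan transactions simply return `Status.NOT_IMPLEMENTED`.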
<b>Note</b>: For more information on workloads and how to run them please see: https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload
<b id="f1">1</b> As specified in the `riak.properties` file. See parameters configuration section for further info. [](#a1) <b id="f1">1</b> As specified in the `riak.properties` file. See parameters configuration section for further info. [](#a1)
<b id="f2">2</b> <b>IMPORTANT NOTE:</b> Currently the `scan` transactions are <b>NOT SUPPORTED</b> for the benchmarks which use the strong consistency model! However this will not cause the benchmark to fail, it simply won't perform any scan transaction at all. These latter will immediately return with a `Status.NOT_IMPLEMENTED` code. [](#a2) <b id="f2">2</b> More info about properly setting up a fault-tolerant cluster can be found at http://docs.basho.com/riak/kv/2.1.4/configuring/strong-consistency/#enabling-strong-consistency.[](#a2)
@@ -19,8 +19,12 @@
package com.yahoo.ycsb.db.riak;

import com.basho.riak.client.api.commands.buckets.StoreBucketProperties;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.api.commands.kv.UpdateValue;
import com.basho.riak.client.core.RiakFuture;
import com.basho.riak.client.core.query.RiakObject;
import com.basho.riak.client.core.query.indexes.LongIntIndex;
import com.basho.riak.client.core.util.BinaryValue;
import com.yahoo.ycsb.*;

import java.io.IOException;
@@ -34,17 +38,12 @@ import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.indexes.IntIndexQuery;
import com.basho.riak.client.api.commands.kv.DeleteValue;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;

import static com.yahoo.ycsb.db.riak.RiakUtils.createResultHashMap;
import static com.yahoo.ycsb.db.riak.RiakUtils.getKeyAsLong;
import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable;
@@ -52,7 +51,7 @@ import static com.yahoo.ycsb.db.riak.RiakUtils.serializeTable;
* Riak KV 2.x.y client for YCSB framework.
*
*/
public class RiakKVClient extends DB {
private static final String HOST_PROPERTY = "riak.hosts";
private static final String PORT_PROPERTY = "riak.port";
private static final String BUCKET_TYPE_PROPERTY = "riak.bucket_type";
@@ -62,6 +61,7 @@ public final class RiakKVClient extends DB {
private static final String WAIT_TIME_BEFORE_RETRY_PROPERTY = "riak.wait_time_before_retry";
private static final String TRANSACTION_TIME_LIMIT_PROPERTY = "riak.transaction_time_limit";
private static final String STRONG_CONSISTENCY_PROPERTY = "riak.strong_consistency";
private static final String STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY = "riak.strong_consistent_scans_bucket_type";
private static final String DEBUG_PROPERTY = "riak.debug";

private static final Status TIME_OUT = new Status("TIME_OUT", "Cluster didn't respond after maximum wait time.");
@@ -69,12 +69,15 @@ public final class RiakKVClient extends DB {
private String[] hosts;
private int port;
private String bucketType;
private String bucketType2i;
private Quorum rvalue;
private Quorum wvalue;
private int readRetryCount;
private int waitTimeBeforeRetry;
private int transactionTimeLimit;
private boolean strongConsistency;
private String strongConsistentScansBucketType;
private boolean performStrongConsistentScans;
private boolean debug;

private RiakClient riakClient;
@@ -99,12 +102,15 @@ public final class RiakKVClient extends DB {
waitTimeBeforeRetry = Integer.parseInt(propsPF.getProperty(WAIT_TIME_BEFORE_RETRY_PROPERTY));
transactionTimeLimit = Integer.parseInt(propsPF.getProperty(TRANSACTION_TIME_LIMIT_PROPERTY));
strongConsistency = Boolean.parseBoolean(propsPF.getProperty(STRONG_CONSISTENCY_PROPERTY));
strongConsistentScansBucketType = propsPF.getProperty(STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY);
debug = Boolean.parseBoolean(propsPF.getProperty(DEBUG_PROPERTY));
}

private void loadProperties() {
// First, load the default properties...
loadDefaultProperties();

// ...then check for properties set on the command line.
Properties props = getProperties();

String portString = props.getProperty(PORT_PROPERTY);
@@ -152,6 +158,11 @@ public final class RiakKVClient extends DB {
strongConsistency = Boolean.parseBoolean(strongConsistencyString);
}

String strongConsistentScansBucketTypeString = props.getProperty(STRONG_CONSISTENT_SCANS_BUCKET_TYPE_PROPERTY);
if (strongConsistentScansBucketTypeString != null) {
strongConsistentScansBucketType = strongConsistentScansBucketTypeString;
}

String debugString = props.getProperty(DEBUG_PROPERTY);
if (debugString != null) {
debug = Boolean.parseBoolean(debugString);
@@ -161,6 +172,32 @@ public final class RiakKVClient extends DB {
public void init() throws DBException {
loadProperties();
RiakNode.Builder builder = new RiakNode.Builder().withRemotePort(port);
List<RiakNode> nodes = RiakNode.Builder.buildNodes(builder, Arrays.asList(hosts));
riakCluster = new RiakCluster.Builder(nodes).build();
try {
riakCluster.start();
riakClient = new RiakClient(riakCluster);
} catch (Exception e) {
System.err.println("Unable to properly start up the cluster. Reason: " + e.toString());
throw new DBException(e);
}
// If strong consistency is in use, we need to change the bucket-type where the 2i indexes will be stored.
if (strongConsistency && !strongConsistentScansBucketType.isEmpty()) {
// The 2i indexes have to be stored in the purpose-built strongConsistentScansBucketType. This, however, has
// to be done only if the user actually created that bucket-type! If it doesn't exist, the scan
// transactions will not be performed at all.
bucketType2i = strongConsistentScansBucketType;
performStrongConsistentScans = true;
} else {
// If instead eventual consistency is in use, then the 2i indexes have to be stored in the bucket-type
// indicated with the bucketType variable.
bucketType2i = bucketType;
performStrongConsistentScans = false;
}
if (debug) {
System.err.println("DEBUG ENABLED. Configuration parameters:");
System.err.println("-----------------------------------------");
@@ -173,18 +210,11 @@ public final class RiakKVClient extends DB {
System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms"); System.err.println("Wait Time Before Retry: " + waitTimeBeforeRetry + " ms");
System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s"); System.err.println("Transaction Time Limit: " + transactionTimeLimit + " s");
System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual")); System.err.println("Consistency model: " + (strongConsistency ? "Strong" : "Eventual"));
if (strongConsistency) {
System.err.println("Strong Consistent Scan Transactions " + (performStrongConsistentScans ? "" : "NOT ") +
"allowed.");
}
}
}
@@ -237,8 +267,6 @@ public final class RiakKVClient extends DB {
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in
* a HashMap.
* Note: The scan operation requires the use of secondary indexes (2i) and LevelDB.
*
* @param table The name of the table (Riak bucket)
* @param startkey The record key of the first record to read.
@@ -250,29 +278,44 @@ public final class RiakKVClient extends DB {
@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
if (strongConsistency && !performStrongConsistentScans) {
return Status.NOT_IMPLEMENTED;
}

// The strong-consistent bucket-type is not capable of storing 2i indexes, so we need to read them from the fake
// one (which we use only to store indexes). This is why, when using such a consistency model, the bucketType2i
// variable is set to the fake bucket-type name.
IntIndexQuery iiq = new IntIndexQuery
.Builder(new Namespace(bucketType2i, table), "key", getKeyAsLong(startkey), Long.MAX_VALUE)
.withMaxResults(recordcount)
.withPaginationSort(true)
.build();

Location location;
RiakFuture<IntIndexQuery.Response, IntIndexQuery> future = riakClient.executeAsync(iiq);

try {
IntIndexQuery.Response response = future.get(transactionTimeLimit, TimeUnit.SECONDS);
List<IntIndexQuery.Response.Entry> entries = response.getEntries();
// If no entries were retrieved, then something bad happened...
if (entries.size() == 0) {
if (debug) {
System.err.println("Unable to scan any record starting from key " + startkey + ", aborting transaction. " +
"Reason: NOT FOUND");
}
return Status.NOT_FOUND;
}
for (IntIndexQuery.Response.Entry entry : entries) {
// If strong consistency is in use, then the actual location of the object we want to read is obtained by
// fetching the key from the one retrieved with the 2i index search operation.
if (strongConsistency) {
location = new Location(new Namespace(bucketType, table), entry.getRiakObjectLocation().getKeyAsString());
} else {
location = entry.getRiakObjectLocation();
}

FetchValue fv = new FetchValue.Builder(location)
.withOption(FetchValue.Option.R, rvalue)
@@ -282,21 +325,22 @@ public final class RiakKVClient extends DB {
if (keyResponse.isNotFound()) {
if (debug) {
System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
"transaction. Reason: NOT FOUND");
}

return Status.NOT_FOUND;
}

// Create the partial result to add to the result vector.
HashMap<String, ByteIterator> partialResult = new HashMap<>();
createResultHashMap(fields, keyResponse, partialResult);
result.add(partialResult);
}
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to scan all requested records starting from key " + startkey + ", aborting " +
"transaction. Reason: TIME OUT");
}

return TIME_OUT;
@@ -348,45 +392,6 @@ public final class RiakKVClient extends DB {
return response;
}
/**
* Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the
* record with the specified record key. Also creates a secondary index (2i) for each record consisting of the key
@@ -402,12 +407,40 @@ public final class RiakKVClient extends DB {
Location location = new Location(new Namespace(bucketType, table), key);
RiakObject object = new RiakObject();

// Strong consistency doesn't support secondary indexing, but the eventually consistent model does. So, we can
// mock 2i usage by creating a fake object stored in an eventually consistent bucket-type with the SAME KEY THAT
// THE ACTUAL OBJECT HAS. The latter is obviously stored in the strong-consistent bucket-type indicated by the
// riak.bucket_type property.
if (strongConsistency && performStrongConsistentScans) {
// Create a fake object to store in the default bucket-type just to keep track of the 2i indices.
Location fakeLocation = new Location(new Namespace(strongConsistentScansBucketType, table), key);
// Obviously, we want the fake object to contain as little data as possible. We can't create an empty object, so
// we have to choose the minimum data size allowed: one byte.
RiakObject fakeObject = new RiakObject();
fakeObject.setValue(BinaryValue.create(new byte[]{0x00}));
fakeObject.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
StoreValue fakeStore = new StoreValue.Builder(fakeObject)
.withLocation(fakeLocation)
.build();
// We don't mind whether the operation has finished or not: waiting for it to complete would slow down the
// client and make this solution too heavy to be a valid compromise. This obviously means that under heavy load
// a scan operation could fail due to an unfinished "fakeStore".
riakClient.executeAsync(fakeStore);
} else if (!strongConsistency) {
// The next operation is useless under the strong consistency model, so it's performed only when using
// eventual consistency.
object.getIndexes().getIndex(LongIntIndex.named("key_int")).add(getKeyAsLong(key));
}
// Store proper values into the object.
object.setValue(BinaryValue.create(serializeTable(values)));

StoreValue store = new StoreValue.Builder(object)
.withOption(StoreValue.Option.W, wvalue)
.withLocation(location)
.build();

RiakFuture<StoreValue.Response, Location> future = riakClient.executeAsync(store);
@@ -416,15 +449,13 @@ public final class RiakKVClient extends DB {
future.get(transactionTimeLimit, TimeUnit.SECONDS);
} catch (TimeoutException e) {
if (debug) {
System.err.println("Unable to insert key " + key + ". Reason: TIME OUT");
}

return TIME_OUT;
} catch (Exception e) {
if (debug) {
System.err.println("Unable to insert key " + key + ". Reason: " + e.toString());
}

return Status.ERROR;
@@ -462,18 +493,15 @@ public final class RiakKVClient extends DB {
*/
@Override
public Status update(String table, String key, HashMap<String, ByteIterator> values) {
// If the eventual consistency model is in use, then an update operation is practically equivalent to an insert.
if (!strongConsistency) {
return insert(table, key, values);
}
Location location = new Location(new Namespace(bucketType, table), key);

UpdateValue update = new UpdateValue.Builder(location)
.withUpdate(new UpdateEntity(new RiakObject().setValue(BinaryValue.create(serializeTable(values)))))
.build();

RiakFuture<UpdateValue.Response, Location> future = riakClient.executeAsync(update);
@@ -503,7 +531,6 @@ public final class RiakKVClient extends DB {
    return Status.OK;
  }

  /**
   * Delete a record from the database.
   *
@@ -548,14 +575,15 @@ public final class RiakKVClient extends DB {
  /**
   * Auxiliary function needed for testing. It configures the default bucket-type to take care of the consistency
   * problem by disallowing sibling creation. Moreover, it disables strong consistency, because we don't have
   * the possibility to create a proper bucket-type to use to fake 2i indexes usage.
   *
   * @param bucket The bucket name.
   * @throws Exception Thrown if something bad happens.
   */
  void setTestEnvironment(String bucket) throws Exception {
    bucketType = "default";
    bucketType2i = bucketType;
    strongConsistency = false;

    Namespace ns = new Namespace(bucketType, bucket);
...
@@ -19,9 +19,11 @@
package com.yahoo.ycsb.db.riak;

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.basho.riak.client.api.commands.kv.FetchValue;

import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
@@ -101,12 +103,12 @@ final class RiakUtils {
  }

  /**
   * Deserializes an input byte array, transforming it into a list of (String, ByteIterator) pairs (i.e. a Map).
   *
   * @param aValue A byte array containing the table to deserialize.
   * @param theResult A Map containing the deserialized table.
   */
  private static void deserializeTable(final byte[] aValue, final Map<String, ByteIterator> theResult) {
    final ByteArrayInputStream anInputStream = new ByteArrayInputStream(aValue);
    byte[] aSizeBuffer = new byte[4];
@@ -144,4 +146,43 @@ final class RiakUtils {
    return Long.parseLong(keyString);
  }
  /**
   * Retrieves all the fields requested by a read or scan operation and puts them in the result
   * HashMap.
   *
   * @param fields The set of fields to read, or null for all of them.
   * @param response The response of a fetch operation, holding the serialized record to deserialize.
   * @param resultHashMap The HashMap to return as result.
   */
  static void createResultHashMap(Set<String> fields, FetchValue.Response response,
                                  HashMap<String, ByteIterator> resultHashMap) {
    // If everything went fine, then a result must be given. Such an object is a hash table containing the (field,
    // value) pairs based on the requested fields. Note that in a read operation, ONLY ONE OBJECT IS RETRIEVED!
    // The following line retrieves the previously serialized table which was stored with an insert transaction.
    byte[] responseFieldsAndValues = response.getValues().get(0).getValue().getValue();

    // Deserialize the stored response table.
    HashMap<String, ByteIterator> deserializedTable = new HashMap<>();
    deserializeTable(responseFieldsAndValues, deserializedTable);

    // If only specific fields are requested, then only these should be put in the result object!
    if (fields != null) {
      // Populate the HashMap to provide as result.
      for (String field : fields) {
        // Comparison between a requested field and the ones retrieved. If they're equal (i.e. the get() operation
        // DOES NOT return a null value), then proceed to store the pair in the resultHashMap.
        ByteIterator value = deserializedTable.get(field);
        if (value != null) {
          resultHashMap.put(field, value);
        }
      }
    } else {
      // If, instead, no field is specified, then all those retrieved must be provided as result.
      resultHashMap.putAll(deserializedTable);
    }
  }
}
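The field-filtering rule implemented by `createResultHashMap()` above can be sketched in isolation. The class and method names below are hypothetical, and plain `String` values stand in for `ByteIterator` so the sketch has no Riak/YCSB dependencies:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the filtering step: when a field set is given, keep only
// the requested fields that were actually retrieved; when it is null, keep all
// deserialized fields.
final class FieldFilterSketch {
  static Map<String, String> filter(Set<String> fields, Map<String, String> deserializedTable) {
    Map<String, String> result = new HashMap<>();
    if (fields != null) {
      for (String field : fields) {
        String value = deserializedTable.get(field);
        if (value != null) {          // skip requested fields that were not retrieved
          result.put(field, value);
        }
      }
    } else {
      result.putAll(deserializedTable); // no field list: return everything
    }
    return result;
  }
}
```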
@@ -52,6 +52,10 @@ riak.transaction_time_limit=10
# riak.strong_consistency - boolean, indicates whether to use strong consistency (true) or eventual consistency (false).
riak.strong_consistency=true
# riak.strong_consistent_scans_bucket_type - string, indicates the bucket-type to use to allow scan transactions
# when strong consistency mode is in use. Example: fakeBucketType.
riak.strong_consistent_scans_bucket_type=
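For reference, a run that needs strongly consistent scans would set the two knobs together, for example as below. The bucket-type name is purely illustrative and must match an eventually consistent bucket-type that was already created and activated on the cluster, as described in the README:

```
riak.strong_consistency=true
riak.strong_consistent_scans_bucket_type=fakeBucketType
```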
# riak.debug - boolean, enables debug mode. This displays all the properties (specified or defaults) when a benchmark
# is started.
riak.debug=false
\ No newline at end of file